Skip to content

Commit edc9b5d

Browse files
authored
Merge pull request #10718 from zhang2014/chore/read_finished
chore(cluster): close flight receiver in event
2 parents ae0effb + 3f7fa57 commit edc9b5d

File tree

4 files changed

+10
-1
lines changed

4 files changed

+10
-1
lines changed

src/query/service/src/api/rpc/exchange/exchange_source_reader.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,8 @@ impl Processor for ExchangeSourceReader {
6666

6767
if self.output.is_finished() {
6868
if !self.finished {
69-
return Ok(Event::Async);
69+
self.finished = true;
70+
self.flight_receiver.close();
7071
}
7172

7273
return Ok(Event::Finished);

src/query/service/src/api/rpc/flight_client.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,8 @@ impl FlightClient {
8181
break;
8282
}
8383
}
84+
85+
tx.close();
8486
}
8587
});
8688

src/query/service/src/api/rpc/flight_service.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,8 @@ impl FlightService for DatabendQueryFlightService {
146146
.create_session(SessionType::FlightRPC)
147147
.await?;
148148
let ctx = session.create_query_context().await?;
149+
// Keep query id
150+
ctx.set_id(init_query_fragments_plan.executor_packet.query_id.clone());
149151

150152
let spawner = ctx.clone();
151153
match_join_handle(spawner.spawn(async move {

src/query/service/src/sessions/query_ctx.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,10 @@ impl QueryContext {
200200
self.shared.set_affect(affect)
201201
}
202202

203+
pub fn set_id(&self, id: String) {
204+
*self.shared.init_query_id.write() = id;
205+
}
206+
203207
pub fn set_executor(&self, weak_ptr: Weak<PipelineExecutor>) {
204208
self.shared.set_executor(weak_ptr)
205209
}

0 commit comments

Comments
 (0)