mito2/worker/
handle_close.rs1use common_telemetry::info;
18use store_api::logstore::LogStore;
19use store_api::logstore::provider::Provider;
20use store_api::region_request::RegionFlushRequest;
21use store_api::storage::RegionId;
22
23use crate::flush::FlushReason;
24use crate::request::OptionOutputTx;
25use crate::worker::RegionWorkerLoop;
26
27impl<S: LogStore> RegionWorkerLoop<S> {
28 pub(crate) async fn handle_close_request(
29 &mut self,
30 region_id: RegionId,
31 sender: OptionOutputTx,
32 ) {
33 let Some(region) = self.regions.get_region(region_id) else {
34 sender.send(Ok(0));
35 return;
36 };
37
38 info!("Try to close region {}, worker: {}", region_id, self.id);
39
40 if region.provider == Provider::Noop
43 && !region
44 .version_control
45 .current()
46 .version
47 .memtables
48 .is_empty()
49 {
50 info!("Region {} has pending data, waiting for flush", region_id);
51 self.handle_flush_request(
52 region_id,
53 RegionFlushRequest::default(),
54 Some(FlushReason::Closing),
55 sender,
56 );
57 return;
58 }
59
60 self.remove_region(region_id).await;
62 info!("Region {} closed, worker: {}", region_id, self.id);
63 sender.send(Ok(0))
64 }
65
66 pub(crate) async fn remove_region(&mut self, region_id: RegionId) {
68 let Some(region) = self.regions.remove_region(region_id) else {
69 return;
70 };
71 region.stop().await;
72 self.fail_region_stalled_requests_as_not_found(®ion_id);
73 self.reject_region_edit_queue_as_not_found(region_id);
74 self.flush_scheduler.on_region_closed(region_id);
76 self.compaction_scheduler.on_region_closed(region_id);
78 self.index_build_scheduler.on_region_closed(region_id).await;
80 self.region_count.dec();
81 }
82}