mito2/worker/
handle_close.rs1use common_telemetry::info;
18use store_api::logstore::LogStore;
19use store_api::logstore::provider::Provider;
20use store_api::region_request::RegionFlushRequest;
21use store_api::storage::RegionId;
22
23use crate::flush::FlushReason;
24use crate::request::OptionOutputTx;
25use crate::worker::RegionWorkerLoop;
26
27impl<S: LogStore> RegionWorkerLoop<S> {
28 pub(crate) async fn handle_close_request(
29 &mut self,
30 region_id: RegionId,
31 sender: OptionOutputTx,
32 ) {
33 let Some(region) = self.regions.get_region(region_id) else {
34 sender.send(Ok(0));
35 return;
36 };
37
38 info!("Try to close region {}, worker: {}", region_id, self.id);
39
40 if region.provider == Provider::Noop
43 && !region
44 .version_control
45 .current()
46 .version
47 .memtables
48 .is_empty()
49 {
50 info!("Region {} has pending data, waiting for flush", region_id);
51 self.handle_flush_request(
52 region_id,
53 RegionFlushRequest {
54 row_group_size: None,
55 },
56 Some(FlushReason::Closing),
57 sender,
58 );
59 return;
60 }
61
62 self.remove_region(region_id).await;
64 info!("Region {} closed, worker: {}", region_id, self.id);
65 sender.send(Ok(0))
66 }
67
68 pub(crate) async fn remove_region(&mut self, region_id: RegionId) {
70 let Some(region) = self.regions.remove_region(region_id) else {
71 return;
72 };
73 region.stop().await;
74 self.flush_scheduler.on_region_closed(region_id);
76 self.compaction_scheduler.on_region_closed(region_id);
78 self.index_build_scheduler.on_region_closed(region_id).await;
80 self.region_count.dec();
81 }
82}