mito2/worker/
handle_open.rs1use std::sync::Arc;
18use std::time::Instant;
19
20use common_telemetry::info;
21use object_store::util::join_path;
22use snafu::{OptionExt, ResultExt};
23use store_api::logstore::LogStore;
24use store_api::region_request::RegionOpenRequest;
25use store_api::storage::RegionId;
26use table::requests::STORAGE_KEY;
27
28use crate::error::{
29 ObjectStoreNotFoundSnafu, OpenDalSnafu, OpenRegionSnafu, RegionNotFoundSnafu, Result,
30};
31use crate::region::opener::{RegionOpener, sanitize_open_request_options};
32use crate::request::OptionOutputTx;
33use crate::sst::location::region_dir_from_table_dir;
34use crate::wal::entry_distributor::WalEntryReceiver;
35use crate::worker::handle_drop::remove_region_dir_once;
36use crate::worker::{DROPPING_MARKER_FILE, RegionWorkerLoop};
37
38impl<S: LogStore> RegionWorkerLoop<S> {
39 async fn check_and_cleanup_region(
40 &self,
41 region_id: RegionId,
42 request: &RegionOpenRequest,
43 ) -> Result<()> {
44 let object_store = if let Some(storage_name) = request.options.get(STORAGE_KEY) {
45 self.object_store_manager
46 .find(storage_name)
47 .with_context(|| ObjectStoreNotFoundSnafu {
48 object_store: storage_name.clone(),
49 })?
50 } else {
51 self.object_store_manager.default_object_store()
52 };
53 let region_dir =
55 region_dir_from_table_dir(&request.table_dir, region_id, request.path_type);
56 if !self.dropping_regions.is_region_exists(region_id)
57 && object_store
58 .exists(&join_path(®ion_dir, DROPPING_MARKER_FILE))
59 .await
60 .context(OpenDalSnafu)?
61 {
62 let result = remove_region_dir_once(®ion_dir, object_store, true).await;
63 info!(
64 "Region {} is dropped, worker: {}, result: {:?}",
65 region_id, self.id, result
66 );
67 return RegionNotFoundSnafu { region_id }.fail();
68 }
69
70 Ok(())
71 }
72
73 pub(crate) async fn handle_open_request(
74 &mut self,
75 region_id: RegionId,
76 mut request: RegionOpenRequest,
77 wal_entry_receiver: Option<WalEntryReceiver>,
78 sender: OptionOutputTx,
79 ) {
80 if self.regions.is_region_exists(region_id) {
81 sender.send(Ok(0));
82 return;
83 }
84 let Some(sender) = self
85 .opening_regions
86 .wait_for_opening_region(region_id, sender)
87 else {
88 return;
89 };
90 if let Err(err) = self.check_and_cleanup_region(region_id, &request).await {
91 sender.send(Err(err));
92 return;
93 }
94 info!("Try to open region {}, worker: {}", region_id, self.id);
95 sanitize_open_request_options(&mut request.options);
96
97 let opener = match RegionOpener::new(
99 region_id,
100 &request.table_dir,
101 request.path_type,
102 self.memtable_builder_provider.clone(),
103 self.object_store_manager.clone(),
104 self.purge_scheduler.clone(),
105 self.puffin_manager_factory.clone(),
106 self.intermediate_manager.clone(),
107 self.time_provider.clone(),
108 self.file_ref_manager.clone(),
109 self.partition_expr_fetcher.clone(),
110 )
111 .skip_wal_replay(request.skip_wal_replay)
112 .cache(Some(self.cache_manager.clone()))
113 .wal_entry_reader(wal_entry_receiver.map(|receiver| Box::new(receiver) as _))
114 .replay_checkpoint(request.checkpoint.map(|checkpoint| checkpoint.entry_id))
115 .parse_options(request.options)
116 {
117 Ok(opener) => opener,
118 Err(err) => {
119 sender.send(Err(err));
120 return;
121 }
122 };
123
124 let now = Instant::now();
125 let regions = self.regions.clone();
126 let wal = self.wal.clone();
127 let config = self.config.clone();
128 let opening_regions = self.opening_regions.clone();
129 let region_count = self.region_count.clone();
130 let worker_id = self.id;
131 opening_regions.insert_sender(region_id, sender);
132 common_runtime::spawn_global(async move {
133 match opener.open(&config, &wal).await {
134 Ok(region) => {
135 info!(
136 "Region {} is opened, worker: {}, elapsed: {:?}",
137 region_id,
138 worker_id,
139 now.elapsed()
140 );
141 region_count.inc();
142
143 regions.insert_region(region);
145
146 let senders = opening_regions.remove_sender(region_id);
147 for sender in senders {
148 sender.send(Ok(0));
149 }
150 }
151 Err(err) => {
152 let senders = opening_regions.remove_sender(region_id);
153 let err = Arc::new(err);
154 for sender in senders {
155 sender.send(Err(err.clone()).context(OpenRegionSnafu));
156 }
157 }
158 }
159 });
160 }
161}