mito2/worker/
handle_open.rs1use std::sync::Arc;
18use std::time::Instant;
19
20use common_telemetry::info;
21use object_store::util::join_path;
22use snafu::{OptionExt, ResultExt};
23use store_api::logstore::LogStore;
24use store_api::region_request::RegionOpenRequest;
25use store_api::storage::RegionId;
26use table::requests::STORAGE_KEY;
27
28use crate::error::{
29 ObjectStoreNotFoundSnafu, OpenDalSnafu, OpenRegionSnafu, RegionNotFoundSnafu, Result,
30};
31use crate::region::opener::RegionOpener;
32use crate::request::OptionOutputTx;
33use crate::sst::location::region_dir_from_table_dir;
34use crate::wal::entry_distributor::WalEntryReceiver;
35use crate::worker::handle_drop::remove_region_dir_once;
36use crate::worker::{DROPPING_MARKER_FILE, RegionWorkerLoop};
37
38impl<S: LogStore> RegionWorkerLoop<S> {
39 async fn check_and_cleanup_region(
40 &self,
41 region_id: RegionId,
42 request: &RegionOpenRequest,
43 ) -> Result<()> {
44 let object_store = if let Some(storage_name) = request.options.get(STORAGE_KEY) {
45 self.object_store_manager
46 .find(storage_name)
47 .with_context(|| ObjectStoreNotFoundSnafu {
48 object_store: storage_name.clone(),
49 })?
50 } else {
51 self.object_store_manager.default_object_store()
52 };
53 let region_dir =
55 region_dir_from_table_dir(&request.table_dir, region_id, request.path_type);
56 if !self.dropping_regions.is_region_exists(region_id)
57 && object_store
58 .exists(&join_path(®ion_dir, DROPPING_MARKER_FILE))
59 .await
60 .context(OpenDalSnafu)?
61 {
62 let result = remove_region_dir_once(®ion_dir, object_store, true).await;
63 info!(
64 "Region {} is dropped, worker: {}, result: {:?}",
65 region_id, self.id, result
66 );
67 return RegionNotFoundSnafu { region_id }.fail();
68 }
69
70 Ok(())
71 }
72
73 pub(crate) async fn handle_open_request(
74 &mut self,
75 region_id: RegionId,
76 request: RegionOpenRequest,
77 wal_entry_receiver: Option<WalEntryReceiver>,
78 sender: OptionOutputTx,
79 ) {
80 if self.regions.is_region_exists(region_id) {
81 sender.send(Ok(0));
82 return;
83 }
84 let Some(sender) = self
85 .opening_regions
86 .wait_for_opening_region(region_id, sender)
87 else {
88 return;
89 };
90 if let Err(err) = self.check_and_cleanup_region(region_id, &request).await {
91 sender.send(Err(err));
92 return;
93 }
94 info!("Try to open region {}, worker: {}", region_id, self.id);
95
96 let opener = match RegionOpener::new(
98 region_id,
99 &request.table_dir,
100 request.path_type,
101 self.memtable_builder_provider.clone(),
102 self.object_store_manager.clone(),
103 self.purge_scheduler.clone(),
104 self.puffin_manager_factory.clone(),
105 self.intermediate_manager.clone(),
106 self.time_provider.clone(),
107 self.file_ref_manager.clone(),
108 self.partition_expr_fetcher.clone(),
109 )
110 .skip_wal_replay(request.skip_wal_replay)
111 .cache(Some(self.cache_manager.clone()))
112 .wal_entry_reader(wal_entry_receiver.map(|receiver| Box::new(receiver) as _))
113 .replay_checkpoint(request.checkpoint.map(|checkpoint| checkpoint.entry_id))
114 .parse_options(request.options)
115 {
116 Ok(opener) => opener,
117 Err(err) => {
118 sender.send(Err(err));
119 return;
120 }
121 };
122
123 let now = Instant::now();
124 let regions = self.regions.clone();
125 let wal = self.wal.clone();
126 let config = self.config.clone();
127 let opening_regions = self.opening_regions.clone();
128 let region_count = self.region_count.clone();
129 let worker_id = self.id;
130 opening_regions.insert_sender(region_id, sender);
131 common_runtime::spawn_global(async move {
132 match opener.open(&config, &wal).await {
133 Ok(region) => {
134 info!(
135 "Region {} is opened, worker: {}, elapsed: {:?}",
136 region_id,
137 worker_id,
138 now.elapsed()
139 );
140 region_count.inc();
141
142 regions.insert_region(region);
144
145 let senders = opening_regions.remove_sender(region_id);
146 for sender in senders {
147 sender.send(Ok(0));
148 }
149 }
150 Err(err) => {
151 let senders = opening_regions.remove_sender(region_id);
152 let err = Arc::new(err);
153 for sender in senders {
154 sender.send(Err(err.clone()).context(OpenRegionSnafu));
155 }
156 }
157 }
158 });
159 }
160}