mito2/worker/
handle_open.rs1use std::sync::Arc;
18
19use common_telemetry::info;
20use object_store::util::join_path;
21use snafu::{OptionExt, ResultExt};
22use store_api::logstore::LogStore;
23use store_api::region_request::RegionOpenRequest;
24use store_api::storage::RegionId;
25use table::requests::STORAGE_KEY;
26
27use crate::error::{
28 ObjectStoreNotFoundSnafu, OpenDalSnafu, OpenRegionSnafu, RegionNotFoundSnafu, Result,
29};
30use crate::region::opener::RegionOpener;
31use crate::request::OptionOutputTx;
32use crate::sst::location::region_dir_from_table_dir;
33use crate::wal::entry_distributor::WalEntryReceiver;
34use crate::worker::handle_drop::remove_region_dir_once;
35use crate::worker::{RegionWorkerLoop, DROPPING_MARKER_FILE};
36
37impl<S: LogStore> RegionWorkerLoop<S> {
38 async fn check_and_cleanup_region(
39 &self,
40 region_id: RegionId,
41 request: &RegionOpenRequest,
42 ) -> Result<()> {
43 let object_store = if let Some(storage_name) = request.options.get(STORAGE_KEY) {
44 self.object_store_manager
45 .find(storage_name)
46 .context(ObjectStoreNotFoundSnafu {
47 object_store: storage_name.to_string(),
48 })?
49 } else {
50 self.object_store_manager.default_object_store()
51 };
52 let region_dir =
54 region_dir_from_table_dir(&request.table_dir, region_id, request.path_type);
55 if !self.dropping_regions.is_region_exists(region_id)
56 && object_store
57 .exists(&join_path(®ion_dir, DROPPING_MARKER_FILE))
58 .await
59 .context(OpenDalSnafu)?
60 {
61 let result = remove_region_dir_once(®ion_dir, object_store, true).await;
62 info!(
63 "Region {} is dropped, worker: {}, result: {:?}",
64 region_id, self.id, result
65 );
66 return RegionNotFoundSnafu { region_id }.fail();
67 }
68
69 Ok(())
70 }
71
72 pub(crate) async fn handle_open_request(
73 &mut self,
74 region_id: RegionId,
75 request: RegionOpenRequest,
76 wal_entry_receiver: Option<WalEntryReceiver>,
77 sender: OptionOutputTx,
78 ) {
79 if self.regions.is_region_exists(region_id) {
80 sender.send(Ok(0));
81 return;
82 }
83 let Some(sender) = self
84 .opening_regions
85 .wait_for_opening_region(region_id, sender)
86 else {
87 return;
88 };
89 if let Err(err) = self.check_and_cleanup_region(region_id, &request).await {
90 sender.send(Err(err));
91 return;
92 }
93 info!("Try to open region {}, worker: {}", region_id, self.id);
94
95 let opener = match RegionOpener::new(
97 region_id,
98 &request.table_dir,
99 request.path_type,
100 self.memtable_builder_provider.clone(),
101 self.object_store_manager.clone(),
102 self.purge_scheduler.clone(),
103 self.puffin_manager_factory.clone(),
104 self.intermediate_manager.clone(),
105 self.time_provider.clone(),
106 )
107 .skip_wal_replay(request.skip_wal_replay)
108 .cache(Some(self.cache_manager.clone()))
109 .wal_entry_reader(wal_entry_receiver.map(|receiver| Box::new(receiver) as _))
110 .parse_options(request.options)
111 {
112 Ok(opener) => opener,
113 Err(err) => {
114 sender.send(Err(err));
115 return;
116 }
117 };
118
119 let regions = self.regions.clone();
120 let wal = self.wal.clone();
121 let config = self.config.clone();
122 let opening_regions = self.opening_regions.clone();
123 let region_count = self.region_count.clone();
124 let worker_id = self.id;
125 opening_regions.insert_sender(region_id, sender);
126 common_runtime::spawn_global(async move {
127 match opener.open(&config, &wal).await {
128 Ok(region) => {
129 info!("Region {} is opened, worker: {}", region_id, worker_id);
130 region_count.inc();
131
132 regions.insert_region(Arc::new(region));
134
135 let senders = opening_regions.remove_sender(region_id);
136 for sender in senders {
137 sender.send(Ok(0));
138 }
139 }
140 Err(err) => {
141 let senders = opening_regions.remove_sender(region_id);
142 let err = Arc::new(err);
143 for sender in senders {
144 sender.send(Err(err.clone()).context(OpenRegionSnafu));
145 }
146 }
147 }
148 });
149 }
150}