mito2/worker/
handle_open.rs1use std::sync::Arc;
18
19use common_telemetry::info;
20use object_store::util::join_path;
21use snafu::{OptionExt, ResultExt};
22use store_api::logstore::LogStore;
23use store_api::region_request::RegionOpenRequest;
24use store_api::storage::RegionId;
25use table::requests::STORAGE_KEY;
26
27use crate::error::{
28 ObjectStoreNotFoundSnafu, OpenDalSnafu, OpenRegionSnafu, RegionNotFoundSnafu, Result,
29};
30use crate::region::opener::RegionOpener;
31use crate::request::OptionOutputTx;
32use crate::wal::entry_distributor::WalEntryReceiver;
33use crate::worker::handle_drop::remove_region_dir_once;
34use crate::worker::{RegionWorkerLoop, DROPPING_MARKER_FILE};
35
36impl<S: LogStore> RegionWorkerLoop<S> {
37 async fn check_and_cleanup_region(
38 &self,
39 region_id: RegionId,
40 request: &RegionOpenRequest,
41 ) -> Result<()> {
42 let object_store = if let Some(storage_name) = request.options.get(STORAGE_KEY) {
43 self.object_store_manager
44 .find(storage_name)
45 .context(ObjectStoreNotFoundSnafu {
46 object_store: storage_name.to_string(),
47 })?
48 } else {
49 self.object_store_manager.default_object_store()
50 };
51 if !self.dropping_regions.is_region_exists(region_id)
53 && object_store
54 .exists(&join_path(&request.region_dir, DROPPING_MARKER_FILE))
55 .await
56 .context(OpenDalSnafu)?
57 {
58 let result = remove_region_dir_once(&request.region_dir, object_store, true).await;
59 info!(
60 "Region {} is dropped, worker: {}, result: {:?}",
61 region_id, self.id, result
62 );
63 return RegionNotFoundSnafu { region_id }.fail();
64 }
65
66 Ok(())
67 }
68
69 pub(crate) async fn handle_open_request(
70 &mut self,
71 region_id: RegionId,
72 request: RegionOpenRequest,
73 wal_entry_receiver: Option<WalEntryReceiver>,
74 sender: OptionOutputTx,
75 ) {
76 if self.regions.is_region_exists(region_id) {
77 sender.send(Ok(0));
78 return;
79 }
80 let Some(sender) = self
81 .opening_regions
82 .wait_for_opening_region(region_id, sender)
83 else {
84 return;
85 };
86 if let Err(err) = self.check_and_cleanup_region(region_id, &request).await {
87 sender.send(Err(err));
88 return;
89 }
90 info!("Try to open region {}, worker: {}", region_id, self.id);
91
92 let opener = match RegionOpener::new(
94 region_id,
95 &request.region_dir,
96 self.memtable_builder_provider.clone(),
97 self.object_store_manager.clone(),
98 self.purge_scheduler.clone(),
99 self.puffin_manager_factory.clone(),
100 self.intermediate_manager.clone(),
101 self.time_provider.clone(),
102 )
103 .skip_wal_replay(request.skip_wal_replay)
104 .cache(Some(self.cache_manager.clone()))
105 .wal_entry_reader(wal_entry_receiver.map(|receiver| Box::new(receiver) as _))
106 .parse_options(request.options)
107 {
108 Ok(opener) => opener,
109 Err(err) => {
110 sender.send(Err(err));
111 return;
112 }
113 };
114
115 let regions = self.regions.clone();
116 let wal = self.wal.clone();
117 let config = self.config.clone();
118 let opening_regions = self.opening_regions.clone();
119 let region_count = self.region_count.clone();
120 let worker_id = self.id;
121 opening_regions.insert_sender(region_id, sender);
122 common_runtime::spawn_global(async move {
123 match opener.open(&config, &wal).await {
124 Ok(region) => {
125 info!("Region {} is opened, worker: {}", region_id, worker_id);
126 region_count.inc();
127
128 regions.insert_region(Arc::new(region));
130
131 let senders = opening_regions.remove_sender(region_id);
132 for sender in senders {
133 sender.send(Ok(0));
134 }
135 }
136 Err(err) => {
137 let senders = opening_regions.remove_sender(region_id);
138 let err = Arc::new(err);
139 for sender in senders {
140 sender.send(Err(err.clone()).context(OpenRegionSnafu));
141 }
142 }
143 }
144 });
145 }
146}