metric_engine/engine/
open.rs1use std::collections::HashSet;
18
19use common_telemetry::info;
20use mito2::engine::MITO_ENGINE_NAME;
21use object_store::util::join_dir;
22use snafu::{OptionExt, ResultExt};
23use store_api::codec::PrimaryKeyEncoding;
24use store_api::metric_engine_consts::{DATA_REGION_SUBDIR, METADATA_REGION_SUBDIR};
25use store_api::region_engine::{BatchResponses, RegionEngine};
26use store_api::region_request::{AffectedRows, RegionOpenRequest, RegionRequest};
27use store_api::storage::RegionId;
28
29use crate::engine::create::region_options_for_metadata_region;
30use crate::engine::options::{set_data_region_options, PhysicalRegionOptions};
31use crate::engine::MetricEngineInner;
32use crate::error::{
33 BatchOpenMitoRegionSnafu, OpenMitoRegionSnafu, PhysicalRegionNotFoundSnafu, Result,
34};
35use crate::metrics::{LOGICAL_REGION_COUNT, PHYSICAL_REGION_COUNT};
36use crate::utils;
37
38impl MetricEngineInner {
39 pub async fn handle_batch_open_requests(
40 &self,
41 parallelism: usize,
42 requests: Vec<(RegionId, RegionOpenRequest)>,
43 ) -> Result<BatchResponses> {
44 let mut all_requests = Vec::with_capacity(requests.len() * 2);
46 let mut physical_region_ids = Vec::with_capacity(requests.len());
47 let mut data_region_ids = HashSet::with_capacity(requests.len());
48
49 for (region_id, request) in requests {
50 if !request.is_physical_table() {
51 continue;
52 }
53 let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
54 let metadata_region_id = utils::to_metadata_region_id(region_id);
55 let data_region_id = utils::to_data_region_id(region_id);
56 let (open_metadata_region_request, open_data_region_request) =
57 self.transform_open_physical_region_request(request);
58 all_requests.push((metadata_region_id, open_metadata_region_request));
59 all_requests.push((data_region_id, open_data_region_request));
60 physical_region_ids.push((region_id, physical_region_options));
61 data_region_ids.insert(data_region_id);
62 }
63
64 let results = self
65 .mito
66 .handle_batch_open_requests(parallelism, all_requests)
67 .await
68 .context(BatchOpenMitoRegionSnafu {})?
69 .into_iter()
70 .filter(|(region_id, _)| data_region_ids.contains(region_id))
71 .collect::<Vec<_>>();
72
73 for (physical_region_id, physical_region_options) in physical_region_ids {
74 let primary_key_encoding = self
75 .mito
76 .get_primary_key_encoding(physical_region_id)
77 .context(PhysicalRegionNotFoundSnafu {
78 region_id: physical_region_id,
79 })?;
80 self.recover_states(
81 physical_region_id,
82 primary_key_encoding,
83 physical_region_options,
84 )
85 .await?;
86 }
87
88 Ok(results)
89 }
90
91 pub async fn open_region(
101 &self,
102 region_id: RegionId,
103 request: RegionOpenRequest,
104 ) -> Result<AffectedRows> {
105 if request.is_physical_table() {
106 let physical_region_options = PhysicalRegionOptions::try_from(&request.options)?;
108 self.open_physical_region(region_id, request).await?;
109 let data_region_id = utils::to_data_region_id(region_id);
110 let primary_key_encoding = self.mito.get_primary_key_encoding(data_region_id).context(
111 PhysicalRegionNotFoundSnafu {
112 region_id: data_region_id,
113 },
114 )?;
115 self.recover_states(region_id, primary_key_encoding, physical_region_options)
116 .await?;
117
118 Ok(0)
119 } else {
120 Ok(0)
125 }
126 }
127
128 fn transform_open_physical_region_request(
134 &self,
135 request: RegionOpenRequest,
136 ) -> (RegionOpenRequest, RegionOpenRequest) {
137 let metadata_region_dir = join_dir(&request.region_dir, METADATA_REGION_SUBDIR);
138 let data_region_dir = join_dir(&request.region_dir, DATA_REGION_SUBDIR);
139
140 let metadata_region_options = region_options_for_metadata_region(request.options.clone());
141 let open_metadata_region_request = RegionOpenRequest {
142 region_dir: metadata_region_dir,
143 options: metadata_region_options,
144 engine: MITO_ENGINE_NAME.to_string(),
145 skip_wal_replay: request.skip_wal_replay,
146 };
147
148 let mut data_region_options = request.options;
149 set_data_region_options(
150 &mut data_region_options,
151 self.config.experimental_sparse_primary_key_encoding,
152 );
153 let open_data_region_request = RegionOpenRequest {
154 region_dir: data_region_dir,
155 options: data_region_options,
156 engine: MITO_ENGINE_NAME.to_string(),
157 skip_wal_replay: request.skip_wal_replay,
158 };
159
160 (open_metadata_region_request, open_data_region_request)
161 }
162
163 async fn open_physical_region(
165 &self,
166 region_id: RegionId,
167 request: RegionOpenRequest,
168 ) -> Result<AffectedRows> {
169 let metadata_region_id = utils::to_metadata_region_id(region_id);
170 let data_region_id = utils::to_data_region_id(region_id);
171 let (open_metadata_region_request, open_data_region_request) =
172 self.transform_open_physical_region_request(request);
173
174 self.mito
175 .handle_request(
176 metadata_region_id,
177 RegionRequest::Open(open_metadata_region_request),
178 )
179 .await
180 .with_context(|_| OpenMitoRegionSnafu {
181 region_type: "metadata",
182 })?;
183 self.mito
184 .handle_request(
185 data_region_id,
186 RegionRequest::Open(open_data_region_request),
187 )
188 .await
189 .with_context(|_| OpenMitoRegionSnafu {
190 region_type: "data",
191 })?;
192
193 info!("Opened physical metric region {region_id}");
194 PHYSICAL_REGION_COUNT.inc();
195
196 Ok(0)
197 }
198
199 pub(crate) async fn recover_states(
208 &self,
209 physical_region_id: RegionId,
210 primary_key_encoding: PrimaryKeyEncoding,
211 physical_region_options: PhysicalRegionOptions,
212 ) -> Result<Vec<RegionId>> {
213 let logical_regions = self
215 .metadata_region
216 .logical_regions(physical_region_id)
217 .await?;
218 let physical_columns = self
219 .data_region
220 .physical_columns(physical_region_id)
221 .await?;
222
223 {
224 let mut state = self.state.write().unwrap();
225 let physical_columns = physical_columns
227 .into_iter()
228 .map(|col| (col.column_schema.name, col.column_id))
229 .collect();
230 state.add_physical_region(
231 physical_region_id,
232 physical_columns,
233 primary_key_encoding,
234 physical_region_options,
235 );
236 for logical_region_id in &logical_regions {
238 state.add_logical_region(physical_region_id, *logical_region_id);
239 }
240 }
241
242 let mut opened_logical_region_ids = Vec::new();
243 for logical_region_id in logical_regions {
246 if self
247 .metadata_region
248 .open_logical_region(logical_region_id)
249 .await
250 {
251 opened_logical_region_ids.push(logical_region_id);
252 }
253 }
254
255 LOGICAL_REGION_COUNT.add(opened_logical_region_ids.len() as i64);
256
257 Ok(opened_logical_region_ids)
258 }
259}
260
261