metric_engine/engine/sync/
region.rs1use std::time::Instant;
16
17use common_error::ext::BoxedError;
18use common_telemetry::info;
19use mito2::manifest::action::RegionEdit;
20use snafu::{OptionExt, ResultExt, ensure};
21use store_api::region_engine::{MitoCopyRegionFromRequest, SyncRegionFromResponse};
22use store_api::storage::RegionId;
23
24use crate::engine::MetricEngineInner;
25use crate::error::{
26 MissingFilesSnafu, MitoCopyRegionFromOperationSnafu, MitoEditRegionSnafu,
27 PhysicalRegionNotFoundSnafu, Result,
28};
29use crate::utils;
30
31impl MetricEngineInner {
32 pub(crate) async fn sync_region_from_region(
42 &self,
43 region_id: RegionId,
44 source_region_id: RegionId,
45 parallelism: usize,
46 ) -> Result<SyncRegionFromResponse> {
47 let source_metadata_region_id = utils::to_metadata_region_id(source_region_id);
48 let target_metadata_region_id = utils::to_metadata_region_id(region_id);
49 let target_data_region_id = utils::to_data_region_id(region_id);
50 let source_data_region_id = utils::to_data_region_id(source_region_id);
51 info!(
52 "Syncing region from region {} to region {}, parallelism: {}",
53 source_region_id, region_id, parallelism
54 );
55
56 let res = self
57 .mito
58 .copy_region_from(
59 target_metadata_region_id,
60 MitoCopyRegionFromRequest {
61 source_region_id: source_metadata_region_id,
62 parallelism,
63 },
64 )
65 .await
66 .map_err(BoxedError::new)
67 .context(MitoCopyRegionFromOperationSnafu {
68 source_region_id: source_metadata_region_id,
69 target_region_id: target_metadata_region_id,
70 })?;
71
72 if res.copied_file_ids.is_empty() {
73 info!(
74 "No files were copied from source region {} to target region {}, copied file ids are empty",
75 source_metadata_region_id, target_metadata_region_id
76 );
77 return Ok(SyncRegionFromResponse::Metric {
78 metadata_synced: false,
79 data_synced: false,
80 new_opened_logical_region_ids: vec![],
81 });
82 }
83
84 let target_region = self.mito.find_region(target_metadata_region_id).context(
85 PhysicalRegionNotFoundSnafu {
86 region_id: target_metadata_region_id,
87 },
88 )?;
89 let files_to_remove = target_region.file_metas(&res.copied_file_ids).await;
90 let missing_file_ids = res
91 .copied_file_ids
92 .iter()
93 .zip(&files_to_remove)
94 .filter_map(|(file_id, maybe_meta)| {
95 if maybe_meta.is_none() {
96 Some(*file_id)
97 } else {
98 None
99 }
100 })
101 .collect::<Vec<_>>();
102 ensure!(
105 missing_file_ids.is_empty(),
106 MissingFilesSnafu {
107 region_id: target_metadata_region_id,
108 file_ids: missing_file_ids,
109 }
110 );
111 let files_to_remove = files_to_remove.into_iter().flatten().collect::<Vec<_>>();
112 self.metadata_region
114 .transform_logical_region_metadata(target_data_region_id, source_data_region_id)
115 .await?;
116
117 let edit = RegionEdit {
118 files_to_add: vec![],
119 files_to_remove: files_to_remove.clone(),
120 timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
121 compaction_time_window: None,
122 flushed_entry_id: None,
123 flushed_sequence: None,
124 committed_sequence: None,
125 };
126 self.mito
127 .edit_region(target_metadata_region_id, edit)
128 .await
129 .map_err(BoxedError::new)
130 .context(MitoEditRegionSnafu {
131 region_id: target_metadata_region_id,
132 })?;
133 info!(
134 "Successfully edit metadata region: {} after syncing from source metadata region: {}, files to remove: {:?}",
135 target_metadata_region_id,
136 source_metadata_region_id,
137 files_to_remove
138 .iter()
139 .map(|meta| meta.file_id)
140 .collect::<Vec<_>>(),
141 );
142
143 let now = Instant::now();
144 let physical_region_options = *self
147 .state
148 .read()
149 .unwrap()
150 .physical_region_states()
151 .get(&target_data_region_id)
152 .context(PhysicalRegionNotFoundSnafu {
153 region_id: target_data_region_id,
154 })?
155 .options();
156 let new_opened_logical_region_ids = self
157 .recover_states(target_data_region_id, physical_region_options)
158 .await?;
159 info!(
160 "Sync metadata region from source region {} to target region {}, recover states cost: {:?}, new opened logical region ids: {:?}",
161 source_metadata_region_id,
162 target_metadata_region_id,
163 now.elapsed(),
164 new_opened_logical_region_ids
165 );
166
167 Ok(SyncRegionFromResponse::Metric {
168 metadata_synced: true,
169 data_synced: false,
170 new_opened_logical_region_ids,
171 })
172 }
173}
174
#[cfg(test)]
mod tests {

    use common_error::ext::ErrorExt;
    use common_error::status_code::StatusCode;
    use common_telemetry::debug;
    use store_api::metric_engine_consts::{METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY};
    use store_api::region_engine::{RegionEngine, SyncRegionFromRequest};
    use store_api::region_request::{
        BatchRegionDdlRequest, PathType, RegionCloseRequest, RegionFlushRequest, RegionOpenRequest,
        RegionRequest,
    };
    use store_api::storage::RegionId;

    use crate::metadata_region::MetadataRegion;
    use crate::test_util::{TestEnv, create_logical_region_request};

    /// Asserts that `logical_region_id` under `physical_region_id` has exactly the
    /// columns in `expected_columns`.
    ///
    /// The actual column names are sorted before comparison, so callers must pass
    /// `expected_columns` in sorted order.
    async fn assert_logical_table_columns(
        metadata_region: &MetadataRegion,
        physical_region_id: RegionId,
        logical_region_id: RegionId,
        expected_columns: &[&str],
    ) {
        let mut columns = metadata_region
            .logical_columns(physical_region_id, logical_region_id)
            .await
            .unwrap()
            .into_iter()
            .map(|(n, _)| n)
            .collect::<Vec<_>>();
        columns.sort_unstable();
        assert_eq!(columns, expected_columns);
    }

    /// Syncs a target physical region from a flushed source that has two logical regions.
    ///
    /// It then checks the following:
    /// - The first sync opens both logical regions with their columns.
    /// - A second sync opens nothing new.
    /// - The logical regions are still present after closing and reopening the target.
    #[tokio::test]
    async fn test_sync_region_from_region() {
        common_telemetry::init_default_ut_logging();
        let env = TestEnv::new().await;
        let metric_engine = env.metric();
        let source_physical_region_id = RegionId::new(1024, 0);
        let logical_region_id1 = RegionId::new(1025, 0);
        let logical_region_id2 = RegionId::new(1026, 0);
        env.create_physical_region(source_physical_region_id, "/test_dir1", vec![])
            .await;
        let region_create_request1 =
            create_logical_region_request(&["job"], source_physical_region_id, "logical1");
        let region_create_request2 =
            create_logical_region_request(&["host"], source_physical_region_id, "logical2");
        metric_engine
            .handle_batch_ddl_requests(BatchRegionDdlRequest::Create(vec![
                (logical_region_id1, region_create_request1),
                (logical_region_id2, region_create_request2),
            ]))
            .await
            .unwrap();
        debug!("Flushing source physical region");
        metric_engine
            .handle_request(
                source_physical_region_id,
                RegionRequest::Flush(RegionFlushRequest {
                    row_group_size: None,
                }),
            )
            .await
            .unwrap();
        let logical_regions = metric_engine
            .logical_regions(source_physical_region_id)
            .await
            .unwrap();
        assert!(logical_regions.contains(&logical_region_id1));
        assert!(logical_regions.contains(&logical_region_id2));

        let target_physical_region_id = RegionId::new(1024, 1);
        let target_logical_region_id1 = RegionId::new(1025, 1);
        let target_logical_region_id2 = RegionId::new(1026, 1);
        env.create_physical_region(target_physical_region_id, "/test_dir1", vec![])
            .await;
        let r = metric_engine
            .sync_region(
                target_physical_region_id,
                SyncRegionFromRequest::FromRegion {
                    source_region_id: source_physical_region_id,
                    parallelism: 1,
                },
            )
            .await
            .unwrap();
        let new_opened_logical_region_ids = r.new_opened_logical_region_ids().unwrap();
        assert_eq!(new_opened_logical_region_ids.len(), 2);
        assert!(new_opened_logical_region_ids.contains(&target_logical_region_id1));
        assert!(new_opened_logical_region_ids.contains(&target_logical_region_id2));
        assert_logical_table_columns(
            &env.metadata_region(),
            target_physical_region_id,
            target_logical_region_id1,
            &["greptime_timestamp", "greptime_value", "job"],
        )
        .await;
        assert_logical_table_columns(
            &env.metadata_region(),
            target_physical_region_id,
            target_logical_region_id2,
            &["greptime_timestamp", "greptime_value", "host"],
        )
        .await;
        let logical_regions = env
            .metadata_region()
            .logical_regions(target_physical_region_id)
            .await
            .unwrap();
        assert_eq!(logical_regions.len(), 2);
        assert!(logical_regions.contains(&target_logical_region_id1));
        assert!(logical_regions.contains(&target_logical_region_id2));

        // A second sync from the same source must not open any additional logical regions.
        debug!("Sync region from again");
        let r = metric_engine
            .sync_region(
                target_physical_region_id,
                SyncRegionFromRequest::FromRegion {
                    source_region_id: source_physical_region_id,
                    parallelism: 1,
                },
            )
            .await
            .unwrap();
        let new_opened_logical_region_ids = r.new_opened_logical_region_ids().unwrap();
        assert!(new_opened_logical_region_ids.is_empty());

        // Close and reopen the target to check that the synced metadata survives a reopen.
        metric_engine
            .handle_request(
                target_physical_region_id,
                RegionRequest::Close(RegionCloseRequest {}),
            )
            .await
            .unwrap();
        let physical_region_option = [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
            .into_iter()
            .collect();
        metric_engine
            .handle_request(
                target_physical_region_id,
                RegionRequest::Open(RegionOpenRequest {
                    engine: METRIC_ENGINE_NAME.to_string(),
                    table_dir: "/test_dir1".to_string(),
                    path_type: PathType::Bare,
                    options: physical_region_option,
                    skip_wal_replay: false,
                    checkpoint: None,
                }),
            )
            .await
            .unwrap();
        let logical_regions = env
            .metadata_region()
            .logical_regions(target_physical_region_id)
            .await
            .unwrap();
        assert_eq!(logical_regions.len(), 2);
        assert!(logical_regions.contains(&target_logical_region_id1));
        assert!(logical_regions.contains(&target_logical_region_id2));
    }

    /// Syncing from a source with no logical regions and no flushed data opens nothing.
    #[tokio::test]
    async fn test_sync_region_from_region_with_no_files() {
        common_telemetry::init_default_ut_logging();
        let env = TestEnv::new().await;
        let metric_engine = env.metric();
        let source_physical_region_id = RegionId::new(1024, 0);
        env.create_physical_region(source_physical_region_id, "/test_dir1", vec![])
            .await;
        let target_physical_region_id = RegionId::new(1024, 1);
        env.create_physical_region(target_physical_region_id, "/test_dir1", vec![])
            .await;
        let r = metric_engine
            .sync_region(
                target_physical_region_id,
                SyncRegionFromRequest::FromRegion {
                    source_region_id: source_physical_region_id,
                    parallelism: 1,
                },
            )
            .await
            .unwrap();
        let new_opened_logical_region_ids = r.new_opened_logical_region_ids().unwrap();
        assert!(new_opened_logical_region_ids.is_empty());
    }

    /// Syncing from a source region that was never created fails with `InvalidArguments`.
    #[tokio::test]
    async fn test_sync_region_from_region_source_not_exist() {
        common_telemetry::init_default_ut_logging();
        let env = TestEnv::new().await;
        let metric_engine = env.metric();
        let source_physical_region_id = RegionId::new(1024, 0);
        let target_physical_region_id = RegionId::new(1024, 1);
        env.create_physical_region(target_physical_region_id, "/test_dir1", vec![])
            .await;
        let err = metric_engine
            .sync_region(
                target_physical_region_id,
                SyncRegionFromRequest::FromRegion {
                    source_region_id: source_physical_region_id,
                    parallelism: 1,
                },
            )
            .await
            .unwrap_err();
        assert_eq!(err.status_code(), StatusCode::InvalidArguments);
    }
}