Skip to main content

metric_engine/engine/sync/
region.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::time::Instant;
16
17use common_error::ext::BoxedError;
18use common_telemetry::info;
19use mito2::manifest::action::RegionEdit;
20use snafu::{OptionExt, ResultExt, ensure};
21use store_api::region_engine::{MitoCopyRegionFromRequest, SyncRegionFromResponse};
22use store_api::storage::RegionId;
23
24use crate::engine::MetricEngineInner;
25use crate::error::{
26    MissingFilesSnafu, MitoCopyRegionFromOperationSnafu, MitoEditRegionSnafu,
27    PhysicalRegionNotFoundSnafu, Result,
28};
29use crate::utils;
30
31impl MetricEngineInner {
32    /// Syncs the logical regions from the source region to the target region in the metric engine.
33    ///
34    /// This operation:
35    /// 1. Copies SST files from source metadata region to target metadata region
36    /// 2. Transforms logical region metadata (updates region numbers to match target)
37    /// 3. Edits target manifest to remove old file entries (copied files)
38    /// 4. Recovers states and returns newly opened logical region IDs
39    ///
40    /// **Note**: Only the metadata region is synced. The data region is not affected.
41    pub(crate) async fn sync_region_from_region(
42        &self,
43        region_id: RegionId,
44        source_region_id: RegionId,
45        parallelism: usize,
46    ) -> Result<SyncRegionFromResponse> {
47        let source_metadata_region_id = utils::to_metadata_region_id(source_region_id);
48        let target_metadata_region_id = utils::to_metadata_region_id(region_id);
49        let target_data_region_id = utils::to_data_region_id(region_id);
50        let source_data_region_id = utils::to_data_region_id(source_region_id);
51        info!(
52            "Syncing region from region {} to region {}, parallelism: {}",
53            source_region_id, region_id, parallelism
54        );
55
56        let res = self
57            .mito
58            .copy_region_from(
59                target_metadata_region_id,
60                MitoCopyRegionFromRequest {
61                    source_region_id: source_metadata_region_id,
62                    parallelism,
63                },
64            )
65            .await
66            .map_err(BoxedError::new)
67            .context(MitoCopyRegionFromOperationSnafu {
68                source_region_id: source_metadata_region_id,
69                target_region_id: target_metadata_region_id,
70            })?;
71
72        if res.copied_file_ids.is_empty() {
73            info!(
74                "No files were copied from source region {} to target region {}, copied file ids are empty",
75                source_metadata_region_id, target_metadata_region_id
76            );
77            return Ok(SyncRegionFromResponse::Metric {
78                metadata_synced: false,
79                data_synced: false,
80                new_opened_logical_region_ids: vec![],
81            });
82        }
83
84        let target_region = self.mito.find_region(target_metadata_region_id).context(
85            PhysicalRegionNotFoundSnafu {
86                region_id: target_metadata_region_id,
87            },
88        )?;
89        let files_to_remove = target_region.file_metas(&res.copied_file_ids).await;
90        let missing_file_ids = res
91            .copied_file_ids
92            .iter()
93            .zip(&files_to_remove)
94            .filter_map(|(file_id, maybe_meta)| {
95                if maybe_meta.is_none() {
96                    Some(*file_id)
97                } else {
98                    None
99                }
100            })
101            .collect::<Vec<_>>();
102        // `copy_region_from` does not trigger compaction,
103        // so there should be no files removed and thus no missing files.
104        ensure!(
105            missing_file_ids.is_empty(),
106            MissingFilesSnafu {
107                region_id: target_metadata_region_id,
108                file_ids: missing_file_ids,
109            }
110        );
111        let files_to_remove = files_to_remove.into_iter().flatten().collect::<Vec<_>>();
112        // Transform the logical region metadata of the target data region.
113        self.metadata_region
114            .transform_logical_region_metadata(target_data_region_id, source_data_region_id)
115            .await?;
116
117        let edit = RegionEdit {
118            files_to_add: vec![],
119            files_to_remove: files_to_remove.clone(),
120            timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
121            compaction_time_window: None,
122            flushed_entry_id: None,
123            flushed_sequence: None,
124            committed_sequence: None,
125        };
126        self.mito
127            .edit_region(target_metadata_region_id, edit)
128            .await
129            .map_err(BoxedError::new)
130            .context(MitoEditRegionSnafu {
131                region_id: target_metadata_region_id,
132            })?;
133        info!(
134            "Successfully edit metadata region: {} after syncing from source metadata region: {}, files to remove: {:?}",
135            target_metadata_region_id,
136            source_metadata_region_id,
137            files_to_remove
138                .iter()
139                .map(|meta| meta.file_id)
140                .collect::<Vec<_>>(),
141        );
142
143        let now = Instant::now();
144        // Always recover states from the target metadata region after syncing
145        // from the source metadata region.
146        let physical_region_options = *self
147            .state
148            .read()
149            .unwrap()
150            .physical_region_states()
151            .get(&target_data_region_id)
152            .context(PhysicalRegionNotFoundSnafu {
153                region_id: target_data_region_id,
154            })?
155            .options();
156        let new_opened_logical_region_ids = self
157            .recover_states(target_data_region_id, physical_region_options)
158            .await?;
159        info!(
160            "Sync metadata region from source region {} to target region {}, recover states cost: {:?}, new opened logical region ids: {:?}",
161            source_metadata_region_id,
162            target_metadata_region_id,
163            now.elapsed(),
164            new_opened_logical_region_ids
165        );
166
167        Ok(SyncRegionFromResponse::Metric {
168            metadata_synced: true,
169            data_synced: false,
170            new_opened_logical_region_ids,
171        })
172    }
173}
174
#[cfg(test)]
mod tests {

    use common_error::ext::ErrorExt;
    use common_error::status_code::StatusCode;
    use common_telemetry::debug;
    use store_api::metric_engine_consts::{METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY};
    use store_api::region_engine::{RegionEngine, SyncRegionFromRequest, SyncRegionFromResponse};
    use store_api::region_request::{
        BatchRegionDdlRequest, PathType, RegionCloseRequest, RegionFlushRequest, RegionOpenRequest,
        RegionRequest,
    };
    use store_api::storage::RegionId;

    use crate::metadata_region::MetadataRegion;
    use crate::test_util::{TestEnv, create_logical_region_request};

    /// Asserts that the (sorted) column names recorded in `metadata_region` for
    /// `logical_region_id` under `physical_region_id` equal `expected_columns`.
    ///
    /// `expected_columns` must already be sorted.
    async fn assert_logical_table_columns(
        metadata_region: &MetadataRegion,
        physical_region_id: RegionId,
        logical_region_id: RegionId,
        expected_columns: &[&str],
    ) {
        let mut columns = metadata_region
            .logical_columns(physical_region_id, logical_region_id)
            .await
            .unwrap()
            .into_iter()
            .map(|(n, _)| n)
            .collect::<Vec<_>>();
        columns.sort_unstable();
        assert_eq!(columns, expected_columns);
    }

    /// Syncs two flushed logical regions to a new physical region, re-syncs, and
    /// checks the result survives a close/reopen of the target.
    #[tokio::test]
    async fn test_sync_region_from_region() {
        common_telemetry::init_default_ut_logging();
        let env = TestEnv::new().await;
        let metric_engine = env.metric();
        let source_physical_region_id = RegionId::new(1024, 0);
        let logical_region_id1 = RegionId::new(1025, 0);
        let logical_region_id2 = RegionId::new(1026, 0);
        env.create_physical_region(source_physical_region_id, "/test_dir1", vec![])
            .await;
        let region_create_request1 =
            create_logical_region_request(&["job"], source_physical_region_id, "logical1");
        let region_create_request2 =
            create_logical_region_request(&["host"], source_physical_region_id, "logical2");
        metric_engine
            .handle_batch_ddl_requests(BatchRegionDdlRequest::Create(vec![
                (logical_region_id1, region_create_request1),
                (logical_region_id2, region_create_request2),
            ]))
            .await
            .unwrap();
        debug!("Flushing source physical region");
        metric_engine
            .handle_request(
                source_physical_region_id,
                RegionRequest::Flush(RegionFlushRequest::default()),
            )
            .await
            .unwrap();
        let logical_regions = metric_engine
            .logical_regions(source_physical_region_id)
            .await
            .unwrap();
        assert!(logical_regions.contains(&logical_region_id1));
        assert!(logical_regions.contains(&logical_region_id2));

        let target_physical_region_id = RegionId::new(1024, 1);
        let target_logical_region_id1 = RegionId::new(1025, 1);
        let target_logical_region_id2 = RegionId::new(1026, 1);
        // Prepare target physical region
        env.create_physical_region(target_physical_region_id, "/test_dir1", vec![])
            .await;
        let r = metric_engine
            .sync_region(
                target_physical_region_id,
                SyncRegionFromRequest::FromRegion {
                    source_region_id: source_physical_region_id,
                    parallelism: 1,
                },
            )
            .await
            .unwrap();
        // Files were copied, so the metadata region must report as synced;
        // the data region is never synced by this path.
        assert!(matches!(
            &r,
            SyncRegionFromResponse::Metric {
                metadata_synced: true,
                data_synced: false,
                ..
            }
        ));
        let new_opened_logical_region_ids = r.new_opened_logical_region_ids().unwrap();
        assert_eq!(new_opened_logical_region_ids.len(), 2);
        assert!(new_opened_logical_region_ids.contains(&target_logical_region_id1));
        assert!(new_opened_logical_region_ids.contains(&target_logical_region_id2));
        assert_logical_table_columns(
            &env.metadata_region(),
            target_physical_region_id,
            target_logical_region_id1,
            &["greptime_timestamp", "greptime_value", "job"],
        )
        .await;
        assert_logical_table_columns(
            &env.metadata_region(),
            target_physical_region_id,
            target_logical_region_id2,
            &["greptime_timestamp", "greptime_value", "host"],
        )
        .await;
        let logical_regions = env
            .metadata_region()
            .logical_regions(target_physical_region_id)
            .await
            .unwrap();
        assert_eq!(logical_regions.len(), 2);
        assert!(logical_regions.contains(&target_logical_region_id1));
        assert!(logical_regions.contains(&target_logical_region_id2));

        // Should be ok to sync region from again.
        debug!("Sync region from again");
        let r = metric_engine
            .sync_region(
                target_physical_region_id,
                SyncRegionFromRequest::FromRegion {
                    source_region_id: source_physical_region_id,
                    parallelism: 1,
                },
            )
            .await
            .unwrap();
        let new_opened_logical_region_ids = r.new_opened_logical_region_ids().unwrap();
        assert!(new_opened_logical_region_ids.is_empty());

        // Try to close region and reopen it, should be ok.
        metric_engine
            .handle_request(
                target_physical_region_id,
                RegionRequest::Close(RegionCloseRequest {}),
            )
            .await
            .unwrap();
        let physical_region_option = [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
            .into_iter()
            .collect();
        metric_engine
            .handle_request(
                target_physical_region_id,
                RegionRequest::Open(RegionOpenRequest {
                    engine: METRIC_ENGINE_NAME.to_string(),
                    table_dir: "/test_dir1".to_string(),
                    path_type: PathType::Bare,
                    options: physical_region_option,
                    skip_wal_replay: false,
                    checkpoint: None,
                }),
            )
            .await
            .unwrap();
        let logical_regions = env
            .metadata_region()
            .logical_regions(target_physical_region_id)
            .await
            .unwrap();
        assert_eq!(logical_regions.len(), 2);
        assert!(logical_regions.contains(&target_logical_region_id1));
        assert!(logical_regions.contains(&target_logical_region_id2));
    }

    /// Syncing from a source that was never flushed copies no files, so nothing
    /// is synced and no logical regions are opened.
    #[tokio::test]
    async fn test_sync_region_from_region_with_no_files() {
        common_telemetry::init_default_ut_logging();
        let env = TestEnv::new().await;
        let metric_engine = env.metric();
        let source_physical_region_id = RegionId::new(1024, 0);
        env.create_physical_region(source_physical_region_id, "/test_dir1", vec![])
            .await;
        let target_physical_region_id = RegionId::new(1024, 1);
        env.create_physical_region(target_physical_region_id, "/test_dir1", vec![])
            .await;
        let r = metric_engine
            .sync_region(
                target_physical_region_id,
                SyncRegionFromRequest::FromRegion {
                    source_region_id: source_physical_region_id,
                    parallelism: 1,
                },
            )
            .await
            .unwrap();
        assert!(matches!(
            &r,
            SyncRegionFromResponse::Metric {
                metadata_synced: false,
                data_synced: false,
                ..
            }
        ));
        let new_opened_logical_region_ids = r.new_opened_logical_region_ids().unwrap();
        assert!(new_opened_logical_region_ids.is_empty());
    }

    /// Syncing from a source physical region that does not exist is rejected
    /// as an invalid argument.
    #[tokio::test]
    async fn test_sync_region_from_region_source_not_exist() {
        common_telemetry::init_default_ut_logging();
        let env = TestEnv::new().await;
        let metric_engine = env.metric();
        let source_physical_region_id = RegionId::new(1024, 0);
        let target_physical_region_id = RegionId::new(1024, 1);
        env.create_physical_region(target_physical_region_id, "/test_dir1", vec![])
            .await;
        let err = metric_engine
            .sync_region(
                target_physical_region_id,
                SyncRegionFromRequest::FromRegion {
                    source_region_id: source_physical_region_id,
                    parallelism: 1,
                },
            )
            .await
            .unwrap_err();
        assert_eq!(err.status_code(), StatusCode::InvalidArguments);
    }
}