metric_engine/engine/sync/
region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::time::Instant;
16
17use common_error::ext::BoxedError;
18use common_telemetry::info;
19use mito2::manifest::action::RegionEdit;
20use snafu::{OptionExt, ResultExt, ensure};
21use store_api::region_engine::{MitoCopyRegionFromRequest, SyncRegionFromResponse};
22use store_api::storage::RegionId;
23
24use crate::engine::MetricEngineInner;
25use crate::error::{
26    MissingFilesSnafu, MitoCopyRegionFromOperationSnafu, MitoEditRegionSnafu,
27    PhysicalRegionNotFoundSnafu, Result,
28};
29use crate::utils;
30
31impl MetricEngineInner {
32    /// Syncs the logical regions from the source region to the target region in the metric engine.
33    ///
34    /// This operation:
35    /// 1. Copies SST files from source metadata region to target metadata region
36    /// 2. Transforms logical region metadata (updates region numbers to match target)
37    /// 3. Edits target manifest to remove old file entries (copied files)
38    /// 4. Recovers states and returns newly opened logical region IDs
39    ///
40    /// **Note**: Only the metadata region is synced. The data region is not affected.
41    pub(crate) async fn sync_region_from_region(
42        &self,
43        region_id: RegionId,
44        source_region_id: RegionId,
45        parallelism: usize,
46    ) -> Result<SyncRegionFromResponse> {
47        let source_metadata_region_id = utils::to_metadata_region_id(source_region_id);
48        let target_metadata_region_id = utils::to_metadata_region_id(region_id);
49        let target_data_region_id = utils::to_data_region_id(region_id);
50        let source_data_region_id = utils::to_data_region_id(source_region_id);
51        info!(
52            "Syncing region from region {} to region {}, parallelism: {}",
53            source_region_id, region_id, parallelism
54        );
55
56        let res = self
57            .mito
58            .copy_region_from(
59                target_metadata_region_id,
60                MitoCopyRegionFromRequest {
61                    source_region_id: source_metadata_region_id,
62                    parallelism,
63                },
64            )
65            .await
66            .map_err(BoxedError::new)
67            .context(MitoCopyRegionFromOperationSnafu {
68                source_region_id: source_metadata_region_id,
69                target_region_id: target_metadata_region_id,
70            })?;
71
72        if res.copied_file_ids.is_empty() {
73            info!(
74                "No files were copied from source region {} to target region {}, copied file ids are empty",
75                source_metadata_region_id, target_metadata_region_id
76            );
77            return Ok(SyncRegionFromResponse::Metric {
78                metadata_synced: false,
79                data_synced: false,
80                new_opened_logical_region_ids: vec![],
81            });
82        }
83
84        let target_region = self.mito.find_region(target_metadata_region_id).context(
85            PhysicalRegionNotFoundSnafu {
86                region_id: target_metadata_region_id,
87            },
88        )?;
89        let files_to_remove = target_region.file_metas(&res.copied_file_ids).await;
90        let missing_file_ids = res
91            .copied_file_ids
92            .iter()
93            .zip(&files_to_remove)
94            .filter_map(|(file_id, maybe_meta)| {
95                if maybe_meta.is_none() {
96                    Some(*file_id)
97                } else {
98                    None
99                }
100            })
101            .collect::<Vec<_>>();
102        // `copy_region_from` does not trigger compaction,
103        // so there should be no files removed and thus no missing files.
104        ensure!(
105            missing_file_ids.is_empty(),
106            MissingFilesSnafu {
107                region_id: target_metadata_region_id,
108                file_ids: missing_file_ids,
109            }
110        );
111        let files_to_remove = files_to_remove.into_iter().flatten().collect::<Vec<_>>();
112        // Transform the logical region metadata of the target data region.
113        self.metadata_region
114            .transform_logical_region_metadata(target_data_region_id, source_data_region_id)
115            .await?;
116
117        let edit = RegionEdit {
118            files_to_add: vec![],
119            files_to_remove: files_to_remove.clone(),
120            timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
121            compaction_time_window: None,
122            flushed_entry_id: None,
123            flushed_sequence: None,
124            committed_sequence: None,
125        };
126        self.mito
127            .edit_region(target_metadata_region_id, edit)
128            .await
129            .map_err(BoxedError::new)
130            .context(MitoEditRegionSnafu {
131                region_id: target_metadata_region_id,
132            })?;
133        info!(
134            "Successfully edit metadata region: {} after syncing from source metadata region: {}, files to remove: {:?}",
135            target_metadata_region_id,
136            source_metadata_region_id,
137            files_to_remove
138                .iter()
139                .map(|meta| meta.file_id)
140                .collect::<Vec<_>>(),
141        );
142
143        let now = Instant::now();
144        // Always recover states from the target metadata region after syncing
145        // from the source metadata region.
146        let physical_region_options = *self
147            .state
148            .read()
149            .unwrap()
150            .physical_region_states()
151            .get(&target_data_region_id)
152            .context(PhysicalRegionNotFoundSnafu {
153                region_id: target_data_region_id,
154            })?
155            .options();
156        let new_opened_logical_region_ids = self
157            .recover_states(target_data_region_id, physical_region_options)
158            .await?;
159        info!(
160            "Sync metadata region from source region {} to target region {}, recover states cost: {:?}, new opened logical region ids: {:?}",
161            source_metadata_region_id,
162            target_metadata_region_id,
163            now.elapsed(),
164            new_opened_logical_region_ids
165        );
166
167        Ok(SyncRegionFromResponse::Metric {
168            metadata_synced: true,
169            data_synced: false,
170            new_opened_logical_region_ids,
171        })
172    }
173}
174
#[cfg(test)]
mod tests {

    use common_error::ext::ErrorExt;
    use common_error::status_code::StatusCode;
    use common_telemetry::debug;
    use store_api::metric_engine_consts::{METRIC_ENGINE_NAME, PHYSICAL_TABLE_METADATA_KEY};
    use store_api::region_engine::{RegionEngine, SyncRegionFromRequest};
    use store_api::region_request::{
        BatchRegionDdlRequest, PathType, RegionCloseRequest, RegionFlushRequest, RegionOpenRequest,
        RegionRequest,
    };
    use store_api::storage::RegionId;

    use crate::metadata_region::MetadataRegion;
    use crate::test_util::{TestEnv, create_logical_region_request};

    /// Asserts that `logical_region_id` under `physical_region_id` has exactly the column
    /// names in `expected_columns`.
    ///
    /// Actual names are sorted before comparing, so `expected_columns` must be given in
    /// sorted order.
    async fn assert_logical_table_columns(
        metadata_region: &MetadataRegion,
        physical_region_id: RegionId,
        logical_region_id: RegionId,
        expected_columns: &[&str],
    ) {
        let mut columns = metadata_region
            .logical_columns(physical_region_id, logical_region_id)
            .await
            .unwrap()
            .into_iter()
            .map(|(n, _)| n)
            .collect::<Vec<_>>();
        columns.sort_unstable();
        assert_eq!(columns, expected_columns);
    }

    /// Syncing a target physical region from a flushed source opens the remapped logical
    /// regions, is repeatable, and survives a close/reopen of the target.
    #[tokio::test]
    async fn test_sync_region_from_region() {
        common_telemetry::init_default_ut_logging();
        let env = TestEnv::new().await;
        let metric_engine = env.metric();
        let source_physical_region_id = RegionId::new(1024, 0);
        let logical_region_id1 = RegionId::new(1025, 0);
        let logical_region_id2 = RegionId::new(1026, 0);
        env.create_physical_region(source_physical_region_id, "/test_dir1", vec![])
            .await;
        let region_create_request1 =
            create_logical_region_request(&["job"], source_physical_region_id, "logical1");
        let region_create_request2 =
            create_logical_region_request(&["host"], source_physical_region_id, "logical2");
        metric_engine
            .handle_batch_ddl_requests(BatchRegionDdlRequest::Create(vec![
                (logical_region_id1, region_create_request1),
                (logical_region_id2, region_create_request2),
            ]))
            .await
            .unwrap();
        // Flush so the source metadata region has SST files to copy.
        debug!("Flushing source physical region");
        metric_engine
            .handle_request(
                source_physical_region_id,
                RegionRequest::Flush(RegionFlushRequest {
                    row_group_size: None,
                }),
            )
            .await
            .unwrap();
        let logical_regions = metric_engine
            .logical_regions(source_physical_region_id)
            .await
            .unwrap();
        assert!(logical_regions.contains(&logical_region_id1));
        assert!(logical_regions.contains(&logical_region_id2));

        let target_physical_region_id = RegionId::new(1024, 1);
        // The logical region ids are expected to take the target's region number.
        let target_logical_region_id1 = RegionId::new(1025, 1);
        let target_logical_region_id2 = RegionId::new(1026, 1);
        // Prepare target physical region
        env.create_physical_region(target_physical_region_id, "/test_dir1", vec![])
            .await;
        let r = metric_engine
            .sync_region(
                target_physical_region_id,
                SyncRegionFromRequest::FromRegion {
                    source_region_id: source_physical_region_id,
                    parallelism: 1,
                },
            )
            .await
            .unwrap();
        let new_opened_logical_region_ids = r.new_opened_logical_region_ids().unwrap();
        assert_eq!(new_opened_logical_region_ids.len(), 2);
        assert!(new_opened_logical_region_ids.contains(&target_logical_region_id1));
        assert!(new_opened_logical_region_ids.contains(&target_logical_region_id2));
        assert_logical_table_columns(
            &env.metadata_region(),
            target_physical_region_id,
            target_logical_region_id1,
            &["greptime_timestamp", "greptime_value", "job"],
        )
        .await;
        assert_logical_table_columns(
            &env.metadata_region(),
            target_physical_region_id,
            target_logical_region_id2,
            &["greptime_timestamp", "greptime_value", "host"],
        )
        .await;
        let logical_regions = env
            .metadata_region()
            .logical_regions(target_physical_region_id)
            .await
            .unwrap();
        assert_eq!(logical_regions.len(), 2);
        assert!(logical_regions.contains(&target_logical_region_id1));
        assert!(logical_regions.contains(&target_logical_region_id2));

        // Should be ok to sync region from again; all logical regions are already open.
        debug!("Sync region from again");
        let r = metric_engine
            .sync_region(
                target_physical_region_id,
                SyncRegionFromRequest::FromRegion {
                    source_region_id: source_physical_region_id,
                    parallelism: 1,
                },
            )
            .await
            .unwrap();
        let new_opened_logical_region_ids = r.new_opened_logical_region_ids().unwrap();
        assert!(new_opened_logical_region_ids.is_empty());

        // Try to close region and reopen it, should be ok.
        metric_engine
            .handle_request(
                target_physical_region_id,
                RegionRequest::Close(RegionCloseRequest {}),
            )
            .await
            .unwrap();
        let physical_region_option = [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
            .into_iter()
            .collect();
        metric_engine
            .handle_request(
                target_physical_region_id,
                RegionRequest::Open(RegionOpenRequest {
                    engine: METRIC_ENGINE_NAME.to_string(),
                    table_dir: "/test_dir1".to_string(),
                    path_type: PathType::Bare,
                    options: physical_region_option,
                    skip_wal_replay: false,
                    checkpoint: None,
                }),
            )
            .await
            .unwrap();
        let logical_regions = env
            .metadata_region()
            .logical_regions(target_physical_region_id)
            .await
            .unwrap();
        assert_eq!(logical_regions.len(), 2);
        assert!(logical_regions.contains(&target_logical_region_id1));
        assert!(logical_regions.contains(&target_logical_region_id2));
    }

    /// Syncing from a source that has no flushed files opens no logical regions.
    #[tokio::test]
    async fn test_sync_region_from_region_with_no_files() {
        common_telemetry::init_default_ut_logging();
        let env = TestEnv::new().await;
        let metric_engine = env.metric();
        let source_physical_region_id = RegionId::new(1024, 0);
        env.create_physical_region(source_physical_region_id, "/test_dir1", vec![])
            .await;
        let target_physical_region_id = RegionId::new(1024, 1);
        env.create_physical_region(target_physical_region_id, "/test_dir1", vec![])
            .await;
        let r = metric_engine
            .sync_region(
                target_physical_region_id,
                SyncRegionFromRequest::FromRegion {
                    source_region_id: source_physical_region_id,
                    parallelism: 1,
                },
            )
            .await
            .unwrap();
        let new_opened_logical_region_ids = r.new_opened_logical_region_ids().unwrap();
        assert!(new_opened_logical_region_ids.is_empty());
    }

    /// Syncing from a source region that was never created fails with `InvalidArguments`.
    #[tokio::test]
    async fn test_sync_region_from_region_source_not_exist() {
        common_telemetry::init_default_ut_logging();
        let env = TestEnv::new().await;
        let metric_engine = env.metric();
        let source_physical_region_id = RegionId::new(1024, 0);
        let target_physical_region_id = RegionId::new(1024, 1);
        env.create_physical_region(target_physical_region_id, "/test_dir1", vec![])
            .await;
        let err = metric_engine
            .sync_region(
                target_physical_region_id,
                SyncRegionFromRequest::FromRegion {
                    source_region_id: source_physical_region_id,
                    parallelism: 1,
                },
            )
            .await
            .unwrap_err();
        assert_eq!(err.status_code(), StatusCode::InvalidArguments);
    }
}