metric_engine/engine/sync/
manifest.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::time::Instant;
16
17use common_telemetry::info;
18use snafu::{OptionExt, ResultExt, ensure};
19use store_api::region_engine::{RegionEngine, RegionManifestInfo, SyncRegionFromResponse};
20use store_api::storage::RegionId;
21
22use crate::engine::MetricEngineInner;
23use crate::error::{
24    MetricManifestInfoSnafu, MitoSyncOperationSnafu, PhysicalRegionNotFoundSnafu, Result,
25};
26use crate::utils;
27
28impl MetricEngineInner {
29    /// Syncs the region from the given manifest information (leader-follower scenario).
30    ///
31    /// This operation:
32    /// 1. Syncs the metadata region manifest to the target version.
33    /// 2. Syncs the data region manifest to the target version.
34    /// 3. Recovers states and returns newly opened logical regions (if metadata was synced)
35    pub async fn sync_region_from_manifest(
36        &self,
37        region_id: RegionId,
38        manifest_info: RegionManifestInfo,
39    ) -> Result<SyncRegionFromResponse> {
40        ensure!(
41            manifest_info.is_metric(),
42            MetricManifestInfoSnafu { region_id }
43        );
44
45        let metadata_region_id = utils::to_metadata_region_id(region_id);
46        // checked by ensure above
47        let metadata_manifest_version = manifest_info
48            .metadata_manifest_version()
49            .unwrap_or_default();
50        let metadata_flushed_entry_id = manifest_info
51            .metadata_flushed_entry_id()
52            .unwrap_or_default();
53        let metadata_region_manifest =
54            RegionManifestInfo::mito(metadata_manifest_version, metadata_flushed_entry_id, 0);
55        let metadata_synced = self
56            .mito
57            .sync_region(metadata_region_id, metadata_region_manifest.into())
58            .await
59            .context(MitoSyncOperationSnafu)?
60            .is_data_synced();
61
62        let data_region_id = utils::to_data_region_id(region_id);
63        let data_manifest_version = manifest_info.data_manifest_version();
64        let data_flushed_entry_id = manifest_info.data_flushed_entry_id();
65        let data_region_manifest =
66            RegionManifestInfo::mito(data_manifest_version, data_flushed_entry_id, 0);
67
68        let data_synced = self
69            .mito
70            .sync_region(data_region_id, data_region_manifest.into())
71            .await
72            .context(MitoSyncOperationSnafu)?
73            .is_data_synced();
74
75        if !metadata_synced {
76            return Ok(SyncRegionFromResponse::Metric {
77                metadata_synced,
78                data_synced,
79                new_opened_logical_region_ids: vec![],
80            });
81        }
82
83        let now = Instant::now();
84        // Recovers the states from the metadata region
85        // if the metadata manifest version is updated.
86        let physical_region_options = *self
87            .state
88            .read()
89            .unwrap()
90            .physical_region_states()
91            .get(&data_region_id)
92            .context(PhysicalRegionNotFoundSnafu {
93                region_id: data_region_id,
94            })?
95            .options();
96        let new_opened_logical_region_ids = self
97            .recover_states(data_region_id, physical_region_options)
98            .await?;
99        info!(
100            "Sync metadata region for physical region {}, cost: {:?}, new opened logical region ids: {:?}",
101            data_region_id,
102            now.elapsed(),
103            new_opened_logical_region_ids
104        );
105
106        Ok(SyncRegionFromResponse::Metric {
107            metadata_synced,
108            data_synced,
109            new_opened_logical_region_ids,
110        })
111    }
112}
113
114#[cfg(test)]
115mod tests {
116    use std::collections::HashMap;
117
118    use api::v1::SemanticType;
119    use common_query::prelude::greptime_timestamp;
120    use common_telemetry::info;
121    use datatypes::data_type::ConcreteDataType;
122    use datatypes::schema::ColumnSchema;
123    use store_api::metadata::ColumnMetadata;
124    use store_api::region_engine::{RegionEngine, RegionManifestInfo};
125    use store_api::region_request::{
126        AddColumn, AlterKind, RegionAlterRequest, RegionFlushRequest, RegionRequest,
127    };
128    use store_api::storage::RegionId;
129
130    use crate::metadata_region::MetadataRegion;
131    use crate::test_util::TestEnv;
132
133    #[tokio::test]
134    async fn test_sync_region_with_new_created_logical_regions() {
135        common_telemetry::init_default_ut_logging();
136        let mut env = TestEnv::with_prefix("sync_with_new_created_logical_regions").await;
137        env.init_metric_region().await;
138
139        info!("creating follower engine");
140        // Create a follower engine.
141        let (_follower_mito, follower_metric) = env.create_follower_engine().await;
142
143        let physical_region_id = env.default_physical_region_id();
144
145        // Flushes the physical region
146        let metric_engine = env.metric();
147        metric_engine
148            .handle_request(
149                env.default_physical_region_id(),
150                RegionRequest::Flush(RegionFlushRequest::default()),
151            )
152            .await
153            .unwrap();
154
155        let response = follower_metric
156            .sync_region(
157                physical_region_id,
158                RegionManifestInfo::metric(1, 0, 1, 0).into(),
159            )
160            .await
161            .unwrap();
162        assert!(response.is_metric());
163        let new_opened_logical_region_ids = response.new_opened_logical_region_ids().unwrap();
164        assert_eq!(new_opened_logical_region_ids, vec![RegionId::new(3, 2)]);
165
166        // Sync again, no new logical region should be opened
167        let response = follower_metric
168            .sync_region(
169                physical_region_id,
170                RegionManifestInfo::metric(1, 0, 1, 0).into(),
171            )
172            .await
173            .unwrap();
174        assert!(response.is_metric());
175        let new_opened_logical_region_ids = response.new_opened_logical_region_ids().unwrap();
176        assert!(new_opened_logical_region_ids.is_empty());
177    }
178
179    fn test_alter_logical_region_request() -> RegionAlterRequest {
180        RegionAlterRequest {
181            kind: AlterKind::AddColumns {
182                columns: vec![AddColumn {
183                    column_metadata: ColumnMetadata {
184                        column_id: 0,
185                        semantic_type: SemanticType::Tag,
186                        column_schema: ColumnSchema::new(
187                            "tag1",
188                            ConcreteDataType::string_datatype(),
189                            false,
190                        ),
191                    },
192                    location: None,
193                }],
194            },
195        }
196    }
197
198    #[tokio::test]
199    async fn test_sync_region_alter_alter_logical_region() {
200        common_telemetry::init_default_ut_logging();
201        let mut env = TestEnv::with_prefix("sync_region_alter_alter_logical_region").await;
202        env.init_metric_region().await;
203
204        info!("creating follower engine");
205        let physical_region_id = env.default_physical_region_id();
206        // Flushes the physical region
207        let metric_engine = env.metric();
208        metric_engine
209            .handle_request(
210                env.default_physical_region_id(),
211                RegionRequest::Flush(RegionFlushRequest::default()),
212            )
213            .await
214            .unwrap();
215
216        // Create a follower engine.
217        let (follower_mito, follower_metric) = env.create_follower_engine().await;
218        let metric_engine = env.metric();
219        let engine_inner = env.metric().inner;
220        let region_id = env.default_logical_region_id();
221        let request = test_alter_logical_region_request();
222
223        engine_inner
224            .alter_logical_regions(
225                physical_region_id,
226                vec![(region_id, request)],
227                &mut HashMap::new(),
228            )
229            .await
230            .unwrap();
231
232        // Flushes the physical region
233        metric_engine
234            .handle_request(
235                env.default_physical_region_id(),
236                RegionRequest::Flush(RegionFlushRequest::default()),
237            )
238            .await
239            .unwrap();
240
241        // Sync the follower engine
242        let response = follower_metric
243            .sync_region(
244                physical_region_id,
245                RegionManifestInfo::metric(2, 0, 2, 0).into(),
246            )
247            .await
248            .unwrap();
249        assert!(response.is_metric());
250        let new_opened_logical_region_ids = response.new_opened_logical_region_ids().unwrap();
251        assert!(new_opened_logical_region_ids.is_empty());
252
253        let logical_region_id = env.default_logical_region_id();
254        let metadata_region = MetadataRegion::new(follower_mito.clone());
255        let semantic_type = metadata_region
256            .column_semantic_type(physical_region_id, logical_region_id, "tag1")
257            .await
258            .unwrap()
259            .unwrap();
260        assert_eq!(semantic_type, SemanticType::Tag);
261        let timestamp_index = metadata_region
262            .column_semantic_type(physical_region_id, logical_region_id, greptime_timestamp())
263            .await
264            .unwrap()
265            .unwrap();
266        assert_eq!(timestamp_index, SemanticType::Timestamp);
267    }
268}