metric_engine/engine/
sync.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::time::Instant;
16
17use common_telemetry::info;
18use snafu::{ensure, OptionExt, ResultExt};
19use store_api::region_engine::{RegionEngine, RegionManifestInfo, SyncManifestResponse};
20use store_api::storage::RegionId;
21
22use crate::engine::MetricEngineInner;
23use crate::error::{
24    MetricManifestInfoSnafu, MitoSyncOperationSnafu, PhysicalRegionNotFoundSnafu, Result,
25};
26use crate::utils;
27
28impl MetricEngineInner {
29    pub async fn sync_region(
30        &self,
31        region_id: RegionId,
32        manifest_info: RegionManifestInfo,
33    ) -> Result<SyncManifestResponse> {
34        ensure!(
35            manifest_info.is_metric(),
36            MetricManifestInfoSnafu { region_id }
37        );
38
39        let metadata_region_id = utils::to_metadata_region_id(region_id);
40        // checked by ensure above
41        let metadata_manifest_version = manifest_info
42            .metadata_manifest_version()
43            .unwrap_or_default();
44        let metadata_flushed_entry_id = manifest_info
45            .metadata_flushed_entry_id()
46            .unwrap_or_default();
47        let metadata_region_manifest =
48            RegionManifestInfo::mito(metadata_manifest_version, metadata_flushed_entry_id);
49        let metadata_synced = self
50            .mito
51            .sync_region(metadata_region_id, metadata_region_manifest)
52            .await
53            .context(MitoSyncOperationSnafu)?
54            .is_data_synced();
55
56        let data_region_id = utils::to_data_region_id(region_id);
57        let data_manifest_version = manifest_info.data_manifest_version();
58        let data_flushed_entry_id = manifest_info.data_flushed_entry_id();
59        let data_region_manifest =
60            RegionManifestInfo::mito(data_manifest_version, data_flushed_entry_id);
61
62        let data_synced = self
63            .mito
64            .sync_region(data_region_id, data_region_manifest)
65            .await
66            .context(MitoSyncOperationSnafu)?
67            .is_data_synced();
68
69        if !metadata_synced {
70            return Ok(SyncManifestResponse::Metric {
71                metadata_synced,
72                data_synced,
73                new_opened_logical_region_ids: vec![],
74            });
75        }
76
77        let now = Instant::now();
78        // Recovers the states from the metadata region
79        // if the metadata manifest version is updated.
80        let physical_region_options = *self
81            .state
82            .read()
83            .unwrap()
84            .physical_region_states()
85            .get(&data_region_id)
86            .context(PhysicalRegionNotFoundSnafu {
87                region_id: data_region_id,
88            })?
89            .options();
90        let new_opened_logical_region_ids = self
91            .recover_states(data_region_id, physical_region_options)
92            .await?;
93        info!(
94                "Sync metadata region for physical region {}, cost: {:?}, new opened logical region ids: {:?}",
95                data_region_id,
96                now.elapsed(),
97                new_opened_logical_region_ids
98            );
99
100        Ok(SyncManifestResponse::Metric {
101            metadata_synced,
102            data_synced,
103            new_opened_logical_region_ids,
104        })
105    }
106}
107
108#[cfg(test)]
109mod tests {
110    use std::collections::HashMap;
111
112    use api::v1::SemanticType;
113    use common_telemetry::info;
114    use datatypes::data_type::ConcreteDataType;
115    use datatypes::schema::ColumnSchema;
116    use store_api::metadata::ColumnMetadata;
117    use store_api::region_engine::{RegionEngine, RegionManifestInfo};
118    use store_api::region_request::{
119        AddColumn, AlterKind, RegionAlterRequest, RegionFlushRequest, RegionRequest,
120    };
121    use store_api::storage::RegionId;
122
123    use crate::metadata_region::MetadataRegion;
124    use crate::test_util::TestEnv;
125
126    #[tokio::test]
127    async fn test_sync_region_with_new_created_logical_regions() {
128        common_telemetry::init_default_ut_logging();
129        let mut env = TestEnv::with_prefix("sync_with_new_created_logical_regions").await;
130        env.init_metric_region().await;
131
132        info!("creating follower engine");
133        // Create a follower engine.
134        let (_follower_mito, follower_metric) = env.create_follower_engine().await;
135
136        let physical_region_id = env.default_physical_region_id();
137
138        // Flushes the physical region
139        let metric_engine = env.metric();
140        metric_engine
141            .handle_request(
142                env.default_physical_region_id(),
143                RegionRequest::Flush(RegionFlushRequest::default()),
144            )
145            .await
146            .unwrap();
147
148        let response = follower_metric
149            .sync_region(physical_region_id, RegionManifestInfo::metric(1, 0, 1, 0))
150            .await
151            .unwrap();
152        assert!(response.is_metric());
153        let new_opened_logical_region_ids = response.new_opened_logical_region_ids().unwrap();
154        assert_eq!(new_opened_logical_region_ids, vec![RegionId::new(3, 2)]);
155
156        // Sync again, no new logical region should be opened
157        let response = follower_metric
158            .sync_region(physical_region_id, RegionManifestInfo::metric(1, 0, 1, 0))
159            .await
160            .unwrap();
161        assert!(response.is_metric());
162        let new_opened_logical_region_ids = response.new_opened_logical_region_ids().unwrap();
163        assert!(new_opened_logical_region_ids.is_empty());
164    }
165
166    fn test_alter_logical_region_request() -> RegionAlterRequest {
167        RegionAlterRequest {
168            kind: AlterKind::AddColumns {
169                columns: vec![AddColumn {
170                    column_metadata: ColumnMetadata {
171                        column_id: 0,
172                        semantic_type: SemanticType::Tag,
173                        column_schema: ColumnSchema::new(
174                            "tag1",
175                            ConcreteDataType::string_datatype(),
176                            false,
177                        ),
178                    },
179                    location: None,
180                }],
181            },
182        }
183    }
184
185    #[tokio::test]
186    async fn test_sync_region_alter_alter_logical_region() {
187        common_telemetry::init_default_ut_logging();
188        let mut env = TestEnv::with_prefix("sync_region_alter_alter_logical_region").await;
189        env.init_metric_region().await;
190
191        info!("creating follower engine");
192        let physical_region_id = env.default_physical_region_id();
193        // Flushes the physical region
194        let metric_engine = env.metric();
195        metric_engine
196            .handle_request(
197                env.default_physical_region_id(),
198                RegionRequest::Flush(RegionFlushRequest::default()),
199            )
200            .await
201            .unwrap();
202
203        // Create a follower engine.
204        let (follower_mito, follower_metric) = env.create_follower_engine().await;
205        let metric_engine = env.metric();
206        let engine_inner = env.metric().inner;
207        let region_id = env.default_logical_region_id();
208        let request = test_alter_logical_region_request();
209
210        engine_inner
211            .alter_logical_regions(
212                physical_region_id,
213                vec![(region_id, request)],
214                &mut HashMap::new(),
215            )
216            .await
217            .unwrap();
218
219        // Flushes the physical region
220        metric_engine
221            .handle_request(
222                env.default_physical_region_id(),
223                RegionRequest::Flush(RegionFlushRequest::default()),
224            )
225            .await
226            .unwrap();
227
228        // Sync the follower engine
229        let response = follower_metric
230            .sync_region(physical_region_id, RegionManifestInfo::metric(2, 0, 2, 0))
231            .await
232            .unwrap();
233        assert!(response.is_metric());
234        let new_opened_logical_region_ids = response.new_opened_logical_region_ids().unwrap();
235        assert!(new_opened_logical_region_ids.is_empty());
236
237        let logical_region_id = env.default_logical_region_id();
238        let metadata_region = MetadataRegion::new(follower_mito.clone());
239        let semantic_type = metadata_region
240            .column_semantic_type(physical_region_id, logical_region_id, "tag1")
241            .await
242            .unwrap()
243            .unwrap();
244        assert_eq!(semantic_type, SemanticType::Tag);
245        let timestamp_index = metadata_region
246            .column_semantic_type(physical_region_id, logical_region_id, "greptime_timestamp")
247            .await
248            .unwrap()
249            .unwrap();
250        assert_eq!(timestamp_index, SemanticType::Timestamp);
251    }
252}