metric_engine/engine/
sync.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::time::Instant;
16
17use common_telemetry::info;
18use snafu::{ensure, OptionExt, ResultExt};
19use store_api::region_engine::{RegionEngine, RegionManifestInfo, SyncManifestResponse};
20use store_api::storage::RegionId;
21
22use crate::engine::MetricEngineInner;
23use crate::error::{
24    MetricManifestInfoSnafu, MitoSyncOperationSnafu, PhysicalRegionNotFoundSnafu, Result,
25};
26use crate::utils;
27
28impl MetricEngineInner {
29    pub async fn sync_region(
30        &self,
31        region_id: RegionId,
32        manifest_info: RegionManifestInfo,
33    ) -> Result<SyncManifestResponse> {
34        ensure!(
35            manifest_info.is_metric(),
36            MetricManifestInfoSnafu { region_id }
37        );
38
39        let metadata_region_id = utils::to_metadata_region_id(region_id);
40        // checked by ensure above
41        let metadata_manifest_version = manifest_info
42            .metadata_manifest_version()
43            .unwrap_or_default();
44        let metadata_flushed_entry_id = manifest_info
45            .metadata_flushed_entry_id()
46            .unwrap_or_default();
47        let metadata_region_manifest =
48            RegionManifestInfo::mito(metadata_manifest_version, metadata_flushed_entry_id);
49        let metadata_synced = self
50            .mito
51            .sync_region(metadata_region_id, metadata_region_manifest)
52            .await
53            .context(MitoSyncOperationSnafu)?
54            .is_data_synced();
55
56        let data_region_id = utils::to_data_region_id(region_id);
57        let data_manifest_version = manifest_info.data_manifest_version();
58        let data_flushed_entry_id = manifest_info.data_flushed_entry_id();
59        let data_region_manifest =
60            RegionManifestInfo::mito(data_manifest_version, data_flushed_entry_id);
61
62        let data_synced = self
63            .mito
64            .sync_region(data_region_id, data_region_manifest)
65            .await
66            .context(MitoSyncOperationSnafu)?
67            .is_data_synced();
68
69        if !metadata_synced {
70            return Ok(SyncManifestResponse::Metric {
71                metadata_synced,
72                data_synced,
73                new_opened_logical_region_ids: vec![],
74            });
75        }
76
77        let now = Instant::now();
78        // Recovers the states from the metadata region
79        // if the metadata manifest version is updated.
80        let physical_region_options = *self
81            .state
82            .read()
83            .unwrap()
84            .physical_region_states()
85            .get(&data_region_id)
86            .context(PhysicalRegionNotFoundSnafu {
87                region_id: data_region_id,
88            })?
89            .options();
90        let primary_key_encoding = self.mito.get_primary_key_encoding(data_region_id).context(
91            PhysicalRegionNotFoundSnafu {
92                region_id: data_region_id,
93            },
94        )?;
95        let new_opened_logical_region_ids = self
96            .recover_states(
97                data_region_id,
98                primary_key_encoding,
99                physical_region_options,
100            )
101            .await?;
102        info!(
103                "Sync metadata region for physical region {}, cost: {:?}, new opened logical region ids: {:?}",
104                data_region_id,
105                now.elapsed(),
106                new_opened_logical_region_ids
107            );
108
109        Ok(SyncManifestResponse::Metric {
110            metadata_synced,
111            data_synced,
112            new_opened_logical_region_ids,
113        })
114    }
115}
116
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use api::v1::SemanticType;
    use common_telemetry::info;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use store_api::metadata::ColumnMetadata;
    use store_api::region_engine::{RegionEngine, RegionManifestInfo};
    use store_api::region_request::{
        AddColumn, AlterKind, RegionAlterRequest, RegionFlushRequest, RegionRequest,
    };
    use store_api::storage::RegionId;

    use crate::metadata_region::MetadataRegion;
    use crate::test_util::TestEnv;

    /// Builds an alter request that adds a single string tag column named `tag1`.
    fn test_alter_logical_region_request() -> RegionAlterRequest {
        let tag_schema = ColumnSchema::new("tag1", ConcreteDataType::string_datatype(), false);
        let tag_column = ColumnMetadata {
            column_id: 0,
            semantic_type: SemanticType::Tag,
            column_schema: tag_schema,
        };
        let add_tag = AddColumn {
            column_metadata: tag_column,
            location: None,
        };

        RegionAlterRequest {
            kind: AlterKind::AddColumns {
                columns: vec![add_tag],
            },
        }
    }

    /// The first sync on a follower reports one newly opened logical region; a second sync
    /// with the same manifest info reports none.
    #[tokio::test]
    async fn test_sync_region_with_new_created_logical_regions() {
        common_telemetry::init_default_ut_logging();
        let mut env = TestEnv::with_prefix("sync_with_new_created_logical_regions").await;
        env.init_metric_region().await;

        info!("creating follower engine");
        // Only the metric follower is exercised here; the mito handle is unused.
        let (_follower_mito, follower) = env.create_follower_engine().await;

        let physical_region_id = env.default_physical_region_id();

        // Flush the physical region through the engine owned by the test env.
        let engine = env.metric();
        let flush = RegionRequest::Flush(RegionFlushRequest::default());
        engine
            .handle_request(env.default_physical_region_id(), flush)
            .await
            .unwrap();

        // First sync: exactly one logical region is expected to be opened.
        // NOTE(review): `RegionId::new(3, 2)` is presumably the env's default logical
        // region id; confirm against `TestEnv`.
        let first = follower
            .sync_region(physical_region_id, RegionManifestInfo::metric(1, 0, 1, 0))
            .await
            .unwrap();
        assert!(first.is_metric());
        let opened = first.new_opened_logical_region_ids().unwrap();
        assert_eq!(opened, vec![RegionId::new(3, 2)]);

        // Second sync with the same manifest info: nothing new should be opened.
        let second = follower
            .sync_region(physical_region_id, RegionManifestInfo::metric(1, 0, 1, 0))
            .await
            .unwrap();
        assert!(second.is_metric());
        let opened = second.new_opened_logical_region_ids().unwrap();
        assert!(opened.is_empty());
    }

    /// After a logical region is altered and flushed, syncing a follower makes the added
    /// column visible through the follower's metadata region without opening new regions.
    #[tokio::test]
    async fn test_sync_region_alter_alter_logical_region() {
        common_telemetry::init_default_ut_logging();
        let mut env = TestEnv::with_prefix("sync_region_alter_alter_logical_region").await;
        env.init_metric_region().await;

        info!("creating follower engine");
        let physical_region_id = env.default_physical_region_id();

        // Initial flush of the physical region.
        let engine = env.metric();
        let first_flush = RegionRequest::Flush(RegionFlushRequest::default());
        engine
            .handle_request(env.default_physical_region_id(), first_flush)
            .await
            .unwrap();

        // Create the follower, then alter the logical region through the env's engine.
        let (follower_mito, follower) = env.create_follower_engine().await;
        let engine = env.metric();
        let inner = env.metric().inner;
        let altered_region_id = env.default_logical_region_id();
        let alter_request = test_alter_logical_region_request();

        inner
            .alter_logical_regions(
                physical_region_id,
                vec![(altered_region_id, alter_request)],
                &mut HashMap::new(),
            )
            .await
            .unwrap();

        // Flush again after the alter.
        let second_flush = RegionRequest::Flush(RegionFlushRequest::default());
        engine
            .handle_request(env.default_physical_region_id(), second_flush)
            .await
            .unwrap();

        // Sync the follower; altering must not open any new logical region.
        let response = follower
            .sync_region(physical_region_id, RegionManifestInfo::metric(2, 0, 2, 0))
            .await
            .unwrap();
        assert!(response.is_metric());
        let opened = response.new_opened_logical_region_ids().unwrap();
        assert!(opened.is_empty());

        // The follower's metadata region must report both the added tag column and the
        // timestamp column.
        let logical_region_id = env.default_logical_region_id();
        let follower_metadata = MetadataRegion::new(follower_mito.clone());

        let tag_semantic = follower_metadata
            .column_semantic_type(physical_region_id, logical_region_id, "tag1")
            .await
            .unwrap()
            .unwrap();
        assert_eq!(tag_semantic, SemanticType::Tag);

        let timestamp_semantic = follower_metadata
            .column_semantic_type(physical_region_id, logical_region_id, "greptime_timestamp")
            .await
            .unwrap()
            .unwrap();
        assert_eq!(timestamp_semantic, SemanticType::Timestamp);
    }
}