metric_engine/engine/sync/
manifest.rs1use std::time::Instant;
16
17use common_telemetry::info;
18use snafu::{OptionExt, ResultExt, ensure};
19use store_api::region_engine::{RegionEngine, RegionManifestInfo, SyncRegionFromResponse};
20use store_api::storage::RegionId;
21
22use crate::engine::MetricEngineInner;
23use crate::error::{
24 MetricManifestInfoSnafu, MitoSyncOperationSnafu, PhysicalRegionNotFoundSnafu, Result,
25};
26use crate::utils;
27
28impl MetricEngineInner {
29 pub async fn sync_region_from_manifest(
36 &self,
37 region_id: RegionId,
38 manifest_info: RegionManifestInfo,
39 ) -> Result<SyncRegionFromResponse> {
40 ensure!(
41 manifest_info.is_metric(),
42 MetricManifestInfoSnafu { region_id }
43 );
44
45 let metadata_region_id = utils::to_metadata_region_id(region_id);
46 let metadata_manifest_version = manifest_info
48 .metadata_manifest_version()
49 .unwrap_or_default();
50 let metadata_flushed_entry_id = manifest_info
51 .metadata_flushed_entry_id()
52 .unwrap_or_default();
53 let metadata_region_manifest =
54 RegionManifestInfo::mito(metadata_manifest_version, metadata_flushed_entry_id, 0);
55 let metadata_synced = self
56 .mito
57 .sync_region(metadata_region_id, metadata_region_manifest.into())
58 .await
59 .context(MitoSyncOperationSnafu)?
60 .is_data_synced();
61
62 let data_region_id = utils::to_data_region_id(region_id);
63 let data_manifest_version = manifest_info.data_manifest_version();
64 let data_flushed_entry_id = manifest_info.data_flushed_entry_id();
65 let data_region_manifest =
66 RegionManifestInfo::mito(data_manifest_version, data_flushed_entry_id, 0);
67
68 let data_synced = self
69 .mito
70 .sync_region(data_region_id, data_region_manifest.into())
71 .await
72 .context(MitoSyncOperationSnafu)?
73 .is_data_synced();
74
75 if !metadata_synced {
76 return Ok(SyncRegionFromResponse::Metric {
77 metadata_synced,
78 data_synced,
79 new_opened_logical_region_ids: vec![],
80 });
81 }
82
83 let now = Instant::now();
84 let physical_region_options = *self
87 .state
88 .read()
89 .unwrap()
90 .physical_region_states()
91 .get(&data_region_id)
92 .context(PhysicalRegionNotFoundSnafu {
93 region_id: data_region_id,
94 })?
95 .options();
96 let new_opened_logical_region_ids = self
97 .recover_states(data_region_id, physical_region_options)
98 .await?;
99 info!(
100 "Sync metadata region for physical region {}, cost: {:?}, new opened logical region ids: {:?}",
101 data_region_id,
102 now.elapsed(),
103 new_opened_logical_region_ids
104 );
105
106 Ok(SyncRegionFromResponse::Metric {
107 metadata_synced,
108 data_synced,
109 new_opened_logical_region_ids,
110 })
111 }
112}
113
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use api::v1::SemanticType;
    use common_query::prelude::greptime_timestamp;
    use common_telemetry::info;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use store_api::metadata::ColumnMetadata;
    use store_api::region_engine::{RegionEngine, RegionManifestInfo};
    use store_api::region_request::{
        AddColumn, AlterKind, RegionAlterRequest, RegionFlushRequest, RegionRequest,
    };
    use store_api::storage::RegionId;

    use crate::metadata_region::MetadataRegion;
    use crate::test_util::TestEnv;

    /// A follower engine created after `init_metric_region` must report
    /// `RegionId::new(3, 2)` as newly opened on its first sync, and nothing on a
    /// second sync with the same manifest info.
    #[tokio::test]
    async fn test_sync_region_with_new_created_logical_regions() {
        common_telemetry::init_default_ut_logging();
        let mut env = TestEnv::with_prefix("sync_with_new_created_logical_regions").await;
        env.init_metric_region().await;

        info!("creating follower engine");
        let (_follower_mito, follower_metric) = env.create_follower_engine().await;

        let physical_region_id = env.default_physical_region_id();

        // Flush the physical region on the env's own (non-follower) engine.
        let primary_engine = env.metric();
        let flush_result = primary_engine
            .handle_request(
                env.default_physical_region_id(),
                RegionRequest::Flush(RegionFlushRequest::default()),
            )
            .await;
        flush_result.unwrap();

        // First sync on the follower: one region is expected to be newly opened.
        let first_sync = follower_metric
            .sync_region(
                physical_region_id,
                RegionManifestInfo::metric(1, 0, 1, 0).into(),
            )
            .await
            .unwrap();
        assert!(first_sync.is_metric());
        let opened_on_first_sync = first_sync.new_opened_logical_region_ids().unwrap();
        assert_eq!(opened_on_first_sync, vec![RegionId::new(3, 2)]);

        // Same sync again: nothing new may be reported.
        let second_sync = follower_metric
            .sync_region(
                physical_region_id,
                RegionManifestInfo::metric(1, 0, 1, 0).into(),
            )
            .await
            .unwrap();
        assert!(second_sync.is_metric());
        let opened_on_second_sync = second_sync.new_opened_logical_region_ids().unwrap();
        assert!(opened_on_second_sync.is_empty());
    }

    /// Builds an alter request that adds one column: a string-typed column named
    /// `tag1` with column id 0 and `SemanticType::Tag`, with no explicit location.
    fn test_alter_logical_region_request() -> RegionAlterRequest {
        let tag_schema = ColumnSchema::new("tag1", ConcreteDataType::string_datatype(), false);
        let tag_column = ColumnMetadata {
            column_id: 0,
            semantic_type: SemanticType::Tag,
            column_schema: tag_schema,
        };
        let add_tag = AddColumn {
            column_metadata: tag_column,
            location: None,
        };

        RegionAlterRequest {
            kind: AlterKind::AddColumns {
                columns: vec![add_tag],
            },
        }
    }

    /// After a logical region is altered and flushed on the env's own engine, a
    /// follower sync must open no new logical regions, and the follower's metadata
    /// region must expose both the added `tag1` column and the timestamp column.
    #[tokio::test]
    async fn test_sync_region_alter_alter_logical_region() {
        common_telemetry::init_default_ut_logging();
        let mut env = TestEnv::with_prefix("sync_region_alter_alter_logical_region").await;
        env.init_metric_region().await;

        info!("creating follower engine");
        let physical_region_id = env.default_physical_region_id();

        // Flush the physical region before the follower engine exists.
        let engine_before_follower = env.metric();
        engine_before_follower
            .handle_request(
                env.default_physical_region_id(),
                RegionRequest::Flush(RegionFlushRequest::default()),
            )
            .await
            .unwrap();

        let (follower_mito, follower_metric) = env.create_follower_engine().await;

        // Alter the default logical region through the engine internals.
        let engine = env.metric();
        let inner = env.metric().inner;
        let altered_region_id = env.default_logical_region_id();
        let alter_request = test_alter_logical_region_request();

        inner
            .alter_logical_regions(
                physical_region_id,
                vec![(altered_region_id, alter_request)],
                &mut HashMap::new(),
            )
            .await
            .unwrap();

        // Flush the physical region again, after the alter.
        engine
            .handle_request(
                env.default_physical_region_id(),
                RegionRequest::Flush(RegionFlushRequest::default()),
            )
            .await
            .unwrap();

        // Sync the follower; altering must not surface as newly opened regions.
        let sync_response = follower_metric
            .sync_region(
                physical_region_id,
                RegionManifestInfo::metric(2, 0, 2, 0).into(),
            )
            .await
            .unwrap();
        assert!(sync_response.is_metric());
        let opened = sync_response.new_opened_logical_region_ids().unwrap();
        assert!(opened.is_empty());

        // Read column semantic types back through the follower's mito engine.
        let logical_region_id = env.default_logical_region_id();
        let follower_metadata = MetadataRegion::new(follower_mito.clone());

        let tag_semantic_type = follower_metadata
            .column_semantic_type(physical_region_id, logical_region_id, "tag1")
            .await
            .unwrap()
            .unwrap();
        assert_eq!(tag_semantic_type, SemanticType::Tag);

        let timestamp_semantic_type = follower_metadata
            .column_semantic_type(physical_region_id, logical_region_id, greptime_timestamp())
            .await
            .unwrap()
            .unwrap();
        assert_eq!(timestamp_semantic_type, SemanticType::Timestamp);
    }
}