metric_engine/engine/
sync.rs1use std::time::Instant;
16
17use common_telemetry::info;
18use snafu::{OptionExt, ResultExt, ensure};
19use store_api::region_engine::{RegionEngine, RegionManifestInfo, SyncManifestResponse};
20use store_api::storage::RegionId;
21
22use crate::engine::MetricEngineInner;
23use crate::error::{
24 MetricManifestInfoSnafu, MitoSyncOperationSnafu, PhysicalRegionNotFoundSnafu, Result,
25};
26use crate::utils;
27
28impl MetricEngineInner {
29 pub async fn sync_region(
30 &self,
31 region_id: RegionId,
32 manifest_info: RegionManifestInfo,
33 ) -> Result<SyncManifestResponse> {
34 ensure!(
35 manifest_info.is_metric(),
36 MetricManifestInfoSnafu { region_id }
37 );
38
39 let metadata_region_id = utils::to_metadata_region_id(region_id);
40 let metadata_manifest_version = manifest_info
42 .metadata_manifest_version()
43 .unwrap_or_default();
44 let metadata_flushed_entry_id = manifest_info
45 .metadata_flushed_entry_id()
46 .unwrap_or_default();
47 let metadata_region_manifest =
48 RegionManifestInfo::mito(metadata_manifest_version, metadata_flushed_entry_id, 0);
49 let metadata_synced = self
50 .mito
51 .sync_region(metadata_region_id, metadata_region_manifest)
52 .await
53 .context(MitoSyncOperationSnafu)?
54 .is_data_synced();
55
56 let data_region_id = utils::to_data_region_id(region_id);
57 let data_manifest_version = manifest_info.data_manifest_version();
58 let data_flushed_entry_id = manifest_info.data_flushed_entry_id();
59 let data_region_manifest =
60 RegionManifestInfo::mito(data_manifest_version, data_flushed_entry_id, 0);
61
62 let data_synced = self
63 .mito
64 .sync_region(data_region_id, data_region_manifest)
65 .await
66 .context(MitoSyncOperationSnafu)?
67 .is_data_synced();
68
69 if !metadata_synced {
70 return Ok(SyncManifestResponse::Metric {
71 metadata_synced,
72 data_synced,
73 new_opened_logical_region_ids: vec![],
74 });
75 }
76
77 let now = Instant::now();
78 let physical_region_options = *self
81 .state
82 .read()
83 .unwrap()
84 .physical_region_states()
85 .get(&data_region_id)
86 .context(PhysicalRegionNotFoundSnafu {
87 region_id: data_region_id,
88 })?
89 .options();
90 let new_opened_logical_region_ids = self
91 .recover_states(data_region_id, physical_region_options)
92 .await?;
93 info!(
94 "Sync metadata region for physical region {}, cost: {:?}, new opened logical region ids: {:?}",
95 data_region_id,
96 now.elapsed(),
97 new_opened_logical_region_ids
98 );
99
100 Ok(SyncManifestResponse::Metric {
101 metadata_synced,
102 data_synced,
103 new_opened_logical_region_ids,
104 })
105 }
106}
107
108#[cfg(test)]
109mod tests {
110 use std::collections::HashMap;
111
112 use api::v1::SemanticType;
113 use common_query::prelude::greptime_timestamp;
114 use common_telemetry::info;
115 use datatypes::data_type::ConcreteDataType;
116 use datatypes::schema::ColumnSchema;
117 use store_api::metadata::ColumnMetadata;
118 use store_api::region_engine::{RegionEngine, RegionManifestInfo};
119 use store_api::region_request::{
120 AddColumn, AlterKind, RegionAlterRequest, RegionFlushRequest, RegionRequest,
121 };
122 use store_api::storage::RegionId;
123
124 use crate::metadata_region::MetadataRegion;
125 use crate::test_util::TestEnv;
126
127 #[tokio::test]
128 async fn test_sync_region_with_new_created_logical_regions() {
129 common_telemetry::init_default_ut_logging();
130 let mut env = TestEnv::with_prefix("sync_with_new_created_logical_regions").await;
131 env.init_metric_region().await;
132
133 info!("creating follower engine");
134 let (_follower_mito, follower_metric) = env.create_follower_engine().await;
136
137 let physical_region_id = env.default_physical_region_id();
138
139 let metric_engine = env.metric();
141 metric_engine
142 .handle_request(
143 env.default_physical_region_id(),
144 RegionRequest::Flush(RegionFlushRequest::default()),
145 )
146 .await
147 .unwrap();
148
149 let response = follower_metric
150 .sync_region(physical_region_id, RegionManifestInfo::metric(1, 0, 1, 0))
151 .await
152 .unwrap();
153 assert!(response.is_metric());
154 let new_opened_logical_region_ids = response.new_opened_logical_region_ids().unwrap();
155 assert_eq!(new_opened_logical_region_ids, vec![RegionId::new(3, 2)]);
156
157 let response = follower_metric
159 .sync_region(physical_region_id, RegionManifestInfo::metric(1, 0, 1, 0))
160 .await
161 .unwrap();
162 assert!(response.is_metric());
163 let new_opened_logical_region_ids = response.new_opened_logical_region_ids().unwrap();
164 assert!(new_opened_logical_region_ids.is_empty());
165 }
166
167 fn test_alter_logical_region_request() -> RegionAlterRequest {
168 RegionAlterRequest {
169 kind: AlterKind::AddColumns {
170 columns: vec![AddColumn {
171 column_metadata: ColumnMetadata {
172 column_id: 0,
173 semantic_type: SemanticType::Tag,
174 column_schema: ColumnSchema::new(
175 "tag1",
176 ConcreteDataType::string_datatype(),
177 false,
178 ),
179 },
180 location: None,
181 }],
182 },
183 }
184 }
185
186 #[tokio::test]
187 async fn test_sync_region_alter_alter_logical_region() {
188 common_telemetry::init_default_ut_logging();
189 let mut env = TestEnv::with_prefix("sync_region_alter_alter_logical_region").await;
190 env.init_metric_region().await;
191
192 info!("creating follower engine");
193 let physical_region_id = env.default_physical_region_id();
194 let metric_engine = env.metric();
196 metric_engine
197 .handle_request(
198 env.default_physical_region_id(),
199 RegionRequest::Flush(RegionFlushRequest::default()),
200 )
201 .await
202 .unwrap();
203
204 let (follower_mito, follower_metric) = env.create_follower_engine().await;
206 let metric_engine = env.metric();
207 let engine_inner = env.metric().inner;
208 let region_id = env.default_logical_region_id();
209 let request = test_alter_logical_region_request();
210
211 engine_inner
212 .alter_logical_regions(
213 physical_region_id,
214 vec![(region_id, request)],
215 &mut HashMap::new(),
216 )
217 .await
218 .unwrap();
219
220 metric_engine
222 .handle_request(
223 env.default_physical_region_id(),
224 RegionRequest::Flush(RegionFlushRequest::default()),
225 )
226 .await
227 .unwrap();
228
229 let response = follower_metric
231 .sync_region(physical_region_id, RegionManifestInfo::metric(2, 0, 2, 0))
232 .await
233 .unwrap();
234 assert!(response.is_metric());
235 let new_opened_logical_region_ids = response.new_opened_logical_region_ids().unwrap();
236 assert!(new_opened_logical_region_ids.is_empty());
237
238 let logical_region_id = env.default_logical_region_id();
239 let metadata_region = MetadataRegion::new(follower_mito.clone());
240 let semantic_type = metadata_region
241 .column_semantic_type(physical_region_id, logical_region_id, "tag1")
242 .await
243 .unwrap()
244 .unwrap();
245 assert_eq!(semantic_type, SemanticType::Tag);
246 let timestamp_index = metadata_region
247 .column_semantic_type(physical_region_id, logical_region_id, greptime_timestamp())
248 .await
249 .unwrap()
250 .unwrap();
251 assert_eq!(timestamp_index, SemanticType::Timestamp);
252 }
253}