metric_engine/engine/
sync.rs1use std::time::Instant;
16
17use common_telemetry::info;
18use snafu::{ensure, OptionExt, ResultExt};
19use store_api::region_engine::{RegionEngine, RegionManifestInfo, SyncManifestResponse};
20use store_api::storage::RegionId;
21
22use crate::engine::MetricEngineInner;
23use crate::error::{
24 MetricManifestInfoSnafu, MitoSyncOperationSnafu, PhysicalRegionNotFoundSnafu, Result,
25};
26use crate::utils;
27
28impl MetricEngineInner {
29 pub async fn sync_region(
30 &self,
31 region_id: RegionId,
32 manifest_info: RegionManifestInfo,
33 ) -> Result<SyncManifestResponse> {
34 ensure!(
35 manifest_info.is_metric(),
36 MetricManifestInfoSnafu { region_id }
37 );
38
39 let metadata_region_id = utils::to_metadata_region_id(region_id);
40 let metadata_manifest_version = manifest_info
42 .metadata_manifest_version()
43 .unwrap_or_default();
44 let metadata_flushed_entry_id = manifest_info
45 .metadata_flushed_entry_id()
46 .unwrap_or_default();
47 let metadata_region_manifest =
48 RegionManifestInfo::mito(metadata_manifest_version, metadata_flushed_entry_id);
49 let metadata_synced = self
50 .mito
51 .sync_region(metadata_region_id, metadata_region_manifest)
52 .await
53 .context(MitoSyncOperationSnafu)?
54 .is_data_synced();
55
56 let data_region_id = utils::to_data_region_id(region_id);
57 let data_manifest_version = manifest_info.data_manifest_version();
58 let data_flushed_entry_id = manifest_info.data_flushed_entry_id();
59 let data_region_manifest =
60 RegionManifestInfo::mito(data_manifest_version, data_flushed_entry_id);
61
62 let data_synced = self
63 .mito
64 .sync_region(data_region_id, data_region_manifest)
65 .await
66 .context(MitoSyncOperationSnafu)?
67 .is_data_synced();
68
69 if !metadata_synced {
70 return Ok(SyncManifestResponse::Metric {
71 metadata_synced,
72 data_synced,
73 new_opened_logical_region_ids: vec![],
74 });
75 }
76
77 let now = Instant::now();
78 let physical_region_options = *self
81 .state
82 .read()
83 .unwrap()
84 .physical_region_states()
85 .get(&data_region_id)
86 .context(PhysicalRegionNotFoundSnafu {
87 region_id: data_region_id,
88 })?
89 .options();
90 let new_opened_logical_region_ids = self
91 .recover_states(data_region_id, physical_region_options)
92 .await?;
93 info!(
94 "Sync metadata region for physical region {}, cost: {:?}, new opened logical region ids: {:?}",
95 data_region_id,
96 now.elapsed(),
97 new_opened_logical_region_ids
98 );
99
100 Ok(SyncManifestResponse::Metric {
101 metadata_synced,
102 data_synced,
103 new_opened_logical_region_ids,
104 })
105 }
106}
107
108#[cfg(test)]
109mod tests {
110 use std::collections::HashMap;
111
112 use api::v1::SemanticType;
113 use common_telemetry::info;
114 use datatypes::data_type::ConcreteDataType;
115 use datatypes::schema::ColumnSchema;
116 use store_api::metadata::ColumnMetadata;
117 use store_api::region_engine::{RegionEngine, RegionManifestInfo};
118 use store_api::region_request::{
119 AddColumn, AlterKind, RegionAlterRequest, RegionFlushRequest, RegionRequest,
120 };
121 use store_api::storage::RegionId;
122
123 use crate::metadata_region::MetadataRegion;
124 use crate::test_util::TestEnv;
125
126 #[tokio::test]
127 async fn test_sync_region_with_new_created_logical_regions() {
128 common_telemetry::init_default_ut_logging();
129 let mut env = TestEnv::with_prefix("sync_with_new_created_logical_regions").await;
130 env.init_metric_region().await;
131
132 info!("creating follower engine");
133 let (_follower_mito, follower_metric) = env.create_follower_engine().await;
135
136 let physical_region_id = env.default_physical_region_id();
137
138 let metric_engine = env.metric();
140 metric_engine
141 .handle_request(
142 env.default_physical_region_id(),
143 RegionRequest::Flush(RegionFlushRequest::default()),
144 )
145 .await
146 .unwrap();
147
148 let response = follower_metric
149 .sync_region(physical_region_id, RegionManifestInfo::metric(1, 0, 1, 0))
150 .await
151 .unwrap();
152 assert!(response.is_metric());
153 let new_opened_logical_region_ids = response.new_opened_logical_region_ids().unwrap();
154 assert_eq!(new_opened_logical_region_ids, vec![RegionId::new(3, 2)]);
155
156 let response = follower_metric
158 .sync_region(physical_region_id, RegionManifestInfo::metric(1, 0, 1, 0))
159 .await
160 .unwrap();
161 assert!(response.is_metric());
162 let new_opened_logical_region_ids = response.new_opened_logical_region_ids().unwrap();
163 assert!(new_opened_logical_region_ids.is_empty());
164 }
165
166 fn test_alter_logical_region_request() -> RegionAlterRequest {
167 RegionAlterRequest {
168 kind: AlterKind::AddColumns {
169 columns: vec![AddColumn {
170 column_metadata: ColumnMetadata {
171 column_id: 0,
172 semantic_type: SemanticType::Tag,
173 column_schema: ColumnSchema::new(
174 "tag1",
175 ConcreteDataType::string_datatype(),
176 false,
177 ),
178 },
179 location: None,
180 }],
181 },
182 }
183 }
184
185 #[tokio::test]
186 async fn test_sync_region_alter_alter_logical_region() {
187 common_telemetry::init_default_ut_logging();
188 let mut env = TestEnv::with_prefix("sync_region_alter_alter_logical_region").await;
189 env.init_metric_region().await;
190
191 info!("creating follower engine");
192 let physical_region_id = env.default_physical_region_id();
193 let metric_engine = env.metric();
195 metric_engine
196 .handle_request(
197 env.default_physical_region_id(),
198 RegionRequest::Flush(RegionFlushRequest::default()),
199 )
200 .await
201 .unwrap();
202
203 let (follower_mito, follower_metric) = env.create_follower_engine().await;
205 let metric_engine = env.metric();
206 let engine_inner = env.metric().inner;
207 let region_id = env.default_logical_region_id();
208 let request = test_alter_logical_region_request();
209
210 engine_inner
211 .alter_logical_regions(
212 physical_region_id,
213 vec![(region_id, request)],
214 &mut HashMap::new(),
215 )
216 .await
217 .unwrap();
218
219 metric_engine
221 .handle_request(
222 env.default_physical_region_id(),
223 RegionRequest::Flush(RegionFlushRequest::default()),
224 )
225 .await
226 .unwrap();
227
228 let response = follower_metric
230 .sync_region(physical_region_id, RegionManifestInfo::metric(2, 0, 2, 0))
231 .await
232 .unwrap();
233 assert!(response.is_metric());
234 let new_opened_logical_region_ids = response.new_opened_logical_region_ids().unwrap();
235 assert!(new_opened_logical_region_ids.is_empty());
236
237 let logical_region_id = env.default_logical_region_id();
238 let metadata_region = MetadataRegion::new(follower_mito.clone());
239 let semantic_type = metadata_region
240 .column_semantic_type(physical_region_id, logical_region_id, "tag1")
241 .await
242 .unwrap()
243 .unwrap();
244 assert_eq!(semantic_type, SemanticType::Tag);
245 let timestamp_index = metadata_region
246 .column_semantic_type(physical_region_id, logical_region_id, "greptime_timestamp")
247 .await
248 .unwrap()
249 .unwrap();
250 assert_eq!(timestamp_index, SemanticType::Timestamp);
251 }
252}