metric_engine/engine/
sync.rs1use std::time::Instant;
16
17use common_telemetry::info;
18use snafu::{ensure, OptionExt, ResultExt};
19use store_api::region_engine::{RegionEngine, RegionManifestInfo, SyncManifestResponse};
20use store_api::storage::RegionId;
21
22use crate::engine::MetricEngineInner;
23use crate::error::{
24 MetricManifestInfoSnafu, MitoSyncOperationSnafu, PhysicalRegionNotFoundSnafu, Result,
25};
26use crate::utils;
27
28impl MetricEngineInner {
29 pub async fn sync_region(
30 &self,
31 region_id: RegionId,
32 manifest_info: RegionManifestInfo,
33 ) -> Result<SyncManifestResponse> {
34 ensure!(
35 manifest_info.is_metric(),
36 MetricManifestInfoSnafu { region_id }
37 );
38
39 let metadata_region_id = utils::to_metadata_region_id(region_id);
40 let metadata_manifest_version = manifest_info
42 .metadata_manifest_version()
43 .unwrap_or_default();
44 let metadata_flushed_entry_id = manifest_info
45 .metadata_flushed_entry_id()
46 .unwrap_or_default();
47 let metadata_region_manifest =
48 RegionManifestInfo::mito(metadata_manifest_version, metadata_flushed_entry_id);
49 let metadata_synced = self
50 .mito
51 .sync_region(metadata_region_id, metadata_region_manifest)
52 .await
53 .context(MitoSyncOperationSnafu)?
54 .is_data_synced();
55
56 let data_region_id = utils::to_data_region_id(region_id);
57 let data_manifest_version = manifest_info.data_manifest_version();
58 let data_flushed_entry_id = manifest_info.data_flushed_entry_id();
59 let data_region_manifest =
60 RegionManifestInfo::mito(data_manifest_version, data_flushed_entry_id);
61
62 let data_synced = self
63 .mito
64 .sync_region(data_region_id, data_region_manifest)
65 .await
66 .context(MitoSyncOperationSnafu)?
67 .is_data_synced();
68
69 if !metadata_synced {
70 return Ok(SyncManifestResponse::Metric {
71 metadata_synced,
72 data_synced,
73 new_opened_logical_region_ids: vec![],
74 });
75 }
76
77 let now = Instant::now();
78 let physical_region_options = *self
81 .state
82 .read()
83 .unwrap()
84 .physical_region_states()
85 .get(&data_region_id)
86 .context(PhysicalRegionNotFoundSnafu {
87 region_id: data_region_id,
88 })?
89 .options();
90 let primary_key_encoding = self.mito.get_primary_key_encoding(data_region_id).context(
91 PhysicalRegionNotFoundSnafu {
92 region_id: data_region_id,
93 },
94 )?;
95 let new_opened_logical_region_ids = self
96 .recover_states(
97 data_region_id,
98 primary_key_encoding,
99 physical_region_options,
100 )
101 .await?;
102 info!(
103 "Sync metadata region for physical region {}, cost: {:?}, new opened logical region ids: {:?}",
104 data_region_id,
105 now.elapsed(),
106 new_opened_logical_region_ids
107 );
108
109 Ok(SyncManifestResponse::Metric {
110 metadata_synced,
111 data_synced,
112 new_opened_logical_region_ids,
113 })
114 }
115}
116
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use api::v1::SemanticType;
    use common_telemetry::info;
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use store_api::metadata::ColumnMetadata;
    use store_api::region_engine::{RegionEngine, RegionManifestInfo};
    use store_api::region_request::{
        AddColumn, AlterKind, RegionAlterRequest, RegionFlushRequest, RegionRequest,
    };
    use store_api::storage::RegionId;

    use crate::metadata_region::MetadataRegion;
    use crate::test_util::TestEnv;

    /// Syncs a follower engine twice with the same manifest info after the
    /// environment's engine flushed the physical region: the first sync must
    /// report `RegionId::new(3, 2)` as newly opened, the second must report none.
    #[tokio::test]
    async fn test_sync_region_with_new_created_logical_regions() {
        common_telemetry::init_default_ut_logging();
        let mut env = TestEnv::with_prefix("sync_with_new_created_logical_regions").await;
        env.init_metric_region().await;

        info!("creating follower engine");
        // `_follower_mito` is bound to a name so it stays alive for the whole test.
        let (_follower_mito, follower_metric) = env.create_follower_engine().await;

        let physical_region_id = env.default_physical_region_id();

        // Flush the physical region through the environment's own engine.
        let env_engine = env.metric();
        let flush_target = env.default_physical_region_id();
        let flush_request = RegionRequest::Flush(RegionFlushRequest::default());
        env_engine
            .handle_request(flush_target, flush_request)
            .await
            .unwrap();

        // First sync: the follower opens the logical region.
        let first_sync = follower_metric
            .sync_region(physical_region_id, RegionManifestInfo::metric(1, 0, 1, 0))
            .await
            .unwrap();
        assert!(first_sync.is_metric());
        let opened_on_first_sync = first_sync.new_opened_logical_region_ids().unwrap();
        assert_eq!(opened_on_first_sync, vec![RegionId::new(3, 2)]);

        // Second sync with identical manifest info: nothing new is opened.
        let second_sync = follower_metric
            .sync_region(physical_region_id, RegionManifestInfo::metric(1, 0, 1, 0))
            .await
            .unwrap();
        assert!(second_sync.is_metric());
        let opened_on_second_sync = second_sync.new_opened_logical_region_ids().unwrap();
        assert!(opened_on_second_sync.is_empty());
    }

    /// Builds an alter request that adds a single non-nullable string tag
    /// column named `tag1` (column id 0, no explicit location).
    fn test_alter_logical_region_request() -> RegionAlterRequest {
        let tag_column = ColumnMetadata {
            column_id: 0,
            semantic_type: SemanticType::Tag,
            column_schema: ColumnSchema::new("tag1", ConcreteDataType::string_datatype(), false),
        };
        let add_tag_column = AddColumn {
            column_metadata: tag_column,
            location: None,
        };

        RegionAlterRequest {
            kind: AlterKind::AddColumns {
                columns: vec![add_tag_column],
            },
        }
    }

    /// Alters the default logical region after the follower engine was created,
    /// flushes, syncs the follower, and checks that the follower's metadata
    /// region reports the semantic types of `tag1` and `greptime_timestamp`.
    #[tokio::test]
    async fn test_sync_region_alter_alter_logical_region() {
        common_telemetry::init_default_ut_logging();
        let mut env = TestEnv::with_prefix("sync_region_alter_alter_logical_region").await;
        env.init_metric_region().await;

        info!("creating follower engine");
        let physical_region_id = env.default_physical_region_id();

        // Flush once before the follower engine exists.
        let env_engine = env.metric();
        env_engine
            .handle_request(
                env.default_physical_region_id(),
                RegionRequest::Flush(RegionFlushRequest::default()),
            )
            .await
            .unwrap();

        let (follower_mito, follower_metric) = env.create_follower_engine().await;

        // Add the `tag1` column to the logical region through the engine internals.
        let env_engine = env.metric();
        let engine_inner = env.metric().inner;
        let altered_region_id = env.default_logical_region_id();
        let alter_request = test_alter_logical_region_request();
        let alter_requests = vec![(altered_region_id, alter_request)];
        engine_inner
            .alter_logical_regions(physical_region_id, alter_requests, &mut HashMap::new())
            .await
            .unwrap();

        // Flush again after the alteration.
        env_engine
            .handle_request(
                env.default_physical_region_id(),
                RegionRequest::Flush(RegionFlushRequest::default()),
            )
            .await
            .unwrap();

        // The logical region already exists on the follower, so none is newly opened.
        let sync_response = follower_metric
            .sync_region(physical_region_id, RegionManifestInfo::metric(2, 0, 2, 0))
            .await
            .unwrap();
        assert!(sync_response.is_metric());
        let newly_opened = sync_response.new_opened_logical_region_ids().unwrap();
        assert!(newly_opened.is_empty());

        // Read the column semantic types back through the follower's mito engine.
        let logical_region_id = env.default_logical_region_id();
        let follower_metadata = MetadataRegion::new(follower_mito.clone());

        let tag_semantic_type = follower_metadata
            .column_semantic_type(physical_region_id, logical_region_id, "tag1")
            .await
            .unwrap()
            .unwrap();
        assert_eq!(tag_semantic_type, SemanticType::Tag);

        let timestamp_semantic_type = follower_metadata
            .column_semantic_type(physical_region_id, logical_region_id, "greptime_timestamp")
            .await
            .unwrap()
            .unwrap();
        assert_eq!(timestamp_semantic_type, SemanticType::Timestamp);
    }
}