1use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, HashMap};
17use std::sync::Arc;
18use std::time::Duration;
19
20use api::v1::helper::row;
21use api::v1::value::ValueData;
22use api::v1::{ColumnDataType, ColumnSchema, Rows, SemanticType};
23use async_stream::try_stream;
24use base64::Engine;
25use base64::engine::general_purpose::STANDARD_NO_PAD;
26use common_base::readable_size::ReadableSize;
27use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
28use common_telemetry::{debug, info, warn};
29use datafusion::prelude::{col, lit};
30use futures_util::TryStreamExt;
31use futures_util::stream::BoxStream;
32use mito2::engine::MitoEngine;
33use moka::future::Cache;
34use moka::policy::EvictionPolicy;
35use snafu::{OptionExt, ResultExt};
36use store_api::metadata::ColumnMetadata;
37use store_api::metric_engine_consts::{
38 METADATA_SCHEMA_KEY_COLUMN_INDEX, METADATA_SCHEMA_KEY_COLUMN_NAME,
39 METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME, METADATA_SCHEMA_VALUE_COLUMN_INDEX,
40 METADATA_SCHEMA_VALUE_COLUMN_NAME,
41};
42use store_api::region_engine::RegionEngine;
43use store_api::region_request::{RegionDeleteRequest, RegionPutRequest};
44use store_api::storage::{RegionId, ScanRequest};
45use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock};
46
47use crate::error::{
48 CacheGetSnafu, CollectRecordBatchStreamSnafu, DecodeColumnValueSnafu,
49 DeserializeColumnMetadataSnafu, LogicalRegionNotFoundSnafu, MitoReadOperationSnafu,
50 MitoWriteOperationSnafu, ParseRegionIdSnafu, Result,
51};
52use crate::utils;
53
54const REGION_PREFIX: &str = "__region_";
55const COLUMN_PREFIX: &str = "__column_";
56
57pub struct MetadataRegion {
70 pub(crate) mito: MitoEngine,
71 cache: Cache<RegionId, RegionMetadataCacheEntry>,
76 logical_region_lock: RwLock<HashMap<RegionId, Arc<RwLock<()>>>>,
81}
82
83#[derive(Clone)]
84struct RegionMetadataCacheEntry {
85 key_values: Arc<BTreeMap<String, String>>,
86 size: usize,
87}
88
89const MAX_CACHE_SIZE: u64 = ReadableSize::mb(128).as_bytes();
91const CACHE_TTL: Duration = Duration::from_secs(5 * 60);
93
94impl MetadataRegion {
95 pub fn new(mito: MitoEngine) -> Self {
96 let cache = Cache::builder()
97 .max_capacity(MAX_CACHE_SIZE)
98 .eviction_policy(EvictionPolicy::lru())
101 .time_to_live(CACHE_TTL)
102 .weigher(|_, v: &RegionMetadataCacheEntry| v.size as u32)
103 .build();
104 Self {
105 mito,
106 cache,
107 logical_region_lock: RwLock::new(HashMap::new()),
108 }
109 }
110
111 pub async fn open_logical_region(&self, logical_region_id: RegionId) -> bool {
115 match self
116 .logical_region_lock
117 .write()
118 .await
119 .entry(logical_region_id)
120 {
121 Entry::Occupied(_) => false,
122 Entry::Vacant(vacant_entry) => {
123 vacant_entry.insert(Arc::new(RwLock::new(())));
124 true
125 }
126 }
127 }
128
129 pub async fn read_lock_logical_region(
131 &self,
132 logical_region_id: RegionId,
133 ) -> Result<OwnedRwLockReadGuard<()>> {
134 let lock = self
135 .logical_region_lock
136 .read()
137 .await
138 .get(&logical_region_id)
139 .context(LogicalRegionNotFoundSnafu {
140 region_id: logical_region_id,
141 })?
142 .clone();
143 Ok(RwLock::read_owned(lock).await)
144 }
145
146 pub async fn write_lock_logical_region(
148 &self,
149 logical_region_id: RegionId,
150 ) -> Result<OwnedRwLockWriteGuard<()>> {
151 let lock = self
152 .logical_region_lock
153 .read()
154 .await
155 .get(&logical_region_id)
156 .context(LogicalRegionNotFoundSnafu {
157 region_id: logical_region_id,
158 })?
159 .clone();
160 Ok(RwLock::write_owned(lock).await)
161 }
162
163 pub async fn remove_logical_region(
167 &self,
168 physical_region_id: RegionId,
169 logical_region_id: RegionId,
170 ) -> Result<()> {
171 let region_id = utils::to_metadata_region_id(physical_region_id);
173 let region_key = Self::concat_region_key(logical_region_id);
174
175 let logical_columns = self
177 .logical_columns(physical_region_id, logical_region_id)
178 .await?;
179 let mut column_keys = logical_columns
180 .into_iter()
181 .map(|(col, _)| Self::concat_column_key(logical_region_id, &col))
182 .collect::<Vec<_>>();
183
184 column_keys.push(region_key);
186 self.delete(region_id, &column_keys).await?;
187
188 self.logical_region_lock
189 .write()
190 .await
191 .remove(&logical_region_id);
192
193 Ok(())
194 }
195
196 pub async fn logical_columns(
200 &self,
201 physical_region_id: RegionId,
202 logical_region_id: RegionId,
203 ) -> Result<Vec<(String, ColumnMetadata)>> {
204 let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
205 let region_column_prefix = Self::concat_column_key_prefix(logical_region_id);
206
207 let mut columns = vec![];
208 for (k, v) in self
209 .get_all_with_prefix(metadata_region_id, ®ion_column_prefix)
210 .await?
211 {
212 if !k.starts_with(®ion_column_prefix) {
213 continue;
214 }
215 let (_, column_name) = Self::parse_column_key(&k)?.unwrap();
217 let column_metadata = Self::deserialize_column_metadata(&v)?;
218 columns.push((column_name, column_metadata));
219 }
220
221 Ok(columns)
222 }
223
224 pub async fn logical_regions(&self, physical_region_id: RegionId) -> Result<Vec<RegionId>> {
226 let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
227
228 let mut regions = vec![];
229 for k in self
230 .get_all_key_with_prefix(metadata_region_id, REGION_PREFIX)
231 .await?
232 {
233 if !k.starts_with(REGION_PREFIX) {
234 continue;
235 }
236 let region_id = Self::parse_region_key(&k).unwrap();
238 let region_id = region_id.parse::<u64>().unwrap().into();
239 regions.push(region_id);
240 }
241
242 Ok(regions)
243 }
244}
245
246impl MetadataRegion {
248 pub fn concat_region_key(region_id: RegionId) -> String {
249 format!("{REGION_PREFIX}{}", region_id.as_u64())
250 }
251
252 pub fn concat_column_key(region_id: RegionId, column_name: &str) -> String {
254 let encoded_column_name = STANDARD_NO_PAD.encode(column_name);
255 format!(
256 "{COLUMN_PREFIX}{}_{}",
257 region_id.as_u64(),
258 encoded_column_name
259 )
260 }
261
262 pub fn concat_column_key_prefix(region_id: RegionId) -> String {
264 format!("{COLUMN_PREFIX}{}_", region_id.as_u64())
265 }
266
267 pub fn parse_region_key(key: &str) -> Option<&str> {
268 key.strip_prefix(REGION_PREFIX)
269 }
270
271 pub fn parse_column_key(key: &str) -> Result<Option<(RegionId, String)>> {
273 if let Some(stripped) = key.strip_prefix(COLUMN_PREFIX) {
274 let mut iter = stripped.split('_');
275
276 let region_id_raw = iter.next().unwrap();
277 let region_id = region_id_raw
278 .parse::<u64>()
279 .with_context(|_| ParseRegionIdSnafu { raw: region_id_raw })?
280 .into();
281
282 let encoded_column_name = iter.next().unwrap();
283 let column_name = STANDARD_NO_PAD
284 .decode(encoded_column_name)
285 .context(DecodeColumnValueSnafu)?;
286
287 Ok(Some((region_id, String::from_utf8(column_name).unwrap())))
288 } else {
289 Ok(None)
290 }
291 }
292
293 pub fn serialize_column_metadata(column_metadata: &ColumnMetadata) -> String {
294 serde_json::to_string(column_metadata).unwrap()
295 }
296
297 pub fn deserialize_column_metadata(column_metadata: &str) -> Result<ColumnMetadata> {
298 serde_json::from_str(column_metadata).with_context(|_| DeserializeColumnMetadataSnafu {
299 raw: column_metadata,
300 })
301 }
302}
303
304pub fn decode_batch_stream<T: Send + 'static>(
306 mut record_batch_stream: SendableRecordBatchStream,
307 decode: fn(RecordBatch) -> Vec<T>,
308) -> BoxStream<'static, Result<T>> {
309 let stream = try_stream! {
310 while let Some(batch) = record_batch_stream.try_next().await.context(CollectRecordBatchStreamSnafu)? {
311 for item in decode(batch) {
312 yield item;
313 }
314 }
315 };
316 Box::pin(stream)
317}
318
319fn decode_record_batch_to_key_and_value(batch: RecordBatch) -> Vec<(String, String)> {
321 let keys = batch.iter_column_as_string(0);
322 let values = batch.iter_column_as_string(1);
323 keys.zip(values)
324 .filter_map(|(k, v)| match (k, v) {
325 (Some(k), Some(v)) => Some((k, v)),
326 (Some(k), None) => Some((k, "".to_string())),
327 (None, _) => None,
328 })
329 .collect::<Vec<_>>()
330}
331
332fn decode_record_batch_to_key(batch: RecordBatch) -> Vec<String> {
334 batch.iter_column_as_string(0).flatten().collect::<Vec<_>>()
335}
336
337impl MetadataRegion {
341 fn build_prefix_read_request(prefix: &str, key_only: bool) -> ScanRequest {
342 let filter_expr = col(METADATA_SCHEMA_KEY_COLUMN_NAME).like(lit(prefix));
343
344 let projection = if key_only {
345 vec![METADATA_SCHEMA_KEY_COLUMN_INDEX]
346 } else {
347 vec![
348 METADATA_SCHEMA_KEY_COLUMN_INDEX,
349 METADATA_SCHEMA_VALUE_COLUMN_INDEX,
350 ]
351 };
352 ScanRequest {
353 projection: Some(projection),
354 filters: vec![filter_expr],
355 ..Default::default()
356 }
357 }
358
359 fn build_read_request() -> ScanRequest {
360 let projection = vec![
361 METADATA_SCHEMA_KEY_COLUMN_INDEX,
362 METADATA_SCHEMA_VALUE_COLUMN_INDEX,
363 ];
364 ScanRequest {
365 projection: Some(projection),
366 ..Default::default()
367 }
368 }
369
370 async fn load_all(&self, metadata_region_id: RegionId) -> Result<RegionMetadataCacheEntry> {
371 let scan_req = MetadataRegion::build_read_request();
372 let record_batch_stream = self
373 .mito
374 .scan_to_stream(metadata_region_id, scan_req)
375 .await
376 .context(MitoReadOperationSnafu)?;
377
378 let kv = decode_batch_stream(record_batch_stream, decode_record_batch_to_key_and_value)
379 .try_collect::<BTreeMap<_, _>>()
380 .await?;
381 let mut size = 0;
382 for (k, v) in kv.iter() {
383 size += k.len();
384 size += v.len();
385 }
386 let kv = Arc::new(kv);
387 Ok(RegionMetadataCacheEntry {
388 key_values: kv,
389 size,
390 })
391 }
392
393 async fn get_all_with_prefix(
394 &self,
395 metadata_region_id: RegionId,
396 prefix: &str,
397 ) -> Result<HashMap<String, String>> {
398 let region_metadata = self
399 .cache
400 .try_get_with(metadata_region_id, self.load_all(metadata_region_id))
401 .await
402 .context(CacheGetSnafu)?;
403
404 let mut result = HashMap::new();
405 get_all_with_prefix(®ion_metadata, prefix, |k, v| {
406 result.insert(k.to_string(), v.to_string());
407 Ok(())
408 })?;
409 Ok(result)
410 }
411
412 pub async fn get_all_key_with_prefix(
413 &self,
414 region_id: RegionId,
415 prefix: &str,
416 ) -> Result<Vec<String>> {
417 let scan_req = MetadataRegion::build_prefix_read_request(prefix, true);
418 let record_batch_stream = self
419 .mito
420 .scan_to_stream(region_id, scan_req)
421 .await
422 .context(MitoReadOperationSnafu)?;
423
424 decode_batch_stream(record_batch_stream, decode_record_batch_to_key)
425 .try_collect::<Vec<_>>()
426 .await
427 }
428
429 async fn delete(&self, metadata_region_id: RegionId, keys: &[String]) -> Result<()> {
432 let delete_request = Self::build_delete_request(keys);
433 self.mito
434 .handle_request(
435 metadata_region_id,
436 store_api::region_request::RegionRequest::Delete(delete_request),
437 )
438 .await
439 .context(MitoWriteOperationSnafu)?;
440 self.cache.invalidate(&metadata_region_id).await;
442
443 Ok(())
444 }
445
446 pub(crate) fn build_put_request_from_iter(
447 kv: impl Iterator<Item = (String, String)>,
448 ) -> RegionPutRequest {
449 let cols = vec![
450 ColumnSchema {
451 column_name: METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME.to_string(),
452 datatype: ColumnDataType::TimestampMillisecond as _,
453 semantic_type: SemanticType::Timestamp as _,
454 ..Default::default()
455 },
456 ColumnSchema {
457 column_name: METADATA_SCHEMA_KEY_COLUMN_NAME.to_string(),
458 datatype: ColumnDataType::String as _,
459 semantic_type: SemanticType::Tag as _,
460 ..Default::default()
461 },
462 ColumnSchema {
463 column_name: METADATA_SCHEMA_VALUE_COLUMN_NAME.to_string(),
464 datatype: ColumnDataType::String as _,
465 semantic_type: SemanticType::Field as _,
466 ..Default::default()
467 },
468 ];
469 let rows = Rows {
470 schema: cols,
471 rows: kv
472 .into_iter()
473 .map(|(key, value)| {
474 row(vec![
475 ValueData::TimestampMillisecondValue(0),
476 ValueData::StringValue(key),
477 ValueData::StringValue(value),
478 ])
479 })
480 .collect(),
481 };
482
483 RegionPutRequest {
484 rows,
485 hint: None,
486 partition_expr_version: None,
487 }
488 }
489
490 fn build_delete_request(keys: &[String]) -> RegionDeleteRequest {
491 let cols = vec![
492 ColumnSchema {
493 column_name: METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME.to_string(),
494 datatype: ColumnDataType::TimestampMillisecond as _,
495 semantic_type: SemanticType::Timestamp as _,
496 ..Default::default()
497 },
498 ColumnSchema {
499 column_name: METADATA_SCHEMA_KEY_COLUMN_NAME.to_string(),
500 datatype: ColumnDataType::String as _,
501 semantic_type: SemanticType::Tag as _,
502 ..Default::default()
503 },
504 ];
505 let rows = keys
506 .iter()
507 .map(|key| {
508 row(vec![
509 ValueData::TimestampMillisecondValue(0),
510 ValueData::StringValue(key.clone()),
511 ])
512 })
513 .collect();
514 let rows = Rows { schema: cols, rows };
515
516 RegionDeleteRequest {
517 rows,
518 hint: None,
519 partition_expr_version: None,
520 }
521 }
522
523 pub async fn add_logical_regions(
525 &self,
526 physical_region_id: RegionId,
527 write_region_id: bool,
528 logical_regions: impl Iterator<Item = (RegionId, HashMap<&str, &ColumnMetadata>)>,
529 ) -> Result<()> {
530 let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
531 let iter = logical_regions
532 .into_iter()
533 .flat_map(|(logical_region_id, column_metadatas)| {
534 if write_region_id {
535 Some((
536 MetadataRegion::concat_region_key(logical_region_id),
537 String::new(),
538 ))
539 } else {
540 None
541 }
542 .into_iter()
543 .chain(column_metadatas.into_iter().map(
544 move |(name, column_metadata)| {
545 (
546 MetadataRegion::concat_column_key(logical_region_id, name),
547 MetadataRegion::serialize_column_metadata(column_metadata),
548 )
549 },
550 ))
551 })
552 .collect::<Vec<_>>();
553
554 let put_request = MetadataRegion::build_put_request_from_iter(iter.into_iter());
555 self.mito
556 .handle_request(
557 metadata_region_id,
558 store_api::region_request::RegionRequest::Put(put_request),
559 )
560 .await
561 .context(MitoWriteOperationSnafu)?;
562 self.cache.invalidate(&metadata_region_id).await;
564
565 Ok(())
566 }
567
568 pub async fn transform_logical_region_metadata(
577 &self,
578 physical_region_id: RegionId,
579 source_region_id: RegionId,
580 ) -> Result<()> {
581 let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
582 let data_region_id = utils::to_data_region_id(physical_region_id);
583 let logical_regions = self
584 .logical_regions(data_region_id)
585 .await?
586 .into_iter()
587 .filter(|r| r.region_number() == source_region_id.region_number())
588 .collect::<Vec<_>>();
589 if logical_regions.is_empty() {
590 info!(
591 "No logical regions found from source region {}, physical region id: {}",
592 source_region_id, physical_region_id,
593 );
594 return Ok(());
595 }
596
597 let metadata = self.load_all(metadata_region_id).await?;
598 let mut output = Vec::new();
599 for logical_region_id in &logical_regions {
600 let prefix = MetadataRegion::concat_column_key_prefix(*logical_region_id);
601 get_all_with_prefix(&metadata, &prefix, |k, v| {
602 let (src_logical_region_id, column_name) = Self::parse_column_key(k)?.unwrap();
604 let new_key = MetadataRegion::concat_column_key(
606 RegionId::new(
607 src_logical_region_id.table_id(),
608 data_region_id.region_number(),
609 ),
610 &column_name,
611 );
612 output.push((new_key, v.to_string()));
613 Ok(())
614 })?;
615
616 let new_key = MetadataRegion::concat_region_key(RegionId::new(
617 logical_region_id.table_id(),
618 data_region_id.region_number(),
619 ));
620 output.push((new_key, String::new()));
621 }
622
623 if output.is_empty() {
624 warn!(
625 "No logical regions metadata found from source region {}, physical region id: {}",
626 source_region_id, physical_region_id
627 );
628 return Ok(());
629 }
630
631 debug!(
632 "Transform logical regions metadata to physical region {}, source region: {}, transformed metadata: {}",
633 data_region_id,
634 source_region_id,
635 output.len(),
636 );
637
638 let put_request = MetadataRegion::build_put_request_from_iter(output.into_iter());
639 self.mito
640 .handle_request(
641 metadata_region_id,
642 store_api::region_request::RegionRequest::Put(put_request),
643 )
644 .await
645 .context(MitoWriteOperationSnafu)?;
646 info!(
647 "Transformed {} logical regions metadata to physical region {}, source region: {}",
648 logical_regions.len(),
649 data_region_id,
650 source_region_id
651 );
652 self.cache.invalidate(&metadata_region_id).await;
653 Ok(())
654 }
655}
656
657fn get_all_with_prefix(
658 region_metadata: &RegionMetadataCacheEntry,
659 prefix: &str,
660 mut callback: impl FnMut(&str, &str) -> Result<()>,
661) -> Result<()> {
662 let range = region_metadata.key_values.range(prefix.to_string()..);
663 for (k, v) in range {
664 if !k.starts_with(prefix) {
665 break;
666 }
667 callback(k, v)?;
668 }
669 Ok(())
670}
671
672#[cfg(test)]
673impl MetadataRegion {
674 pub async fn get(&self, region_id: RegionId, key: &str) -> Result<Option<String>> {
677 use datatypes::arrow::array::{Array, AsArray};
678
679 let filter_expr = datafusion::prelude::col(METADATA_SCHEMA_KEY_COLUMN_NAME)
680 .eq(datafusion::prelude::lit(key));
681
682 let scan_req = ScanRequest {
683 projection: Some(vec![METADATA_SCHEMA_VALUE_COLUMN_INDEX]),
684 filters: vec![filter_expr],
685 ..Default::default()
686 };
687 let record_batch_stream = self
688 .mito
689 .scan_to_stream(region_id, scan_req)
690 .await
691 .context(MitoReadOperationSnafu)?;
692 let scan_result = common_recordbatch::util::collect(record_batch_stream)
693 .await
694 .context(CollectRecordBatchStreamSnafu)?;
695
696 let Some(first_batch) = scan_result.first() else {
697 return Ok(None);
698 };
699
700 let column = first_batch.column(0);
701 let column = column.as_string::<i32>();
702 let val = column.is_valid(0).then(|| column.value(0).to_string());
703
704 Ok(val)
705 }
706
707 pub async fn column_semantic_type(
709 &self,
710 physical_region_id: RegionId,
711 logical_region_id: RegionId,
712 column_name: &str,
713 ) -> Result<Option<SemanticType>> {
714 let region_id = utils::to_metadata_region_id(physical_region_id);
715 let column_key = Self::concat_column_key(logical_region_id, column_name);
716 let semantic_type = self.get(region_id, &column_key).await?;
717 semantic_type
718 .map(|s| Self::deserialize_column_metadata(&s).map(|c| c.semantic_type))
719 .transpose()
720 }
721}
722
723#[cfg(test)]
724mod test {
725 use datatypes::data_type::ConcreteDataType;
726 use datatypes::schema::ColumnSchema;
727
728 use super::*;
729 use crate::test_util::TestEnv;
730 use crate::utils::to_metadata_region_id;
731
732 #[test]
733 fn test_concat_table_key() {
734 let region_id = RegionId::new(1234, 7844);
735 let expected = "__region_5299989651108".to_string();
736 assert_eq!(MetadataRegion::concat_region_key(region_id), expected);
737 }
738
739 #[test]
740 fn test_concat_column_key() {
741 let region_id = RegionId::new(8489, 9184);
742 let column_name = "my_column";
743 let expected = "__column_36459977384928_bXlfY29sdW1u".to_string();
744 assert_eq!(
745 MetadataRegion::concat_column_key(region_id, column_name),
746 expected
747 );
748 }
749
750 #[test]
751 fn test_parse_table_key() {
752 let region_id = RegionId::new(87474, 10607);
753 let encoded = MetadataRegion::concat_column_key(region_id, "my_column");
754 assert_eq!(encoded, "__column_375697969260911_bXlfY29sdW1u");
755
756 let decoded = MetadataRegion::parse_column_key(&encoded).unwrap();
757 assert_eq!(decoded, Some((region_id, "my_column".to_string())));
758 }
759
760 #[test]
761 fn test_parse_valid_column_key() {
762 let region_id = RegionId::new(176, 910);
763 let encoded = MetadataRegion::concat_column_key(region_id, "my_column");
764 assert_eq!(encoded, "__column_755914245006_bXlfY29sdW1u");
765
766 let decoded = MetadataRegion::parse_column_key(&encoded).unwrap();
767 assert_eq!(decoded, Some((region_id, "my_column".to_string())));
768 }
769
770 #[test]
771 fn test_parse_invalid_column_key() {
772 let key = "__column_asdfasd_????";
773 let result = MetadataRegion::parse_column_key(key);
774 assert!(result.is_err());
775 }
776
777 #[test]
778 fn test_serialize_column_metadata() {
779 let semantic_type = SemanticType::Tag;
780 let column_metadata = ColumnMetadata {
781 column_schema: ColumnSchema::new("blabla", ConcreteDataType::string_datatype(), false),
782 semantic_type,
783 column_id: 5,
784 };
785 let old_fmt = "{\"column_schema\":{\"name\":\"blabla\",\"data_type\":{\"String\":null},\"is_nullable\":false,\"is_time_index\":false,\"default_constraint\":null,\"metadata\":{}},\"semantic_type\":\"Tag\",\"column_id\":5}".to_string();
786 let new_fmt = "{\"column_schema\":{\"name\":\"blabla\",\"data_type\":{\"String\":{\"size_type\":\"Utf8\"}},\"is_nullable\":false,\"is_time_index\":false,\"default_constraint\":null,\"metadata\":{}},\"semantic_type\":\"Tag\",\"column_id\":5}".to_string();
787 assert_eq!(
788 MetadataRegion::serialize_column_metadata(&column_metadata),
789 new_fmt
790 );
791 assert_eq!(
793 MetadataRegion::deserialize_column_metadata(&old_fmt).unwrap(),
794 column_metadata
795 );
796 assert_eq!(
797 MetadataRegion::deserialize_column_metadata(&new_fmt).unwrap(),
798 column_metadata
799 );
800
801 let semantic_type = "\"Invalid Column Metadata\"";
802 assert!(MetadataRegion::deserialize_column_metadata(semantic_type).is_err());
803 }
804
805 fn test_column_metadatas() -> HashMap<String, ColumnMetadata> {
806 HashMap::from([
807 (
808 "label1".to_string(),
809 ColumnMetadata {
810 column_schema: ColumnSchema::new(
811 "label1".to_string(),
812 ConcreteDataType::string_datatype(),
813 false,
814 ),
815 semantic_type: SemanticType::Tag,
816 column_id: 5,
817 },
818 ),
819 (
820 "label2".to_string(),
821 ColumnMetadata {
822 column_schema: ColumnSchema::new(
823 "label2".to_string(),
824 ConcreteDataType::string_datatype(),
825 false,
826 ),
827 semantic_type: SemanticType::Tag,
828 column_id: 5,
829 },
830 ),
831 ])
832 }
833
834 #[tokio::test]
835 async fn add_logical_regions_to_meta_region() {
836 let env = TestEnv::new().await;
837 env.init_metric_region().await;
838 let metadata_region = env.metadata_region();
839 let physical_region_id = to_metadata_region_id(env.default_physical_region_id());
840 let column_metadatas = test_column_metadatas();
841 let logical_region_id = RegionId::new(1024, 1);
842
843 let iter = vec![(
844 logical_region_id,
845 column_metadatas
846 .iter()
847 .map(|(k, v)| (k.as_str(), v))
848 .collect::<HashMap<_, _>>(),
849 )];
850 metadata_region
851 .add_logical_regions(physical_region_id, true, iter.into_iter())
852 .await
853 .unwrap();
854 let iter = vec![(
856 logical_region_id,
857 column_metadatas
858 .iter()
859 .map(|(k, v)| (k.as_str(), v))
860 .collect::<HashMap<_, _>>(),
861 )];
862 metadata_region
863 .add_logical_regions(physical_region_id, true, iter.into_iter())
864 .await
865 .unwrap();
866
867 let logical_regions = metadata_region
869 .logical_regions(physical_region_id)
870 .await
871 .unwrap();
872 assert_eq!(logical_regions.len(), 2);
873
874 let logical_columns = metadata_region
876 .logical_columns(physical_region_id, logical_region_id)
877 .await
878 .unwrap()
879 .into_iter()
880 .collect::<HashMap<_, _>>();
881 assert_eq!(logical_columns.len(), 2);
882 assert_eq!(column_metadatas, logical_columns);
883 }
884}