1use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, HashMap};
17use std::sync::Arc;
18use std::time::Duration;
19
20use api::v1::helper::row;
21use api::v1::value::ValueData;
22use api::v1::{ColumnDataType, ColumnSchema, Rows, SemanticType};
23use async_stream::try_stream;
24use base64::Engine;
25use base64::engine::general_purpose::STANDARD_NO_PAD;
26use common_base::readable_size::ReadableSize;
27use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
28use common_telemetry::{debug, info, warn};
29use datafusion::prelude::{col, lit};
30use futures_util::TryStreamExt;
31use futures_util::stream::BoxStream;
32use mito2::engine::MitoEngine;
33use moka::future::Cache;
34use moka::policy::EvictionPolicy;
35use snafu::{OptionExt, ResultExt};
36use store_api::metadata::ColumnMetadata;
37use store_api::metric_engine_consts::{
38 METADATA_SCHEMA_KEY_COLUMN_INDEX, METADATA_SCHEMA_KEY_COLUMN_NAME,
39 METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME, METADATA_SCHEMA_VALUE_COLUMN_INDEX,
40 METADATA_SCHEMA_VALUE_COLUMN_NAME,
41};
42use store_api::region_engine::RegionEngine;
43use store_api::region_request::{RegionDeleteRequest, RegionPutRequest};
44use store_api::storage::{RegionId, ScanRequest};
45use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock};
46
47use crate::error::{
48 CacheGetSnafu, CollectRecordBatchStreamSnafu, DecodeColumnValueSnafu,
49 DeserializeColumnMetadataSnafu, LogicalRegionNotFoundSnafu, MitoReadOperationSnafu,
50 MitoWriteOperationSnafu, ParseRegionIdSnafu, Result,
51};
52use crate::utils;
53
54const REGION_PREFIX: &str = "__region_";
55const COLUMN_PREFIX: &str = "__column_";
56
57pub struct MetadataRegion {
70 pub(crate) mito: MitoEngine,
71 cache: Cache<RegionId, RegionMetadataCacheEntry>,
76 logical_region_lock: RwLock<HashMap<RegionId, Arc<RwLock<()>>>>,
81}
82
83#[derive(Clone)]
84struct RegionMetadataCacheEntry {
85 key_values: Arc<BTreeMap<String, String>>,
86 size: usize,
87}
88
89const MAX_CACHE_SIZE: u64 = ReadableSize::mb(128).as_bytes();
91const CACHE_TTL: Duration = Duration::from_secs(5 * 60);
93
94impl MetadataRegion {
95 pub fn new(mito: MitoEngine) -> Self {
96 let cache = Cache::builder()
97 .max_capacity(MAX_CACHE_SIZE)
98 .eviction_policy(EvictionPolicy::lru())
101 .time_to_live(CACHE_TTL)
102 .weigher(|_, v: &RegionMetadataCacheEntry| v.size as u32)
103 .build();
104 Self {
105 mito,
106 cache,
107 logical_region_lock: RwLock::new(HashMap::new()),
108 }
109 }
110
111 pub async fn open_logical_region(&self, logical_region_id: RegionId) -> bool {
115 match self
116 .logical_region_lock
117 .write()
118 .await
119 .entry(logical_region_id)
120 {
121 Entry::Occupied(_) => false,
122 Entry::Vacant(vacant_entry) => {
123 vacant_entry.insert(Arc::new(RwLock::new(())));
124 true
125 }
126 }
127 }
128
129 pub async fn read_lock_logical_region(
131 &self,
132 logical_region_id: RegionId,
133 ) -> Result<OwnedRwLockReadGuard<()>> {
134 let lock = self
135 .logical_region_lock
136 .read()
137 .await
138 .get(&logical_region_id)
139 .context(LogicalRegionNotFoundSnafu {
140 region_id: logical_region_id,
141 })?
142 .clone();
143 Ok(RwLock::read_owned(lock).await)
144 }
145
146 pub async fn write_lock_logical_region(
148 &self,
149 logical_region_id: RegionId,
150 ) -> Result<OwnedRwLockWriteGuard<()>> {
151 let lock = self
152 .logical_region_lock
153 .read()
154 .await
155 .get(&logical_region_id)
156 .context(LogicalRegionNotFoundSnafu {
157 region_id: logical_region_id,
158 })?
159 .clone();
160 Ok(RwLock::write_owned(lock).await)
161 }
162
163 pub async fn remove_logical_region(
167 &self,
168 physical_region_id: RegionId,
169 logical_region_id: RegionId,
170 ) -> Result<()> {
171 let region_id = utils::to_metadata_region_id(physical_region_id);
173 let region_key = Self::concat_region_key(logical_region_id);
174
175 let logical_columns = self
177 .logical_columns(physical_region_id, logical_region_id)
178 .await?;
179 let mut column_keys = logical_columns
180 .into_iter()
181 .map(|(col, _)| Self::concat_column_key(logical_region_id, &col))
182 .collect::<Vec<_>>();
183
184 column_keys.push(region_key);
186 self.delete(region_id, &column_keys).await?;
187
188 self.logical_region_lock
189 .write()
190 .await
191 .remove(&logical_region_id);
192
193 Ok(())
194 }
195
196 pub async fn logical_columns(
200 &self,
201 physical_region_id: RegionId,
202 logical_region_id: RegionId,
203 ) -> Result<Vec<(String, ColumnMetadata)>> {
204 let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
205 let region_column_prefix = Self::concat_column_key_prefix(logical_region_id);
206
207 let mut columns = vec![];
208 for (k, v) in self
209 .get_all_with_prefix(metadata_region_id, ®ion_column_prefix)
210 .await?
211 {
212 if !k.starts_with(®ion_column_prefix) {
213 continue;
214 }
215 let (_, column_name) = Self::parse_column_key(&k)?.unwrap();
217 let column_metadata = Self::deserialize_column_metadata(&v)?;
218 columns.push((column_name, column_metadata));
219 }
220
221 Ok(columns)
222 }
223
224 pub async fn logical_regions(&self, physical_region_id: RegionId) -> Result<Vec<RegionId>> {
226 let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
227
228 let mut regions = vec![];
229 for k in self
230 .get_all_key_with_prefix(metadata_region_id, REGION_PREFIX)
231 .await?
232 {
233 if !k.starts_with(REGION_PREFIX) {
234 continue;
235 }
236 let region_id = Self::parse_region_key(&k).unwrap();
238 let region_id = region_id.parse::<u64>().unwrap().into();
239 regions.push(region_id);
240 }
241
242 Ok(regions)
243 }
244}
245
246impl MetadataRegion {
248 pub fn concat_region_key(region_id: RegionId) -> String {
249 format!("{REGION_PREFIX}{}", region_id.as_u64())
250 }
251
252 pub fn concat_column_key(region_id: RegionId, column_name: &str) -> String {
254 let encoded_column_name = STANDARD_NO_PAD.encode(column_name);
255 format!(
256 "{COLUMN_PREFIX}{}_{}",
257 region_id.as_u64(),
258 encoded_column_name
259 )
260 }
261
262 pub fn concat_column_key_prefix(region_id: RegionId) -> String {
264 format!("{COLUMN_PREFIX}{}_", region_id.as_u64())
265 }
266
267 pub fn parse_region_key(key: &str) -> Option<&str> {
268 key.strip_prefix(REGION_PREFIX)
269 }
270
271 pub fn parse_column_key(key: &str) -> Result<Option<(RegionId, String)>> {
273 if let Some(stripped) = key.strip_prefix(COLUMN_PREFIX) {
274 let mut iter = stripped.split('_');
275
276 let region_id_raw = iter.next().unwrap();
277 let region_id = region_id_raw
278 .parse::<u64>()
279 .with_context(|_| ParseRegionIdSnafu { raw: region_id_raw })?
280 .into();
281
282 let encoded_column_name = iter.next().unwrap();
283 let column_name = STANDARD_NO_PAD
284 .decode(encoded_column_name)
285 .context(DecodeColumnValueSnafu)?;
286
287 Ok(Some((region_id, String::from_utf8(column_name).unwrap())))
288 } else {
289 Ok(None)
290 }
291 }
292
293 pub fn serialize_column_metadata(column_metadata: &ColumnMetadata) -> String {
294 serde_json::to_string(column_metadata).unwrap()
295 }
296
297 pub fn deserialize_column_metadata(column_metadata: &str) -> Result<ColumnMetadata> {
298 serde_json::from_str(column_metadata).with_context(|_| DeserializeColumnMetadataSnafu {
299 raw: column_metadata,
300 })
301 }
302}
303
304pub fn decode_batch_stream<T: Send + 'static>(
306 mut record_batch_stream: SendableRecordBatchStream,
307 decode: fn(RecordBatch) -> Vec<T>,
308) -> BoxStream<'static, Result<T>> {
309 let stream = try_stream! {
310 while let Some(batch) = record_batch_stream.try_next().await.context(CollectRecordBatchStreamSnafu)? {
311 for item in decode(batch) {
312 yield item;
313 }
314 }
315 };
316 Box::pin(stream)
317}
318
319fn decode_record_batch_to_key_and_value(batch: RecordBatch) -> Vec<(String, String)> {
321 let keys = batch.iter_column_as_string(0);
322 let values = batch.iter_column_as_string(1);
323 keys.zip(values)
324 .filter_map(|(k, v)| match (k, v) {
325 (Some(k), Some(v)) => Some((k, v)),
326 (Some(k), None) => Some((k, "".to_string())),
327 (None, _) => None,
328 })
329 .collect::<Vec<_>>()
330}
331
332fn decode_record_batch_to_key(batch: RecordBatch) -> Vec<String> {
334 batch.iter_column_as_string(0).flatten().collect::<Vec<_>>()
335}
336
337impl MetadataRegion {
341 fn build_prefix_read_request(prefix: &str, key_only: bool) -> ScanRequest {
342 let filter_expr = col(METADATA_SCHEMA_KEY_COLUMN_NAME).like(lit(prefix));
343
344 let projection = if key_only {
345 vec![METADATA_SCHEMA_KEY_COLUMN_INDEX]
346 } else {
347 vec![
348 METADATA_SCHEMA_KEY_COLUMN_INDEX,
349 METADATA_SCHEMA_VALUE_COLUMN_INDEX,
350 ]
351 };
352 ScanRequest {
353 projection: Some(projection),
354 filters: vec![filter_expr],
355 ..Default::default()
356 }
357 }
358
359 fn build_read_request() -> ScanRequest {
360 let projection = vec![
361 METADATA_SCHEMA_KEY_COLUMN_INDEX,
362 METADATA_SCHEMA_VALUE_COLUMN_INDEX,
363 ];
364 ScanRequest {
365 projection: Some(projection),
366 ..Default::default()
367 }
368 }
369
370 async fn load_all(&self, metadata_region_id: RegionId) -> Result<RegionMetadataCacheEntry> {
371 let scan_req = MetadataRegion::build_read_request();
372 let record_batch_stream = self
373 .mito
374 .scan_to_stream(metadata_region_id, scan_req)
375 .await
376 .context(MitoReadOperationSnafu)?;
377
378 let kv = decode_batch_stream(record_batch_stream, decode_record_batch_to_key_and_value)
379 .try_collect::<BTreeMap<_, _>>()
380 .await?;
381 let mut size = 0;
382 for (k, v) in kv.iter() {
383 size += k.len();
384 size += v.len();
385 }
386 let kv = Arc::new(kv);
387 Ok(RegionMetadataCacheEntry {
388 key_values: kv,
389 size,
390 })
391 }
392
393 async fn get_all_with_prefix(
394 &self,
395 metadata_region_id: RegionId,
396 prefix: &str,
397 ) -> Result<HashMap<String, String>> {
398 let region_metadata = self
399 .cache
400 .try_get_with(metadata_region_id, self.load_all(metadata_region_id))
401 .await
402 .context(CacheGetSnafu)?;
403
404 let mut result = HashMap::new();
405 get_all_with_prefix(®ion_metadata, prefix, |k, v| {
406 result.insert(k.to_string(), v.to_string());
407 Ok(())
408 })?;
409 Ok(result)
410 }
411
412 pub async fn get_all_key_with_prefix(
413 &self,
414 region_id: RegionId,
415 prefix: &str,
416 ) -> Result<Vec<String>> {
417 let scan_req = MetadataRegion::build_prefix_read_request(prefix, true);
418 let record_batch_stream = self
419 .mito
420 .scan_to_stream(region_id, scan_req)
421 .await
422 .context(MitoReadOperationSnafu)?;
423
424 decode_batch_stream(record_batch_stream, decode_record_batch_to_key)
425 .try_collect::<Vec<_>>()
426 .await
427 }
428
429 async fn delete(&self, metadata_region_id: RegionId, keys: &[String]) -> Result<()> {
432 let delete_request = Self::build_delete_request(keys);
433 self.mito
434 .handle_request(
435 metadata_region_id,
436 store_api::region_request::RegionRequest::Delete(delete_request),
437 )
438 .await
439 .context(MitoWriteOperationSnafu)?;
440 self.cache.invalidate(&metadata_region_id).await;
442
443 Ok(())
444 }
445
446 pub(crate) fn build_put_request_from_iter(
447 kv: impl Iterator<Item = (String, String)>,
448 ) -> RegionPutRequest {
449 let cols = vec![
450 ColumnSchema {
451 column_name: METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME.to_string(),
452 datatype: ColumnDataType::TimestampMillisecond as _,
453 semantic_type: SemanticType::Timestamp as _,
454 ..Default::default()
455 },
456 ColumnSchema {
457 column_name: METADATA_SCHEMA_KEY_COLUMN_NAME.to_string(),
458 datatype: ColumnDataType::String as _,
459 semantic_type: SemanticType::Tag as _,
460 ..Default::default()
461 },
462 ColumnSchema {
463 column_name: METADATA_SCHEMA_VALUE_COLUMN_NAME.to_string(),
464 datatype: ColumnDataType::String as _,
465 semantic_type: SemanticType::Field as _,
466 ..Default::default()
467 },
468 ];
469 let rows = Rows {
470 schema: cols,
471 rows: kv
472 .into_iter()
473 .map(|(key, value)| {
474 row(vec![
475 ValueData::TimestampMillisecondValue(0),
476 ValueData::StringValue(key),
477 ValueData::StringValue(value),
478 ])
479 })
480 .collect(),
481 };
482
483 RegionPutRequest { rows, hint: None }
484 }
485
486 fn build_delete_request(keys: &[String]) -> RegionDeleteRequest {
487 let cols = vec![
488 ColumnSchema {
489 column_name: METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME.to_string(),
490 datatype: ColumnDataType::TimestampMillisecond as _,
491 semantic_type: SemanticType::Timestamp as _,
492 ..Default::default()
493 },
494 ColumnSchema {
495 column_name: METADATA_SCHEMA_KEY_COLUMN_NAME.to_string(),
496 datatype: ColumnDataType::String as _,
497 semantic_type: SemanticType::Tag as _,
498 ..Default::default()
499 },
500 ];
501 let rows = keys
502 .iter()
503 .map(|key| {
504 row(vec![
505 ValueData::TimestampMillisecondValue(0),
506 ValueData::StringValue(key.clone()),
507 ])
508 })
509 .collect();
510 let rows = Rows { schema: cols, rows };
511
512 RegionDeleteRequest { rows, hint: None }
513 }
514
515 pub async fn add_logical_regions(
517 &self,
518 physical_region_id: RegionId,
519 write_region_id: bool,
520 logical_regions: impl Iterator<Item = (RegionId, HashMap<&str, &ColumnMetadata>)>,
521 ) -> Result<()> {
522 let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
523 let iter = logical_regions
524 .into_iter()
525 .flat_map(|(logical_region_id, column_metadatas)| {
526 if write_region_id {
527 Some((
528 MetadataRegion::concat_region_key(logical_region_id),
529 String::new(),
530 ))
531 } else {
532 None
533 }
534 .into_iter()
535 .chain(column_metadatas.into_iter().map(
536 move |(name, column_metadata)| {
537 (
538 MetadataRegion::concat_column_key(logical_region_id, name),
539 MetadataRegion::serialize_column_metadata(column_metadata),
540 )
541 },
542 ))
543 })
544 .collect::<Vec<_>>();
545
546 let put_request = MetadataRegion::build_put_request_from_iter(iter.into_iter());
547 self.mito
548 .handle_request(
549 metadata_region_id,
550 store_api::region_request::RegionRequest::Put(put_request),
551 )
552 .await
553 .context(MitoWriteOperationSnafu)?;
554 self.cache.invalidate(&metadata_region_id).await;
556
557 Ok(())
558 }
559
560 pub async fn transform_logical_region_metadata(
569 &self,
570 physical_region_id: RegionId,
571 source_region_id: RegionId,
572 ) -> Result<()> {
573 let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
574 let data_region_id = utils::to_data_region_id(physical_region_id);
575 let logical_regions = self
576 .logical_regions(data_region_id)
577 .await?
578 .into_iter()
579 .filter(|r| r.region_number() == source_region_id.region_number())
580 .collect::<Vec<_>>();
581 if logical_regions.is_empty() {
582 info!(
583 "No logical regions found from source region {}, physical region id: {}",
584 source_region_id, physical_region_id,
585 );
586 return Ok(());
587 }
588
589 let metadata = self.load_all(metadata_region_id).await?;
590 let mut output = Vec::new();
591 for logical_region_id in &logical_regions {
592 let prefix = MetadataRegion::concat_column_key_prefix(*logical_region_id);
593 get_all_with_prefix(&metadata, &prefix, |k, v| {
594 let (src_logical_region_id, column_name) = Self::parse_column_key(k)?.unwrap();
596 let new_key = MetadataRegion::concat_column_key(
598 RegionId::new(
599 src_logical_region_id.table_id(),
600 data_region_id.region_number(),
601 ),
602 &column_name,
603 );
604 output.push((new_key, v.to_string()));
605 Ok(())
606 })?;
607
608 let new_key = MetadataRegion::concat_region_key(RegionId::new(
609 logical_region_id.table_id(),
610 data_region_id.region_number(),
611 ));
612 output.push((new_key, String::new()));
613 }
614
615 if output.is_empty() {
616 warn!(
617 "No logical regions metadata found from source region {}, physical region id: {}",
618 source_region_id, physical_region_id
619 );
620 return Ok(());
621 }
622
623 debug!(
624 "Transform logical regions metadata to physical region {}, source region: {}, transformed metadata: {}",
625 data_region_id,
626 source_region_id,
627 output.len(),
628 );
629
630 let put_request = MetadataRegion::build_put_request_from_iter(output.into_iter());
631 self.mito
632 .handle_request(
633 metadata_region_id,
634 store_api::region_request::RegionRequest::Put(put_request),
635 )
636 .await
637 .context(MitoWriteOperationSnafu)?;
638 info!(
639 "Transformed {} logical regions metadata to physical region {}, source region: {}",
640 logical_regions.len(),
641 data_region_id,
642 source_region_id
643 );
644 self.cache.invalidate(&metadata_region_id).await;
645 Ok(())
646 }
647}
648
649fn get_all_with_prefix(
650 region_metadata: &RegionMetadataCacheEntry,
651 prefix: &str,
652 mut callback: impl FnMut(&str, &str) -> Result<()>,
653) -> Result<()> {
654 let range = region_metadata.key_values.range(prefix.to_string()..);
655 for (k, v) in range {
656 if !k.starts_with(prefix) {
657 break;
658 }
659 callback(k, v)?;
660 }
661 Ok(())
662}
663
664#[cfg(test)]
665impl MetadataRegion {
666 pub async fn get(&self, region_id: RegionId, key: &str) -> Result<Option<String>> {
669 use datatypes::arrow::array::{Array, AsArray};
670
671 let filter_expr = datafusion::prelude::col(METADATA_SCHEMA_KEY_COLUMN_NAME)
672 .eq(datafusion::prelude::lit(key));
673
674 let scan_req = ScanRequest {
675 projection: Some(vec![METADATA_SCHEMA_VALUE_COLUMN_INDEX]),
676 filters: vec![filter_expr],
677 ..Default::default()
678 };
679 let record_batch_stream = self
680 .mito
681 .scan_to_stream(region_id, scan_req)
682 .await
683 .context(MitoReadOperationSnafu)?;
684 let scan_result = common_recordbatch::util::collect(record_batch_stream)
685 .await
686 .context(CollectRecordBatchStreamSnafu)?;
687
688 let Some(first_batch) = scan_result.first() else {
689 return Ok(None);
690 };
691
692 let column = first_batch.column(0);
693 let column = column.as_string::<i32>();
694 let val = column.is_valid(0).then(|| column.value(0).to_string());
695
696 Ok(val)
697 }
698
699 pub async fn column_semantic_type(
701 &self,
702 physical_region_id: RegionId,
703 logical_region_id: RegionId,
704 column_name: &str,
705 ) -> Result<Option<SemanticType>> {
706 let region_id = utils::to_metadata_region_id(physical_region_id);
707 let column_key = Self::concat_column_key(logical_region_id, column_name);
708 let semantic_type = self.get(region_id, &column_key).await?;
709 semantic_type
710 .map(|s| Self::deserialize_column_metadata(&s).map(|c| c.semantic_type))
711 .transpose()
712 }
713}
714
715#[cfg(test)]
716mod test {
717 use datatypes::data_type::ConcreteDataType;
718 use datatypes::schema::ColumnSchema;
719
720 use super::*;
721 use crate::test_util::TestEnv;
722 use crate::utils::to_metadata_region_id;
723
724 #[test]
725 fn test_concat_table_key() {
726 let region_id = RegionId::new(1234, 7844);
727 let expected = "__region_5299989651108".to_string();
728 assert_eq!(MetadataRegion::concat_region_key(region_id), expected);
729 }
730
731 #[test]
732 fn test_concat_column_key() {
733 let region_id = RegionId::new(8489, 9184);
734 let column_name = "my_column";
735 let expected = "__column_36459977384928_bXlfY29sdW1u".to_string();
736 assert_eq!(
737 MetadataRegion::concat_column_key(region_id, column_name),
738 expected
739 );
740 }
741
742 #[test]
743 fn test_parse_table_key() {
744 let region_id = RegionId::new(87474, 10607);
745 let encoded = MetadataRegion::concat_column_key(region_id, "my_column");
746 assert_eq!(encoded, "__column_375697969260911_bXlfY29sdW1u");
747
748 let decoded = MetadataRegion::parse_column_key(&encoded).unwrap();
749 assert_eq!(decoded, Some((region_id, "my_column".to_string())));
750 }
751
752 #[test]
753 fn test_parse_valid_column_key() {
754 let region_id = RegionId::new(176, 910);
755 let encoded = MetadataRegion::concat_column_key(region_id, "my_column");
756 assert_eq!(encoded, "__column_755914245006_bXlfY29sdW1u");
757
758 let decoded = MetadataRegion::parse_column_key(&encoded).unwrap();
759 assert_eq!(decoded, Some((region_id, "my_column".to_string())));
760 }
761
762 #[test]
763 fn test_parse_invalid_column_key() {
764 let key = "__column_asdfasd_????";
765 let result = MetadataRegion::parse_column_key(key);
766 assert!(result.is_err());
767 }
768
769 #[test]
770 fn test_serialize_column_metadata() {
771 let semantic_type = SemanticType::Tag;
772 let column_metadata = ColumnMetadata {
773 column_schema: ColumnSchema::new("blabla", ConcreteDataType::string_datatype(), false),
774 semantic_type,
775 column_id: 5,
776 };
777 let old_fmt = "{\"column_schema\":{\"name\":\"blabla\",\"data_type\":{\"String\":null},\"is_nullable\":false,\"is_time_index\":false,\"default_constraint\":null,\"metadata\":{}},\"semantic_type\":\"Tag\",\"column_id\":5}".to_string();
778 let new_fmt = "{\"column_schema\":{\"name\":\"blabla\",\"data_type\":{\"String\":{\"size_type\":\"Utf8\"}},\"is_nullable\":false,\"is_time_index\":false,\"default_constraint\":null,\"metadata\":{}},\"semantic_type\":\"Tag\",\"column_id\":5}".to_string();
779 assert_eq!(
780 MetadataRegion::serialize_column_metadata(&column_metadata),
781 new_fmt
782 );
783 assert_eq!(
785 MetadataRegion::deserialize_column_metadata(&old_fmt).unwrap(),
786 column_metadata
787 );
788 assert_eq!(
789 MetadataRegion::deserialize_column_metadata(&new_fmt).unwrap(),
790 column_metadata
791 );
792
793 let semantic_type = "\"Invalid Column Metadata\"";
794 assert!(MetadataRegion::deserialize_column_metadata(semantic_type).is_err());
795 }
796
797 fn test_column_metadatas() -> HashMap<String, ColumnMetadata> {
798 HashMap::from([
799 (
800 "label1".to_string(),
801 ColumnMetadata {
802 column_schema: ColumnSchema::new(
803 "label1".to_string(),
804 ConcreteDataType::string_datatype(),
805 false,
806 ),
807 semantic_type: SemanticType::Tag,
808 column_id: 5,
809 },
810 ),
811 (
812 "label2".to_string(),
813 ColumnMetadata {
814 column_schema: ColumnSchema::new(
815 "label2".to_string(),
816 ConcreteDataType::string_datatype(),
817 false,
818 ),
819 semantic_type: SemanticType::Tag,
820 column_id: 5,
821 },
822 ),
823 ])
824 }
825
826 #[tokio::test]
827 async fn add_logical_regions_to_meta_region() {
828 let env = TestEnv::new().await;
829 env.init_metric_region().await;
830 let metadata_region = env.metadata_region();
831 let physical_region_id = to_metadata_region_id(env.default_physical_region_id());
832 let column_metadatas = test_column_metadatas();
833 let logical_region_id = RegionId::new(1024, 1);
834
835 let iter = vec![(
836 logical_region_id,
837 column_metadatas
838 .iter()
839 .map(|(k, v)| (k.as_str(), v))
840 .collect::<HashMap<_, _>>(),
841 )];
842 metadata_region
843 .add_logical_regions(physical_region_id, true, iter.into_iter())
844 .await
845 .unwrap();
846 let iter = vec![(
848 logical_region_id,
849 column_metadatas
850 .iter()
851 .map(|(k, v)| (k.as_str(), v))
852 .collect::<HashMap<_, _>>(),
853 )];
854 metadata_region
855 .add_logical_regions(physical_region_id, true, iter.into_iter())
856 .await
857 .unwrap();
858
859 let logical_regions = metadata_region
861 .logical_regions(physical_region_id)
862 .await
863 .unwrap();
864 assert_eq!(logical_regions.len(), 2);
865
866 let logical_columns = metadata_region
868 .logical_columns(physical_region_id, logical_region_id)
869 .await
870 .unwrap()
871 .into_iter()
872 .collect::<HashMap<_, _>>();
873 assert_eq!(logical_columns.len(), 2);
874 assert_eq!(column_metadatas, logical_columns);
875 }
876}