1use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, HashMap};
17use std::sync::Arc;
18use std::time::Duration;
19
20use api::v1::helper::row;
21use api::v1::value::ValueData;
22use api::v1::{ColumnDataType, ColumnSchema, Rows, SemanticType};
23use async_stream::try_stream;
24use base64::Engine;
25use base64::engine::general_purpose::STANDARD_NO_PAD;
26use common_base::readable_size::ReadableSize;
27use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
28use common_telemetry::{debug, info, warn};
29use datafusion::prelude::{col, lit};
30use futures_util::TryStreamExt;
31use futures_util::stream::BoxStream;
32use mito2::engine::MitoEngine;
33use moka::future::Cache;
34use moka::policy::EvictionPolicy;
35use snafu::{OptionExt, ResultExt};
36use store_api::metadata::ColumnMetadata;
37use store_api::metric_engine_consts::{
38 METADATA_SCHEMA_KEY_COLUMN_INDEX, METADATA_SCHEMA_KEY_COLUMN_NAME,
39 METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME, METADATA_SCHEMA_VALUE_COLUMN_INDEX,
40 METADATA_SCHEMA_VALUE_COLUMN_NAME,
41};
42use store_api::region_engine::RegionEngine;
43use store_api::region_request::{RegionDeleteRequest, RegionPutRequest};
44use store_api::storage::{RegionId, ScanRequest};
45use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock};
46
47use crate::error::{
48 CacheGetSnafu, CollectRecordBatchStreamSnafu, DecodeColumnValueSnafu,
49 DeserializeColumnMetadataSnafu, LogicalRegionNotFoundSnafu, MitoReadOperationSnafu,
50 MitoWriteOperationSnafu, ParseRegionIdSnafu, Result,
51};
52use crate::utils;
53
54const REGION_PREFIX: &str = "__region_";
55const COLUMN_PREFIX: &str = "__column_";
56
57pub struct MetadataRegion {
70 pub(crate) mito: MitoEngine,
71 cache: Cache<RegionId, RegionMetadataCacheEntry>,
76 logical_region_lock: RwLock<HashMap<RegionId, Arc<RwLock<()>>>>,
81}
82
83#[derive(Clone)]
84struct RegionMetadataCacheEntry {
85 key_values: Arc<BTreeMap<String, String>>,
86 size: usize,
87}
88
89const MAX_CACHE_SIZE: u64 = ReadableSize::mb(128).as_bytes();
91const CACHE_TTL: Duration = Duration::from_secs(5 * 60);
93
94impl MetadataRegion {
95 pub fn new(mito: MitoEngine) -> Self {
96 let cache = Cache::builder()
97 .max_capacity(MAX_CACHE_SIZE)
98 .eviction_policy(EvictionPolicy::lru())
101 .time_to_live(CACHE_TTL)
102 .weigher(|_, v: &RegionMetadataCacheEntry| v.size as u32)
103 .build();
104 Self {
105 mito,
106 cache,
107 logical_region_lock: RwLock::new(HashMap::new()),
108 }
109 }
110
111 pub async fn open_logical_region(&self, logical_region_id: RegionId) -> bool {
115 match self
116 .logical_region_lock
117 .write()
118 .await
119 .entry(logical_region_id)
120 {
121 Entry::Occupied(_) => false,
122 Entry::Vacant(vacant_entry) => {
123 vacant_entry.insert(Arc::new(RwLock::new(())));
124 true
125 }
126 }
127 }
128
129 pub async fn read_lock_logical_region(
131 &self,
132 logical_region_id: RegionId,
133 ) -> Result<OwnedRwLockReadGuard<()>> {
134 let lock = self
135 .logical_region_lock
136 .read()
137 .await
138 .get(&logical_region_id)
139 .context(LogicalRegionNotFoundSnafu {
140 region_id: logical_region_id,
141 })?
142 .clone();
143 Ok(RwLock::read_owned(lock).await)
144 }
145
146 pub async fn write_lock_logical_region(
148 &self,
149 logical_region_id: RegionId,
150 ) -> Result<OwnedRwLockWriteGuard<()>> {
151 let lock = self
152 .logical_region_lock
153 .read()
154 .await
155 .get(&logical_region_id)
156 .context(LogicalRegionNotFoundSnafu {
157 region_id: logical_region_id,
158 })?
159 .clone();
160 Ok(RwLock::write_owned(lock).await)
161 }
162
163 pub async fn remove_logical_region(
167 &self,
168 physical_region_id: RegionId,
169 logical_region_id: RegionId,
170 ) -> Result<()> {
171 let region_id = utils::to_metadata_region_id(physical_region_id);
173 let region_key = Self::concat_region_key(logical_region_id);
174
175 let logical_columns = self
177 .logical_columns(physical_region_id, logical_region_id)
178 .await?;
179 let mut column_keys = logical_columns
180 .into_iter()
181 .map(|(col, _)| Self::concat_column_key(logical_region_id, &col))
182 .collect::<Vec<_>>();
183
184 column_keys.push(region_key);
186 self.delete(region_id, &column_keys).await?;
187
188 self.logical_region_lock
189 .write()
190 .await
191 .remove(&logical_region_id);
192
193 Ok(())
194 }
195
196 pub async fn logical_columns(
200 &self,
201 physical_region_id: RegionId,
202 logical_region_id: RegionId,
203 ) -> Result<Vec<(String, ColumnMetadata)>> {
204 let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
205 let region_column_prefix = Self::concat_column_key_prefix(logical_region_id);
206
207 let mut columns = vec![];
208 for (k, v) in self
209 .get_all_with_prefix(metadata_region_id, ®ion_column_prefix)
210 .await?
211 {
212 if !k.starts_with(®ion_column_prefix) {
213 continue;
214 }
215 let (_, column_name) = Self::parse_column_key(&k)?.unwrap();
217 let column_metadata = Self::deserialize_column_metadata(&v)?;
218 columns.push((column_name, column_metadata));
219 }
220
221 Ok(columns)
222 }
223
224 pub async fn logical_regions(&self, physical_region_id: RegionId) -> Result<Vec<RegionId>> {
226 let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
227
228 let mut regions = vec![];
229 for k in self
230 .get_all_key_with_prefix(metadata_region_id, REGION_PREFIX)
231 .await?
232 {
233 if !k.starts_with(REGION_PREFIX) {
234 continue;
235 }
236 let region_id = Self::parse_region_key(&k).unwrap();
238 let region_id = region_id.parse::<u64>().unwrap().into();
239 regions.push(region_id);
240 }
241
242 Ok(regions)
243 }
244}
245
246impl MetadataRegion {
248 pub fn concat_region_key(region_id: RegionId) -> String {
249 format!("{REGION_PREFIX}{}", region_id.as_u64())
250 }
251
252 pub fn concat_column_key(region_id: RegionId, column_name: &str) -> String {
254 let encoded_column_name = STANDARD_NO_PAD.encode(column_name);
255 format!(
256 "{COLUMN_PREFIX}{}_{}",
257 region_id.as_u64(),
258 encoded_column_name
259 )
260 }
261
262 pub fn concat_column_key_prefix(region_id: RegionId) -> String {
264 format!("{COLUMN_PREFIX}{}_", region_id.as_u64())
265 }
266
267 pub fn parse_region_key(key: &str) -> Option<&str> {
268 key.strip_prefix(REGION_PREFIX)
269 }
270
271 pub fn parse_column_key(key: &str) -> Result<Option<(RegionId, String)>> {
273 if let Some(stripped) = key.strip_prefix(COLUMN_PREFIX) {
274 let mut iter = stripped.split('_');
275
276 let region_id_raw = iter.next().unwrap();
277 let region_id = region_id_raw
278 .parse::<u64>()
279 .with_context(|_| ParseRegionIdSnafu { raw: region_id_raw })?
280 .into();
281
282 let encoded_column_name = iter.next().unwrap();
283 let column_name = STANDARD_NO_PAD
284 .decode(encoded_column_name)
285 .context(DecodeColumnValueSnafu)?;
286
287 Ok(Some((region_id, String::from_utf8(column_name).unwrap())))
288 } else {
289 Ok(None)
290 }
291 }
292
293 pub fn serialize_column_metadata(column_metadata: &ColumnMetadata) -> String {
294 serde_json::to_string(column_metadata).unwrap()
295 }
296
297 pub fn deserialize_column_metadata(column_metadata: &str) -> Result<ColumnMetadata> {
298 serde_json::from_str(column_metadata).with_context(|_| DeserializeColumnMetadataSnafu {
299 raw: column_metadata,
300 })
301 }
302}
303
304pub fn decode_batch_stream<T: Send + 'static>(
306 mut record_batch_stream: SendableRecordBatchStream,
307 decode: fn(RecordBatch) -> Vec<T>,
308) -> BoxStream<'static, Result<T>> {
309 let stream = try_stream! {
310 while let Some(batch) = record_batch_stream.try_next().await.context(CollectRecordBatchStreamSnafu)? {
311 for item in decode(batch) {
312 yield item;
313 }
314 }
315 };
316 Box::pin(stream)
317}
318
319fn decode_record_batch_to_key_and_value(batch: RecordBatch) -> Vec<(String, String)> {
321 let keys = batch.iter_column_as_string(0);
322 let values = batch.iter_column_as_string(1);
323 keys.zip(values)
324 .filter_map(|(k, v)| match (k, v) {
325 (Some(k), Some(v)) => Some((k, v)),
326 (Some(k), None) => Some((k, "".to_string())),
327 (None, _) => None,
328 })
329 .collect::<Vec<_>>()
330}
331
332fn decode_record_batch_to_key(batch: RecordBatch) -> Vec<String> {
334 batch.iter_column_as_string(0).flatten().collect::<Vec<_>>()
335}
336
337impl MetadataRegion {
341 fn build_prefix_read_request(prefix: &str, key_only: bool) -> ScanRequest {
342 let filter_expr = col(METADATA_SCHEMA_KEY_COLUMN_NAME).like(lit(prefix));
343
344 let projection = if key_only {
345 vec![METADATA_SCHEMA_KEY_COLUMN_INDEX]
346 } else {
347 vec![
348 METADATA_SCHEMA_KEY_COLUMN_INDEX,
349 METADATA_SCHEMA_VALUE_COLUMN_INDEX,
350 ]
351 };
352 let projection_input = Some(projection.into());
353 ScanRequest {
354 projection_input,
355 filters: vec![filter_expr],
356 ..Default::default()
357 }
358 }
359
360 fn build_read_request() -> ScanRequest {
361 let projection = vec![
362 METADATA_SCHEMA_KEY_COLUMN_INDEX,
363 METADATA_SCHEMA_VALUE_COLUMN_INDEX,
364 ];
365 let projection_input = Some(projection.into());
366 ScanRequest {
367 projection_input,
368 ..Default::default()
369 }
370 }
371
372 async fn load_all(&self, metadata_region_id: RegionId) -> Result<RegionMetadataCacheEntry> {
373 let scan_req = MetadataRegion::build_read_request();
374 let record_batch_stream = self
375 .mito
376 .scan_to_stream(metadata_region_id, scan_req)
377 .await
378 .context(MitoReadOperationSnafu)?;
379
380 let kv = decode_batch_stream(record_batch_stream, decode_record_batch_to_key_and_value)
381 .try_collect::<BTreeMap<_, _>>()
382 .await?;
383 let mut size = 0;
384 for (k, v) in kv.iter() {
385 size += k.len();
386 size += v.len();
387 }
388 let kv = Arc::new(kv);
389 Ok(RegionMetadataCacheEntry {
390 key_values: kv,
391 size,
392 })
393 }
394
395 async fn get_all_with_prefix(
396 &self,
397 metadata_region_id: RegionId,
398 prefix: &str,
399 ) -> Result<HashMap<String, String>> {
400 let region_metadata = self
401 .cache
402 .try_get_with(metadata_region_id, self.load_all(metadata_region_id))
403 .await
404 .context(CacheGetSnafu)?;
405
406 let mut result = HashMap::new();
407 get_all_with_prefix(®ion_metadata, prefix, |k, v| {
408 result.insert(k.to_string(), v.to_string());
409 Ok(())
410 })?;
411 Ok(result)
412 }
413
414 pub async fn get_all_key_with_prefix(
415 &self,
416 region_id: RegionId,
417 prefix: &str,
418 ) -> Result<Vec<String>> {
419 let scan_req = MetadataRegion::build_prefix_read_request(prefix, true);
420 let record_batch_stream = self
421 .mito
422 .scan_to_stream(region_id, scan_req)
423 .await
424 .context(MitoReadOperationSnafu)?;
425
426 decode_batch_stream(record_batch_stream, decode_record_batch_to_key)
427 .try_collect::<Vec<_>>()
428 .await
429 }
430
431 async fn delete(&self, metadata_region_id: RegionId, keys: &[String]) -> Result<()> {
434 let delete_request = Self::build_delete_request(keys);
435 self.mito
436 .handle_request(
437 metadata_region_id,
438 store_api::region_request::RegionRequest::Delete(delete_request),
439 )
440 .await
441 .context(MitoWriteOperationSnafu)?;
442 self.cache.invalidate(&metadata_region_id).await;
444
445 Ok(())
446 }
447
448 pub(crate) fn build_put_request_from_iter(
449 kv: impl Iterator<Item = (String, String)>,
450 ) -> RegionPutRequest {
451 let cols = vec![
452 ColumnSchema {
453 column_name: METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME.to_string(),
454 datatype: ColumnDataType::TimestampMillisecond as _,
455 semantic_type: SemanticType::Timestamp as _,
456 ..Default::default()
457 },
458 ColumnSchema {
459 column_name: METADATA_SCHEMA_KEY_COLUMN_NAME.to_string(),
460 datatype: ColumnDataType::String as _,
461 semantic_type: SemanticType::Tag as _,
462 ..Default::default()
463 },
464 ColumnSchema {
465 column_name: METADATA_SCHEMA_VALUE_COLUMN_NAME.to_string(),
466 datatype: ColumnDataType::String as _,
467 semantic_type: SemanticType::Field as _,
468 ..Default::default()
469 },
470 ];
471 let rows = Rows {
472 schema: cols,
473 rows: kv
474 .into_iter()
475 .map(|(key, value)| {
476 row(vec![
477 ValueData::TimestampMillisecondValue(0),
478 ValueData::StringValue(key),
479 ValueData::StringValue(value),
480 ])
481 })
482 .collect(),
483 };
484
485 RegionPutRequest {
486 rows,
487 hint: None,
488 partition_expr_version: None,
489 }
490 }
491
492 fn build_delete_request(keys: &[String]) -> RegionDeleteRequest {
493 let cols = vec![
494 ColumnSchema {
495 column_name: METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME.to_string(),
496 datatype: ColumnDataType::TimestampMillisecond as _,
497 semantic_type: SemanticType::Timestamp as _,
498 ..Default::default()
499 },
500 ColumnSchema {
501 column_name: METADATA_SCHEMA_KEY_COLUMN_NAME.to_string(),
502 datatype: ColumnDataType::String as _,
503 semantic_type: SemanticType::Tag as _,
504 ..Default::default()
505 },
506 ];
507 let rows = keys
508 .iter()
509 .map(|key| {
510 row(vec![
511 ValueData::TimestampMillisecondValue(0),
512 ValueData::StringValue(key.clone()),
513 ])
514 })
515 .collect();
516 let rows = Rows { schema: cols, rows };
517
518 RegionDeleteRequest {
519 rows,
520 hint: None,
521 partition_expr_version: None,
522 }
523 }
524
525 pub async fn add_logical_regions(
527 &self,
528 physical_region_id: RegionId,
529 write_region_id: bool,
530 logical_regions: impl Iterator<Item = (RegionId, HashMap<&str, &ColumnMetadata>)>,
531 ) -> Result<()> {
532 let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
533 let iter = logical_regions
534 .into_iter()
535 .flat_map(|(logical_region_id, column_metadatas)| {
536 if write_region_id {
537 Some((
538 MetadataRegion::concat_region_key(logical_region_id),
539 String::new(),
540 ))
541 } else {
542 None
543 }
544 .into_iter()
545 .chain(column_metadatas.into_iter().map(
546 move |(name, column_metadata)| {
547 (
548 MetadataRegion::concat_column_key(logical_region_id, name),
549 MetadataRegion::serialize_column_metadata(column_metadata),
550 )
551 },
552 ))
553 })
554 .collect::<Vec<_>>();
555
556 let put_request = MetadataRegion::build_put_request_from_iter(iter.into_iter());
557 self.mito
558 .handle_request(
559 metadata_region_id,
560 store_api::region_request::RegionRequest::Put(put_request),
561 )
562 .await
563 .context(MitoWriteOperationSnafu)?;
564 self.cache.invalidate(&metadata_region_id).await;
566
567 Ok(())
568 }
569
570 pub async fn transform_logical_region_metadata(
579 &self,
580 physical_region_id: RegionId,
581 source_region_id: RegionId,
582 ) -> Result<()> {
583 let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
584 let data_region_id = utils::to_data_region_id(physical_region_id);
585 let logical_regions = self
586 .logical_regions(data_region_id)
587 .await?
588 .into_iter()
589 .filter(|r| r.region_number() == source_region_id.region_number())
590 .collect::<Vec<_>>();
591 if logical_regions.is_empty() {
592 info!(
593 "No logical regions found from source region {}, physical region id: {}",
594 source_region_id, physical_region_id,
595 );
596 return Ok(());
597 }
598
599 let metadata = self.load_all(metadata_region_id).await?;
600 let mut output = Vec::new();
601 for logical_region_id in &logical_regions {
602 let prefix = MetadataRegion::concat_column_key_prefix(*logical_region_id);
603 get_all_with_prefix(&metadata, &prefix, |k, v| {
604 let (src_logical_region_id, column_name) = Self::parse_column_key(k)?.unwrap();
606 let new_key = MetadataRegion::concat_column_key(
608 RegionId::new(
609 src_logical_region_id.table_id(),
610 data_region_id.region_number(),
611 ),
612 &column_name,
613 );
614 output.push((new_key, v.to_string()));
615 Ok(())
616 })?;
617
618 let new_key = MetadataRegion::concat_region_key(RegionId::new(
619 logical_region_id.table_id(),
620 data_region_id.region_number(),
621 ));
622 output.push((new_key, String::new()));
623 }
624
625 if output.is_empty() {
626 warn!(
627 "No logical regions metadata found from source region {}, physical region id: {}",
628 source_region_id, physical_region_id
629 );
630 return Ok(());
631 }
632
633 debug!(
634 "Transform logical regions metadata to physical region {}, source region: {}, transformed metadata: {}",
635 data_region_id,
636 source_region_id,
637 output.len(),
638 );
639
640 let put_request = MetadataRegion::build_put_request_from_iter(output.into_iter());
641 self.mito
642 .handle_request(
643 metadata_region_id,
644 store_api::region_request::RegionRequest::Put(put_request),
645 )
646 .await
647 .context(MitoWriteOperationSnafu)?;
648 info!(
649 "Transformed {} logical regions metadata to physical region {}, source region: {}",
650 logical_regions.len(),
651 data_region_id,
652 source_region_id
653 );
654 self.cache.invalidate(&metadata_region_id).await;
655 Ok(())
656 }
657}
658
659fn get_all_with_prefix(
660 region_metadata: &RegionMetadataCacheEntry,
661 prefix: &str,
662 mut callback: impl FnMut(&str, &str) -> Result<()>,
663) -> Result<()> {
664 let range = region_metadata.key_values.range(prefix.to_string()..);
665 for (k, v) in range {
666 if !k.starts_with(prefix) {
667 break;
668 }
669 callback(k, v)?;
670 }
671 Ok(())
672}
673
674#[cfg(test)]
675impl MetadataRegion {
676 pub async fn get(&self, region_id: RegionId, key: &str) -> Result<Option<String>> {
679 use datatypes::arrow::array::{Array, AsArray};
680
681 let filter_expr = datafusion::prelude::col(METADATA_SCHEMA_KEY_COLUMN_NAME)
682 .eq(datafusion::prelude::lit(key));
683
684 let projection_input = Some(vec![METADATA_SCHEMA_VALUE_COLUMN_INDEX].into());
685 let scan_req = ScanRequest {
686 projection_input,
687 filters: vec![filter_expr],
688 ..Default::default()
689 };
690 let record_batch_stream = self
691 .mito
692 .scan_to_stream(region_id, scan_req)
693 .await
694 .context(MitoReadOperationSnafu)?;
695 let scan_result = common_recordbatch::util::collect(record_batch_stream)
696 .await
697 .context(CollectRecordBatchStreamSnafu)?;
698
699 let Some(first_batch) = scan_result.first() else {
700 return Ok(None);
701 };
702
703 let column = first_batch.column(0);
704 let column = column.as_string::<i32>();
705 let val = column.is_valid(0).then(|| column.value(0).to_string());
706
707 Ok(val)
708 }
709
710 pub async fn column_semantic_type(
712 &self,
713 physical_region_id: RegionId,
714 logical_region_id: RegionId,
715 column_name: &str,
716 ) -> Result<Option<SemanticType>> {
717 let region_id = utils::to_metadata_region_id(physical_region_id);
718 let column_key = Self::concat_column_key(logical_region_id, column_name);
719 let semantic_type = self.get(region_id, &column_key).await?;
720 semantic_type
721 .map(|s| Self::deserialize_column_metadata(&s).map(|c| c.semantic_type))
722 .transpose()
723 }
724}
725
726#[cfg(test)]
727mod test {
728 use datatypes::data_type::ConcreteDataType;
729 use datatypes::schema::ColumnSchema;
730
731 use super::*;
732 use crate::test_util::TestEnv;
733 use crate::utils::to_metadata_region_id;
734
735 #[test]
736 fn test_concat_table_key() {
737 let region_id = RegionId::new(1234, 7844);
738 let expected = "__region_5299989651108".to_string();
739 assert_eq!(MetadataRegion::concat_region_key(region_id), expected);
740 }
741
742 #[test]
743 fn test_concat_column_key() {
744 let region_id = RegionId::new(8489, 9184);
745 let column_name = "my_column";
746 let expected = "__column_36459977384928_bXlfY29sdW1u".to_string();
747 assert_eq!(
748 MetadataRegion::concat_column_key(region_id, column_name),
749 expected
750 );
751 }
752
753 #[test]
754 fn test_parse_table_key() {
755 let region_id = RegionId::new(87474, 10607);
756 let encoded = MetadataRegion::concat_column_key(region_id, "my_column");
757 assert_eq!(encoded, "__column_375697969260911_bXlfY29sdW1u");
758
759 let decoded = MetadataRegion::parse_column_key(&encoded).unwrap();
760 assert_eq!(decoded, Some((region_id, "my_column".to_string())));
761 }
762
763 #[test]
764 fn test_parse_valid_column_key() {
765 let region_id = RegionId::new(176, 910);
766 let encoded = MetadataRegion::concat_column_key(region_id, "my_column");
767 assert_eq!(encoded, "__column_755914245006_bXlfY29sdW1u");
768
769 let decoded = MetadataRegion::parse_column_key(&encoded).unwrap();
770 assert_eq!(decoded, Some((region_id, "my_column".to_string())));
771 }
772
773 #[test]
774 fn test_parse_invalid_column_key() {
775 let key = "__column_asdfasd_????";
776 let result = MetadataRegion::parse_column_key(key);
777 assert!(result.is_err());
778 }
779
780 #[test]
781 fn test_serialize_column_metadata() {
782 let semantic_type = SemanticType::Tag;
783 let column_metadata = ColumnMetadata {
784 column_schema: ColumnSchema::new("blabla", ConcreteDataType::string_datatype(), false),
785 semantic_type,
786 column_id: 5,
787 };
788 let old_fmt = "{\"column_schema\":{\"name\":\"blabla\",\"data_type\":{\"String\":null},\"is_nullable\":false,\"is_time_index\":false,\"default_constraint\":null,\"metadata\":{}},\"semantic_type\":\"Tag\",\"column_id\":5}".to_string();
789 let new_fmt = "{\"column_schema\":{\"name\":\"blabla\",\"data_type\":{\"String\":{\"size_type\":\"Utf8\"}},\"is_nullable\":false,\"is_time_index\":false,\"default_constraint\":null,\"metadata\":{}},\"semantic_type\":\"Tag\",\"column_id\":5}".to_string();
790 assert_eq!(
791 MetadataRegion::serialize_column_metadata(&column_metadata),
792 new_fmt
793 );
794 assert_eq!(
796 MetadataRegion::deserialize_column_metadata(&old_fmt).unwrap(),
797 column_metadata
798 );
799 assert_eq!(
800 MetadataRegion::deserialize_column_metadata(&new_fmt).unwrap(),
801 column_metadata
802 );
803
804 let semantic_type = "\"Invalid Column Metadata\"";
805 assert!(MetadataRegion::deserialize_column_metadata(semantic_type).is_err());
806 }
807
808 fn test_column_metadatas() -> HashMap<String, ColumnMetadata> {
809 HashMap::from([
810 (
811 "label1".to_string(),
812 ColumnMetadata {
813 column_schema: ColumnSchema::new(
814 "label1".to_string(),
815 ConcreteDataType::string_datatype(),
816 false,
817 ),
818 semantic_type: SemanticType::Tag,
819 column_id: 5,
820 },
821 ),
822 (
823 "label2".to_string(),
824 ColumnMetadata {
825 column_schema: ColumnSchema::new(
826 "label2".to_string(),
827 ConcreteDataType::string_datatype(),
828 false,
829 ),
830 semantic_type: SemanticType::Tag,
831 column_id: 5,
832 },
833 ),
834 ])
835 }
836
837 #[tokio::test]
838 async fn add_logical_regions_to_meta_region() {
839 let env = TestEnv::new().await;
840 env.init_metric_region().await;
841 let metadata_region = env.metadata_region();
842 let physical_region_id = to_metadata_region_id(env.default_physical_region_id());
843 let column_metadatas = test_column_metadatas();
844 let logical_region_id = RegionId::new(1024, 1);
845
846 let iter = vec![(
847 logical_region_id,
848 column_metadatas
849 .iter()
850 .map(|(k, v)| (k.as_str(), v))
851 .collect::<HashMap<_, _>>(),
852 )];
853 metadata_region
854 .add_logical_regions(physical_region_id, true, iter.into_iter())
855 .await
856 .unwrap();
857 let iter = vec![(
859 logical_region_id,
860 column_metadatas
861 .iter()
862 .map(|(k, v)| (k.as_str(), v))
863 .collect::<HashMap<_, _>>(),
864 )];
865 metadata_region
866 .add_logical_regions(physical_region_id, true, iter.into_iter())
867 .await
868 .unwrap();
869
870 let logical_regions = metadata_region
872 .logical_regions(physical_region_id)
873 .await
874 .unwrap();
875 assert_eq!(logical_regions.len(), 2);
876
877 let logical_columns = metadata_region
879 .logical_columns(physical_region_id, logical_region_id)
880 .await
881 .unwrap()
882 .into_iter()
883 .collect::<HashMap<_, _>>();
884 assert_eq!(logical_columns.len(), 2);
885 assert_eq!(column_metadatas, logical_columns);
886 }
887}