metric_engine/
metadata_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, HashMap};
17use std::sync::Arc;
18use std::time::Duration;
19
20use api::v1::helper::row;
21use api::v1::value::ValueData;
22use api::v1::{ColumnDataType, ColumnSchema, Rows, SemanticType};
23use async_stream::try_stream;
24use base64::Engine;
25use base64::engine::general_purpose::STANDARD_NO_PAD;
26use common_base::readable_size::ReadableSize;
27use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
28use datafusion::prelude::{col, lit};
29use futures_util::TryStreamExt;
30use futures_util::stream::BoxStream;
31use mito2::engine::MitoEngine;
32use moka::future::Cache;
33use moka::policy::EvictionPolicy;
34use snafu::{OptionExt, ResultExt};
35use store_api::metadata::ColumnMetadata;
36use store_api::metric_engine_consts::{
37    METADATA_SCHEMA_KEY_COLUMN_INDEX, METADATA_SCHEMA_KEY_COLUMN_NAME,
38    METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME, METADATA_SCHEMA_VALUE_COLUMN_INDEX,
39    METADATA_SCHEMA_VALUE_COLUMN_NAME,
40};
41use store_api::region_engine::RegionEngine;
42use store_api::region_request::{RegionDeleteRequest, RegionPutRequest};
43use store_api::storage::{RegionId, ScanRequest};
44use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock};
45
46use crate::error::{
47    CacheGetSnafu, CollectRecordBatchStreamSnafu, DecodeColumnValueSnafu,
48    DeserializeColumnMetadataSnafu, LogicalRegionNotFoundSnafu, MitoReadOperationSnafu,
49    MitoWriteOperationSnafu, ParseRegionIdSnafu, Result,
50};
51use crate::utils;
52
/// Prefix of keys that mark the existence of a logical region:
/// `__region_<LOGICAL_REGION_ID>`.
const REGION_PREFIX: &'static str = "__region_";
/// Prefix of keys that mark the existence of a column:
/// `__column_<LOGICAL_REGION_ID>_<ENCODED_COLUMN_NAME>`.
const COLUMN_PREFIX: &'static str = "__column_";
55
56/// The other two fields key and value will be used as a k-v storage.
57/// It contains two group of key:
58/// - `__region_<LOGICAL_REGION_ID>` is used for marking table existence. It doesn't have value.
59/// - `__column_<LOGICAL_REGION_ID>_<COLUMN_NAME>` is used for marking column existence,
60///   the value is column's semantic type. To avoid the key conflict, this column key
61///   will be encoded by base64([STANDARD_NO_PAD]).
62///
63/// This is a generic handler like [MetricEngine](crate::engine::MetricEngine). It
64/// will handle all the metadata related operations across physical tables. Thus
65/// every operation should be associated to a [RegionId], which is the physical
66/// table id + region sequence. This handler will transform the region group by
67/// itself.
68pub struct MetadataRegion {
69    pub(crate) mito: MitoEngine,
70    /// The cache for contents(key-value pairs) of region metadata.
71    ///
72    /// The cache should be invalidated when any new values are put into the metadata region or any
73    /// values are deleted from the metadata region.
74    cache: Cache<RegionId, RegionMetadataCacheEntry>,
75    /// Logical lock for operations that need to be serialized. Like update & read region columns.
76    ///
77    /// Region entry will be registered on creating and opening logical region, and deregistered on
78    /// removing logical region.
79    logical_region_lock: RwLock<HashMap<RegionId, Arc<RwLock<()>>>>,
80}
81
/// A cached snapshot of every key-value pair stored in one metadata region.
///
/// Cloning is cheap: the map itself is shared through an [Arc].
#[derive(Clone)]
struct RegionMetadataCacheEntry {
    /// Weight of this entry: the summed byte length of all keys and values.
    size: usize,
    /// All key-value pairs, ordered by key so that prefix lookups can use a range scan.
    key_values: Arc<BTreeMap<String, String>>,
}
87
88/// The max size of the region metadata cache.
89const MAX_CACHE_SIZE: u64 = ReadableSize::mb(128).as_bytes();
90/// The TTL of the region metadata cache.
91const CACHE_TTL: Duration = Duration::from_secs(5 * 60);
92
93impl MetadataRegion {
94    pub fn new(mito: MitoEngine) -> Self {
95        let cache = Cache::builder()
96            .max_capacity(MAX_CACHE_SIZE)
97            // Use the LRU eviction policy to minimize frequent mito scans.
98            // Recently accessed items are retained longer in the cache.
99            .eviction_policy(EvictionPolicy::lru())
100            .time_to_live(CACHE_TTL)
101            .weigher(|_, v: &RegionMetadataCacheEntry| v.size as u32)
102            .build();
103        Self {
104            mito,
105            cache,
106            logical_region_lock: RwLock::new(HashMap::new()),
107        }
108    }
109
110    /// Open a logical region.
111    ///
112    /// Returns true if the logical region is opened for the first time.
113    pub async fn open_logical_region(&self, logical_region_id: RegionId) -> bool {
114        match self
115            .logical_region_lock
116            .write()
117            .await
118            .entry(logical_region_id)
119        {
120            Entry::Occupied(_) => false,
121            Entry::Vacant(vacant_entry) => {
122                vacant_entry.insert(Arc::new(RwLock::new(())));
123                true
124            }
125        }
126    }
127
128    /// Retrieve a read lock guard of given logical region id.
129    pub async fn read_lock_logical_region(
130        &self,
131        logical_region_id: RegionId,
132    ) -> Result<OwnedRwLockReadGuard<()>> {
133        let lock = self
134            .logical_region_lock
135            .read()
136            .await
137            .get(&logical_region_id)
138            .context(LogicalRegionNotFoundSnafu {
139                region_id: logical_region_id,
140            })?
141            .clone();
142        Ok(RwLock::read_owned(lock).await)
143    }
144
145    /// Retrieve a write lock guard of given logical region id.
146    pub async fn write_lock_logical_region(
147        &self,
148        logical_region_id: RegionId,
149    ) -> Result<OwnedRwLockWriteGuard<()>> {
150        let lock = self
151            .logical_region_lock
152            .read()
153            .await
154            .get(&logical_region_id)
155            .context(LogicalRegionNotFoundSnafu {
156                region_id: logical_region_id,
157            })?
158            .clone();
159        Ok(RwLock::write_owned(lock).await)
160    }
161
162    /// Remove a registered logical region from metadata.
163    ///
164    /// This method doesn't check if the previous key exists.
165    pub async fn remove_logical_region(
166        &self,
167        physical_region_id: RegionId,
168        logical_region_id: RegionId,
169    ) -> Result<()> {
170        // concat region key
171        let region_id = utils::to_metadata_region_id(physical_region_id);
172        let region_key = Self::concat_region_key(logical_region_id);
173
174        // concat column keys
175        let logical_columns = self
176            .logical_columns(physical_region_id, logical_region_id)
177            .await?;
178        let mut column_keys = logical_columns
179            .into_iter()
180            .map(|(col, _)| Self::concat_column_key(logical_region_id, &col))
181            .collect::<Vec<_>>();
182
183        // remove region key and column keys
184        column_keys.push(region_key);
185        self.delete(region_id, &column_keys).await?;
186
187        self.logical_region_lock
188            .write()
189            .await
190            .remove(&logical_region_id);
191
192        Ok(())
193    }
194
195    // TODO(ruihang): avoid using `get_all`
196    /// Get all the columns of a given logical region.
197    /// Return a list of (column_name, column_metadata).
198    pub async fn logical_columns(
199        &self,
200        physical_region_id: RegionId,
201        logical_region_id: RegionId,
202    ) -> Result<Vec<(String, ColumnMetadata)>> {
203        let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
204        let region_column_prefix = Self::concat_column_key_prefix(logical_region_id);
205
206        let mut columns = vec![];
207        for (k, v) in self
208            .get_all_with_prefix(metadata_region_id, &region_column_prefix)
209            .await?
210        {
211            if !k.starts_with(&region_column_prefix) {
212                continue;
213            }
214            // Safety: we have checked the prefix
215            let (_, column_name) = Self::parse_column_key(&k)?.unwrap();
216            let column_metadata = Self::deserialize_column_metadata(&v)?;
217            columns.push((column_name, column_metadata));
218        }
219
220        Ok(columns)
221    }
222
223    /// Return all logical regions associated with the physical region.
224    pub async fn logical_regions(&self, physical_region_id: RegionId) -> Result<Vec<RegionId>> {
225        let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
226
227        let mut regions = vec![];
228        for k in self
229            .get_all_key_with_prefix(metadata_region_id, REGION_PREFIX)
230            .await?
231        {
232            if !k.starts_with(REGION_PREFIX) {
233                continue;
234            }
235            // Safety: we have checked the prefix
236            let region_id = Self::parse_region_key(&k).unwrap();
237            let region_id = region_id.parse::<u64>().unwrap().into();
238            regions.push(region_id);
239        }
240
241        Ok(regions)
242    }
243}
244
245// utils to concat and parse key/value
246impl MetadataRegion {
247    pub fn concat_region_key(region_id: RegionId) -> String {
248        format!("{REGION_PREFIX}{}", region_id.as_u64())
249    }
250
251    /// Column name will be encoded by base64([STANDARD_NO_PAD])
252    pub fn concat_column_key(region_id: RegionId, column_name: &str) -> String {
253        let encoded_column_name = STANDARD_NO_PAD.encode(column_name);
254        format!(
255            "{COLUMN_PREFIX}{}_{}",
256            region_id.as_u64(),
257            encoded_column_name
258        )
259    }
260
261    /// Concat a column key prefix without column name
262    pub fn concat_column_key_prefix(region_id: RegionId) -> String {
263        format!("{COLUMN_PREFIX}{}_", region_id.as_u64())
264    }
265
266    pub fn parse_region_key(key: &str) -> Option<&str> {
267        key.strip_prefix(REGION_PREFIX)
268    }
269
270    /// Parse column key to (logical_region_id, column_name)
271    pub fn parse_column_key(key: &str) -> Result<Option<(RegionId, String)>> {
272        if let Some(stripped) = key.strip_prefix(COLUMN_PREFIX) {
273            let mut iter = stripped.split('_');
274
275            let region_id_raw = iter.next().unwrap();
276            let region_id = region_id_raw
277                .parse::<u64>()
278                .with_context(|_| ParseRegionIdSnafu { raw: region_id_raw })?
279                .into();
280
281            let encoded_column_name = iter.next().unwrap();
282            let column_name = STANDARD_NO_PAD
283                .decode(encoded_column_name)
284                .context(DecodeColumnValueSnafu)?;
285
286            Ok(Some((region_id, String::from_utf8(column_name).unwrap())))
287        } else {
288            Ok(None)
289        }
290    }
291
292    pub fn serialize_column_metadata(column_metadata: &ColumnMetadata) -> String {
293        serde_json::to_string(column_metadata).unwrap()
294    }
295
296    pub fn deserialize_column_metadata(column_metadata: &str) -> Result<ColumnMetadata> {
297        serde_json::from_str(column_metadata).with_context(|_| DeserializeColumnMetadataSnafu {
298            raw: column_metadata,
299        })
300    }
301}
302
303/// Decode a record batch stream to a stream of items.
304pub fn decode_batch_stream<T: Send + 'static>(
305    mut record_batch_stream: SendableRecordBatchStream,
306    decode: fn(RecordBatch) -> Vec<T>,
307) -> BoxStream<'static, Result<T>> {
308    let stream = try_stream! {
309        while let Some(batch) = record_batch_stream.try_next().await.context(CollectRecordBatchStreamSnafu)? {
310            for item in decode(batch) {
311                yield item;
312            }
313        }
314    };
315    Box::pin(stream)
316}
317
318/// Decode a record batch to a list of key and value.
319fn decode_record_batch_to_key_and_value(batch: RecordBatch) -> Vec<(String, String)> {
320    let keys = batch.iter_column_as_string(0);
321    let values = batch.iter_column_as_string(1);
322    keys.zip(values)
323        .filter_map(|(k, v)| match (k, v) {
324            (Some(k), Some(v)) => Some((k, v)),
325            (Some(k), None) => Some((k, "".to_string())),
326            (None, _) => None,
327        })
328        .collect::<Vec<_>>()
329}
330
331/// Decode a record batch to a list of key.
332fn decode_record_batch_to_key(batch: RecordBatch) -> Vec<String> {
333    batch.iter_column_as_string(0).flatten().collect::<Vec<_>>()
334}
335
336// simulate to `KvBackend`
337//
338// methods in this block assume the given region id is transformed.
339impl MetadataRegion {
340    fn build_prefix_read_request(prefix: &str, key_only: bool) -> ScanRequest {
341        let filter_expr = col(METADATA_SCHEMA_KEY_COLUMN_NAME).like(lit(prefix));
342
343        let projection = if key_only {
344            vec![METADATA_SCHEMA_KEY_COLUMN_INDEX]
345        } else {
346            vec![
347                METADATA_SCHEMA_KEY_COLUMN_INDEX,
348                METADATA_SCHEMA_VALUE_COLUMN_INDEX,
349            ]
350        };
351        ScanRequest {
352            projection: Some(projection),
353            filters: vec![filter_expr],
354            ..Default::default()
355        }
356    }
357
358    fn build_read_request() -> ScanRequest {
359        let projection = vec![
360            METADATA_SCHEMA_KEY_COLUMN_INDEX,
361            METADATA_SCHEMA_VALUE_COLUMN_INDEX,
362        ];
363        ScanRequest {
364            projection: Some(projection),
365            ..Default::default()
366        }
367    }
368
369    async fn load_all(&self, metadata_region_id: RegionId) -> Result<RegionMetadataCacheEntry> {
370        let scan_req = MetadataRegion::build_read_request();
371        let record_batch_stream = self
372            .mito
373            .scan_to_stream(metadata_region_id, scan_req)
374            .await
375            .context(MitoReadOperationSnafu)?;
376
377        let kv = decode_batch_stream(record_batch_stream, decode_record_batch_to_key_and_value)
378            .try_collect::<BTreeMap<_, _>>()
379            .await?;
380        let mut size = 0;
381        for (k, v) in kv.iter() {
382            size += k.len();
383            size += v.len();
384        }
385        let kv = Arc::new(kv);
386        Ok(RegionMetadataCacheEntry {
387            key_values: kv,
388            size,
389        })
390    }
391
392    async fn get_all_with_prefix(
393        &self,
394        metadata_region_id: RegionId,
395        prefix: &str,
396    ) -> Result<HashMap<String, String>> {
397        let region_metadata = self
398            .cache
399            .try_get_with(metadata_region_id, self.load_all(metadata_region_id))
400            .await
401            .context(CacheGetSnafu)?;
402
403        let range = region_metadata.key_values.range(prefix.to_string()..);
404        let mut result = HashMap::new();
405        for (k, v) in range {
406            if !k.starts_with(prefix) {
407                break;
408            }
409            result.insert(k.clone(), v.clone());
410        }
411        Ok(result)
412    }
413
414    pub async fn get_all_key_with_prefix(
415        &self,
416        region_id: RegionId,
417        prefix: &str,
418    ) -> Result<Vec<String>> {
419        let scan_req = MetadataRegion::build_prefix_read_request(prefix, true);
420        let record_batch_stream = self
421            .mito
422            .scan_to_stream(region_id, scan_req)
423            .await
424            .context(MitoReadOperationSnafu)?;
425
426        decode_batch_stream(record_batch_stream, decode_record_batch_to_key)
427            .try_collect::<Vec<_>>()
428            .await
429    }
430
431    /// Delete the given keys. For performance consideration, this method
432    /// doesn't check if those keys exist or not.
433    async fn delete(&self, metadata_region_id: RegionId, keys: &[String]) -> Result<()> {
434        let delete_request = Self::build_delete_request(keys);
435        self.mito
436            .handle_request(
437                metadata_region_id,
438                store_api::region_request::RegionRequest::Delete(delete_request),
439            )
440            .await
441            .context(MitoWriteOperationSnafu)?;
442        // Invalidates the region metadata cache if any values are deleted from the metadata region.
443        self.cache.invalidate(&metadata_region_id).await;
444
445        Ok(())
446    }
447
448    pub(crate) fn build_put_request_from_iter(
449        kv: impl Iterator<Item = (String, String)>,
450    ) -> RegionPutRequest {
451        let cols = vec![
452            ColumnSchema {
453                column_name: METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME.to_string(),
454                datatype: ColumnDataType::TimestampMillisecond as _,
455                semantic_type: SemanticType::Timestamp as _,
456                ..Default::default()
457            },
458            ColumnSchema {
459                column_name: METADATA_SCHEMA_KEY_COLUMN_NAME.to_string(),
460                datatype: ColumnDataType::String as _,
461                semantic_type: SemanticType::Tag as _,
462                ..Default::default()
463            },
464            ColumnSchema {
465                column_name: METADATA_SCHEMA_VALUE_COLUMN_NAME.to_string(),
466                datatype: ColumnDataType::String as _,
467                semantic_type: SemanticType::Field as _,
468                ..Default::default()
469            },
470        ];
471        let rows = Rows {
472            schema: cols,
473            rows: kv
474                .into_iter()
475                .map(|(key, value)| {
476                    row(vec![
477                        ValueData::TimestampMillisecondValue(0),
478                        ValueData::StringValue(key),
479                        ValueData::StringValue(value),
480                    ])
481                })
482                .collect(),
483        };
484
485        RegionPutRequest { rows, hint: None }
486    }
487
488    fn build_delete_request(keys: &[String]) -> RegionDeleteRequest {
489        let cols = vec![
490            ColumnSchema {
491                column_name: METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME.to_string(),
492                datatype: ColumnDataType::TimestampMillisecond as _,
493                semantic_type: SemanticType::Timestamp as _,
494                ..Default::default()
495            },
496            ColumnSchema {
497                column_name: METADATA_SCHEMA_KEY_COLUMN_NAME.to_string(),
498                datatype: ColumnDataType::String as _,
499                semantic_type: SemanticType::Tag as _,
500                ..Default::default()
501            },
502        ];
503        let rows = keys
504            .iter()
505            .map(|key| {
506                row(vec![
507                    ValueData::TimestampMillisecondValue(0),
508                    ValueData::StringValue(key.clone()),
509                ])
510            })
511            .collect();
512        let rows = Rows { schema: cols, rows };
513
514        RegionDeleteRequest { rows, hint: None }
515    }
516
517    /// Add logical regions to the metadata region.
518    pub async fn add_logical_regions(
519        &self,
520        physical_region_id: RegionId,
521        write_region_id: bool,
522        logical_regions: impl Iterator<Item = (RegionId, HashMap<&str, &ColumnMetadata>)>,
523    ) -> Result<()> {
524        let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
525        let iter = logical_regions
526            .into_iter()
527            .flat_map(|(logical_region_id, column_metadatas)| {
528                if write_region_id {
529                    Some((
530                        MetadataRegion::concat_region_key(logical_region_id),
531                        String::new(),
532                    ))
533                } else {
534                    None
535                }
536                .into_iter()
537                .chain(column_metadatas.into_iter().map(
538                    move |(name, column_metadata)| {
539                        (
540                            MetadataRegion::concat_column_key(logical_region_id, name),
541                            MetadataRegion::serialize_column_metadata(column_metadata),
542                        )
543                    },
544                ))
545            })
546            .collect::<Vec<_>>();
547
548        let put_request = MetadataRegion::build_put_request_from_iter(iter.into_iter());
549        self.mito
550            .handle_request(
551                metadata_region_id,
552                store_api::region_request::RegionRequest::Put(put_request),
553            )
554            .await
555            .context(MitoWriteOperationSnafu)?;
556        // Invalidates the region metadata cache if any new values are put into the metadata region.
557        self.cache.invalidate(&metadata_region_id).await;
558
559        Ok(())
560    }
561}
562
#[cfg(test)]
impl MetadataRegion {
    /// Retrieves the value associated with the given key in the specified region.
    ///
    /// Returns `Ok(None)` if the key is not found or its value is null. Empty record batches
    /// are skipped instead of being indexed at row 0, which would panic.
    pub async fn get(&self, region_id: RegionId, key: &str) -> Result<Option<String>> {
        use datatypes::arrow::array::{Array, AsArray};

        let filter_expr = datafusion::prelude::col(METADATA_SCHEMA_KEY_COLUMN_NAME)
            .eq(datafusion::prelude::lit(key));

        let scan_req = ScanRequest {
            projection: Some(vec![METADATA_SCHEMA_VALUE_COLUMN_INDEX]),
            filters: vec![filter_expr],
            ..Default::default()
        };
        let record_batch_stream = self
            .mito
            .scan_to_stream(region_id, scan_req)
            .await
            .context(MitoReadOperationSnafu)?;
        let scan_result = common_recordbatch::util::collect(record_batch_stream)
            .await
            .context(CollectRecordBatchStreamSnafu)?;

        for batch in &scan_result {
            let column = batch.column(0);
            let column = column.as_string::<i32>();
            // `is_valid(0)` doesn't bound-check arrays without a null buffer, so guard
            // against empty batches before reading row 0.
            if column.is_empty() {
                continue;
            }
            return Ok(column.is_valid(0).then(|| column.value(0).to_string()));
        }

        Ok(None)
    }

    /// Checks if the given column exists. Returns its semantic type if it does.
    ///
    /// # Errors
    /// Fails if the metadata region can't be read or the stored column metadata is malformed.
    pub async fn column_semantic_type(
        &self,
        physical_region_id: RegionId,
        logical_region_id: RegionId,
        column_name: &str,
    ) -> Result<Option<SemanticType>> {
        let region_id = utils::to_metadata_region_id(physical_region_id);
        let column_key = Self::concat_column_key(logical_region_id, column_name);
        let column_metadata = self.get(region_id, &column_key).await?;
        column_metadata
            .map(|s| Self::deserialize_column_metadata(&s).map(|c| c.semantic_type))
            .transpose()
    }
}
613
#[cfg(test)]
mod test {
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;

    use super::*;
    use crate::test_util::TestEnv;
    use crate::utils::to_metadata_region_id;

    #[test]
    fn test_concat_table_key() {
        let region_id = RegionId::new(1234, 7844);
        let expected = "__region_5299989651108".to_string();
        assert_eq!(MetadataRegion::concat_region_key(region_id), expected);
    }

    #[test]
    fn test_concat_column_key() {
        let region_id = RegionId::new(8489, 9184);
        let column_name = "my_column";
        let expected = "__column_36459977384928_bXlfY29sdW1u".to_string();
        assert_eq!(
            MetadataRegion::concat_column_key(region_id, column_name),
            expected
        );
    }

    #[test]
    fn test_parse_table_key() {
        let region_id = RegionId::new(87474, 10607);
        let encoded = MetadataRegion::concat_column_key(region_id, "my_column");
        assert_eq!(encoded, "__column_375697969260911_bXlfY29sdW1u");

        let decoded = MetadataRegion::parse_column_key(&encoded).unwrap();
        assert_eq!(decoded, Some((region_id, "my_column".to_string())));
    }

    #[test]
    fn test_parse_valid_column_key() {
        let region_id = RegionId::new(176, 910);
        let encoded = MetadataRegion::concat_column_key(region_id, "my_column");
        assert_eq!(encoded, "__column_755914245006_bXlfY29sdW1u");

        let decoded = MetadataRegion::parse_column_key(&encoded).unwrap();
        assert_eq!(decoded, Some((region_id, "my_column".to_string())));
    }

    #[test]
    fn test_parse_invalid_column_key() {
        let key = "__column_asdfasd_????";
        let result = MetadataRegion::parse_column_key(key);
        assert!(result.is_err());
    }

    #[test]
    fn test_serialize_column_metadata() {
        let semantic_type = SemanticType::Tag;
        let column_metadata = ColumnMetadata {
            column_schema: ColumnSchema::new("blabla", ConcreteDataType::string_datatype(), false),
            semantic_type,
            column_id: 5,
        };
        let old_fmt = "{\"column_schema\":{\"name\":\"blabla\",\"data_type\":{\"String\":null},\"is_nullable\":false,\"is_time_index\":false,\"default_constraint\":null,\"metadata\":{}},\"semantic_type\":\"Tag\",\"column_id\":5}".to_string();
        let new_fmt = "{\"column_schema\":{\"name\":\"blabla\",\"data_type\":{\"String\":{\"size_type\":\"Utf8\"}},\"is_nullable\":false,\"is_time_index\":false,\"default_constraint\":null,\"metadata\":{}},\"semantic_type\":\"Tag\",\"column_id\":5}".to_string();
        assert_eq!(
            MetadataRegion::serialize_column_metadata(&column_metadata),
            new_fmt
        );
        // Ensure both old and new formats can be deserialized.
        assert_eq!(
            MetadataRegion::deserialize_column_metadata(&old_fmt).unwrap(),
            column_metadata
        );
        assert_eq!(
            MetadataRegion::deserialize_column_metadata(&new_fmt).unwrap(),
            column_metadata
        );

        let semantic_type = "\"Invalid Column Metadata\"";
        assert!(MetadataRegion::deserialize_column_metadata(semantic_type).is_err());
    }

    /// Column metadatas of a test logical region.
    ///
    /// Each column has a distinct column id so that the fixture describes a well-formed region.
    fn test_column_metadatas() -> HashMap<String, ColumnMetadata> {
        HashMap::from([
            (
                "label1".to_string(),
                ColumnMetadata {
                    column_schema: ColumnSchema::new(
                        "label1".to_string(),
                        ConcreteDataType::string_datatype(),
                        false,
                    ),
                    semantic_type: SemanticType::Tag,
                    column_id: 5,
                },
            ),
            (
                "label2".to_string(),
                ColumnMetadata {
                    column_schema: ColumnSchema::new(
                        "label2".to_string(),
                        ConcreteDataType::string_datatype(),
                        false,
                    ),
                    semantic_type: SemanticType::Tag,
                    column_id: 6,
                },
            ),
        ])
    }

    #[tokio::test]
    async fn add_logical_regions_to_meta_region() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let metadata_region = env.metadata_region();
        let physical_region_id = to_metadata_region_id(env.default_physical_region_id());
        let column_metadatas = test_column_metadatas();
        let logical_region_id = RegionId::new(1024, 1);

        let iter = vec![(
            logical_region_id,
            column_metadatas
                .iter()
                .map(|(k, v)| (k.as_str(), v))
                .collect::<HashMap<_, _>>(),
        )];
        metadata_region
            .add_logical_regions(physical_region_id, true, iter.into_iter())
            .await
            .unwrap();
        // Add the same logical region again; the assertions below check that this doesn't
        // produce duplicate region or column entries.
        let iter = vec![(
            logical_region_id,
            column_metadatas
                .iter()
                .map(|(k, v)| (k.as_str(), v))
                .collect::<HashMap<_, _>>(),
        )];
        metadata_region
            .add_logical_regions(physical_region_id, true, iter.into_iter())
            .await
            .unwrap();

        // Check if the logical region is added.
        // NOTE(review): presumably `init_metric_region` registers one logical region of its
        // own, hence two logical regions in total — TODO confirm.
        let logical_regions = metadata_region
            .logical_regions(physical_region_id)
            .await
            .unwrap();
        assert_eq!(logical_regions.len(), 2);

        // Check if the logical region columns are added.
        let logical_columns = metadata_region
            .logical_columns(physical_region_id, logical_region_id)
            .await
            .unwrap()
            .into_iter()
            .collect::<HashMap<_, _>>();
        assert_eq!(logical_columns.len(), 2);
        assert_eq!(column_metadatas, logical_columns);
    }
}