metric_engine/
metadata_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, HashMap};
17use std::sync::Arc;
18use std::time::Duration;
19
20use api::v1::helper::row;
21use api::v1::value::ValueData;
22use api::v1::{ColumnDataType, ColumnSchema, Rows, SemanticType};
23use async_stream::try_stream;
24use base64::engine::general_purpose::STANDARD_NO_PAD;
25use base64::Engine;
26use common_base::readable_size::ReadableSize;
27use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
28use datafusion::prelude::{col, lit};
29use futures_util::stream::BoxStream;
30use futures_util::TryStreamExt;
31use mito2::engine::MitoEngine;
32use moka::future::Cache;
33use moka::policy::EvictionPolicy;
34use snafu::{OptionExt, ResultExt};
35use store_api::metadata::ColumnMetadata;
36use store_api::metric_engine_consts::{
37    METADATA_SCHEMA_KEY_COLUMN_INDEX, METADATA_SCHEMA_KEY_COLUMN_NAME,
38    METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME, METADATA_SCHEMA_VALUE_COLUMN_INDEX,
39    METADATA_SCHEMA_VALUE_COLUMN_NAME,
40};
41use store_api::region_engine::RegionEngine;
42use store_api::region_request::{RegionDeleteRequest, RegionPutRequest};
43use store_api::storage::{RegionId, ScanRequest};
44use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock};
45
46use crate::error::{
47    CacheGetSnafu, CollectRecordBatchStreamSnafu, DecodeColumnValueSnafu,
48    DeserializeColumnMetadataSnafu, LogicalRegionNotFoundSnafu, MitoReadOperationSnafu,
49    MitoWriteOperationSnafu, ParseRegionIdSnafu, Result,
50};
51use crate::utils;
52
/// Prefix of keys that mark the existence of a logical region: `__region_<LOGICAL_REGION_ID>`.
const REGION_PREFIX: &str = "__region_";
/// Prefix of keys that mark the existence of a column in a logical region:
/// `__column_<LOGICAL_REGION_ID>_<ENCODED_COLUMN_NAME>`.
const COLUMN_PREFIX: &str = "__column_";
55
56/// The other two fields key and value will be used as a k-v storage.
57/// It contains two group of key:
58/// - `__region_<LOGICAL_REGION_ID>` is used for marking table existence. It doesn't have value.
59/// - `__column_<LOGICAL_REGION_ID>_<COLUMN_NAME>` is used for marking column existence,
60///   the value is column's semantic type. To avoid the key conflict, this column key
61///   will be encoded by base64([STANDARD_NO_PAD]).
62///
63/// This is a generic handler like [MetricEngine](crate::engine::MetricEngine). It
64/// will handle all the metadata related operations across physical tables. Thus
65/// every operation should be associated to a [RegionId], which is the physical
66/// table id + region sequence. This handler will transform the region group by
67/// itself.
68pub struct MetadataRegion {
69    pub(crate) mito: MitoEngine,
70    /// The cache for contents(key-value pairs) of region metadata.
71    ///
72    /// The cache should be invalidated when any new values are put into the metadata region or any
73    /// values are deleted from the metadata region.
74    cache: Cache<RegionId, RegionMetadataCacheEntry>,
75    /// Logical lock for operations that need to be serialized. Like update & read region columns.
76    ///
77    /// Region entry will be registered on creating and opening logical region, and deregistered on
78    /// removing logical region.
79    logical_region_lock: RwLock<HashMap<RegionId, Arc<RwLock<()>>>>,
80}
81
/// A cached snapshot of every key-value pair stored in one metadata region.
#[derive(Clone)]
struct RegionMetadataCacheEntry {
    /// All key-value pairs, ordered by key so prefix lookups can be served by a range scan.
    key_values: Arc<BTreeMap<String, String>>,
    /// Total byte length of all keys and values; used as this entry's cache weight.
    size: usize,
}
87
88/// The max size of the region metadata cache.
89const MAX_CACHE_SIZE: u64 = ReadableSize::mb(128).as_bytes();
90/// The TTL of the region metadata cache.
91const CACHE_TTL: Duration = Duration::from_secs(5 * 60);
92
93impl MetadataRegion {
94    pub fn new(mito: MitoEngine) -> Self {
95        let cache = Cache::builder()
96            .max_capacity(MAX_CACHE_SIZE)
97            // Use the LRU eviction policy to minimize frequent mito scans.
98            // Recently accessed items are retained longer in the cache.
99            .eviction_policy(EvictionPolicy::lru())
100            .time_to_live(CACHE_TTL)
101            .weigher(|_, v: &RegionMetadataCacheEntry| v.size as u32)
102            .build();
103        Self {
104            mito,
105            cache,
106            logical_region_lock: RwLock::new(HashMap::new()),
107        }
108    }
109
110    /// Open a logical region.
111    ///
112    /// Returns true if the logical region is opened for the first time.
113    pub async fn open_logical_region(&self, logical_region_id: RegionId) -> bool {
114        match self
115            .logical_region_lock
116            .write()
117            .await
118            .entry(logical_region_id)
119        {
120            Entry::Occupied(_) => false,
121            Entry::Vacant(vacant_entry) => {
122                vacant_entry.insert(Arc::new(RwLock::new(())));
123                true
124            }
125        }
126    }
127
128    /// Retrieve a read lock guard of given logical region id.
129    pub async fn read_lock_logical_region(
130        &self,
131        logical_region_id: RegionId,
132    ) -> Result<OwnedRwLockReadGuard<()>> {
133        let lock = self
134            .logical_region_lock
135            .read()
136            .await
137            .get(&logical_region_id)
138            .context(LogicalRegionNotFoundSnafu {
139                region_id: logical_region_id,
140            })?
141            .clone();
142        Ok(RwLock::read_owned(lock).await)
143    }
144
145    /// Retrieve a write lock guard of given logical region id.
146    pub async fn write_lock_logical_region(
147        &self,
148        logical_region_id: RegionId,
149    ) -> Result<OwnedRwLockWriteGuard<()>> {
150        let lock = self
151            .logical_region_lock
152            .read()
153            .await
154            .get(&logical_region_id)
155            .context(LogicalRegionNotFoundSnafu {
156                region_id: logical_region_id,
157            })?
158            .clone();
159        Ok(RwLock::write_owned(lock).await)
160    }
161
162    /// Remove a registered logical region from metadata.
163    ///
164    /// This method doesn't check if the previous key exists.
165    pub async fn remove_logical_region(
166        &self,
167        physical_region_id: RegionId,
168        logical_region_id: RegionId,
169    ) -> Result<()> {
170        // concat region key
171        let region_id = utils::to_metadata_region_id(physical_region_id);
172        let region_key = Self::concat_region_key(logical_region_id);
173
174        // concat column keys
175        let logical_columns = self
176            .logical_columns(physical_region_id, logical_region_id)
177            .await?;
178        let mut column_keys = logical_columns
179            .into_iter()
180            .map(|(col, _)| Self::concat_column_key(logical_region_id, &col))
181            .collect::<Vec<_>>();
182
183        // remove region key and column keys
184        column_keys.push(region_key);
185        self.delete(region_id, &column_keys).await?;
186
187        self.logical_region_lock
188            .write()
189            .await
190            .remove(&logical_region_id);
191
192        Ok(())
193    }
194
195    // TODO(ruihang): avoid using `get_all`
196    /// Get all the columns of a given logical region.
197    /// Return a list of (column_name, column_metadata).
198    pub async fn logical_columns(
199        &self,
200        physical_region_id: RegionId,
201        logical_region_id: RegionId,
202    ) -> Result<Vec<(String, ColumnMetadata)>> {
203        let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
204        let region_column_prefix = Self::concat_column_key_prefix(logical_region_id);
205
206        let mut columns = vec![];
207        for (k, v) in self
208            .get_all_with_prefix(metadata_region_id, &region_column_prefix)
209            .await?
210        {
211            if !k.starts_with(&region_column_prefix) {
212                continue;
213            }
214            // Safety: we have checked the prefix
215            let (_, column_name) = Self::parse_column_key(&k)?.unwrap();
216            let column_metadata = Self::deserialize_column_metadata(&v)?;
217            columns.push((column_name, column_metadata));
218        }
219
220        Ok(columns)
221    }
222
223    /// Return all logical regions associated with the physical region.
224    pub async fn logical_regions(&self, physical_region_id: RegionId) -> Result<Vec<RegionId>> {
225        let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
226
227        let mut regions = vec![];
228        for k in self
229            .get_all_key_with_prefix(metadata_region_id, REGION_PREFIX)
230            .await?
231        {
232            if !k.starts_with(REGION_PREFIX) {
233                continue;
234            }
235            // Safety: we have checked the prefix
236            let region_id = Self::parse_region_key(&k).unwrap();
237            let region_id = region_id.parse::<u64>().unwrap().into();
238            regions.push(region_id);
239        }
240
241        Ok(regions)
242    }
243}
244
245// utils to concat and parse key/value
246impl MetadataRegion {
247    pub fn concat_region_key(region_id: RegionId) -> String {
248        format!("{REGION_PREFIX}{}", region_id.as_u64())
249    }
250
251    /// Column name will be encoded by base64([STANDARD_NO_PAD])
252    pub fn concat_column_key(region_id: RegionId, column_name: &str) -> String {
253        let encoded_column_name = STANDARD_NO_PAD.encode(column_name);
254        format!(
255            "{COLUMN_PREFIX}{}_{}",
256            region_id.as_u64(),
257            encoded_column_name
258        )
259    }
260
261    /// Concat a column key prefix without column name
262    pub fn concat_column_key_prefix(region_id: RegionId) -> String {
263        format!("{COLUMN_PREFIX}{}_", region_id.as_u64())
264    }
265
266    pub fn parse_region_key(key: &str) -> Option<&str> {
267        key.strip_prefix(REGION_PREFIX)
268    }
269
270    /// Parse column key to (logical_region_id, column_name)
271    pub fn parse_column_key(key: &str) -> Result<Option<(RegionId, String)>> {
272        if let Some(stripped) = key.strip_prefix(COLUMN_PREFIX) {
273            let mut iter = stripped.split('_');
274
275            let region_id_raw = iter.next().unwrap();
276            let region_id = region_id_raw
277                .parse::<u64>()
278                .with_context(|_| ParseRegionIdSnafu { raw: region_id_raw })?
279                .into();
280
281            let encoded_column_name = iter.next().unwrap();
282            let column_name = STANDARD_NO_PAD
283                .decode(encoded_column_name)
284                .context(DecodeColumnValueSnafu)?;
285
286            Ok(Some((region_id, String::from_utf8(column_name).unwrap())))
287        } else {
288            Ok(None)
289        }
290    }
291
292    pub fn serialize_column_metadata(column_metadata: &ColumnMetadata) -> String {
293        serde_json::to_string(column_metadata).unwrap()
294    }
295
296    pub fn deserialize_column_metadata(column_metadata: &str) -> Result<ColumnMetadata> {
297        serde_json::from_str(column_metadata).with_context(|_| DeserializeColumnMetadataSnafu {
298            raw: column_metadata,
299        })
300    }
301}
302
303/// Decode a record batch stream to a stream of items.
304pub fn decode_batch_stream<T: Send + 'static>(
305    mut record_batch_stream: SendableRecordBatchStream,
306    decode: fn(RecordBatch) -> Vec<T>,
307) -> BoxStream<'static, Result<T>> {
308    let stream = try_stream! {
309        while let Some(batch) = record_batch_stream.try_next().await.context(CollectRecordBatchStreamSnafu)? {
310            for item in decode(batch) {
311                yield item;
312            }
313        }
314    };
315    Box::pin(stream)
316}
317
318/// Decode a record batch to a list of key and value.
319fn decode_record_batch_to_key_and_value(batch: RecordBatch) -> Vec<(String, String)> {
320    let key_col = batch.column(0);
321    let val_col = batch.column(1);
322
323    (0..batch.num_rows())
324        .flat_map(move |row_index| {
325            let key = key_col
326                .get_ref(row_index)
327                .as_string()
328                .unwrap()
329                .map(|s| s.to_string());
330
331            key.map(|k| {
332                (
333                    k,
334                    val_col
335                        .get_ref(row_index)
336                        .as_string()
337                        .unwrap()
338                        .map(|s| s.to_string())
339                        .unwrap_or_default(),
340                )
341            })
342        })
343        .collect()
344}
345
346/// Decode a record batch to a list of key.
347fn decode_record_batch_to_key(batch: RecordBatch) -> Vec<String> {
348    let key_col = batch.column(0);
349
350    (0..batch.num_rows())
351        .flat_map(move |row_index| {
352            let key = key_col
353                .get_ref(row_index)
354                .as_string()
355                .unwrap()
356                .map(|s| s.to_string());
357            key
358        })
359        .collect()
360}
361
362// simulate to `KvBackend`
363//
364// methods in this block assume the given region id is transformed.
365impl MetadataRegion {
366    fn build_prefix_read_request(prefix: &str, key_only: bool) -> ScanRequest {
367        let filter_expr = col(METADATA_SCHEMA_KEY_COLUMN_NAME).like(lit(prefix));
368
369        let projection = if key_only {
370            vec![METADATA_SCHEMA_KEY_COLUMN_INDEX]
371        } else {
372            vec![
373                METADATA_SCHEMA_KEY_COLUMN_INDEX,
374                METADATA_SCHEMA_VALUE_COLUMN_INDEX,
375            ]
376        };
377        ScanRequest {
378            projection: Some(projection),
379            filters: vec![filter_expr],
380            ..Default::default()
381        }
382    }
383
384    fn build_read_request() -> ScanRequest {
385        let projection = vec![
386            METADATA_SCHEMA_KEY_COLUMN_INDEX,
387            METADATA_SCHEMA_VALUE_COLUMN_INDEX,
388        ];
389        ScanRequest {
390            projection: Some(projection),
391            ..Default::default()
392        }
393    }
394
395    async fn load_all(&self, metadata_region_id: RegionId) -> Result<RegionMetadataCacheEntry> {
396        let scan_req = MetadataRegion::build_read_request();
397        let record_batch_stream = self
398            .mito
399            .scan_to_stream(metadata_region_id, scan_req)
400            .await
401            .context(MitoReadOperationSnafu)?;
402
403        let kv = decode_batch_stream(record_batch_stream, decode_record_batch_to_key_and_value)
404            .try_collect::<BTreeMap<_, _>>()
405            .await?;
406        let mut size = 0;
407        for (k, v) in kv.iter() {
408            size += k.len();
409            size += v.len();
410        }
411        let kv = Arc::new(kv);
412        Ok(RegionMetadataCacheEntry {
413            key_values: kv,
414            size,
415        })
416    }
417
418    async fn get_all_with_prefix(
419        &self,
420        metadata_region_id: RegionId,
421        prefix: &str,
422    ) -> Result<HashMap<String, String>> {
423        let region_metadata = self
424            .cache
425            .try_get_with(metadata_region_id, self.load_all(metadata_region_id))
426            .await
427            .context(CacheGetSnafu)?;
428
429        let range = region_metadata.key_values.range(prefix.to_string()..);
430        let mut result = HashMap::new();
431        for (k, v) in range {
432            if !k.starts_with(prefix) {
433                break;
434            }
435            result.insert(k.to_string(), v.to_string());
436        }
437        Ok(result)
438    }
439
440    pub async fn get_all_key_with_prefix(
441        &self,
442        region_id: RegionId,
443        prefix: &str,
444    ) -> Result<Vec<String>> {
445        let scan_req = MetadataRegion::build_prefix_read_request(prefix, true);
446        let record_batch_stream = self
447            .mito
448            .scan_to_stream(region_id, scan_req)
449            .await
450            .context(MitoReadOperationSnafu)?;
451
452        decode_batch_stream(record_batch_stream, decode_record_batch_to_key)
453            .try_collect::<Vec<_>>()
454            .await
455    }
456
457    /// Delete the given keys. For performance consideration, this method
458    /// doesn't check if those keys exist or not.
459    async fn delete(&self, metadata_region_id: RegionId, keys: &[String]) -> Result<()> {
460        let delete_request = Self::build_delete_request(keys);
461        self.mito
462            .handle_request(
463                metadata_region_id,
464                store_api::region_request::RegionRequest::Delete(delete_request),
465            )
466            .await
467            .context(MitoWriteOperationSnafu)?;
468        // Invalidates the region metadata cache if any values are deleted from the metadata region.
469        self.cache.invalidate(&metadata_region_id).await;
470
471        Ok(())
472    }
473
474    pub(crate) fn build_put_request_from_iter(
475        kv: impl Iterator<Item = (String, String)>,
476    ) -> RegionPutRequest {
477        let cols = vec![
478            ColumnSchema {
479                column_name: METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME.to_string(),
480                datatype: ColumnDataType::TimestampMillisecond as _,
481                semantic_type: SemanticType::Timestamp as _,
482                ..Default::default()
483            },
484            ColumnSchema {
485                column_name: METADATA_SCHEMA_KEY_COLUMN_NAME.to_string(),
486                datatype: ColumnDataType::String as _,
487                semantic_type: SemanticType::Tag as _,
488                ..Default::default()
489            },
490            ColumnSchema {
491                column_name: METADATA_SCHEMA_VALUE_COLUMN_NAME.to_string(),
492                datatype: ColumnDataType::String as _,
493                semantic_type: SemanticType::Field as _,
494                ..Default::default()
495            },
496        ];
497        let rows = Rows {
498            schema: cols,
499            rows: kv
500                .into_iter()
501                .map(|(key, value)| {
502                    row(vec![
503                        ValueData::TimestampMillisecondValue(0),
504                        ValueData::StringValue(key),
505                        ValueData::StringValue(value),
506                    ])
507                })
508                .collect(),
509        };
510
511        RegionPutRequest { rows, hint: None }
512    }
513
514    fn build_delete_request(keys: &[String]) -> RegionDeleteRequest {
515        let cols = vec![
516            ColumnSchema {
517                column_name: METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME.to_string(),
518                datatype: ColumnDataType::TimestampMillisecond as _,
519                semantic_type: SemanticType::Timestamp as _,
520                ..Default::default()
521            },
522            ColumnSchema {
523                column_name: METADATA_SCHEMA_KEY_COLUMN_NAME.to_string(),
524                datatype: ColumnDataType::String as _,
525                semantic_type: SemanticType::Tag as _,
526                ..Default::default()
527            },
528        ];
529        let rows = keys
530            .iter()
531            .map(|key| {
532                row(vec![
533                    ValueData::TimestampMillisecondValue(0),
534                    ValueData::StringValue(key.to_string()),
535                ])
536            })
537            .collect();
538        let rows = Rows { schema: cols, rows };
539
540        RegionDeleteRequest { rows }
541    }
542
543    /// Add logical regions to the metadata region.
544    pub async fn add_logical_regions(
545        &self,
546        physical_region_id: RegionId,
547        write_region_id: bool,
548        logical_regions: impl Iterator<Item = (RegionId, HashMap<&str, &ColumnMetadata>)>,
549    ) -> Result<()> {
550        let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
551        let iter = logical_regions
552            .into_iter()
553            .flat_map(|(logical_region_id, column_metadatas)| {
554                if write_region_id {
555                    Some((
556                        MetadataRegion::concat_region_key(logical_region_id),
557                        String::new(),
558                    ))
559                } else {
560                    None
561                }
562                .into_iter()
563                .chain(column_metadatas.into_iter().map(
564                    move |(name, column_metadata)| {
565                        (
566                            MetadataRegion::concat_column_key(logical_region_id, name),
567                            MetadataRegion::serialize_column_metadata(column_metadata),
568                        )
569                    },
570                ))
571            })
572            .collect::<Vec<_>>();
573
574        let put_request = MetadataRegion::build_put_request_from_iter(iter.into_iter());
575        self.mito
576            .handle_request(
577                metadata_region_id,
578                store_api::region_request::RegionRequest::Put(put_request),
579            )
580            .await
581            .context(MitoWriteOperationSnafu)?;
582        // Invalidates the region metadata cache if any new values are put into the metadata region.
583        self.cache.invalidate(&metadata_region_id).await;
584
585        Ok(())
586    }
587}
588
#[cfg(test)]
impl MetadataRegion {
    /// Retrieves the value associated with the given key in the specified region.
    /// Returns `Ok(None)` if the key is not found (or its value is null).
    ///
    /// `region_id` is passed to mito as-is, so it must already be the metadata region id.
    pub async fn get(&self, region_id: RegionId, key: &str) -> Result<Option<String>> {
        let filter_expr = col(METADATA_SCHEMA_KEY_COLUMN_NAME).eq(lit(key));

        let scan_req = ScanRequest {
            projection: Some(vec![METADATA_SCHEMA_VALUE_COLUMN_INDEX]),
            filters: vec![filter_expr],
            ..Default::default()
        };
        let record_batch_stream = self
            .mito
            .scan_to_stream(region_id, scan_req)
            .await
            .context(MitoReadOperationSnafu)?;
        let scan_result = common_recordbatch::util::collect(record_batch_stream)
            .await
            .context(CollectRecordBatchStreamSnafu)?;

        // Skip empty batches: reading row 0 of an empty batch would panic.
        let Some(batch) = scan_result.iter().find(|batch| batch.num_rows() > 0) else {
            return Ok(None);
        };

        let val = batch
            .column(0)
            .get_ref(0)
            .as_string()
            .unwrap()
            .map(|s| s.to_string());

        Ok(val)
    }

    /// Check if the given column exists. Return the semantic type if exists.
    ///
    /// # Errors
    /// Returns a `DeserializeColumnMetadata` error if the stored value is not valid metadata.
    pub async fn column_semantic_type(
        &self,
        physical_region_id: RegionId,
        logical_region_id: RegionId,
        column_name: &str,
    ) -> Result<Option<SemanticType>> {
        let region_id = utils::to_metadata_region_id(physical_region_id);
        let column_key = Self::concat_column_key(logical_region_id, column_name);
        let semantic_type = self.get(region_id, &column_key).await?;
        semantic_type
            .map(|s| Self::deserialize_column_metadata(&s).map(|c| c.semantic_type))
            .transpose()
    }
}
640
#[cfg(test)]
mod test {
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;

    use super::*;
    use crate::test_util::TestEnv;
    use crate::utils::to_metadata_region_id;

    /// Encodes `my_column` for `region_id`, checks the key text, then decodes it back.
    fn check_column_key_roundtrip(region_id: RegionId, expected_key: &str) {
        let key = MetadataRegion::concat_column_key(region_id, "my_column");
        assert_eq!(key, expected_key);

        let parsed = MetadataRegion::parse_column_key(&key).unwrap();
        assert_eq!(parsed, Some((region_id, "my_column".to_string())));
    }

    #[test]
    fn test_concat_table_key() {
        let key = MetadataRegion::concat_region_key(RegionId::new(1234, 7844));
        assert_eq!(key, "__region_5299989651108");
    }

    #[test]
    fn test_concat_column_key() {
        let key = MetadataRegion::concat_column_key(RegionId::new(8489, 9184), "my_column");
        assert_eq!(key, "__column_36459977384928_bXlfY29sdW1u");
    }

    #[test]
    fn test_parse_table_key() {
        check_column_key_roundtrip(
            RegionId::new(87474, 10607),
            "__column_375697969260911_bXlfY29sdW1u",
        );
    }

    #[test]
    fn test_parse_valid_column_key() {
        check_column_key_roundtrip(RegionId::new(176, 910), "__column_755914245006_bXlfY29sdW1u");
    }

    #[test]
    fn test_parse_invalid_column_key() {
        // The region id part is not a number, so parsing must fail.
        let parsed = MetadataRegion::parse_column_key("__column_asdfasd_????");
        assert!(parsed.is_err());
    }

    #[test]
    fn test_serialize_column_metadata() {
        let column_metadata = ColumnMetadata {
            column_schema: ColumnSchema::new("blabla", ConcreteDataType::string_datatype(), false),
            semantic_type: SemanticType::Tag,
            column_id: 5,
        };
        let expected = r#"{"column_schema":{"name":"blabla","data_type":{"String":null},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":5}"#.to_string();
        assert_eq!(
            MetadataRegion::serialize_column_metadata(&column_metadata),
            expected
        );

        let not_metadata = r#""Invalid Column Metadata""#;
        assert!(MetadataRegion::deserialize_column_metadata(not_metadata).is_err());
    }

    /// Two string tag columns, `label1` and `label2`, keyed by name.
    fn test_column_metadatas() -> HashMap<String, ColumnMetadata> {
        ["label1", "label2"]
            .into_iter()
            .map(|name| {
                let metadata = ColumnMetadata {
                    column_schema: ColumnSchema::new(
                        name.to_string(),
                        ConcreteDataType::string_datatype(),
                        false,
                    ),
                    semantic_type: SemanticType::Tag,
                    column_id: 5,
                };
                (name.to_string(), metadata)
            })
            .collect()
    }

    #[tokio::test]
    async fn add_logical_regions_to_meta_region() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let metadata_region = env.metadata_region();
        let physical_region_id = to_metadata_region_id(env.default_physical_region_id());
        let column_metadatas = test_column_metadatas();
        let logical_region_id = RegionId::new(1024, 1);

        // Add the same logical region twice.
        for _ in 0..2 {
            let columns = column_metadatas
                .iter()
                .map(|(name, metadata)| (name.as_str(), metadata))
                .collect::<HashMap<_, _>>();
            metadata_region
                .add_logical_regions(
                    physical_region_id,
                    true,
                    std::iter::once((logical_region_id, columns)),
                )
                .await
                .unwrap();
        }

        // Presumably the region created by `init_metric_region` plus `logical_region_id`.
        let logical_regions = metadata_region
            .logical_regions(physical_region_id)
            .await
            .unwrap();
        assert_eq!(logical_regions.len(), 2);

        // Check if the logical region columns are added.
        let logical_columns: HashMap<_, _> = metadata_region
            .logical_columns(physical_region_id, logical_region_id)
            .await
            .unwrap()
            .into_iter()
            .collect();
        assert_eq!(logical_columns.len(), 2);
        assert_eq!(column_metadatas, logical_columns);
    }
}