metric_engine/
metadata_region.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use std::time::Duration;

use api::v1::helper::row;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnSchema, Rows, SemanticType};
use async_stream::try_stream;
use base64::Engine;
use base64::engine::general_purpose::STANDARD_NO_PAD;
use common_base::readable_size::ReadableSize;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use common_telemetry::{debug, info, warn};
use datafusion::prelude::{col, lit};
use futures_util::TryStreamExt;
use futures_util::stream::BoxStream;
use mito2::engine::MitoEngine;
use moka::future::Cache;
use moka::policy::EvictionPolicy;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::ColumnMetadata;
use store_api::metric_engine_consts::{
    METADATA_SCHEMA_KEY_COLUMN_INDEX, METADATA_SCHEMA_KEY_COLUMN_NAME,
    METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME, METADATA_SCHEMA_VALUE_COLUMN_INDEX,
    METADATA_SCHEMA_VALUE_COLUMN_NAME,
};
use store_api::region_engine::RegionEngine;
use store_api::region_request::{RegionDeleteRequest, RegionPutRequest};
use store_api::storage::{RegionId, ScanRequest};
use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock};

use crate::error::{
    CacheGetSnafu, CollectRecordBatchStreamSnafu, DecodeColumnValueSnafu,
    DeserializeColumnMetadataSnafu, LogicalRegionNotFoundSnafu, MitoReadOperationSnafu,
    MitoWriteOperationSnafu, ParseRegionIdSnafu, Result,
};
use crate::utils;

const REGION_PREFIX: &str = "__region_";
const COLUMN_PREFIX: &str = "__column_";

/// Besides the timestamp column, the other two fields, key and value, are used as a k-v storage.
/// It contains two groups of keys:
/// - `__region_<LOGICAL_REGION_ID>` is used for marking table existence. It doesn't have a value.
/// - `__column_<LOGICAL_REGION_ID>_<COLUMN_NAME>` is used for marking column existence.
///   Its value is the column's serialized metadata (including its semantic type). To avoid
///   key conflicts, the column name in this key is encoded with base64 ([STANDARD_NO_PAD]).
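///
/// For illustration only (the numeric part is the logical region id's `u64` value, and `aG9zdA`
/// is the unpadded base64 encoding of a hypothetical column named `host`):
///
/// ```text
/// __region_42          -> ""                            (region marker, no value)
/// __column_42_aG9zdA   -> <serialized ColumnMetadata>   (column entry)
/// ```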
///
/// This is a generic handler like [MetricEngine](crate::engine::MetricEngine). It
/// handles all metadata-related operations across physical tables. Thus every
/// operation should be associated with a [RegionId], which is the physical
/// table id + region sequence. This handler transforms the region group by
/// itself.
pub struct MetadataRegion {
    pub(crate) mito: MitoEngine,
    /// The cache for the contents (key-value pairs) of region metadata.
    ///
    /// The cache should be invalidated when any new values are put into the metadata region or any
    /// values are deleted from the metadata region.
    cache: Cache<RegionId, RegionMetadataCacheEntry>,
    /// Logical lock for operations that need to be serialized, like updating and reading region columns.
    ///
    /// A region entry is registered when a logical region is created or opened, and deregistered
    /// when the logical region is removed.
    logical_region_lock: RwLock<HashMap<RegionId, Arc<RwLock<()>>>>,
}

#[derive(Clone)]
struct RegionMetadataCacheEntry {
    key_values: Arc<BTreeMap<String, String>>,
    size: usize,
}

/// The max size of the region metadata cache.
const MAX_CACHE_SIZE: u64 = ReadableSize::mb(128).as_bytes();
/// The TTL of the region metadata cache.
const CACHE_TTL: Duration = Duration::from_secs(5 * 60);

impl MetadataRegion {
    pub fn new(mito: MitoEngine) -> Self {
        let cache = Cache::builder()
            .max_capacity(MAX_CACHE_SIZE)
            // Use the LRU eviction policy to minimize frequent mito scans.
            // Recently accessed items are retained longer in the cache.
            .eviction_policy(EvictionPolicy::lru())
            .time_to_live(CACHE_TTL)
            .weigher(|_, v: &RegionMetadataCacheEntry| v.size as u32)
            .build();
        Self {
            mito,
            cache,
            logical_region_lock: RwLock::new(HashMap::new()),
        }
    }

    /// Open a logical region.
    ///
    /// Returns true if the logical region is opened for the first time.
    pub async fn open_logical_region(&self, logical_region_id: RegionId) -> bool {
        match self
            .logical_region_lock
            .write()
            .await
            .entry(logical_region_id)
        {
            Entry::Occupied(_) => false,
            Entry::Vacant(vacant_entry) => {
                vacant_entry.insert(Arc::new(RwLock::new(())));
                true
            }
        }
    }

    /// Retrieve a read lock guard for the given logical region id.
    pub async fn read_lock_logical_region(
        &self,
        logical_region_id: RegionId,
    ) -> Result<OwnedRwLockReadGuard<()>> {
        let lock = self
            .logical_region_lock
            .read()
            .await
            .get(&logical_region_id)
            .context(LogicalRegionNotFoundSnafu {
                region_id: logical_region_id,
            })?
            .clone();
        Ok(RwLock::read_owned(lock).await)
    }

    /// Retrieve a write lock guard for the given logical region id.
    pub async fn write_lock_logical_region(
        &self,
        logical_region_id: RegionId,
    ) -> Result<OwnedRwLockWriteGuard<()>> {
        let lock = self
            .logical_region_lock
            .read()
            .await
            .get(&logical_region_id)
            .context(LogicalRegionNotFoundSnafu {
                region_id: logical_region_id,
            })?
            .clone();
        Ok(RwLock::write_owned(lock).await)
    }

    /// Remove a registered logical region from metadata.
    ///
    /// This method doesn't check whether the keys exist beforehand.
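    ///
    /// Concretely, this deletes the `__region_<LOGICAL_REGION_ID>` marker together with every
    /// `__column_<LOGICAL_REGION_ID>_*` entry of the logical region, and drops its entry from
    /// the logical region lock map.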
    pub async fn remove_logical_region(
        &self,
        physical_region_id: RegionId,
        logical_region_id: RegionId,
    ) -> Result<()> {
        // concat region key
        let region_id = utils::to_metadata_region_id(physical_region_id);
        let region_key = Self::concat_region_key(logical_region_id);

        // concat column keys
        let logical_columns = self
            .logical_columns(physical_region_id, logical_region_id)
            .await?;
        let mut column_keys = logical_columns
            .into_iter()
            .map(|(col, _)| Self::concat_column_key(logical_region_id, &col))
            .collect::<Vec<_>>();

        // remove region key and column keys
        column_keys.push(region_key);
        self.delete(region_id, &column_keys).await?;

        self.logical_region_lock
            .write()
            .await
            .remove(&logical_region_id);

        Ok(())
    }

    // TODO(ruihang): avoid using `get_all`
    /// Get all the columns of a given logical region.
    /// Return a list of (column_name, column_metadata).
    pub async fn logical_columns(
        &self,
        physical_region_id: RegionId,
        logical_region_id: RegionId,
    ) -> Result<Vec<(String, ColumnMetadata)>> {
        let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
        let region_column_prefix = Self::concat_column_key_prefix(logical_region_id);

        let mut columns = vec![];
        for (k, v) in self
            .get_all_with_prefix(metadata_region_id, &region_column_prefix)
            .await?
        {
            if !k.starts_with(&region_column_prefix) {
                continue;
            }
            // Safety: we have checked the prefix
            let (_, column_name) = Self::parse_column_key(&k)?.unwrap();
            let column_metadata = Self::deserialize_column_metadata(&v)?;
            columns.push((column_name, column_metadata));
        }

        Ok(columns)
    }

    /// Return all logical regions associated with the physical region.
    pub async fn logical_regions(&self, physical_region_id: RegionId) -> Result<Vec<RegionId>> {
        let metadata_region_id = utils::to_metadata_region_id(physical_region_id);

        let mut regions = vec![];
        for k in self
            .get_all_key_with_prefix(metadata_region_id, REGION_PREFIX)
            .await?
        {
            if !k.starts_with(REGION_PREFIX) {
                continue;
            }
            // Safety: we have checked the prefix
            let region_id = Self::parse_region_key(&k).unwrap();
            let region_id = region_id.parse::<u64>().unwrap().into();
            regions.push(region_id);
        }

        Ok(regions)
    }
}

// utils to concat and parse key/value
impl MetadataRegion {
    pub fn concat_region_key(region_id: RegionId) -> String {
        format!("{REGION_PREFIX}{}", region_id.as_u64())
    }

    /// The column name is encoded with base64 ([STANDARD_NO_PAD]).
    pub fn concat_column_key(region_id: RegionId, column_name: &str) -> String {
        let encoded_column_name = STANDARD_NO_PAD.encode(column_name);
        format!(
            "{COLUMN_PREFIX}{}_{}",
            region_id.as_u64(),
            encoded_column_name
        )
    }

    /// Concat a column key prefix without the column name.
    pub fn concat_column_key_prefix(region_id: RegionId) -> String {
        format!("{COLUMN_PREFIX}{}_", region_id.as_u64())
    }

    pub fn parse_region_key(key: &str) -> Option<&str> {
        key.strip_prefix(REGION_PREFIX)
    }

    /// Parse a column key into `(logical_region_id, column_name)`.
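    ///
    /// This round-trips with [`Self::concat_column_key`]. For illustration (the id part is
    /// hypothetical), a key like `__column_<id>_bXlfY29sdW1u` parses back to
    /// `(<id>, "my_column")`, since `bXlfY29sdW1u` is the unpadded base64 encoding of
    /// `my_column`.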
    pub fn parse_column_key(key: &str) -> Result<Option<(RegionId, String)>> {
        if let Some(stripped) = key.strip_prefix(COLUMN_PREFIX) {
            let mut iter = stripped.split('_');

            let region_id_raw = iter.next().unwrap();
            let region_id = region_id_raw
                .parse::<u64>()
                .with_context(|_| ParseRegionIdSnafu { raw: region_id_raw })?
                .into();

            let encoded_column_name = iter.next().unwrap();
            let column_name = STANDARD_NO_PAD
                .decode(encoded_column_name)
                .context(DecodeColumnValueSnafu)?;

            Ok(Some((region_id, String::from_utf8(column_name).unwrap())))
        } else {
            Ok(None)
        }
    }

    pub fn serialize_column_metadata(column_metadata: &ColumnMetadata) -> String {
        serde_json::to_string(column_metadata).unwrap()
    }

    pub fn deserialize_column_metadata(column_metadata: &str) -> Result<ColumnMetadata> {
        serde_json::from_str(column_metadata).with_context(|_| DeserializeColumnMetadataSnafu {
            raw: column_metadata,
        })
    }
}

/// Decode a record batch stream to a stream of items.
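///
/// This is a small adapter used below with [decode_record_batch_to_key_and_value] and
/// [decode_record_batch_to_key] to turn scanned batches into key-value pairs or plain keys.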
pub fn decode_batch_stream<T: Send + 'static>(
    mut record_batch_stream: SendableRecordBatchStream,
    decode: fn(RecordBatch) -> Vec<T>,
) -> BoxStream<'static, Result<T>> {
    let stream = try_stream! {
        while let Some(batch) = record_batch_stream.try_next().await.context(CollectRecordBatchStreamSnafu)? {
            for item in decode(batch) {
                yield item;
            }
        }
    };
    Box::pin(stream)
}

/// Decode a record batch to a list of key-value pairs.
fn decode_record_batch_to_key_and_value(batch: RecordBatch) -> Vec<(String, String)> {
    let keys = batch.iter_column_as_string(0);
    let values = batch.iter_column_as_string(1);
    keys.zip(values)
        .filter_map(|(k, v)| match (k, v) {
            (Some(k), Some(v)) => Some((k, v)),
            (Some(k), None) => Some((k, "".to_string())),
            (None, _) => None,
        })
        .collect::<Vec<_>>()
}

/// Decode a record batch to a list of keys.
fn decode_record_batch_to_key(batch: RecordBatch) -> Vec<String> {
    batch.iter_column_as_string(0).flatten().collect::<Vec<_>>()
}

// Simulates a `KvBackend`.
//
// Methods in this block assume the given region id has already been transformed.
impl MetadataRegion {
    fn build_prefix_read_request(prefix: &str, key_only: bool) -> ScanRequest {
        let filter_expr = col(METADATA_SCHEMA_KEY_COLUMN_NAME).like(lit(prefix));

        let projection = if key_only {
            vec![METADATA_SCHEMA_KEY_COLUMN_INDEX]
        } else {
            vec![
                METADATA_SCHEMA_KEY_COLUMN_INDEX,
                METADATA_SCHEMA_VALUE_COLUMN_INDEX,
            ]
        };
        ScanRequest {
            projection: Some(projection),
            filters: vec![filter_expr],
            ..Default::default()
        }
    }

    fn build_read_request() -> ScanRequest {
        let projection = vec![
            METADATA_SCHEMA_KEY_COLUMN_INDEX,
            METADATA_SCHEMA_VALUE_COLUMN_INDEX,
        ];
        ScanRequest {
            projection: Some(projection),
            ..Default::default()
        }
    }

    async fn load_all(&self, metadata_region_id: RegionId) -> Result<RegionMetadataCacheEntry> {
        let scan_req = MetadataRegion::build_read_request();
        let record_batch_stream = self
            .mito
            .scan_to_stream(metadata_region_id, scan_req)
            .await
            .context(MitoReadOperationSnafu)?;

        let kv = decode_batch_stream(record_batch_stream, decode_record_batch_to_key_and_value)
            .try_collect::<BTreeMap<_, _>>()
            .await?;
        let mut size = 0;
        for (k, v) in kv.iter() {
            size += k.len();
            size += v.len();
        }
        let kv = Arc::new(kv);
        Ok(RegionMetadataCacheEntry {
            key_values: kv,
            size,
        })
    }

    async fn get_all_with_prefix(
        &self,
        metadata_region_id: RegionId,
        prefix: &str,
    ) -> Result<HashMap<String, String>> {
        let region_metadata = self
            .cache
            .try_get_with(metadata_region_id, self.load_all(metadata_region_id))
            .await
            .context(CacheGetSnafu)?;

        let mut result = HashMap::new();
        get_all_with_prefix(&region_metadata, prefix, |k, v| {
            result.insert(k.to_string(), v.to_string());
            Ok(())
        })?;
        Ok(result)
    }

    pub async fn get_all_key_with_prefix(
        &self,
        region_id: RegionId,
        prefix: &str,
    ) -> Result<Vec<String>> {
        let scan_req = MetadataRegion::build_prefix_read_request(prefix, true);
        let record_batch_stream = self
            .mito
            .scan_to_stream(region_id, scan_req)
            .await
            .context(MitoReadOperationSnafu)?;

        decode_batch_stream(record_batch_stream, decode_record_batch_to_key)
            .try_collect::<Vec<_>>()
            .await
    }

    /// Delete the given keys. For performance reasons, this method
    /// doesn't check whether those keys exist.
    async fn delete(&self, metadata_region_id: RegionId, keys: &[String]) -> Result<()> {
        let delete_request = Self::build_delete_request(keys);
        self.mito
            .handle_request(
                metadata_region_id,
                store_api::region_request::RegionRequest::Delete(delete_request),
            )
            .await
            .context(MitoWriteOperationSnafu)?;
        // Invalidates the region metadata cache if any values are deleted from the metadata region.
        self.cache.invalidate(&metadata_region_id).await;

        Ok(())
    }

    pub(crate) fn build_put_request_from_iter(
        kv: impl Iterator<Item = (String, String)>,
    ) -> RegionPutRequest {
        let cols = vec![
            ColumnSchema {
                column_name: METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::TimestampMillisecond as _,
                semantic_type: SemanticType::Timestamp as _,
                ..Default::default()
            },
            ColumnSchema {
                column_name: METADATA_SCHEMA_KEY_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::String as _,
                semantic_type: SemanticType::Tag as _,
                ..Default::default()
            },
            ColumnSchema {
                column_name: METADATA_SCHEMA_VALUE_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::String as _,
                semantic_type: SemanticType::Field as _,
                ..Default::default()
            },
        ];
        let rows = Rows {
            schema: cols,
            rows: kv
                .into_iter()
                .map(|(key, value)| {
                    row(vec![
                        ValueData::TimestampMillisecondValue(0),
                        ValueData::StringValue(key),
                        ValueData::StringValue(value),
                    ])
                })
                .collect(),
        };

        RegionPutRequest { rows, hint: None }
    }

    fn build_delete_request(keys: &[String]) -> RegionDeleteRequest {
        let cols = vec![
            ColumnSchema {
                column_name: METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::TimestampMillisecond as _,
                semantic_type: SemanticType::Timestamp as _,
                ..Default::default()
            },
            ColumnSchema {
                column_name: METADATA_SCHEMA_KEY_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::String as _,
                semantic_type: SemanticType::Tag as _,
                ..Default::default()
            },
        ];
        let rows = keys
            .iter()
            .map(|key| {
                row(vec![
                    ValueData::TimestampMillisecondValue(0),
                    ValueData::StringValue(key.clone()),
                ])
            })
            .collect();
        let rows = Rows { schema: cols, rows };

        RegionDeleteRequest { rows, hint: None }
    }

    /// Add logical regions to the metadata region.
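    ///
    /// A minimal usage sketch (the ids and the column metadata map are illustrative):
    ///
    /// ```ignore
    /// let columns: HashMap<&str, &ColumnMetadata> = /* built by the caller */;
    /// metadata_region
    ///     .add_logical_regions(physical_region_id, true, [(logical_region_id, columns)].into_iter())
    ///     .await?;
    /// ```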
    pub async fn add_logical_regions(
        &self,
        physical_region_id: RegionId,
        write_region_id: bool,
        logical_regions: impl Iterator<Item = (RegionId, HashMap<&str, &ColumnMetadata>)>,
    ) -> Result<()> {
        let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
        let iter = logical_regions
            .into_iter()
            .flat_map(|(logical_region_id, column_metadatas)| {
                if write_region_id {
                    Some((
                        MetadataRegion::concat_region_key(logical_region_id),
                        String::new(),
                    ))
                } else {
                    None
                }
                .into_iter()
                .chain(column_metadatas.into_iter().map(
                    move |(name, column_metadata)| {
                        (
                            MetadataRegion::concat_column_key(logical_region_id, name),
                            MetadataRegion::serialize_column_metadata(column_metadata),
                        )
                    },
                ))
            })
            .collect::<Vec<_>>();

        let put_request = MetadataRegion::build_put_request_from_iter(iter.into_iter());
        self.mito
            .handle_request(
                metadata_region_id,
                store_api::region_request::RegionRequest::Put(put_request),
            )
            .await
            .context(MitoWriteOperationSnafu)?;
        // Invalidates the region metadata cache if any new values are put into the metadata region.
        self.cache.invalidate(&metadata_region_id).await;

        Ok(())
    }

    /// Updates logical region metadata so that any entries previously referencing
    /// `source_region_id` are modified to reference the data region of `physical_region_id`.
    ///
    /// This method should be called after copying files from `source_region_id`
    /// into the target region. It scans the metadata for the target physical
    /// region, finds logical regions with the same region number as the source,
    /// and reinserts region and column entries updated to use the target's
    /// region number.
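    ///
    /// For illustration only (ids are hypothetical): if the source region number is `1` and the
    /// target data region number is `2`, a column entry keyed `__column_<table:1>_<col>` is
    /// reinserted as `__column_<table:2>_<col>`, and `__region_<table:1>` as `__region_<table:2>`,
    /// where `<table:N>` stands for the `u64` value of `RegionId::new(table_id, N)`.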
    pub async fn transform_logical_region_metadata(
        &self,
        physical_region_id: RegionId,
        source_region_id: RegionId,
    ) -> Result<()> {
        let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
        let data_region_id = utils::to_data_region_id(physical_region_id);
        let logical_regions = self
            .logical_regions(data_region_id)
            .await?
            .into_iter()
            .filter(|r| r.region_number() == source_region_id.region_number())
            .collect::<Vec<_>>();
        if logical_regions.is_empty() {
            info!(
                "No logical regions found from source region {}, physical region id: {}",
                source_region_id, physical_region_id,
            );
            return Ok(());
        }

        let metadata = self.load_all(metadata_region_id).await?;
        let mut output = Vec::new();
        for logical_region_id in &logical_regions {
            let prefix = MetadataRegion::concat_column_key_prefix(*logical_region_id);
            get_all_with_prefix(&metadata, &prefix, |k, v| {
                // Safety: we have checked the prefix
                let (src_logical_region_id, column_name) = Self::parse_column_key(k)?.unwrap();
                // Change the region number to the data region number.
                let new_key = MetadataRegion::concat_column_key(
                    RegionId::new(
                        src_logical_region_id.table_id(),
                        data_region_id.region_number(),
                    ),
                    &column_name,
                );
                output.push((new_key, v.to_string()));
                Ok(())
            })?;

            let new_key = MetadataRegion::concat_region_key(RegionId::new(
                logical_region_id.table_id(),
                data_region_id.region_number(),
            ));
            output.push((new_key, String::new()));
        }

        if output.is_empty() {
            warn!(
                "No logical regions metadata found from source region {}, physical region id: {}",
                source_region_id, physical_region_id
            );
            return Ok(());
        }

        debug!(
            "Transform logical regions metadata to physical region {}, source region: {}, transformed metadata: {}",
            data_region_id,
            source_region_id,
            output.len(),
        );

        let put_request = MetadataRegion::build_put_request_from_iter(output.into_iter());
        self.mito
            .handle_request(
                metadata_region_id,
                store_api::region_request::RegionRequest::Put(put_request),
            )
            .await
            .context(MitoWriteOperationSnafu)?;
        info!(
            "Transformed {} logical regions metadata to physical region {}, source region: {}",
            logical_regions.len(),
            data_region_id,
            source_region_id
        );
        self.cache.invalidate(&metadata_region_id).await;
        Ok(())
    }
}

fn get_all_with_prefix(
    region_metadata: &RegionMetadataCacheEntry,
    prefix: &str,
    mut callback: impl FnMut(&str, &str) -> Result<()>,
) -> Result<()> {
    let range = region_metadata.key_values.range(prefix.to_string()..);
    for (k, v) in range {
        if !k.starts_with(prefix) {
            break;
        }
        callback(k, v)?;
    }
    Ok(())
}

#[cfg(test)]
impl MetadataRegion {
    /// Retrieves the value associated with the given key in the specified region.
    /// Returns `Ok(None)` if the key is not found.
    pub async fn get(&self, region_id: RegionId, key: &str) -> Result<Option<String>> {
        use datatypes::arrow::array::{Array, AsArray};

        let filter_expr = datafusion::prelude::col(METADATA_SCHEMA_KEY_COLUMN_NAME)
            .eq(datafusion::prelude::lit(key));

        let scan_req = ScanRequest {
            projection: Some(vec![METADATA_SCHEMA_VALUE_COLUMN_INDEX]),
            filters: vec![filter_expr],
            ..Default::default()
        };
        let record_batch_stream = self
            .mito
            .scan_to_stream(region_id, scan_req)
            .await
            .context(MitoReadOperationSnafu)?;
        let scan_result = common_recordbatch::util::collect(record_batch_stream)
            .await
            .context(CollectRecordBatchStreamSnafu)?;

        let Some(first_batch) = scan_result.first() else {
            return Ok(None);
        };

        let column = first_batch.column(0);
        let column = column.as_string::<i32>();
        let val = column.is_valid(0).then(|| column.value(0).to_string());

        Ok(val)
    }

    /// Check if the given column exists. Returns the semantic type if it exists.
    pub async fn column_semantic_type(
        &self,
        physical_region_id: RegionId,
        logical_region_id: RegionId,
        column_name: &str,
    ) -> Result<Option<SemanticType>> {
        let region_id = utils::to_metadata_region_id(physical_region_id);
        let column_key = Self::concat_column_key(logical_region_id, column_name);
        let semantic_type = self.get(region_id, &column_key).await?;
        semantic_type
            .map(|s| Self::deserialize_column_metadata(&s).map(|c| c.semantic_type))
            .transpose()
    }
}

#[cfg(test)]
mod test {
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;

    use super::*;
    use crate::test_util::TestEnv;
    use crate::utils::to_metadata_region_id;

    #[test]
    fn test_concat_table_key() {
        let region_id = RegionId::new(1234, 7844);
        let expected = "__region_5299989651108".to_string();
        assert_eq!(MetadataRegion::concat_region_key(region_id), expected);
    }

    #[test]
    fn test_concat_column_key() {
        let region_id = RegionId::new(8489, 9184);
        let column_name = "my_column";
        let expected = "__column_36459977384928_bXlfY29sdW1u".to_string();
        assert_eq!(
            MetadataRegion::concat_column_key(region_id, column_name),
            expected
        );
    }

    #[test]
    fn test_parse_table_key() {
        let region_id = RegionId::new(87474, 10607);
        let encoded = MetadataRegion::concat_column_key(region_id, "my_column");
        assert_eq!(encoded, "__column_375697969260911_bXlfY29sdW1u");

        let decoded = MetadataRegion::parse_column_key(&encoded).unwrap();
        assert_eq!(decoded, Some((region_id, "my_column".to_string())));
    }

    #[test]
    fn test_parse_valid_column_key() {
        let region_id = RegionId::new(176, 910);
        let encoded = MetadataRegion::concat_column_key(region_id, "my_column");
        assert_eq!(encoded, "__column_755914245006_bXlfY29sdW1u");

        let decoded = MetadataRegion::parse_column_key(&encoded).unwrap();
        assert_eq!(decoded, Some((region_id, "my_column".to_string())));
    }

    #[test]
    fn test_parse_invalid_column_key() {
        let key = "__column_asdfasd_????";
        let result = MetadataRegion::parse_column_key(key);
        assert!(result.is_err());
    }

    #[test]
    fn test_serialize_column_metadata() {
        let semantic_type = SemanticType::Tag;
        let column_metadata = ColumnMetadata {
            column_schema: ColumnSchema::new("blabla", ConcreteDataType::string_datatype(), false),
            semantic_type,
            column_id: 5,
        };
        let old_fmt = "{\"column_schema\":{\"name\":\"blabla\",\"data_type\":{\"String\":null},\"is_nullable\":false,\"is_time_index\":false,\"default_constraint\":null,\"metadata\":{}},\"semantic_type\":\"Tag\",\"column_id\":5}".to_string();
        let new_fmt = "{\"column_schema\":{\"name\":\"blabla\",\"data_type\":{\"String\":{\"size_type\":\"Utf8\"}},\"is_nullable\":false,\"is_time_index\":false,\"default_constraint\":null,\"metadata\":{}},\"semantic_type\":\"Tag\",\"column_id\":5}".to_string();
        assert_eq!(
            MetadataRegion::serialize_column_metadata(&column_metadata),
            new_fmt
        );
        // Ensure both old and new formats can be deserialized.
        assert_eq!(
            MetadataRegion::deserialize_column_metadata(&old_fmt).unwrap(),
            column_metadata
        );
        assert_eq!(
            MetadataRegion::deserialize_column_metadata(&new_fmt).unwrap(),
            column_metadata
        );

        let semantic_type = "\"Invalid Column Metadata\"";
        assert!(MetadataRegion::deserialize_column_metadata(semantic_type).is_err());
    }

    fn test_column_metadatas() -> HashMap<String, ColumnMetadata> {
        HashMap::from([
            (
                "label1".to_string(),
                ColumnMetadata {
                    column_schema: ColumnSchema::new(
                        "label1".to_string(),
                        ConcreteDataType::string_datatype(),
                        false,
                    ),
                    semantic_type: SemanticType::Tag,
                    column_id: 5,
                },
            ),
            (
                "label2".to_string(),
                ColumnMetadata {
                    column_schema: ColumnSchema::new(
                        "label2".to_string(),
                        ConcreteDataType::string_datatype(),
                        false,
                    ),
                    semantic_type: SemanticType::Tag,
                    column_id: 5,
                },
            ),
        ])
    }

    #[tokio::test]
    async fn add_logical_regions_to_meta_region() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let metadata_region = env.metadata_region();
        let physical_region_id = to_metadata_region_id(env.default_physical_region_id());
        let column_metadatas = test_column_metadatas();
        let logical_region_id = RegionId::new(1024, 1);

        let iter = vec![(
            logical_region_id,
            column_metadatas
                .iter()
                .map(|(k, v)| (k.as_str(), v))
                .collect::<HashMap<_, _>>(),
        )];
        metadata_region
            .add_logical_regions(physical_region_id, true, iter.into_iter())
            .await
            .unwrap();
        // Add logical region again.
        let iter = vec![(
            logical_region_id,
            column_metadatas
                .iter()
                .map(|(k, v)| (k.as_str(), v))
                .collect::<HashMap<_, _>>(),
        )];
        metadata_region
            .add_logical_regions(physical_region_id, true, iter.into_iter())
            .await
            .unwrap();

        // Check if the logical region is added.
        let logical_regions = metadata_region
            .logical_regions(physical_region_id)
            .await
            .unwrap();
        assert_eq!(logical_regions.len(), 2);

        // Check if the logical region columns are added.
        let logical_columns = metadata_region
            .logical_columns(physical_region_id, logical_region_id)
            .await
            .unwrap()
            .into_iter()
            .collect::<HashMap<_, _>>();
        assert_eq!(logical_columns.len(), 2);
        assert_eq!(column_metadatas, logical_columns);
    }
}