// metric_engine/metadata_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::{BTreeMap, HashMap};
17use std::sync::Arc;
18use std::time::Duration;
19
20use api::v1::helper::row;
21use api::v1::value::ValueData;
22use api::v1::{ColumnDataType, ColumnSchema, Rows, SemanticType};
23use async_stream::try_stream;
24use base64::Engine;
25use base64::engine::general_purpose::STANDARD_NO_PAD;
26use common_base::readable_size::ReadableSize;
27use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
28use common_telemetry::{debug, info, warn};
29use datafusion::prelude::{col, lit};
30use futures_util::TryStreamExt;
31use futures_util::stream::BoxStream;
32use mito2::engine::MitoEngine;
33use moka::future::Cache;
34use moka::policy::EvictionPolicy;
35use snafu::{OptionExt, ResultExt};
36use store_api::metadata::ColumnMetadata;
37use store_api::metric_engine_consts::{
38    METADATA_SCHEMA_KEY_COLUMN_INDEX, METADATA_SCHEMA_KEY_COLUMN_NAME,
39    METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME, METADATA_SCHEMA_VALUE_COLUMN_INDEX,
40    METADATA_SCHEMA_VALUE_COLUMN_NAME,
41};
42use store_api::region_engine::RegionEngine;
43use store_api::region_request::{RegionDeleteRequest, RegionPutRequest};
44use store_api::storage::{RegionId, ScanRequest};
45use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock};
46
47use crate::error::{
48    CacheGetSnafu, CollectRecordBatchStreamSnafu, DecodeColumnValueSnafu,
49    DeserializeColumnMetadataSnafu, LogicalRegionNotFoundSnafu, MitoReadOperationSnafu,
50    MitoWriteOperationSnafu, ParseRegionIdSnafu, Result,
51};
52use crate::utils;
53
/// Key prefix for logical-region existence markers: `__region_<LOGICAL_REGION_ID>`.
const REGION_PREFIX: &str = "__region_";
/// Key prefix for logical-column entries:
/// `__column_<LOGICAL_REGION_ID>_<BASE64_COLUMN_NAME>`.
const COLUMN_PREFIX: &str = "__column_";
56
/// The other two fields key and value will be used as a k-v storage.
/// It contains two group of key:
/// - `__region_<LOGICAL_REGION_ID>` is used for marking table existence. It doesn't have value.
/// - `__column_<LOGICAL_REGION_ID>_<COLUMN_NAME>` is used for marking column existence,
///   the value is column's semantic type. To avoid the key conflict, this column key
///   will be encoded by base64([STANDARD_NO_PAD]).
///
/// This is a generic handler like [MetricEngine](crate::engine::MetricEngine). It
/// will handle all the metadata related operations across physical tables. Thus
/// every operation should be associated to a [RegionId], which is the physical
/// table id + region sequence. This handler will transform the region group by
/// itself.
pub struct MetadataRegion {
    /// The underlying mito engine that physically stores the metadata regions.
    pub(crate) mito: MitoEngine,
    /// The cache for contents(key-value pairs) of region metadata.
    ///
    /// The cache should be invalidated when any new values are put into the metadata region or any
    /// values are deleted from the metadata region.
    cache: Cache<RegionId, RegionMetadataCacheEntry>,
    /// Logical lock for operations that need to be serialized. Like update & read region columns.
    ///
    /// Region entry will be registered on creating and opening logical region, and deregistered on
    /// removing logical region.
    logical_region_lock: RwLock<HashMap<RegionId, Arc<RwLock<()>>>>,
}
82
/// A snapshot of all key-value pairs of one metadata region, together with the
/// byte size used as its weight in the cache.
#[derive(Clone)]
struct RegionMetadataCacheEntry {
    /// All key-value pairs, sorted by key so prefix lookups can use range scans.
    key_values: Arc<BTreeMap<String, String>>,
    /// Sum of the byte lengths of all keys and values; used by the cache weigher.
    size: usize,
}
88
/// The max size of the region metadata cache.
const MAX_CACHE_SIZE: u64 = ReadableSize::mb(128).as_bytes();
/// The TTL of the region metadata cache.
const CACHE_TTL: Duration = Duration::from_secs(5 * 60);
93
94impl MetadataRegion {
95    pub fn new(mito: MitoEngine) -> Self {
96        let cache = Cache::builder()
97            .max_capacity(MAX_CACHE_SIZE)
98            // Use the LRU eviction policy to minimize frequent mito scans.
99            // Recently accessed items are retained longer in the cache.
100            .eviction_policy(EvictionPolicy::lru())
101            .time_to_live(CACHE_TTL)
102            .weigher(|_, v: &RegionMetadataCacheEntry| v.size as u32)
103            .build();
104        Self {
105            mito,
106            cache,
107            logical_region_lock: RwLock::new(HashMap::new()),
108        }
109    }
110
111    /// Open a logical region.
112    ///
113    /// Returns true if the logical region is opened for the first time.
114    pub async fn open_logical_region(&self, logical_region_id: RegionId) -> bool {
115        match self
116            .logical_region_lock
117            .write()
118            .await
119            .entry(logical_region_id)
120        {
121            Entry::Occupied(_) => false,
122            Entry::Vacant(vacant_entry) => {
123                vacant_entry.insert(Arc::new(RwLock::new(())));
124                true
125            }
126        }
127    }
128
129    /// Retrieve a read lock guard of given logical region id.
130    pub async fn read_lock_logical_region(
131        &self,
132        logical_region_id: RegionId,
133    ) -> Result<OwnedRwLockReadGuard<()>> {
134        let lock = self
135            .logical_region_lock
136            .read()
137            .await
138            .get(&logical_region_id)
139            .context(LogicalRegionNotFoundSnafu {
140                region_id: logical_region_id,
141            })?
142            .clone();
143        Ok(RwLock::read_owned(lock).await)
144    }
145
146    /// Retrieve a write lock guard of given logical region id.
147    pub async fn write_lock_logical_region(
148        &self,
149        logical_region_id: RegionId,
150    ) -> Result<OwnedRwLockWriteGuard<()>> {
151        let lock = self
152            .logical_region_lock
153            .read()
154            .await
155            .get(&logical_region_id)
156            .context(LogicalRegionNotFoundSnafu {
157                region_id: logical_region_id,
158            })?
159            .clone();
160        Ok(RwLock::write_owned(lock).await)
161    }
162
163    /// Remove a registered logical region from metadata.
164    ///
165    /// This method doesn't check if the previous key exists.
166    pub async fn remove_logical_region(
167        &self,
168        physical_region_id: RegionId,
169        logical_region_id: RegionId,
170    ) -> Result<()> {
171        // concat region key
172        let region_id = utils::to_metadata_region_id(physical_region_id);
173        let region_key = Self::concat_region_key(logical_region_id);
174
175        // concat column keys
176        let logical_columns = self
177            .logical_columns(physical_region_id, logical_region_id)
178            .await?;
179        let mut column_keys = logical_columns
180            .into_iter()
181            .map(|(col, _)| Self::concat_column_key(logical_region_id, &col))
182            .collect::<Vec<_>>();
183
184        // remove region key and column keys
185        column_keys.push(region_key);
186        self.delete(region_id, &column_keys).await?;
187
188        self.logical_region_lock
189            .write()
190            .await
191            .remove(&logical_region_id);
192
193        Ok(())
194    }
195
196    // TODO(ruihang): avoid using `get_all`
197    /// Get all the columns of a given logical region.
198    /// Return a list of (column_name, column_metadata).
199    pub async fn logical_columns(
200        &self,
201        physical_region_id: RegionId,
202        logical_region_id: RegionId,
203    ) -> Result<Vec<(String, ColumnMetadata)>> {
204        let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
205        let region_column_prefix = Self::concat_column_key_prefix(logical_region_id);
206
207        let mut columns = vec![];
208        for (k, v) in self
209            .get_all_with_prefix(metadata_region_id, &region_column_prefix)
210            .await?
211        {
212            if !k.starts_with(&region_column_prefix) {
213                continue;
214            }
215            // Safety: we have checked the prefix
216            let (_, column_name) = Self::parse_column_key(&k)?.unwrap();
217            let column_metadata = Self::deserialize_column_metadata(&v)?;
218            columns.push((column_name, column_metadata));
219        }
220
221        Ok(columns)
222    }
223
224    /// Return all logical regions associated with the physical region.
225    pub async fn logical_regions(&self, physical_region_id: RegionId) -> Result<Vec<RegionId>> {
226        let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
227
228        let mut regions = vec![];
229        for k in self
230            .get_all_key_with_prefix(metadata_region_id, REGION_PREFIX)
231            .await?
232        {
233            if !k.starts_with(REGION_PREFIX) {
234                continue;
235            }
236            // Safety: we have checked the prefix
237            let region_id = Self::parse_region_key(&k).unwrap();
238            let region_id = region_id.parse::<u64>().unwrap().into();
239            regions.push(region_id);
240        }
241
242        Ok(regions)
243    }
244}
245
246// utils to concat and parse key/value
247impl MetadataRegion {
248    pub fn concat_region_key(region_id: RegionId) -> String {
249        format!("{REGION_PREFIX}{}", region_id.as_u64())
250    }
251
252    /// Column name will be encoded by base64([STANDARD_NO_PAD])
253    pub fn concat_column_key(region_id: RegionId, column_name: &str) -> String {
254        let encoded_column_name = STANDARD_NO_PAD.encode(column_name);
255        format!(
256            "{COLUMN_PREFIX}{}_{}",
257            region_id.as_u64(),
258            encoded_column_name
259        )
260    }
261
262    /// Concat a column key prefix without column name
263    pub fn concat_column_key_prefix(region_id: RegionId) -> String {
264        format!("{COLUMN_PREFIX}{}_", region_id.as_u64())
265    }
266
267    pub fn parse_region_key(key: &str) -> Option<&str> {
268        key.strip_prefix(REGION_PREFIX)
269    }
270
271    /// Parse column key to (logical_region_id, column_name)
272    pub fn parse_column_key(key: &str) -> Result<Option<(RegionId, String)>> {
273        if let Some(stripped) = key.strip_prefix(COLUMN_PREFIX) {
274            let mut iter = stripped.split('_');
275
276            let region_id_raw = iter.next().unwrap();
277            let region_id = region_id_raw
278                .parse::<u64>()
279                .with_context(|_| ParseRegionIdSnafu { raw: region_id_raw })?
280                .into();
281
282            let encoded_column_name = iter.next().unwrap();
283            let column_name = STANDARD_NO_PAD
284                .decode(encoded_column_name)
285                .context(DecodeColumnValueSnafu)?;
286
287            Ok(Some((region_id, String::from_utf8(column_name).unwrap())))
288        } else {
289            Ok(None)
290        }
291    }
292
293    pub fn serialize_column_metadata(column_metadata: &ColumnMetadata) -> String {
294        serde_json::to_string(column_metadata).unwrap()
295    }
296
297    pub fn deserialize_column_metadata(column_metadata: &str) -> Result<ColumnMetadata> {
298        serde_json::from_str(column_metadata).with_context(|_| DeserializeColumnMetadataSnafu {
299            raw: column_metadata,
300        })
301    }
302}
303
/// Decode a record batch stream to a stream of items.
///
/// Each incoming batch is mapped to zero or more items by `decode`; errors from
/// the underlying stream are wrapped into [CollectRecordBatchStreamSnafu].
pub fn decode_batch_stream<T: Send + 'static>(
    mut record_batch_stream: SendableRecordBatchStream,
    decode: fn(RecordBatch) -> Vec<T>,
) -> BoxStream<'static, Result<T>> {
    let stream = try_stream! {
        while let Some(batch) = record_batch_stream.try_next().await.context(CollectRecordBatchStreamSnafu)? {
            for item in decode(batch) {
                yield item;
            }
        }
    };
    Box::pin(stream)
}
318
319/// Decode a record batch to a list of key and value.
320fn decode_record_batch_to_key_and_value(batch: RecordBatch) -> Vec<(String, String)> {
321    let keys = batch.iter_column_as_string(0);
322    let values = batch.iter_column_as_string(1);
323    keys.zip(values)
324        .filter_map(|(k, v)| match (k, v) {
325            (Some(k), Some(v)) => Some((k, v)),
326            (Some(k), None) => Some((k, "".to_string())),
327            (None, _) => None,
328        })
329        .collect::<Vec<_>>()
330}
331
332/// Decode a record batch to a list of key.
333fn decode_record_batch_to_key(batch: RecordBatch) -> Vec<String> {
334    batch.iter_column_as_string(0).flatten().collect::<Vec<_>>()
335}
336
// simulate to `KvBackend`
//
// methods in this block assume the given region id is transformed.
impl MetadataRegion {
    /// Builds a [ScanRequest] that selects rows whose key matches `prefix`.
    ///
    /// When `key_only` is true, only the key column is projected.
    fn build_prefix_read_request(prefix: &str, key_only: bool) -> ScanRequest {
        // NOTE(review): `LIKE` without a trailing `%` is not a prefix match, and
        // `_` inside the prefix is a single-char wildcard — presumably this filter
        // is only a best-effort pruning hint, since callers re-check keys with
        // `starts_with` after scanning. TODO confirm.
        let filter_expr = col(METADATA_SCHEMA_KEY_COLUMN_NAME).like(lit(prefix));

        let projection = if key_only {
            vec![METADATA_SCHEMA_KEY_COLUMN_INDEX]
        } else {
            vec![
                METADATA_SCHEMA_KEY_COLUMN_INDEX,
                METADATA_SCHEMA_VALUE_COLUMN_INDEX,
            ]
        };
        ScanRequest {
            projection: Some(projection),
            filters: vec![filter_expr],
            ..Default::default()
        }
    }

    /// Builds an unfiltered [ScanRequest] projecting the key and value columns.
    fn build_read_request() -> ScanRequest {
        let projection = vec![
            METADATA_SCHEMA_KEY_COLUMN_INDEX,
            METADATA_SCHEMA_VALUE_COLUMN_INDEX,
        ];
        ScanRequest {
            projection: Some(projection),
            ..Default::default()
        }
    }

    /// Scans the whole metadata region and builds a cache entry from all
    /// key-value pairs, sizing the entry by total key + value bytes.
    async fn load_all(&self, metadata_region_id: RegionId) -> Result<RegionMetadataCacheEntry> {
        let scan_req = MetadataRegion::build_read_request();
        let record_batch_stream = self
            .mito
            .scan_to_stream(metadata_region_id, scan_req)
            .await
            .context(MitoReadOperationSnafu)?;

        let kv = decode_batch_stream(record_batch_stream, decode_record_batch_to_key_and_value)
            .try_collect::<BTreeMap<_, _>>()
            .await?;
        // The entry weight used by the cache weigher: raw bytes of keys and values.
        let mut size = 0;
        for (k, v) in kv.iter() {
            size += k.len();
            size += v.len();
        }
        let kv = Arc::new(kv);
        Ok(RegionMetadataCacheEntry {
            key_values: kv,
            size,
        })
    }

    /// Returns all key-value pairs whose key starts with `prefix`, served from
    /// the cache (loading the whole region via [Self::load_all] on a miss).
    async fn get_all_with_prefix(
        &self,
        metadata_region_id: RegionId,
        prefix: &str,
    ) -> Result<HashMap<String, String>> {
        let region_metadata = self
            .cache
            .try_get_with(metadata_region_id, self.load_all(metadata_region_id))
            .await
            .context(CacheGetSnafu)?;

        let mut result = HashMap::new();
        get_all_with_prefix(&region_metadata, prefix, |k, v| {
            result.insert(k.to_string(), v.to_string());
            Ok(())
        })?;
        Ok(result)
    }

    /// Scans only the key column and returns all keys, filtered by `prefix`
    /// via the (possibly inexact) scan filter. Callers should still verify the
    /// prefix on each returned key.
    pub async fn get_all_key_with_prefix(
        &self,
        region_id: RegionId,
        prefix: &str,
    ) -> Result<Vec<String>> {
        let scan_req = MetadataRegion::build_prefix_read_request(prefix, true);
        let record_batch_stream = self
            .mito
            .scan_to_stream(region_id, scan_req)
            .await
            .context(MitoReadOperationSnafu)?;

        decode_batch_stream(record_batch_stream, decode_record_batch_to_key)
            .try_collect::<Vec<_>>()
            .await
    }

    /// Delete the given keys. For performance consideration, this method
    /// doesn't check if those keys exist or not.
    async fn delete(&self, metadata_region_id: RegionId, keys: &[String]) -> Result<()> {
        let delete_request = Self::build_delete_request(keys);
        self.mito
            .handle_request(
                metadata_region_id,
                store_api::region_request::RegionRequest::Delete(delete_request),
            )
            .await
            .context(MitoWriteOperationSnafu)?;
        // Invalidates the region metadata cache if any values are deleted from the metadata region.
        self.cache.invalidate(&metadata_region_id).await;

        Ok(())
    }

    /// Builds a [RegionPutRequest] from key-value pairs. All rows use the fixed
    /// timestamp 0 since the metadata region is a plain k-v store.
    pub(crate) fn build_put_request_from_iter(
        kv: impl Iterator<Item = (String, String)>,
    ) -> RegionPutRequest {
        let cols = vec![
            ColumnSchema {
                column_name: METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::TimestampMillisecond as _,
                semantic_type: SemanticType::Timestamp as _,
                ..Default::default()
            },
            ColumnSchema {
                column_name: METADATA_SCHEMA_KEY_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::String as _,
                semantic_type: SemanticType::Tag as _,
                ..Default::default()
            },
            ColumnSchema {
                column_name: METADATA_SCHEMA_VALUE_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::String as _,
                semantic_type: SemanticType::Field as _,
                ..Default::default()
            },
        ];
        let rows = Rows {
            schema: cols,
            rows: kv
                .into_iter()
                .map(|(key, value)| {
                    row(vec![
                        ValueData::TimestampMillisecondValue(0),
                        ValueData::StringValue(key),
                        ValueData::StringValue(value),
                    ])
                })
                .collect(),
        };

        RegionPutRequest {
            rows,
            hint: None,
            partition_expr_version: None,
        }
    }

    /// Builds a [RegionDeleteRequest] for the given keys, using the same fixed
    /// timestamp 0 that [Self::build_put_request_from_iter] writes.
    fn build_delete_request(keys: &[String]) -> RegionDeleteRequest {
        let cols = vec![
            ColumnSchema {
                column_name: METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::TimestampMillisecond as _,
                semantic_type: SemanticType::Timestamp as _,
                ..Default::default()
            },
            ColumnSchema {
                column_name: METADATA_SCHEMA_KEY_COLUMN_NAME.to_string(),
                datatype: ColumnDataType::String as _,
                semantic_type: SemanticType::Tag as _,
                ..Default::default()
            },
        ];
        let rows = keys
            .iter()
            .map(|key| {
                row(vec![
                    ValueData::TimestampMillisecondValue(0),
                    ValueData::StringValue(key.clone()),
                ])
            })
            .collect();
        let rows = Rows { schema: cols, rows };

        RegionDeleteRequest {
            rows,
            hint: None,
            partition_expr_version: None,
        }
    }

    /// Add logical regions to the metadata region.
    ///
    /// When `write_region_id` is true, an existence-marker key is written for
    /// each logical region in addition to its column entries.
    pub async fn add_logical_regions(
        &self,
        physical_region_id: RegionId,
        write_region_id: bool,
        logical_regions: impl Iterator<Item = (RegionId, HashMap<&str, &ColumnMetadata>)>,
    ) -> Result<()> {
        let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
        // Flatten each logical region into (key, value) rows: an optional
        // region marker followed by one entry per column.
        let iter = logical_regions
            .into_iter()
            .flat_map(|(logical_region_id, column_metadatas)| {
                if write_region_id {
                    Some((
                        MetadataRegion::concat_region_key(logical_region_id),
                        String::new(),
                    ))
                } else {
                    None
                }
                .into_iter()
                .chain(column_metadatas.into_iter().map(
                    move |(name, column_metadata)| {
                        (
                            MetadataRegion::concat_column_key(logical_region_id, name),
                            MetadataRegion::serialize_column_metadata(column_metadata),
                        )
                    },
                ))
            })
            .collect::<Vec<_>>();

        let put_request = MetadataRegion::build_put_request_from_iter(iter.into_iter());
        self.mito
            .handle_request(
                metadata_region_id,
                store_api::region_request::RegionRequest::Put(put_request),
            )
            .await
            .context(MitoWriteOperationSnafu)?;
        // Invalidates the region metadata cache if any new values are put into the metadata region.
        self.cache.invalidate(&metadata_region_id).await;

        Ok(())
    }

    /// Updates logical region metadata so that any entries previously referencing
    /// `source_region_id` are modified to reference the data region of `physical_region_id`.
    ///
    /// This method should be called after copying files from `source_region_id`
    /// into the target region. It scans the metadata for the target physical
    /// region, finds logical regions with the same region number as the source,
    /// and reinserts region and column entries updated to use the target's
    /// region number.
    pub async fn transform_logical_region_metadata(
        &self,
        physical_region_id: RegionId,
        source_region_id: RegionId,
    ) -> Result<()> {
        let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
        let data_region_id = utils::to_data_region_id(physical_region_id);
        // Only logical regions carrying the source's region number need rewriting.
        let logical_regions = self
            .logical_regions(data_region_id)
            .await?
            .into_iter()
            .filter(|r| r.region_number() == source_region_id.region_number())
            .collect::<Vec<_>>();
        if logical_regions.is_empty() {
            info!(
                "No logical regions found from source region {}, physical region id: {}",
                source_region_id, physical_region_id,
            );
            return Ok(());
        }

        // Read directly from mito (bypassing the cache) to get a fresh snapshot.
        let metadata = self.load_all(metadata_region_id).await?;
        let mut output = Vec::new();
        for logical_region_id in &logical_regions {
            let prefix = MetadataRegion::concat_column_key_prefix(*logical_region_id);
            get_all_with_prefix(&metadata, &prefix, |k, v| {
                // Safety: we have checked the prefix
                let (src_logical_region_id, column_name) = Self::parse_column_key(k)?.unwrap();
                // Change the region number to the data region number.
                let new_key = MetadataRegion::concat_column_key(
                    RegionId::new(
                        src_logical_region_id.table_id(),
                        data_region_id.region_number(),
                    ),
                    &column_name,
                );
                output.push((new_key, v.to_string()));
                Ok(())
            })?;

            // Also reinsert the region existence marker under the new region number.
            let new_key = MetadataRegion::concat_region_key(RegionId::new(
                logical_region_id.table_id(),
                data_region_id.region_number(),
            ));
            output.push((new_key, String::new()));
        }

        if output.is_empty() {
            warn!(
                "No logical regions metadata found from source region {}, physical region id: {}",
                source_region_id, physical_region_id
            );
            return Ok(());
        }

        debug!(
            "Transform logical regions metadata to physical region {}, source region: {}, transformed metadata: {}",
            data_region_id,
            source_region_id,
            output.len(),
        );

        let put_request = MetadataRegion::build_put_request_from_iter(output.into_iter());
        self.mito
            .handle_request(
                metadata_region_id,
                store_api::region_request::RegionRequest::Put(put_request),
            )
            .await
            .context(MitoWriteOperationSnafu)?;
        info!(
            "Transformed {} logical regions metadata to physical region {}, source region: {}",
            logical_regions.len(),
            data_region_id,
            source_region_id
        );
        // Drop the stale cached snapshot after the successful write.
        self.cache.invalidate(&metadata_region_id).await;
        Ok(())
    }
}
656
657fn get_all_with_prefix(
658    region_metadata: &RegionMetadataCacheEntry,
659    prefix: &str,
660    mut callback: impl FnMut(&str, &str) -> Result<()>,
661) -> Result<()> {
662    let range = region_metadata.key_values.range(prefix.to_string()..);
663    for (k, v) in range {
664        if !k.starts_with(prefix) {
665            break;
666        }
667        callback(k, v)?;
668    }
669    Ok(())
670}
671
#[cfg(test)]
impl MetadataRegion {
    /// Retrieves the value associated with the given key in the specified region.
    /// Returns `Ok(None)` if the key is not found.
    pub async fn get(&self, region_id: RegionId, key: &str) -> Result<Option<String>> {
        use datatypes::arrow::array::{Array, AsArray};

        // Exact-match filter on the key column; only the value column is projected.
        let filter_expr = datafusion::prelude::col(METADATA_SCHEMA_KEY_COLUMN_NAME)
            .eq(datafusion::prelude::lit(key));

        let scan_req = ScanRequest {
            projection: Some(vec![METADATA_SCHEMA_VALUE_COLUMN_INDEX]),
            filters: vec![filter_expr],
            ..Default::default()
        };
        let record_batch_stream = self
            .mito
            .scan_to_stream(region_id, scan_req)
            .await
            .context(MitoReadOperationSnafu)?;
        let scan_result = common_recordbatch::util::collect(record_batch_stream)
            .await
            .context(CollectRecordBatchStreamSnafu)?;

        let Some(first_batch) = scan_result.first() else {
            return Ok(None);
        };

        // A null value in the first row maps to `None`.
        let column = first_batch.column(0);
        let column = column.as_string::<i32>();
        let val = column.is_valid(0).then(|| column.value(0).to_string());

        Ok(val)
    }

    /// Check if the given column exists. Return the semantic type if exists.
    pub async fn column_semantic_type(
        &self,
        physical_region_id: RegionId,
        logical_region_id: RegionId,
        column_name: &str,
    ) -> Result<Option<SemanticType>> {
        let region_id = utils::to_metadata_region_id(physical_region_id);
        let column_key = Self::concat_column_key(logical_region_id, column_name);
        let semantic_type = self.get(region_id, &column_key).await?;
        semantic_type
            .map(|s| Self::deserialize_column_metadata(&s).map(|c| c.semantic_type))
            .transpose()
    }
}
722
#[cfg(test)]
mod test {
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;

    use super::*;
    use crate::test_util::TestEnv;
    use crate::utils::to_metadata_region_id;

    // Region key is the prefix plus the decimal u64 form of the RegionId.
    #[test]
    fn test_concat_table_key() {
        let region_id = RegionId::new(1234, 7844);
        let expected = "__region_5299989651108".to_string();
        assert_eq!(MetadataRegion::concat_region_key(region_id), expected);
    }

    // Column key embeds the base64(no-pad) encoding of the column name.
    #[test]
    fn test_concat_column_key() {
        let region_id = RegionId::new(8489, 9184);
        let column_name = "my_column";
        let expected = "__column_36459977384928_bXlfY29sdW1u".to_string();
        assert_eq!(
            MetadataRegion::concat_column_key(region_id, column_name),
            expected
        );
    }

    // Round-trip: concat then parse recovers the original (region_id, name).
    #[test]
    fn test_parse_table_key() {
        let region_id = RegionId::new(87474, 10607);
        let encoded = MetadataRegion::concat_column_key(region_id, "my_column");
        assert_eq!(encoded, "__column_375697969260911_bXlfY29sdW1u");

        let decoded = MetadataRegion::parse_column_key(&encoded).unwrap();
        assert_eq!(decoded, Some((region_id, "my_column".to_string())));
    }

    #[test]
    fn test_parse_valid_column_key() {
        let region_id = RegionId::new(176, 910);
        let encoded = MetadataRegion::concat_column_key(region_id, "my_column");
        assert_eq!(encoded, "__column_755914245006_bXlfY29sdW1u");

        let decoded = MetadataRegion::parse_column_key(&encoded).unwrap();
        assert_eq!(decoded, Some((region_id, "my_column".to_string())));
    }

    // Non-numeric region id in the key must produce an error, not a panic.
    #[test]
    fn test_parse_invalid_column_key() {
        let key = "__column_asdfasd_????";
        let result = MetadataRegion::parse_column_key(key);
        assert!(result.is_err());
    }

    #[test]
    fn test_serialize_column_metadata() {
        let semantic_type = SemanticType::Tag;
        let column_metadata = ColumnMetadata {
            column_schema: ColumnSchema::new("blabla", ConcreteDataType::string_datatype(), false),
            semantic_type,
            column_id: 5,
        };
        // `old_fmt` is the pre-`size_type` JSON layout; `new_fmt` is the current one.
        let old_fmt = "{\"column_schema\":{\"name\":\"blabla\",\"data_type\":{\"String\":null},\"is_nullable\":false,\"is_time_index\":false,\"default_constraint\":null,\"metadata\":{}},\"semantic_type\":\"Tag\",\"column_id\":5}".to_string();
        let new_fmt = "{\"column_schema\":{\"name\":\"blabla\",\"data_type\":{\"String\":{\"size_type\":\"Utf8\"}},\"is_nullable\":false,\"is_time_index\":false,\"default_constraint\":null,\"metadata\":{}},\"semantic_type\":\"Tag\",\"column_id\":5}".to_string();
        assert_eq!(
            MetadataRegion::serialize_column_metadata(&column_metadata),
            new_fmt
        );
        // Ensure both old and new formats can be deserialized.
        assert_eq!(
            MetadataRegion::deserialize_column_metadata(&old_fmt).unwrap(),
            column_metadata
        );
        assert_eq!(
            MetadataRegion::deserialize_column_metadata(&new_fmt).unwrap(),
            column_metadata
        );

        let semantic_type = "\"Invalid Column Metadata\"";
        assert!(MetadataRegion::deserialize_column_metadata(semantic_type).is_err());
    }

    /// Two string tag columns used as a fixture by the tests below.
    fn test_column_metadatas() -> HashMap<String, ColumnMetadata> {
        HashMap::from([
            (
                "label1".to_string(),
                ColumnMetadata {
                    column_schema: ColumnSchema::new(
                        "label1".to_string(),
                        ConcreteDataType::string_datatype(),
                        false,
                    ),
                    semantic_type: SemanticType::Tag,
                    column_id: 5,
                },
            ),
            (
                "label2".to_string(),
                ColumnMetadata {
                    column_schema: ColumnSchema::new(
                        "label2".to_string(),
                        ConcreteDataType::string_datatype(),
                        false,
                    ),
                    semantic_type: SemanticType::Tag,
                    column_id: 5,
                },
            ),
        ])
    }

    #[tokio::test]
    async fn add_logical_regions_to_meta_region() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let metadata_region = env.metadata_region();
        let physical_region_id = to_metadata_region_id(env.default_physical_region_id());
        let column_metadatas = test_column_metadatas();
        let logical_region_id = RegionId::new(1024, 1);

        let iter = vec![(
            logical_region_id,
            column_metadatas
                .iter()
                .map(|(k, v)| (k.as_str(), v))
                .collect::<HashMap<_, _>>(),
        )];
        metadata_region
            .add_logical_regions(physical_region_id, true, iter.into_iter())
            .await
            .unwrap();
        // Add logical region again.
        let iter = vec![(
            logical_region_id,
            column_metadatas
                .iter()
                .map(|(k, v)| (k.as_str(), v))
                .collect::<HashMap<_, _>>(),
        )];
        metadata_region
            .add_logical_regions(physical_region_id, true, iter.into_iter())
            .await
            .unwrap();

        // Check if the logical region is added.
        // NOTE(review): two regions expected — presumably one created by
        // `init_metric_region` plus the one added above (adding the same
        // logical region twice is idempotent). TODO confirm against TestEnv.
        let logical_regions = metadata_region
            .logical_regions(physical_region_id)
            .await
            .unwrap();
        assert_eq!(logical_regions.len(), 2);

        // Check if the logical region columns are added.
        let logical_columns = metadata_region
            .logical_columns(physical_region_id, logical_region_id)
            .await
            .unwrap()
            .into_iter()
            .collect::<HashMap<_, _>>();
        assert_eq!(logical_columns.len(), 2);
        assert_eq!(column_metadatas, logical_columns);
    }
}