// metric_engine/metadata_region.rs
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use std::time::Duration;

use api::v1::helper::row;
use api::v1::value::ValueData;
use api::v1::{ColumnDataType, ColumnSchema, Rows, SemanticType};
use async_stream::try_stream;
use base64::Engine;
use base64::engine::general_purpose::STANDARD_NO_PAD;
use common_base::readable_size::ReadableSize;
use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
use common_telemetry::{debug, info, warn};
use datafusion::prelude::{col, lit};
use futures_util::TryStreamExt;
use futures_util::stream::BoxStream;
use mito2::engine::MitoEngine;
use moka::future::Cache;
use moka::policy::EvictionPolicy;
use snafu::{OptionExt, ResultExt};
use store_api::metadata::ColumnMetadata;
use store_api::metric_engine_consts::{
    METADATA_SCHEMA_KEY_COLUMN_INDEX, METADATA_SCHEMA_KEY_COLUMN_NAME,
    METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME, METADATA_SCHEMA_VALUE_COLUMN_INDEX,
    METADATA_SCHEMA_VALUE_COLUMN_NAME,
};
use store_api::region_engine::RegionEngine;
use store_api::region_request::{RegionDeleteRequest, RegionPutRequest};
use store_api::storage::{RegionId, ScanRequest};
use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock};

use crate::error::{
    CacheGetSnafu, CollectRecordBatchStreamSnafu, DecodeColumnValueSnafu,
    DeserializeColumnMetadataSnafu, LogicalRegionNotFoundSnafu, MitoReadOperationSnafu,
    MitoWriteOperationSnafu, ParseRegionIdSnafu, Result,
};
use crate::utils;
53
/// Key prefix marking the existence of a logical region: `__region_<id>`.
const REGION_PREFIX: &str = "__region_";
/// Key prefix for per-column entries: `__column_<id>_<base64(name)>`.
const COLUMN_PREFIX: &str = "__column_";
56
/// Handler of the metadata region. The metadata region's key and value
/// columns are used as a k-v storage. It contains two groups of keys:
/// - `__region_<LOGICAL_REGION_ID>` is used for marking table existence. It doesn't have value.
/// - `__column_<LOGICAL_REGION_ID>_<COLUMN_NAME>` is used for marking column existence,
///   the value is column's semantic type. To avoid the key conflict, this column key
///   will be encoded by base64([STANDARD_NO_PAD]).
///
/// This is a generic handler like [MetricEngine](crate::engine::MetricEngine). It
/// will handle all the metadata related operations across physical tables. Thus
/// every operation should be associated to a [RegionId], which is the physical
/// table id + region sequence. This handler will transform the region group by
/// itself.
pub struct MetadataRegion {
    // The mito engine that physically stores and scans the metadata region.
    pub(crate) mito: MitoEngine,
    /// The cache for contents(key-value pairs) of region metadata.
    ///
    /// The cache should be invalidated when any new values are put into the metadata region or any
    /// values are deleted from the metadata region.
    cache: Cache<RegionId, RegionMetadataCacheEntry>,
    /// Logical lock for operations that need to be serialized. Like update & read region columns.
    ///
    /// Region entry will be registered on creating and opening logical region, and deregistered on
    /// removing logical region.
    logical_region_lock: RwLock<HashMap<RegionId, Arc<RwLock<()>>>>,
}
82
/// A cached snapshot of every key-value pair stored in one metadata region.
#[derive(Clone)]
struct RegionMetadataCacheEntry {
    // Sorted map of all key-value pairs; sorted order enables prefix range scans.
    key_values: Arc<BTreeMap<String, String>>,
    // Total byte length of all keys and values, used as the cache weight.
    size: usize,
}

/// The max size of the region metadata cache.
const MAX_CACHE_SIZE: u64 = ReadableSize::mb(128).as_bytes();
/// The TTL of the region metadata cache.
const CACHE_TTL: Duration = Duration::from_secs(5 * 60);
93
94impl MetadataRegion {
95    pub fn new(mito: MitoEngine) -> Self {
96        let cache = Cache::builder()
97            .max_capacity(MAX_CACHE_SIZE)
98            // Use the LRU eviction policy to minimize frequent mito scans.
99            // Recently accessed items are retained longer in the cache.
100            .eviction_policy(EvictionPolicy::lru())
101            .time_to_live(CACHE_TTL)
102            .weigher(|_, v: &RegionMetadataCacheEntry| v.size as u32)
103            .build();
104        Self {
105            mito,
106            cache,
107            logical_region_lock: RwLock::new(HashMap::new()),
108        }
109    }
110
111    /// Open a logical region.
112    ///
113    /// Returns true if the logical region is opened for the first time.
114    pub async fn open_logical_region(&self, logical_region_id: RegionId) -> bool {
115        match self
116            .logical_region_lock
117            .write()
118            .await
119            .entry(logical_region_id)
120        {
121            Entry::Occupied(_) => false,
122            Entry::Vacant(vacant_entry) => {
123                vacant_entry.insert(Arc::new(RwLock::new(())));
124                true
125            }
126        }
127    }
128
129    /// Retrieve a read lock guard of given logical region id.
130    pub async fn read_lock_logical_region(
131        &self,
132        logical_region_id: RegionId,
133    ) -> Result<OwnedRwLockReadGuard<()>> {
134        let lock = self
135            .logical_region_lock
136            .read()
137            .await
138            .get(&logical_region_id)
139            .context(LogicalRegionNotFoundSnafu {
140                region_id: logical_region_id,
141            })?
142            .clone();
143        Ok(RwLock::read_owned(lock).await)
144    }
145
146    /// Retrieve a write lock guard of given logical region id.
147    pub async fn write_lock_logical_region(
148        &self,
149        logical_region_id: RegionId,
150    ) -> Result<OwnedRwLockWriteGuard<()>> {
151        let lock = self
152            .logical_region_lock
153            .read()
154            .await
155            .get(&logical_region_id)
156            .context(LogicalRegionNotFoundSnafu {
157                region_id: logical_region_id,
158            })?
159            .clone();
160        Ok(RwLock::write_owned(lock).await)
161    }
162
163    /// Remove a registered logical region from metadata.
164    ///
165    /// This method doesn't check if the previous key exists.
166    pub async fn remove_logical_region(
167        &self,
168        physical_region_id: RegionId,
169        logical_region_id: RegionId,
170    ) -> Result<()> {
171        // concat region key
172        let region_id = utils::to_metadata_region_id(physical_region_id);
173        let region_key = Self::concat_region_key(logical_region_id);
174
175        // concat column keys
176        let logical_columns = self
177            .logical_columns(physical_region_id, logical_region_id)
178            .await?;
179        let mut column_keys = logical_columns
180            .into_iter()
181            .map(|(col, _)| Self::concat_column_key(logical_region_id, &col))
182            .collect::<Vec<_>>();
183
184        // remove region key and column keys
185        column_keys.push(region_key);
186        self.delete(region_id, &column_keys).await?;
187
188        self.logical_region_lock
189            .write()
190            .await
191            .remove(&logical_region_id);
192
193        Ok(())
194    }
195
196    // TODO(ruihang): avoid using `get_all`
197    /// Get all the columns of a given logical region.
198    /// Return a list of (column_name, column_metadata).
199    pub async fn logical_columns(
200        &self,
201        physical_region_id: RegionId,
202        logical_region_id: RegionId,
203    ) -> Result<Vec<(String, ColumnMetadata)>> {
204        let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
205        let region_column_prefix = Self::concat_column_key_prefix(logical_region_id);
206
207        let mut columns = vec![];
208        for (k, v) in self
209            .get_all_with_prefix(metadata_region_id, &region_column_prefix)
210            .await?
211        {
212            if !k.starts_with(&region_column_prefix) {
213                continue;
214            }
215            // Safety: we have checked the prefix
216            let (_, column_name) = Self::parse_column_key(&k)?.unwrap();
217            let column_metadata = Self::deserialize_column_metadata(&v)?;
218            columns.push((column_name, column_metadata));
219        }
220
221        Ok(columns)
222    }
223
224    /// Return all logical regions associated with the physical region.
225    pub async fn logical_regions(&self, physical_region_id: RegionId) -> Result<Vec<RegionId>> {
226        let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
227
228        let mut regions = vec![];
229        for k in self
230            .get_all_key_with_prefix(metadata_region_id, REGION_PREFIX)
231            .await?
232        {
233            if !k.starts_with(REGION_PREFIX) {
234                continue;
235            }
236            // Safety: we have checked the prefix
237            let region_id = Self::parse_region_key(&k).unwrap();
238            let region_id = region_id.parse::<u64>().unwrap().into();
239            regions.push(region_id);
240        }
241
242        Ok(regions)
243    }
244}
245
246// utils to concat and parse key/value
247impl MetadataRegion {
248    pub fn concat_region_key(region_id: RegionId) -> String {
249        format!("{REGION_PREFIX}{}", region_id.as_u64())
250    }
251
252    /// Column name will be encoded by base64([STANDARD_NO_PAD])
253    pub fn concat_column_key(region_id: RegionId, column_name: &str) -> String {
254        let encoded_column_name = STANDARD_NO_PAD.encode(column_name);
255        format!(
256            "{COLUMN_PREFIX}{}_{}",
257            region_id.as_u64(),
258            encoded_column_name
259        )
260    }
261
262    /// Concat a column key prefix without column name
263    pub fn concat_column_key_prefix(region_id: RegionId) -> String {
264        format!("{COLUMN_PREFIX}{}_", region_id.as_u64())
265    }
266
267    pub fn parse_region_key(key: &str) -> Option<&str> {
268        key.strip_prefix(REGION_PREFIX)
269    }
270
271    /// Parse column key to (logical_region_id, column_name)
272    pub fn parse_column_key(key: &str) -> Result<Option<(RegionId, String)>> {
273        if let Some(stripped) = key.strip_prefix(COLUMN_PREFIX) {
274            let mut iter = stripped.split('_');
275
276            let region_id_raw = iter.next().unwrap();
277            let region_id = region_id_raw
278                .parse::<u64>()
279                .with_context(|_| ParseRegionIdSnafu { raw: region_id_raw })?
280                .into();
281
282            let encoded_column_name = iter.next().unwrap();
283            let column_name = STANDARD_NO_PAD
284                .decode(encoded_column_name)
285                .context(DecodeColumnValueSnafu)?;
286
287            Ok(Some((region_id, String::from_utf8(column_name).unwrap())))
288        } else {
289            Ok(None)
290        }
291    }
292
293    pub fn serialize_column_metadata(column_metadata: &ColumnMetadata) -> String {
294        serde_json::to_string(column_metadata).unwrap()
295    }
296
297    pub fn deserialize_column_metadata(column_metadata: &str) -> Result<ColumnMetadata> {
298        serde_json::from_str(column_metadata).with_context(|_| DeserializeColumnMetadataSnafu {
299            raw: column_metadata,
300        })
301    }
302}
303
304/// Decode a record batch stream to a stream of items.
305pub fn decode_batch_stream<T: Send + 'static>(
306    mut record_batch_stream: SendableRecordBatchStream,
307    decode: fn(RecordBatch) -> Vec<T>,
308) -> BoxStream<'static, Result<T>> {
309    let stream = try_stream! {
310        while let Some(batch) = record_batch_stream.try_next().await.context(CollectRecordBatchStreamSnafu)? {
311            for item in decode(batch) {
312                yield item;
313            }
314        }
315    };
316    Box::pin(stream)
317}
318
319/// Decode a record batch to a list of key and value.
320fn decode_record_batch_to_key_and_value(batch: RecordBatch) -> Vec<(String, String)> {
321    let keys = batch.iter_column_as_string(0);
322    let values = batch.iter_column_as_string(1);
323    keys.zip(values)
324        .filter_map(|(k, v)| match (k, v) {
325            (Some(k), Some(v)) => Some((k, v)),
326            (Some(k), None) => Some((k, "".to_string())),
327            (None, _) => None,
328        })
329        .collect::<Vec<_>>()
330}
331
332/// Decode a record batch to a list of key.
333fn decode_record_batch_to_key(batch: RecordBatch) -> Vec<String> {
334    batch.iter_column_as_string(0).flatten().collect::<Vec<_>>()
335}
336
337// simulate to `KvBackend`
338//
339// methods in this block assume the given region id is transformed.
340impl MetadataRegion {
341    fn build_prefix_read_request(prefix: &str, key_only: bool) -> ScanRequest {
342        let filter_expr = col(METADATA_SCHEMA_KEY_COLUMN_NAME).like(lit(prefix));
343
344        let projection = if key_only {
345            vec![METADATA_SCHEMA_KEY_COLUMN_INDEX]
346        } else {
347            vec![
348                METADATA_SCHEMA_KEY_COLUMN_INDEX,
349                METADATA_SCHEMA_VALUE_COLUMN_INDEX,
350            ]
351        };
352        let projection_input = Some(projection.into());
353        ScanRequest {
354            projection_input,
355            filters: vec![filter_expr],
356            ..Default::default()
357        }
358    }
359
360    fn build_read_request() -> ScanRequest {
361        let projection = vec![
362            METADATA_SCHEMA_KEY_COLUMN_INDEX,
363            METADATA_SCHEMA_VALUE_COLUMN_INDEX,
364        ];
365        let projection_input = Some(projection.into());
366        ScanRequest {
367            projection_input,
368            ..Default::default()
369        }
370    }
371
372    async fn load_all(&self, metadata_region_id: RegionId) -> Result<RegionMetadataCacheEntry> {
373        let scan_req = MetadataRegion::build_read_request();
374        let record_batch_stream = self
375            .mito
376            .scan_to_stream(metadata_region_id, scan_req)
377            .await
378            .context(MitoReadOperationSnafu)?;
379
380        let kv = decode_batch_stream(record_batch_stream, decode_record_batch_to_key_and_value)
381            .try_collect::<BTreeMap<_, _>>()
382            .await?;
383        let mut size = 0;
384        for (k, v) in kv.iter() {
385            size += k.len();
386            size += v.len();
387        }
388        let kv = Arc::new(kv);
389        Ok(RegionMetadataCacheEntry {
390            key_values: kv,
391            size,
392        })
393    }
394
395    async fn get_all_with_prefix(
396        &self,
397        metadata_region_id: RegionId,
398        prefix: &str,
399    ) -> Result<HashMap<String, String>> {
400        let region_metadata = self
401            .cache
402            .try_get_with(metadata_region_id, self.load_all(metadata_region_id))
403            .await
404            .context(CacheGetSnafu)?;
405
406        let mut result = HashMap::new();
407        get_all_with_prefix(&region_metadata, prefix, |k, v| {
408            result.insert(k.to_string(), v.to_string());
409            Ok(())
410        })?;
411        Ok(result)
412    }
413
414    pub async fn get_all_key_with_prefix(
415        &self,
416        region_id: RegionId,
417        prefix: &str,
418    ) -> Result<Vec<String>> {
419        let scan_req = MetadataRegion::build_prefix_read_request(prefix, true);
420        let record_batch_stream = self
421            .mito
422            .scan_to_stream(region_id, scan_req)
423            .await
424            .context(MitoReadOperationSnafu)?;
425
426        decode_batch_stream(record_batch_stream, decode_record_batch_to_key)
427            .try_collect::<Vec<_>>()
428            .await
429    }
430
431    /// Delete the given keys. For performance consideration, this method
432    /// doesn't check if those keys exist or not.
433    async fn delete(&self, metadata_region_id: RegionId, keys: &[String]) -> Result<()> {
434        let delete_request = Self::build_delete_request(keys);
435        self.mito
436            .handle_request(
437                metadata_region_id,
438                store_api::region_request::RegionRequest::Delete(delete_request),
439            )
440            .await
441            .context(MitoWriteOperationSnafu)?;
442        // Invalidates the region metadata cache if any values are deleted from the metadata region.
443        self.cache.invalidate(&metadata_region_id).await;
444
445        Ok(())
446    }
447
448    pub(crate) fn build_put_request_from_iter(
449        kv: impl Iterator<Item = (String, String)>,
450    ) -> RegionPutRequest {
451        let cols = vec![
452            ColumnSchema {
453                column_name: METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME.to_string(),
454                datatype: ColumnDataType::TimestampMillisecond as _,
455                semantic_type: SemanticType::Timestamp as _,
456                ..Default::default()
457            },
458            ColumnSchema {
459                column_name: METADATA_SCHEMA_KEY_COLUMN_NAME.to_string(),
460                datatype: ColumnDataType::String as _,
461                semantic_type: SemanticType::Tag as _,
462                ..Default::default()
463            },
464            ColumnSchema {
465                column_name: METADATA_SCHEMA_VALUE_COLUMN_NAME.to_string(),
466                datatype: ColumnDataType::String as _,
467                semantic_type: SemanticType::Field as _,
468                ..Default::default()
469            },
470        ];
471        let rows = Rows {
472            schema: cols,
473            rows: kv
474                .into_iter()
475                .map(|(key, value)| {
476                    row(vec![
477                        ValueData::TimestampMillisecondValue(0),
478                        ValueData::StringValue(key),
479                        ValueData::StringValue(value),
480                    ])
481                })
482                .collect(),
483        };
484
485        RegionPutRequest {
486            rows,
487            hint: None,
488            partition_expr_version: None,
489        }
490    }
491
492    fn build_delete_request(keys: &[String]) -> RegionDeleteRequest {
493        let cols = vec![
494            ColumnSchema {
495                column_name: METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME.to_string(),
496                datatype: ColumnDataType::TimestampMillisecond as _,
497                semantic_type: SemanticType::Timestamp as _,
498                ..Default::default()
499            },
500            ColumnSchema {
501                column_name: METADATA_SCHEMA_KEY_COLUMN_NAME.to_string(),
502                datatype: ColumnDataType::String as _,
503                semantic_type: SemanticType::Tag as _,
504                ..Default::default()
505            },
506        ];
507        let rows = keys
508            .iter()
509            .map(|key| {
510                row(vec![
511                    ValueData::TimestampMillisecondValue(0),
512                    ValueData::StringValue(key.clone()),
513                ])
514            })
515            .collect();
516        let rows = Rows { schema: cols, rows };
517
518        RegionDeleteRequest {
519            rows,
520            hint: None,
521            partition_expr_version: None,
522        }
523    }
524
525    /// Add logical regions to the metadata region.
526    pub async fn add_logical_regions(
527        &self,
528        physical_region_id: RegionId,
529        write_region_id: bool,
530        logical_regions: impl Iterator<Item = (RegionId, HashMap<&str, &ColumnMetadata>)>,
531    ) -> Result<()> {
532        let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
533        let iter = logical_regions
534            .into_iter()
535            .flat_map(|(logical_region_id, column_metadatas)| {
536                if write_region_id {
537                    Some((
538                        MetadataRegion::concat_region_key(logical_region_id),
539                        String::new(),
540                    ))
541                } else {
542                    None
543                }
544                .into_iter()
545                .chain(column_metadatas.into_iter().map(
546                    move |(name, column_metadata)| {
547                        (
548                            MetadataRegion::concat_column_key(logical_region_id, name),
549                            MetadataRegion::serialize_column_metadata(column_metadata),
550                        )
551                    },
552                ))
553            })
554            .collect::<Vec<_>>();
555
556        let put_request = MetadataRegion::build_put_request_from_iter(iter.into_iter());
557        self.mito
558            .handle_request(
559                metadata_region_id,
560                store_api::region_request::RegionRequest::Put(put_request),
561            )
562            .await
563            .context(MitoWriteOperationSnafu)?;
564        // Invalidates the region metadata cache if any new values are put into the metadata region.
565        self.cache.invalidate(&metadata_region_id).await;
566
567        Ok(())
568    }
569
570    /// Updates logical region metadata so that any entries previously referencing
571    /// `source_region_id` are modified to reference the data region of `physical_region_id`.
572    ///
573    /// This method should be called after copying files from `source_region_id`
574    /// into the target region. It scans the metadata for the target physical
575    /// region, finds logical regions with the same region number as the source,
576    /// and reinserts region and column entries updated to use the target's
577    /// region number.
578    pub async fn transform_logical_region_metadata(
579        &self,
580        physical_region_id: RegionId,
581        source_region_id: RegionId,
582    ) -> Result<()> {
583        let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
584        let data_region_id = utils::to_data_region_id(physical_region_id);
585        let logical_regions = self
586            .logical_regions(data_region_id)
587            .await?
588            .into_iter()
589            .filter(|r| r.region_number() == source_region_id.region_number())
590            .collect::<Vec<_>>();
591        if logical_regions.is_empty() {
592            info!(
593                "No logical regions found from source region {}, physical region id: {}",
594                source_region_id, physical_region_id,
595            );
596            return Ok(());
597        }
598
599        let metadata = self.load_all(metadata_region_id).await?;
600        let mut output = Vec::new();
601        for logical_region_id in &logical_regions {
602            let prefix = MetadataRegion::concat_column_key_prefix(*logical_region_id);
603            get_all_with_prefix(&metadata, &prefix, |k, v| {
604                // Safety: we have checked the prefix
605                let (src_logical_region_id, column_name) = Self::parse_column_key(k)?.unwrap();
606                // Change the region number to the data region number.
607                let new_key = MetadataRegion::concat_column_key(
608                    RegionId::new(
609                        src_logical_region_id.table_id(),
610                        data_region_id.region_number(),
611                    ),
612                    &column_name,
613                );
614                output.push((new_key, v.to_string()));
615                Ok(())
616            })?;
617
618            let new_key = MetadataRegion::concat_region_key(RegionId::new(
619                logical_region_id.table_id(),
620                data_region_id.region_number(),
621            ));
622            output.push((new_key, String::new()));
623        }
624
625        if output.is_empty() {
626            warn!(
627                "No logical regions metadata found from source region {}, physical region id: {}",
628                source_region_id, physical_region_id
629            );
630            return Ok(());
631        }
632
633        debug!(
634            "Transform logical regions metadata to physical region {}, source region: {}, transformed metadata: {}",
635            data_region_id,
636            source_region_id,
637            output.len(),
638        );
639
640        let put_request = MetadataRegion::build_put_request_from_iter(output.into_iter());
641        self.mito
642            .handle_request(
643                metadata_region_id,
644                store_api::region_request::RegionRequest::Put(put_request),
645            )
646            .await
647            .context(MitoWriteOperationSnafu)?;
648        info!(
649            "Transformed {} logical regions metadata to physical region {}, source region: {}",
650            logical_regions.len(),
651            data_region_id,
652            source_region_id
653        );
654        self.cache.invalidate(&metadata_region_id).await;
655        Ok(())
656    }
657}
658
659fn get_all_with_prefix(
660    region_metadata: &RegionMetadataCacheEntry,
661    prefix: &str,
662    mut callback: impl FnMut(&str, &str) -> Result<()>,
663) -> Result<()> {
664    let range = region_metadata.key_values.range(prefix.to_string()..);
665    for (k, v) in range {
666        if !k.starts_with(prefix) {
667            break;
668        }
669        callback(k, v)?;
670    }
671    Ok(())
672}
673
#[cfg(test)]
impl MetadataRegion {
    /// Retrieves the value associated with the given key in the specified region.
    /// Returns `Ok(None)` if the key is not found.
    pub async fn get(&self, region_id: RegionId, key: &str) -> Result<Option<String>> {
        use datatypes::arrow::array::{Array, AsArray};

        // Scan only the value column, filtered on an exact key match.
        let scan_req = ScanRequest {
            projection_input: Some(vec![METADATA_SCHEMA_VALUE_COLUMN_INDEX].into()),
            filters: vec![
                datafusion::prelude::col(METADATA_SCHEMA_KEY_COLUMN_NAME)
                    .eq(datafusion::prelude::lit(key)),
            ],
            ..Default::default()
        };
        let stream = self
            .mito
            .scan_to_stream(region_id, scan_req)
            .await
            .context(MitoReadOperationSnafu)?;
        let batches = common_recordbatch::util::collect(stream)
            .await
            .context(CollectRecordBatchStreamSnafu)?;

        // Only the first row of the first batch matters for an exact match.
        let Some(batch) = batches.first() else {
            return Ok(None);
        };
        let column = batch.column(0);
        let values = column.as_string::<i32>();
        Ok(values.is_valid(0).then(|| values.value(0).to_string()))
    }

    /// Check if the given column exists. Return the semantic type if exists.
    pub async fn column_semantic_type(
        &self,
        physical_region_id: RegionId,
        logical_region_id: RegionId,
        column_name: &str,
    ) -> Result<Option<SemanticType>> {
        let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
        let column_key = Self::concat_column_key(logical_region_id, column_name);
        match self.get(metadata_region_id, &column_key).await? {
            Some(value) => {
                let metadata = Self::deserialize_column_metadata(&value)?;
                Ok(Some(metadata.semantic_type))
            }
            None => Ok(None),
        }
    }
}
725
726#[cfg(test)]
727mod test {
728    use datatypes::data_type::ConcreteDataType;
729    use datatypes::schema::ColumnSchema;
730
731    use super::*;
732    use crate::test_util::TestEnv;
733    use crate::utils::to_metadata_region_id;
734
    #[test]
    fn test_concat_table_key() {
        // RegionId::new(1234, 7844).as_u64() == (1234 << 32) | 7844 == 5299989651108.
        let region_id = RegionId::new(1234, 7844);
        let expected = "__region_5299989651108".to_string();
        assert_eq!(MetadataRegion::concat_region_key(region_id), expected);
    }
741
    #[test]
    fn test_concat_column_key() {
        let region_id = RegionId::new(8489, 9184);
        let column_name = "my_column";
        // "bXlfY29sdW1u" is the unpadded base64 encoding of "my_column".
        let expected = "__column_36459977384928_bXlfY29sdW1u".to_string();
        assert_eq!(
            MetadataRegion::concat_column_key(region_id, column_name),
            expected
        );
    }
752
    #[test]
    fn test_parse_table_key() {
        // Round-trip: concat a column key, then parse it back.
        let region_id = RegionId::new(87474, 10607);
        let encoded = MetadataRegion::concat_column_key(region_id, "my_column");
        assert_eq!(encoded, "__column_375697969260911_bXlfY29sdW1u");

        let decoded = MetadataRegion::parse_column_key(&encoded).unwrap();
        assert_eq!(decoded, Some((region_id, "my_column".to_string())));
    }
762
    #[test]
    fn test_parse_valid_column_key() {
        // Round-trip with a different region id to cover another encoding.
        let region_id = RegionId::new(176, 910);
        let encoded = MetadataRegion::concat_column_key(region_id, "my_column");
        assert_eq!(encoded, "__column_755914245006_bXlfY29sdW1u");

        let decoded = MetadataRegion::parse_column_key(&encoded).unwrap();
        assert_eq!(decoded, Some((region_id, "my_column".to_string())));
    }
772
    #[test]
    fn test_parse_invalid_column_key() {
        // A non-numeric region id segment must produce an error, not a panic.
        let key = "__column_asdfasd_????";
        let result = MetadataRegion::parse_column_key(key);
        assert!(result.is_err());
    }
779
    #[test]
    fn test_serialize_column_metadata() {
        let semantic_type = SemanticType::Tag;
        let column_metadata = ColumnMetadata {
            column_schema: ColumnSchema::new("blabla", ConcreteDataType::string_datatype(), false),
            semantic_type,
            column_id: 5,
        };
        // Old format: string data type serialized as `{"String":null}`;
        // new format carries an explicit `size_type`.
        let old_fmt = "{\"column_schema\":{\"name\":\"blabla\",\"data_type\":{\"String\":null},\"is_nullable\":false,\"is_time_index\":false,\"default_constraint\":null,\"metadata\":{}},\"semantic_type\":\"Tag\",\"column_id\":5}".to_string();
        let new_fmt = "{\"column_schema\":{\"name\":\"blabla\",\"data_type\":{\"String\":{\"size_type\":\"Utf8\"}},\"is_nullable\":false,\"is_time_index\":false,\"default_constraint\":null,\"metadata\":{}},\"semantic_type\":\"Tag\",\"column_id\":5}".to_string();
        assert_eq!(
            MetadataRegion::serialize_column_metadata(&column_metadata),
            new_fmt
        );
        // Ensure both old and new formats can be deserialized.
        assert_eq!(
            MetadataRegion::deserialize_column_metadata(&old_fmt).unwrap(),
            column_metadata
        );
        assert_eq!(
            MetadataRegion::deserialize_column_metadata(&new_fmt).unwrap(),
            column_metadata
        );

        let semantic_type = "\"Invalid Column Metadata\"";
        assert!(MetadataRegion::deserialize_column_metadata(semantic_type).is_err());
    }
807
    // Fixture: two string tag columns used by the add-logical-regions test.
    fn test_column_metadatas() -> HashMap<String, ColumnMetadata> {
        HashMap::from([
            (
                "label1".to_string(),
                ColumnMetadata {
                    column_schema: ColumnSchema::new(
                        "label1".to_string(),
                        ConcreteDataType::string_datatype(),
                        false,
                    ),
                    semantic_type: SemanticType::Tag,
                    column_id: 5,
                },
            ),
            (
                "label2".to_string(),
                ColumnMetadata {
                    column_schema: ColumnSchema::new(
                        "label2".to_string(),
                        ConcreteDataType::string_datatype(),
                        false,
                    ),
                    semantic_type: SemanticType::Tag,
                    column_id: 5,
                },
            ),
        ])
    }
836
    #[tokio::test]
    async fn add_logical_regions_to_meta_region() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let metadata_region = env.metadata_region();
        let physical_region_id = to_metadata_region_id(env.default_physical_region_id());
        let column_metadatas = test_column_metadatas();
        let logical_region_id = RegionId::new(1024, 1);

        let iter = vec![(
            logical_region_id,
            column_metadatas
                .iter()
                .map(|(k, v)| (k.as_str(), v))
                .collect::<HashMap<_, _>>(),
        )];
        metadata_region
            .add_logical_regions(physical_region_id, true, iter.into_iter())
            .await
            .unwrap();
        // Add logical region again.
        let iter = vec![(
            logical_region_id,
            column_metadatas
                .iter()
                .map(|(k, v)| (k.as_str(), v))
                .collect::<HashMap<_, _>>(),
        )];
        metadata_region
            .add_logical_regions(physical_region_id, true, iter.into_iter())
            .await
            .unwrap();

        // Check if the logical region is added.
        // Re-adding writes the same keys, so the count stays at 2 regions
        // (the region created by init plus the one added here).
        let logical_regions = metadata_region
            .logical_regions(physical_region_id)
            .await
            .unwrap();
        assert_eq!(logical_regions.len(), 2);

        // Check if the logical region columns are added.
        let logical_columns = metadata_region
            .logical_columns(physical_region_id, logical_region_id)
            .await
            .unwrap()
            .into_iter()
            .collect::<HashMap<_, _>>();
        assert_eq!(logical_columns.len(), 2);
        assert_eq!(column_metadatas, logical_columns);
    }
887}