metric_engine/
metadata_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::HashMap;
17use std::sync::Arc;
18
19use api::v1::value::ValueData;
20use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value};
21use async_stream::try_stream;
22use base64::engine::general_purpose::STANDARD_NO_PAD;
23use base64::Engine;
24use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
25use datafusion::prelude::{col, lit};
26use futures_util::stream::BoxStream;
27use futures_util::TryStreamExt;
28use mito2::engine::MitoEngine;
29use snafu::{OptionExt, ResultExt};
30use store_api::metadata::ColumnMetadata;
31use store_api::metric_engine_consts::{
32    METADATA_SCHEMA_KEY_COLUMN_INDEX, METADATA_SCHEMA_KEY_COLUMN_NAME,
33    METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME, METADATA_SCHEMA_VALUE_COLUMN_INDEX,
34    METADATA_SCHEMA_VALUE_COLUMN_NAME,
35};
36use store_api::region_engine::RegionEngine;
37use store_api::region_request::{RegionDeleteRequest, RegionPutRequest};
38use store_api::storage::{RegionId, ScanRequest};
39use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock};
40
41use crate::error::{
42    CollectRecordBatchStreamSnafu, DecodeColumnValueSnafu, DeserializeColumnMetadataSnafu,
43    LogicalRegionNotFoundSnafu, MitoReadOperationSnafu, MitoWriteOperationSnafu,
44    ParseRegionIdSnafu, Result,
45};
46use crate::utils;
47
48const REGION_PREFIX: &str = "__region_";
49const COLUMN_PREFIX: &str = "__column_";
50
51/// The other two fields key and value will be used as a k-v storage.
52/// It contains two group of key:
53/// - `__region_<LOGICAL_REGION_ID>` is used for marking table existence. It doesn't have value.
54/// - `__column_<LOGICAL_REGION_ID>_<COLUMN_NAME>` is used for marking column existence,
55///   the value is column's semantic type. To avoid the key conflict, this column key
56///   will be encoded by base64([STANDARD_NO_PAD]).
57///
58/// This is a generic handler like [MetricEngine](crate::engine::MetricEngine). It
59/// will handle all the metadata related operations across physical tables. Thus
60/// every operation should be associated to a [RegionId], which is the physical
61/// table id + region sequence. This handler will transform the region group by
62/// itself.
63pub struct MetadataRegion {
64    pub(crate) mito: MitoEngine,
65    /// Logical lock for operations that need to be serialized. Like update & read region columns.
66    ///
67    /// Region entry will be registered on creating and opening logical region, and deregistered on
68    /// removing logical region.
69    logical_region_lock: RwLock<HashMap<RegionId, Arc<RwLock<()>>>>,
70}
71
72impl MetadataRegion {
73    pub fn new(mito: MitoEngine) -> Self {
74        Self {
75            mito,
76            logical_region_lock: RwLock::new(HashMap::new()),
77        }
78    }
79
80    /// Open a logical region.
81    ///
82    /// Returns true if the logical region is opened for the first time.
83    pub async fn open_logical_region(&self, logical_region_id: RegionId) -> bool {
84        match self
85            .logical_region_lock
86            .write()
87            .await
88            .entry(logical_region_id)
89        {
90            Entry::Occupied(_) => false,
91            Entry::Vacant(vacant_entry) => {
92                vacant_entry.insert(Arc::new(RwLock::new(())));
93                true
94            }
95        }
96    }
97
98    /// Retrieve a read lock guard of given logical region id.
99    pub async fn read_lock_logical_region(
100        &self,
101        logical_region_id: RegionId,
102    ) -> Result<OwnedRwLockReadGuard<()>> {
103        let lock = self
104            .logical_region_lock
105            .read()
106            .await
107            .get(&logical_region_id)
108            .context(LogicalRegionNotFoundSnafu {
109                region_id: logical_region_id,
110            })?
111            .clone();
112        Ok(RwLock::read_owned(lock).await)
113    }
114
115    /// Retrieve a write lock guard of given logical region id.
116    pub async fn write_lock_logical_region(
117        &self,
118        logical_region_id: RegionId,
119    ) -> Result<OwnedRwLockWriteGuard<()>> {
120        let lock = self
121            .logical_region_lock
122            .read()
123            .await
124            .get(&logical_region_id)
125            .context(LogicalRegionNotFoundSnafu {
126                region_id: logical_region_id,
127            })?
128            .clone();
129        Ok(RwLock::write_owned(lock).await)
130    }
131
132    /// Remove a registered logical region from metadata.
133    ///
134    /// This method doesn't check if the previous key exists.
135    pub async fn remove_logical_region(
136        &self,
137        physical_region_id: RegionId,
138        logical_region_id: RegionId,
139    ) -> Result<()> {
140        // concat region key
141        let region_id = utils::to_metadata_region_id(physical_region_id);
142        let region_key = Self::concat_region_key(logical_region_id);
143
144        // concat column keys
145        let logical_columns = self
146            .logical_columns(physical_region_id, logical_region_id)
147            .await?;
148        let mut column_keys = logical_columns
149            .into_iter()
150            .map(|(col, _)| Self::concat_column_key(logical_region_id, &col))
151            .collect::<Vec<_>>();
152
153        // remove region key and column keys
154        column_keys.push(region_key);
155        self.delete(region_id, &column_keys).await?;
156
157        self.logical_region_lock
158            .write()
159            .await
160            .remove(&logical_region_id);
161
162        Ok(())
163    }
164
165    // TODO(ruihang): avoid using `get_all`
166    /// Get all the columns of a given logical region.
167    /// Return a list of (column_name, column_metadata).
168    pub async fn logical_columns(
169        &self,
170        physical_region_id: RegionId,
171        logical_region_id: RegionId,
172    ) -> Result<Vec<(String, ColumnMetadata)>> {
173        let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
174        let region_column_prefix = Self::concat_column_key_prefix(logical_region_id);
175
176        let mut columns = vec![];
177        for (k, v) in self
178            .get_all_with_prefix(metadata_region_id, &region_column_prefix)
179            .await?
180        {
181            if !k.starts_with(&region_column_prefix) {
182                continue;
183            }
184            // Safety: we have checked the prefix
185            let (_, column_name) = Self::parse_column_key(&k)?.unwrap();
186            let column_metadata = Self::deserialize_column_metadata(&v)?;
187            columns.push((column_name, column_metadata));
188        }
189
190        Ok(columns)
191    }
192
193    /// Return all logical regions associated with the physical region.
194    pub async fn logical_regions(&self, physical_region_id: RegionId) -> Result<Vec<RegionId>> {
195        let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
196
197        let mut regions = vec![];
198        for k in self
199            .get_all_key_with_prefix(metadata_region_id, REGION_PREFIX)
200            .await?
201        {
202            if !k.starts_with(REGION_PREFIX) {
203                continue;
204            }
205            // Safety: we have checked the prefix
206            let region_id = Self::parse_region_key(&k).unwrap();
207            let region_id = region_id.parse::<u64>().unwrap().into();
208            regions.push(region_id);
209        }
210
211        Ok(regions)
212    }
213}
214
215// utils to concat and parse key/value
216impl MetadataRegion {
217    pub fn concat_region_key(region_id: RegionId) -> String {
218        format!("{REGION_PREFIX}{}", region_id.as_u64())
219    }
220
221    /// Column name will be encoded by base64([STANDARD_NO_PAD])
222    pub fn concat_column_key(region_id: RegionId, column_name: &str) -> String {
223        let encoded_column_name = STANDARD_NO_PAD.encode(column_name);
224        format!(
225            "{COLUMN_PREFIX}{}_{}",
226            region_id.as_u64(),
227            encoded_column_name
228        )
229    }
230
231    /// Concat a column key prefix without column name
232    pub fn concat_column_key_prefix(region_id: RegionId) -> String {
233        format!("{COLUMN_PREFIX}{}_", region_id.as_u64())
234    }
235
236    pub fn parse_region_key(key: &str) -> Option<&str> {
237        key.strip_prefix(REGION_PREFIX)
238    }
239
240    /// Parse column key to (logical_region_id, column_name)
241    pub fn parse_column_key(key: &str) -> Result<Option<(RegionId, String)>> {
242        if let Some(stripped) = key.strip_prefix(COLUMN_PREFIX) {
243            let mut iter = stripped.split('_');
244
245            let region_id_raw = iter.next().unwrap();
246            let region_id = region_id_raw
247                .parse::<u64>()
248                .with_context(|_| ParseRegionIdSnafu { raw: region_id_raw })?
249                .into();
250
251            let encoded_column_name = iter.next().unwrap();
252            let column_name = STANDARD_NO_PAD
253                .decode(encoded_column_name)
254                .context(DecodeColumnValueSnafu)?;
255
256            Ok(Some((region_id, String::from_utf8(column_name).unwrap())))
257        } else {
258            Ok(None)
259        }
260    }
261
262    pub fn serialize_column_metadata(column_metadata: &ColumnMetadata) -> String {
263        serde_json::to_string(column_metadata).unwrap()
264    }
265
266    pub fn deserialize_column_metadata(column_metadata: &str) -> Result<ColumnMetadata> {
267        serde_json::from_str(column_metadata).with_context(|_| DeserializeColumnMetadataSnafu {
268            raw: column_metadata,
269        })
270    }
271}
272
273/// Decode a record batch stream to a stream of items.
274pub fn decode_batch_stream<T: Send + 'static>(
275    mut record_batch_stream: SendableRecordBatchStream,
276    decode: fn(RecordBatch) -> Vec<T>,
277) -> BoxStream<'static, Result<T>> {
278    let stream = try_stream! {
279        while let Some(batch) = record_batch_stream.try_next().await.context(CollectRecordBatchStreamSnafu)? {
280            for item in decode(batch) {
281                yield item;
282            }
283        }
284    };
285    Box::pin(stream)
286}
287
288/// Decode a record batch to a list of key and value.
289fn decode_record_batch_to_key_and_value(batch: RecordBatch) -> Vec<(String, String)> {
290    let key_col = batch.column(0);
291    let val_col = batch.column(1);
292
293    (0..batch.num_rows())
294        .flat_map(move |row_index| {
295            let key = key_col
296                .get_ref(row_index)
297                .as_string()
298                .unwrap()
299                .map(|s| s.to_string());
300
301            key.map(|k| {
302                (
303                    k,
304                    val_col
305                        .get_ref(row_index)
306                        .as_string()
307                        .unwrap()
308                        .map(|s| s.to_string())
309                        .unwrap_or_default(),
310                )
311            })
312        })
313        .collect()
314}
315
316/// Decode a record batch to a list of key.
317fn decode_record_batch_to_key(batch: RecordBatch) -> Vec<String> {
318    let key_col = batch.column(0);
319
320    (0..batch.num_rows())
321        .flat_map(move |row_index| {
322            let key = key_col
323                .get_ref(row_index)
324                .as_string()
325                .unwrap()
326                .map(|s| s.to_string());
327            key
328        })
329        .collect()
330}
331
332// simulate to `KvBackend`
333//
334// methods in this block assume the given region id is transformed.
335impl MetadataRegion {
336    fn build_prefix_read_request(prefix: &str, key_only: bool) -> ScanRequest {
337        let filter_expr = col(METADATA_SCHEMA_KEY_COLUMN_NAME).like(lit(prefix));
338
339        let projection = if key_only {
340            vec![METADATA_SCHEMA_KEY_COLUMN_INDEX]
341        } else {
342            vec![
343                METADATA_SCHEMA_KEY_COLUMN_INDEX,
344                METADATA_SCHEMA_VALUE_COLUMN_INDEX,
345            ]
346        };
347        ScanRequest {
348            projection: Some(projection),
349            filters: vec![filter_expr],
350            ..Default::default()
351        }
352    }
353
354    pub async fn get_all_with_prefix(
355        &self,
356        region_id: RegionId,
357        prefix: &str,
358    ) -> Result<HashMap<String, String>> {
359        let scan_req = MetadataRegion::build_prefix_read_request(prefix, false);
360        let record_batch_stream = self
361            .mito
362            .scan_to_stream(region_id, scan_req)
363            .await
364            .context(MitoReadOperationSnafu)?;
365
366        decode_batch_stream(record_batch_stream, decode_record_batch_to_key_and_value)
367            .try_collect::<HashMap<_, _>>()
368            .await
369    }
370
371    pub async fn get_all_key_with_prefix(
372        &self,
373        region_id: RegionId,
374        prefix: &str,
375    ) -> Result<Vec<String>> {
376        let scan_req = MetadataRegion::build_prefix_read_request(prefix, true);
377        let record_batch_stream = self
378            .mito
379            .scan_to_stream(region_id, scan_req)
380            .await
381            .context(MitoReadOperationSnafu)?;
382
383        decode_batch_stream(record_batch_stream, decode_record_batch_to_key)
384            .try_collect::<Vec<_>>()
385            .await
386    }
387
388    /// Delete the given keys. For performance consideration, this method
389    /// doesn't check if those keys exist or not.
390    async fn delete(&self, region_id: RegionId, keys: &[String]) -> Result<()> {
391        let delete_request = Self::build_delete_request(keys);
392        self.mito
393            .handle_request(
394                region_id,
395                store_api::region_request::RegionRequest::Delete(delete_request),
396            )
397            .await
398            .context(MitoWriteOperationSnafu)?;
399        Ok(())
400    }
401
402    pub(crate) fn build_put_request_from_iter(
403        kv: impl Iterator<Item = (String, String)>,
404    ) -> RegionPutRequest {
405        let cols = vec![
406            ColumnSchema {
407                column_name: METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME.to_string(),
408                datatype: ColumnDataType::TimestampMillisecond as _,
409                semantic_type: SemanticType::Timestamp as _,
410                ..Default::default()
411            },
412            ColumnSchema {
413                column_name: METADATA_SCHEMA_KEY_COLUMN_NAME.to_string(),
414                datatype: ColumnDataType::String as _,
415                semantic_type: SemanticType::Tag as _,
416                ..Default::default()
417            },
418            ColumnSchema {
419                column_name: METADATA_SCHEMA_VALUE_COLUMN_NAME.to_string(),
420                datatype: ColumnDataType::String as _,
421                semantic_type: SemanticType::Field as _,
422                ..Default::default()
423            },
424        ];
425        let rows = Rows {
426            schema: cols,
427            rows: kv
428                .into_iter()
429                .map(|(key, value)| Row {
430                    values: vec![
431                        Value {
432                            value_data: Some(ValueData::TimestampMillisecondValue(0)),
433                        },
434                        Value {
435                            value_data: Some(ValueData::StringValue(key)),
436                        },
437                        Value {
438                            value_data: Some(ValueData::StringValue(value)),
439                        },
440                    ],
441                })
442                .collect(),
443        };
444
445        RegionPutRequest { rows, hint: None }
446    }
447
448    fn build_delete_request(keys: &[String]) -> RegionDeleteRequest {
449        let cols = vec![
450            ColumnSchema {
451                column_name: METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME.to_string(),
452                datatype: ColumnDataType::TimestampMillisecond as _,
453                semantic_type: SemanticType::Timestamp as _,
454                ..Default::default()
455            },
456            ColumnSchema {
457                column_name: METADATA_SCHEMA_KEY_COLUMN_NAME.to_string(),
458                datatype: ColumnDataType::String as _,
459                semantic_type: SemanticType::Tag as _,
460                ..Default::default()
461            },
462        ];
463        let rows = keys
464            .iter()
465            .map(|key| Row {
466                values: vec![
467                    Value {
468                        value_data: Some(ValueData::TimestampMillisecondValue(0)),
469                    },
470                    Value {
471                        value_data: Some(ValueData::StringValue(key.to_string())),
472                    },
473                ],
474            })
475            .collect();
476        let rows = Rows { schema: cols, rows };
477
478        RegionDeleteRequest { rows }
479    }
480
481    /// Add logical regions to the metadata region.
482    pub async fn add_logical_regions(
483        &self,
484        physical_region_id: RegionId,
485        write_region_id: bool,
486        logical_regions: impl Iterator<Item = (RegionId, HashMap<&str, &ColumnMetadata>)>,
487    ) -> Result<()> {
488        let region_id = utils::to_metadata_region_id(physical_region_id);
489        let iter = logical_regions
490            .into_iter()
491            .flat_map(|(logical_region_id, column_metadatas)| {
492                if write_region_id {
493                    Some((
494                        MetadataRegion::concat_region_key(logical_region_id),
495                        String::new(),
496                    ))
497                } else {
498                    None
499                }
500                .into_iter()
501                .chain(column_metadatas.into_iter().map(
502                    move |(name, column_metadata)| {
503                        (
504                            MetadataRegion::concat_column_key(logical_region_id, name),
505                            MetadataRegion::serialize_column_metadata(column_metadata),
506                        )
507                    },
508                ))
509            })
510            .collect::<Vec<_>>();
511
512        let put_request = MetadataRegion::build_put_request_from_iter(iter.into_iter());
513        self.mito
514            .handle_request(
515                region_id,
516                store_api::region_request::RegionRequest::Put(put_request),
517            )
518            .await
519            .context(MitoWriteOperationSnafu)?;
520
521        Ok(())
522    }
523}
524
525#[cfg(test)]
526impl MetadataRegion {
527    /// Retrieves the value associated with the given key in the specified region.
528    /// Returns `Ok(None)` if the key is not found.
529    pub async fn get(&self, region_id: RegionId, key: &str) -> Result<Option<String>> {
530        let filter_expr = datafusion::prelude::col(METADATA_SCHEMA_KEY_COLUMN_NAME)
531            .eq(datafusion::prelude::lit(key));
532
533        let scan_req = ScanRequest {
534            projection: Some(vec![METADATA_SCHEMA_VALUE_COLUMN_INDEX]),
535            filters: vec![filter_expr],
536            ..Default::default()
537        };
538        let record_batch_stream = self
539            .mito
540            .scan_to_stream(region_id, scan_req)
541            .await
542            .context(MitoReadOperationSnafu)?;
543        let scan_result = common_recordbatch::util::collect(record_batch_stream)
544            .await
545            .context(CollectRecordBatchStreamSnafu)?;
546
547        let Some(first_batch) = scan_result.first() else {
548            return Ok(None);
549        };
550
551        let val = first_batch
552            .column(0)
553            .get_ref(0)
554            .as_string()
555            .unwrap()
556            .map(|s| s.to_string());
557
558        Ok(val)
559    }
560
561    /// Check if the given column exists. Return the semantic type if exists.
562    pub async fn column_semantic_type(
563        &self,
564        physical_region_id: RegionId,
565        logical_region_id: RegionId,
566        column_name: &str,
567    ) -> Result<Option<SemanticType>> {
568        let region_id = utils::to_metadata_region_id(physical_region_id);
569        let column_key = Self::concat_column_key(logical_region_id, column_name);
570        let semantic_type = self.get(region_id, &column_key).await?;
571        semantic_type
572            .map(|s| Self::deserialize_column_metadata(&s).map(|c| c.semantic_type))
573            .transpose()
574    }
575}
576
577#[cfg(test)]
578mod test {
579    use datatypes::data_type::ConcreteDataType;
580    use datatypes::schema::ColumnSchema;
581
582    use super::*;
583    use crate::test_util::TestEnv;
584    use crate::utils::to_metadata_region_id;
585
586    #[test]
587    fn test_concat_table_key() {
588        let region_id = RegionId::new(1234, 7844);
589        let expected = "__region_5299989651108".to_string();
590        assert_eq!(MetadataRegion::concat_region_key(region_id), expected);
591    }
592
593    #[test]
594    fn test_concat_column_key() {
595        let region_id = RegionId::new(8489, 9184);
596        let column_name = "my_column";
597        let expected = "__column_36459977384928_bXlfY29sdW1u".to_string();
598        assert_eq!(
599            MetadataRegion::concat_column_key(region_id, column_name),
600            expected
601        );
602    }
603
604    #[test]
605    fn test_parse_table_key() {
606        let region_id = RegionId::new(87474, 10607);
607        let encoded = MetadataRegion::concat_column_key(region_id, "my_column");
608        assert_eq!(encoded, "__column_375697969260911_bXlfY29sdW1u");
609
610        let decoded = MetadataRegion::parse_column_key(&encoded).unwrap();
611        assert_eq!(decoded, Some((region_id, "my_column".to_string())));
612    }
613
614    #[test]
615    fn test_parse_valid_column_key() {
616        let region_id = RegionId::new(176, 910);
617        let encoded = MetadataRegion::concat_column_key(region_id, "my_column");
618        assert_eq!(encoded, "__column_755914245006_bXlfY29sdW1u");
619
620        let decoded = MetadataRegion::parse_column_key(&encoded).unwrap();
621        assert_eq!(decoded, Some((region_id, "my_column".to_string())));
622    }
623
624    #[test]
625    fn test_parse_invalid_column_key() {
626        let key = "__column_asdfasd_????";
627        let result = MetadataRegion::parse_column_key(key);
628        assert!(result.is_err());
629    }
630
631    #[test]
632    fn test_serialize_column_metadata() {
633        let semantic_type = SemanticType::Tag;
634        let column_metadata = ColumnMetadata {
635            column_schema: ColumnSchema::new("blabla", ConcreteDataType::string_datatype(), false),
636            semantic_type,
637            column_id: 5,
638        };
639        let expected = "{\"column_schema\":{\"name\":\"blabla\",\"data_type\":{\"String\":null},\"is_nullable\":false,\"is_time_index\":false,\"default_constraint\":null,\"metadata\":{}},\"semantic_type\":\"Tag\",\"column_id\":5}".to_string();
640        assert_eq!(
641            MetadataRegion::serialize_column_metadata(&column_metadata),
642            expected
643        );
644
645        let semantic_type = "\"Invalid Column Metadata\"";
646        assert!(MetadataRegion::deserialize_column_metadata(semantic_type).is_err());
647    }
648
649    fn test_column_metadatas() -> HashMap<String, ColumnMetadata> {
650        HashMap::from([
651            (
652                "label1".to_string(),
653                ColumnMetadata {
654                    column_schema: ColumnSchema::new(
655                        "label1".to_string(),
656                        ConcreteDataType::string_datatype(),
657                        false,
658                    ),
659                    semantic_type: SemanticType::Tag,
660                    column_id: 5,
661                },
662            ),
663            (
664                "label2".to_string(),
665                ColumnMetadata {
666                    column_schema: ColumnSchema::new(
667                        "label2".to_string(),
668                        ConcreteDataType::string_datatype(),
669                        false,
670                    ),
671                    semantic_type: SemanticType::Tag,
672                    column_id: 5,
673                },
674            ),
675        ])
676    }
677
678    #[tokio::test]
679    async fn add_logical_regions_to_meta_region() {
680        let env = TestEnv::new().await;
681        env.init_metric_region().await;
682        let metadata_region = env.metadata_region();
683        let physical_region_id = to_metadata_region_id(env.default_physical_region_id());
684        let column_metadatas = test_column_metadatas();
685        let logical_region_id = RegionId::new(1024, 1);
686
687        let iter = vec![(
688            logical_region_id,
689            column_metadatas
690                .iter()
691                .map(|(k, v)| (k.as_str(), v))
692                .collect::<HashMap<_, _>>(),
693        )];
694        metadata_region
695            .add_logical_regions(physical_region_id, true, iter.into_iter())
696            .await
697            .unwrap();
698        // Add logical region again.
699        let iter = vec![(
700            logical_region_id,
701            column_metadatas
702                .iter()
703                .map(|(k, v)| (k.as_str(), v))
704                .collect::<HashMap<_, _>>(),
705        )];
706        metadata_region
707            .add_logical_regions(physical_region_id, true, iter.into_iter())
708            .await
709            .unwrap();
710
711        // Check if the logical region is added.
712        let logical_regions = metadata_region
713            .logical_regions(physical_region_id)
714            .await
715            .unwrap();
716        assert_eq!(logical_regions.len(), 2);
717
718        // Check if the logical region columns are added.
719        let logical_columns = metadata_region
720            .logical_columns(physical_region_id, logical_region_id)
721            .await
722            .unwrap()
723            .into_iter()
724            .collect::<HashMap<_, _>>();
725        assert_eq!(logical_columns.len(), 2);
726        assert_eq!(column_metadatas, logical_columns);
727    }
728}