metric_engine/
metadata_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::HashMap;
17use std::sync::Arc;
18
19use api::v1::value::ValueData;
20use api::v1::{ColumnDataType, ColumnSchema, Row, Rows, SemanticType, Value};
21use async_stream::try_stream;
22use base64::engine::general_purpose::STANDARD_NO_PAD;
23use base64::Engine;
24use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
25use datafusion::prelude::{col, lit};
26use futures_util::stream::BoxStream;
27use futures_util::TryStreamExt;
28use mito2::engine::MitoEngine;
29use snafu::{OptionExt, ResultExt};
30use store_api::metadata::ColumnMetadata;
31use store_api::metric_engine_consts::{
32    METADATA_SCHEMA_KEY_COLUMN_INDEX, METADATA_SCHEMA_KEY_COLUMN_NAME,
33    METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME, METADATA_SCHEMA_VALUE_COLUMN_INDEX,
34    METADATA_SCHEMA_VALUE_COLUMN_NAME,
35};
36use store_api::region_engine::RegionEngine;
37use store_api::region_request::{RegionDeleteRequest, RegionPutRequest};
38use store_api::storage::{RegionId, ScanRequest};
39use tokio::sync::{OwnedRwLockReadGuard, OwnedRwLockWriteGuard, RwLock};
40
41use crate::error::{
42    CollectRecordBatchStreamSnafu, DecodeColumnValueSnafu, DeserializeColumnMetadataSnafu,
43    LogicalRegionNotFoundSnafu, MitoReadOperationSnafu, MitoWriteOperationSnafu,
44    ParseRegionIdSnafu, Result,
45};
46use crate::utils;
47
48const REGION_PREFIX: &str = "__region_";
49const COLUMN_PREFIX: &str = "__column_";
50
/// Handler of the metadata region, which is used as a k-v storage: every entry is
/// written as a row holding a string key and a string value.
///
/// It contains two groups of keys:
/// - `__region_<LOGICAL_REGION_ID>` is used for marking logical region existence.
///   Its value is an empty string.
/// - `__column_<LOGICAL_REGION_ID>_<COLUMN_NAME>` is used for marking column existence,
///   the value is the column's metadata ([ColumnMetadata]) serialized as JSON. To avoid
///   key conflicts, the `<COLUMN_NAME>` part of the key is encoded by
///   base64([STANDARD_NO_PAD]).
///
/// This is a generic handler like [MetricEngine](crate::engine::MetricEngine). It
/// will handle all the metadata related operations across physical tables. Thus
/// every operation should be associated to a [RegionId], which is the physical
/// table id + region sequence. This handler will transform the region group by
/// itself.
pub struct MetadataRegion {
    /// The engine that stores and serves the metadata region's rows.
    pub(crate) mito: MitoEngine,
    /// Logical lock for operations that need to be serialized. Like update & read region columns.
    ///
    /// Region entry will be registered on creating and opening logical region, and deregistered on
    /// removing logical region.
    logical_region_lock: RwLock<HashMap<RegionId, Arc<RwLock<()>>>>,
}
71
72impl MetadataRegion {
73    pub fn new(mito: MitoEngine) -> Self {
74        Self {
75            mito,
76            logical_region_lock: RwLock::new(HashMap::new()),
77        }
78    }
79
80    /// Open a logical region.
81    ///
82    /// Returns true if the logical region is opened for the first time.
83    pub async fn open_logical_region(&self, logical_region_id: RegionId) -> bool {
84        match self
85            .logical_region_lock
86            .write()
87            .await
88            .entry(logical_region_id)
89        {
90            Entry::Occupied(_) => false,
91            Entry::Vacant(vacant_entry) => {
92                vacant_entry.insert(Arc::new(RwLock::new(())));
93                true
94            }
95        }
96    }
97
98    /// Retrieve a read lock guard of given logical region id.
99    pub async fn read_lock_logical_region(
100        &self,
101        logical_region_id: RegionId,
102    ) -> Result<OwnedRwLockReadGuard<()>> {
103        let lock = self
104            .logical_region_lock
105            .read()
106            .await
107            .get(&logical_region_id)
108            .context(LogicalRegionNotFoundSnafu {
109                region_id: logical_region_id,
110            })?
111            .clone();
112        Ok(RwLock::read_owned(lock).await)
113    }
114
115    /// Retrieve a write lock guard of given logical region id.
116    pub async fn write_lock_logical_region(
117        &self,
118        logical_region_id: RegionId,
119    ) -> Result<OwnedRwLockWriteGuard<()>> {
120        let lock = self
121            .logical_region_lock
122            .read()
123            .await
124            .get(&logical_region_id)
125            .context(LogicalRegionNotFoundSnafu {
126                region_id: logical_region_id,
127            })?
128            .clone();
129        Ok(RwLock::write_owned(lock).await)
130    }
131
132    /// Remove a registered logical region from metadata.
133    ///
134    /// This method doesn't check if the previous key exists.
135    pub async fn remove_logical_region(
136        &self,
137        physical_region_id: RegionId,
138        logical_region_id: RegionId,
139    ) -> Result<()> {
140        // concat region key
141        let region_id = utils::to_metadata_region_id(physical_region_id);
142        let region_key = Self::concat_region_key(logical_region_id);
143
144        // concat column keys
145        let logical_columns = self
146            .logical_columns(physical_region_id, logical_region_id)
147            .await?;
148        let mut column_keys = logical_columns
149            .into_iter()
150            .map(|(col, _)| Self::concat_column_key(logical_region_id, &col))
151            .collect::<Vec<_>>();
152
153        // remove region key and column keys
154        column_keys.push(region_key);
155        self.delete(region_id, &column_keys).await?;
156
157        self.logical_region_lock
158            .write()
159            .await
160            .remove(&logical_region_id);
161
162        Ok(())
163    }
164
165    // TODO(ruihang): avoid using `get_all`
166    /// Get all the columns of a given logical region.
167    /// Return a list of (column_name, column_metadata).
168    pub async fn logical_columns(
169        &self,
170        physical_region_id: RegionId,
171        logical_region_id: RegionId,
172    ) -> Result<Vec<(String, ColumnMetadata)>> {
173        let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
174        let region_column_prefix = Self::concat_column_key_prefix(logical_region_id);
175
176        let mut columns = vec![];
177        for (k, v) in self
178            .get_all_with_prefix(metadata_region_id, &region_column_prefix)
179            .await?
180        {
181            if !k.starts_with(&region_column_prefix) {
182                continue;
183            }
184            // Safety: we have checked the prefix
185            let (_, column_name) = Self::parse_column_key(&k)?.unwrap();
186            let column_metadata = Self::deserialize_column_metadata(&v)?;
187            columns.push((column_name, column_metadata));
188        }
189
190        Ok(columns)
191    }
192
193    /// Return all logical regions associated with the physical region.
194    pub async fn logical_regions(&self, physical_region_id: RegionId) -> Result<Vec<RegionId>> {
195        let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
196
197        let mut regions = vec![];
198        for k in self
199            .get_all_key_with_prefix(metadata_region_id, REGION_PREFIX)
200            .await?
201        {
202            if !k.starts_with(REGION_PREFIX) {
203                continue;
204            }
205            // Safety: we have checked the prefix
206            let region_id = Self::parse_region_key(&k).unwrap();
207            let region_id = region_id.parse::<u64>().unwrap().into();
208            regions.push(region_id);
209        }
210
211        Ok(regions)
212    }
213}
214
215// utils to concat and parse key/value
216impl MetadataRegion {
217    pub fn concat_region_key(region_id: RegionId) -> String {
218        format!("{REGION_PREFIX}{}", region_id.as_u64())
219    }
220
221    /// Column name will be encoded by base64([STANDARD_NO_PAD])
222    pub fn concat_column_key(region_id: RegionId, column_name: &str) -> String {
223        let encoded_column_name = STANDARD_NO_PAD.encode(column_name);
224        format!(
225            "{COLUMN_PREFIX}{}_{}",
226            region_id.as_u64(),
227            encoded_column_name
228        )
229    }
230
231    /// Concat a column key prefix without column name
232    pub fn concat_column_key_prefix(region_id: RegionId) -> String {
233        format!("{COLUMN_PREFIX}{}_", region_id.as_u64())
234    }
235
236    pub fn parse_region_key(key: &str) -> Option<&str> {
237        key.strip_prefix(REGION_PREFIX)
238    }
239
240    /// Parse column key to (logical_region_id, column_name)
241    pub fn parse_column_key(key: &str) -> Result<Option<(RegionId, String)>> {
242        if let Some(stripped) = key.strip_prefix(COLUMN_PREFIX) {
243            let mut iter = stripped.split('_');
244
245            let region_id_raw = iter.next().unwrap();
246            let region_id = region_id_raw
247                .parse::<u64>()
248                .with_context(|_| ParseRegionIdSnafu { raw: region_id_raw })?
249                .into();
250
251            let encoded_column_name = iter.next().unwrap();
252            let column_name = STANDARD_NO_PAD
253                .decode(encoded_column_name)
254                .context(DecodeColumnValueSnafu)?;
255
256            Ok(Some((region_id, String::from_utf8(column_name).unwrap())))
257        } else {
258            Ok(None)
259        }
260    }
261
262    pub fn serialize_column_metadata(column_metadata: &ColumnMetadata) -> String {
263        serde_json::to_string(column_metadata).unwrap()
264    }
265
266    pub fn deserialize_column_metadata(column_metadata: &str) -> Result<ColumnMetadata> {
267        serde_json::from_str(column_metadata).with_context(|_| DeserializeColumnMetadataSnafu {
268            raw: column_metadata,
269        })
270    }
271}
272
273/// Decode a record batch stream to a stream of items.
274pub fn decode_batch_stream<T: Send + 'static>(
275    mut record_batch_stream: SendableRecordBatchStream,
276    decode: fn(RecordBatch) -> Vec<T>,
277) -> BoxStream<'static, Result<T>> {
278    let stream = try_stream! {
279        while let Some(batch) = record_batch_stream.try_next().await.context(CollectRecordBatchStreamSnafu)? {
280            for item in decode(batch) {
281                yield item;
282            }
283        }
284    };
285    Box::pin(stream)
286}
287
288/// Decode a record batch to a list of key and value.
289fn decode_record_batch_to_key_and_value(batch: RecordBatch) -> Vec<(String, String)> {
290    let key_col = batch.column(0);
291    let val_col = batch.column(1);
292
293    (0..batch.num_rows())
294        .flat_map(move |row_index| {
295            let key = key_col
296                .get_ref(row_index)
297                .as_string()
298                .unwrap()
299                .map(|s| s.to_string());
300
301            key.map(|k| {
302                (
303                    k,
304                    val_col
305                        .get_ref(row_index)
306                        .as_string()
307                        .unwrap()
308                        .map(|s| s.to_string())
309                        .unwrap_or_default(),
310                )
311            })
312        })
313        .collect()
314}
315
316/// Decode a record batch to a list of key.
317fn decode_record_batch_to_key(batch: RecordBatch) -> Vec<String> {
318    let key_col = batch.column(0);
319
320    (0..batch.num_rows())
321        .flat_map(move |row_index| {
322            let key = key_col
323                .get_ref(row_index)
324                .as_string()
325                .unwrap()
326                .map(|s| s.to_string());
327            key
328        })
329        .collect()
330}
331
332// simulate to `KvBackend`
333//
334// methods in this block assume the given region id is transformed.
335impl MetadataRegion {
336    fn build_prefix_read_request(prefix: &str, key_only: bool) -> ScanRequest {
337        let filter_expr = col(METADATA_SCHEMA_KEY_COLUMN_NAME).like(lit(prefix));
338
339        let projection = if key_only {
340            vec![METADATA_SCHEMA_KEY_COLUMN_INDEX]
341        } else {
342            vec![
343                METADATA_SCHEMA_KEY_COLUMN_INDEX,
344                METADATA_SCHEMA_VALUE_COLUMN_INDEX,
345            ]
346        };
347        ScanRequest {
348            projection: Some(projection),
349            filters: vec![filter_expr],
350            output_ordering: None,
351            limit: None,
352            series_row_selector: None,
353            sequence: None,
354            distribution: None,
355        }
356    }
357
358    pub async fn get_all_with_prefix(
359        &self,
360        region_id: RegionId,
361        prefix: &str,
362    ) -> Result<HashMap<String, String>> {
363        let scan_req = MetadataRegion::build_prefix_read_request(prefix, false);
364        let record_batch_stream = self
365            .mito
366            .scan_to_stream(region_id, scan_req)
367            .await
368            .context(MitoReadOperationSnafu)?;
369
370        decode_batch_stream(record_batch_stream, decode_record_batch_to_key_and_value)
371            .try_collect::<HashMap<_, _>>()
372            .await
373    }
374
375    pub async fn get_all_key_with_prefix(
376        &self,
377        region_id: RegionId,
378        prefix: &str,
379    ) -> Result<Vec<String>> {
380        let scan_req = MetadataRegion::build_prefix_read_request(prefix, true);
381        let record_batch_stream = self
382            .mito
383            .scan_to_stream(region_id, scan_req)
384            .await
385            .context(MitoReadOperationSnafu)?;
386
387        decode_batch_stream(record_batch_stream, decode_record_batch_to_key)
388            .try_collect::<Vec<_>>()
389            .await
390    }
391
392    /// Delete the given keys. For performance consideration, this method
393    /// doesn't check if those keys exist or not.
394    async fn delete(&self, region_id: RegionId, keys: &[String]) -> Result<()> {
395        let delete_request = Self::build_delete_request(keys);
396        self.mito
397            .handle_request(
398                region_id,
399                store_api::region_request::RegionRequest::Delete(delete_request),
400            )
401            .await
402            .context(MitoWriteOperationSnafu)?;
403        Ok(())
404    }
405
406    pub(crate) fn build_put_request_from_iter(
407        kv: impl Iterator<Item = (String, String)>,
408    ) -> RegionPutRequest {
409        let cols = vec![
410            ColumnSchema {
411                column_name: METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME.to_string(),
412                datatype: ColumnDataType::TimestampMillisecond as _,
413                semantic_type: SemanticType::Timestamp as _,
414                ..Default::default()
415            },
416            ColumnSchema {
417                column_name: METADATA_SCHEMA_KEY_COLUMN_NAME.to_string(),
418                datatype: ColumnDataType::String as _,
419                semantic_type: SemanticType::Tag as _,
420                ..Default::default()
421            },
422            ColumnSchema {
423                column_name: METADATA_SCHEMA_VALUE_COLUMN_NAME.to_string(),
424                datatype: ColumnDataType::String as _,
425                semantic_type: SemanticType::Field as _,
426                ..Default::default()
427            },
428        ];
429        let rows = Rows {
430            schema: cols,
431            rows: kv
432                .into_iter()
433                .map(|(key, value)| Row {
434                    values: vec![
435                        Value {
436                            value_data: Some(ValueData::TimestampMillisecondValue(0)),
437                        },
438                        Value {
439                            value_data: Some(ValueData::StringValue(key)),
440                        },
441                        Value {
442                            value_data: Some(ValueData::StringValue(value)),
443                        },
444                    ],
445                })
446                .collect(),
447        };
448
449        RegionPutRequest { rows, hint: None }
450    }
451
452    fn build_delete_request(keys: &[String]) -> RegionDeleteRequest {
453        let cols = vec![
454            ColumnSchema {
455                column_name: METADATA_SCHEMA_TIMESTAMP_COLUMN_NAME.to_string(),
456                datatype: ColumnDataType::TimestampMillisecond as _,
457                semantic_type: SemanticType::Timestamp as _,
458                ..Default::default()
459            },
460            ColumnSchema {
461                column_name: METADATA_SCHEMA_KEY_COLUMN_NAME.to_string(),
462                datatype: ColumnDataType::String as _,
463                semantic_type: SemanticType::Tag as _,
464                ..Default::default()
465            },
466        ];
467        let rows = keys
468            .iter()
469            .map(|key| Row {
470                values: vec![
471                    Value {
472                        value_data: Some(ValueData::TimestampMillisecondValue(0)),
473                    },
474                    Value {
475                        value_data: Some(ValueData::StringValue(key.to_string())),
476                    },
477                ],
478            })
479            .collect();
480        let rows = Rows { schema: cols, rows };
481
482        RegionDeleteRequest { rows }
483    }
484
485    /// Add logical regions to the metadata region.
486    pub async fn add_logical_regions(
487        &self,
488        physical_region_id: RegionId,
489        write_region_id: bool,
490        logical_regions: impl Iterator<Item = (RegionId, HashMap<&str, &ColumnMetadata>)>,
491    ) -> Result<()> {
492        let region_id = utils::to_metadata_region_id(physical_region_id);
493        let iter = logical_regions
494            .into_iter()
495            .flat_map(|(logical_region_id, column_metadatas)| {
496                if write_region_id {
497                    Some((
498                        MetadataRegion::concat_region_key(logical_region_id),
499                        String::new(),
500                    ))
501                } else {
502                    None
503                }
504                .into_iter()
505                .chain(column_metadatas.into_iter().map(
506                    move |(name, column_metadata)| {
507                        (
508                            MetadataRegion::concat_column_key(logical_region_id, name),
509                            MetadataRegion::serialize_column_metadata(column_metadata),
510                        )
511                    },
512                ))
513            })
514            .collect::<Vec<_>>();
515
516        let put_request = MetadataRegion::build_put_request_from_iter(iter.into_iter());
517        self.mito
518            .handle_request(
519                region_id,
520                store_api::region_request::RegionRequest::Put(put_request),
521            )
522            .await
523            .context(MitoWriteOperationSnafu)?;
524
525        Ok(())
526    }
527}
528
#[cfg(test)]
impl MetadataRegion {
    /// Retrieves the value associated with the given key in the specified region.
    /// Returns `Ok(None)` if the key is not found.
    ///
    /// Only the first row of the first returned batch is read.
    pub async fn get(&self, region_id: RegionId, key: &str) -> Result<Option<String>> {
        let key_filter = col(METADATA_SCHEMA_KEY_COLUMN_NAME).eq(lit(key));
        let request = ScanRequest {
            projection: Some(vec![METADATA_SCHEMA_VALUE_COLUMN_INDEX]),
            filters: vec![key_filter],
            output_ordering: None,
            limit: None,
            series_row_selector: None,
            sequence: None,
            distribution: None,
        };

        let stream = self
            .mito
            .scan_to_stream(region_id, request)
            .await
            .context(MitoReadOperationSnafu)?;
        let batches = common_recordbatch::util::collect(stream)
            .await
            .context(CollectRecordBatchStreamSnafu)?;

        match batches.first() {
            None => Ok(None),
            Some(batch) => {
                // The value column is the only projected column.
                let value = batch
                    .column(0)
                    .get_ref(0)
                    .as_string()
                    .unwrap()
                    .map(String::from);
                Ok(value)
            }
        }
    }

    /// Check if the given column exists. Return the semantic type if exists.
    pub async fn column_semantic_type(
        &self,
        physical_region_id: RegionId,
        logical_region_id: RegionId,
        column_name: &str,
    ) -> Result<Option<SemanticType>> {
        let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
        let column_key = Self::concat_column_key(logical_region_id, column_name);

        match self.get(metadata_region_id, &column_key).await? {
            Some(raw_metadata) => {
                let column_metadata = Self::deserialize_column_metadata(&raw_metadata)?;
                Ok(Some(column_metadata.semantic_type))
            }
            None => Ok(None),
        }
    }
}
584
#[cfg(test)]
mod test {
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;

    use super::*;
    use crate::test_util::TestEnv;
    use crate::utils::to_metadata_region_id;

    /// The region key is the region prefix followed by the region id as `u64`.
    #[test]
    fn test_concat_table_key() {
        let key = MetadataRegion::concat_region_key(RegionId::new(1234, 7844));
        assert_eq!(key, "__region_5299989651108".to_string());
    }

    /// The column key carries the region id and the base64 encoded column name.
    #[test]
    fn test_concat_column_key() {
        let key = MetadataRegion::concat_column_key(RegionId::new(8489, 9184), "my_column");
        assert_eq!(key, "__column_36459977384928_bXlfY29sdW1u".to_string());
    }

    /// A column key built by `concat_column_key` parses back to its parts.
    #[test]
    fn test_parse_table_key() {
        let region_id = RegionId::new(87474, 10607);
        let key = MetadataRegion::concat_column_key(region_id, "my_column");
        assert_eq!(key, "__column_375697969260911_bXlfY29sdW1u");

        let parsed = MetadataRegion::parse_column_key(&key).unwrap();
        assert_eq!(parsed, Some((region_id, "my_column".to_string())));
    }

    /// Same round trip as above with another region id.
    #[test]
    fn test_parse_valid_column_key() {
        let region_id = RegionId::new(176, 910);
        let key = MetadataRegion::concat_column_key(region_id, "my_column");
        assert_eq!(key, "__column_755914245006_bXlfY29sdW1u");

        let parsed = MetadataRegion::parse_column_key(&key).unwrap();
        assert_eq!(parsed, Some((region_id, "my_column".to_string())));
    }

    /// A column key with a non-numeric region id is rejected with an error.
    #[test]
    fn test_parse_invalid_column_key() {
        assert!(MetadataRegion::parse_column_key("__column_asdfasd_????").is_err());
    }

    /// Column metadata is serialized as JSON, and invalid JSON content is rejected.
    #[test]
    fn test_serialize_column_metadata() {
        let column_metadata = ColumnMetadata {
            column_schema: ColumnSchema::new("blabla", ConcreteDataType::string_datatype(), false),
            semantic_type: SemanticType::Tag,
            column_id: 5,
        };
        let serialized = MetadataRegion::serialize_column_metadata(&column_metadata);
        let expected = r#"{"column_schema":{"name":"blabla","data_type":{"String":null},"is_nullable":false,"is_time_index":false,"default_constraint":null,"metadata":{}},"semantic_type":"Tag","column_id":5}"#.to_string();
        assert_eq!(serialized, expected);

        let invalid = "\"Invalid Column Metadata\"";
        assert!(MetadataRegion::deserialize_column_metadata(invalid).is_err());
    }

    /// Two string tag columns (`label1` and `label2`), keyed by column name.
    fn test_column_metadatas() -> HashMap<String, ColumnMetadata> {
        ["label1", "label2"]
            .into_iter()
            .map(|name| {
                let column_metadata = ColumnMetadata {
                    column_schema: ColumnSchema::new(
                        name.to_string(),
                        ConcreteDataType::string_datatype(),
                        false,
                    ),
                    semantic_type: SemanticType::Tag,
                    column_id: 5,
                };
                (name.to_string(), column_metadata)
            })
            .collect()
    }

    /// Borrow the column metadatas in the shape `add_logical_regions` expects.
    fn borrow_column_metadatas(
        column_metadatas: &HashMap<String, ColumnMetadata>,
    ) -> HashMap<&str, &ColumnMetadata> {
        column_metadatas
            .iter()
            .map(|(name, column_metadata)| (name.as_str(), column_metadata))
            .collect()
    }

    #[tokio::test]
    async fn add_logical_regions_to_meta_region() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let metadata_region = env.metadata_region();
        let physical_region_id = to_metadata_region_id(env.default_physical_region_id());
        let column_metadatas = test_column_metadatas();
        let logical_region_id = RegionId::new(1024, 1);

        // Add the logical region, then add it again.
        for _ in 0..2 {
            let logical_regions = vec![(
                logical_region_id,
                borrow_column_metadatas(&column_metadatas),
            )];
            metadata_region
                .add_logical_regions(physical_region_id, true, logical_regions.into_iter())
                .await
                .unwrap();
        }

        // Check if the logical region is added.
        let logical_regions = metadata_region
            .logical_regions(physical_region_id)
            .await
            .unwrap();
        assert_eq!(logical_regions.len(), 2);

        // Check if the logical region columns are added.
        let stored_columns = metadata_region
            .logical_columns(physical_region_id, logical_region_id)
            .await
            .unwrap();
        let logical_columns = stored_columns.into_iter().collect::<HashMap<_, _>>();
        assert_eq!(logical_columns.len(), 2);
        assert_eq!(column_metadatas, logical_columns);
    }
}
736}