common_meta/maintenance/reconcile_table/
utils.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::fmt;
17
18use api::v1::SemanticType;
19use datatypes::schema::ColumnSchema;
20use snafu::{ensure, OptionExt};
21use store_api::metadata::{ColumnMetadata, RegionMetadata};
22use store_api::storage::{RegionId, TableId};
23use table::metadata::RawTableMeta;
24use table::table_reference::TableReference;
25
26use crate::error::{
27    MismatchColumnIdSnafu, MissingColumnInColumnMetadataSnafu, Result, UnexpectedSnafu,
28};
29
30#[derive(Debug, PartialEq, Eq)]
31struct PartialRegionMetadata<'a> {
32    column_metadatas: &'a [ColumnMetadata],
33    primary_key: &'a [u32],
34    table_id: TableId,
35}
36
37impl<'a> From<&'a RegionMetadata> for PartialRegionMetadata<'a> {
38    fn from(region_metadata: &'a RegionMetadata) -> Self {
39        Self {
40            column_metadatas: &region_metadata.column_metadatas,
41            primary_key: &region_metadata.primary_key,
42            table_id: region_metadata.region_id.table_id(),
43        }
44    }
45}
46
47/// A display wrapper for [`ColumnMetadata`] that formats the column metadata in a more readable way.
48struct ColumnMetadataDisplay<'a>(pub &'a ColumnMetadata);
49
50impl<'a> fmt::Debug for ColumnMetadataDisplay<'a> {
51    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
52        let col = self.0;
53        write!(
54            f,
55            "Column {{ name: {}, id: {}, semantic_type: {:?}, data_type: {:?} }}",
56            col.column_schema.name, col.column_id, col.semantic_type, col.column_schema.data_type,
57        )
58    }
59}
60
61/// Checks if the column metadatas are consistent.
62///
63/// The column metadatas are consistent if:
64/// - The column metadatas are the same.
65/// - The primary key are the same.
66/// - The table id of the region metadatas are the same.
67///
68/// ## Panic
69/// Panic if region_metadatas is empty.
70pub(crate) fn check_column_metadatas_consistent(
71    region_metadatas: &[RegionMetadata],
72) -> Option<Vec<ColumnMetadata>> {
73    let is_column_metadata_consistent = region_metadatas
74        .windows(2)
75        .all(|w| PartialRegionMetadata::from(&w[0]) == PartialRegionMetadata::from(&w[1]));
76
77    if !is_column_metadata_consistent {
78        return None;
79    }
80
81    Some(region_metadatas[0].column_metadatas.clone())
82}
83
84/// Resolves column metadata inconsistencies among the given region metadatas
85/// by using the column metadata from the metasrv as the source of truth.
86///
87/// All region metadatas whose column metadata differs from the given `column_metadatas`
88/// will be marked for reconciliation.
89///
90/// Returns the region ids that need to be reconciled.
91pub(crate) fn resolve_column_metadatas_with_metasrv(
92    column_metadatas: &[ColumnMetadata],
93    region_metadatas: &[RegionMetadata],
94) -> Result<Vec<RegionId>> {
95    let is_same_table = region_metadatas
96        .windows(2)
97        .all(|w| w[0].region_id.table_id() == w[1].region_id.table_id());
98
99    ensure!(
100        is_same_table,
101        UnexpectedSnafu {
102            err_msg: "Region metadatas are not from the same table"
103        }
104    );
105
106    let mut regions_ids = vec![];
107    for region_metadata in region_metadatas {
108        if region_metadata.column_metadatas != column_metadatas {
109            let is_invariant_preserved = check_column_metadata_invariants(
110                column_metadatas,
111                &region_metadata.column_metadatas,
112            );
113            ensure!(
114                is_invariant_preserved,
115                UnexpectedSnafu {
116                    err_msg: format!(
117                        "Column metadata invariants violated for region {}. Resolved column metadata: {:?}, region column metadata: {:?}",
118                        region_metadata.region_id,
119                        column_metadatas.iter().map(ColumnMetadataDisplay).collect::<Vec<_>>(),
120                        region_metadata.column_metadatas.iter().map(ColumnMetadataDisplay).collect::<Vec<_>>(),
121                    )
122                }
123            );
124            regions_ids.push(region_metadata.region_id);
125        }
126    }
127    Ok(regions_ids)
128}
129
130/// Resolves column metadata inconsistencies among the given region metadatas
131/// by selecting the column metadata with the highest schema version.
132///
133/// This strategy assumes that at most two versions of column metadata may exist,
134/// due to the poison mechanism, making the highest schema version a safe choice.
135///
136/// Returns the resolved column metadata and the region ids that need to be reconciled.
137pub(crate) fn resolve_column_metadatas_with_latest(
138    region_metadatas: &[RegionMetadata],
139) -> Result<(Vec<ColumnMetadata>, Vec<RegionId>)> {
140    let is_same_table = region_metadatas
141        .windows(2)
142        .all(|w| w[0].region_id.table_id() == w[1].region_id.table_id());
143
144    ensure!(
145        is_same_table,
146        UnexpectedSnafu {
147            err_msg: "Region metadatas are not from the same table"
148        }
149    );
150
151    let latest_region_metadata = region_metadatas
152        .iter()
153        .max_by_key(|c| c.schema_version)
154        .context(UnexpectedSnafu {
155            err_msg: "All Region metadatas have the same schema version",
156        })?;
157    let latest_column_metadatas = PartialRegionMetadata::from(latest_region_metadata);
158
159    let mut region_ids = vec![];
160    for region_metadata in region_metadatas {
161        if PartialRegionMetadata::from(region_metadata) != latest_column_metadatas {
162            let is_invariant_preserved = check_column_metadata_invariants(
163                &latest_region_metadata.column_metadatas,
164                &region_metadata.column_metadatas,
165            );
166            ensure!(
167                is_invariant_preserved,
168                UnexpectedSnafu {
169                    err_msg: format!(
170                        "Column metadata invariants violated for region {}. Resolved column metadata: {:?}, region column metadata: {:?}",
171                        region_metadata.region_id,
172                        latest_column_metadatas.column_metadatas.iter().map(ColumnMetadataDisplay).collect::<Vec<_>>(),
173                        region_metadata.column_metadatas.iter().map(ColumnMetadataDisplay).collect::<Vec<_>>()
174                    )
175                }
176            );
177            region_ids.push(region_metadata.region_id);
178        }
179    }
180
181    // TODO(weny): verify the new column metadatas are acceptable for regions.
182    Ok((latest_region_metadata.column_metadatas.clone(), region_ids))
183}
184
185/// Constructs a vector of [`ColumnMetadata`] from the provided table information.
186///
187/// This function maps each [`ColumnSchema`] to its corresponding [`ColumnMetadata`] by
188/// determining the semantic type (Tag, Timestamp, or Field) and retrieving the column ID
189/// from the `name_to_ids` mapping.
190///
191/// Returns an error if any column name is missing in the mapping.
192pub(crate) fn build_column_metadata_from_table_info(
193    column_schemas: &[ColumnSchema],
194    primary_key_indexes: &[usize],
195    name_to_ids: &HashMap<String, u32>,
196) -> Result<Vec<ColumnMetadata>> {
197    let primary_names = primary_key_indexes
198        .iter()
199        .map(|i| column_schemas[*i].name.as_str())
200        .collect::<HashSet<_>>();
201
202    column_schemas
203        .iter()
204        .map(|column_schema| {
205            let column_id = *name_to_ids
206                .get(column_schema.name.as_str())
207                .with_context(|| UnexpectedSnafu {
208                    err_msg: format!(
209                        "Column name {} not found in name_to_ids",
210                        column_schema.name
211                    ),
212                })?;
213
214            let semantic_type = if primary_names.contains(&column_schema.name.as_str()) {
215                SemanticType::Tag
216            } else if column_schema.is_time_index() {
217                SemanticType::Timestamp
218            } else {
219                SemanticType::Field
220            };
221            Ok(ColumnMetadata {
222                column_schema: column_schema.clone(),
223                semantic_type,
224                column_id,
225            })
226        })
227        .collect::<Result<Vec<_>>>()
228}
229
230/// Checks whether the schema invariants hold between the existing and new column metadata.
231///
232/// Invariants:
233/// - Primary key (Tag) columns must exist in the new metadata, with identical name and ID.
234/// - Timestamp column must remain exactly the same in name and ID.
235pub(crate) fn check_column_metadata_invariants(
236    new_column_metadatas: &[ColumnMetadata],
237    column_metadatas: &[ColumnMetadata],
238) -> bool {
239    let new_primary_keys = new_column_metadatas
240        .iter()
241        .filter(|c| c.semantic_type == SemanticType::Tag)
242        .map(|c| (c.column_schema.name.as_str(), c.column_id))
243        .collect::<HashMap<_, _>>();
244
245    let old_primary_keys = column_metadatas
246        .iter()
247        .filter(|c| c.semantic_type == SemanticType::Tag)
248        .map(|c| (c.column_schema.name.as_str(), c.column_id));
249
250    for (name, id) in old_primary_keys {
251        if new_primary_keys.get(name) != Some(&id) {
252            return false;
253        }
254    }
255
256    let new_ts_column = new_column_metadatas
257        .iter()
258        .find(|c| c.semantic_type == SemanticType::Timestamp)
259        .map(|c| (c.column_schema.name.as_str(), c.column_id));
260
261    let old_ts_column = column_metadatas
262        .iter()
263        .find(|c| c.semantic_type == SemanticType::Timestamp)
264        .map(|c| (c.column_schema.name.as_str(), c.column_id));
265
266    new_ts_column == old_ts_column
267}
268
269/// Builds a [`RawTableMeta`] from the provided [`ColumnMetadata`]s.
270///
271/// Returns an error if:
272/// - Any column is missing in the `name_to_ids`.
273/// - The column id in table metadata is not the same as the column id in the column metadata.
274/// - The table index is missing in the column metadata.
275/// - The primary key or partition key columns are missing in the column metadata.
276pub(crate) fn build_table_meta_from_column_metadatas(
277    table_id: TableId,
278    table_ref: TableReference,
279    table_meta: &RawTableMeta,
280    name_to_ids: &HashMap<String, u32>,
281    column_metadata: &[ColumnMetadata],
282) -> Result<RawTableMeta> {
283    let column_in_column_metadata = column_metadata
284        .iter()
285        .map(|c| (c.column_schema.name.as_str(), c))
286        .collect::<HashMap<_, _>>();
287    let primary_key_names = table_meta
288        .primary_key_indices
289        .iter()
290        .map(|i| table_meta.schema.column_schemas[*i].name.as_str())
291        .collect::<HashSet<_>>();
292    let partition_key_names = table_meta
293        .partition_key_indices
294        .iter()
295        .map(|i| table_meta.schema.column_schemas[*i].name.as_str())
296        .collect::<HashSet<_>>();
297    ensure!(
298        column_metadata
299            .iter()
300            .any(|c| c.semantic_type == SemanticType::Timestamp),
301        UnexpectedSnafu {
302            err_msg: format!(
303                "Missing table index in column metadata, table: {}, table_id: {}",
304                table_ref, table_id
305            ),
306        }
307    );
308
309    // Ensures all primary key and partition key exists in the column metadata.
310    for column_name in primary_key_names.iter().chain(partition_key_names.iter()) {
311        let column_in_column_metadata =
312            column_in_column_metadata
313                .get(column_name)
314                .with_context(|| MissingColumnInColumnMetadataSnafu {
315                    column_name: column_name.to_string(),
316                    table_name: table_ref.to_string(),
317                    table_id,
318                })?;
319
320        let column_id = *name_to_ids
321            .get(*column_name)
322            .with_context(|| UnexpectedSnafu {
323                err_msg: format!("column id not found in name_to_ids: {}", column_name),
324            })?;
325        ensure!(
326            column_id == column_in_column_metadata.column_id,
327            MismatchColumnIdSnafu {
328                column_name: column_name.to_string(),
329                column_id,
330                table_name: table_ref.to_string(),
331                table_id,
332            }
333        );
334    }
335
336    let mut new_raw_table_meta = table_meta.clone();
337    let primary_key_indices = &mut new_raw_table_meta.primary_key_indices;
338    let partition_key_indices = &mut new_raw_table_meta.partition_key_indices;
339    let value_indices = &mut new_raw_table_meta.value_indices;
340    let time_index = &mut new_raw_table_meta.schema.timestamp_index;
341    let columns = &mut new_raw_table_meta.schema.column_schemas;
342    let column_ids = &mut new_raw_table_meta.column_ids;
343
344    column_ids.clear();
345    value_indices.clear();
346    columns.clear();
347    primary_key_indices.clear();
348    partition_key_indices.clear();
349
350    for (idx, col) in column_metadata.iter().enumerate() {
351        if partition_key_names.contains(&col.column_schema.name.as_str()) {
352            partition_key_indices.push(idx);
353        }
354        match col.semantic_type {
355            SemanticType::Tag => {
356                primary_key_indices.push(idx);
357            }
358            SemanticType::Field => {
359                value_indices.push(idx);
360            }
361            SemanticType::Timestamp => {
362                value_indices.push(idx);
363                *time_index = Some(idx);
364            }
365        }
366
367        columns.push(col.column_schema.clone());
368        column_ids.push(col.column_id);
369    }
370
371    if let Some(time_index) = *time_index {
372        new_raw_table_meta.schema.column_schemas[time_index].set_time_index();
373    }
374
375    Ok(new_raw_table_meta)
376}
377
378#[cfg(test)]
379mod tests {
380    use std::assert_matches::assert_matches;
381    use std::collections::HashMap;
382    use std::sync::Arc;
383
384    use api::v1::SemanticType;
385    use datatypes::prelude::ConcreteDataType;
386    use datatypes::schema::{ColumnSchema, Schema, SchemaBuilder};
387    use store_api::metadata::ColumnMetadata;
388    use table::metadata::{RawTableMeta, TableMetaBuilder};
389    use table::table_reference::TableReference;
390
391    use super::*;
392    use crate::ddl::test_util::region_metadata::build_region_metadata;
393    use crate::error::Error;
394
395    fn new_test_schema() -> Schema {
396        let column_schemas = vec![
397            ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
398            ColumnSchema::new(
399                "ts",
400                ConcreteDataType::timestamp_millisecond_datatype(),
401                false,
402            )
403            .with_time_index(true),
404            ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
405        ];
406        SchemaBuilder::try_from(column_schemas)
407            .unwrap()
408            .version(123)
409            .build()
410            .unwrap()
411    }
412
413    fn new_test_column_metadatas() -> Vec<ColumnMetadata> {
414        vec![
415            ColumnMetadata {
416                column_schema: ColumnSchema::new("col1", ConcreteDataType::int32_datatype(), true),
417                semantic_type: SemanticType::Tag,
418                column_id: 0,
419            },
420            ColumnMetadata {
421                column_schema: ColumnSchema::new(
422                    "ts",
423                    ConcreteDataType::timestamp_millisecond_datatype(),
424                    false,
425                )
426                .with_time_index(true),
427                semantic_type: SemanticType::Timestamp,
428                column_id: 1,
429            },
430            ColumnMetadata {
431                column_schema: ColumnSchema::new("col2", ConcreteDataType::int32_datatype(), true),
432                semantic_type: SemanticType::Field,
433                column_id: 2,
434            },
435        ]
436    }
437
438    fn new_test_raw_table_info() -> RawTableMeta {
439        let mut table_meta_builder = TableMetaBuilder::empty();
440        let table_meta = table_meta_builder
441            .schema(Arc::new(new_test_schema()))
442            .primary_key_indices(vec![0])
443            .partition_key_indices(vec![2])
444            .next_column_id(4)
445            .build()
446            .unwrap();
447
448        table_meta.into()
449    }
450
451    #[test]
452    fn test_build_table_info_from_column_metadatas() {
453        let mut column_metadatas = new_test_column_metadatas();
454        column_metadatas.push(ColumnMetadata {
455            column_schema: ColumnSchema::new("col3", ConcreteDataType::string_datatype(), true),
456            semantic_type: SemanticType::Tag,
457            column_id: 3,
458        });
459
460        let table_id = 1;
461        let table_ref = TableReference::full("test_catalog", "test_schema", "test_table");
462        let table_meta = new_test_raw_table_info();
463        let name_to_ids = HashMap::from([
464            ("col1".to_string(), 0),
465            ("ts".to_string(), 1),
466            ("col2".to_string(), 2),
467        ]);
468
469        let new_table_meta = build_table_meta_from_column_metadatas(
470            table_id,
471            table_ref,
472            &table_meta,
473            &name_to_ids,
474            &column_metadatas,
475        )
476        .unwrap();
477
478        assert_eq!(new_table_meta.primary_key_indices, vec![0, 3]);
479        assert_eq!(new_table_meta.partition_key_indices, vec![2]);
480        assert_eq!(new_table_meta.value_indices, vec![1, 2]);
481        assert_eq!(new_table_meta.schema.timestamp_index, Some(1));
482        assert_eq!(new_table_meta.column_ids, vec![0, 1, 2, 3]);
483    }
484
485    #[test]
486    fn test_build_table_info_from_column_metadatas_with_incorrect_name_to_ids() {
487        let column_metadatas = new_test_column_metadatas();
488        let table_id = 1;
489        let table_ref = TableReference::full("test_catalog", "test_schema", "test_table");
490        let table_meta = new_test_raw_table_info();
491        let name_to_ids = HashMap::from([
492            ("col1".to_string(), 0),
493            ("ts".to_string(), 1),
494            // Change column id of col2 to 3.
495            ("col2".to_string(), 3),
496        ]);
497
498        let err = build_table_meta_from_column_metadatas(
499            table_id,
500            table_ref,
501            &table_meta,
502            &name_to_ids,
503            &column_metadatas,
504        )
505        .unwrap_err();
506
507        assert_matches!(err, Error::MismatchColumnId { .. });
508    }
509
510    #[test]
511    fn test_build_table_info_from_column_metadatas_with_missing_time_index() {
512        let mut column_metadatas = new_test_column_metadatas();
513        column_metadatas.retain(|c| c.semantic_type != SemanticType::Timestamp);
514        let table_id = 1;
515        let table_ref = TableReference::full("test_catalog", "test_schema", "test_table");
516        let table_meta = new_test_raw_table_info();
517        let name_to_ids = HashMap::from([
518            ("col1".to_string(), 0),
519            ("ts".to_string(), 1),
520            ("col2".to_string(), 2),
521        ]);
522
523        let err = build_table_meta_from_column_metadatas(
524            table_id,
525            table_ref,
526            &table_meta,
527            &name_to_ids,
528            &column_metadatas,
529        )
530        .unwrap_err();
531
532        assert!(
533            err.to_string()
534                .contains("Missing table index in column metadata"),
535            "err: {}",
536            err
537        );
538    }
539
540    #[test]
541    fn test_build_table_info_from_column_metadatas_with_missing_column() {
542        let mut column_metadatas = new_test_column_metadatas();
543        // Remove primary key column.
544        column_metadatas.retain(|c| c.column_id != 0);
545        let table_id = 1;
546        let table_ref = TableReference::full("test_catalog", "test_schema", "test_table");
547        let table_meta = new_test_raw_table_info();
548        let name_to_ids = HashMap::from([
549            ("col1".to_string(), 0),
550            ("ts".to_string(), 1),
551            ("col2".to_string(), 2),
552        ]);
553
554        let err = build_table_meta_from_column_metadatas(
555            table_id,
556            table_ref,
557            &table_meta,
558            &name_to_ids,
559            &column_metadatas,
560        )
561        .unwrap_err();
562        assert_matches!(err, Error::MissingColumnInColumnMetadata { .. });
563
564        let mut column_metadatas = new_test_column_metadatas();
565        // Remove partition key column.
566        column_metadatas.retain(|c| c.column_id != 2);
567
568        let err = build_table_meta_from_column_metadatas(
569            table_id,
570            table_ref,
571            &table_meta,
572            &name_to_ids,
573            &column_metadatas,
574        )
575        .unwrap_err();
576        assert_matches!(err, Error::MissingColumnInColumnMetadata { .. });
577    }
578
579    #[test]
580    fn test_check_column_metadatas_consistent() {
581        let column_metadatas = new_test_column_metadatas();
582        let region_metadata1 = build_region_metadata(RegionId::new(1024, 0), &column_metadatas);
583        let region_metadata2 = build_region_metadata(RegionId::new(1024, 1), &column_metadatas);
584        let result =
585            check_column_metadatas_consistent(&[region_metadata1, region_metadata2]).unwrap();
586        assert_eq!(result, column_metadatas);
587
588        let region_metadata1 = build_region_metadata(RegionId::new(1025, 0), &column_metadatas);
589        let region_metadata2 = build_region_metadata(RegionId::new(1024, 1), &column_metadatas);
590        let result = check_column_metadatas_consistent(&[region_metadata1, region_metadata2]);
591        assert!(result.is_none());
592    }
593
594    #[test]
595    fn test_check_column_metadata_invariants() {
596        let column_metadatas = new_test_column_metadatas();
597        let mut new_column_metadatas = column_metadatas.clone();
598        new_column_metadatas.push(ColumnMetadata {
599            column_schema: ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true),
600            semantic_type: SemanticType::Field,
601            column_id: 3,
602        });
603        assert!(check_column_metadata_invariants(
604            &new_column_metadatas,
605            &column_metadatas
606        ));
607    }
608
609    #[test]
610    fn test_check_column_metadata_invariants_missing_primary_key_column_or_ts_column() {
611        let column_metadatas = new_test_column_metadatas();
612        let mut new_column_metadatas = column_metadatas.clone();
613        new_column_metadatas.retain(|c| c.semantic_type != SemanticType::Timestamp);
614        assert!(!check_column_metadata_invariants(
615            &new_column_metadatas,
616            &column_metadatas
617        ));
618
619        let column_metadatas = new_test_column_metadatas();
620        let mut new_column_metadatas = column_metadatas.clone();
621        new_column_metadatas.retain(|c| c.semantic_type != SemanticType::Tag);
622        assert!(!check_column_metadata_invariants(
623            &new_column_metadatas,
624            &column_metadatas
625        ));
626    }
627
628    #[test]
629    fn test_check_column_metadata_invariants_mismatch_column_id() {
630        let column_metadatas = new_test_column_metadatas();
631        let mut new_column_metadatas = column_metadatas.clone();
632        if let Some(col) = new_column_metadatas
633            .iter_mut()
634            .find(|c| c.semantic_type == SemanticType::Timestamp)
635        {
636            col.column_id = 100;
637        }
638        assert!(!check_column_metadata_invariants(
639            &new_column_metadatas,
640            &column_metadatas
641        ));
642
643        let column_metadatas = new_test_column_metadatas();
644        let mut new_column_metadatas = column_metadatas.clone();
645        if let Some(col) = new_column_metadatas
646            .iter_mut()
647            .find(|c| c.semantic_type == SemanticType::Tag)
648        {
649            col.column_id = 100;
650        }
651        assert!(!check_column_metadata_invariants(
652            &new_column_metadatas,
653            &column_metadatas
654        ));
655    }
656
657    #[test]
658    fn test_resolve_column_metadatas_with_use_metasrv_strategy() {
659        let column_metadatas = new_test_column_metadatas();
660        let region_metadata1 = build_region_metadata(RegionId::new(1024, 0), &column_metadatas);
661        let mut metasrv_column_metadatas = region_metadata1.column_metadatas.clone();
662        metasrv_column_metadatas.push(ColumnMetadata {
663            column_schema: ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true),
664            semantic_type: SemanticType::Field,
665            column_id: 3,
666        });
667        let result =
668            resolve_column_metadatas_with_metasrv(&metasrv_column_metadatas, &[region_metadata1])
669                .unwrap();
670
671        assert_eq!(result, vec![RegionId::new(1024, 0)]);
672    }
673
674    #[test]
675    fn test_resolve_column_metadatas_with_use_latest_strategy() {
676        let column_metadatas = new_test_column_metadatas();
677        let region_metadata1 = build_region_metadata(RegionId::new(1024, 0), &column_metadatas);
678        let mut new_column_metadatas = column_metadatas.clone();
679        new_column_metadatas.push(ColumnMetadata {
680            column_schema: ColumnSchema::new("col3", ConcreteDataType::int32_datatype(), true),
681            semantic_type: SemanticType::Field,
682            column_id: 3,
683        });
684
685        let mut region_metadata2 =
686            build_region_metadata(RegionId::new(1024, 1), &new_column_metadatas);
687        region_metadata2.schema_version = 2;
688
689        let (resolved_column_metadatas, region_ids) =
690            resolve_column_metadatas_with_latest(&[region_metadata1, region_metadata2]).unwrap();
691        assert_eq!(region_ids, vec![RegionId::new(1024, 0)]);
692        assert_eq!(resolved_column_metadatas, new_column_metadatas);
693    }
694}