mito2/read/compat.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Utilities to adapt readers with different schema.
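//!
//! A reader may produce batches whose schema differs from the region's
//! current metadata, e.g. after columns were added to the region. The
//! adapters here cast columns, fill in default values, and re-encode
//! primary keys so that output batches match the schema callers expect.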

use std::collections::HashMap;
use std::sync::Arc;

use api::v1::SemanticType;
use datatypes::arrow::array::{
    Array, ArrayRef, BinaryArray, BinaryBuilder, DictionaryArray, UInt32Array,
};
use datatypes::arrow::compute::{take, TakeOptions};
use datatypes::arrow::datatypes::{Schema, SchemaRef};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::DataType;
use datatypes::value::Value;
use datatypes::vectors::VectorRef;
use mito_codec::row_converter::{
    build_primary_key_codec, build_primary_key_codec_with_fields, CompositeValues, PrimaryKeyCodec,
    SortField,
};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::ColumnId;

use crate::error::{
    CompatReaderSnafu, ComputeArrowSnafu, CreateDefaultSnafu, DecodeSnafu, EncodeSnafu,
    NewRecordBatchSnafu, Result, UnexpectedSnafu,
};
use crate::read::flat_projection::{flat_projected_columns, FlatProjectionMapper};
use crate::read::projection::{PrimaryKeyProjectionMapper, ProjectionMapper};
use crate::read::{Batch, BatchColumn, BatchReader};
use crate::sst::parquet::flat_format::primary_key_column_index;
use crate::sst::parquet::format::{FormatProjection, PrimaryKeyArray, INTERNAL_COLUMN_NUM};
use crate::sst::{internal_fields, tag_maybe_to_dictionary_field};

/// Reader to adapt schema of underlying reader to expected schema.
pub struct CompatReader<R> {
    /// Underlying reader.
    reader: R,
    /// Helper to adapt batches to the expected schema.
    compat: PrimaryKeyCompatBatch,
}

impl<R> CompatReader<R> {
    /// Creates a new compat reader.
    /// - `mapper` is built from the metadata users expect to see.
    /// - `reader_meta` is the metadata of the input reader.
    /// - `reader` is the input reader.
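    ///
    /// # Example (illustrative)
    ///
    /// A minimal usage sketch, assuming `mapper`, `reader_meta`, and `reader`
    /// are built elsewhere (the bindings below are hypothetical):
    ///
    /// ```ignore
    /// let mut compat_reader = CompatReader::new(&mapper, reader_meta, reader)?;
    /// while let Some(batch) = compat_reader.next_batch().await? {
    ///     // `batch` now matches the schema described by `mapper`.
    /// }
    /// ```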
    pub fn new(
        mapper: &ProjectionMapper,
        reader_meta: RegionMetadataRef,
        reader: R,
    ) -> Result<CompatReader<R>> {
        Ok(CompatReader {
            reader,
            compat: PrimaryKeyCompatBatch::new(mapper, reader_meta)?,
        })
    }
}

#[async_trait::async_trait]
impl<R: BatchReader> BatchReader for CompatReader<R> {
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        let Some(mut batch) = self.reader.next_batch().await? else {
            return Ok(None);
        };

        batch = self.compat.compat_batch(batch)?;

        Ok(Some(batch))
    }
}

/// Helper to adapt schema of the batch to an expected schema.
pub(crate) enum CompatBatch {
    /// Adapter for primary key format.
    PrimaryKey(PrimaryKeyCompatBatch),
    /// Adapter for flat format.
    #[allow(dead_code)]
    Flat(FlatCompatBatch),
}

impl CompatBatch {
    /// Returns the inner primary key batch adapter if this is a PrimaryKey format.
    pub(crate) fn as_primary_key(&self) -> Option<&PrimaryKeyCompatBatch> {
        match self {
            CompatBatch::PrimaryKey(batch) => Some(batch),
            _ => None,
        }
    }

    /// Returns the inner flat batch adapter if this is a Flat format.
    #[allow(dead_code)]
    pub(crate) fn as_flat(&self) -> Option<&FlatCompatBatch> {
        match self {
            CompatBatch::Flat(batch) => Some(batch),
            _ => None,
        }
    }
}

/// A helper struct to adapt schema of the batch to an expected schema.
pub(crate) struct PrimaryKeyCompatBatch {
    /// Optional adapter to rewrite the primary key to another encoding.
    rewrite_pk: Option<RewritePrimaryKey>,
    /// Optional adapter to append default values to the primary key.
    compat_pk: Option<CompatPrimaryKey>,
    /// Optional fields adapter.
    compat_fields: Option<CompatFields>,
}

impl PrimaryKeyCompatBatch {
    /// Creates a new [PrimaryKeyCompatBatch].
    /// - `mapper` is built from the metadata users expect to see.
    /// - `reader_meta` is the metadata of the input reader.
    pub(crate) fn new(mapper: &ProjectionMapper, reader_meta: RegionMetadataRef) -> Result<Self> {
        let rewrite_pk = may_rewrite_primary_key(mapper.metadata(), &reader_meta);
        let compat_pk = may_compat_primary_key(mapper.metadata(), &reader_meta)?;
        let mapper = mapper.as_primary_key().context(UnexpectedSnafu {
            reason: "Unexpected format",
        })?;
        let compat_fields = may_compat_fields(mapper, &reader_meta)?;

        Ok(Self {
            rewrite_pk,
            compat_pk,
            compat_fields,
        })
    }

    /// Adapts the `batch` to the expected schema.
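    ///
    /// The adapters run in a fixed order: rewrite the primary key encoding
    /// first, then append default key values, then fix up field columns.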
    pub(crate) fn compat_batch(&self, mut batch: Batch) -> Result<Batch> {
        if let Some(rewrite_pk) = &self.rewrite_pk {
            batch = rewrite_pk.compat(batch)?;
        }
        if let Some(compat_pk) = &self.compat_pk {
            batch = compat_pk.compat(batch)?;
        }
        if let Some(compat_fields) = &self.compat_fields {
            batch = compat_fields.compat(batch);
        }

        Ok(batch)
    }
}

/// Returns true if `left` and `right` have the same columns and primary key encoding.
pub(crate) fn has_same_columns_and_pk_encoding(
    left: &RegionMetadata,
    right: &RegionMetadata,
) -> bool {
    if left.primary_key_encoding != right.primary_key_encoding {
        return false;
    }

    if left.column_metadatas.len() != right.column_metadatas.len() {
        return false;
    }

    for (left_col, right_col) in left.column_metadatas.iter().zip(&right.column_metadatas) {
        if left_col.column_id != right_col.column_id || !left_col.is_same_datatype(right_col) {
            return false;
        }
        debug_assert_eq!(
            left_col.column_schema.data_type,
            right_col.column_schema.data_type
        );
        debug_assert_eq!(left_col.semantic_type, right_col.semantic_type);
    }

    true
}

/// A helper struct to adapt schema of the batch to an expected schema.
#[allow(dead_code)]
pub(crate) struct FlatCompatBatch {
    /// Indices to convert actual fields to expected fields.
    index_or_defaults: Vec<IndexOrDefault>,
    /// Expected arrow schema.
    arrow_schema: SchemaRef,
    /// Primary key adapter.
    compat_pk: FlatCompatPrimaryKey,
}

impl FlatCompatBatch {
    /// Creates a [FlatCompatBatch].
    ///
    /// - `mapper` is built from the metadata users expect to see.
    /// - `actual` is the [RegionMetadata] of the input parquet.
    /// - `format_projection` is the projection of the read format for the input parquet.
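    ///
    /// The output schema is the projected columns in expected order followed
    /// by the internal columns (primary key, sequence, op type).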
    pub(crate) fn try_new(
        mapper: &FlatProjectionMapper,
        actual: &RegionMetadataRef,
        format_projection: &FormatProjection,
    ) -> Result<Self> {
        let actual_schema = flat_projected_columns(actual, format_projection);
        let expect_schema = mapper.batch_schema();
        // has_same_columns_and_pk_encoding() already checks columns and encodings.
        debug_assert_ne!(expect_schema, actual_schema);

        // Maps column id to the index and data type in the actual schema.
        let actual_schema_index: HashMap<_, _> = actual_schema
            .iter()
            .enumerate()
            .map(|(idx, (column_id, data_type))| (*column_id, (idx, data_type)))
            .collect();

        let mut index_or_defaults = Vec::with_capacity(expect_schema.len());
        let mut fields = Vec::with_capacity(expect_schema.len());
        for (column_id, expect_data_type) in expect_schema {
            // Safety: expect_schema comes from the same mapper.
            let column_index = mapper.metadata().column_index_by_id(*column_id).unwrap();
            let expect_column = &mapper.metadata().column_metadatas[column_index];
            let column_field = &mapper.metadata().schema.arrow_schema().fields()[column_index];
            // For tag columns, we need to create a dictionary field.
            if expect_column.semantic_type == SemanticType::Tag {
                fields.push(tag_maybe_to_dictionary_field(
                    &expect_column.column_schema.data_type,
                    column_field,
                ));
            } else {
                fields.push(column_field.clone());
            };

            if let Some((index, actual_data_type)) = actual_schema_index.get(column_id) {
                let mut cast_type = None;

                // Same column, different type.
                if expect_data_type != *actual_data_type {
                    cast_type = Some(expect_data_type.clone())
                }
                // Source has this column.
                index_or_defaults.push(IndexOrDefault::Index {
                    pos: *index,
                    cast_type,
                });
            } else {
                // Create a default vector with 1 element for that column.
                let default_vector = expect_column
                    .column_schema
                    .create_default_vector(1)
                    .context(CreateDefaultSnafu {
                        region_id: mapper.metadata().region_id,
                        column: &expect_column.column_schema.name,
                    })?
                    .with_context(|| CompatReaderSnafu {
                        region_id: mapper.metadata().region_id,
                        reason: format!(
                            "column {} does not have a default value to read",
                            expect_column.column_schema.name
                        ),
                    })?;
                index_or_defaults.push(IndexOrDefault::DefaultValue {
                    column_id: expect_column.column_id,
                    default_vector,
                    semantic_type: expect_column.semantic_type,
                });
            };
        }
        fields.extend_from_slice(&internal_fields());

        let compat_pk = FlatCompatPrimaryKey::new(mapper.metadata(), actual)?;

        Ok(Self {
            index_or_defaults,
            arrow_schema: Arc::new(Schema::new(fields)),
            compat_pk,
        })
    }

    /// Make columns of the `batch` compatible.
    #[allow(dead_code)]
    pub(crate) fn compat(&self, batch: RecordBatch) -> Result<RecordBatch> {
        let len = batch.num_rows();
        let columns = self
            .index_or_defaults
            .iter()
            .map(|index_or_default| match index_or_default {
                IndexOrDefault::Index { pos, cast_type } => {
                    let old_column = batch.column(*pos);

                    if let Some(ty) = cast_type {
                        // The default cast options are safe: values that cannot be
                        // converted become null instead of returning an error.
                        let casted =
                            datatypes::arrow::compute::cast(old_column, &ty.as_arrow_type())
                                .context(ComputeArrowSnafu)?;
                        Ok(casted)
                    } else {
                        Ok(old_column.clone())
                    }
                }
                IndexOrDefault::DefaultValue {
                    column_id: _,
                    default_vector,
                    semantic_type,
                } => repeat_vector(default_vector, len, *semantic_type == SemanticType::Tag),
            })
            .chain(
                // Adds internal columns.
                batch.columns()[batch.num_columns() - INTERNAL_COLUMN_NUM..]
                    .iter()
                    .map(|col| Ok(col.clone())),
            )
            .collect::<Result<Vec<_>>>()?;

        let compat_batch = RecordBatch::try_new(self.arrow_schema.clone(), columns)
            .context(NewRecordBatchSnafu)?;

        // Handles primary keys.
        self.compat_pk.compat(compat_batch)
    }
}

/// Repeats the vector value `to_len` times.
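///
/// For tags, this builds a dictionary array with a single value instead of
/// materializing the value `to_len` times. A sketch of the tag case (the
/// vector binding below is hypothetical):
///
/// ```ignore
/// let v: VectorRef = Arc::new(StringVector::from(vec![Some("a")]));
/// // Yields a DictionaryArray with keys [0, 0, 0] pointing at one value.
/// let array = repeat_vector(&v, 3, true)?;
/// ```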
fn repeat_vector(vector: &VectorRef, to_len: usize, is_tag: bool) -> Result<ArrayRef> {
    assert_eq!(1, vector.len());
    if is_tag {
        let values = vector.to_arrow_array();
        if values.is_null(0) {
            // Creates a dictionary array with `to_len` null keys.
            let keys = UInt32Array::new_null(to_len);
            Ok(Arc::new(DictionaryArray::new(keys, values.slice(0, 0))))
        } else {
            let keys = UInt32Array::from_value(0, to_len);
            Ok(Arc::new(DictionaryArray::new(keys, values)))
        }
    } else {
        let keys = UInt32Array::from_value(0, to_len);
        take(
            &vector.to_arrow_array(),
            &keys,
            Some(TakeOptions {
                check_bounds: false,
            }),
        )
        .context(ComputeArrowSnafu)
    }
}

/// Helper to make primary key compatible.
#[derive(Debug)]
struct CompatPrimaryKey {
    /// Row converter to append values to primary keys.
    converter: Arc<dyn PrimaryKeyCodec>,
    /// Default values to append.
    values: Vec<(ColumnId, Value)>,
}

impl CompatPrimaryKey {
    /// Make primary key of the `batch` compatible.
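    ///
    /// Appends the encoded default values to the existing key bytes and also
    /// extends the decoded value cache when the batch already carries one.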
    fn compat(&self, mut batch: Batch) -> Result<Batch> {
        let mut buffer = Vec::with_capacity(
            batch.primary_key().len() + self.converter.estimated_size().unwrap_or_default(),
        );
        buffer.extend_from_slice(batch.primary_key());
        self.converter
            .encode_values(&self.values, &mut buffer)
            .context(EncodeSnafu)?;

        batch.set_primary_key(buffer);

        // Update the decoded primary key cache.
        if let Some(pk_values) = &mut batch.pk_values {
            pk_values.extend(&self.values);
        }

        Ok(batch)
    }
}

/// Helper to make fields compatible.
#[derive(Debug)]
struct CompatFields {
    /// Column ids and data types the reader actually returns.
    actual_fields: Vec<(ColumnId, ConcreteDataType)>,
    /// Indices to convert actual fields to expected fields.
    index_or_defaults: Vec<IndexOrDefault>,
}

impl CompatFields {
    /// Make fields of the `batch` compatible.
    #[must_use]
    fn compat(&self, batch: Batch) -> Batch {
        debug_assert_eq!(self.actual_fields.len(), batch.fields().len());
        debug_assert!(self
            .actual_fields
            .iter()
            .zip(batch.fields())
            .all(|((id, _), batch_column)| *id == batch_column.column_id));

        let len = batch.num_rows();
        let fields = self
            .index_or_defaults
            .iter()
            .map(|index_or_default| match index_or_default {
                IndexOrDefault::Index { pos, cast_type } => {
                    let old_column = &batch.fields()[*pos];

                    let data = if let Some(ty) = cast_type {
                        // Safety: the cast uses safe options, so values that cannot
                        // be converted become null instead of causing an error.
                        old_column.data.cast(ty).unwrap()
                    } else {
                        old_column.data.clone()
                    };
                    BatchColumn {
                        column_id: old_column.column_id,
                        data,
                    }
                }
                IndexOrDefault::DefaultValue {
                    column_id,
                    default_vector,
                    semantic_type: _,
                } => {
                    let data = default_vector.replicate(&[len]);
                    BatchColumn {
                        column_id: *column_id,
                        data,
                    }
                }
            })
            .collect();

        // Safety: We ensure all columns have the same length and the new batch should be valid.
        batch.with_fields(fields).unwrap()
    }
}

fn may_rewrite_primary_key(
    expect: &RegionMetadata,
    actual: &RegionMetadata,
) -> Option<RewritePrimaryKey> {
    if expect.primary_key_encoding == actual.primary_key_encoding {
        return None;
    }

    let fields = expect.primary_key.clone();
    let original = build_primary_key_codec(actual);
    let new = build_primary_key_codec(expect);

    Some(RewritePrimaryKey {
        original,
        new,
        fields,
    })
}

/// Returns true if the actual primary key is the same as expected.
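///
/// The actual primary key must be a prefix of the expected one. A sketch of
/// the cases, writing primary keys as lists of column ids:
///
/// ```ignore
/// // expect [1, 3], actual [1, 3] => Ok(true)
/// // expect [1, 3], actual [1]    => Ok(false), defaults appended later
/// // expect [1],    actual [1, 2] => error: actual has more key columns
/// // expect [1, 2], actual [2, 1] => error: different prefix
/// ```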
fn is_primary_key_same(expect: &RegionMetadata, actual: &RegionMetadata) -> Result<bool> {
    ensure!(
        actual.primary_key.len() <= expect.primary_key.len(),
        CompatReaderSnafu {
            region_id: expect.region_id,
            reason: format!(
                "primary key has more columns {} than expected {}",
                actual.primary_key.len(),
                expect.primary_key.len()
            ),
        }
    );
    ensure!(
        actual.primary_key == expect.primary_key[..actual.primary_key.len()],
        CompatReaderSnafu {
            region_id: expect.region_id,
            reason: format!(
                "primary key has different prefix, expect: {:?}, actual: {:?}",
                expect.primary_key, actual.primary_key
            ),
        }
    );

    Ok(actual.primary_key.len() == expect.primary_key.len())
}

/// Creates a [CompatPrimaryKey] if needed.
fn may_compat_primary_key(
    expect: &RegionMetadata,
    actual: &RegionMetadata,
) -> Result<Option<CompatPrimaryKey>> {
    if is_primary_key_same(expect, actual)? {
        return Ok(None);
    }

    // We need to append default values to the primary key.
    let to_add = &expect.primary_key[actual.primary_key.len()..];
    let mut fields = Vec::with_capacity(to_add.len());
    let mut values = Vec::with_capacity(to_add.len());
    for column_id in to_add {
        // Safety: The id comes from expect region metadata.
        let column = expect.column_by_id(*column_id).unwrap();
        fields.push((
            *column_id,
            SortField::new(column.column_schema.data_type.clone()),
        ));
        let default_value = column
            .column_schema
            .create_default()
            .context(CreateDefaultSnafu {
                region_id: expect.region_id,
                column: &column.column_schema.name,
            })?
            .with_context(|| CompatReaderSnafu {
                region_id: expect.region_id,
                reason: format!(
                    "key column {} does not have a default value to read",
                    column.column_schema.name
                ),
            })?;
        values.push((*column_id, default_value));
    }
    // Use the expected primary key encoding to build the converter.
    let converter =
        build_primary_key_codec_with_fields(expect.primary_key_encoding, fields.into_iter());

    Ok(Some(CompatPrimaryKey { converter, values }))
}

/// Creates a [CompatFields] if needed.
fn may_compat_fields(
    mapper: &PrimaryKeyProjectionMapper,
    actual: &RegionMetadata,
) -> Result<Option<CompatFields>> {
    let expect_fields = mapper.batch_fields();
    let actual_fields = Batch::projected_fields(actual, mapper.column_ids());
    if expect_fields == actual_fields {
        return Ok(None);
    }

    let source_field_index: HashMap<_, _> = actual_fields
        .iter()
        .enumerate()
        .map(|(idx, (column_id, data_type))| (*column_id, (idx, data_type)))
        .collect();

    let index_or_defaults = expect_fields
        .iter()
        .map(|(column_id, expect_data_type)| {
            if let Some((index, actual_data_type)) = source_field_index.get(column_id) {
                let mut cast_type = None;

                if expect_data_type != *actual_data_type {
                    cast_type = Some(expect_data_type.clone())
                }
                // Source has this field.
                Ok(IndexOrDefault::Index {
                    pos: *index,
                    cast_type,
                })
            } else {
                // Safety: mapper must have this column.
                let column = mapper.metadata().column_by_id(*column_id).unwrap();
                // Create a default vector with 1 element for that column.
                let default_vector = column
                    .column_schema
                    .create_default_vector(1)
                    .context(CreateDefaultSnafu {
                        region_id: mapper.metadata().region_id,
                        column: &column.column_schema.name,
                    })?
                    .with_context(|| CompatReaderSnafu {
                        region_id: mapper.metadata().region_id,
                        reason: format!(
                            "column {} does not have a default value to read",
                            column.column_schema.name
                        ),
                    })?;
                Ok(IndexOrDefault::DefaultValue {
                    column_id: column.column_id,
                    default_vector,
                    semantic_type: SemanticType::Field,
                })
            }
        })
        .collect::<Result<Vec<_>>>()?;

    Ok(Some(CompatFields {
        actual_fields,
        index_or_defaults,
    }))
}

/// Index in source batch or a default value to fill a column.
#[derive(Debug)]
enum IndexOrDefault {
    /// Index of the column in source batch.
    Index {
        pos: usize,
        cast_type: Option<ConcreteDataType>,
    },
    /// Default value for the column.
    DefaultValue {
        /// Id of the column.
        column_id: ColumnId,
        /// Default value. The vector has only 1 element.
        default_vector: VectorRef,
        /// Semantic type of the column.
        semantic_type: SemanticType,
    },
}

/// Adapter to rewrite primary key.
struct RewritePrimaryKey {
    /// Original primary key codec.
    original: Arc<dyn PrimaryKeyCodec>,
    /// New primary key codec.
    new: Arc<dyn PrimaryKeyCodec>,
    /// Order of the fields in the new primary key.
    fields: Vec<ColumnId>,
}

impl RewritePrimaryKey {
    /// Make primary key of the `batch` compatible.
    fn compat(&self, mut batch: Batch) -> Result<Batch> {
        let values = if let Some(pk_values) = batch.pk_values() {
            pk_values
        } else {
            let new_pk_values = self
                .original
                .decode(batch.primary_key())
                .context(DecodeSnafu)?;
            batch.set_pk_values(new_pk_values);
            // Safety: We ensure pk_values is not None.
            batch.pk_values().as_ref().unwrap()
        };

        let mut buffer = Vec::with_capacity(
            batch.primary_key().len() + self.new.estimated_size().unwrap_or_default(),
        );
        match values {
            CompositeValues::Dense(values) => {
                self.new
                    .encode_values(values.as_slice(), &mut buffer)
                    .context(EncodeSnafu)?;
            }
            CompositeValues::Sparse(values) => {
                let values = self
                    .fields
                    .iter()
                    .map(|id| {
                        let value = values.get_or_null(*id);
                        (*id, value.as_value_ref())
                    })
                    .collect::<Vec<_>>();
                self.new
                    .encode_value_refs(&values, &mut buffer)
                    .context(EncodeSnafu)?;
            }
        }
        batch.set_primary_key(buffer);

        Ok(batch)
    }
}

/// Helper to rewrite primary key to another encoding for flat format.
struct FlatRewritePrimaryKey {
    /// New primary key encoder.
    codec: Arc<dyn PrimaryKeyCodec>,
    /// Metadata of the expected region.
    metadata: RegionMetadataRef,
    /// Original primary key codec, used to decode keys before re-encoding.
    old_codec: Arc<dyn PrimaryKeyCodec>,
}

impl FlatRewritePrimaryKey {
    fn new(
        expect: &RegionMetadataRef,
        actual: &RegionMetadataRef,
    ) -> Option<FlatRewritePrimaryKey> {
        if expect.primary_key_encoding == actual.primary_key_encoding {
            return None;
        }
        let codec = build_primary_key_codec(expect);
        let old_codec = build_primary_key_codec(actual);

        Some(FlatRewritePrimaryKey {
            codec,
            metadata: expect.clone(),
            old_codec,
        })
    }

    /// Rewrites the primary key of the `batch`.
    /// It also appends `append_values` to the primary key.
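    ///
    /// Only the dictionary values of the primary key column are re-encoded;
    /// the dictionary keys are reused as-is.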
    fn rewrite_key(
        &self,
        append_values: &[(ColumnId, Value)],
        batch: RecordBatch,
    ) -> Result<RecordBatch> {
        let old_pk_dict_array = batch
            .column(primary_key_column_index(batch.num_columns()))
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .unwrap();
        let old_pk_values_array = old_pk_dict_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        let mut builder = BinaryBuilder::with_capacity(
            old_pk_values_array.len(),
            old_pk_values_array.value_data().len(),
        );

        // Binary buffer for the primary key.
        let mut buffer = Vec::with_capacity(
            old_pk_values_array.value_data().len() / old_pk_values_array.len().max(1),
        );
        let mut column_id_values = Vec::new();
        // Iterates the binary array and rewrites the primary key.
        for value in old_pk_values_array.iter() {
            let Some(old_pk) = value else {
                builder.append_null();
                continue;
            };
            // Decodes the old primary key.
            let mut pk_values = self.old_codec.decode(old_pk).context(DecodeSnafu)?;
            pk_values.extend(append_values);

            buffer.clear();
            column_id_values.clear();
            // Encodes the new primary key.
            match pk_values {
                CompositeValues::Dense(dense_values) => {
                    self.codec
                        .encode_values(dense_values.as_slice(), &mut buffer)
                        .context(EncodeSnafu)?;
                }
                CompositeValues::Sparse(sparse_values) => {
                    for id in &self.metadata.primary_key {
                        let value = sparse_values.get_or_null(*id);
                        column_id_values.push((*id, value.clone()));
                    }
                    self.codec
                        .encode_values(&column_id_values, &mut buffer)
                        .context(EncodeSnafu)?;
                }
            }
            builder.append_value(&buffer);
        }
        let new_pk_values_array = Arc::new(builder.finish());
        let new_pk_dict_array =
            PrimaryKeyArray::new(old_pk_dict_array.keys().clone(), new_pk_values_array);

        let mut columns = batch.columns().to_vec();
        columns[primary_key_column_index(batch.num_columns())] = Arc::new(new_pk_dict_array);

        RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
    }
}

/// Helper to make primary key compatible for flat format.
struct FlatCompatPrimaryKey {
    /// Primary key rewriter.
    rewriter: Option<FlatRewritePrimaryKey>,
    /// Converter to append values to primary keys.
    converter: Option<Arc<dyn PrimaryKeyCodec>>,
    /// Default values to append.
    values: Vec<(ColumnId, Value)>,
}

impl FlatCompatPrimaryKey {
    fn new(expect: &RegionMetadataRef, actual: &RegionMetadataRef) -> Result<Self> {
        let rewriter = FlatRewritePrimaryKey::new(expect, actual);

        if is_primary_key_same(expect, actual)? {
            return Ok(Self {
                rewriter,
                converter: None,
                values: Vec::new(),
            });
        }

        // We need to append default values to the primary key.
        let to_add = &expect.primary_key[actual.primary_key.len()..];
        let mut values = Vec::with_capacity(to_add.len());
        let mut fields = Vec::with_capacity(to_add.len());
        for column_id in to_add {
            // Safety: The id comes from expect region metadata.
            let column = expect.column_by_id(*column_id).unwrap();
            fields.push((
                *column_id,
                SortField::new(column.column_schema.data_type.clone()),
            ));
            let default_value = column
                .column_schema
                .create_default()
                .context(CreateDefaultSnafu {
                    region_id: expect.region_id,
                    column: &column.column_schema.name,
                })?
                .with_context(|| CompatReaderSnafu {
                    region_id: expect.region_id,
                    reason: format!(
                        "key column {} does not have a default value to read",
                        column.column_schema.name
                    ),
                })?;
            values.push((*column_id, default_value));
        }
        // is_primary_key_same() is false so we have different number of primary key columns.
        debug_assert!(!fields.is_empty());

        // Create converter to append values.
        let converter = Some(build_primary_key_codec_with_fields(
            expect.primary_key_encoding,
            fields.into_iter(),
        ));

        Ok(Self {
            rewriter,
            converter,
            values,
        })
    }

    /// Makes primary key of the `batch` compatible.
    ///
    /// Callers must ensure all columns other than the `__primary_key` column
    /// are already compatible.
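    ///
    /// When the encoding differs, the whole key is rewritten and the default
    /// values are folded in during re-encoding; otherwise the encoded default
    /// values are appended to the existing key bytes.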
    fn compat(&self, batch: RecordBatch) -> Result<RecordBatch> {
        if let Some(rewriter) = &self.rewriter {
            // If we have different encoding, rewrite the whole primary key.
            return rewriter.rewrite_key(&self.values, batch);
        }

        self.append_key(batch)
    }

    /// Appends values to the primary key of the `batch`.
    fn append_key(&self, batch: RecordBatch) -> Result<RecordBatch> {
        let Some(converter) = &self.converter else {
            return Ok(batch);
        };

        let old_pk_dict_array = batch
            .column(primary_key_column_index(batch.num_columns()))
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .unwrap();
        let old_pk_values_array = old_pk_dict_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        let mut builder = BinaryBuilder::with_capacity(
            old_pk_values_array.len(),
            old_pk_values_array.value_data().len()
                + converter.estimated_size().unwrap_or_default() * old_pk_values_array.len(),
        );

        // Binary buffer for the primary key.
        let mut buffer = Vec::with_capacity(
            old_pk_values_array.value_data().len() / old_pk_values_array.len().max(1)
                + converter.estimated_size().unwrap_or_default(),
        );

        // Iterates the binary array and appends values to the primary key.
        for value in old_pk_values_array.iter() {
            let Some(old_pk) = value else {
                builder.append_null();
                continue;
            };

            buffer.clear();
            buffer.extend_from_slice(old_pk);
            converter
                .encode_values(&self.values, &mut buffer)
                .context(EncodeSnafu)?;

            builder.append_value(&buffer);
        }

        let new_pk_values_array = Arc::new(builder.finish());
        let new_pk_dict_array =
            PrimaryKeyArray::new(old_pk_dict_array.keys().clone(), new_pk_values_array);

        // Overrides the primary key column.
        let mut columns = batch.columns().to_vec();
        columns[primary_key_column_index(batch.num_columns())] = Arc::new(new_pk_dict_array);

        RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::{OpType, SemanticType};
    use datatypes::arrow::array::{
        ArrayRef, BinaryDictionaryBuilder, Int64Array, StringDictionaryBuilder,
        TimestampMillisecondArray, UInt64Array, UInt8Array,
    };
    use datatypes::arrow::datatypes::UInt32Type;
    use datatypes::arrow::record_batch::RecordBatch;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::ValueRef;
    use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, UInt64Vector, UInt8Vector};
    use mito_codec::row_converter::{
        DensePrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec,
    };
    use store_api::codec::PrimaryKeyEncoding;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;
    use crate::read::flat_projection::FlatProjectionMapper;
    use crate::sst::parquet::flat_format::FlatReadFormat;
    use crate::sst::{to_flat_sst_arrow_schema, FlatSchemaOptions};
    use crate::test_util::{check_reader_result, VecBatchReader};

    /// Creates a new [RegionMetadata].
    fn new_metadata(
        semantic_types: &[(ColumnId, SemanticType, ConcreteDataType)],
        primary_key: &[ColumnId],
    ) -> RegionMetadata {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        for (id, semantic_type, data_type) in semantic_types {
            let column_schema = match semantic_type {
                SemanticType::Tag => {
                    ColumnSchema::new(format!("tag_{id}"), data_type.clone(), true)
                }
                SemanticType::Field => {
                    ColumnSchema::new(format!("field_{id}"), data_type.clone(), true)
                }
                SemanticType::Timestamp => ColumnSchema::new("ts", data_type.clone(), false),
            };

            builder.push_column_metadata(ColumnMetadata {
                column_schema,
                semantic_type: *semantic_type,
                column_id: *id,
            });
        }
        builder.primary_key(primary_key.to_vec());
        builder.build().unwrap()
    }

    /// Encode primary key.
    fn encode_key(keys: &[Option<&str>]) -> Vec<u8> {
        let fields = (0..keys.len())
            .map(|_| (0, SortField::new(ConcreteDataType::string_datatype())))
            .collect();
        let converter = DensePrimaryKeyCodec::with_fields(fields);
        let row = keys.iter().map(|str_opt| match str_opt {
            Some(v) => ValueRef::String(v),
            None => ValueRef::Null,
        });

        converter.encode(row).unwrap()
    }

    /// Encode sparse primary key.
    fn encode_sparse_key(keys: &[(ColumnId, Option<&str>)]) -> Vec<u8> {
        let fields = (0..keys.len())
            .map(|_| (1, SortField::new(ConcreteDataType::string_datatype())))
            .collect();
        let converter = SparsePrimaryKeyCodec::with_fields(fields);
        let row = keys
            .iter()
            .map(|(id, str_opt)| match str_opt {
                Some(v) => (*id, ValueRef::String(v)),
                None => (*id, ValueRef::Null),
            })
            .collect::<Vec<_>>();
        let mut buffer = vec![];
        converter.encode_value_refs(&row, &mut buffer).unwrap();
        buffer
    }

    /// Creates a batch for specific primary `key`.
    ///
    /// `fields`: [(column_id of the field, is null)]
    fn new_batch(
        primary_key: &[u8],
        fields: &[(ColumnId, bool)],
        start_ts: i64,
        num_rows: usize,
    ) -> Batch {
        let timestamps = Arc::new(TimestampMillisecondVector::from_values(
            start_ts..start_ts + num_rows as i64,
        ));
        let sequences = Arc::new(UInt64Vector::from_values(0..num_rows as u64));
        let op_types = Arc::new(UInt8Vector::from_vec(vec![OpType::Put as u8; num_rows]));
        let field_columns = fields
            .iter()
            .map(|(id, is_null)| {
                let data = if *is_null {
                    Arc::new(Int64Vector::from(vec![None; num_rows]))
                } else {
                    Arc::new(Int64Vector::from_vec(vec![*id as i64; num_rows]))
                };
                BatchColumn {
                    column_id: *id,
                    data,
                }
            })
            .collect();
        Batch::new(
            primary_key.to_vec(),
            timestamps,
            sequences,
            op_types,
            field_columns,
        )
        .unwrap()
    }

    #[test]
    fn test_invalid_pk_len() {
        let reader_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1, 2],
        );
        let expect_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        );
        may_compat_primary_key(&expect_meta, &reader_meta).unwrap_err();
    }

    #[test]
    fn test_different_pk() {
        let reader_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[2, 1],
        );
        let expect_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (4, SemanticType::Tag, ConcreteDataType::string_datatype()),
            ],
            &[1, 2, 4],
        );
        may_compat_primary_key(&expect_meta, &reader_meta).unwrap_err();
    }

    #[test]
    fn test_same_pk() {
        let reader_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        );
        assert!(may_compat_primary_key(&reader_meta, &reader_meta)
            .unwrap()
            .is_none());
    }

    #[test]
    fn test_same_pk_encoding() {
        let reader_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
            ],
            &[1],
        ));

        assert!(may_compat_primary_key(&reader_meta, &reader_meta)
            .unwrap()
            .is_none());
    }

    #[test]
    fn test_same_fields() {
        let reader_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        let mapper = PrimaryKeyProjectionMapper::all(&reader_meta).unwrap();
        assert!(may_compat_fields(&mapper, &reader_meta).unwrap().is_none())
    }

    #[tokio::test]
    async fn test_compat_reader() {
        let reader_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        let expect_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (3, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (4, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1, 3],
        ));
        let mapper = ProjectionMapper::all(&expect_meta, false).unwrap();
        let k1 = encode_key(&[Some("a")]);
        let k2 = encode_key(&[Some("b")]);
        let source_reader = VecBatchReader::new(&[
            new_batch(&k1, &[(2, false)], 1000, 3),
            new_batch(&k2, &[(2, false)], 1000, 3),
        ]);

        let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
        let k1 = encode_key(&[Some("a"), None]);
        let k2 = encode_key(&[Some("b"), None]);
        check_reader_result(
            &mut compat_reader,
            &[
                new_batch(&k1, &[(2, false), (4, true)], 1000, 3),
                new_batch(&k2, &[(2, false), (4, true)], 1000, 3),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_compat_reader_different_order() {
        let reader_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        let expect_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (4, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        let mapper = ProjectionMapper::all(&expect_meta, false).unwrap();
        let k1 = encode_key(&[Some("a")]);
        let k2 = encode_key(&[Some("b")]);
        let source_reader = VecBatchReader::new(&[
            new_batch(&k1, &[(2, false)], 1000, 3),
            new_batch(&k2, &[(2, false)], 1000, 3),
        ]);

        let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
        check_reader_result(
            &mut compat_reader,
            &[
                new_batch(&k1, &[(3, true), (2, false), (4, true)], 1000, 3),
                new_batch(&k2, &[(3, true), (2, false), (4, true)], 1000, 3),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_compat_reader_different_types() {
        let actual_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        let expect_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::string_datatype()),
            ],
            &[1],
        ));
        let mapper = ProjectionMapper::all(&expect_meta, false).unwrap();
        let k1 = encode_key(&[Some("a")]);
        let k2 = encode_key(&[Some("b")]);
        let source_reader = VecBatchReader::new(&[
            new_batch(&k1, &[(2, false)], 1000, 3),
            new_batch(&k2, &[(2, false)], 1000, 3),
        ]);

        let fn_batch_cast = |batch: Batch| {
            let mut new_fields = batch.fields.clone();
            new_fields[0].data = new_fields[0]
                .data
                .cast(&ConcreteDataType::string_datatype())
                .unwrap();

            batch.with_fields(new_fields).unwrap()
        };
        let mut compat_reader = CompatReader::new(&mapper, actual_meta, source_reader).unwrap();
        check_reader_result(
            &mut compat_reader,
            &[
                fn_batch_cast(new_batch(&k1, &[(2, false)], 1000, 3)),
                fn_batch_cast(new_batch(&k2, &[(2, false)], 1000, 3)),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_compat_reader_projection() {
        let reader_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        let expect_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (4, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        // tag_1, field_2, field_3
        let mapper = ProjectionMapper::new(&expect_meta, [1, 3, 2].into_iter(), false).unwrap();
        let k1 = encode_key(&[Some("a")]);
        let source_reader = VecBatchReader::new(&[new_batch(&k1, &[(2, false)], 1000, 3)]);

        let mut compat_reader =
            CompatReader::new(&mapper, reader_meta.clone(), source_reader).unwrap();
        check_reader_result(
            &mut compat_reader,
            &[new_batch(&k1, &[(3, true), (2, false)], 1000, 3)],
        )
        .await;

        // tag_1, field_4, field_3
        let mapper = ProjectionMapper::new(&expect_meta, [1, 4, 2].into_iter(), false).unwrap();
        let k1 = encode_key(&[Some("a")]);
        let source_reader = VecBatchReader::new(&[new_batch(&k1, &[], 1000, 3)]);

        let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
        check_reader_result(
            &mut compat_reader,
            &[new_batch(&k1, &[(3, true), (4, true)], 1000, 3)],
        )
        .await;
    }

    #[tokio::test]
    async fn test_compat_reader_different_pk_encoding() {
        let mut reader_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        );
        reader_meta.primary_key_encoding = PrimaryKeyEncoding::Dense;
        let reader_meta = Arc::new(reader_meta);
        let mut expect_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (3, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (4, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1, 3],
        );
        expect_meta.primary_key_encoding = PrimaryKeyEncoding::Sparse;
        let expect_meta = Arc::new(expect_meta);

        let mapper = ProjectionMapper::all(&expect_meta, false).unwrap();
        let k1 = encode_key(&[Some("a")]);
        let k2 = encode_key(&[Some("b")]);
        let source_reader = VecBatchReader::new(&[
            new_batch(&k1, &[(2, false)], 1000, 3),
            new_batch(&k2, &[(2, false)], 1000, 3),
        ]);

        let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
        let k1 = encode_sparse_key(&[(1, Some("a")), (3, None)]);
        let k2 = encode_sparse_key(&[(1, Some("b")), (3, None)]);
        check_reader_result(
            &mut compat_reader,
            &[
                new_batch(&k1, &[(2, false), (4, true)], 1000, 3),
                new_batch(&k2, &[(2, false), (4, true)], 1000, 3),
            ],
        )
        .await;
    }

    /// Creates a primary key array for flat format testing.
    fn build_flat_test_pk_array(primary_keys: &[&[u8]]) -> ArrayRef {
        let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        for &pk in primary_keys {
            builder.append(pk).unwrap();
        }
        Arc::new(builder.finish())
    }

    #[test]
    fn test_flat_compat_batch_with_missing_columns() {
        let actual_metadata = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));

        let expected_metadata = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                // Adds a new field.
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));

        let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
        let read_format =
            FlatReadFormat::new(actual_metadata.clone(), [0, 1, 2, 3].into_iter(), false);
        let format_projection = read_format.format_projection();

        let compat_batch =
            FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection).unwrap();

        let mut tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
        tag_builder.append_value("tag1");
        tag_builder.append_value("tag1");
        let tag_dict_array = Arc::new(tag_builder.finish());

        let k1 = encode_key(&[Some("tag1")]);
        let input_columns: Vec<ArrayRef> = vec![
            tag_dict_array.clone(),
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&k1, &k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let input_schema =
            to_flat_sst_arrow_schema(&actual_metadata, &FlatSchemaOptions::default());
        let input_batch = RecordBatch::try_new(input_schema, input_columns).unwrap();

        let result = compat_batch.compat(input_batch).unwrap();

        let expected_schema =
            to_flat_sst_arrow_schema(&expected_metadata, &FlatSchemaOptions::default());

        let expected_columns: Vec<ArrayRef> = vec![
            tag_dict_array.clone(),
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(Int64Array::from(vec![None::<i64>, None::<i64>])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&k1, &k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let expected_batch = RecordBatch::try_new(expected_schema, expected_columns).unwrap();

        assert_eq!(expected_batch, result);
    }

    #[test]
    fn test_flat_compat_batch_with_different_pk_encoding() {
        let mut actual_metadata = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        );
        actual_metadata.primary_key_encoding = PrimaryKeyEncoding::Dense;
        let actual_metadata = Arc::new(actual_metadata);

        let mut expected_metadata = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (3, SemanticType::Tag, ConcreteDataType::string_datatype()),
            ],
            &[1, 3],
        );
        expected_metadata.primary_key_encoding = PrimaryKeyEncoding::Sparse;
        let expected_metadata = Arc::new(expected_metadata);

        let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
        let read_format =
            FlatReadFormat::new(actual_metadata.clone(), [0, 1, 2, 3].into_iter(), false);
        let format_projection = read_format.format_projection();

        let compat_batch =
            FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection).unwrap();

        // Tag array.
        let mut tag1_builder = StringDictionaryBuilder::<UInt32Type>::new();
        tag1_builder.append_value("tag1");
        tag1_builder.append_value("tag1");
        let tag1_dict_array = Arc::new(tag1_builder.finish());

        let k1 = encode_key(&[Some("tag1")]);
        let input_columns: Vec<ArrayRef> = vec![
            tag1_dict_array.clone(),
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&k1, &k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let input_schema =
            to_flat_sst_arrow_schema(&actual_metadata, &FlatSchemaOptions::default());
        let input_batch = RecordBatch::try_new(input_schema, input_columns).unwrap();

        let result = compat_batch.compat(input_batch).unwrap();

        let sparse_k1 = encode_sparse_key(&[(1, Some("tag1")), (3, None)]);
        let mut null_tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
        null_tag_builder.append_nulls(2);
        let null_tag_dict_array = Arc::new(null_tag_builder.finish());
        let expected_columns: Vec<ArrayRef> = vec![
            tag1_dict_array.clone(),
            null_tag_dict_array,
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&sparse_k1, &sparse_k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let output_schema =
            to_flat_sst_arrow_schema(&expected_metadata, &FlatSchemaOptions::default());
        let expected_batch = RecordBatch::try_new(output_schema, expected_columns).unwrap();

        assert_eq!(expected_batch, result);
    }
}