mito2/read/compat.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Utilities to adapt readers with different schema.

use std::collections::HashMap;
use std::sync::Arc;

use api::v1::SemanticType;
use datatypes::arrow::array::{
    Array, ArrayRef, BinaryArray, BinaryBuilder, DictionaryArray, UInt32Array,
};
use datatypes::arrow::compute::{TakeOptions, take};
use datatypes::arrow::datatypes::{FieldRef, Schema, SchemaRef};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::DataType;
use datatypes::value::Value;
use datatypes::vectors::VectorRef;
use mito_codec::row_converter::{
    CompositeValues, PrimaryKeyCodec, SortField, build_primary_key_codec,
    build_primary_key_codec_with_fields,
};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::ColumnId;

use crate::error::{
    CompatReaderSnafu, ComputeArrowSnafu, CreateDefaultSnafu, DecodeSnafu, EncodeSnafu,
    NewRecordBatchSnafu, Result, UnexpectedSnafu, UnsupportedOperationSnafu,
};
use crate::read::flat_projection::{FlatProjectionMapper, flat_projected_columns};
use crate::read::projection::{PrimaryKeyProjectionMapper, ProjectionMapper};
use crate::read::{Batch, BatchColumn, BatchReader};
use crate::sst::parquet::flat_format::primary_key_column_index;
use crate::sst::parquet::format::{FormatProjection, INTERNAL_COLUMN_NUM, PrimaryKeyArray};
use crate::sst::{internal_fields, tag_maybe_to_dictionary_field};

/// Reader to adapt schema of underlying reader to expected schema.
pub struct CompatReader<R> {
    /// Underlying reader.
    reader: R,
    /// Helper to make batches compatible.
    compat: PrimaryKeyCompatBatch,
}

impl<R> CompatReader<R> {
    /// Creates a new compat reader.
    /// - `mapper` is built from the metadata users expect to see.
    /// - `reader_meta` is the metadata of the input reader.
    /// - `reader` is the input reader.
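    ///
    /// A minimal usage sketch (not compiled here; `mapper`, `reader_meta` and the
    /// wrapped `source_reader` are assumed to come from the caller's scan setup):
    ///
    /// ```ignore
    /// let mut reader = CompatReader::new(&mapper, reader_meta, source_reader)?;
    /// while let Some(batch) = reader.next_batch().await? {
    ///     // Each `batch` now matches the schema described by `mapper`.
    /// }
    /// ```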
    pub fn new(
        mapper: &ProjectionMapper,
        reader_meta: RegionMetadataRef,
        reader: R,
    ) -> Result<CompatReader<R>> {
        Ok(CompatReader {
            reader,
            compat: PrimaryKeyCompatBatch::new(mapper, reader_meta)?,
        })
    }
}

#[async_trait::async_trait]
impl<R: BatchReader> BatchReader for CompatReader<R> {
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        let Some(mut batch) = self.reader.next_batch().await? else {
            return Ok(None);
        };

        batch = self.compat.compat_batch(batch)?;

        Ok(Some(batch))
    }
}

/// Helper to adapt schema of the batch to an expected schema.
pub(crate) enum CompatBatch {
    /// Adapter for primary key format.
    PrimaryKey(PrimaryKeyCompatBatch),
    /// Adapter for flat format.
    Flat(FlatCompatBatch),
}

impl CompatBatch {
    /// Returns the inner primary key batch adapter if this is a PrimaryKey format.
    pub(crate) fn as_primary_key(&self) -> Option<&PrimaryKeyCompatBatch> {
        match self {
            CompatBatch::PrimaryKey(batch) => Some(batch),
            _ => None,
        }
    }

    /// Returns the inner flat batch adapter if this is a Flat format.
    pub(crate) fn as_flat(&self) -> Option<&FlatCompatBatch> {
        match self {
            CompatBatch::Flat(batch) => Some(batch),
            _ => None,
        }
    }
}

/// A helper struct to adapt schema of the batch to an expected schema.
pub(crate) struct PrimaryKeyCompatBatch {
    /// Optional primary key rewriter.
    rewrite_pk: Option<RewritePrimaryKey>,
    /// Optional primary key adapter.
    compat_pk: Option<CompatPrimaryKey>,
    /// Optional fields adapter.
    compat_fields: Option<CompatFields>,
}

impl PrimaryKeyCompatBatch {
    /// Creates a new [PrimaryKeyCompatBatch].
    /// - `mapper` is built from the metadata users expect to see.
    /// - `reader_meta` is the metadata of the input reader.
    pub(crate) fn new(mapper: &ProjectionMapper, reader_meta: RegionMetadataRef) -> Result<Self> {
        let rewrite_pk = may_rewrite_primary_key(mapper.metadata(), &reader_meta);
        let compat_pk = may_compat_primary_key(mapper.metadata(), &reader_meta)?;
        let mapper = mapper.as_primary_key().context(UnexpectedSnafu {
            reason: "Unexpected format",
        })?;
        let compat_fields = may_compat_fields(mapper, &reader_meta)?;

        Ok(Self {
            rewrite_pk,
            compat_pk,
            compat_fields,
        })
    }

    /// Adapts the `batch` to the expected schema.
    pub(crate) fn compat_batch(&self, mut batch: Batch) -> Result<Batch> {
        if let Some(rewrite_pk) = &self.rewrite_pk {
            batch = rewrite_pk.compat(batch)?;
        }
        if let Some(compat_pk) = &self.compat_pk {
            batch = compat_pk.compat(batch)?;
        }
        if let Some(compat_fields) = &self.compat_fields {
            batch = compat_fields.compat(batch);
        }

        Ok(batch)
    }
}

/// Returns true if `left` and `right` have the same columns and primary key encoding.
pub(crate) fn has_same_columns_and_pk_encoding(
    left: &RegionMetadata,
    right: &RegionMetadata,
) -> bool {
    if left.primary_key_encoding != right.primary_key_encoding {
        return false;
    }

    if left.column_metadatas.len() != right.column_metadatas.len() {
        return false;
    }

    for (left_col, right_col) in left.column_metadatas.iter().zip(&right.column_metadatas) {
        if left_col.column_id != right_col.column_id || !left_col.is_same_datatype(right_col) {
            return false;
        }
        debug_assert_eq!(
            left_col.column_schema.data_type,
            right_col.column_schema.data_type
        );
        debug_assert_eq!(left_col.semantic_type, right_col.semantic_type);
    }

    true
}

/// A helper struct to adapt schema of the batch to an expected schema.
pub(crate) struct FlatCompatBatch {
    /// Indices to convert actual fields to expected fields.
    index_or_defaults: Vec<IndexOrDefault>,
    /// Expected arrow schema.
    arrow_schema: SchemaRef,
    /// Primary key adapter.
    compat_pk: FlatCompatPrimaryKey,
}

impl FlatCompatBatch {
    /// Creates a [FlatCompatBatch].
    ///
    /// - `mapper` is built from the metadata users expect to see.
    /// - `actual` is the [RegionMetadata] of the input parquet.
    /// - `format_projection` is the projection of the read format for the input parquet.
    /// - `compaction` indicates whether the reader is for compaction.
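    ///
    /// Returns `Ok(None)` when the projected schema already matches the expected
    /// one. A sketch of the intended call site (not compiled here; the surrounding
    /// values, including `projection`, are assumed):
    ///
    /// ```ignore
    /// if let Some(compat) = FlatCompatBatch::try_new(&mapper, &actual, projection, false)? {
    ///     let record_batch = compat.compat(record_batch)?;
    /// }
    /// ```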
    pub(crate) fn try_new(
        mapper: &FlatProjectionMapper,
        actual: &RegionMetadataRef,
        format_projection: &FormatProjection,
        compaction: bool,
    ) -> Result<Option<Self>> {
        let actual_schema = flat_projected_columns(actual, format_projection);
        let expect_schema = mapper.batch_schema();
        if expect_schema == actual_schema {
            // Although the SST has a different schema, the schema after projection is
            // the same as the expected schema.
            return Ok(None);
        }

        if actual.primary_key_encoding == PrimaryKeyEncoding::Sparse && compaction {
            // Special handling for sparse encoding in compaction.
            return FlatCompatBatch::try_new_compact_sparse(mapper, actual);
        }

        let (index_or_defaults, fields) =
            Self::compute_index_and_fields(&actual_schema, expect_schema, mapper.metadata())?;

        let compat_pk = FlatCompatPrimaryKey::new(mapper.metadata(), actual)?;

        Ok(Some(Self {
            index_or_defaults,
            arrow_schema: Arc::new(Schema::new(fields)),
            compat_pk,
        }))
    }

    fn compute_index_and_fields(
        actual_schema: &[(ColumnId, ConcreteDataType)],
        expect_schema: &[(ColumnId, ConcreteDataType)],
        expect_metadata: &RegionMetadata,
    ) -> Result<(Vec<IndexOrDefault>, Vec<FieldRef>)> {
        // Maps column id to the index and data type in the actual schema.
        let actual_schema_index: HashMap<_, _> = actual_schema
            .iter()
            .enumerate()
            .map(|(idx, (column_id, data_type))| (*column_id, (idx, data_type)))
            .collect();

        let mut index_or_defaults = Vec::with_capacity(expect_schema.len());
        let mut fields = Vec::with_capacity(expect_schema.len());
        for (column_id, expect_data_type) in expect_schema {
            // Safety: expect_schema comes from the same mapper.
            let column_index = expect_metadata.column_index_by_id(*column_id).unwrap();
            let expect_column = &expect_metadata.column_metadatas[column_index];
            let column_field = &expect_metadata.schema.arrow_schema().fields()[column_index];
            // For tag columns, we need to create a dictionary field.
            if expect_column.semantic_type == SemanticType::Tag {
                fields.push(tag_maybe_to_dictionary_field(
                    &expect_column.column_schema.data_type,
                    column_field,
                ));
            } else {
                fields.push(column_field.clone());
            };

            if let Some((index, actual_data_type)) = actual_schema_index.get(column_id) {
                let mut cast_type = None;

                // Same column different type.
                if expect_data_type != *actual_data_type {
                    cast_type = Some(expect_data_type.clone())
                }
                // Source has this column.
                index_or_defaults.push(IndexOrDefault::Index {
                    pos: *index,
                    cast_type,
                });
            } else {
                // Create a default vector with 1 element for that column.
                let default_vector = expect_column
                    .column_schema
                    .create_default_vector(1)
                    .context(CreateDefaultSnafu {
                        region_id: expect_metadata.region_id,
                        column: &expect_column.column_schema.name,
                    })?
                    .with_context(|| CompatReaderSnafu {
                        region_id: expect_metadata.region_id,
                        reason: format!(
                            "column {} does not have a default value to read",
                            expect_column.column_schema.name
                        ),
                    })?;
                index_or_defaults.push(IndexOrDefault::DefaultValue {
                    column_id: expect_column.column_id,
                    default_vector,
                    semantic_type: expect_column.semantic_type,
                });
            };
        }
        fields.extend_from_slice(&internal_fields());

        Ok((index_or_defaults, fields))
    }

    fn try_new_compact_sparse(
        mapper: &FlatProjectionMapper,
        actual: &RegionMetadataRef,
    ) -> Result<Option<Self>> {
        // Currently, we don't support converting sparse encoding back to dense encoding in
        // flat format.
        ensure!(
            mapper.metadata().primary_key_encoding == PrimaryKeyEncoding::Sparse,
            UnsupportedOperationSnafu {
                err_msg: "Flat format doesn't support converting sparse encoding back to dense encoding"
            }
        );

        // For sparse encoding, we don't need to check the primary keys.
        // Since this is for compaction, we always read all columns.
        let actual_schema: Vec<_> = actual
            .field_columns()
            .chain([actual.time_index_column()])
            .map(|col| (col.column_id, col.column_schema.data_type.clone()))
            .collect();
        let expect_schema: Vec<_> = mapper
            .metadata()
            .field_columns()
            .chain([mapper.metadata().time_index_column()])
            .map(|col| (col.column_id, col.column_schema.data_type.clone()))
            .collect();

        let (index_or_defaults, fields) =
            Self::compute_index_and_fields(&actual_schema, &expect_schema, mapper.metadata())?;

        let compat_pk = FlatCompatPrimaryKey::default();

        Ok(Some(Self {
            index_or_defaults,
            arrow_schema: Arc::new(Schema::new(fields)),
            compat_pk,
        }))
    }

    /// Make columns of the `batch` compatible.
    pub(crate) fn compat(&self, batch: RecordBatch) -> Result<RecordBatch> {
        let len = batch.num_rows();
        let columns = self
            .index_or_defaults
            .iter()
            .map(|index_or_default| match index_or_default {
                IndexOrDefault::Index { pos, cast_type } => {
                    let old_column = batch.column(*pos);

                    if let Some(ty) = cast_type {
                        // Safety: we ensure the type can be converted and the new batch is valid.
                        // Note: `safe` must be true in `CastOptions` so that values which cannot
                        // be converted are replaced with null instead of failing the cast.
                        let casted =
                            datatypes::arrow::compute::cast(old_column, &ty.as_arrow_type())
                                .context(ComputeArrowSnafu)?;
                        Ok(casted)
                    } else {
                        Ok(old_column.clone())
                    }
                }
                IndexOrDefault::DefaultValue {
                    column_id: _,
                    default_vector,
                    semantic_type,
                } => repeat_vector(default_vector, len, *semantic_type == SemanticType::Tag),
            })
            .chain(
                // Adds internal columns.
                batch.columns()[batch.num_columns() - INTERNAL_COLUMN_NUM..]
                    .iter()
                    .map(|col| Ok(col.clone())),
            )
            .collect::<Result<Vec<_>>>()?;

        let compat_batch = RecordBatch::try_new(self.arrow_schema.clone(), columns)
            .context(NewRecordBatchSnafu)?;

        // Handles primary keys.
        self.compat_pk.compat(compat_batch)
    }
}

/// Repeats the vector value `to_len` times.
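///
/// A sketch of the expected behavior (not compiled here; `Int64Vector` is the
/// vector type from `datatypes` used in the tests below):
///
/// ```ignore
/// let v: VectorRef = Arc::new(Int64Vector::from_vec(vec![42]));
/// // Non-tag columns become a plain repeated array: [42, 42, 42].
/// let plain = repeat_vector(&v, 3, false)?;
/// // Tag columns become a dictionary array with a single repeated value.
/// let dict = repeat_vector(&v, 3, true)?;
/// ```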
fn repeat_vector(vector: &VectorRef, to_len: usize, is_tag: bool) -> Result<ArrayRef> {
    assert_eq!(1, vector.len());
    if is_tag {
        let values = vector.to_arrow_array();
        if values.is_null(0) {
            // Creates a dictionary array with `to_len` null keys.
            let keys = UInt32Array::new_null(to_len);
            Ok(Arc::new(DictionaryArray::new(keys, values.slice(0, 0))))
        } else {
            let keys = UInt32Array::from_value(0, to_len);
            Ok(Arc::new(DictionaryArray::new(keys, values)))
        }
    } else {
        let keys = UInt32Array::from_value(0, to_len);
        take(
            &vector.to_arrow_array(),
            &keys,
            Some(TakeOptions {
                check_bounds: false,
            }),
        )
        .context(ComputeArrowSnafu)
    }
}

/// Helper to make primary key compatible.
#[derive(Debug)]
struct CompatPrimaryKey {
    /// Row converter to append values to primary keys.
    converter: Arc<dyn PrimaryKeyCodec>,
    /// Default values to append.
    values: Vec<(ColumnId, Value)>,
}

impl CompatPrimaryKey {
    /// Make primary key of the `batch` compatible.
    fn compat(&self, mut batch: Batch) -> Result<Batch> {
        let mut buffer = Vec::with_capacity(
            batch.primary_key().len() + self.converter.estimated_size().unwrap_or_default(),
        );
        buffer.extend_from_slice(batch.primary_key());
        self.converter
            .encode_values(&self.values, &mut buffer)
            .context(EncodeSnafu)?;

        batch.set_primary_key(buffer);

        // Update the cached primary key values as well.
        if let Some(pk_values) = &mut batch.pk_values {
            pk_values.extend(&self.values);
        }

        Ok(batch)
    }
}

/// Helper to make fields compatible.
#[derive(Debug)]
struct CompatFields {
    /// Column IDs and data types the reader actually returns.
    actual_fields: Vec<(ColumnId, ConcreteDataType)>,
    /// Indices to convert actual fields to expected fields.
    index_or_defaults: Vec<IndexOrDefault>,
}

impl CompatFields {
    /// Make fields of the `batch` compatible.
    #[must_use]
    fn compat(&self, batch: Batch) -> Batch {
        debug_assert_eq!(self.actual_fields.len(), batch.fields().len());
        debug_assert!(
            self.actual_fields
                .iter()
                .zip(batch.fields())
                .all(|((id, _), batch_column)| *id == batch_column.column_id)
        );

        let len = batch.num_rows();
        let fields = self
            .index_or_defaults
            .iter()
            .map(|index_or_default| match index_or_default {
                IndexOrDefault::Index { pos, cast_type } => {
                    let old_column = &batch.fields()[*pos];

                    let data = if let Some(ty) = cast_type {
                        // Safety: we ensure the type can be converted and the new batch is valid.
                        // Note: `safe` must be true in `CastOptions` so that values which cannot
                        // be converted are replaced with null instead of failing the cast.
                        old_column.data.cast(ty).unwrap()
                    } else {
                        old_column.data.clone()
                    };
                    BatchColumn {
                        column_id: old_column.column_id,
                        data,
                    }
                }
                IndexOrDefault::DefaultValue {
                    column_id,
                    default_vector,
                    semantic_type: _,
                } => {
                    let data = default_vector.replicate(&[len]);
                    BatchColumn {
                        column_id: *column_id,
                        data,
                    }
                }
            })
            .collect();

        // Safety: We ensure all columns have the same length and the new batch should be valid.
        batch.with_fields(fields).unwrap()
    }
}

fn may_rewrite_primary_key(
    expect: &RegionMetadata,
    actual: &RegionMetadata,
) -> Option<RewritePrimaryKey> {
    if expect.primary_key_encoding == actual.primary_key_encoding {
        return None;
    }

    let fields = expect.primary_key.clone();
    let original = build_primary_key_codec(actual);
    let new = build_primary_key_codec(expect);

    Some(RewritePrimaryKey {
        original,
        new,
        fields,
    })
}

/// Returns true if the actual primary key is the same as the expected one.
fn is_primary_key_same(expect: &RegionMetadata, actual: &RegionMetadata) -> Result<bool> {
    ensure!(
        actual.primary_key.len() <= expect.primary_key.len(),
        CompatReaderSnafu {
            region_id: expect.region_id,
            reason: format!(
529                "primary key has more columns {} than expect {}",
                actual.primary_key.len(),
                expect.primary_key.len()
            ),
        }
    );
    ensure!(
        actual.primary_key == expect.primary_key[..actual.primary_key.len()],
        CompatReaderSnafu {
            region_id: expect.region_id,
            reason: format!(
                "primary key has different prefix, expect: {:?}, actual: {:?}",
                expect.primary_key, actual.primary_key
            ),
        }
    );

    Ok(actual.primary_key.len() == expect.primary_key.len())
}

/// Creates a [CompatPrimaryKey] if needed.
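///
/// For example (a sketch, assuming `expect` extends `actual`'s primary key `[a]`
/// to `[a, b]`), the returned helper appends `b`'s default value to every
/// encoded key:
///
/// ```ignore
/// let compat = may_compat_primary_key(&expect, &actual)?;
/// assert!(compat.is_some());
/// ```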
fn may_compat_primary_key(
    expect: &RegionMetadata,
    actual: &RegionMetadata,
) -> Result<Option<CompatPrimaryKey>> {
    if is_primary_key_same(expect, actual)? {
        return Ok(None);
    }

    // We need to append default values to the primary key.
    let to_add = &expect.primary_key[actual.primary_key.len()..];
    let mut fields = Vec::with_capacity(to_add.len());
    let mut values = Vec::with_capacity(to_add.len());
    for column_id in to_add {
        // Safety: The id comes from expect region metadata.
        let column = expect.column_by_id(*column_id).unwrap();
        fields.push((
            *column_id,
            SortField::new(column.column_schema.data_type.clone()),
        ));
        let default_value = column
            .column_schema
            .create_default()
            .context(CreateDefaultSnafu {
                region_id: expect.region_id,
                column: &column.column_schema.name,
            })?
            .with_context(|| CompatReaderSnafu {
                region_id: expect.region_id,
                reason: format!(
                    "key column {} does not have a default value to read",
                    column.column_schema.name
                ),
            })?;
        values.push((*column_id, default_value));
    }
    // Use the expected primary key encoding to build the converter.
    let converter =
        build_primary_key_codec_with_fields(expect.primary_key_encoding, fields.into_iter());

    Ok(Some(CompatPrimaryKey { converter, values }))
}

/// Creates a [CompatFields] if needed.
fn may_compat_fields(
    mapper: &PrimaryKeyProjectionMapper,
    actual: &RegionMetadata,
) -> Result<Option<CompatFields>> {
    let expect_fields = mapper.batch_fields();
    let actual_fields = Batch::projected_fields(actual, mapper.column_ids());
    if expect_fields == actual_fields {
        return Ok(None);
    }

    let source_field_index: HashMap<_, _> = actual_fields
        .iter()
        .enumerate()
        .map(|(idx, (column_id, data_type))| (*column_id, (idx, data_type)))
        .collect();

    let index_or_defaults = expect_fields
        .iter()
        .map(|(column_id, expect_data_type)| {
            if let Some((index, actual_data_type)) = source_field_index.get(column_id) {
                let mut cast_type = None;

                if expect_data_type != *actual_data_type {
                    cast_type = Some(expect_data_type.clone())
                }
                // Source has this field.
                Ok(IndexOrDefault::Index {
                    pos: *index,
                    cast_type,
                })
            } else {
                // Safety: mapper must have this column.
                let column = mapper.metadata().column_by_id(*column_id).unwrap();
                // Create a default vector with 1 element for that column.
                let default_vector = column
                    .column_schema
                    .create_default_vector(1)
                    .context(CreateDefaultSnafu {
                        region_id: mapper.metadata().region_id,
                        column: &column.column_schema.name,
                    })?
                    .with_context(|| CompatReaderSnafu {
                        region_id: mapper.metadata().region_id,
                        reason: format!(
                            "column {} does not have a default value to read",
                            column.column_schema.name
                        ),
                    })?;
                Ok(IndexOrDefault::DefaultValue {
                    column_id: column.column_id,
                    default_vector,
                    semantic_type: SemanticType::Field,
                })
            }
        })
        .collect::<Result<Vec<_>>>()?;

    Ok(Some(CompatFields {
        actual_fields,
        index_or_defaults,
    }))
}

/// Index in source batch or a default value to fill a column.
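///
/// For example, reading an expected schema `[c1, c2]` from a source that only
/// contains `[c1]` maps to `[Index { pos: 0, cast_type: None }, DefaultValue { .. }]`.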
#[derive(Debug)]
enum IndexOrDefault {
    /// Index of the column in source batch.
    Index {
        pos: usize,
        cast_type: Option<ConcreteDataType>,
    },
    /// Default value for the column.
    DefaultValue {
        /// Id of the column.
        column_id: ColumnId,
        /// Default value. The vector has only 1 element.
        default_vector: VectorRef,
        /// Semantic type of the column.
        semantic_type: SemanticType,
    },
}

/// Adapter to rewrite primary key.
struct RewritePrimaryKey {
    /// Original primary key codec.
    original: Arc<dyn PrimaryKeyCodec>,
    /// New primary key codec.
    new: Arc<dyn PrimaryKeyCodec>,
    /// Order of the fields in the new primary key.
    fields: Vec<ColumnId>,
}

impl RewritePrimaryKey {
    /// Make primary key of the `batch` compatible.
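    ///
    /// It decodes the key with the original codec (reusing the decoded `pk_values`
    /// when the batch has them cached) and re-encodes it with the new codec.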
    fn compat(&self, mut batch: Batch) -> Result<Batch> {
        if batch.pk_values().is_none() {
            let new_pk_values = self
                .original
                .decode(batch.primary_key())
                .context(DecodeSnafu)?;
            batch.set_pk_values(new_pk_values);
        }
        // Safety: We ensure pk_values is not None.
        let values = batch.pk_values().unwrap();

        let mut buffer = Vec::with_capacity(
            batch.primary_key().len() + self.new.estimated_size().unwrap_or_default(),
        );
        match values {
            CompositeValues::Dense(values) => {
                self.new
                    .encode_values(values.as_slice(), &mut buffer)
                    .context(EncodeSnafu)?;
            }
            CompositeValues::Sparse(values) => {
                let values = self
                    .fields
                    .iter()
                    .map(|id| {
                        let value = values.get_or_null(*id);
                        (*id, value.as_value_ref())
                    })
                    .collect::<Vec<_>>();
                self.new
                    .encode_value_refs(&values, &mut buffer)
                    .context(EncodeSnafu)?;
            }
        }
        batch.set_primary_key(buffer);

        Ok(batch)
    }
}

/// Helper to rewrite primary key to another encoding for flat format.
struct FlatRewritePrimaryKey {
    /// New primary key encoder.
    codec: Arc<dyn PrimaryKeyCodec>,
    /// Metadata of the expected region.
    metadata: RegionMetadataRef,
    /// Original primary key codec, used to decode the primary key when we
    /// need to rewrite it.
    old_codec: Arc<dyn PrimaryKeyCodec>,
}

impl FlatRewritePrimaryKey {
    fn new(
        expect: &RegionMetadataRef,
        actual: &RegionMetadataRef,
    ) -> Option<FlatRewritePrimaryKey> {
        if expect.primary_key_encoding == actual.primary_key_encoding {
            return None;
        }
        let codec = build_primary_key_codec(expect);
        let old_codec = build_primary_key_codec(actual);

        Some(FlatRewritePrimaryKey {
            codec,
            metadata: expect.clone(),
            old_codec,
        })
    }

    /// Rewrites the primary key of the `batch`.
    /// It also appends the values to the primary key.
    fn rewrite_key(
        &self,
        append_values: &[(ColumnId, Value)],
        batch: RecordBatch,
    ) -> Result<RecordBatch> {
        let old_pk_dict_array = batch
            .column(primary_key_column_index(batch.num_columns()))
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .unwrap();
        let old_pk_values_array = old_pk_dict_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        let mut builder = BinaryBuilder::with_capacity(
            old_pk_values_array.len(),
            old_pk_values_array.value_data().len(),
        );

        // Binary buffer for the primary key.
        let mut buffer = Vec::with_capacity(
            old_pk_values_array.value_data().len() / old_pk_values_array.len().max(1),
        );
        let mut column_id_values = Vec::new();
        // Iterates the binary array and rewrites the primary key.
        for value in old_pk_values_array.iter() {
            let Some(old_pk) = value else {
                builder.append_null();
                continue;
            };
            // Decodes the old primary key.
            let mut pk_values = self.old_codec.decode(old_pk).context(DecodeSnafu)?;
            pk_values.extend(append_values);

            buffer.clear();
            column_id_values.clear();
            // Encodes the new primary key.
            match pk_values {
                CompositeValues::Dense(dense_values) => {
                    self.codec
                        .encode_values(dense_values.as_slice(), &mut buffer)
                        .context(EncodeSnafu)?;
                }
                CompositeValues::Sparse(sparse_values) => {
                    for id in &self.metadata.primary_key {
                        let value = sparse_values.get_or_null(*id);
                        column_id_values.push((*id, value.clone()));
                    }
                    self.codec
                        .encode_values(&column_id_values, &mut buffer)
                        .context(EncodeSnafu)?;
                }
            }
            builder.append_value(&buffer);
        }
        let new_pk_values_array = Arc::new(builder.finish());
        let new_pk_dict_array =
            PrimaryKeyArray::new(old_pk_dict_array.keys().clone(), new_pk_values_array);

        let mut columns = batch.columns().to_vec();
        columns[primary_key_column_index(batch.num_columns())] = Arc::new(new_pk_dict_array);

        RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
    }
}

/// Helper to make primary key compatible for flat format.
#[derive(Default)]
struct FlatCompatPrimaryKey {
    /// Primary key rewriter.
    rewriter: Option<FlatRewritePrimaryKey>,
    /// Converter to append values to primary keys.
    converter: Option<Arc<dyn PrimaryKeyCodec>>,
    /// Default values to append.
    values: Vec<(ColumnId, Value)>,
}

impl FlatCompatPrimaryKey {
    fn new(expect: &RegionMetadataRef, actual: &RegionMetadataRef) -> Result<Self> {
        let rewriter = FlatRewritePrimaryKey::new(expect, actual);

        if is_primary_key_same(expect, actual)? {
            return Ok(Self {
                rewriter,
                converter: None,
                values: Vec::new(),
            });
        }

        // We need to append default values to the primary key.
        let to_add = &expect.primary_key[actual.primary_key.len()..];
        let mut values = Vec::with_capacity(to_add.len());
        let mut fields = Vec::with_capacity(to_add.len());
        for column_id in to_add {
            // Safety: The id comes from expect region metadata.
            let column = expect.column_by_id(*column_id).unwrap();
            fields.push((
                *column_id,
                SortField::new(column.column_schema.data_type.clone()),
            ));
            let default_value = column
                .column_schema
                .create_default()
                .context(CreateDefaultSnafu {
                    region_id: expect.region_id,
                    column: &column.column_schema.name,
                })?
                .with_context(|| CompatReaderSnafu {
                    region_id: expect.region_id,
                    reason: format!(
                        "key column {} does not have a default value to read",
                        column.column_schema.name
                    ),
                })?;
            values.push((*column_id, default_value));
        }
        // is_primary_key_same() returned false, so the number of primary key columns differs.
        debug_assert!(!fields.is_empty());

        // Create converter to append values.
        let converter = Some(build_primary_key_codec_with_fields(
            expect.primary_key_encoding,
            fields.into_iter(),
        ));

        Ok(Self {
            rewriter,
            converter,
            values,
        })
    }

    /// Makes the primary key of the `batch` compatible.
    ///
    /// Callers must ensure all columns other than the `__primary_key` column are
    /// already compatible.
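    ///
    /// There are two paths: with a different key encoding the whole key is
    /// re-encoded by the rewriter; otherwise the default values are appended to
    /// the existing key bytes.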
894    fn compat(&self, batch: RecordBatch) -> Result<RecordBatch> {
895        if let Some(rewriter) = &self.rewriter {
896            // If we have different encoding, rewrite the whole primary key.
897            return rewriter.rewrite_key(&self.values, batch);
898        }
899
900        self.append_key(batch)
901    }
902
903    /// Appends values to the primary key of the `batch`.
904    fn append_key(&self, batch: RecordBatch) -> Result<RecordBatch> {
905        let Some(converter) = &self.converter else {
906            return Ok(batch);
907        };
908
909        let old_pk_dict_array = batch
910            .column(primary_key_column_index(batch.num_columns()))
911            .as_any()
912            .downcast_ref::<PrimaryKeyArray>()
913            .unwrap();
914        let old_pk_values_array = old_pk_dict_array
915            .values()
916            .as_any()
917            .downcast_ref::<BinaryArray>()
918            .unwrap();
919        let mut builder = BinaryBuilder::with_capacity(
920            old_pk_values_array.len(),
921            old_pk_values_array.value_data().len()
922                + converter.estimated_size().unwrap_or_default() * old_pk_values_array.len(),
923        );
924
925        // Binary buffer for the primary key.
926        let mut buffer = Vec::with_capacity(
927            old_pk_values_array.value_data().len() / old_pk_values_array.len().max(1)
928                + converter.estimated_size().unwrap_or_default(),
929        );
930
931        // Iterates the binary array and appends values to the primary key.
932        for value in old_pk_values_array.iter() {
933            let Some(old_pk) = value else {
934                builder.append_null();
935                continue;
936            };
937
938            buffer.clear();
939            buffer.extend_from_slice(old_pk);
940            converter
941                .encode_values(&self.values, &mut buffer)
942                .context(EncodeSnafu)?;
943
944            builder.append_value(&buffer);
945        }
946
947        let new_pk_values_array = Arc::new(builder.finish());
948        let new_pk_dict_array =
949            PrimaryKeyArray::new(old_pk_dict_array.keys().clone(), new_pk_values_array);
950
951        // Overrides the primary key column.
952        let mut columns = batch.columns().to_vec();
953        columns[primary_key_column_index(batch.num_columns())] = Arc::new(new_pk_dict_array);
954
955        RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
956    }
957}
958
959#[cfg(test)]
960mod tests {
961    use std::sync::Arc;
962
963    use api::v1::{OpType, SemanticType};
964    use datatypes::arrow::array::{
965        ArrayRef, BinaryDictionaryBuilder, Int64Array, StringDictionaryBuilder,
966        TimestampMillisecondArray, UInt8Array, UInt64Array,
967    };
968    use datatypes::arrow::datatypes::UInt32Type;
969    use datatypes::arrow::record_batch::RecordBatch;
970    use datatypes::prelude::ConcreteDataType;
971    use datatypes::schema::ColumnSchema;
972    use datatypes::value::ValueRef;
973    use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, UInt8Vector, UInt64Vector};
974    use mito_codec::row_converter::{
975        DensePrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec,
976    };
977    use store_api::codec::PrimaryKeyEncoding;
978    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
979    use store_api::storage::RegionId;
980
981    use super::*;
982    use crate::read::flat_projection::FlatProjectionMapper;
983    use crate::sst::parquet::flat_format::FlatReadFormat;
984    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
985    use crate::test_util::{VecBatchReader, check_reader_result};
986
987    /// Creates a new [RegionMetadata].
988    fn new_metadata(
989        semantic_types: &[(ColumnId, SemanticType, ConcreteDataType)],
990        primary_key: &[ColumnId],
991    ) -> RegionMetadata {
992        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
993        for (id, semantic_type, data_type) in semantic_types {
994            let column_schema = match semantic_type {
995                SemanticType::Tag => {
996                    ColumnSchema::new(format!("tag_{id}"), data_type.clone(), true)
997                }
998                SemanticType::Field => {
999                    ColumnSchema::new(format!("field_{id}"), data_type.clone(), true)
1000                }
1001                SemanticType::Timestamp => ColumnSchema::new("ts", data_type.clone(), false),
1002            };
1003
1004            builder.push_column_metadata(ColumnMetadata {
1005                column_schema,
1006                semantic_type: *semantic_type,
1007                column_id: *id,
1008            });
1009        }
1010        builder.primary_key(primary_key.to_vec());
1011        builder.build().unwrap()
1012    }
1013
1014    /// Encode primary key.
1015    fn encode_key(keys: &[Option<&str>]) -> Vec<u8> {
1016        let fields = (0..keys.len())
1017            .map(|_| (0, SortField::new(ConcreteDataType::string_datatype())))
1018            .collect();
1019        let converter = DensePrimaryKeyCodec::with_fields(fields);
1020        let row = keys.iter().map(|str_opt| match str_opt {
1021            Some(v) => ValueRef::String(v),
1022            None => ValueRef::Null,
1023        });
1024
1025        converter.encode(row).unwrap()
1026    }
1027
1028    /// Encode sparse primary key.
1029    fn encode_sparse_key(keys: &[(ColumnId, Option<&str>)]) -> Vec<u8> {
1030        let fields = (0..keys.len())
1031            .map(|_| (1, SortField::new(ConcreteDataType::string_datatype())))
1032            .collect();
1033        let converter = SparsePrimaryKeyCodec::with_fields(fields);
1034        let row = keys
1035            .iter()
1036            .map(|(id, str_opt)| match str_opt {
1037                Some(v) => (*id, ValueRef::String(v)),
1038                None => (*id, ValueRef::Null),
1039            })
1040            .collect::<Vec<_>>();
1041        let mut buffer = vec![];
1042        converter.encode_value_refs(&row, &mut buffer).unwrap();
1043        buffer
1044    }
1045
1046    /// Creates a batch for specific primary `key`.
1047    ///
1048    /// `fields`: [(column_id of the field, is null)]
1049    fn new_batch(
1050        primary_key: &[u8],
1051        fields: &[(ColumnId, bool)],
1052        start_ts: i64,
1053        num_rows: usize,
1054    ) -> Batch {
1055        let timestamps = Arc::new(TimestampMillisecondVector::from_values(
1056            start_ts..start_ts + num_rows as i64,
1057        ));
1058        let sequences = Arc::new(UInt64Vector::from_values(0..num_rows as u64));
1059        let op_types = Arc::new(UInt8Vector::from_vec(vec![OpType::Put as u8; num_rows]));
1060        let field_columns = fields
1061            .iter()
1062            .map(|(id, is_null)| {
1063                let data = if *is_null {
1064                    Arc::new(Int64Vector::from(vec![None; num_rows]))
1065                } else {
1066                    Arc::new(Int64Vector::from_vec(vec![*id as i64; num_rows]))
1067                };
1068                BatchColumn {
1069                    column_id: *id,
1070                    data,
1071                }
1072            })
1073            .collect();
1074        Batch::new(
1075            primary_key.to_vec(),
1076            timestamps,
1077            sequences,
1078            op_types,
1079            field_columns,
1080        )
1081        .unwrap()
1082    }
1083
1084    #[test]
1085    fn test_invalid_pk_len() {
1086        let reader_meta = new_metadata(
1087            &[
1088                (
1089                    0,
1090                    SemanticType::Timestamp,
1091                    ConcreteDataType::timestamp_millisecond_datatype(),
1092                ),
1093                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1094                (2, SemanticType::Tag, ConcreteDataType::string_datatype()),
1095                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
1096            ],
1097            &[1, 2],
1098        );
1099        let expect_meta = new_metadata(
1100            &[
1101                (
1102                    0,
1103                    SemanticType::Timestamp,
1104                    ConcreteDataType::timestamp_millisecond_datatype(),
1105                ),
1106                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1107                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1108            ],
1109            &[1],
1110        );
1111        may_compat_primary_key(&expect_meta, &reader_meta).unwrap_err();
1112    }
1113
1114    #[test]
1115    fn test_different_pk() {
1116        let reader_meta = new_metadata(
1117            &[
1118                (
1119                    0,
1120                    SemanticType::Timestamp,
1121                    ConcreteDataType::timestamp_millisecond_datatype(),
1122                ),
1123                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1124                (2, SemanticType::Tag, ConcreteDataType::string_datatype()),
1125                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
1126            ],
1127            &[2, 1],
1128        );
1129        let expect_meta = new_metadata(
1130            &[
1131                (
1132                    0,
1133                    SemanticType::Timestamp,
1134                    ConcreteDataType::timestamp_millisecond_datatype(),
1135                ),
1136                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1137                (2, SemanticType::Tag, ConcreteDataType::string_datatype()),
1138                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
1139                (4, SemanticType::Tag, ConcreteDataType::string_datatype()),
1140            ],
1141            &[1, 2, 4],
1142        );
1143        may_compat_primary_key(&expect_meta, &reader_meta).unwrap_err();
1144    }
1145
1146    #[test]
1147    fn test_same_pk() {
1148        let reader_meta = new_metadata(
1149            &[
1150                (
1151                    0,
1152                    SemanticType::Timestamp,
1153                    ConcreteDataType::timestamp_millisecond_datatype(),
1154                ),
1155                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1156                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1157            ],
1158            &[1],
1159        );
1160        assert!(
1161            may_compat_primary_key(&reader_meta, &reader_meta)
1162                .unwrap()
1163                .is_none()
1164        );
1165    }
1166
1167    #[test]
1168    fn test_same_pk_encoding() {
1169        let reader_meta = Arc::new(new_metadata(
1170            &[
1171                (
1172                    0,
1173                    SemanticType::Timestamp,
1174                    ConcreteDataType::timestamp_millisecond_datatype(),
1175                ),
1176                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1177            ],
1178            &[1],
1179        ));
1180
1181        assert!(
1182            may_compat_primary_key(&reader_meta, &reader_meta)
1183                .unwrap()
1184                .is_none()
1185        );
1186    }
1187
1188    #[test]
1189    fn test_same_fields() {
1190        let reader_meta = Arc::new(new_metadata(
1191            &[
1192                (
1193                    0,
1194                    SemanticType::Timestamp,
1195                    ConcreteDataType::timestamp_millisecond_datatype(),
1196                ),
1197                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1198                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1199            ],
1200            &[1],
1201        ));
1202        let mapper = PrimaryKeyProjectionMapper::all(&reader_meta).unwrap();
1203        assert!(may_compat_fields(&mapper, &reader_meta).unwrap().is_none())
1204    }
1205
1206    #[tokio::test]
1207    async fn test_compat_reader() {
1208        let reader_meta = Arc::new(new_metadata(
1209            &[
1210                (
1211                    0,
1212                    SemanticType::Timestamp,
1213                    ConcreteDataType::timestamp_millisecond_datatype(),
1214                ),
1215                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1216                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1217            ],
1218            &[1],
1219        ));
1220        let expect_meta = Arc::new(new_metadata(
1221            &[
1222                (
1223                    0,
1224                    SemanticType::Timestamp,
1225                    ConcreteDataType::timestamp_millisecond_datatype(),
1226                ),
1227                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1228                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1229                (3, SemanticType::Tag, ConcreteDataType::string_datatype()),
1230                (4, SemanticType::Field, ConcreteDataType::int64_datatype()),
1231            ],
1232            &[1, 3],
1233        ));
1234        let mapper = ProjectionMapper::all(&expect_meta, false).unwrap();
1235        let k1 = encode_key(&[Some("a")]);
1236        let k2 = encode_key(&[Some("b")]);
1237        let source_reader = VecBatchReader::new(&[
1238            new_batch(&k1, &[(2, false)], 1000, 3),
1239            new_batch(&k2, &[(2, false)], 1000, 3),
1240        ]);
1241
1242        let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
1243        let k1 = encode_key(&[Some("a"), None]);
1244        let k2 = encode_key(&[Some("b"), None]);
1245        check_reader_result(
1246            &mut compat_reader,
1247            &[
1248                new_batch(&k1, &[(2, false), (4, true)], 1000, 3),
1249                new_batch(&k2, &[(2, false), (4, true)], 1000, 3),
1250            ],
1251        )
1252        .await;
1253    }
1254
1255    #[tokio::test]
1256    async fn test_compat_reader_different_order() {
1257        let reader_meta = Arc::new(new_metadata(
1258            &[
1259                (
1260                    0,
1261                    SemanticType::Timestamp,
1262                    ConcreteDataType::timestamp_millisecond_datatype(),
1263                ),
1264                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1265                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1266            ],
1267            &[1],
1268        ));
1269        let expect_meta = Arc::new(new_metadata(
1270            &[
1271                (
1272                    0,
1273                    SemanticType::Timestamp,
1274                    ConcreteDataType::timestamp_millisecond_datatype(),
1275                ),
1276                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1277                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
1278                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1279                (4, SemanticType::Field, ConcreteDataType::int64_datatype()),
1280            ],
1281            &[1],
1282        ));
1283        let mapper = ProjectionMapper::all(&expect_meta, false).unwrap();
1284        let k1 = encode_key(&[Some("a")]);
1285        let k2 = encode_key(&[Some("b")]);
1286        let source_reader = VecBatchReader::new(&[
1287            new_batch(&k1, &[(2, false)], 1000, 3),
1288            new_batch(&k2, &[(2, false)], 1000, 3),
1289        ]);
1290
1291        let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
1292        check_reader_result(
1293            &mut compat_reader,
1294            &[
1295                new_batch(&k1, &[(3, true), (2, false), (4, true)], 1000, 3),
1296                new_batch(&k2, &[(3, true), (2, false), (4, true)], 1000, 3),
1297            ],
1298        )
1299        .await;
1300    }
1301
1302    #[tokio::test]
1303    async fn test_compat_reader_different_types() {
1304        let actual_meta = Arc::new(new_metadata(
1305            &[
1306                (
1307                    0,
1308                    SemanticType::Timestamp,
1309                    ConcreteDataType::timestamp_millisecond_datatype(),
1310                ),
1311                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1312                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1313            ],
1314            &[1],
1315        ));
1316        let expect_meta = Arc::new(new_metadata(
1317            &[
1318                (
1319                    0,
1320                    SemanticType::Timestamp,
1321                    ConcreteDataType::timestamp_millisecond_datatype(),
1322                ),
1323                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1324                (2, SemanticType::Field, ConcreteDataType::string_datatype()),
1325            ],
1326            &[1],
1327        ));
1328        let mapper = ProjectionMapper::all(&expect_meta, false).unwrap();
1329        let k1 = encode_key(&[Some("a")]);
1330        let k2 = encode_key(&[Some("b")]);
1331        let source_reader = VecBatchReader::new(&[
1332            new_batch(&k1, &[(2, false)], 1000, 3),
1333            new_batch(&k2, &[(2, false)], 1000, 3),
1334        ]);
1335
1336        let fn_batch_cast = |batch: Batch| {
1337            let mut new_fields = batch.fields.clone();
1338            new_fields[0].data = new_fields[0]
1339                .data
1340                .cast(&ConcreteDataType::string_datatype())
1341                .unwrap();
1342
1343            batch.with_fields(new_fields).unwrap()
1344        };
1345        let mut compat_reader = CompatReader::new(&mapper, actual_meta, source_reader).unwrap();
1346        check_reader_result(
1347            &mut compat_reader,
1348            &[
1349                fn_batch_cast(new_batch(&k1, &[(2, false)], 1000, 3)),
1350                fn_batch_cast(new_batch(&k2, &[(2, false)], 1000, 3)),
1351            ],
1352        )
1353        .await;
1354    }
1355
1356    #[tokio::test]
1357    async fn test_compat_reader_projection() {
1358        let reader_meta = Arc::new(new_metadata(
1359            &[
1360                (
1361                    0,
1362                    SemanticType::Timestamp,
1363                    ConcreteDataType::timestamp_millisecond_datatype(),
1364                ),
1365                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1366                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1367            ],
1368            &[1],
1369        ));
1370        let expect_meta = Arc::new(new_metadata(
1371            &[
1372                (
1373                    0,
1374                    SemanticType::Timestamp,
1375                    ConcreteDataType::timestamp_millisecond_datatype(),
1376                ),
1377                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1378                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
1379                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1380                (4, SemanticType::Field, ConcreteDataType::int64_datatype()),
1381            ],
1382            &[1],
1383        ));
1384        // tag_1, field_2, field_3
1385        let mapper = ProjectionMapper::new(&expect_meta, [1, 3, 2].into_iter(), false).unwrap();
1386        let k1 = encode_key(&[Some("a")]);
1387        let source_reader = VecBatchReader::new(&[new_batch(&k1, &[(2, false)], 1000, 3)]);
1388
1389        let mut compat_reader =
1390            CompatReader::new(&mapper, reader_meta.clone(), source_reader).unwrap();
1391        check_reader_result(
1392            &mut compat_reader,
1393            &[new_batch(&k1, &[(3, true), (2, false)], 1000, 3)],
1394        )
1395        .await;
1396
1397        // tag_1, field_4, field_3
1398        let mapper = ProjectionMapper::new(&expect_meta, [1, 4, 2].into_iter(), false).unwrap();
1399        let k1 = encode_key(&[Some("a")]);
1400        let source_reader = VecBatchReader::new(&[new_batch(&k1, &[], 1000, 3)]);
1401
1402        let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
1403        check_reader_result(
1404            &mut compat_reader,
1405            &[new_batch(&k1, &[(3, true), (4, true)], 1000, 3)],
1406        )
1407        .await;
1408    }
1409
    #[tokio::test]
    async fn test_compat_reader_different_pk_encoding() {
        let mut reader_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        );
        reader_meta.primary_key_encoding = PrimaryKeyEncoding::Dense;
        let reader_meta = Arc::new(reader_meta);
        let mut expect_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (3, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (4, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1, 3],
        );
        expect_meta.primary_key_encoding = PrimaryKeyEncoding::Sparse;
        let expect_meta = Arc::new(expect_meta);

        let mapper = ProjectionMapper::all(&expect_meta, false).unwrap();
        let k1 = encode_key(&[Some("a")]);
        let k2 = encode_key(&[Some("b")]);
        let source_reader = VecBatchReader::new(&[
            new_batch(&k1, &[(2, false)], 1000, 3),
            new_batch(&k2, &[(2, false)], 1000, 3),
        ]);

        let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
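        // Output keys are re-encoded with the sparse codec; the new tag (id 3)
        // has no value.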
        let k1 = encode_sparse_key(&[(1, Some("a")), (3, None)]);
        let k2 = encode_sparse_key(&[(1, Some("b")), (3, None)]);
        check_reader_result(
            &mut compat_reader,
            &[
                new_batch(&k1, &[(2, false), (4, true)], 1000, 3),
                new_batch(&k2, &[(2, false), (4, true)], 1000, 3),
            ],
        )
        .await;
    }

    /// Creates a primary key array for flat format testing.
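    /// The flat format stores the primary key as a dictionary-encoded binary array.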
    fn build_flat_test_pk_array(primary_keys: &[&[u8]]) -> ArrayRef {
        let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        for &pk in primary_keys {
            builder.append(pk).unwrap();
        }
        Arc::new(builder.finish())
    }

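    // Verifies that FlatCompatBatch appends a null-filled array for a field
    // (id 3) that the expected metadata has but the input batch lacks.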
    #[test]
    fn test_flat_compat_batch_with_missing_columns() {
        let actual_metadata = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));

        let expected_metadata = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                // Adds a new field.
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));

        let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
        let read_format = FlatReadFormat::new(
            actual_metadata.clone(),
            [0, 1, 2, 3].into_iter(),
            None,
            "test",
            false,
        )
        .unwrap();
        let format_projection = read_format.format_projection();

        let compat_batch =
            FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection, false)
                .unwrap()
                .unwrap();

        let mut tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
        tag_builder.append_value("tag1");
        tag_builder.append_value("tag1");
        let tag_dict_array = Arc::new(tag_builder.finish());

        let k1 = encode_key(&[Some("tag1")]);
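        // Column order follows the flat SST layout: tag, fields, time index,
        // then the internal primary key, sequence, and op type columns.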
        let input_columns: Vec<ArrayRef> = vec![
            tag_dict_array.clone(),
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&k1, &k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let input_schema =
            to_flat_sst_arrow_schema(&actual_metadata, &FlatSchemaOptions::default());
        let input_batch = RecordBatch::try_new(input_schema, input_columns).unwrap();

        let result = compat_batch.compat(input_batch).unwrap();

        let expected_schema =
            to_flat_sst_arrow_schema(&expected_metadata, &FlatSchemaOptions::default());

        let expected_columns: Vec<ArrayRef> = vec![
            tag_dict_array.clone(),
            Arc::new(Int64Array::from(vec![100, 200])),
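            // The missing field (id 3) is filled with nulls.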
            Arc::new(Int64Array::from(vec![None::<i64>, None::<i64>])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&k1, &k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let expected_batch = RecordBatch::try_new(expected_schema, expected_columns).unwrap();

        assert_eq!(expected_batch, result);
    }

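    // Verifies that FlatCompatBatch adds the missing tag column (id 3) as an
    // all-null dictionary array and re-encodes each primary key from dense to
    // sparse encoding.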
    #[test]
    fn test_flat_compat_batch_with_different_pk_encoding() {
        let mut actual_metadata = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        );
        actual_metadata.primary_key_encoding = PrimaryKeyEncoding::Dense;
        let actual_metadata = Arc::new(actual_metadata);

        let mut expected_metadata = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (3, SemanticType::Tag, ConcreteDataType::string_datatype()),
            ],
            &[1, 3],
        );
        expected_metadata.primary_key_encoding = PrimaryKeyEncoding::Sparse;
        let expected_metadata = Arc::new(expected_metadata);

        let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
        let read_format = FlatReadFormat::new(
            actual_metadata.clone(),
            [0, 1, 2, 3].into_iter(),
            None,
            "test",
            false,
        )
        .unwrap();
        let format_projection = read_format.format_projection();

        let compat_batch =
            FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection, false)
                .unwrap()
                .unwrap();

        // Tag array.
        let mut tag1_builder = StringDictionaryBuilder::<UInt32Type>::new();
        tag1_builder.append_value("tag1");
        tag1_builder.append_value("tag1");
        let tag1_dict_array = Arc::new(tag1_builder.finish());

        let k1 = encode_key(&[Some("tag1")]);
        let input_columns: Vec<ArrayRef> = vec![
            tag1_dict_array.clone(),
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&k1, &k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let input_schema =
            to_flat_sst_arrow_schema(&actual_metadata, &FlatSchemaOptions::default());
        let input_batch = RecordBatch::try_new(input_schema, input_columns).unwrap();

        let result = compat_batch.compat(input_batch).unwrap();

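        // Expected output: the primary key re-encoded with the sparse codec and
        // an all-null dictionary column for the new tag (id 3).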
        let sparse_k1 = encode_sparse_key(&[(1, Some("tag1")), (3, None)]);
        let mut null_tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
        null_tag_builder.append_nulls(2);
        let null_tag_dict_array = Arc::new(null_tag_builder.finish());
        let expected_columns: Vec<ArrayRef> = vec![
            tag1_dict_array.clone(),
            null_tag_dict_array,
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&sparse_k1, &sparse_k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let output_schema =
            to_flat_sst_arrow_schema(&expected_metadata, &FlatSchemaOptions::default());
        let expected_batch = RecordBatch::try_new(output_schema, expected_columns).unwrap();

        assert_eq!(expected_batch, result);
    }

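    // Verifies FlatCompatBatch for a sparse-encoded region without tag
    // columns: the new field (id 3) is null-filled and the empty sparse key
    // passes through unchanged.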
    #[test]
    fn test_flat_compat_batch_compact_sparse() {
        let mut actual_metadata = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[],
        );
        actual_metadata.primary_key_encoding = PrimaryKeyEncoding::Sparse;
        let actual_metadata = Arc::new(actual_metadata);

        let mut expected_metadata = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[],
        );
        expected_metadata.primary_key_encoding = PrimaryKeyEncoding::Sparse;
        let expected_metadata = Arc::new(expected_metadata);

        let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
        let read_format = FlatReadFormat::new(
            actual_metadata.clone(),
            [0, 2, 3].into_iter(),
            None,
            "test",
            true,
        )
        .unwrap();
        let format_projection = read_format.format_projection();

        let compat_batch =
            FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection, true)
                .unwrap()
                .unwrap();

        let sparse_k1 = encode_sparse_key(&[]);
        let input_columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&sparse_k1, &sparse_k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let input_schema =
            to_flat_sst_arrow_schema(&actual_metadata, &FlatSchemaOptions::default());
        let input_batch = RecordBatch::try_new(input_schema, input_columns).unwrap();

        let result = compat_batch.compat(input_batch).unwrap();

        let expected_columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(Int64Array::from(vec![None::<i64>, None::<i64>])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&sparse_k1, &sparse_k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let output_schema =
            to_flat_sst_arrow_schema(&expected_metadata, &FlatSchemaOptions::default());
        let expected_batch = RecordBatch::try_new(output_schema, expected_columns).unwrap();

        assert_eq!(expected_batch, result);
    }
}