mito2/read/
compat.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Utilities to adapt readers with different schema.
16
17use std::collections::HashMap;
18use std::sync::Arc;
19
20use api::v1::SemanticType;
21use datatypes::arrow::array::{
22    Array, ArrayRef, BinaryArray, BinaryBuilder, DictionaryArray, UInt32Array,
23};
24use datatypes::arrow::compute::{TakeOptions, take};
25use datatypes::arrow::datatypes::{FieldRef, Schema, SchemaRef};
26use datatypes::arrow::record_batch::RecordBatch;
27use datatypes::data_type::ConcreteDataType;
28use datatypes::prelude::DataType;
29use datatypes::value::Value;
30use datatypes::vectors::VectorRef;
31use mito_codec::row_converter::{
32    CompositeValues, PrimaryKeyCodec, SortField, build_primary_key_codec,
33    build_primary_key_codec_with_fields,
34};
35use snafu::{OptionExt, ResultExt, ensure};
36use store_api::codec::PrimaryKeyEncoding;
37use store_api::metadata::{RegionMetadata, RegionMetadataRef};
38use store_api::storage::ColumnId;
39
40use crate::error::{
41    CompatReaderSnafu, ComputeArrowSnafu, CreateDefaultSnafu, DecodeSnafu, EncodeSnafu,
42    NewRecordBatchSnafu, Result, UnexpectedSnafu, UnsupportedOperationSnafu,
43};
44use crate::read::flat_projection::{FlatProjectionMapper, flat_projected_columns};
45use crate::read::projection::{PrimaryKeyProjectionMapper, ProjectionMapper};
46use crate::read::{Batch, BatchColumn, BatchReader};
47use crate::sst::parquet::flat_format::primary_key_column_index;
48use crate::sst::parquet::format::{FormatProjection, INTERNAL_COLUMN_NUM, PrimaryKeyArray};
49use crate::sst::{internal_fields, tag_maybe_to_dictionary_field};
50
51/// Reader to adapt schema of underlying reader to expected schema.
52pub struct CompatReader<R> {
53    /// Underlying reader.
54    reader: R,
55    /// Helper to compat batches.
56    compat: PrimaryKeyCompatBatch,
57}
58
59impl<R> CompatReader<R> {
60    /// Creates a new compat reader.
61    /// - `mapper` is built from the metadata users expect to see.
62    /// - `reader_meta` is the metadata of the input reader.
63    /// - `reader` is the input reader.
64    pub fn new(
65        mapper: &ProjectionMapper,
66        reader_meta: RegionMetadataRef,
67        reader: R,
68    ) -> Result<CompatReader<R>> {
69        Ok(CompatReader {
70            reader,
71            compat: PrimaryKeyCompatBatch::new(mapper, reader_meta)?,
72        })
73    }
74}
75
76#[async_trait::async_trait]
77impl<R: BatchReader> BatchReader for CompatReader<R> {
78    async fn next_batch(&mut self) -> Result<Option<Batch>> {
79        let Some(mut batch) = self.reader.next_batch().await? else {
80            return Ok(None);
81        };
82
83        batch = self.compat.compat_batch(batch)?;
84
85        Ok(Some(batch))
86    }
87}
88
89/// Helper to adapt schema of the batch to an expected schema.
90pub(crate) enum CompatBatch {
91    /// Adapter for primary key format.
92    PrimaryKey(PrimaryKeyCompatBatch),
93    /// Adapter for flat format.
94    Flat(FlatCompatBatch),
95}
96
97impl CompatBatch {
98    /// Returns the inner primary key batch adapter if this is a PrimaryKey format.
99    pub(crate) fn as_primary_key(&self) -> Option<&PrimaryKeyCompatBatch> {
100        match self {
101            CompatBatch::PrimaryKey(batch) => Some(batch),
102            _ => None,
103        }
104    }
105
106    /// Returns the inner flat batch adapter if this is a Flat format.
107    pub(crate) fn as_flat(&self) -> Option<&FlatCompatBatch> {
108        match self {
109            CompatBatch::Flat(batch) => Some(batch),
110            _ => None,
111        }
112    }
113}
114
115/// A helper struct to adapt schema of the batch to an expected schema.
116pub(crate) struct PrimaryKeyCompatBatch {
117    /// Optional primary key adapter.
118    rewrite_pk: Option<RewritePrimaryKey>,
119    /// Optional primary key adapter.
120    compat_pk: Option<CompatPrimaryKey>,
121    /// Optional fields adapter.
122    compat_fields: Option<CompatFields>,
123}
124
125impl PrimaryKeyCompatBatch {
126    /// Creates a new [CompatBatch].
127    /// - `mapper` is built from the metadata users expect to see.
128    /// - `reader_meta` is the metadata of the input reader.
129    pub(crate) fn new(mapper: &ProjectionMapper, reader_meta: RegionMetadataRef) -> Result<Self> {
130        let rewrite_pk = may_rewrite_primary_key(mapper.metadata(), &reader_meta);
131        let compat_pk = may_compat_primary_key(mapper.metadata(), &reader_meta)?;
132        let mapper = mapper.as_primary_key().context(UnexpectedSnafu {
133            reason: "Unexpected format",
134        })?;
135        let compat_fields = may_compat_fields(mapper, &reader_meta)?;
136
137        Ok(Self {
138            rewrite_pk,
139            compat_pk,
140            compat_fields,
141        })
142    }
143
144    /// Adapts the `batch` to the expected schema.
145    pub(crate) fn compat_batch(&self, mut batch: Batch) -> Result<Batch> {
146        if let Some(rewrite_pk) = &self.rewrite_pk {
147            batch = rewrite_pk.compat(batch)?;
148        }
149        if let Some(compat_pk) = &self.compat_pk {
150            batch = compat_pk.compat(batch)?;
151        }
152        if let Some(compat_fields) = &self.compat_fields {
153            batch = compat_fields.compat(batch);
154        }
155
156        Ok(batch)
157    }
158}
159
160/// Returns true if `left` and `right` have same columns and primary key encoding.
161pub(crate) fn has_same_columns_and_pk_encoding(
162    left: &RegionMetadata,
163    right: &RegionMetadata,
164) -> bool {
165    if left.primary_key_encoding != right.primary_key_encoding {
166        return false;
167    }
168
169    if left.column_metadatas.len() != right.column_metadatas.len() {
170        return false;
171    }
172
173    for (left_col, right_col) in left.column_metadatas.iter().zip(&right.column_metadatas) {
174        if left_col.column_id != right_col.column_id || !left_col.is_same_datatype(right_col) {
175            return false;
176        }
177        debug_assert_eq!(
178            left_col.column_schema.data_type,
179            right_col.column_schema.data_type
180        );
181        debug_assert_eq!(left_col.semantic_type, right_col.semantic_type);
182    }
183
184    true
185}
186
187/// A helper struct to adapt schema of the batch to an expected schema.
188pub(crate) struct FlatCompatBatch {
189    /// Indices to convert actual fields to expect fields.
190    index_or_defaults: Vec<IndexOrDefault>,
191    /// Expected arrow schema.
192    arrow_schema: SchemaRef,
193    /// Primary key adapter.
194    compat_pk: FlatCompatPrimaryKey,
195}
196
197impl FlatCompatBatch {
198    /// Creates a [FlatCompatBatch].
199    ///
200    /// - `mapper` is built from the metadata users expect to see.
201    /// - `actual` is the [RegionMetadata] of the input parquet.
202    /// - `format_projection` is the projection of the read format for the input parquet.
203    /// - `compaction` indicates whether the reader is for compaction.
204    pub(crate) fn try_new(
205        mapper: &FlatProjectionMapper,
206        actual: &RegionMetadataRef,
207        format_projection: &FormatProjection,
208        compaction: bool,
209    ) -> Result<Option<Self>> {
210        let actual_schema = flat_projected_columns(actual, format_projection);
211        let expect_schema = mapper.batch_schema();
212        if expect_schema == actual_schema {
213            // Although the SST has a different schema, but the schema after projection is the same
214            // as expected schema.
215            return Ok(None);
216        }
217
218        if actual.primary_key_encoding == PrimaryKeyEncoding::Sparse && compaction {
219            // Special handling for sparse encoding in compaction.
220            return FlatCompatBatch::try_new_compact_sparse(mapper, actual);
221        }
222
223        let (index_or_defaults, fields) =
224            Self::compute_index_and_fields(&actual_schema, expect_schema, mapper.metadata())?;
225
226        let compat_pk = FlatCompatPrimaryKey::new(mapper.metadata(), actual)?;
227
228        Ok(Some(Self {
229            index_or_defaults,
230            arrow_schema: Arc::new(Schema::new(fields)),
231            compat_pk,
232        }))
233    }
234
235    fn compute_index_and_fields(
236        actual_schema: &[(ColumnId, ConcreteDataType)],
237        expect_schema: &[(ColumnId, ConcreteDataType)],
238        expect_metadata: &RegionMetadata,
239    ) -> Result<(Vec<IndexOrDefault>, Vec<FieldRef>)> {
240        // Maps column id to the index and data type in the actual schema.
241        let actual_schema_index: HashMap<_, _> = actual_schema
242            .iter()
243            .enumerate()
244            .map(|(idx, (column_id, data_type))| (*column_id, (idx, data_type)))
245            .collect();
246
247        let mut index_or_defaults = Vec::with_capacity(expect_schema.len());
248        let mut fields = Vec::with_capacity(expect_schema.len());
249        for (column_id, expect_data_type) in expect_schema {
250            // Safety: expect_schema comes from the same mapper.
251            let column_index = expect_metadata.column_index_by_id(*column_id).unwrap();
252            let expect_column = &expect_metadata.column_metadatas[column_index];
253            let column_field = &expect_metadata.schema.arrow_schema().fields()[column_index];
254            // For tag columns, we need to create a dictionary field.
255            if expect_column.semantic_type == SemanticType::Tag {
256                fields.push(tag_maybe_to_dictionary_field(
257                    &expect_column.column_schema.data_type,
258                    column_field,
259                ));
260            } else {
261                fields.push(column_field.clone());
262            };
263
264            if let Some((index, actual_data_type)) = actual_schema_index.get(column_id) {
265                let mut cast_type = None;
266
267                // Same column different type.
268                if expect_data_type != *actual_data_type {
269                    cast_type = Some(expect_data_type.clone())
270                }
271                // Source has this column.
272                index_or_defaults.push(IndexOrDefault::Index {
273                    pos: *index,
274                    cast_type,
275                });
276            } else {
277                // Create a default vector with 1 element for that column.
278                let default_vector = expect_column
279                    .column_schema
280                    .create_default_vector(1)
281                    .context(CreateDefaultSnafu {
282                        region_id: expect_metadata.region_id,
283                        column: &expect_column.column_schema.name,
284                    })?
285                    .with_context(|| CompatReaderSnafu {
286                        region_id: expect_metadata.region_id,
287                        reason: format!(
288                            "column {} does not have a default value to read",
289                            expect_column.column_schema.name
290                        ),
291                    })?;
292                index_or_defaults.push(IndexOrDefault::DefaultValue {
293                    column_id: expect_column.column_id,
294                    default_vector,
295                    semantic_type: expect_column.semantic_type,
296                });
297            };
298        }
299        fields.extend_from_slice(&internal_fields());
300
301        Ok((index_or_defaults, fields))
302    }
303
304    fn try_new_compact_sparse(
305        mapper: &FlatProjectionMapper,
306        actual: &RegionMetadataRef,
307    ) -> Result<Option<Self>> {
308        // Currently, we don't support converting sparse encoding back to dense encoding in
309        // flat format.
310        ensure!(
311            mapper.metadata().primary_key_encoding == PrimaryKeyEncoding::Sparse,
312            UnsupportedOperationSnafu {
313                err_msg: "Flat format doesn't support converting sparse encoding back to dense encoding"
314            }
315        );
316
317        // For sparse encoding, we don't need to check the primary keys.
318        // Since this is for compaction, we always read all columns.
319        let actual_schema: Vec<_> = actual
320            .field_columns()
321            .chain([actual.time_index_column()])
322            .map(|col| (col.column_id, col.column_schema.data_type.clone()))
323            .collect();
324        let expect_schema: Vec<_> = mapper
325            .metadata()
326            .field_columns()
327            .chain([mapper.metadata().time_index_column()])
328            .map(|col| (col.column_id, col.column_schema.data_type.clone()))
329            .collect();
330
331        let (index_or_defaults, fields) =
332            Self::compute_index_and_fields(&actual_schema, &expect_schema, mapper.metadata())?;
333
334        let compat_pk = FlatCompatPrimaryKey::default();
335
336        Ok(Some(Self {
337            index_or_defaults,
338            arrow_schema: Arc::new(Schema::new(fields)),
339            compat_pk,
340        }))
341    }
342
343    /// Make columns of the `batch` compatible.
344    pub(crate) fn compat(&self, batch: RecordBatch) -> Result<RecordBatch> {
345        let len = batch.num_rows();
346        let columns = self
347            .index_or_defaults
348            .iter()
349            .map(|index_or_default| match index_or_default {
350                IndexOrDefault::Index { pos, cast_type } => {
351                    let old_column = batch.column(*pos);
352
353                    if let Some(ty) = cast_type {
354                        // Safety: We ensure type can be converted and the new batch should be valid.
355                        // Tips: `safe` must be true in `CastOptions`, which will replace the specific value with null when it cannot be converted.
356                        let casted =
357                            datatypes::arrow::compute::cast(old_column, &ty.as_arrow_type())
358                                .context(ComputeArrowSnafu)?;
359                        Ok(casted)
360                    } else {
361                        Ok(old_column.clone())
362                    }
363                }
364                IndexOrDefault::DefaultValue {
365                    column_id: _,
366                    default_vector,
367                    semantic_type,
368                } => repeat_vector(default_vector, len, *semantic_type == SemanticType::Tag),
369            })
370            .chain(
371                // Adds internal columns.
372                batch.columns()[batch.num_columns() - INTERNAL_COLUMN_NUM..]
373                    .iter()
374                    .map(|col| Ok(col.clone())),
375            )
376            .collect::<Result<Vec<_>>>()?;
377
378        let compat_batch = RecordBatch::try_new(self.arrow_schema.clone(), columns)
379            .context(NewRecordBatchSnafu)?;
380
381        // Handles primary keys.
382        self.compat_pk.compat(compat_batch)
383    }
384}
385
386/// Repeats the vector value `to_len` times.
387fn repeat_vector(vector: &VectorRef, to_len: usize, is_tag: bool) -> Result<ArrayRef> {
388    assert_eq!(1, vector.len());
389    let data_type = vector.data_type();
390    if is_tag && data_type.is_string() {
391        let values = vector.to_arrow_array();
392        if values.is_null(0) {
393            // Creates a dictionary array with `to_len` null keys.
394            let keys = UInt32Array::new_null(to_len);
395            Ok(Arc::new(DictionaryArray::new(keys, values.slice(0, 0))))
396        } else {
397            let keys = UInt32Array::from_value(0, to_len);
398            Ok(Arc::new(DictionaryArray::new(keys, values)))
399        }
400    } else {
401        let keys = UInt32Array::from_value(0, to_len);
402        take(
403            &vector.to_arrow_array(),
404            &keys,
405            Some(TakeOptions {
406                check_bounds: false,
407            }),
408        )
409        .context(ComputeArrowSnafu)
410    }
411}
412
413/// Helper to make primary key compatible.
414#[derive(Debug)]
415struct CompatPrimaryKey {
416    /// Row converter to append values to primary keys.
417    converter: Arc<dyn PrimaryKeyCodec>,
418    /// Default values to append.
419    values: Vec<(ColumnId, Value)>,
420}
421
422impl CompatPrimaryKey {
423    /// Make primary key of the `batch` compatible.
424    fn compat(&self, mut batch: Batch) -> Result<Batch> {
425        let mut buffer = Vec::with_capacity(
426            batch.primary_key().len() + self.converter.estimated_size().unwrap_or_default(),
427        );
428        buffer.extend_from_slice(batch.primary_key());
429        self.converter
430            .encode_values(&self.values, &mut buffer)
431            .context(EncodeSnafu)?;
432
433        batch.set_primary_key(buffer);
434
435        // update cache
436        if let Some(pk_values) = &mut batch.pk_values {
437            pk_values.extend(&self.values);
438        }
439
440        Ok(batch)
441    }
442}
443
444/// Helper to make fields compatible.
445#[derive(Debug)]
446struct CompatFields {
447    /// Column Ids and DataTypes the reader actually returns.
448    actual_fields: Vec<(ColumnId, ConcreteDataType)>,
449    /// Indices to convert actual fields to expect fields.
450    index_or_defaults: Vec<IndexOrDefault>,
451}
452
453impl CompatFields {
454    /// Make fields of the `batch` compatible.
455    #[must_use]
456    fn compat(&self, batch: Batch) -> Batch {
457        debug_assert_eq!(self.actual_fields.len(), batch.fields().len());
458        debug_assert!(
459            self.actual_fields
460                .iter()
461                .zip(batch.fields())
462                .all(|((id, _), batch_column)| *id == batch_column.column_id)
463        );
464
465        let len = batch.num_rows();
466        let fields = self
467            .index_or_defaults
468            .iter()
469            .map(|index_or_default| match index_or_default {
470                IndexOrDefault::Index { pos, cast_type } => {
471                    let old_column = &batch.fields()[*pos];
472
473                    let data = if let Some(ty) = cast_type {
474                        // Safety: We ensure type can be converted and the new batch should be valid.
475                        // Tips: `safe` must be true in `CastOptions`, which will replace the specific value with null when it cannot be converted.
476                        old_column.data.cast(ty).unwrap()
477                    } else {
478                        old_column.data.clone()
479                    };
480                    BatchColumn {
481                        column_id: old_column.column_id,
482                        data,
483                    }
484                }
485                IndexOrDefault::DefaultValue {
486                    column_id,
487                    default_vector,
488                    semantic_type: _,
489                } => {
490                    let data = default_vector.replicate(&[len]);
491                    BatchColumn {
492                        column_id: *column_id,
493                        data,
494                    }
495                }
496            })
497            .collect();
498
499        // Safety: We ensure all columns have the same length and the new batch should be valid.
500        batch.with_fields(fields).unwrap()
501    }
502}
503
504fn may_rewrite_primary_key(
505    expect: &RegionMetadata,
506    actual: &RegionMetadata,
507) -> Option<RewritePrimaryKey> {
508    if expect.primary_key_encoding == actual.primary_key_encoding {
509        return None;
510    }
511
512    let fields = expect.primary_key.clone();
513    let original = build_primary_key_codec(actual);
514    let new = build_primary_key_codec(expect);
515
516    Some(RewritePrimaryKey {
517        original,
518        new,
519        fields,
520    })
521}
522
523/// Returns true if the actual primary keys is the same as expected.
524fn is_primary_key_same(expect: &RegionMetadata, actual: &RegionMetadata) -> Result<bool> {
525    ensure!(
526        actual.primary_key.len() <= expect.primary_key.len(),
527        CompatReaderSnafu {
528            region_id: expect.region_id,
529            reason: format!(
530                "primary key has more columns {} than expect {}",
531                actual.primary_key.len(),
532                expect.primary_key.len()
533            ),
534        }
535    );
536    ensure!(
537        actual.primary_key == expect.primary_key[..actual.primary_key.len()],
538        CompatReaderSnafu {
539            region_id: expect.region_id,
540            reason: format!(
541                "primary key has different prefix, expect: {:?}, actual: {:?}",
542                expect.primary_key, actual.primary_key
543            ),
544        }
545    );
546
547    Ok(actual.primary_key.len() == expect.primary_key.len())
548}
549
550/// Creates a [CompatPrimaryKey] if needed.
551fn may_compat_primary_key(
552    expect: &RegionMetadata,
553    actual: &RegionMetadata,
554) -> Result<Option<CompatPrimaryKey>> {
555    if is_primary_key_same(expect, actual)? {
556        return Ok(None);
557    }
558
559    // We need to append default values to the primary key.
560    let to_add = &expect.primary_key[actual.primary_key.len()..];
561    let mut fields = Vec::with_capacity(to_add.len());
562    let mut values = Vec::with_capacity(to_add.len());
563    for column_id in to_add {
564        // Safety: The id comes from expect region metadata.
565        let column = expect.column_by_id(*column_id).unwrap();
566        fields.push((
567            *column_id,
568            SortField::new(column.column_schema.data_type.clone()),
569        ));
570        let default_value = column
571            .column_schema
572            .create_default()
573            .context(CreateDefaultSnafu {
574                region_id: expect.region_id,
575                column: &column.column_schema.name,
576            })?
577            .with_context(|| CompatReaderSnafu {
578                region_id: expect.region_id,
579                reason: format!(
580                    "key column {} does not have a default value to read",
581                    column.column_schema.name
582                ),
583            })?;
584        values.push((*column_id, default_value));
585    }
586    // Using expect primary key encoding to build the converter
587    let converter =
588        build_primary_key_codec_with_fields(expect.primary_key_encoding, fields.into_iter());
589
590    Ok(Some(CompatPrimaryKey { converter, values }))
591}
592
593/// Creates a [CompatFields] if needed.
594fn may_compat_fields(
595    mapper: &PrimaryKeyProjectionMapper,
596    actual: &RegionMetadata,
597) -> Result<Option<CompatFields>> {
598    let expect_fields = mapper.batch_fields();
599    let actual_fields = Batch::projected_fields(actual, mapper.column_ids());
600    if expect_fields == actual_fields {
601        return Ok(None);
602    }
603
604    let source_field_index: HashMap<_, _> = actual_fields
605        .iter()
606        .enumerate()
607        .map(|(idx, (column_id, data_type))| (*column_id, (idx, data_type)))
608        .collect();
609
610    let index_or_defaults = expect_fields
611        .iter()
612        .map(|(column_id, expect_data_type)| {
613            if let Some((index, actual_data_type)) = source_field_index.get(column_id) {
614                let mut cast_type = None;
615
616                if expect_data_type != *actual_data_type {
617                    cast_type = Some(expect_data_type.clone())
618                }
619                // Source has this field.
620                Ok(IndexOrDefault::Index {
621                    pos: *index,
622                    cast_type,
623                })
624            } else {
625                // Safety: mapper must have this column.
626                let column = mapper.metadata().column_by_id(*column_id).unwrap();
627                // Create a default vector with 1 element for that column.
628                let default_vector = column
629                    .column_schema
630                    .create_default_vector(1)
631                    .context(CreateDefaultSnafu {
632                        region_id: mapper.metadata().region_id,
633                        column: &column.column_schema.name,
634                    })?
635                    .with_context(|| CompatReaderSnafu {
636                        region_id: mapper.metadata().region_id,
637                        reason: format!(
638                            "column {} does not have a default value to read",
639                            column.column_schema.name
640                        ),
641                    })?;
642                Ok(IndexOrDefault::DefaultValue {
643                    column_id: column.column_id,
644                    default_vector,
645                    semantic_type: SemanticType::Field,
646                })
647            }
648        })
649        .collect::<Result<Vec<_>>>()?;
650
651    Ok(Some(CompatFields {
652        actual_fields,
653        index_or_defaults,
654    }))
655}
656
657/// Index in source batch or a default value to fill a column.
658#[derive(Debug)]
659enum IndexOrDefault {
660    /// Index of the column in source batch.
661    Index {
662        pos: usize,
663        cast_type: Option<ConcreteDataType>,
664    },
665    /// Default value for the column.
666    DefaultValue {
667        /// Id of the column.
668        column_id: ColumnId,
669        /// Default value. The vector has only 1 element.
670        default_vector: VectorRef,
671        /// Semantic type of the column.
672        semantic_type: SemanticType,
673    },
674}
675
676/// Adapter to rewrite primary key.
677struct RewritePrimaryKey {
678    /// Original primary key codec.
679    original: Arc<dyn PrimaryKeyCodec>,
680    /// New primary key codec.
681    new: Arc<dyn PrimaryKeyCodec>,
682    /// Order of the fields in the new primary key.
683    fields: Vec<ColumnId>,
684}
685
686impl RewritePrimaryKey {
687    /// Make primary key of the `batch` compatible.
688    fn compat(&self, mut batch: Batch) -> Result<Batch> {
689        if batch.pk_values().is_none() {
690            let new_pk_values = self
691                .original
692                .decode(batch.primary_key())
693                .context(DecodeSnafu)?;
694            batch.set_pk_values(new_pk_values);
695        }
696        // Safety: We ensure pk_values is not None.
697        let values = batch.pk_values().unwrap();
698
699        let mut buffer = Vec::with_capacity(
700            batch.primary_key().len() + self.new.estimated_size().unwrap_or_default(),
701        );
702        match values {
703            CompositeValues::Dense(values) => {
704                self.new
705                    .encode_values(values.as_slice(), &mut buffer)
706                    .context(EncodeSnafu)?;
707            }
708            CompositeValues::Sparse(values) => {
709                let values = self
710                    .fields
711                    .iter()
712                    .map(|id| {
713                        let value = values.get_or_null(*id);
714                        (*id, value.as_value_ref())
715                    })
716                    .collect::<Vec<_>>();
717                self.new
718                    .encode_value_refs(&values, &mut buffer)
719                    .context(EncodeSnafu)?;
720            }
721        }
722        batch.set_primary_key(buffer);
723
724        Ok(batch)
725    }
726}
727
728/// Helper to rewrite primary key to another encoding for flat format.
729struct FlatRewritePrimaryKey {
730    /// New primary key encoder.
731    codec: Arc<dyn PrimaryKeyCodec>,
732    /// Metadata of the expected region.
733    metadata: RegionMetadataRef,
734    /// Original primary key codec.
735    /// If we need to rewrite the primary key.
736    old_codec: Arc<dyn PrimaryKeyCodec>,
737}
738
739impl FlatRewritePrimaryKey {
740    fn new(
741        expect: &RegionMetadataRef,
742        actual: &RegionMetadataRef,
743    ) -> Option<FlatRewritePrimaryKey> {
744        if expect.primary_key_encoding == actual.primary_key_encoding {
745            return None;
746        }
747        let codec = build_primary_key_codec(expect);
748        let old_codec = build_primary_key_codec(actual);
749
750        Some(FlatRewritePrimaryKey {
751            codec,
752            metadata: expect.clone(),
753            old_codec,
754        })
755    }
756
757    /// Rewrites the primary key of the `batch`.
758    /// It also appends the values to the primary key.
759    fn rewrite_key(
760        &self,
761        append_values: &[(ColumnId, Value)],
762        batch: RecordBatch,
763    ) -> Result<RecordBatch> {
764        let old_pk_dict_array = batch
765            .column(primary_key_column_index(batch.num_columns()))
766            .as_any()
767            .downcast_ref::<PrimaryKeyArray>()
768            .unwrap();
769        let old_pk_values_array = old_pk_dict_array
770            .values()
771            .as_any()
772            .downcast_ref::<BinaryArray>()
773            .unwrap();
774        let mut builder = BinaryBuilder::with_capacity(
775            old_pk_values_array.len(),
776            old_pk_values_array.value_data().len(),
777        );
778
779        // Binary buffer for the primary key.
780        let mut buffer = Vec::with_capacity(
781            old_pk_values_array.value_data().len() / old_pk_values_array.len().max(1),
782        );
783        let mut column_id_values = Vec::new();
784        // Iterates the binary array and rewrites the primary key.
785        for value in old_pk_values_array.iter() {
786            let Some(old_pk) = value else {
787                builder.append_null();
788                continue;
789            };
790            // Decodes the old primary key.
791            let mut pk_values = self.old_codec.decode(old_pk).context(DecodeSnafu)?;
792            pk_values.extend(append_values);
793
794            buffer.clear();
795            column_id_values.clear();
796            // Encodes the new primary key.
797            match pk_values {
798                CompositeValues::Dense(dense_values) => {
799                    self.codec
800                        .encode_values(dense_values.as_slice(), &mut buffer)
801                        .context(EncodeSnafu)?;
802                }
803                CompositeValues::Sparse(sparse_values) => {
804                    for id in &self.metadata.primary_key {
805                        let value = sparse_values.get_or_null(*id);
806                        column_id_values.push((*id, value.clone()));
807                    }
808                    self.codec
809                        .encode_values(&column_id_values, &mut buffer)
810                        .context(EncodeSnafu)?;
811                }
812            }
813            builder.append_value(&buffer);
814        }
815        let new_pk_values_array = Arc::new(builder.finish());
816        let new_pk_dict_array =
817            PrimaryKeyArray::new(old_pk_dict_array.keys().clone(), new_pk_values_array);
818
819        let mut columns = batch.columns().to_vec();
820        columns[primary_key_column_index(batch.num_columns())] = Arc::new(new_pk_dict_array);
821
822        RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
823    }
824}
825
826/// Helper to make primary key compatible for flat format.
827#[derive(Default)]
828struct FlatCompatPrimaryKey {
829    /// Primary key rewriter.
830    rewriter: Option<FlatRewritePrimaryKey>,
831    /// Converter to append values to primary keys.
832    converter: Option<Arc<dyn PrimaryKeyCodec>>,
833    /// Default values to append.
834    values: Vec<(ColumnId, Value)>,
835}
836
837impl FlatCompatPrimaryKey {
838    fn new(expect: &RegionMetadataRef, actual: &RegionMetadataRef) -> Result<Self> {
839        let rewriter = FlatRewritePrimaryKey::new(expect, actual);
840
841        if is_primary_key_same(expect, actual)? {
842            return Ok(Self {
843                rewriter,
844                converter: None,
845                values: Vec::new(),
846            });
847        }
848
849        // We need to append default values to the primary key.
850        let to_add = &expect.primary_key[actual.primary_key.len()..];
851        let mut values = Vec::with_capacity(to_add.len());
852        let mut fields = Vec::with_capacity(to_add.len());
853        for column_id in to_add {
854            // Safety: The id comes from expect region metadata.
855            let column = expect.column_by_id(*column_id).unwrap();
856            fields.push((
857                *column_id,
858                SortField::new(column.column_schema.data_type.clone()),
859            ));
860            let default_value = column
861                .column_schema
862                .create_default()
863                .context(CreateDefaultSnafu {
864                    region_id: expect.region_id,
865                    column: &column.column_schema.name,
866                })?
867                .with_context(|| CompatReaderSnafu {
868                    region_id: expect.region_id,
869                    reason: format!(
870                        "key column {} does not have a default value to read",
871                        column.column_schema.name
872                    ),
873                })?;
874            values.push((*column_id, default_value));
875        }
876        // is_primary_key_same() is false so we have different number of primary key columns.
877        debug_assert!(!fields.is_empty());
878
879        // Create converter to append values.
880        let converter = Some(build_primary_key_codec_with_fields(
881            expect.primary_key_encoding,
882            fields.into_iter(),
883        ));
884
885        Ok(Self {
886            rewriter,
887            converter,
888            values,
889        })
890    }
891
892    /// Makes primary key of the `batch` compatible.
893    ///
894    /// Callers must ensure other columns except the `__primary_key` column is compatible.
895    fn compat(&self, batch: RecordBatch) -> Result<RecordBatch> {
896        if let Some(rewriter) = &self.rewriter {
897            // If we have different encoding, rewrite the whole primary key.
898            return rewriter.rewrite_key(&self.values, batch);
899        }
900
901        self.append_key(batch)
902    }
903
904    /// Appends values to the primary key of the `batch`.
905    fn append_key(&self, batch: RecordBatch) -> Result<RecordBatch> {
906        let Some(converter) = &self.converter else {
907            return Ok(batch);
908        };
909
910        let old_pk_dict_array = batch
911            .column(primary_key_column_index(batch.num_columns()))
912            .as_any()
913            .downcast_ref::<PrimaryKeyArray>()
914            .unwrap();
915        let old_pk_values_array = old_pk_dict_array
916            .values()
917            .as_any()
918            .downcast_ref::<BinaryArray>()
919            .unwrap();
920        let mut builder = BinaryBuilder::with_capacity(
921            old_pk_values_array.len(),
922            old_pk_values_array.value_data().len()
923                + converter.estimated_size().unwrap_or_default() * old_pk_values_array.len(),
924        );
925
926        // Binary buffer for the primary key.
927        let mut buffer = Vec::with_capacity(
928            old_pk_values_array.value_data().len() / old_pk_values_array.len().max(1)
929                + converter.estimated_size().unwrap_or_default(),
930        );
931
932        // Iterates the binary array and appends values to the primary key.
933        for value in old_pk_values_array.iter() {
934            let Some(old_pk) = value else {
935                builder.append_null();
936                continue;
937            };
938
939            buffer.clear();
940            buffer.extend_from_slice(old_pk);
941            converter
942                .encode_values(&self.values, &mut buffer)
943                .context(EncodeSnafu)?;
944
945            builder.append_value(&buffer);
946        }
947
948        let new_pk_values_array = Arc::new(builder.finish());
949        let new_pk_dict_array =
950            PrimaryKeyArray::new(old_pk_dict_array.keys().clone(), new_pk_values_array);
951
952        // Overrides the primary key column.
953        let mut columns = batch.columns().to_vec();
954        columns[primary_key_column_index(batch.num_columns())] = Arc::new(new_pk_dict_array);
955
956        RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
957    }
958}
959
960#[cfg(test)]
961mod tests {
962    use std::sync::Arc;
963
964    use api::v1::{OpType, SemanticType};
965    use datatypes::arrow::array::{
966        ArrayRef, BinaryDictionaryBuilder, Int64Array, StringDictionaryBuilder,
967        TimestampMillisecondArray, UInt8Array, UInt64Array,
968    };
969    use datatypes::arrow::datatypes::UInt32Type;
970    use datatypes::arrow::record_batch::RecordBatch;
971    use datatypes::prelude::ConcreteDataType;
972    use datatypes::schema::ColumnSchema;
973    use datatypes::value::ValueRef;
974    use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, UInt8Vector, UInt64Vector};
975    use mito_codec::row_converter::{
976        DensePrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec,
977    };
978    use store_api::codec::PrimaryKeyEncoding;
979    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
980    use store_api::storage::RegionId;
981
982    use super::*;
983    use crate::read::flat_projection::FlatProjectionMapper;
984    use crate::sst::parquet::flat_format::FlatReadFormat;
985    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
986    use crate::test_util::{VecBatchReader, check_reader_result};
987
988    /// Creates a new [RegionMetadata].
989    fn new_metadata(
990        semantic_types: &[(ColumnId, SemanticType, ConcreteDataType)],
991        primary_key: &[ColumnId],
992    ) -> RegionMetadata {
993        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
994        for (id, semantic_type, data_type) in semantic_types {
995            let column_schema = match semantic_type {
996                SemanticType::Tag => {
997                    ColumnSchema::new(format!("tag_{id}"), data_type.clone(), true)
998                }
999                SemanticType::Field => {
1000                    ColumnSchema::new(format!("field_{id}"), data_type.clone(), true)
1001                }
1002                SemanticType::Timestamp => ColumnSchema::new("ts", data_type.clone(), false),
1003            };
1004
1005            builder.push_column_metadata(ColumnMetadata {
1006                column_schema,
1007                semantic_type: *semantic_type,
1008                column_id: *id,
1009            });
1010        }
1011        builder.primary_key(primary_key.to_vec());
1012        builder.build().unwrap()
1013    }
1014
1015    /// Encode primary key.
1016    fn encode_key(keys: &[Option<&str>]) -> Vec<u8> {
1017        let fields = (0..keys.len())
1018            .map(|_| (0, SortField::new(ConcreteDataType::string_datatype())))
1019            .collect();
1020        let converter = DensePrimaryKeyCodec::with_fields(fields);
1021        let row = keys.iter().map(|str_opt| match str_opt {
1022            Some(v) => ValueRef::String(v),
1023            None => ValueRef::Null,
1024        });
1025
1026        converter.encode(row).unwrap()
1027    }
1028
1029    /// Encode sparse primary key.
1030    fn encode_sparse_key(keys: &[(ColumnId, Option<&str>)]) -> Vec<u8> {
1031        let fields = (0..keys.len())
1032            .map(|_| (1, SortField::new(ConcreteDataType::string_datatype())))
1033            .collect();
1034        let converter = SparsePrimaryKeyCodec::with_fields(fields);
1035        let row = keys
1036            .iter()
1037            .map(|(id, str_opt)| match str_opt {
1038                Some(v) => (*id, ValueRef::String(v)),
1039                None => (*id, ValueRef::Null),
1040            })
1041            .collect::<Vec<_>>();
1042        let mut buffer = vec![];
1043        converter.encode_value_refs(&row, &mut buffer).unwrap();
1044        buffer
1045    }
1046
1047    /// Creates a batch for specific primary `key`.
1048    ///
1049    /// `fields`: [(column_id of the field, is null)]
1050    fn new_batch(
1051        primary_key: &[u8],
1052        fields: &[(ColumnId, bool)],
1053        start_ts: i64,
1054        num_rows: usize,
1055    ) -> Batch {
1056        let timestamps = Arc::new(TimestampMillisecondVector::from_values(
1057            start_ts..start_ts + num_rows as i64,
1058        ));
1059        let sequences = Arc::new(UInt64Vector::from_values(0..num_rows as u64));
1060        let op_types = Arc::new(UInt8Vector::from_vec(vec![OpType::Put as u8; num_rows]));
1061        let field_columns = fields
1062            .iter()
1063            .map(|(id, is_null)| {
1064                let data = if *is_null {
1065                    Arc::new(Int64Vector::from(vec![None; num_rows]))
1066                } else {
1067                    Arc::new(Int64Vector::from_vec(vec![*id as i64; num_rows]))
1068                };
1069                BatchColumn {
1070                    column_id: *id,
1071                    data,
1072                }
1073            })
1074            .collect();
1075        Batch::new(
1076            primary_key.to_vec(),
1077            timestamps,
1078            sequences,
1079            op_types,
1080            field_columns,
1081        )
1082        .unwrap()
1083    }
1084
1085    #[test]
1086    fn test_invalid_pk_len() {
1087        let reader_meta = new_metadata(
1088            &[
1089                (
1090                    0,
1091                    SemanticType::Timestamp,
1092                    ConcreteDataType::timestamp_millisecond_datatype(),
1093                ),
1094                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1095                (2, SemanticType::Tag, ConcreteDataType::string_datatype()),
1096                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
1097            ],
1098            &[1, 2],
1099        );
1100        let expect_meta = new_metadata(
1101            &[
1102                (
1103                    0,
1104                    SemanticType::Timestamp,
1105                    ConcreteDataType::timestamp_millisecond_datatype(),
1106                ),
1107                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1108                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1109            ],
1110            &[1],
1111        );
1112        may_compat_primary_key(&expect_meta, &reader_meta).unwrap_err();
1113    }
1114
1115    #[test]
1116    fn test_different_pk() {
1117        let reader_meta = new_metadata(
1118            &[
1119                (
1120                    0,
1121                    SemanticType::Timestamp,
1122                    ConcreteDataType::timestamp_millisecond_datatype(),
1123                ),
1124                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1125                (2, SemanticType::Tag, ConcreteDataType::string_datatype()),
1126                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
1127            ],
1128            &[2, 1],
1129        );
1130        let expect_meta = new_metadata(
1131            &[
1132                (
1133                    0,
1134                    SemanticType::Timestamp,
1135                    ConcreteDataType::timestamp_millisecond_datatype(),
1136                ),
1137                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1138                (2, SemanticType::Tag, ConcreteDataType::string_datatype()),
1139                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
1140                (4, SemanticType::Tag, ConcreteDataType::string_datatype()),
1141            ],
1142            &[1, 2, 4],
1143        );
1144        may_compat_primary_key(&expect_meta, &reader_meta).unwrap_err();
1145    }
1146
1147    #[test]
1148    fn test_same_pk() {
1149        let reader_meta = new_metadata(
1150            &[
1151                (
1152                    0,
1153                    SemanticType::Timestamp,
1154                    ConcreteDataType::timestamp_millisecond_datatype(),
1155                ),
1156                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1157                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1158            ],
1159            &[1],
1160        );
1161        assert!(
1162            may_compat_primary_key(&reader_meta, &reader_meta)
1163                .unwrap()
1164                .is_none()
1165        );
1166    }
1167
1168    #[test]
1169    fn test_same_pk_encoding() {
1170        let reader_meta = Arc::new(new_metadata(
1171            &[
1172                (
1173                    0,
1174                    SemanticType::Timestamp,
1175                    ConcreteDataType::timestamp_millisecond_datatype(),
1176                ),
1177                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1178            ],
1179            &[1],
1180        ));
1181
1182        assert!(
1183            may_compat_primary_key(&reader_meta, &reader_meta)
1184                .unwrap()
1185                .is_none()
1186        );
1187    }
1188
1189    #[test]
1190    fn test_same_fields() {
1191        let reader_meta = Arc::new(new_metadata(
1192            &[
1193                (
1194                    0,
1195                    SemanticType::Timestamp,
1196                    ConcreteDataType::timestamp_millisecond_datatype(),
1197                ),
1198                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1199                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1200            ],
1201            &[1],
1202        ));
1203        let mapper = PrimaryKeyProjectionMapper::all(&reader_meta).unwrap();
1204        assert!(may_compat_fields(&mapper, &reader_meta).unwrap().is_none())
1205    }
1206
1207    #[tokio::test]
1208    async fn test_compat_reader() {
1209        let reader_meta = Arc::new(new_metadata(
1210            &[
1211                (
1212                    0,
1213                    SemanticType::Timestamp,
1214                    ConcreteDataType::timestamp_millisecond_datatype(),
1215                ),
1216                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1217                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1218            ],
1219            &[1],
1220        ));
1221        let expect_meta = Arc::new(new_metadata(
1222            &[
1223                (
1224                    0,
1225                    SemanticType::Timestamp,
1226                    ConcreteDataType::timestamp_millisecond_datatype(),
1227                ),
1228                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1229                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1230                (3, SemanticType::Tag, ConcreteDataType::string_datatype()),
1231                (4, SemanticType::Field, ConcreteDataType::int64_datatype()),
1232            ],
1233            &[1, 3],
1234        ));
1235        let mapper = ProjectionMapper::all(&expect_meta, false).unwrap();
1236        let k1 = encode_key(&[Some("a")]);
1237        let k2 = encode_key(&[Some("b")]);
1238        let source_reader = VecBatchReader::new(&[
1239            new_batch(&k1, &[(2, false)], 1000, 3),
1240            new_batch(&k2, &[(2, false)], 1000, 3),
1241        ]);
1242
1243        let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
1244        let k1 = encode_key(&[Some("a"), None]);
1245        let k2 = encode_key(&[Some("b"), None]);
1246        check_reader_result(
1247            &mut compat_reader,
1248            &[
1249                new_batch(&k1, &[(2, false), (4, true)], 1000, 3),
1250                new_batch(&k2, &[(2, false), (4, true)], 1000, 3),
1251            ],
1252        )
1253        .await;
1254    }
1255
1256    #[tokio::test]
1257    async fn test_compat_reader_different_order() {
1258        let reader_meta = Arc::new(new_metadata(
1259            &[
1260                (
1261                    0,
1262                    SemanticType::Timestamp,
1263                    ConcreteDataType::timestamp_millisecond_datatype(),
1264                ),
1265                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1266                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1267            ],
1268            &[1],
1269        ));
1270        let expect_meta = Arc::new(new_metadata(
1271            &[
1272                (
1273                    0,
1274                    SemanticType::Timestamp,
1275                    ConcreteDataType::timestamp_millisecond_datatype(),
1276                ),
1277                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1278                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
1279                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1280                (4, SemanticType::Field, ConcreteDataType::int64_datatype()),
1281            ],
1282            &[1],
1283        ));
1284        let mapper = ProjectionMapper::all(&expect_meta, false).unwrap();
1285        let k1 = encode_key(&[Some("a")]);
1286        let k2 = encode_key(&[Some("b")]);
1287        let source_reader = VecBatchReader::new(&[
1288            new_batch(&k1, &[(2, false)], 1000, 3),
1289            new_batch(&k2, &[(2, false)], 1000, 3),
1290        ]);
1291
1292        let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
1293        check_reader_result(
1294            &mut compat_reader,
1295            &[
1296                new_batch(&k1, &[(3, true), (2, false), (4, true)], 1000, 3),
1297                new_batch(&k2, &[(3, true), (2, false), (4, true)], 1000, 3),
1298            ],
1299        )
1300        .await;
1301    }
1302
1303    #[tokio::test]
1304    async fn test_compat_reader_different_types() {
1305        let actual_meta = Arc::new(new_metadata(
1306            &[
1307                (
1308                    0,
1309                    SemanticType::Timestamp,
1310                    ConcreteDataType::timestamp_millisecond_datatype(),
1311                ),
1312                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1313                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1314            ],
1315            &[1],
1316        ));
1317        let expect_meta = Arc::new(new_metadata(
1318            &[
1319                (
1320                    0,
1321                    SemanticType::Timestamp,
1322                    ConcreteDataType::timestamp_millisecond_datatype(),
1323                ),
1324                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1325                (2, SemanticType::Field, ConcreteDataType::string_datatype()),
1326            ],
1327            &[1],
1328        ));
1329        let mapper = ProjectionMapper::all(&expect_meta, false).unwrap();
1330        let k1 = encode_key(&[Some("a")]);
1331        let k2 = encode_key(&[Some("b")]);
1332        let source_reader = VecBatchReader::new(&[
1333            new_batch(&k1, &[(2, false)], 1000, 3),
1334            new_batch(&k2, &[(2, false)], 1000, 3),
1335        ]);
1336
1337        let fn_batch_cast = |batch: Batch| {
1338            let mut new_fields = batch.fields.clone();
1339            new_fields[0].data = new_fields[0]
1340                .data
1341                .cast(&ConcreteDataType::string_datatype())
1342                .unwrap();
1343
1344            batch.with_fields(new_fields).unwrap()
1345        };
1346        let mut compat_reader = CompatReader::new(&mapper, actual_meta, source_reader).unwrap();
1347        check_reader_result(
1348            &mut compat_reader,
1349            &[
1350                fn_batch_cast(new_batch(&k1, &[(2, false)], 1000, 3)),
1351                fn_batch_cast(new_batch(&k2, &[(2, false)], 1000, 3)),
1352            ],
1353        )
1354        .await;
1355    }
1356
1357    #[tokio::test]
1358    async fn test_compat_reader_projection() {
1359        let reader_meta = Arc::new(new_metadata(
1360            &[
1361                (
1362                    0,
1363                    SemanticType::Timestamp,
1364                    ConcreteDataType::timestamp_millisecond_datatype(),
1365                ),
1366                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1367                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1368            ],
1369            &[1],
1370        ));
1371        let expect_meta = Arc::new(new_metadata(
1372            &[
1373                (
1374                    0,
1375                    SemanticType::Timestamp,
1376                    ConcreteDataType::timestamp_millisecond_datatype(),
1377                ),
1378                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1379                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
1380                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1381                (4, SemanticType::Field, ConcreteDataType::int64_datatype()),
1382            ],
1383            &[1],
1384        ));
1385        // tag_1, field_2, field_3
1386        let mapper = ProjectionMapper::new(&expect_meta, [1, 3, 2].into_iter(), false).unwrap();
1387        let k1 = encode_key(&[Some("a")]);
1388        let source_reader = VecBatchReader::new(&[new_batch(&k1, &[(2, false)], 1000, 3)]);
1389
1390        let mut compat_reader =
1391            CompatReader::new(&mapper, reader_meta.clone(), source_reader).unwrap();
1392        check_reader_result(
1393            &mut compat_reader,
1394            &[new_batch(&k1, &[(3, true), (2, false)], 1000, 3)],
1395        )
1396        .await;
1397
1398        // tag_1, field_4, field_3
1399        let mapper = ProjectionMapper::new(&expect_meta, [1, 4, 2].into_iter(), false).unwrap();
1400        let k1 = encode_key(&[Some("a")]);
1401        let source_reader = VecBatchReader::new(&[new_batch(&k1, &[], 1000, 3)]);
1402
1403        let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
1404        check_reader_result(
1405            &mut compat_reader,
1406            &[new_batch(&k1, &[(3, true), (4, true)], 1000, 3)],
1407        )
1408        .await;
1409    }
1410
1411    #[tokio::test]
1412    async fn test_compat_reader_different_pk_encoding() {
1413        let mut reader_meta = new_metadata(
1414            &[
1415                (
1416                    0,
1417                    SemanticType::Timestamp,
1418                    ConcreteDataType::timestamp_millisecond_datatype(),
1419                ),
1420                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1421                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1422            ],
1423            &[1],
1424        );
1425        reader_meta.primary_key_encoding = PrimaryKeyEncoding::Dense;
1426        let reader_meta = Arc::new(reader_meta);
1427        let mut expect_meta = new_metadata(
1428            &[
1429                (
1430                    0,
1431                    SemanticType::Timestamp,
1432                    ConcreteDataType::timestamp_millisecond_datatype(),
1433                ),
1434                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1435                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1436                (3, SemanticType::Tag, ConcreteDataType::string_datatype()),
1437                (4, SemanticType::Field, ConcreteDataType::int64_datatype()),
1438            ],
1439            &[1, 3],
1440        );
1441        expect_meta.primary_key_encoding = PrimaryKeyEncoding::Sparse;
1442        let expect_meta = Arc::new(expect_meta);
1443
1444        let mapper = ProjectionMapper::all(&expect_meta, false).unwrap();
1445        let k1 = encode_key(&[Some("a")]);
1446        let k2 = encode_key(&[Some("b")]);
1447        let source_reader = VecBatchReader::new(&[
1448            new_batch(&k1, &[(2, false)], 1000, 3),
1449            new_batch(&k2, &[(2, false)], 1000, 3),
1450        ]);
1451
1452        let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
1453        let k1 = encode_sparse_key(&[(1, Some("a")), (3, None)]);
1454        let k2 = encode_sparse_key(&[(1, Some("b")), (3, None)]);
1455        check_reader_result(
1456            &mut compat_reader,
1457            &[
1458                new_batch(&k1, &[(2, false), (4, true)], 1000, 3),
1459                new_batch(&k2, &[(2, false), (4, true)], 1000, 3),
1460            ],
1461        )
1462        .await;
1463    }
1464
1465    /// Creates a primary key array for flat format testing.
1466    fn build_flat_test_pk_array(primary_keys: &[&[u8]]) -> ArrayRef {
1467        let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
1468        for &pk in primary_keys {
1469            builder.append(pk).unwrap();
1470        }
1471        Arc::new(builder.finish())
1472    }
1473
1474    #[test]
1475    fn test_flat_compat_batch_with_missing_columns() {
1476        let actual_metadata = Arc::new(new_metadata(
1477            &[
1478                (
1479                    0,
1480                    SemanticType::Timestamp,
1481                    ConcreteDataType::timestamp_millisecond_datatype(),
1482                ),
1483                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1484                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1485            ],
1486            &[1],
1487        ));
1488
1489        let expected_metadata = Arc::new(new_metadata(
1490            &[
1491                (
1492                    0,
1493                    SemanticType::Timestamp,
1494                    ConcreteDataType::timestamp_millisecond_datatype(),
1495                ),
1496                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1497                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1498                // Adds a new field.
1499                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
1500            ],
1501            &[1],
1502        ));
1503
1504        let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
1505        let read_format = FlatReadFormat::new(
1506            actual_metadata.clone(),
1507            [0, 1, 2, 3].into_iter(),
1508            None,
1509            "test",
1510            false,
1511        )
1512        .unwrap();
1513        let format_projection = read_format.format_projection();
1514
1515        let compat_batch =
1516            FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection, false)
1517                .unwrap()
1518                .unwrap();
1519
1520        let mut tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
1521        tag_builder.append_value("tag1");
1522        tag_builder.append_value("tag1");
1523        let tag_dict_array = Arc::new(tag_builder.finish());
1524
1525        let k1 = encode_key(&[Some("tag1")]);
1526        let input_columns: Vec<ArrayRef> = vec![
1527            tag_dict_array.clone(),
1528            Arc::new(Int64Array::from(vec![100, 200])),
1529            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
1530            build_flat_test_pk_array(&[&k1, &k1]),
1531            Arc::new(UInt64Array::from_iter_values([1, 2])),
1532            Arc::new(UInt8Array::from_iter_values([
1533                OpType::Put as u8,
1534                OpType::Put as u8,
1535            ])),
1536        ];
1537        let input_schema =
1538            to_flat_sst_arrow_schema(&actual_metadata, &FlatSchemaOptions::default());
1539        let input_batch = RecordBatch::try_new(input_schema, input_columns).unwrap();
1540
1541        let result = compat_batch.compat(input_batch).unwrap();
1542
1543        let expected_schema =
1544            to_flat_sst_arrow_schema(&expected_metadata, &FlatSchemaOptions::default());
1545
1546        let expected_columns: Vec<ArrayRef> = vec![
1547            tag_dict_array.clone(),
1548            Arc::new(Int64Array::from(vec![100, 200])),
1549            Arc::new(Int64Array::from(vec![None::<i64>, None::<i64>])),
1550            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
1551            build_flat_test_pk_array(&[&k1, &k1]),
1552            Arc::new(UInt64Array::from_iter_values([1, 2])),
1553            Arc::new(UInt8Array::from_iter_values([
1554                OpType::Put as u8,
1555                OpType::Put as u8,
1556            ])),
1557        ];
1558        let expected_batch = RecordBatch::try_new(expected_schema, expected_columns).unwrap();
1559
1560        assert_eq!(expected_batch, result);
1561    }
1562
1563    #[test]
1564    fn test_flat_compat_batch_with_different_pk_encoding() {
1565        let mut actual_metadata = new_metadata(
1566            &[
1567                (
1568                    0,
1569                    SemanticType::Timestamp,
1570                    ConcreteDataType::timestamp_millisecond_datatype(),
1571                ),
1572                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1573                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1574            ],
1575            &[1],
1576        );
1577        actual_metadata.primary_key_encoding = PrimaryKeyEncoding::Dense;
1578        let actual_metadata = Arc::new(actual_metadata);
1579
1580        let mut expected_metadata = new_metadata(
1581            &[
1582                (
1583                    0,
1584                    SemanticType::Timestamp,
1585                    ConcreteDataType::timestamp_millisecond_datatype(),
1586                ),
1587                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1588                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1589                (3, SemanticType::Tag, ConcreteDataType::string_datatype()),
1590            ],
1591            &[1, 3],
1592        );
1593        expected_metadata.primary_key_encoding = PrimaryKeyEncoding::Sparse;
1594        let expected_metadata = Arc::new(expected_metadata);
1595
1596        let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
1597        let read_format = FlatReadFormat::new(
1598            actual_metadata.clone(),
1599            [0, 1, 2, 3].into_iter(),
1600            None,
1601            "test",
1602            false,
1603        )
1604        .unwrap();
1605        let format_projection = read_format.format_projection();
1606
1607        let compat_batch =
1608            FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection, false)
1609                .unwrap()
1610                .unwrap();
1611
1612        // Tag array.
1613        let mut tag1_builder = StringDictionaryBuilder::<UInt32Type>::new();
1614        tag1_builder.append_value("tag1");
1615        tag1_builder.append_value("tag1");
1616        let tag1_dict_array = Arc::new(tag1_builder.finish());
1617
1618        let k1 = encode_key(&[Some("tag1")]);
1619        let input_columns: Vec<ArrayRef> = vec![
1620            tag1_dict_array.clone(),
1621            Arc::new(Int64Array::from(vec![100, 200])),
1622            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
1623            build_flat_test_pk_array(&[&k1, &k1]),
1624            Arc::new(UInt64Array::from_iter_values([1, 2])),
1625            Arc::new(UInt8Array::from_iter_values([
1626                OpType::Put as u8,
1627                OpType::Put as u8,
1628            ])),
1629        ];
1630        let input_schema =
1631            to_flat_sst_arrow_schema(&actual_metadata, &FlatSchemaOptions::default());
1632        let input_batch = RecordBatch::try_new(input_schema, input_columns).unwrap();
1633
1634        let result = compat_batch.compat(input_batch).unwrap();
1635
1636        let sparse_k1 = encode_sparse_key(&[(1, Some("tag1")), (3, None)]);
1637        let mut null_tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
1638        null_tag_builder.append_nulls(2);
1639        let null_tag_dict_array = Arc::new(null_tag_builder.finish());
1640        let expected_columns: Vec<ArrayRef> = vec![
1641            tag1_dict_array.clone(),
1642            null_tag_dict_array,
1643            Arc::new(Int64Array::from(vec![100, 200])),
1644            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
1645            build_flat_test_pk_array(&[&sparse_k1, &sparse_k1]),
1646            Arc::new(UInt64Array::from_iter_values([1, 2])),
1647            Arc::new(UInt8Array::from_iter_values([
1648                OpType::Put as u8,
1649                OpType::Put as u8,
1650            ])),
1651        ];
1652        let output_schema =
1653            to_flat_sst_arrow_schema(&expected_metadata, &FlatSchemaOptions::default());
1654        let expected_batch = RecordBatch::try_new(output_schema, expected_columns).unwrap();
1655
1656        assert_eq!(expected_batch, result);
1657    }
1658
1659    #[test]
1660    fn test_flat_compat_batch_compact_sparse() {
1661        let mut actual_metadata = new_metadata(
1662            &[
1663                (
1664                    0,
1665                    SemanticType::Timestamp,
1666                    ConcreteDataType::timestamp_millisecond_datatype(),
1667                ),
1668                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1669            ],
1670            &[],
1671        );
1672        actual_metadata.primary_key_encoding = PrimaryKeyEncoding::Sparse;
1673        let actual_metadata = Arc::new(actual_metadata);
1674
1675        let mut expected_metadata = new_metadata(
1676            &[
1677                (
1678                    0,
1679                    SemanticType::Timestamp,
1680                    ConcreteDataType::timestamp_millisecond_datatype(),
1681                ),
1682                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1683                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
1684            ],
1685            &[],
1686        );
1687        expected_metadata.primary_key_encoding = PrimaryKeyEncoding::Sparse;
1688        let expected_metadata = Arc::new(expected_metadata);
1689
1690        let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
1691        let read_format = FlatReadFormat::new(
1692            actual_metadata.clone(),
1693            [0, 2, 3].into_iter(),
1694            None,
1695            "test",
1696            true,
1697        )
1698        .unwrap();
1699        let format_projection = read_format.format_projection();
1700
1701        let compat_batch =
1702            FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection, true)
1703                .unwrap()
1704                .unwrap();
1705
1706        let sparse_k1 = encode_sparse_key(&[]);
1707        let input_columns: Vec<ArrayRef> = vec![
1708            Arc::new(Int64Array::from(vec![100, 200])),
1709            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
1710            build_flat_test_pk_array(&[&sparse_k1, &sparse_k1]),
1711            Arc::new(UInt64Array::from_iter_values([1, 2])),
1712            Arc::new(UInt8Array::from_iter_values([
1713                OpType::Put as u8,
1714                OpType::Put as u8,
1715            ])),
1716        ];
1717        let input_schema =
1718            to_flat_sst_arrow_schema(&actual_metadata, &FlatSchemaOptions::default());
1719        let input_batch = RecordBatch::try_new(input_schema, input_columns).unwrap();
1720
1721        let result = compat_batch.compat(input_batch).unwrap();
1722
1723        let expected_columns: Vec<ArrayRef> = vec![
1724            Arc::new(Int64Array::from(vec![100, 200])),
1725            Arc::new(Int64Array::from(vec![None::<i64>, None::<i64>])),
1726            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
1727            build_flat_test_pk_array(&[&sparse_k1, &sparse_k1]),
1728            Arc::new(UInt64Array::from_iter_values([1, 2])),
1729            Arc::new(UInt8Array::from_iter_values([
1730                OpType::Put as u8,
1731                OpType::Put as u8,
1732            ])),
1733        ];
1734        let output_schema =
1735            to_flat_sst_arrow_schema(&expected_metadata, &FlatSchemaOptions::default());
1736        let expected_batch = RecordBatch::try_new(output_schema, expected_columns).unwrap();
1737
1738        assert_eq!(expected_batch, result);
1739    }
1740}