// mito2/read/compat.rs
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Utilities to adapt readers with different schema.

use std::collections::HashMap;
use std::sync::Arc;

use api::v1::SemanticType;
use common_recordbatch::recordbatch::align_json_array;
use datatypes::arrow::array::{
    Array, ArrayRef, BinaryArray, BinaryBuilder, DictionaryArray, UInt32Array,
};
use datatypes::arrow::compute::{TakeOptions, take};
use datatypes::arrow::datatypes::{FieldRef, Schema, SchemaRef};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::DataType;
use datatypes::value::Value;
use datatypes::vectors::{Helper, VectorRef};
use mito_codec::row_converter::{
    CompositeValues, PrimaryKeyCodec, SortField, build_primary_key_codec,
    build_primary_key_codec_with_fields,
};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::ColumnId;

use crate::error::{
    CastVectorSnafu, CompatReaderSnafu, ComputeArrowSnafu, ConvertVectorSnafu, CreateDefaultSnafu,
    DecodeSnafu, EncodeSnafu, NewRecordBatchSnafu, RecordBatchSnafu, Result, UnexpectedSnafu,
    UnsupportedOperationSnafu,
};
use crate::read::flat_projection::{FlatProjectionMapper, flat_projected_columns};
use crate::read::projection::{PrimaryKeyProjectionMapper, ProjectionMapper};
use crate::read::{Batch, BatchColumn, BatchReader};
use crate::sst::parquet::flat_format::primary_key_column_index;
use crate::sst::parquet::format::{FormatProjection, INTERNAL_COLUMN_NUM, PrimaryKeyArray};
use crate::sst::{internal_fields, tag_maybe_to_dictionary_field};

53/// Reader to adapt schema of underlying reader to expected schema.
54pub struct CompatReader<R> {
55    /// Underlying reader.
56    reader: R,
57    /// Helper to compat batches.
58    compat: PrimaryKeyCompatBatch,
59}
60
61impl<R> CompatReader<R> {
62    /// Creates a new compat reader.
63    /// - `mapper` is built from the metadata users expect to see.
64    /// - `reader_meta` is the metadata of the input reader.
65    /// - `reader` is the input reader.
66    pub fn new(
67        mapper: &ProjectionMapper,
68        reader_meta: RegionMetadataRef,
69        reader: R,
70    ) -> Result<CompatReader<R>> {
71        Ok(CompatReader {
72            reader,
73            compat: PrimaryKeyCompatBatch::new(mapper, reader_meta)?,
74        })
75    }
76}
77
78#[async_trait::async_trait]
79impl<R: BatchReader> BatchReader for CompatReader<R> {
80    async fn next_batch(&mut self) -> Result<Option<Batch>> {
81        let Some(mut batch) = self.reader.next_batch().await? else {
82            return Ok(None);
83        };
84
85        batch = self.compat.compat_batch(batch)?;
86
87        Ok(Some(batch))
88    }
89}
90
91/// Helper to adapt schema of the batch to an expected schema.
92pub(crate) enum CompatBatch {
93    /// Adapter for primary key format.
94    PrimaryKey(PrimaryKeyCompatBatch),
95    /// Adapter for flat format.
96    Flat(FlatCompatBatch),
97}
98
99impl CompatBatch {
100    /// Returns the inner primary key batch adapter if this is a PrimaryKey format.
101    #[allow(dead_code)]
102    pub(crate) fn as_primary_key(&self) -> Option<&PrimaryKeyCompatBatch> {
103        match self {
104            CompatBatch::PrimaryKey(batch) => Some(batch),
105            _ => None,
106        }
107    }
108
109    /// Returns the inner flat batch adapter if this is a Flat format.
110    pub(crate) fn as_flat(&self) -> Option<&FlatCompatBatch> {
111        match self {
112            CompatBatch::Flat(batch) => Some(batch),
113            _ => None,
114        }
115    }
116}
117
118/// A helper struct to adapt schema of the batch to an expected schema.
119pub(crate) struct PrimaryKeyCompatBatch {
120    /// Optional primary key adapter.
121    rewrite_pk: Option<RewritePrimaryKey>,
122    /// Optional primary key adapter.
123    compat_pk: Option<CompatPrimaryKey>,
124    /// Optional fields adapter.
125    compat_fields: Option<CompatFields>,
126}
127
128impl PrimaryKeyCompatBatch {
129    /// Creates a new [CompatBatch].
130    /// - `mapper` is built from the metadata users expect to see.
131    /// - `reader_meta` is the metadata of the input reader.
132    pub(crate) fn new(mapper: &ProjectionMapper, reader_meta: RegionMetadataRef) -> Result<Self> {
133        let rewrite_pk = may_rewrite_primary_key(mapper.metadata(), &reader_meta);
134        let compat_pk = may_compat_primary_key(mapper.metadata(), &reader_meta)?;
135        let mapper = mapper.as_primary_key().context(UnexpectedSnafu {
136            reason: "Unexpected format",
137        })?;
138        let compat_fields = may_compat_fields(mapper, &reader_meta)?;
139
140        Ok(Self {
141            rewrite_pk,
142            compat_pk,
143            compat_fields,
144        })
145    }
146
147    /// Adapts the `batch` to the expected schema.
148    pub(crate) fn compat_batch(&self, mut batch: Batch) -> Result<Batch> {
149        if let Some(rewrite_pk) = &self.rewrite_pk {
150            batch = rewrite_pk.compat(batch)?;
151        }
152        if let Some(compat_pk) = &self.compat_pk {
153            batch = compat_pk.compat(batch)?;
154        }
155        if let Some(compat_fields) = &self.compat_fields {
156            batch = compat_fields.compat(batch)?;
157        }
158
159        Ok(batch)
160    }
161}
162
163/// Returns true if `left` and `right` have same columns and primary key encoding.
164pub(crate) fn has_same_columns_and_pk_encoding(
165    left: &RegionMetadata,
166    right: &RegionMetadata,
167) -> bool {
168    if left.primary_key_encoding != right.primary_key_encoding {
169        return false;
170    }
171
172    if left.column_metadatas.len() != right.column_metadatas.len() {
173        return false;
174    }
175
176    for (left_col, right_col) in left.column_metadatas.iter().zip(&right.column_metadatas) {
177        if left_col.column_id != right_col.column_id || !left_col.is_same_datatype(right_col) {
178            return false;
179        }
180        debug_assert_eq!(
181            left_col.column_schema.data_type,
182            right_col.column_schema.data_type
183        );
184        debug_assert_eq!(left_col.semantic_type, right_col.semantic_type);
185    }
186
187    true
188}
189
190/// A helper struct to adapt schema of the batch to an expected schema.
191pub(crate) struct FlatCompatBatch {
192    /// Indices to convert actual fields to expect fields.
193    index_or_defaults: Vec<IndexOrDefault>,
194    /// Expected arrow schema.
195    arrow_schema: SchemaRef,
196    /// Primary key adapter.
197    compat_pk: FlatCompatPrimaryKey,
198}
199
200impl FlatCompatBatch {
201    /// Creates a [FlatCompatBatch].
202    ///
203    /// - `mapper` is built from the metadata users expect to see.
204    /// - `actual` is the [RegionMetadata] of the input parquet.
205    /// - `format_projection` is the projection of the read format for the input parquet.
206    /// - `compaction` indicates whether the reader is for compaction.
207    pub(crate) fn try_new(
208        mapper: &FlatProjectionMapper,
209        actual: &RegionMetadataRef,
210        format_projection: &FormatProjection,
211        compaction: bool,
212    ) -> Result<Option<Self>> {
213        let actual_schema = flat_projected_columns(actual, format_projection);
214        let expect_schema = mapper.batch_schema();
215        if expect_schema == actual_schema {
216            // Although the SST has a different schema, but the schema after projection is the same
217            // as expected schema.
218            return Ok(None);
219        }
220
221        if actual.primary_key_encoding == PrimaryKeyEncoding::Sparse && compaction {
222            // Special handling for sparse encoding in compaction.
223            return FlatCompatBatch::try_new_compact_sparse(mapper, actual);
224        }
225
226        let (index_or_defaults, fields) =
227            Self::compute_index_and_fields(&actual_schema, expect_schema, mapper.metadata())?;
228
229        let compat_pk = FlatCompatPrimaryKey::new(mapper.metadata(), actual)?;
230
231        Ok(Some(Self {
232            index_or_defaults,
233            arrow_schema: Arc::new(Schema::new(fields)),
234            compat_pk,
235        }))
236    }
237
238    fn compute_index_and_fields(
239        actual_schema: &[(ColumnId, ConcreteDataType)],
240        expect_schema: &[(ColumnId, ConcreteDataType)],
241        expect_metadata: &RegionMetadata,
242    ) -> Result<(Vec<IndexOrDefault>, Vec<FieldRef>)> {
243        // Maps column id to the index and data type in the actual schema.
244        let actual_schema_index: HashMap<_, _> = actual_schema
245            .iter()
246            .enumerate()
247            .map(|(idx, (column_id, data_type))| (*column_id, (idx, data_type)))
248            .collect();
249
250        let mut index_or_defaults = Vec::with_capacity(expect_schema.len());
251        let mut fields = Vec::with_capacity(expect_schema.len());
252        for (column_id, expect_data_type) in expect_schema {
253            // Safety: expect_schema comes from the same mapper.
254            let column_index = expect_metadata.column_index_by_id(*column_id).unwrap();
255            let expect_column = &expect_metadata.column_metadatas[column_index];
256            let column_field = &expect_metadata.schema.arrow_schema().fields()[column_index];
257            // For tag columns, we need to create a dictionary field.
258            if expect_column.semantic_type == SemanticType::Tag {
259                fields.push(tag_maybe_to_dictionary_field(
260                    &expect_column.column_schema.data_type,
261                    column_field,
262                ));
263            } else {
264                fields.push(column_field.clone());
265            };
266
267            if let Some((index, actual_data_type)) = actual_schema_index.get(column_id) {
268                let mut cast_type = None;
269
270                // Same column different type.
271                if expect_data_type != *actual_data_type {
272                    cast_type = Some(expect_data_type.clone())
273                }
274                // Source has this column.
275                index_or_defaults.push(IndexOrDefault::Index {
276                    pos: *index,
277                    cast_type,
278                });
279            } else {
280                // Create a default vector with 1 element for that column.
281                let default_vector = expect_column
282                    .column_schema
283                    .create_default_vector(1)
284                    .context(CreateDefaultSnafu {
285                        region_id: expect_metadata.region_id,
286                        column: &expect_column.column_schema.name,
287                    })?
288                    .with_context(|| CompatReaderSnafu {
289                        region_id: expect_metadata.region_id,
290                        reason: format!(
291                            "column {} does not have a default value to read",
292                            expect_column.column_schema.name
293                        ),
294                    })?;
295                index_or_defaults.push(IndexOrDefault::DefaultValue {
296                    column_id: expect_column.column_id,
297                    default_vector,
298                    semantic_type: expect_column.semantic_type,
299                });
300            };
301        }
302        fields.extend_from_slice(&internal_fields());
303
304        Ok((index_or_defaults, fields))
305    }
306
307    fn try_new_compact_sparse(
308        mapper: &FlatProjectionMapper,
309        actual: &RegionMetadataRef,
310    ) -> Result<Option<Self>> {
311        // Currently, we don't support converting sparse encoding back to dense encoding in
312        // flat format.
313        ensure!(
314            mapper.metadata().primary_key_encoding == PrimaryKeyEncoding::Sparse,
315            UnsupportedOperationSnafu {
316                err_msg: "Flat format doesn't support converting sparse encoding back to dense encoding"
317            }
318        );
319
320        // For sparse encoding, we don't need to check the primary keys.
321        // Since this is for compaction, we always read all columns.
322        let actual_schema: Vec<_> = actual
323            .field_columns()
324            .chain([actual.time_index_column()])
325            .map(|col| (col.column_id, col.column_schema.data_type.clone()))
326            .collect();
327        let expect_schema: Vec<_> = mapper
328            .metadata()
329            .field_columns()
330            .chain([mapper.metadata().time_index_column()])
331            .map(|col| (col.column_id, col.column_schema.data_type.clone()))
332            .collect();
333
334        let (index_or_defaults, fields) =
335            Self::compute_index_and_fields(&actual_schema, &expect_schema, mapper.metadata())?;
336
337        let compat_pk = FlatCompatPrimaryKey::default();
338
339        Ok(Some(Self {
340            index_or_defaults,
341            arrow_schema: Arc::new(Schema::new(fields)),
342            compat_pk,
343        }))
344    }
345
346    /// Make columns of the `batch` compatible.
347    pub(crate) fn compat(&self, batch: RecordBatch) -> Result<RecordBatch> {
348        let len = batch.num_rows();
349        let columns = self
350            .index_or_defaults
351            .iter()
352            .map(|index_or_default| match index_or_default {
353                IndexOrDefault::Index { pos, cast_type } => {
354                    let old_column = batch.column(*pos);
355
356                    if let Some(ty) = cast_type {
357                        let casted = if let Some(json_type) = ty.as_json() {
358                            align_json_array(old_column, &json_type.as_arrow_type())
359                                .context(RecordBatchSnafu)?
360                        } else {
361                            datatypes::arrow::compute::cast(old_column, &ty.as_arrow_type())
362                                .context(ComputeArrowSnafu)?
363                        };
364                        Ok(casted)
365                    } else {
366                        Ok(old_column.clone())
367                    }
368                }
369                IndexOrDefault::DefaultValue {
370                    column_id: _,
371                    default_vector,
372                    semantic_type,
373                } => repeat_vector(default_vector, len, *semantic_type == SemanticType::Tag),
374            })
375            .chain(
376                // Adds internal columns.
377                batch.columns()[batch.num_columns() - INTERNAL_COLUMN_NUM..]
378                    .iter()
379                    .map(|col| Ok(col.clone())),
380            )
381            .collect::<Result<Vec<_>>>()?;
382
383        let compat_batch = RecordBatch::try_new(self.arrow_schema.clone(), columns)
384            .context(NewRecordBatchSnafu)?;
385
386        // Handles primary keys.
387        self.compat_pk.compat(compat_batch)
388    }
389}
390
391/// Repeats the vector value `to_len` times.
392fn repeat_vector(vector: &VectorRef, to_len: usize, is_tag: bool) -> Result<ArrayRef> {
393    assert_eq!(1, vector.len());
394    let data_type = vector.data_type();
395    if is_tag && data_type.is_string() {
396        let values = vector.to_arrow_array();
397        if values.is_null(0) {
398            // Creates a dictionary array with `to_len` null keys.
399            let keys = UInt32Array::new_null(to_len);
400            Ok(Arc::new(DictionaryArray::new(keys, values.slice(0, 0))))
401        } else {
402            let keys = UInt32Array::from_value(0, to_len);
403            Ok(Arc::new(DictionaryArray::new(keys, values)))
404        }
405    } else {
406        let keys = UInt32Array::from_value(0, to_len);
407        take(
408            &vector.to_arrow_array(),
409            &keys,
410            Some(TakeOptions {
411                check_bounds: false,
412            }),
413        )
414        .context(ComputeArrowSnafu)
415    }
416}
417
418/// Helper to make primary key compatible.
419#[derive(Debug)]
420struct CompatPrimaryKey {
421    /// Row converter to append values to primary keys.
422    converter: Arc<dyn PrimaryKeyCodec>,
423    /// Default values to append.
424    values: Vec<(ColumnId, Value)>,
425}
426
427impl CompatPrimaryKey {
428    /// Make primary key of the `batch` compatible.
429    fn compat(&self, mut batch: Batch) -> Result<Batch> {
430        let mut buffer = Vec::with_capacity(
431            batch.primary_key().len() + self.converter.estimated_size().unwrap_or_default(),
432        );
433        buffer.extend_from_slice(batch.primary_key());
434        self.converter
435            .encode_values(&self.values, &mut buffer)
436            .context(EncodeSnafu)?;
437
438        batch.set_primary_key(buffer);
439
440        // update cache
441        if let Some(pk_values) = &mut batch.pk_values {
442            pk_values.extend(&self.values);
443        }
444
445        Ok(batch)
446    }
447}
448
449/// Helper to make fields compatible.
450#[derive(Debug)]
451struct CompatFields {
452    /// Column Ids and DataTypes the reader actually returns.
453    actual_fields: Vec<(ColumnId, ConcreteDataType)>,
454    /// Indices to convert actual fields to expect fields.
455    index_or_defaults: Vec<IndexOrDefault>,
456}
457
458impl CompatFields {
459    /// Make fields of the `batch` compatible.
460    fn compat(&self, batch: Batch) -> Result<Batch> {
461        debug_assert_eq!(self.actual_fields.len(), batch.fields().len());
462        debug_assert!(
463            self.actual_fields
464                .iter()
465                .zip(batch.fields())
466                .all(|((id, _), batch_column)| *id == batch_column.column_id)
467        );
468
469        let len = batch.num_rows();
470        self.index_or_defaults
471            .iter()
472            .map(|index_or_default| match index_or_default {
473                IndexOrDefault::Index { pos, cast_type } => {
474                    let old_column = &batch.fields()[*pos];
475
476                    let data = if let Some(ty) = cast_type {
477                        if let Some(json_type) = ty.as_json() {
478                            let json_array = old_column.data.to_arrow_array();
479                            let json_array =
480                                align_json_array(&json_array, &json_type.as_arrow_type())
481                                    .context(RecordBatchSnafu)?;
482                            Helper::try_into_vector(&json_array).context(ConvertVectorSnafu)?
483                        } else {
484                            old_column.data.cast(ty).with_context(|_| CastVectorSnafu {
485                                from: old_column.data.data_type(),
486                                to: ty.clone(),
487                            })?
488                        }
489                    } else {
490                        old_column.data.clone()
491                    };
492                    Ok(BatchColumn {
493                        column_id: old_column.column_id,
494                        data,
495                    })
496                }
497                IndexOrDefault::DefaultValue {
498                    column_id,
499                    default_vector,
500                    semantic_type: _,
501                } => {
502                    let data = default_vector.replicate(&[len]);
503                    Ok(BatchColumn {
504                        column_id: *column_id,
505                        data,
506                    })
507                }
508            })
509            .collect::<Result<Vec<_>>>()
510            .and_then(|fields| batch.with_fields(fields))
511    }
512}
513
514fn may_rewrite_primary_key(
515    expect: &RegionMetadata,
516    actual: &RegionMetadata,
517) -> Option<RewritePrimaryKey> {
518    if expect.primary_key_encoding == actual.primary_key_encoding {
519        return None;
520    }
521
522    let fields = expect.primary_key.clone();
523    let original = build_primary_key_codec(actual);
524    let new = build_primary_key_codec(expect);
525
526    Some(RewritePrimaryKey {
527        original,
528        new,
529        fields,
530    })
531}
532
533/// Returns true if the actual primary keys is the same as expected.
534fn is_primary_key_same(expect: &RegionMetadata, actual: &RegionMetadata) -> Result<bool> {
535    ensure!(
536        actual.primary_key.len() <= expect.primary_key.len(),
537        CompatReaderSnafu {
538            region_id: expect.region_id,
539            reason: format!(
540                "primary key has more columns {} than expect {}",
541                actual.primary_key.len(),
542                expect.primary_key.len()
543            ),
544        }
545    );
546    ensure!(
547        actual.primary_key == expect.primary_key[..actual.primary_key.len()],
548        CompatReaderSnafu {
549            region_id: expect.region_id,
550            reason: format!(
551                "primary key has different prefix, expect: {:?}, actual: {:?}",
552                expect.primary_key, actual.primary_key
553            ),
554        }
555    );
556
557    Ok(actual.primary_key.len() == expect.primary_key.len())
558}
559
560/// Creates a [CompatPrimaryKey] if needed.
561fn may_compat_primary_key(
562    expect: &RegionMetadata,
563    actual: &RegionMetadata,
564) -> Result<Option<CompatPrimaryKey>> {
565    if is_primary_key_same(expect, actual)? {
566        return Ok(None);
567    }
568
569    // We need to append default values to the primary key.
570    let to_add = &expect.primary_key[actual.primary_key.len()..];
571    let mut fields = Vec::with_capacity(to_add.len());
572    let mut values = Vec::with_capacity(to_add.len());
573    for column_id in to_add {
574        // Safety: The id comes from expect region metadata.
575        let column = expect.column_by_id(*column_id).unwrap();
576        fields.push((
577            *column_id,
578            SortField::new(column.column_schema.data_type.clone()),
579        ));
580        let default_value = column
581            .column_schema
582            .create_default()
583            .context(CreateDefaultSnafu {
584                region_id: expect.region_id,
585                column: &column.column_schema.name,
586            })?
587            .with_context(|| CompatReaderSnafu {
588                region_id: expect.region_id,
589                reason: format!(
590                    "key column {} does not have a default value to read",
591                    column.column_schema.name
592                ),
593            })?;
594        values.push((*column_id, default_value));
595    }
596    // Using expect primary key encoding to build the converter
597    let converter =
598        build_primary_key_codec_with_fields(expect.primary_key_encoding, fields.into_iter());
599
600    Ok(Some(CompatPrimaryKey { converter, values }))
601}
602
603/// Creates a [CompatFields] if needed.
604fn may_compat_fields(
605    mapper: &PrimaryKeyProjectionMapper,
606    actual: &RegionMetadata,
607) -> Result<Option<CompatFields>> {
608    let expect_fields = mapper.batch_fields();
609    let actual_fields = Batch::projected_fields(actual, mapper.column_ids());
610    if expect_fields == actual_fields {
611        return Ok(None);
612    }
613
614    let source_field_index: HashMap<_, _> = actual_fields
615        .iter()
616        .enumerate()
617        .map(|(idx, (column_id, data_type))| (*column_id, (idx, data_type)))
618        .collect();
619
620    let index_or_defaults = expect_fields
621        .iter()
622        .map(|(column_id, expect_data_type)| {
623            if let Some((index, actual_data_type)) = source_field_index.get(column_id) {
624                let mut cast_type = None;
625
626                if expect_data_type != *actual_data_type {
627                    cast_type = Some(expect_data_type.clone())
628                }
629                // Source has this field.
630                Ok(IndexOrDefault::Index {
631                    pos: *index,
632                    cast_type,
633                })
634            } else {
635                // Safety: mapper must have this column.
636                let column = mapper.metadata().column_by_id(*column_id).unwrap();
637                // Create a default vector with 1 element for that column.
638                let default_vector = column
639                    .column_schema
640                    .create_default_vector(1)
641                    .context(CreateDefaultSnafu {
642                        region_id: mapper.metadata().region_id,
643                        column: &column.column_schema.name,
644                    })?
645                    .with_context(|| CompatReaderSnafu {
646                        region_id: mapper.metadata().region_id,
647                        reason: format!(
648                            "column {} does not have a default value to read",
649                            column.column_schema.name
650                        ),
651                    })?;
652                Ok(IndexOrDefault::DefaultValue {
653                    column_id: column.column_id,
654                    default_vector,
655                    semantic_type: SemanticType::Field,
656                })
657            }
658        })
659        .collect::<Result<Vec<_>>>()?;
660
661    Ok(Some(CompatFields {
662        actual_fields,
663        index_or_defaults,
664    }))
665}
666
667/// Index in source batch or a default value to fill a column.
668#[derive(Debug)]
669enum IndexOrDefault {
670    /// Index of the column in source batch.
671    Index {
672        pos: usize,
673        cast_type: Option<ConcreteDataType>,
674    },
675    /// Default value for the column.
676    DefaultValue {
677        /// Id of the column.
678        column_id: ColumnId,
679        /// Default value. The vector has only 1 element.
680        default_vector: VectorRef,
681        /// Semantic type of the column.
682        semantic_type: SemanticType,
683    },
684}
685
686/// Adapter to rewrite primary key.
687struct RewritePrimaryKey {
688    /// Original primary key codec.
689    original: Arc<dyn PrimaryKeyCodec>,
690    /// New primary key codec.
691    new: Arc<dyn PrimaryKeyCodec>,
692    /// Order of the fields in the new primary key.
693    fields: Vec<ColumnId>,
694}
695
696impl RewritePrimaryKey {
697    /// Make primary key of the `batch` compatible.
698    fn compat(&self, mut batch: Batch) -> Result<Batch> {
699        if batch.pk_values().is_none() {
700            let new_pk_values = self
701                .original
702                .decode(batch.primary_key())
703                .context(DecodeSnafu)?;
704            batch.set_pk_values(new_pk_values);
705        }
706        // Safety: We ensure pk_values is not None.
707        let values = batch.pk_values().unwrap();
708
709        let mut buffer = Vec::with_capacity(
710            batch.primary_key().len() + self.new.estimated_size().unwrap_or_default(),
711        );
712        match values {
713            CompositeValues::Dense(values) => {
714                self.new
715                    .encode_values(values.as_slice(), &mut buffer)
716                    .context(EncodeSnafu)?;
717            }
718            CompositeValues::Sparse(values) => {
719                let values = self
720                    .fields
721                    .iter()
722                    .map(|id| {
723                        let value = values.get_or_null(*id);
724                        (*id, value.as_value_ref())
725                    })
726                    .collect::<Vec<_>>();
727                self.new
728                    .encode_value_refs(&values, &mut buffer)
729                    .context(EncodeSnafu)?;
730            }
731        }
732        batch.set_primary_key(buffer);
733
734        Ok(batch)
735    }
736}
737
738/// Helper to rewrite primary key to another encoding for flat format.
739struct FlatRewritePrimaryKey {
740    /// New primary key encoder.
741    codec: Arc<dyn PrimaryKeyCodec>,
742    /// Metadata of the expected region.
743    metadata: RegionMetadataRef,
744    /// Original primary key codec.
745    /// If we need to rewrite the primary key.
746    old_codec: Arc<dyn PrimaryKeyCodec>,
747}
748
749impl FlatRewritePrimaryKey {
750    fn new(
751        expect: &RegionMetadataRef,
752        actual: &RegionMetadataRef,
753    ) -> Option<FlatRewritePrimaryKey> {
754        if expect.primary_key_encoding == actual.primary_key_encoding {
755            return None;
756        }
757        let codec = build_primary_key_codec(expect);
758        let old_codec = build_primary_key_codec(actual);
759
760        Some(FlatRewritePrimaryKey {
761            codec,
762            metadata: expect.clone(),
763            old_codec,
764        })
765    }
766
767    /// Rewrites the primary key of the `batch`.
768    /// It also appends the values to the primary key.
769    fn rewrite_key(
770        &self,
771        append_values: &[(ColumnId, Value)],
772        batch: RecordBatch,
773    ) -> Result<RecordBatch> {
774        let old_pk_dict_array = batch
775            .column(primary_key_column_index(batch.num_columns()))
776            .as_any()
777            .downcast_ref::<PrimaryKeyArray>()
778            .unwrap();
779        let old_pk_values_array = old_pk_dict_array
780            .values()
781            .as_any()
782            .downcast_ref::<BinaryArray>()
783            .unwrap();
784        let mut builder = BinaryBuilder::with_capacity(
785            old_pk_values_array.len(),
786            old_pk_values_array.value_data().len(),
787        );
788
789        // Binary buffer for the primary key.
790        let mut buffer = Vec::with_capacity(
791            old_pk_values_array.value_data().len() / old_pk_values_array.len().max(1),
792        );
793        let mut column_id_values = Vec::new();
794        // Iterates the binary array and rewrites the primary key.
795        for value in old_pk_values_array.iter() {
796            let Some(old_pk) = value else {
797                builder.append_null();
798                continue;
799            };
800            // Decodes the old primary key.
801            let mut pk_values = self.old_codec.decode(old_pk).context(DecodeSnafu)?;
802            pk_values.extend(append_values);
803
804            buffer.clear();
805            column_id_values.clear();
806            // Encodes the new primary key.
807            match pk_values {
808                CompositeValues::Dense(dense_values) => {
809                    self.codec
810                        .encode_values(dense_values.as_slice(), &mut buffer)
811                        .context(EncodeSnafu)?;
812                }
813                CompositeValues::Sparse(sparse_values) => {
814                    for id in &self.metadata.primary_key {
815                        let value = sparse_values.get_or_null(*id);
816                        column_id_values.push((*id, value.clone()));
817                    }
818                    self.codec
819                        .encode_values(&column_id_values, &mut buffer)
820                        .context(EncodeSnafu)?;
821                }
822            }
823            builder.append_value(&buffer);
824        }
825        let new_pk_values_array = Arc::new(builder.finish());
826        let new_pk_dict_array =
827            PrimaryKeyArray::new(old_pk_dict_array.keys().clone(), new_pk_values_array);
828
829        let mut columns = batch.columns().to_vec();
830        columns[primary_key_column_index(batch.num_columns())] = Arc::new(new_pk_dict_array);
831
832        RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
833    }
834}
835
/// Helper to make primary key compatible for flat format.
///
/// Either rewrites the whole key (when `rewriter` is set) or appends default
/// values for new key columns (when `converter` is set); with neither set,
/// keys pass through unchanged.
#[derive(Default)]
struct FlatCompatPrimaryKey {
    /// Primary key rewriter, used when the whole key must be re-encoded.
    rewriter: Option<FlatRewritePrimaryKey>,
    /// Converter to append values to primary keys; `None` when the primary
    /// key is already the same.
    converter: Option<Arc<dyn PrimaryKeyCodec>>,
    /// Default values to append, one per key column missing from the actual
    /// metadata.
    values: Vec<(ColumnId, Value)>,
}
846
impl FlatCompatPrimaryKey {
    /// Creates a helper that adapts primary keys written under `actual`
    /// metadata to `expect` metadata.
    ///
    /// When the primary keys are the same, no values need to be appended.
    /// Otherwise, computes the default value for each key column present in
    /// `expect` but not in `actual` and builds a codec to encode them.
    ///
    /// Returns an error if a new key column has no default value.
    fn new(expect: &RegionMetadataRef, actual: &RegionMetadataRef) -> Result<Self> {
        let rewriter = FlatRewritePrimaryKey::new(expect, actual);

        if is_primary_key_same(expect, actual)? {
            return Ok(Self {
                rewriter,
                converter: None,
                values: Vec::new(),
            });
        }

        // We need to append default values to the primary key.
        // NOTE(review): this slice assumes `actual.primary_key` is a prefix of
        // `expect.primary_key` — presumably checked by is_primary_key_same();
        // confirm against its implementation.
        let to_add = &expect.primary_key[actual.primary_key.len()..];
        let mut values = Vec::with_capacity(to_add.len());
        let mut fields = Vec::with_capacity(to_add.len());
        for column_id in to_add {
            // Safety: The id comes from expect region metadata.
            let column = expect.column_by_id(*column_id).unwrap();
            fields.push((
                *column_id,
                SortField::new(column.column_schema.data_type.clone()),
            ));
            // A key column added to the schema must have a default value so
            // old keys can be extended with it.
            let default_value = column
                .column_schema
                .create_default()
                .context(CreateDefaultSnafu {
                    region_id: expect.region_id,
                    column: &column.column_schema.name,
                })?
                .with_context(|| CompatReaderSnafu {
                    region_id: expect.region_id,
                    reason: format!(
                        "key column {} does not have a default value to read",
                        column.column_schema.name
                    ),
                })?;
            values.push((*column_id, default_value));
        }
        // is_primary_key_same() is false so we have different number of primary key columns.
        debug_assert!(!fields.is_empty());

        // Create converter to append values.
        let converter = Some(build_primary_key_codec_with_fields(
            expect.primary_key_encoding,
            fields.into_iter(),
        ));

        Ok(Self {
            rewriter,
            converter,
            values,
        })
    }

    /// Makes primary key of the `batch` compatible.
    ///
    /// Callers must ensure other columns except the `__primary_key` column is compatible.
    fn compat(&self, batch: RecordBatch) -> Result<RecordBatch> {
        if let Some(rewriter) = &self.rewriter {
            // If we have different encoding, rewrite the whole primary key.
            return rewriter.rewrite_key(&self.values, batch);
        }

        self.append_key(batch)
    }

    /// Appends values to the primary key of the `batch`.
    ///
    /// Returns the batch unchanged when there is no converter (the keys are
    /// already compatible).
    fn append_key(&self, batch: RecordBatch) -> Result<RecordBatch> {
        let Some(converter) = &self.converter else {
            return Ok(batch);
        };

        let old_pk_dict_array = batch
            .column(primary_key_column_index(batch.num_columns()))
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .unwrap();
        let old_pk_values_array = old_pk_dict_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        // Capacity: old bytes plus the estimated encoded size of the appended
        // values per key (estimated_size() may be None, hence the default).
        let mut builder = BinaryBuilder::with_capacity(
            old_pk_values_array.len(),
            old_pk_values_array.value_data().len()
                + converter.estimated_size().unwrap_or_default() * old_pk_values_array.len(),
        );

        // Binary buffer for the primary key.
        let mut buffer = Vec::with_capacity(
            old_pk_values_array.value_data().len() / old_pk_values_array.len().max(1)
                + converter.estimated_size().unwrap_or_default(),
        );

        // Iterates the binary array and appends values to the primary key.
        for value in old_pk_values_array.iter() {
            let Some(old_pk) = value else {
                builder.append_null();
                continue;
            };

            // The new values are encoded directly after the old key bytes, so
            // the old and new encodings must be concatenable.
            buffer.clear();
            buffer.extend_from_slice(old_pk);
            converter
                .encode_values(&self.values, &mut buffer)
                .context(EncodeSnafu)?;

            builder.append_value(&buffer);
        }

        // Reuses the dictionary keys; only the dictionary values changed.
        let new_pk_values_array = Arc::new(builder.finish());
        let new_pk_dict_array =
            PrimaryKeyArray::new(old_pk_dict_array.keys().clone(), new_pk_values_array);

        // Overrides the primary key column.
        let mut columns = batch.columns().to_vec();
        columns[primary_key_column_index(batch.num_columns())] = Arc::new(new_pk_dict_array);

        RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
    }
}
969
970#[cfg(test)]
971mod tests {
972    use std::sync::Arc;
973
974    use api::v1::{OpType, SemanticType};
975    use datatypes::arrow::array::{
976        ArrayRef, BinaryDictionaryBuilder, Int64Array, StringDictionaryBuilder,
977        TimestampMillisecondArray, UInt8Array, UInt64Array,
978    };
979    use datatypes::arrow::datatypes::UInt32Type;
980    use datatypes::arrow::record_batch::RecordBatch;
981    use datatypes::prelude::ConcreteDataType;
982    use datatypes::schema::ColumnSchema;
983    use datatypes::value::ValueRef;
984    use mito_codec::row_converter::{
985        DensePrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec,
986    };
987    use store_api::codec::PrimaryKeyEncoding;
988    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
989    use store_api::storage::RegionId;
990
991    use super::*;
992    use crate::read::flat_projection::FlatProjectionMapper;
993    use crate::sst::parquet::flat_format::FlatReadFormat;
994    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
995
996    /// Creates a new [RegionMetadata].
997    fn new_metadata(
998        semantic_types: &[(ColumnId, SemanticType, ConcreteDataType)],
999        primary_key: &[ColumnId],
1000    ) -> RegionMetadata {
1001        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1002        for (id, semantic_type, data_type) in semantic_types {
1003            let column_schema = match semantic_type {
1004                SemanticType::Tag => {
1005                    ColumnSchema::new(format!("tag_{id}"), data_type.clone(), true)
1006                }
1007                SemanticType::Field => {
1008                    ColumnSchema::new(format!("field_{id}"), data_type.clone(), true)
1009                }
1010                SemanticType::Timestamp => ColumnSchema::new("ts", data_type.clone(), false),
1011            };
1012
1013            builder.push_column_metadata(ColumnMetadata {
1014                column_schema,
1015                semantic_type: *semantic_type,
1016                column_id: *id,
1017            });
1018        }
1019        builder.primary_key(primary_key.to_vec());
1020        builder.build().unwrap()
1021    }
1022
1023    /// Encode primary key.
1024    fn encode_key(keys: &[Option<&str>]) -> Vec<u8> {
1025        let fields = (0..keys.len())
1026            .map(|_| (0, SortField::new(ConcreteDataType::string_datatype())))
1027            .collect();
1028        let converter = DensePrimaryKeyCodec::with_fields(fields);
1029        let row = keys.iter().map(|str_opt| match str_opt {
1030            Some(v) => ValueRef::String(v),
1031            None => ValueRef::Null,
1032        });
1033
1034        converter.encode(row).unwrap()
1035    }
1036
1037    /// Encode sparse primary key.
1038    fn encode_sparse_key(keys: &[(ColumnId, Option<&str>)]) -> Vec<u8> {
1039        let fields = (0..keys.len())
1040            .map(|_| (1, SortField::new(ConcreteDataType::string_datatype())))
1041            .collect();
1042        let converter = SparsePrimaryKeyCodec::with_fields(fields);
1043        let row = keys
1044            .iter()
1045            .map(|(id, str_opt)| match str_opt {
1046                Some(v) => (*id, ValueRef::String(v)),
1047                None => (*id, ValueRef::Null),
1048            })
1049            .collect::<Vec<_>>();
1050        let mut buffer = vec![];
1051        converter.encode_value_refs(&row, &mut buffer).unwrap();
1052        buffer
1053    }
1054
    #[test]
    fn test_invalid_pk_len() {
        // The reader has two primary key columns but the expected metadata
        // has only one, so the compat check must fail.
        let reader_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1, 2],
        );
        let expect_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        );
        may_compat_primary_key(&expect_meta, &reader_meta).unwrap_err();
    }
1084
    #[test]
    fn test_different_pk() {
        // The reader's key order [2, 1] is not a prefix of the expected key
        // order [1, 2, 4], so the compat check must fail.
        let reader_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[2, 1],
        );
        let expect_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (4, SemanticType::Tag, ConcreteDataType::string_datatype()),
            ],
            &[1, 2, 4],
        );
        may_compat_primary_key(&expect_meta, &reader_meta).unwrap_err();
    }
1116
    #[test]
    fn test_same_pk() {
        // Identical metadata needs no primary key compat: the helper is None.
        let reader_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        );
        assert!(
            may_compat_primary_key(&reader_meta, &reader_meta)
                .unwrap()
                .is_none()
        );
    }
1137
    #[test]
    fn test_same_pk_encoding() {
        // Same metadata (and thus same key encoding) needs no compat helper.
        let reader_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
            ],
            &[1],
        ));

        assert!(
            may_compat_primary_key(&reader_meta, &reader_meta)
                .unwrap()
                .is_none()
        );
    }
1158
    #[test]
    fn test_same_fields() {
        // When the reader already provides every projected field, no field
        // compat is needed: the helper is None.
        let reader_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        let mapper = PrimaryKeyProjectionMapper::all(&reader_meta).unwrap();
        assert!(may_compat_fields(&mapper, &reader_meta).unwrap().is_none())
    }
1176
1177    /// Creates a primary key array for flat format testing.
1178    fn build_flat_test_pk_array(primary_keys: &[&[u8]]) -> ArrayRef {
1179        let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
1180        for &pk in primary_keys {
1181            builder.append(pk).unwrap();
1182        }
1183        Arc::new(builder.finish())
1184    }
1185
    #[test]
    fn test_flat_compat_batch_with_missing_columns() {
        // The actual (SST) metadata lacks field_3, which exists in the
        // expected metadata; compat must insert a null column for it.
        let actual_metadata = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));

        let expected_metadata = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                // Adds a new field.
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));

        let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
        let read_format = FlatReadFormat::new(
            actual_metadata.clone(),
            [0, 1, 2, 3].into_iter(),
            None,
            "test",
            false,
        )
        .unwrap();
        let format_projection = read_format.format_projection();

        let compat_batch =
            FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection, false)
                .unwrap()
                .unwrap();

        // Input batch in the actual (old) schema: tag, field_2, ts, pk,
        // sequence, op type.
        let mut tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
        tag_builder.append_value("tag1");
        tag_builder.append_value("tag1");
        let tag_dict_array = Arc::new(tag_builder.finish());

        let k1 = encode_key(&[Some("tag1")]);
        let input_columns: Vec<ArrayRef> = vec![
            tag_dict_array.clone(),
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&k1, &k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let input_schema =
            to_flat_sst_arrow_schema(&actual_metadata, &FlatSchemaOptions::default());
        let input_batch = RecordBatch::try_new(input_schema, input_columns).unwrap();

        let result = compat_batch.compat(input_batch).unwrap();

        let expected_schema =
            to_flat_sst_arrow_schema(&expected_metadata, &FlatSchemaOptions::default());

        // The missing field_3 column appears as all-null int64.
        let expected_columns: Vec<ArrayRef> = vec![
            tag_dict_array.clone(),
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(Int64Array::from(vec![None::<i64>, None::<i64>])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&k1, &k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let expected_batch = RecordBatch::try_new(expected_schema, expected_columns).unwrap();

        assert_eq!(expected_batch, result);
    }
1274
    #[test]
    fn test_flat_compat_batch_with_read_projection_superset() {
        // The read projection includes field_3 (missing from the actual SST)
        // while the output projection is only [tag_1, field_2]; compat still
        // fills field_3 with nulls.
        let actual_metadata = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));

        let expected_metadata = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                // Adds a new field.
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));

        // Output projection: tag_1, field_2. Read also includes field_3.
        let mapper = FlatProjectionMapper::new_with_read_columns(
            &expected_metadata,
            vec![1, 2],
            vec![1, 2, 3],
        )
        .unwrap();
        let read_format = FlatReadFormat::new(
            actual_metadata.clone(),
            [1, 2, 3].into_iter(),
            None,
            "test",
            false,
        )
        .unwrap();
        let format_projection = read_format.format_projection();

        let compat_batch =
            FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection, false)
                .unwrap()
                .unwrap();

        let mut tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
        tag_builder.append_value("tag1");
        tag_builder.append_value("tag1");
        let tag_dict_array = Arc::new(tag_builder.finish());

        let k1 = encode_key(&[Some("tag1")]);
        let input_columns: Vec<ArrayRef> = vec![
            tag_dict_array.clone(),
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&k1, &k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let input_schema =
            to_flat_sst_arrow_schema(&actual_metadata, &FlatSchemaOptions::default());
        let input_batch = RecordBatch::try_new(input_schema, input_columns).unwrap();

        let result = compat_batch.compat(input_batch).unwrap();

        let expected_schema =
            to_flat_sst_arrow_schema(&expected_metadata, &FlatSchemaOptions::default());
        // The compat output carries the full expected schema; field_3 is null.
        let expected_columns: Vec<ArrayRef> = vec![
            tag_dict_array.clone(),
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(Int64Array::from(vec![None::<i64>, None::<i64>])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&k1, &k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let expected_batch = RecordBatch::try_new(expected_schema, expected_columns).unwrap();

        assert_eq!(expected_batch, result);
    }
1368
    #[test]
    fn test_flat_compat_batch_with_different_pk_encoding() {
        // Dense-encoded keys with pk [1] must be rewritten into
        // sparse-encoded keys with pk [1, 3]; tag_3 has no value, so it is
        // appended as null.
        let mut actual_metadata = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        );
        actual_metadata.primary_key_encoding = PrimaryKeyEncoding::Dense;
        let actual_metadata = Arc::new(actual_metadata);

        let mut expected_metadata = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (3, SemanticType::Tag, ConcreteDataType::string_datatype()),
            ],
            &[1, 3],
        );
        expected_metadata.primary_key_encoding = PrimaryKeyEncoding::Sparse;
        let expected_metadata = Arc::new(expected_metadata);

        let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
        let read_format = FlatReadFormat::new(
            actual_metadata.clone(),
            [0, 1, 2, 3].into_iter(),
            None,
            "test",
            false,
        )
        .unwrap();
        let format_projection = read_format.format_projection();

        let compat_batch =
            FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection, false)
                .unwrap()
                .unwrap();

        // Tag array.
        let mut tag1_builder = StringDictionaryBuilder::<UInt32Type>::new();
        tag1_builder.append_value("tag1");
        tag1_builder.append_value("tag1");
        let tag1_dict_array = Arc::new(tag1_builder.finish());

        let k1 = encode_key(&[Some("tag1")]);
        let input_columns: Vec<ArrayRef> = vec![
            tag1_dict_array.clone(),
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&k1, &k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let input_schema =
            to_flat_sst_arrow_schema(&actual_metadata, &FlatSchemaOptions::default());
        let input_batch = RecordBatch::try_new(input_schema, input_columns).unwrap();

        let result = compat_batch.compat(input_batch).unwrap();

        // Expected: sparse key with (1, "tag1") and (3, null), plus an
        // all-null tag_3 dictionary column.
        let sparse_k1 = encode_sparse_key(&[(1, Some("tag1")), (3, None)]);
        let mut null_tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
        null_tag_builder.append_nulls(2);
        let null_tag_dict_array = Arc::new(null_tag_builder.finish());
        let expected_columns: Vec<ArrayRef> = vec![
            tag1_dict_array.clone(),
            null_tag_dict_array,
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&sparse_k1, &sparse_k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let output_schema =
            to_flat_sst_arrow_schema(&expected_metadata, &FlatSchemaOptions::default());
        let expected_batch = RecordBatch::try_new(output_schema, expected_columns).unwrap();

        assert_eq!(expected_batch, result);
    }
1464
    #[test]
    fn test_flat_compat_batch_compact_sparse() {
        // Sparse encoding with an empty primary key: adding field_3 only
        // requires inserting a null column; the (empty) keys stay unchanged.
        let mut actual_metadata = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[],
        );
        actual_metadata.primary_key_encoding = PrimaryKeyEncoding::Sparse;
        let actual_metadata = Arc::new(actual_metadata);

        let mut expected_metadata = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[],
        );
        expected_metadata.primary_key_encoding = PrimaryKeyEncoding::Sparse;
        let expected_metadata = Arc::new(expected_metadata);

        let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
        let read_format = FlatReadFormat::new(
            actual_metadata.clone(),
            [0, 2, 3].into_iter(),
            None,
            "test",
            true,
        )
        .unwrap();
        let format_projection = read_format.format_projection();

        let compat_batch =
            FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection, true)
                .unwrap()
                .unwrap();

        let sparse_k1 = encode_sparse_key(&[]);
        let input_columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&sparse_k1, &sparse_k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let input_schema =
            to_flat_sst_arrow_schema(&actual_metadata, &FlatSchemaOptions::default());
        let input_batch = RecordBatch::try_new(input_schema, input_columns).unwrap();

        let result = compat_batch.compat(input_batch).unwrap();

        // field_3 is filled with nulls; keys and other columns pass through.
        let expected_columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(Int64Array::from(vec![None::<i64>, None::<i64>])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&sparse_k1, &sparse_k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let output_schema =
            to_flat_sst_arrow_schema(&expected_metadata, &FlatSchemaOptions::default());
        let expected_batch = RecordBatch::try_new(output_schema, expected_columns).unwrap();

        assert_eq!(expected_batch, result);
    }
1546}