mito2/read/compat.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Utilities to adapt readers with different schemas.

use std::collections::HashMap;
use std::sync::Arc;

use api::v1::SemanticType;
use common_recordbatch::recordbatch::align_json_array;
use datatypes::arrow::array::{
    Array, ArrayRef, BinaryArray, BinaryBuilder, DictionaryArray, UInt32Array,
};
use datatypes::arrow::compute::{TakeOptions, take};
use datatypes::arrow::datatypes::{FieldRef, Schema, SchemaRef};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::DataType;
use datatypes::value::Value;
use datatypes::vectors::{Helper, VectorRef};
use mito_codec::row_converter::{
    CompositeValues, PrimaryKeyCodec, SortField, build_primary_key_codec,
    build_primary_key_codec_with_fields,
};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::ColumnId;

use crate::error::{
    CastVectorSnafu, CompatReaderSnafu, ComputeArrowSnafu, ConvertVectorSnafu, CreateDefaultSnafu,
    DecodeSnafu, EncodeSnafu, NewRecordBatchSnafu, RecordBatchSnafu, Result, UnexpectedSnafu,
    UnsupportedOperationSnafu,
};
use crate::read::flat_projection::{FlatProjectionMapper, flat_projected_columns};
use crate::read::projection::{PrimaryKeyProjectionMapper, ProjectionMapper};
use crate::read::{Batch, BatchColumn, BatchReader};
use crate::sst::parquet::flat_format::primary_key_column_index;
use crate::sst::parquet::format::{FormatProjection, INTERNAL_COLUMN_NUM, PrimaryKeyArray};
use crate::sst::{internal_fields, tag_maybe_to_dictionary_field};

/// Reader to adapt the schema of an underlying reader to an expected schema.
pub struct CompatReader<R> {
    /// Underlying reader.
    reader: R,
    /// Helper to make batches compatible.
    compat: PrimaryKeyCompatBatch,
}

impl<R> CompatReader<R> {
    /// Creates a new compat reader.
    /// - `mapper` is built from the metadata users expect to see.
    /// - `reader_meta` is the metadata of the input reader.
    /// - `reader` is the input reader.
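    ///
    /// A minimal usage sketch (not compiled here); `mapper`, `reader_meta`, and
    /// `source_reader` are assumed to come from the caller:
    ///
    /// ```ignore
    /// let mapper = ProjectionMapper::all(&expect_meta, false)?;
    /// let mut reader = CompatReader::new(&mapper, reader_meta, source_reader)?;
    /// while let Some(batch) = reader.next_batch().await? {
    ///     // Each batch now matches the schema described by `mapper`.
    /// }
    /// ```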
    pub fn new(
        mapper: &ProjectionMapper,
        reader_meta: RegionMetadataRef,
        reader: R,
    ) -> Result<CompatReader<R>> {
        Ok(CompatReader {
            reader,
            compat: PrimaryKeyCompatBatch::new(mapper, reader_meta)?,
        })
    }
}

#[async_trait::async_trait]
impl<R: BatchReader> BatchReader for CompatReader<R> {
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        let Some(mut batch) = self.reader.next_batch().await? else {
            return Ok(None);
        };

        batch = self.compat.compat_batch(batch)?;

        Ok(Some(batch))
    }
}

/// Helper to adapt schema of the batch to an expected schema.
pub(crate) enum CompatBatch {
    /// Adapter for primary key format.
    PrimaryKey(PrimaryKeyCompatBatch),
    /// Adapter for flat format.
    Flat(FlatCompatBatch),
}

impl CompatBatch {
    /// Returns the inner primary key batch adapter if this is a PrimaryKey format.
    pub(crate) fn as_primary_key(&self) -> Option<&PrimaryKeyCompatBatch> {
        match self {
            CompatBatch::PrimaryKey(batch) => Some(batch),
            _ => None,
        }
    }

    /// Returns the inner flat batch adapter if this is a Flat format.
    pub(crate) fn as_flat(&self) -> Option<&FlatCompatBatch> {
        match self {
            CompatBatch::Flat(batch) => Some(batch),
            _ => None,
        }
    }
}

/// A helper struct to adapt schema of the batch to an expected schema.
pub(crate) struct PrimaryKeyCompatBatch {
    /// Optional primary key rewriter.
    rewrite_pk: Option<RewritePrimaryKey>,
    /// Optional primary key adapter.
    compat_pk: Option<CompatPrimaryKey>,
    /// Optional fields adapter.
    compat_fields: Option<CompatFields>,
}

impl PrimaryKeyCompatBatch {
    /// Creates a new [PrimaryKeyCompatBatch].
    /// - `mapper` is built from the metadata users expect to see.
    /// - `reader_meta` is the metadata of the input reader.
    pub(crate) fn new(mapper: &ProjectionMapper, reader_meta: RegionMetadataRef) -> Result<Self> {
        let rewrite_pk = may_rewrite_primary_key(mapper.metadata(), &reader_meta);
        let compat_pk = may_compat_primary_key(mapper.metadata(), &reader_meta)?;
        let mapper = mapper.as_primary_key().context(UnexpectedSnafu {
            reason: "Unexpected format",
        })?;
        let compat_fields = may_compat_fields(mapper, &reader_meta)?;

        Ok(Self {
            rewrite_pk,
            compat_pk,
            compat_fields,
        })
    }

    /// Adapts the `batch` to the expected schema.
    pub(crate) fn compat_batch(&self, mut batch: Batch) -> Result<Batch> {
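        // The adapters run in a fixed order: rewrite the key encoding first,
        // then append missing key columns, then adapt the field columns.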
        if let Some(rewrite_pk) = &self.rewrite_pk {
            batch = rewrite_pk.compat(batch)?;
        }
        if let Some(compat_pk) = &self.compat_pk {
            batch = compat_pk.compat(batch)?;
        }
        if let Some(compat_fields) = &self.compat_fields {
            batch = compat_fields.compat(batch)?;
        }

        Ok(batch)
    }
}

/// Returns true if `left` and `right` have the same columns and primary key encoding.
pub(crate) fn has_same_columns_and_pk_encoding(
    left: &RegionMetadata,
    right: &RegionMetadata,
) -> bool {
    if left.primary_key_encoding != right.primary_key_encoding {
        return false;
    }

    if left.column_metadatas.len() != right.column_metadatas.len() {
        return false;
    }

    for (left_col, right_col) in left.column_metadatas.iter().zip(&right.column_metadatas) {
        if left_col.column_id != right_col.column_id || !left_col.is_same_datatype(right_col) {
            return false;
        }
        debug_assert_eq!(
            left_col.column_schema.data_type,
            right_col.column_schema.data_type
        );
        debug_assert_eq!(left_col.semantic_type, right_col.semantic_type);
    }

    true
}

/// A helper struct to adapt schema of the batch to an expected schema.
pub(crate) struct FlatCompatBatch {
    /// Indices to convert actual fields to expect fields.
    index_or_defaults: Vec<IndexOrDefault>,
    /// Expected arrow schema.
    arrow_schema: SchemaRef,
    /// Primary key adapter.
    compat_pk: FlatCompatPrimaryKey,
}

impl FlatCompatBatch {
    /// Creates a [FlatCompatBatch].
    ///
    /// - `mapper` is built from the metadata users expect to see.
    /// - `actual` is the [RegionMetadata] of the input parquet.
    /// - `format_projection` is the projection of the read format for the input parquet.
    /// - `compaction` indicates whether the reader is for compaction.
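    ///
    /// Returns `Ok(None)` when no adaptation is needed. A minimal sketch (not
    /// compiled here), assuming the caller already has a `mapper`, the `actual`
    /// metadata, and a `format_projection`:
    ///
    /// ```ignore
    /// if let Some(compat) = FlatCompatBatch::try_new(&mapper, &actual, &format_projection, false)? {
    ///     let adapted = compat.compat(record_batch)?;
    /// }
    /// ```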
    pub(crate) fn try_new(
        mapper: &FlatProjectionMapper,
        actual: &RegionMetadataRef,
        format_projection: &FormatProjection,
        compaction: bool,
    ) -> Result<Option<Self>> {
        let actual_schema = flat_projected_columns(actual, format_projection);
        let expect_schema = mapper.batch_schema();
        if expect_schema == actual_schema {
            // Although the SST has a different schema, the schema after projection is
            // the same as the expected schema.
            return Ok(None);
        }

        if actual.primary_key_encoding == PrimaryKeyEncoding::Sparse && compaction {
            // Special handling for sparse encoding in compaction.
            return FlatCompatBatch::try_new_compact_sparse(mapper, actual);
        }

        let (index_or_defaults, fields) =
            Self::compute_index_and_fields(&actual_schema, expect_schema, mapper.metadata())?;

        let compat_pk = FlatCompatPrimaryKey::new(mapper.metadata(), actual)?;

        Ok(Some(Self {
            index_or_defaults,
            arrow_schema: Arc::new(Schema::new(fields)),
            compat_pk,
        }))
    }

    fn compute_index_and_fields(
        actual_schema: &[(ColumnId, ConcreteDataType)],
        expect_schema: &[(ColumnId, ConcreteDataType)],
        expect_metadata: &RegionMetadata,
    ) -> Result<(Vec<IndexOrDefault>, Vec<FieldRef>)> {
        // Maps column id to the index and data type in the actual schema.
        let actual_schema_index: HashMap<_, _> = actual_schema
            .iter()
            .enumerate()
            .map(|(idx, (column_id, data_type))| (*column_id, (idx, data_type)))
            .collect();

        let mut index_or_defaults = Vec::with_capacity(expect_schema.len());
        let mut fields = Vec::with_capacity(expect_schema.len());
        for (column_id, expect_data_type) in expect_schema {
            // Safety: expect_schema comes from the same mapper.
            let column_index = expect_metadata.column_index_by_id(*column_id).unwrap();
            let expect_column = &expect_metadata.column_metadatas[column_index];
            let column_field = &expect_metadata.schema.arrow_schema().fields()[column_index];
            // For tag columns, we need to create a dictionary field.
            if expect_column.semantic_type == SemanticType::Tag {
                fields.push(tag_maybe_to_dictionary_field(
                    &expect_column.column_schema.data_type,
                    column_field,
                ));
            } else {
                fields.push(column_field.clone());
            };

            if let Some((index, actual_data_type)) = actual_schema_index.get(column_id) {
                let mut cast_type = None;

                // Same column different type.
                if expect_data_type != *actual_data_type {
                    cast_type = Some(expect_data_type.clone())
                }
                // Source has this column.
                index_or_defaults.push(IndexOrDefault::Index {
                    pos: *index,
                    cast_type,
                });
            } else {
                // Create a default vector with 1 element for that column.
                let default_vector = expect_column
                    .column_schema
                    .create_default_vector(1)
                    .context(CreateDefaultSnafu {
                        region_id: expect_metadata.region_id,
                        column: &expect_column.column_schema.name,
                    })?
                    .with_context(|| CompatReaderSnafu {
                        region_id: expect_metadata.region_id,
                        reason: format!(
                            "column {} does not have a default value to read",
                            expect_column.column_schema.name
                        ),
                    })?;
                index_or_defaults.push(IndexOrDefault::DefaultValue {
                    column_id: expect_column.column_id,
                    default_vector,
                    semantic_type: expect_column.semantic_type,
                });
            };
        }
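        // The flat schema always keeps the internal columns (encoded primary key,
        // sequence, op type) at the tail, so append them after the projected fields.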
        fields.extend_from_slice(&internal_fields());

        Ok((index_or_defaults, fields))
    }

    fn try_new_compact_sparse(
        mapper: &FlatProjectionMapper,
        actual: &RegionMetadataRef,
    ) -> Result<Option<Self>> {
        // Currently, we don't support converting sparse encoding back to dense encoding in
        // flat format.
        ensure!(
            mapper.metadata().primary_key_encoding == PrimaryKeyEncoding::Sparse,
            UnsupportedOperationSnafu {
                err_msg: "Flat format doesn't support converting sparse encoding back to dense encoding"
            }
        );

        // For sparse encoding, we don't need to check the primary keys.
        // Since this is for compaction, we always read all columns.
        let actual_schema: Vec<_> = actual
            .field_columns()
            .chain([actual.time_index_column()])
            .map(|col| (col.column_id, col.column_schema.data_type.clone()))
            .collect();
        let expect_schema: Vec<_> = mapper
            .metadata()
            .field_columns()
            .chain([mapper.metadata().time_index_column()])
            .map(|col| (col.column_id, col.column_schema.data_type.clone()))
            .collect();

        let (index_or_defaults, fields) =
            Self::compute_index_and_fields(&actual_schema, &expect_schema, mapper.metadata())?;

        let compat_pk = FlatCompatPrimaryKey::default();

        Ok(Some(Self {
            index_or_defaults,
            arrow_schema: Arc::new(Schema::new(fields)),
            compat_pk,
        }))
    }

    /// Make columns of the `batch` compatible.
    pub(crate) fn compat(&self, batch: RecordBatch) -> Result<RecordBatch> {
        let len = batch.num_rows();
        let columns = self
            .index_or_defaults
            .iter()
            .map(|index_or_default| match index_or_default {
                IndexOrDefault::Index { pos, cast_type } => {
                    let old_column = batch.column(*pos);

                    if let Some(ty) = cast_type {
                        let casted = if let Some(json_type) = ty.as_json() {
                            align_json_array(old_column, &json_type.as_arrow_type())
                                .context(RecordBatchSnafu)?
                        } else {
                            datatypes::arrow::compute::cast(old_column, &ty.as_arrow_type())
                                .context(ComputeArrowSnafu)?
                        };
                        Ok(casted)
                    } else {
                        Ok(old_column.clone())
                    }
                }
                IndexOrDefault::DefaultValue {
                    column_id: _,
                    default_vector,
                    semantic_type,
                } => repeat_vector(default_vector, len, *semantic_type == SemanticType::Tag),
            })
            .chain(
                // Adds internal columns.
                batch.columns()[batch.num_columns() - INTERNAL_COLUMN_NUM..]
                    .iter()
                    .map(|col| Ok(col.clone())),
            )
            .collect::<Result<Vec<_>>>()?;

        let compat_batch = RecordBatch::try_new(self.arrow_schema.clone(), columns)
            .context(NewRecordBatchSnafu)?;

        // Handles primary keys.
        self.compat_pk.compat(compat_batch)
    }
}

/// Repeats the vector value `to_len` times.
fn repeat_vector(vector: &VectorRef, to_len: usize, is_tag: bool) -> Result<ArrayRef> {
    assert_eq!(1, vector.len());
    let data_type = vector.data_type();
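    // String tags are dictionary-encoded in the flat format, so build a
    // dictionary array with repeated keys instead of materializing `to_len`
    // copies of the value.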
    if is_tag && data_type.is_string() {
        let values = vector.to_arrow_array();
        if values.is_null(0) {
            // Creates a dictionary array with `to_len` null keys.
            let keys = UInt32Array::new_null(to_len);
            Ok(Arc::new(DictionaryArray::new(keys, values.slice(0, 0))))
        } else {
            let keys = UInt32Array::from_value(0, to_len);
            Ok(Arc::new(DictionaryArray::new(keys, values)))
        }
    } else {
        let keys = UInt32Array::from_value(0, to_len);
        take(
            &vector.to_arrow_array(),
            &keys,
            Some(TakeOptions {
                check_bounds: false,
            }),
        )
        .context(ComputeArrowSnafu)
    }
}

/// Helper to make primary key compatible.
#[derive(Debug)]
struct CompatPrimaryKey {
    /// Row converter to append values to primary keys.
    converter: Arc<dyn PrimaryKeyCodec>,
    /// Default values to append.
    values: Vec<(ColumnId, Value)>,
}

impl CompatPrimaryKey {
    /// Make primary key of the `batch` compatible.
    fn compat(&self, mut batch: Batch) -> Result<Batch> {
        let mut buffer = Vec::with_capacity(
            batch.primary_key().len() + self.converter.estimated_size().unwrap_or_default(),
        );
        buffer.extend_from_slice(batch.primary_key());
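        // Encoding the default values into the same buffer appends them after the
        // existing key bytes, which extends the encoded primary key in place.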
        self.converter
            .encode_values(&self.values, &mut buffer)
            .context(EncodeSnafu)?;

        batch.set_primary_key(buffer);

        // Update the cached decoded primary key values.
        if let Some(pk_values) = &mut batch.pk_values {
            pk_values.extend(&self.values);
        }

        Ok(batch)
    }
}

/// Helper to make fields compatible.
#[derive(Debug)]
struct CompatFields {
    /// Column Ids and DataTypes the reader actually returns.
    actual_fields: Vec<(ColumnId, ConcreteDataType)>,
    /// Indices to convert actual fields to expect fields.
    index_or_defaults: Vec<IndexOrDefault>,
}

impl CompatFields {
    /// Make fields of the `batch` compatible.
    fn compat(&self, batch: Batch) -> Result<Batch> {
        debug_assert_eq!(self.actual_fields.len(), batch.fields().len());
        debug_assert!(
            self.actual_fields
                .iter()
                .zip(batch.fields())
                .all(|((id, _), batch_column)| *id == batch_column.column_id)
        );

        let len = batch.num_rows();
        self.index_or_defaults
            .iter()
            .map(|index_or_default| match index_or_default {
                IndexOrDefault::Index { pos, cast_type } => {
                    let old_column = &batch.fields()[*pos];

                    let data = if let Some(ty) = cast_type {
                        if let Some(json_type) = ty.as_json() {
                            let json_array = old_column.data.to_arrow_array();
                            let json_array =
                                align_json_array(&json_array, &json_type.as_arrow_type())
                                    .context(RecordBatchSnafu)?;
                            Helper::try_into_vector(&json_array).context(ConvertVectorSnafu)?
                        } else {
                            old_column.data.cast(ty).with_context(|_| CastVectorSnafu {
                                from: old_column.data.data_type(),
                                to: ty.clone(),
                            })?
                        }
                    } else {
                        old_column.data.clone()
                    };
                    Ok(BatchColumn {
                        column_id: old_column.column_id,
                        data,
                    })
                }
                IndexOrDefault::DefaultValue {
                    column_id,
                    default_vector,
                    semantic_type: _,
                } => {
                    let data = default_vector.replicate(&[len]);
                    Ok(BatchColumn {
                        column_id: *column_id,
                        data,
                    })
                }
            })
            .collect::<Result<Vec<_>>>()
            .and_then(|fields| batch.with_fields(fields))
    }
}

fn may_rewrite_primary_key(
    expect: &RegionMetadata,
    actual: &RegionMetadata,
) -> Option<RewritePrimaryKey> {
    if expect.primary_key_encoding == actual.primary_key_encoding {
        return None;
    }

    let fields = expect.primary_key.clone();
    let original = build_primary_key_codec(actual);
    let new = build_primary_key_codec(expect);

    Some(RewritePrimaryKey {
        original,
        new,
        fields,
    })
}

/// Returns true if the actual primary key is the same as expected.
fn is_primary_key_same(expect: &RegionMetadata, actual: &RegionMetadata) -> Result<bool> {
    ensure!(
        actual.primary_key.len() <= expect.primary_key.len(),
        CompatReaderSnafu {
            region_id: expect.region_id,
            reason: format!(
                "primary key has more columns {} than expected {}",
                actual.primary_key.len(),
                expect.primary_key.len()
            ),
        }
    );
    ensure!(
        actual.primary_key == expect.primary_key[..actual.primary_key.len()],
        CompatReaderSnafu {
            region_id: expect.region_id,
            reason: format!(
                "primary key has different prefix, expect: {:?}, actual: {:?}",
                expect.primary_key, actual.primary_key
            ),
        }
    );

    Ok(actual.primary_key.len() == expect.primary_key.len())
}

/// Creates a [CompatPrimaryKey] if needed.
fn may_compat_primary_key(
    expect: &RegionMetadata,
    actual: &RegionMetadata,
) -> Result<Option<CompatPrimaryKey>> {
    if is_primary_key_same(expect, actual)? {
        return Ok(None);
    }

    // We need to append default values to the primary key.
    let to_add = &expect.primary_key[actual.primary_key.len()..];
    let mut fields = Vec::with_capacity(to_add.len());
    let mut values = Vec::with_capacity(to_add.len());
    for column_id in to_add {
        // Safety: The id comes from expect region metadata.
        let column = expect.column_by_id(*column_id).unwrap();
        fields.push((
            *column_id,
            SortField::new(column.column_schema.data_type.clone()),
        ));
        let default_value = column
            .column_schema
            .create_default()
            .context(CreateDefaultSnafu {
                region_id: expect.region_id,
                column: &column.column_schema.name,
            })?
            .with_context(|| CompatReaderSnafu {
                region_id: expect.region_id,
                reason: format!(
                    "key column {} does not have a default value to read",
                    column.column_schema.name
                ),
            })?;
        values.push((*column_id, default_value));
    }
    // Use the expected primary key encoding to build the converter.
    let converter =
        build_primary_key_codec_with_fields(expect.primary_key_encoding, fields.into_iter());

    Ok(Some(CompatPrimaryKey { converter, values }))
}

/// Creates a [CompatFields] if needed.
fn may_compat_fields(
    mapper: &PrimaryKeyProjectionMapper,
    actual: &RegionMetadata,
) -> Result<Option<CompatFields>> {
    let expect_fields = mapper.batch_fields();
    let actual_fields = Batch::projected_fields(actual, mapper.column_ids());
    if expect_fields == actual_fields {
        return Ok(None);
    }

    let source_field_index: HashMap<_, _> = actual_fields
        .iter()
        .enumerate()
        .map(|(idx, (column_id, data_type))| (*column_id, (idx, data_type)))
        .collect();

    let index_or_defaults = expect_fields
        .iter()
        .map(|(column_id, expect_data_type)| {
            if let Some((index, actual_data_type)) = source_field_index.get(column_id) {
                let mut cast_type = None;

                if expect_data_type != *actual_data_type {
                    cast_type = Some(expect_data_type.clone())
                }
                // Source has this field.
                Ok(IndexOrDefault::Index {
                    pos: *index,
                    cast_type,
                })
            } else {
                // Safety: mapper must have this column.
                let column = mapper.metadata().column_by_id(*column_id).unwrap();
                // Create a default vector with 1 element for that column.
                let default_vector = column
                    .column_schema
                    .create_default_vector(1)
                    .context(CreateDefaultSnafu {
                        region_id: mapper.metadata().region_id,
                        column: &column.column_schema.name,
                    })?
                    .with_context(|| CompatReaderSnafu {
                        region_id: mapper.metadata().region_id,
                        reason: format!(
                            "column {} does not have a default value to read",
                            column.column_schema.name
                        ),
                    })?;
                Ok(IndexOrDefault::DefaultValue {
                    column_id: column.column_id,
                    default_vector,
                    semantic_type: SemanticType::Field,
                })
            }
        })
        .collect::<Result<Vec<_>>>()?;

    Ok(Some(CompatFields {
        actual_fields,
        index_or_defaults,
    }))
}

/// Index in source batch or a default value to fill a column.
#[derive(Debug)]
enum IndexOrDefault {
    /// Index of the column in source batch.
    Index {
        pos: usize,
        cast_type: Option<ConcreteDataType>,
    },
    /// Default value for the column.
    DefaultValue {
        /// Id of the column.
        column_id: ColumnId,
        /// Default value. The vector has only 1 element.
        default_vector: VectorRef,
        /// Semantic type of the column.
        semantic_type: SemanticType,
    },
}

/// Adapter to rewrite primary key.
struct RewritePrimaryKey {
    /// Original primary key codec.
    original: Arc<dyn PrimaryKeyCodec>,
    /// New primary key codec.
    new: Arc<dyn PrimaryKeyCodec>,
    /// Order of the fields in the new primary key.
    fields: Vec<ColumnId>,
}

impl RewritePrimaryKey {
    /// Make primary key of the `batch` compatible.
    fn compat(&self, mut batch: Batch) -> Result<Batch> {
        if batch.pk_values().is_none() {
            let new_pk_values = self
                .original
                .decode(batch.primary_key())
                .context(DecodeSnafu)?;
            batch.set_pk_values(new_pk_values);
        }
        // Safety: We ensure pk_values is not None.
        let values = batch.pk_values().unwrap();

        let mut buffer = Vec::with_capacity(
            batch.primary_key().len() + self.new.estimated_size().unwrap_or_default(),
        );
        match values {
            CompositeValues::Dense(values) => {
                self.new
                    .encode_values(values.as_slice(), &mut buffer)
                    .context(EncodeSnafu)?;
            }
            CompositeValues::Sparse(values) => {
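                // Materialize the sparse values in the order of the new primary
                // key fields before re-encoding them.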
                let values = self
                    .fields
                    .iter()
                    .map(|id| {
                        let value = values.get_or_null(*id);
                        (*id, value.as_value_ref())
                    })
                    .collect::<Vec<_>>();
                self.new
                    .encode_value_refs(&values, &mut buffer)
                    .context(EncodeSnafu)?;
            }
        }
        batch.set_primary_key(buffer);

        Ok(batch)
    }
}

/// Helper to rewrite primary key to another encoding for flat format.
struct FlatRewritePrimaryKey {
    /// New primary key encoder.
    codec: Arc<dyn PrimaryKeyCodec>,
    /// Metadata of the expected region.
    metadata: RegionMetadataRef,
    /// Original primary key codec, used to decode the key before rewriting.
    old_codec: Arc<dyn PrimaryKeyCodec>,
}

impl FlatRewritePrimaryKey {
    fn new(
        expect: &RegionMetadataRef,
        actual: &RegionMetadataRef,
    ) -> Option<FlatRewritePrimaryKey> {
        if expect.primary_key_encoding == actual.primary_key_encoding {
            return None;
        }
        let codec = build_primary_key_codec(expect);
        let old_codec = build_primary_key_codec(actual);

        Some(FlatRewritePrimaryKey {
            codec,
            metadata: expect.clone(),
            old_codec,
        })
    }

    /// Rewrites the primary key of the `batch`.
    /// It also appends the values to the primary key.
    fn rewrite_key(
        &self,
        append_values: &[(ColumnId, Value)],
        batch: RecordBatch,
    ) -> Result<RecordBatch> {
        let old_pk_dict_array = batch
            .column(primary_key_column_index(batch.num_columns()))
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .unwrap();
        let old_pk_values_array = old_pk_dict_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        let mut builder = BinaryBuilder::with_capacity(
            old_pk_values_array.len(),
            old_pk_values_array.value_data().len(),
        );

        // Binary buffer for the primary key.
        let mut buffer = Vec::with_capacity(
            old_pk_values_array.value_data().len() / old_pk_values_array.len().max(1),
        );
        let mut column_id_values = Vec::new();
        // Iterates the binary array and rewrites the primary key.
        for value in old_pk_values_array.iter() {
            let Some(old_pk) = value else {
                builder.append_null();
                continue;
            };
            // Decodes the old primary key.
            let mut pk_values = self.old_codec.decode(old_pk).context(DecodeSnafu)?;
            pk_values.extend(append_values);

            buffer.clear();
            column_id_values.clear();
            // Encodes the new primary key.
            match pk_values {
                CompositeValues::Dense(dense_values) => {
                    self.codec
                        .encode_values(dense_values.as_slice(), &mut buffer)
                        .context(EncodeSnafu)?;
                }
                CompositeValues::Sparse(sparse_values) => {
                    for id in &self.metadata.primary_key {
                        let value = sparse_values.get_or_null(*id);
                        column_id_values.push((*id, value.clone()));
                    }
                    self.codec
                        .encode_values(&column_id_values, &mut buffer)
                        .context(EncodeSnafu)?;
                }
            }
            builder.append_value(&buffer);
        }
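        // Only the dictionary values were rewritten; reusing the old keys keeps
        // each row pointing at its (now rewritten) primary key.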
        let new_pk_values_array = Arc::new(builder.finish());
        let new_pk_dict_array =
            PrimaryKeyArray::new(old_pk_dict_array.keys().clone(), new_pk_values_array);

        let mut columns = batch.columns().to_vec();
        columns[primary_key_column_index(batch.num_columns())] = Arc::new(new_pk_dict_array);

        RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
    }
}

/// Helper to make primary key compatible for flat format.
#[derive(Default)]
struct FlatCompatPrimaryKey {
    /// Primary key rewriter.
    rewriter: Option<FlatRewritePrimaryKey>,
    /// Converter to append values to primary keys.
    converter: Option<Arc<dyn PrimaryKeyCodec>>,
    /// Default values to append.
    values: Vec<(ColumnId, Value)>,
}

impl FlatCompatPrimaryKey {
    fn new(expect: &RegionMetadataRef, actual: &RegionMetadataRef) -> Result<Self> {
        let rewriter = FlatRewritePrimaryKey::new(expect, actual);

        if is_primary_key_same(expect, actual)? {
            return Ok(Self {
                rewriter,
                converter: None,
                values: Vec::new(),
            });
        }

        // We need to append default values to the primary key.
        let to_add = &expect.primary_key[actual.primary_key.len()..];
        let mut values = Vec::with_capacity(to_add.len());
        let mut fields = Vec::with_capacity(to_add.len());
        for column_id in to_add {
            // Safety: The id comes from expect region metadata.
            let column = expect.column_by_id(*column_id).unwrap();
            fields.push((
                *column_id,
                SortField::new(column.column_schema.data_type.clone()),
            ));
            let default_value = column
                .column_schema
                .create_default()
                .context(CreateDefaultSnafu {
                    region_id: expect.region_id,
                    column: &column.column_schema.name,
                })?
                .with_context(|| CompatReaderSnafu {
                    region_id: expect.region_id,
                    reason: format!(
                        "key column {} does not have a default value to read",
                        column.column_schema.name
                    ),
                })?;
            values.push((*column_id, default_value));
        }
        // is_primary_key_same() is false so we have a different number of primary key columns.
        debug_assert!(!fields.is_empty());

        // Create converter to append values.
        let converter = Some(build_primary_key_codec_with_fields(
            expect.primary_key_encoding,
            fields.into_iter(),
        ));

        Ok(Self {
            rewriter,
            converter,
            values,
        })
    }

    /// Makes primary key of the `batch` compatible.
    ///
    /// Callers must ensure all columns except the `__primary_key` column are compatible.
    fn compat(&self, batch: RecordBatch) -> Result<RecordBatch> {
        if let Some(rewriter) = &self.rewriter {
            // If we have different encoding, rewrite the whole primary key.
            return rewriter.rewrite_key(&self.values, batch);
        }

        self.append_key(batch)
    }

    /// Appends values to the primary key of the `batch`.
    fn append_key(&self, batch: RecordBatch) -> Result<RecordBatch> {
        let Some(converter) = &self.converter else {
            return Ok(batch);
        };

        let old_pk_dict_array = batch
            .column(primary_key_column_index(batch.num_columns()))
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .unwrap();
        let old_pk_values_array = old_pk_dict_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        let mut builder = BinaryBuilder::with_capacity(
            old_pk_values_array.len(),
            old_pk_values_array.value_data().len()
                + converter.estimated_size().unwrap_or_default() * old_pk_values_array.len(),
        );

        // Binary buffer for the primary key.
        let mut buffer = Vec::with_capacity(
            old_pk_values_array.value_data().len() / old_pk_values_array.len().max(1)
                + converter.estimated_size().unwrap_or_default(),
        );

        // Iterates the binary array and appends values to the primary key.
        for value in old_pk_values_array.iter() {
            let Some(old_pk) = value else {
                builder.append_null();
                continue;
            };

            buffer.clear();
            buffer.extend_from_slice(old_pk);
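            // Appending the encoded default values extends the existing key
            // encoding, mirroring CompatPrimaryKey::compat above.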
            converter
                .encode_values(&self.values, &mut buffer)
                .context(EncodeSnafu)?;

            builder.append_value(&buffer);
        }

        let new_pk_values_array = Arc::new(builder.finish());
        let new_pk_dict_array =
            PrimaryKeyArray::new(old_pk_dict_array.keys().clone(), new_pk_values_array);

        // Overrides the primary key column.
        let mut columns = batch.columns().to_vec();
        columns[primary_key_column_index(batch.num_columns())] = Arc::new(new_pk_dict_array);

        RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::{OpType, SemanticType};
    use datatypes::arrow::array::{
        ArrayRef, BinaryDictionaryBuilder, Int64Array, StringDictionaryBuilder,
        TimestampMillisecondArray, UInt8Array, UInt64Array,
    };
    use datatypes::arrow::datatypes::UInt32Type;
    use datatypes::arrow::record_batch::RecordBatch;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::ValueRef;
    use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, UInt8Vector, UInt64Vector};
    use mito_codec::row_converter::{
        DensePrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec,
    };
    use store_api::codec::PrimaryKeyEncoding;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;
    use crate::read::flat_projection::FlatProjectionMapper;
    use crate::sst::parquet::flat_format::FlatReadFormat;
    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
    use crate::test_util::{VecBatchReader, check_reader_result};

    /// Creates a new [RegionMetadata].
    fn new_metadata(
        semantic_types: &[(ColumnId, SemanticType, ConcreteDataType)],
        primary_key: &[ColumnId],
    ) -> RegionMetadata {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        for (id, semantic_type, data_type) in semantic_types {
            let column_schema = match semantic_type {
                SemanticType::Tag => {
                    ColumnSchema::new(format!("tag_{id}"), data_type.clone(), true)
                }
                SemanticType::Field => {
                    ColumnSchema::new(format!("field_{id}"), data_type.clone(), true)
                }
                SemanticType::Timestamp => ColumnSchema::new("ts", data_type.clone(), false),
            };

            builder.push_column_metadata(ColumnMetadata {
                column_schema,
                semantic_type: *semantic_type,
                column_id: *id,
            });
        }
        builder.primary_key(primary_key.to_vec());
        builder.build().unwrap()
    }

    /// Encodes a dense primary key.
    fn encode_key(keys: &[Option<&str>]) -> Vec<u8> {
        let fields = (0..keys.len())
            .map(|_| (0, SortField::new(ConcreteDataType::string_datatype())))
            .collect();
        let converter = DensePrimaryKeyCodec::with_fields(fields);
        let row = keys.iter().map(|str_opt| match str_opt {
            Some(v) => ValueRef::String(v),
            None => ValueRef::Null,
        });

        converter.encode(row).unwrap()
    }

    /// Encodes a sparse primary key.
    fn encode_sparse_key(keys: &[(ColumnId, Option<&str>)]) -> Vec<u8> {
        let fields = (0..keys.len())
            .map(|_| (1, SortField::new(ConcreteDataType::string_datatype())))
            .collect();
        let converter = SparsePrimaryKeyCodec::with_fields(fields);
        let row = keys
            .iter()
            .map(|(id, str_opt)| match str_opt {
                Some(v) => (*id, ValueRef::String(v)),
                None => (*id, ValueRef::Null),
            })
            .collect::<Vec<_>>();
        let mut buffer = vec![];
        converter.encode_value_refs(&row, &mut buffer).unwrap();
        buffer
    }

    /// Creates a batch for a specific primary `key`.
    ///
    /// `fields`: [(column_id of the field, is null)]
1059    fn new_batch(
1060        primary_key: &[u8],
1061        fields: &[(ColumnId, bool)],
1062        start_ts: i64,
1063        num_rows: usize,
1064    ) -> Batch {
1065        let timestamps = Arc::new(TimestampMillisecondVector::from_values(
1066            start_ts..start_ts + num_rows as i64,
1067        ));
1068        let sequences = Arc::new(UInt64Vector::from_values(0..num_rows as u64));
1069        let op_types = Arc::new(UInt8Vector::from_vec(vec![OpType::Put as u8; num_rows]));
1070        let field_columns = fields
1071            .iter()
1072            .map(|(id, is_null)| {
1073                let data = if *is_null {
1074                    Arc::new(Int64Vector::from(vec![None; num_rows]))
1075                } else {
1076                    Arc::new(Int64Vector::from_vec(vec![*id as i64; num_rows]))
1077                };
1078                BatchColumn {
1079                    column_id: *id,
1080                    data,
1081                }
1082            })
1083            .collect();
1084        Batch::new(
1085            primary_key.to_vec(),
1086            timestamps,
1087            sequences,
1088            op_types,
1089            field_columns,
1090        )
1091        .unwrap()
1092    }
1093
1094    #[test]
1095    fn test_invalid_pk_len() {
1096        let reader_meta = new_metadata(
1097            &[
1098                (
1099                    0,
1100                    SemanticType::Timestamp,
1101                    ConcreteDataType::timestamp_millisecond_datatype(),
1102                ),
1103                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1104                (2, SemanticType::Tag, ConcreteDataType::string_datatype()),
1105                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
1106            ],
1107            &[1, 2],
1108        );
1109        let expect_meta = new_metadata(
1110            &[
1111                (
1112                    0,
1113                    SemanticType::Timestamp,
1114                    ConcreteDataType::timestamp_millisecond_datatype(),
1115                ),
1116                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1117                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1118            ],
1119            &[1],
1120        );
1121        may_compat_primary_key(&expect_meta, &reader_meta).unwrap_err();
1122    }
1123
1124    #[test]
1125    fn test_different_pk() {
1126        let reader_meta = new_metadata(
1127            &[
1128                (
1129                    0,
1130                    SemanticType::Timestamp,
1131                    ConcreteDataType::timestamp_millisecond_datatype(),
1132                ),
1133                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1134                (2, SemanticType::Tag, ConcreteDataType::string_datatype()),
1135                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
1136            ],
1137            &[2, 1],
1138        );
1139        let expect_meta = new_metadata(
1140            &[
1141                (
1142                    0,
1143                    SemanticType::Timestamp,
1144                    ConcreteDataType::timestamp_millisecond_datatype(),
1145                ),
1146                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1147                (2, SemanticType::Tag, ConcreteDataType::string_datatype()),
1148                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
1149                (4, SemanticType::Tag, ConcreteDataType::string_datatype()),
1150            ],
1151            &[1, 2, 4],
1152        );
1153        may_compat_primary_key(&expect_meta, &reader_meta).unwrap_err();
1154    }
1155
1156    #[test]
1157    fn test_same_pk() {
1158        let reader_meta = new_metadata(
1159            &[
1160                (
1161                    0,
1162                    SemanticType::Timestamp,
1163                    ConcreteDataType::timestamp_millisecond_datatype(),
1164                ),
1165                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1166                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1167            ],
1168            &[1],
1169        );
1170        assert!(
1171            may_compat_primary_key(&reader_meta, &reader_meta)
1172                .unwrap()
1173                .is_none()
1174        );
1175    }
1176
1177    #[test]
1178    fn test_same_pk_encoding() {
1179        let reader_meta = Arc::new(new_metadata(
1180            &[
1181                (
1182                    0,
1183                    SemanticType::Timestamp,
1184                    ConcreteDataType::timestamp_millisecond_datatype(),
1185                ),
1186                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1187            ],
1188            &[1],
1189        ));
1190
1191        assert!(
1192            may_compat_primary_key(&reader_meta, &reader_meta)
1193                .unwrap()
1194                .is_none()
1195        );
1196    }
1197
1198    #[test]
1199    fn test_same_fields() {
1200        let reader_meta = Arc::new(new_metadata(
1201            &[
1202                (
1203                    0,
1204                    SemanticType::Timestamp,
1205                    ConcreteDataType::timestamp_millisecond_datatype(),
1206                ),
1207                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1208                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1209            ],
1210            &[1],
1211        ));
1212        let mapper = PrimaryKeyProjectionMapper::all(&reader_meta).unwrap();
1213        assert!(may_compat_fields(&mapper, &reader_meta).unwrap().is_none())
1214    }
1215
1216    #[tokio::test]
1217    async fn test_compat_reader() {
1218        let reader_meta = Arc::new(new_metadata(
1219            &[
1220                (
1221                    0,
1222                    SemanticType::Timestamp,
1223                    ConcreteDataType::timestamp_millisecond_datatype(),
1224                ),
1225                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1226                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1227            ],
1228            &[1],
1229        ));
1230        let expect_meta = Arc::new(new_metadata(
1231            &[
1232                (
1233                    0,
1234                    SemanticType::Timestamp,
1235                    ConcreteDataType::timestamp_millisecond_datatype(),
1236                ),
1237                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1238                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1239                (3, SemanticType::Tag, ConcreteDataType::string_datatype()),
1240                (4, SemanticType::Field, ConcreteDataType::int64_datatype()),
1241            ],
1242            &[1, 3],
1243        ));
1244        let mapper = ProjectionMapper::all(&expect_meta, false).unwrap();
1245        let k1 = encode_key(&[Some("a")]);
1246        let k2 = encode_key(&[Some("b")]);
1247        let source_reader = VecBatchReader::new(&[
1248            new_batch(&k1, &[(2, false)], 1000, 3),
1249            new_batch(&k2, &[(2, false)], 1000, 3),
1250        ]);
1251
1252        let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
1253        let k1 = encode_key(&[Some("a"), None]);
1254        let k2 = encode_key(&[Some("b"), None]);
1255        check_reader_result(
1256            &mut compat_reader,
1257            &[
1258                new_batch(&k1, &[(2, false), (4, true)], 1000, 3),
1259                new_batch(&k2, &[(2, false), (4, true)], 1000, 3),
1260            ],
1261        )
1262        .await;
1263    }
1264
1265    #[tokio::test]
1266    async fn test_compat_reader_different_order() {
1267        let reader_meta = Arc::new(new_metadata(
1268            &[
1269                (
1270                    0,
1271                    SemanticType::Timestamp,
1272                    ConcreteDataType::timestamp_millisecond_datatype(),
1273                ),
1274                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1275                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1276            ],
1277            &[1],
1278        ));
1279        let expect_meta = Arc::new(new_metadata(
1280            &[
1281                (
1282                    0,
1283                    SemanticType::Timestamp,
1284                    ConcreteDataType::timestamp_millisecond_datatype(),
1285                ),
1286                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1287                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
1288                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1289                (4, SemanticType::Field, ConcreteDataType::int64_datatype()),
1290            ],
1291            &[1],
1292        ));
1293        let mapper = ProjectionMapper::all(&expect_meta, false).unwrap();
1294        let k1 = encode_key(&[Some("a")]);
1295        let k2 = encode_key(&[Some("b")]);
1296        let source_reader = VecBatchReader::new(&[
1297            new_batch(&k1, &[(2, false)], 1000, 3),
1298            new_batch(&k2, &[(2, false)], 1000, 3),
1299        ]);
1300
1301        let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
1302        check_reader_result(
1303            &mut compat_reader,
1304            &[
1305                new_batch(&k1, &[(3, true), (2, false), (4, true)], 1000, 3),
1306                new_batch(&k2, &[(3, true), (2, false), (4, true)], 1000, 3),
1307            ],
1308        )
1309        .await;
1310    }
1311
1312    #[tokio::test]
1313    async fn test_compat_reader_different_types() {
1314        let actual_meta = Arc::new(new_metadata(
1315            &[
1316                (
1317                    0,
1318                    SemanticType::Timestamp,
1319                    ConcreteDataType::timestamp_millisecond_datatype(),
1320                ),
1321                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1322                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1323            ],
1324            &[1],
1325        ));
1326        let expect_meta = Arc::new(new_metadata(
1327            &[
1328                (
1329                    0,
1330                    SemanticType::Timestamp,
1331                    ConcreteDataType::timestamp_millisecond_datatype(),
1332                ),
1333                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1334                (2, SemanticType::Field, ConcreteDataType::string_datatype()),
1335            ],
1336            &[1],
1337        ));
1338        let mapper = ProjectionMapper::all(&expect_meta, false).unwrap();
1339        let k1 = encode_key(&[Some("a")]);
1340        let k2 = encode_key(&[Some("b")]);
1341        let source_reader = VecBatchReader::new(&[
1342            new_batch(&k1, &[(2, false)], 1000, 3),
1343            new_batch(&k2, &[(2, false)], 1000, 3),
1344        ]);
1345
1346        let fn_batch_cast = |batch: Batch| {
1347            let mut new_fields = batch.fields.clone();
1348            new_fields[0].data = new_fields[0]
1349                .data
1350                .cast(&ConcreteDataType::string_datatype())
1351                .unwrap();
1352
1353            batch.with_fields(new_fields).unwrap()
1354        };
1355        let mut compat_reader = CompatReader::new(&mapper, actual_meta, source_reader).unwrap();
1356        check_reader_result(
1357            &mut compat_reader,
1358            &[
1359                fn_batch_cast(new_batch(&k1, &[(2, false)], 1000, 3)),
1360                fn_batch_cast(new_batch(&k2, &[(2, false)], 1000, 3)),
1361            ],
1362        )
1363        .await;
1364    }
1365
1366    #[tokio::test]
1367    async fn test_compat_reader_projection() {
1368        let reader_meta = Arc::new(new_metadata(
1369            &[
1370                (
1371                    0,
1372                    SemanticType::Timestamp,
1373                    ConcreteDataType::timestamp_millisecond_datatype(),
1374                ),
1375                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1376                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1377            ],
1378            &[1],
1379        ));
1380        let expect_meta = Arc::new(new_metadata(
1381            &[
1382                (
1383                    0,
1384                    SemanticType::Timestamp,
1385                    ConcreteDataType::timestamp_millisecond_datatype(),
1386                ),
1387                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1388                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
1389                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1390                (4, SemanticType::Field, ConcreteDataType::int64_datatype()),
1391            ],
1392            &[1],
1393        ));
1394        // tag_1, field_2, field_3
1395        let mapper = ProjectionMapper::new(&expect_meta, [1, 3, 2].into_iter(), false).unwrap();
1396        let k1 = encode_key(&[Some("a")]);
1397        let source_reader = VecBatchReader::new(&[new_batch(&k1, &[(2, false)], 1000, 3)]);
1398
1399        let mut compat_reader =
1400            CompatReader::new(&mapper, reader_meta.clone(), source_reader).unwrap();
1401        check_reader_result(
1402            &mut compat_reader,
1403            &[new_batch(&k1, &[(3, true), (2, false)], 1000, 3)],
1404        )
1405        .await;
1406
1407        // tag_1, field_4, field_3
1408        let mapper = ProjectionMapper::new(&expect_meta, [1, 4, 2].into_iter(), false).unwrap();
1409        let k1 = encode_key(&[Some("a")]);
1410        let source_reader = VecBatchReader::new(&[new_batch(&k1, &[], 1000, 3)]);
1411
1412        let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
1413        check_reader_result(
1414            &mut compat_reader,
1415            &[new_batch(&k1, &[(3, true), (4, true)], 1000, 3)],
1416        )
1417        .await;
1418    }
1419
    #[tokio::test]
    async fn test_compat_reader_different_pk_encoding() {
        let mut reader_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        );
        reader_meta.primary_key_encoding = PrimaryKeyEncoding::Dense;
        let reader_meta = Arc::new(reader_meta);
        let mut expect_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (3, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (4, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1, 3],
        );
        expect_meta.primary_key_encoding = PrimaryKeyEncoding::Sparse;
        let expect_meta = Arc::new(expect_meta);

        let mapper = ProjectionMapper::all(&expect_meta, false).unwrap();
        let k1 = encode_key(&[Some("a")]);
        let k2 = encode_key(&[Some("b")]);
        let source_reader = VecBatchReader::new(&[
            new_batch(&k1, &[(2, false)], 1000, 3),
            new_batch(&k2, &[(2, false)], 1000, 3),
        ]);

        let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
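        // Expected keys are re-encoded with the sparse codec; tag 3 is new and
        // has no value in the source, so it is encoded as None.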
        let k1 = encode_sparse_key(&[(1, Some("a")), (3, None)]);
        let k2 = encode_sparse_key(&[(1, Some("b")), (3, None)]);
        check_reader_result(
            &mut compat_reader,
            &[
                new_batch(&k1, &[(2, false), (4, true)], 1000, 3),
                new_batch(&k2, &[(2, false), (4, true)], 1000, 3),
            ],
        )
        .await;
    }

    /// Creates a primary key array for flat format testing.
    fn build_flat_test_pk_array(primary_keys: &[&[u8]]) -> ArrayRef {
        let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        for &pk in primary_keys {
            builder.append(pk).unwrap();
        }
        Arc::new(builder.finish())
    }

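    /// Ensures `FlatCompatBatch` inserts an all-null column for a field that
    /// was added to the region after the SST was written.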
    #[test]
    fn test_flat_compat_batch_with_missing_columns() {
        let actual_metadata = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));

        let expected_metadata = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                // Adds a new field.
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));

        let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
        let read_format = FlatReadFormat::new(
            actual_metadata.clone(),
            [0, 1, 2, 3].into_iter(),
            None,
            "test",
            false,
        )
        .unwrap();
        let format_projection = read_format.format_projection();

        let compat_batch =
            FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection, false)
                .unwrap()
                .unwrap();

        let mut tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
        tag_builder.append_value("tag1");
        tag_builder.append_value("tag1");
        let tag_dict_array = Arc::new(tag_builder.finish());

        let k1 = encode_key(&[Some("tag1")]);
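        // Input columns follow the flat SST layout: projected columns (tag,
        // field, time index) followed by the internal primary key, sequence
        // and op type columns.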
        let input_columns: Vec<ArrayRef> = vec![
            tag_dict_array.clone(),
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&k1, &k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let input_schema =
            to_flat_sst_arrow_schema(&actual_metadata, &FlatSchemaOptions::default());
        let input_batch = RecordBatch::try_new(input_schema, input_columns).unwrap();

        let result = compat_batch.compat(input_batch).unwrap();

        let expected_schema =
            to_flat_sst_arrow_schema(&expected_metadata, &FlatSchemaOptions::default());

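        // Field 3 is absent from the input, so the output should contain an
        // all-null int64 column in its place.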
        let expected_columns: Vec<ArrayRef> = vec![
            tag_dict_array.clone(),
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(Int64Array::from(vec![None::<i64>, None::<i64>])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&k1, &k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let expected_batch = RecordBatch::try_new(expected_schema, expected_columns).unwrap();

        assert_eq!(expected_batch, result);
    }

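    /// Ensures `FlatCompatBatch` re-encodes the primary key column with the
    /// sparse codec and appends a null dictionary column for the new tag.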
    #[test]
    fn test_flat_compat_batch_with_different_pk_encoding() {
        let mut actual_metadata = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        );
        actual_metadata.primary_key_encoding = PrimaryKeyEncoding::Dense;
        let actual_metadata = Arc::new(actual_metadata);

        let mut expected_metadata = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (3, SemanticType::Tag, ConcreteDataType::string_datatype()),
            ],
            &[1, 3],
        );
        expected_metadata.primary_key_encoding = PrimaryKeyEncoding::Sparse;
        let expected_metadata = Arc::new(expected_metadata);

        let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
        let read_format = FlatReadFormat::new(
            actual_metadata.clone(),
            [0, 1, 2, 3].into_iter(),
            None,
            "test",
            false,
        )
        .unwrap();
        let format_projection = read_format.format_projection();

        let compat_batch =
            FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection, false)
                .unwrap()
                .unwrap();

        // Tag array.
        let mut tag1_builder = StringDictionaryBuilder::<UInt32Type>::new();
        tag1_builder.append_value("tag1");
        tag1_builder.append_value("tag1");
        let tag1_dict_array = Arc::new(tag1_builder.finish());

        let k1 = encode_key(&[Some("tag1")]);
        let input_columns: Vec<ArrayRef> = vec![
            tag1_dict_array.clone(),
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&k1, &k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let input_schema =
            to_flat_sst_arrow_schema(&actual_metadata, &FlatSchemaOptions::default());
        let input_batch = RecordBatch::try_new(input_schema, input_columns).unwrap();

        let result = compat_batch.compat(input_batch).unwrap();

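        // The output should use the sparse-encoded key, and the new tag 3
        // becomes an all-null dictionary column.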
        let sparse_k1 = encode_sparse_key(&[(1, Some("tag1")), (3, None)]);
        let mut null_tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
        null_tag_builder.append_nulls(2);
        let null_tag_dict_array = Arc::new(null_tag_builder.finish());
        let expected_columns: Vec<ArrayRef> = vec![
            tag1_dict_array.clone(),
            null_tag_dict_array,
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&sparse_k1, &sparse_k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let output_schema =
            to_flat_sst_arrow_schema(&expected_metadata, &FlatSchemaOptions::default());
        let expected_batch = RecordBatch::try_new(output_schema, expected_columns).unwrap();

        assert_eq!(expected_batch, result);
    }

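    /// Ensures `FlatCompatBatch` handles the compaction path (the trailing
    /// `true` arguments) on a tagless, sparse-encoded region: the empty sparse
    /// key passes through and the new field is backfilled with nulls.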
    #[test]
    fn test_flat_compat_batch_compact_sparse() {
        let mut actual_metadata = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[],
        );
        actual_metadata.primary_key_encoding = PrimaryKeyEncoding::Sparse;
        let actual_metadata = Arc::new(actual_metadata);

        let mut expected_metadata = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[],
        );
        expected_metadata.primary_key_encoding = PrimaryKeyEncoding::Sparse;
        let expected_metadata = Arc::new(expected_metadata);

        let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
        let read_format = FlatReadFormat::new(
            actual_metadata.clone(),
            [0, 2, 3].into_iter(),
            None,
            "test",
            true,
        )
        .unwrap();
        let format_projection = read_format.format_projection();

        let compat_batch =
            FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection, true)
                .unwrap()
                .unwrap();

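        // The region has no tag columns, so the sparse key is empty.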
        let sparse_k1 = encode_sparse_key(&[]);
        let input_columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&sparse_k1, &sparse_k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let input_schema =
            to_flat_sst_arrow_schema(&actual_metadata, &FlatSchemaOptions::default());
        let input_batch = RecordBatch::try_new(input_schema, input_columns).unwrap();

        let result = compat_batch.compat(input_batch).unwrap();

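        // Only the new field 3 differs from the input: it is backfilled with
        // nulls, while the key, sequence and op type columns pass through.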
        let expected_columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(Int64Array::from(vec![None::<i64>, None::<i64>])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&sparse_k1, &sparse_k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let output_schema =
            to_flat_sst_arrow_schema(&expected_metadata, &FlatSchemaOptions::default());
        let expected_batch = RecordBatch::try_new(output_schema, expected_columns).unwrap();

        assert_eq!(expected_batch, result);
    }
}