Skip to main content

mito2/read/
compat.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Utilities to adapt readers with different schema.
16
17use std::collections::HashMap;
18use std::sync::Arc;
19
20use api::v1::SemanticType;
21use datatypes::arrow::array::{
22    Array, ArrayRef, BinaryArray, BinaryBuilder, DictionaryArray, UInt32Array,
23};
24use datatypes::arrow::compute::{TakeOptions, take};
25use datatypes::arrow::datatypes::{FieldRef, Schema, SchemaRef};
26use datatypes::arrow::record_batch::RecordBatch;
27use datatypes::data_type::ConcreteDataType;
28use datatypes::extension::json::is_structured_json_field;
29use datatypes::prelude::DataType;
30use datatypes::value::Value;
31use datatypes::vectors::VectorRef;
32use datatypes::vectors::json::array::JsonArray;
33use mito_codec::row_converter::{
34    CompositeValues, PrimaryKeyCodec, SortField, build_primary_key_codec,
35    build_primary_key_codec_with_fields,
36};
37use snafu::{OptionExt, ResultExt, ensure};
38use store_api::codec::PrimaryKeyEncoding;
39use store_api::metadata::{RegionMetadata, RegionMetadataRef};
40use store_api::storage::ColumnId;
41
42use crate::error::{
43    CompatReaderSnafu, ComputeArrowSnafu, ConvertValueSnafu, CreateDefaultSnafu, DecodeSnafu,
44    EncodeSnafu, NewRecordBatchSnafu, Result, UnsupportedOperationSnafu,
45};
46use crate::read::flat_projection::{FlatProjectionMapper, flat_projected_columns};
47use crate::sst::parquet::flat_format::{FlatReadFormat, primary_key_column_index};
48use crate::sst::parquet::format::{INTERNAL_COLUMN_NUM, PrimaryKeyArray};
49use crate::sst::{internal_fields, tag_maybe_to_dictionary_field, with_field_id};
50
51/// Returns true if the columns in the `projection_mapper` and `read_format` have same data types
52/// and primary key encodings.
53pub(crate) fn has_same_columns_and_pk_encoding(
54    projection_mapper: &FlatProjectionMapper,
55    read_format: &FlatReadFormat,
56    compaction: bool,
57) -> bool {
58    let left = projection_mapper.metadata();
59    let right = read_format.metadata();
60    if left.primary_key_encoding != right.primary_key_encoding {
61        return false;
62    }
63
64    if left.column_metadatas.len() != right.column_metadatas.len() {
65        return false;
66    }
67
68    for (left_col, right_col) in left.column_metadatas.iter().zip(&right.column_metadatas) {
69        if left_col.column_id != right_col.column_id {
70            return false;
71        }
72        debug_assert_eq!(left_col.semantic_type, right_col.semantic_type);
73    }
74
75    &projection_mapper.input_arrow_schema(compaction) == read_format.arrow_schema()
76}
77
78/// A helper struct to adapt schema of the batch to an expected schema.
79pub(crate) struct FlatCompatBatch {
80    /// Indices to convert actual fields to expect fields.
81    index_or_defaults: Vec<IndexOrDefault>,
82    /// Expected arrow schema.
83    arrow_schema: SchemaRef,
84    /// Primary key adapter.
85    compat_pk: FlatCompatPrimaryKey,
86}
87
88impl FlatCompatBatch {
89    /// Creates a [FlatCompatBatch].
90    ///
91    /// - `mapper` is built from the metadata users expect to see.
92    /// - `read_format` is the [FlatReadFormat] of the input parquet.
93    /// - `compaction` indicates whether the reader is for compaction.
94    pub(crate) fn try_new(
95        mapper: &FlatProjectionMapper,
96        read_format: &FlatReadFormat,
97        compaction: bool,
98    ) -> Result<Option<Self>> {
99        let actual = read_format.metadata();
100        let format_projection = read_format.format_projection();
101        let mut actual_schema = flat_projected_columns(actual, format_projection);
102        if read_format
103            .arrow_schema()
104            .fields()
105            .iter()
106            .any(is_structured_json_field)
107        {
108            for field in read_format.arrow_schema().fields() {
109                if is_structured_json_field(field)
110                    && let Some(column_id) =
111                        actual.column_by_name(field.name()).map(|x| x.column_id)
112                    && let Some(i) = actual_schema.iter().position(|x| x.0 == column_id)
113                {
114                    actual_schema[i].1 = ConcreteDataType::from_arrow_type(field.data_type());
115                }
116            }
117        }
118
119        let expect_schema = mapper.batch_schema();
120        if expect_schema == actual_schema {
121            // Although the SST has a different schema, but the schema after projection is the same
122            // as expected schema.
123            return Ok(None);
124        }
125
126        if actual.primary_key_encoding == PrimaryKeyEncoding::Sparse && compaction {
127            // Special handling for sparse encoding in compaction.
128            return FlatCompatBatch::try_new_compact_sparse(mapper, actual);
129        }
130
131        let (index_or_defaults, fields) =
132            Self::compute_index_and_fields(&actual_schema, expect_schema, mapper.metadata())?;
133
134        let compat_pk = FlatCompatPrimaryKey::new(mapper.metadata(), actual)?;
135
136        Ok(Some(Self {
137            index_or_defaults,
138            arrow_schema: Arc::new(Schema::new(fields)),
139            compat_pk,
140        }))
141    }
142
143    fn compute_index_and_fields(
144        actual_schema: &[(ColumnId, ConcreteDataType)],
145        expect_schema: &[(ColumnId, ConcreteDataType)],
146        expect_metadata: &RegionMetadata,
147    ) -> Result<(Vec<IndexOrDefault>, Vec<FieldRef>)> {
148        // Maps column id to the index and data type in the actual schema.
149        let actual_schema_index: HashMap<_, _> = actual_schema
150            .iter()
151            .enumerate()
152            .map(|(idx, (column_id, data_type))| (*column_id, (idx, data_type)))
153            .collect();
154
155        let mut index_or_defaults = Vec::with_capacity(expect_schema.len());
156        let mut fields = Vec::with_capacity(expect_schema.len());
157        for (column_id, expect_data_type) in expect_schema {
158            // Safety: expect_schema comes from the same mapper.
159            let column_index = expect_metadata.column_index_by_id(*column_id).unwrap();
160            let expect_column = &expect_metadata.column_metadatas[column_index];
161            let column_field = &expect_metadata.schema.arrow_schema().fields()[column_index];
162            // For tag columns, we need to create a dictionary field.
163            if expect_column.semantic_type == SemanticType::Tag {
164                let field = tag_maybe_to_dictionary_field(
165                    &expect_column.column_schema.data_type,
166                    column_field,
167                );
168                fields.push(Arc::new(with_field_id(
169                    (*field).clone(),
170                    expect_column.column_id,
171                )));
172            } else {
173                let field = with_field_id(
174                    Arc::unwrap_or_clone(column_field.clone()),
175                    expect_column.column_id,
176                )
177                .with_data_type(expect_data_type.as_arrow_type());
178                fields.push(Arc::new(field))
179            };
180
181            if let Some((index, actual_data_type)) = actual_schema_index.get(column_id) {
182                let mut cast_type = None;
183
184                // Same column different type.
185                if expect_data_type != *actual_data_type {
186                    cast_type = Some(expect_data_type.clone())
187                }
188                // Source has this column.
189                index_or_defaults.push(IndexOrDefault::Index {
190                    pos: *index,
191                    cast_type,
192                });
193            } else {
194                // Create a default vector with 1 element for that column.
195                let default_vector = expect_column
196                    .column_schema
197                    .create_default_vector(1)
198                    .context(CreateDefaultSnafu {
199                        region_id: expect_metadata.region_id,
200                        column: &expect_column.column_schema.name,
201                    })?
202                    .with_context(|| CompatReaderSnafu {
203                        region_id: expect_metadata.region_id,
204                        reason: format!(
205                            "column {} does not have a default value to read",
206                            expect_column.column_schema.name
207                        ),
208                    })?;
209                index_or_defaults.push(IndexOrDefault::DefaultValue {
210                    default_vector,
211                    semantic_type: expect_column.semantic_type,
212                });
213            };
214        }
215        fields.extend_from_slice(&internal_fields());
216
217        Ok((index_or_defaults, fields))
218    }
219
220    fn try_new_compact_sparse(
221        mapper: &FlatProjectionMapper,
222        actual: &RegionMetadataRef,
223    ) -> Result<Option<Self>> {
224        // Currently, we don't support converting sparse encoding back to dense encoding in
225        // flat format.
226        ensure!(
227            mapper.metadata().primary_key_encoding == PrimaryKeyEncoding::Sparse,
228            UnsupportedOperationSnafu {
229                err_msg: "Flat format doesn't support converting sparse encoding back to dense encoding"
230            }
231        );
232
233        // For sparse encoding, we don't need to check the primary keys.
234        // Since this is for compaction, we always read all columns.
235        let actual_schema: Vec<_> = actual
236            .field_columns()
237            .chain([actual.time_index_column()])
238            .map(|col| (col.column_id, col.column_schema.data_type.clone()))
239            .collect();
240        let expect_schema: Vec<_> = mapper
241            .metadata()
242            .field_columns()
243            .chain([mapper.metadata().time_index_column()])
244            .map(|col| (col.column_id, col.column_schema.data_type.clone()))
245            .collect();
246
247        let (index_or_defaults, fields) =
248            Self::compute_index_and_fields(&actual_schema, &expect_schema, mapper.metadata())?;
249
250        let compat_pk = FlatCompatPrimaryKey::default();
251
252        Ok(Some(Self {
253            index_or_defaults,
254            arrow_schema: Arc::new(Schema::new(fields)),
255            compat_pk,
256        }))
257    }
258
259    /// Make columns of the `batch` compatible.
260    pub(crate) fn compat(&self, batch: RecordBatch) -> Result<RecordBatch> {
261        let len = batch.num_rows();
262        let columns = self
263            .index_or_defaults
264            .iter()
265            .map(|index_or_default| match index_or_default {
266                IndexOrDefault::Index { pos, cast_type } => {
267                    let old_column = batch.column(*pos);
268
269                    if let Some(ty) = cast_type {
270                        let casted = if let Some(json_type) = ty.as_json()
271                            && json_type.is_json2()
272                        {
273                            JsonArray::from(old_column)
274                                .try_align(&json_type.as_arrow_type())
275                                .context(ConvertValueSnafu)?
276                        } else {
277                            datatypes::arrow::compute::cast(old_column, &ty.as_arrow_type())
278                                .context(ComputeArrowSnafu)?
279                        };
280                        Ok(casted)
281                    } else {
282                        Ok(old_column.clone())
283                    }
284                }
285                IndexOrDefault::DefaultValue {
286                    default_vector,
287                    semantic_type,
288                } => repeat_vector(default_vector, len, *semantic_type == SemanticType::Tag),
289            })
290            .chain(
291                // Adds internal columns.
292                batch.columns()[batch.num_columns() - INTERNAL_COLUMN_NUM..]
293                    .iter()
294                    .map(|col| Ok(col.clone())),
295            )
296            .collect::<Result<Vec<_>>>()?;
297
298        let compat_batch = RecordBatch::try_new(self.arrow_schema.clone(), columns)
299            .context(NewRecordBatchSnafu)?;
300
301        // Handles primary keys.
302        self.compat_pk.compat(compat_batch)
303    }
304}
305
306/// Repeats the vector value `to_len` times.
307fn repeat_vector(vector: &VectorRef, to_len: usize, is_tag: bool) -> Result<ArrayRef> {
308    assert_eq!(1, vector.len());
309    let data_type = vector.data_type();
310    if is_tag && data_type.is_string() {
311        let values = vector.to_arrow_array();
312        if values.is_null(0) {
313            // Creates a dictionary array with `to_len` null keys.
314            let keys = UInt32Array::new_null(to_len);
315            Ok(Arc::new(DictionaryArray::new(keys, values.slice(0, 0))))
316        } else {
317            let keys = UInt32Array::from_value(0, to_len);
318            Ok(Arc::new(DictionaryArray::new(keys, values)))
319        }
320    } else {
321        let keys = UInt32Array::from_value(0, to_len);
322        take(
323            &vector.to_arrow_array(),
324            &keys,
325            Some(TakeOptions {
326                check_bounds: false,
327            }),
328        )
329        .context(ComputeArrowSnafu)
330    }
331}
332
333/// Returns true if the actual primary keys is the same as expected.
334fn is_primary_key_same(expect: &RegionMetadata, actual: &RegionMetadata) -> Result<bool> {
335    ensure!(
336        actual.primary_key.len() <= expect.primary_key.len(),
337        CompatReaderSnafu {
338            region_id: expect.region_id,
339            reason: format!(
340                "primary key has more columns {} than expect {}",
341                actual.primary_key.len(),
342                expect.primary_key.len()
343            ),
344        }
345    );
346    ensure!(
347        actual.primary_key == expect.primary_key[..actual.primary_key.len()],
348        CompatReaderSnafu {
349            region_id: expect.region_id,
350            reason: format!(
351                "primary key has different prefix, expect: {:?}, actual: {:?}",
352                expect.primary_key, actual.primary_key
353            ),
354        }
355    );
356
357    Ok(actual.primary_key.len() == expect.primary_key.len())
358}
359
/// Index in source batch or a default value to fill a column.
///
/// One entry describes how a single expected output column is produced.
#[derive(Debug)]
enum IndexOrDefault {
    /// Index of the column in source batch.
    Index {
        /// Position of the column in the source batch.
        pos: usize,
        /// Type to cast the source column to; `None` when its type already matches the
        /// expected type.
        cast_type: Option<ConcreteDataType>,
    },
    /// Default value for the column, repeated for every row of the batch.
    DefaultValue {
        /// Default value. The vector has only 1 element.
        default_vector: VectorRef,
        /// Semantic type of the column.
        semantic_type: SemanticType,
    },
}
376
377/// Helper to rewrite primary key to another encoding for flat format.
378struct FlatRewritePrimaryKey {
379    /// New primary key encoder.
380    codec: Arc<dyn PrimaryKeyCodec>,
381    /// Metadata of the expected region.
382    metadata: RegionMetadataRef,
383    /// Original primary key codec.
384    /// If we need to rewrite the primary key.
385    old_codec: Arc<dyn PrimaryKeyCodec>,
386}
387
388impl FlatRewritePrimaryKey {
389    fn new(
390        expect: &RegionMetadataRef,
391        actual: &RegionMetadataRef,
392    ) -> Option<FlatRewritePrimaryKey> {
393        if expect.primary_key_encoding == actual.primary_key_encoding {
394            return None;
395        }
396        let codec = build_primary_key_codec(expect);
397        let old_codec = build_primary_key_codec(actual);
398
399        Some(FlatRewritePrimaryKey {
400            codec,
401            metadata: expect.clone(),
402            old_codec,
403        })
404    }
405
406    /// Rewrites the primary key of the `batch`.
407    /// It also appends the values to the primary key.
408    fn rewrite_key(
409        &self,
410        append_values: &[(ColumnId, Value)],
411        batch: RecordBatch,
412    ) -> Result<RecordBatch> {
413        let old_pk_dict_array = batch
414            .column(primary_key_column_index(batch.num_columns()))
415            .as_any()
416            .downcast_ref::<PrimaryKeyArray>()
417            .unwrap();
418        let old_pk_values_array = old_pk_dict_array
419            .values()
420            .as_any()
421            .downcast_ref::<BinaryArray>()
422            .unwrap();
423        let mut builder = BinaryBuilder::with_capacity(
424            old_pk_values_array.len(),
425            old_pk_values_array.value_data().len(),
426        );
427
428        // Binary buffer for the primary key.
429        let mut buffer = Vec::with_capacity(
430            old_pk_values_array.value_data().len() / old_pk_values_array.len().max(1),
431        );
432        let mut column_id_values = Vec::new();
433        // Iterates the binary array and rewrites the primary key.
434        for value in old_pk_values_array.iter() {
435            let Some(old_pk) = value else {
436                builder.append_null();
437                continue;
438            };
439            // Decodes the old primary key.
440            let mut pk_values = self.old_codec.decode(old_pk).context(DecodeSnafu)?;
441            pk_values.extend(append_values);
442
443            buffer.clear();
444            column_id_values.clear();
445            // Encodes the new primary key.
446            match pk_values {
447                CompositeValues::Dense(dense_values) => {
448                    self.codec
449                        .encode_values(dense_values.as_slice(), &mut buffer)
450                        .context(EncodeSnafu)?;
451                }
452                CompositeValues::Sparse(sparse_values) => {
453                    for id in &self.metadata.primary_key {
454                        let value = sparse_values.get_or_null(*id);
455                        column_id_values.push((*id, value.clone()));
456                    }
457                    self.codec
458                        .encode_values(&column_id_values, &mut buffer)
459                        .context(EncodeSnafu)?;
460                }
461            }
462            builder.append_value(&buffer);
463        }
464        let new_pk_values_array = Arc::new(builder.finish());
465        let new_pk_dict_array =
466            PrimaryKeyArray::new(old_pk_dict_array.keys().clone(), new_pk_values_array);
467
468        let mut columns = batch.columns().to_vec();
469        columns[primary_key_column_index(batch.num_columns())] = Arc::new(new_pk_dict_array);
470
471        RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
472    }
473}
474
475/// Helper to make primary key compatible for flat format.
476#[derive(Default)]
477struct FlatCompatPrimaryKey {
478    /// Primary key rewriter.
479    rewriter: Option<FlatRewritePrimaryKey>,
480    /// Converter to append values to primary keys.
481    converter: Option<Arc<dyn PrimaryKeyCodec>>,
482    /// Default values to append.
483    values: Vec<(ColumnId, Value)>,
484}
485
486impl FlatCompatPrimaryKey {
487    fn new(expect: &RegionMetadataRef, actual: &RegionMetadataRef) -> Result<Self> {
488        let rewriter = FlatRewritePrimaryKey::new(expect, actual);
489
490        if is_primary_key_same(expect, actual)? {
491            return Ok(Self {
492                rewriter,
493                converter: None,
494                values: Vec::new(),
495            });
496        }
497
498        // We need to append default values to the primary key.
499        let to_add = &expect.primary_key[actual.primary_key.len()..];
500        let mut values = Vec::with_capacity(to_add.len());
501        let mut fields = Vec::with_capacity(to_add.len());
502        for column_id in to_add {
503            // Safety: The id comes from expect region metadata.
504            let column = expect.column_by_id(*column_id).unwrap();
505            fields.push((
506                *column_id,
507                SortField::new(column.column_schema.data_type.clone()),
508            ));
509            let default_value = column
510                .column_schema
511                .create_default()
512                .context(CreateDefaultSnafu {
513                    region_id: expect.region_id,
514                    column: &column.column_schema.name,
515                })?
516                .with_context(|| CompatReaderSnafu {
517                    region_id: expect.region_id,
518                    reason: format!(
519                        "key column {} does not have a default value to read",
520                        column.column_schema.name
521                    ),
522                })?;
523            values.push((*column_id, default_value));
524        }
525        // is_primary_key_same() is false so we have different number of primary key columns.
526        debug_assert!(!fields.is_empty());
527
528        // Create converter to append values.
529        let converter = Some(build_primary_key_codec_with_fields(
530            expect.primary_key_encoding,
531            fields.into_iter(),
532        ));
533
534        Ok(Self {
535            rewriter,
536            converter,
537            values,
538        })
539    }
540
541    /// Makes primary key of the `batch` compatible.
542    ///
543    /// Callers must ensure other columns except the `__primary_key` column is compatible.
544    fn compat(&self, batch: RecordBatch) -> Result<RecordBatch> {
545        if let Some(rewriter) = &self.rewriter {
546            // If we have different encoding, rewrite the whole primary key.
547            return rewriter.rewrite_key(&self.values, batch);
548        }
549
550        self.append_key(batch)
551    }
552
553    /// Appends values to the primary key of the `batch`.
554    fn append_key(&self, batch: RecordBatch) -> Result<RecordBatch> {
555        let Some(converter) = &self.converter else {
556            return Ok(batch);
557        };
558
559        let old_pk_dict_array = batch
560            .column(primary_key_column_index(batch.num_columns()))
561            .as_any()
562            .downcast_ref::<PrimaryKeyArray>()
563            .unwrap();
564        let old_pk_values_array = old_pk_dict_array
565            .values()
566            .as_any()
567            .downcast_ref::<BinaryArray>()
568            .unwrap();
569        let mut builder = BinaryBuilder::with_capacity(
570            old_pk_values_array.len(),
571            old_pk_values_array.value_data().len()
572                + converter.estimated_size().unwrap_or_default() * old_pk_values_array.len(),
573        );
574
575        // Binary buffer for the primary key.
576        let mut buffer = Vec::with_capacity(
577            old_pk_values_array.value_data().len() / old_pk_values_array.len().max(1)
578                + converter.estimated_size().unwrap_or_default(),
579        );
580
581        // Iterates the binary array and appends values to the primary key.
582        for value in old_pk_values_array.iter() {
583            let Some(old_pk) = value else {
584                builder.append_null();
585                continue;
586            };
587
588            buffer.clear();
589            buffer.extend_from_slice(old_pk);
590            converter
591                .encode_values(&self.values, &mut buffer)
592                .context(EncodeSnafu)?;
593
594            builder.append_value(&buffer);
595        }
596
597        let new_pk_values_array = Arc::new(builder.finish());
598        let new_pk_dict_array =
599            PrimaryKeyArray::new(old_pk_dict_array.keys().clone(), new_pk_values_array);
600
601        // Overrides the primary key column.
602        let mut columns = batch.columns().to_vec();
603        columns[primary_key_column_index(batch.num_columns())] = Arc::new(new_pk_dict_array);
604
605        RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
606    }
607}
608
609#[cfg(test)]
610mod tests {
611    use std::sync::Arc;
612
613    use api::v1::{OpType, SemanticType};
614    use datatypes::arrow::array::{
615        ArrayRef, BinaryDictionaryBuilder, Int64Array, StringDictionaryBuilder,
616        TimestampMillisecondArray, UInt8Array, UInt64Array,
617    };
618    use datatypes::arrow::datatypes::UInt32Type;
619    use datatypes::arrow::record_batch::RecordBatch;
620    use datatypes::prelude::ConcreteDataType;
621    use datatypes::schema::ColumnSchema;
622    use datatypes::value::ValueRef;
623    use mito_codec::row_converter::{
624        DensePrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec,
625    };
626    use store_api::codec::PrimaryKeyEncoding;
627    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
628    use store_api::storage::RegionId;
629
630    use super::*;
631    use crate::read::flat_projection::FlatProjectionMapper;
632    use crate::read::read_columns::ReadColumns;
633    use crate::sst::parquet::flat_format::FlatReadFormat;
634    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
635
636    /// Creates a new [RegionMetadata].
637    fn new_metadata(
638        semantic_types: &[(ColumnId, SemanticType, ConcreteDataType)],
639        primary_key: &[ColumnId],
640    ) -> RegionMetadata {
641        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
642        for (id, semantic_type, data_type) in semantic_types {
643            let column_schema = match semantic_type {
644                SemanticType::Tag => {
645                    ColumnSchema::new(format!("tag_{id}"), data_type.clone(), true)
646                }
647                SemanticType::Field => {
648                    ColumnSchema::new(format!("field_{id}"), data_type.clone(), true)
649                }
650                SemanticType::Timestamp => ColumnSchema::new("ts", data_type.clone(), false),
651            };
652
653            builder.push_column_metadata(ColumnMetadata {
654                column_schema,
655                semantic_type: *semantic_type,
656                column_id: *id,
657            });
658        }
659        builder.primary_key(primary_key.to_vec());
660        builder.build().unwrap()
661    }
662
663    /// Encode primary key.
664    fn encode_key(keys: &[Option<&str>]) -> Vec<u8> {
665        let fields = (0..keys.len())
666            .map(|_| (0, SortField::new(ConcreteDataType::string_datatype())))
667            .collect();
668        let converter = DensePrimaryKeyCodec::with_fields(fields);
669        let row = keys.iter().map(|str_opt| match str_opt {
670            Some(v) => ValueRef::String(v),
671            None => ValueRef::Null,
672        });
673
674        converter.encode(row).unwrap()
675    }
676
677    /// Encode sparse primary key.
678    fn encode_sparse_key(keys: &[(ColumnId, Option<&str>)]) -> Vec<u8> {
679        let fields = (0..keys.len())
680            .map(|_| (1, SortField::new(ConcreteDataType::string_datatype())))
681            .collect();
682        let converter = SparsePrimaryKeyCodec::with_fields(fields);
683        let row = keys
684            .iter()
685            .map(|(id, str_opt)| match str_opt {
686                Some(v) => (*id, ValueRef::String(v)),
687                None => (*id, ValueRef::Null),
688            })
689            .collect::<Vec<_>>();
690        let mut buffer = vec![];
691        converter.encode_value_refs(&row, &mut buffer).unwrap();
692        buffer
693    }
694
695    /// Creates a primary key array for flat format testing.
696    fn build_flat_test_pk_array(primary_keys: &[&[u8]]) -> ArrayRef {
697        let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
698        for &pk in primary_keys {
699            builder.append(pk).unwrap();
700        }
701        Arc::new(builder.finish())
702    }
703
704    #[test]
705    fn test_flat_compat_batch_with_missing_columns() {
706        let actual_metadata = Arc::new(new_metadata(
707            &[
708                (
709                    0,
710                    SemanticType::Timestamp,
711                    ConcreteDataType::timestamp_millisecond_datatype(),
712                ),
713                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
714                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
715            ],
716            &[1],
717        ));
718
719        let expected_metadata = Arc::new(new_metadata(
720            &[
721                (
722                    0,
723                    SemanticType::Timestamp,
724                    ConcreteDataType::timestamp_millisecond_datatype(),
725                ),
726                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
727                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
728                // Adds a new field.
729                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
730            ],
731            &[1],
732        ));
733
734        let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
735        let read_format = FlatReadFormat::new(
736            actual_metadata.clone(),
737            ReadColumns::from_deduped_column_ids([0, 1, 2, 3]),
738            None,
739            "test",
740            false,
741        )
742        .unwrap();
743
744        let compat_batch = FlatCompatBatch::try_new(&mapper, &read_format, false)
745            .unwrap()
746            .unwrap();
747
748        let mut tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
749        tag_builder.append_value("tag1");
750        tag_builder.append_value("tag1");
751        let tag_dict_array = Arc::new(tag_builder.finish());
752
753        let k1 = encode_key(&[Some("tag1")]);
754        let input_columns: Vec<ArrayRef> = vec![
755            tag_dict_array.clone(),
756            Arc::new(Int64Array::from(vec![100, 200])),
757            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
758            build_flat_test_pk_array(&[&k1, &k1]),
759            Arc::new(UInt64Array::from_iter_values([1, 2])),
760            Arc::new(UInt8Array::from_iter_values([
761                OpType::Put as u8,
762                OpType::Put as u8,
763            ])),
764        ];
765        let input_schema =
766            to_flat_sst_arrow_schema(&actual_metadata, &FlatSchemaOptions::default());
767        let input_batch = RecordBatch::try_new(input_schema, input_columns).unwrap();
768
769        let result = compat_batch.compat(input_batch).unwrap();
770
771        let expected_schema =
772            to_flat_sst_arrow_schema(&expected_metadata, &FlatSchemaOptions::default());
773
774        let expected_columns: Vec<ArrayRef> = vec![
775            tag_dict_array.clone(),
776            Arc::new(Int64Array::from(vec![100, 200])),
777            Arc::new(Int64Array::from(vec![None::<i64>, None::<i64>])),
778            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
779            build_flat_test_pk_array(&[&k1, &k1]),
780            Arc::new(UInt64Array::from_iter_values([1, 2])),
781            Arc::new(UInt8Array::from_iter_values([
782                OpType::Put as u8,
783                OpType::Put as u8,
784            ])),
785        ];
786        let expected_batch = RecordBatch::try_new(expected_schema, expected_columns).unwrap();
787
788        assert_eq!(expected_batch, result);
789    }
790
791    #[test]
792    fn test_flat_compat_batch_with_read_projection_superset() {
793        let actual_metadata = Arc::new(new_metadata(
794            &[
795                (
796                    0,
797                    SemanticType::Timestamp,
798                    ConcreteDataType::timestamp_millisecond_datatype(),
799                ),
800                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
801                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
802            ],
803            &[1],
804        ));
805
806        let expected_metadata = Arc::new(new_metadata(
807            &[
808                (
809                    0,
810                    SemanticType::Timestamp,
811                    ConcreteDataType::timestamp_millisecond_datatype(),
812                ),
813                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
814                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
815                // Adds a new field.
816                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
817            ],
818            &[1],
819        ));
820
821        let mapper = FlatProjectionMapper::new_with_read_columns(
822            &expected_metadata,
823            vec![1, 2],
824            ReadColumns::from_deduped_column_ids([1, 2, 3]),
825            None,
826        )
827        .unwrap();
828        let read_format = FlatReadFormat::new(
829            actual_metadata.clone(),
830            ReadColumns::from_deduped_column_ids([1, 2, 3]),
831            None,
832            "test",
833            false,
834        )
835        .unwrap();
836
837        let compat_batch = FlatCompatBatch::try_new(&mapper, &read_format, false)
838            .unwrap()
839            .unwrap();
840
841        let mut tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
842        tag_builder.append_value("tag1");
843        tag_builder.append_value("tag1");
844        let tag_dict_array = Arc::new(tag_builder.finish());
845
846        let k1 = encode_key(&[Some("tag1")]);
847        let input_columns: Vec<ArrayRef> = vec![
848            tag_dict_array.clone(),
849            Arc::new(Int64Array::from(vec![100, 200])),
850            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
851            build_flat_test_pk_array(&[&k1, &k1]),
852            Arc::new(UInt64Array::from_iter_values([1, 2])),
853            Arc::new(UInt8Array::from_iter_values([
854                OpType::Put as u8,
855                OpType::Put as u8,
856            ])),
857        ];
858        let input_schema =
859            to_flat_sst_arrow_schema(&actual_metadata, &FlatSchemaOptions::default());
860        let input_batch = RecordBatch::try_new(input_schema, input_columns).unwrap();
861
862        let result = compat_batch.compat(input_batch).unwrap();
863
864        let expected_schema =
865            to_flat_sst_arrow_schema(&expected_metadata, &FlatSchemaOptions::default());
866        let expected_columns: Vec<ArrayRef> = vec![
867            tag_dict_array.clone(),
868            Arc::new(Int64Array::from(vec![100, 200])),
869            Arc::new(Int64Array::from(vec![None::<i64>, None::<i64>])),
870            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
871            build_flat_test_pk_array(&[&k1, &k1]),
872            Arc::new(UInt64Array::from_iter_values([1, 2])),
873            Arc::new(UInt8Array::from_iter_values([
874                OpType::Put as u8,
875                OpType::Put as u8,
876            ])),
877        ];
878        let expected_batch = RecordBatch::try_new(expected_schema, expected_columns).unwrap();
879
880        assert_eq!(expected_batch, result);
881    }
882
883    #[test]
884    fn test_flat_compat_batch_with_different_pk_encoding() {
885        let mut actual_metadata = new_metadata(
886            &[
887                (
888                    0,
889                    SemanticType::Timestamp,
890                    ConcreteDataType::timestamp_millisecond_datatype(),
891                ),
892                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
893                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
894            ],
895            &[1],
896        );
897        actual_metadata.primary_key_encoding = PrimaryKeyEncoding::Dense;
898        let actual_metadata = Arc::new(actual_metadata);
899
900        let mut expected_metadata = new_metadata(
901            &[
902                (
903                    0,
904                    SemanticType::Timestamp,
905                    ConcreteDataType::timestamp_millisecond_datatype(),
906                ),
907                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
908                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
909                (3, SemanticType::Tag, ConcreteDataType::string_datatype()),
910            ],
911            &[1, 3],
912        );
913        expected_metadata.primary_key_encoding = PrimaryKeyEncoding::Sparse;
914        let expected_metadata = Arc::new(expected_metadata);
915
916        let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
917        let read_format = FlatReadFormat::new(
918            actual_metadata.clone(),
919            ReadColumns::from_deduped_column_ids([0, 1, 2, 3]),
920            None,
921            "test",
922            false,
923        )
924        .unwrap();
925
926        let compat_batch = FlatCompatBatch::try_new(&mapper, &read_format, false)
927            .unwrap()
928            .unwrap();
929
930        // Tag array.
931        let mut tag1_builder = StringDictionaryBuilder::<UInt32Type>::new();
932        tag1_builder.append_value("tag1");
933        tag1_builder.append_value("tag1");
934        let tag1_dict_array = Arc::new(tag1_builder.finish());
935
936        let k1 = encode_key(&[Some("tag1")]);
937        let input_columns: Vec<ArrayRef> = vec![
938            tag1_dict_array.clone(),
939            Arc::new(Int64Array::from(vec![100, 200])),
940            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
941            build_flat_test_pk_array(&[&k1, &k1]),
942            Arc::new(UInt64Array::from_iter_values([1, 2])),
943            Arc::new(UInt8Array::from_iter_values([
944                OpType::Put as u8,
945                OpType::Put as u8,
946            ])),
947        ];
948        let input_schema =
949            to_flat_sst_arrow_schema(&actual_metadata, &FlatSchemaOptions::default());
950        let input_batch = RecordBatch::try_new(input_schema, input_columns).unwrap();
951
952        let result = compat_batch.compat(input_batch).unwrap();
953
954        let sparse_k1 = encode_sparse_key(&[(1, Some("tag1")), (3, None)]);
955        let mut null_tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
956        null_tag_builder.append_nulls(2);
957        let null_tag_dict_array = Arc::new(null_tag_builder.finish());
958        let expected_columns: Vec<ArrayRef> = vec![
959            tag1_dict_array.clone(),
960            null_tag_dict_array,
961            Arc::new(Int64Array::from(vec![100, 200])),
962            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
963            build_flat_test_pk_array(&[&sparse_k1, &sparse_k1]),
964            Arc::new(UInt64Array::from_iter_values([1, 2])),
965            Arc::new(UInt8Array::from_iter_values([
966                OpType::Put as u8,
967                OpType::Put as u8,
968            ])),
969        ];
970        let output_schema =
971            to_flat_sst_arrow_schema(&expected_metadata, &FlatSchemaOptions::default());
972        let expected_batch = RecordBatch::try_new(output_schema, expected_columns).unwrap();
973
974        assert_eq!(expected_batch, result);
975    }
976
977    #[test]
978    fn test_flat_compat_batch_compact_sparse() {
979        let mut actual_metadata = new_metadata(
980            &[
981                (
982                    0,
983                    SemanticType::Timestamp,
984                    ConcreteDataType::timestamp_millisecond_datatype(),
985                ),
986                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
987            ],
988            &[],
989        );
990        actual_metadata.primary_key_encoding = PrimaryKeyEncoding::Sparse;
991        let actual_metadata = Arc::new(actual_metadata);
992
993        let mut expected_metadata = new_metadata(
994            &[
995                (
996                    0,
997                    SemanticType::Timestamp,
998                    ConcreteDataType::timestamp_millisecond_datatype(),
999                ),
1000                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1001                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
1002            ],
1003            &[],
1004        );
1005        expected_metadata.primary_key_encoding = PrimaryKeyEncoding::Sparse;
1006        let expected_metadata = Arc::new(expected_metadata);
1007
1008        let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
1009        let read_format = FlatReadFormat::new(
1010            actual_metadata.clone(),
1011            ReadColumns::from_deduped_column_ids([0, 2, 3]),
1012            None,
1013            "test",
1014            true,
1015        )
1016        .unwrap();
1017
1018        let compat_batch = FlatCompatBatch::try_new(&mapper, &read_format, true)
1019            .unwrap()
1020            .unwrap();
1021
1022        let sparse_k1 = encode_sparse_key(&[]);
1023        let input_columns: Vec<ArrayRef> = vec![
1024            Arc::new(Int64Array::from(vec![100, 200])),
1025            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
1026            build_flat_test_pk_array(&[&sparse_k1, &sparse_k1]),
1027            Arc::new(UInt64Array::from_iter_values([1, 2])),
1028            Arc::new(UInt8Array::from_iter_values([
1029                OpType::Put as u8,
1030                OpType::Put as u8,
1031            ])),
1032        ];
1033        let input_schema =
1034            to_flat_sst_arrow_schema(&actual_metadata, &FlatSchemaOptions::default());
1035        let input_batch = RecordBatch::try_new(input_schema, input_columns).unwrap();
1036
1037        let result = compat_batch.compat(input_batch).unwrap();
1038
1039        let expected_columns: Vec<ArrayRef> = vec![
1040            Arc::new(Int64Array::from(vec![100, 200])),
1041            Arc::new(Int64Array::from(vec![None::<i64>, None::<i64>])),
1042            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
1043            build_flat_test_pk_array(&[&sparse_k1, &sparse_k1]),
1044            Arc::new(UInt64Array::from_iter_values([1, 2])),
1045            Arc::new(UInt8Array::from_iter_values([
1046                OpType::Put as u8,
1047                OpType::Put as u8,
1048            ])),
1049        ];
1050        let output_schema =
1051            to_flat_sst_arrow_schema(&expected_metadata, &FlatSchemaOptions::default());
1052        let expected_batch = RecordBatch::try_new(output_schema, expected_columns).unwrap();
1053
1054        assert_eq!(expected_batch, result);
1055    }
1056}