mito2/read/compat.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Utilities to adapt readers with different schema.
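//!
//! An SST written with an outdated region schema may miss columns, use different
//! column types, or encode its primary key differently from what the reader expects.
//! [FlatCompatBatch] adapts such batches by filling missing columns with their default
//! values, casting columns whose types changed, and rewriting or extending the encoded
//! primary key so that the output matches the expected schema.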

use std::collections::HashMap;
use std::sync::Arc;

use api::v1::SemanticType;
use datatypes::arrow::array::{
    Array, ArrayRef, BinaryArray, BinaryBuilder, DictionaryArray, UInt32Array,
};
use datatypes::arrow::compute::{TakeOptions, take};
use datatypes::arrow::datatypes::{FieldRef, Schema, SchemaRef};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::DataType;
use datatypes::value::Value;
use datatypes::vectors::VectorRef;
use datatypes::vectors::json::array::JsonArray;
use mito_codec::row_converter::{
    CompositeValues, PrimaryKeyCodec, SortField, build_primary_key_codec,
    build_primary_key_codec_with_fields,
};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::ColumnId;

use crate::error::{
    CompatReaderSnafu, ComputeArrowSnafu, ConvertValueSnafu, CreateDefaultSnafu, DecodeSnafu,
    EncodeSnafu, NewRecordBatchSnafu, Result, UnsupportedOperationSnafu,
};
use crate::read::flat_projection::{FlatProjectionMapper, flat_projected_columns};
use crate::sst::parquet::flat_format::primary_key_column_index;
use crate::sst::parquet::format::{FormatProjection, INTERNAL_COLUMN_NUM, PrimaryKeyArray};
use crate::sst::{internal_fields, tag_maybe_to_dictionary_field, with_field_id};

/// Returns true if `left` and `right` have the same columns and primary key encoding.
pub(crate) fn has_same_columns_and_pk_encoding(
    left: &RegionMetadata,
    right: &RegionMetadata,
) -> bool {
    if left.primary_key_encoding != right.primary_key_encoding {
        return false;
    }

    if left.column_metadatas.len() != right.column_metadatas.len() {
        return false;
    }

    for (left_col, right_col) in left.column_metadatas.iter().zip(&right.column_metadatas) {
        if left_col.column_id != right_col.column_id || !left_col.is_same_datatype(right_col) {
            return false;
        }
        debug_assert_eq!(
            left_col.column_schema.data_type,
            right_col.column_schema.data_type
        );
        debug_assert_eq!(left_col.semantic_type, right_col.semantic_type);
    }

    true
}

/// A helper struct to adapt the schema of a batch to an expected schema.
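///
/// Built by `FlatCompatBatch::try_new`, which returns `None` when the projected schema
/// already matches the expected one. Call `FlatCompatBatch::compat` on each record batch
/// read from the SST to adapt it.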
pub(crate) struct FlatCompatBatch {
    /// Indices to convert actual fields to expected fields.
    index_or_defaults: Vec<IndexOrDefault>,
    /// Expected arrow schema.
    arrow_schema: SchemaRef,
    /// Primary key adapter.
    compat_pk: FlatCompatPrimaryKey,
}

impl FlatCompatBatch {
    /// Creates a [FlatCompatBatch].
    ///
    /// - `mapper` is built from the metadata users expect to see.
    /// - `actual` is the [RegionMetadata] of the input parquet.
    /// - `format_projection` is the projection of the read format for the input parquet.
    /// - `compaction` indicates whether the reader is for compaction.
    pub(crate) fn try_new(
        mapper: &FlatProjectionMapper,
        actual: &RegionMetadataRef,
        format_projection: &FormatProjection,
        compaction: bool,
    ) -> Result<Option<Self>> {
        let actual_schema = flat_projected_columns(actual, format_projection);
        let expect_schema = mapper.batch_schema();
        if expect_schema == actual_schema {
            // Although the SST has a different schema, the schema after projection is
            // the same as the expected schema.
            return Ok(None);
        }

        if actual.primary_key_encoding == PrimaryKeyEncoding::Sparse && compaction {
            // Special handling for sparse encoding in compaction.
            return FlatCompatBatch::try_new_compact_sparse(mapper, actual);
        }

        let (index_or_defaults, fields) =
            Self::compute_index_and_fields(&actual_schema, expect_schema, mapper.metadata())?;

        let compat_pk = FlatCompatPrimaryKey::new(mapper.metadata(), actual)?;

        Ok(Some(Self {
            index_or_defaults,
            arrow_schema: Arc::new(Schema::new(fields)),
            compat_pk,
        }))
    }

    fn compute_index_and_fields(
        actual_schema: &[(ColumnId, ConcreteDataType)],
        expect_schema: &[(ColumnId, ConcreteDataType)],
        expect_metadata: &RegionMetadata,
    ) -> Result<(Vec<IndexOrDefault>, Vec<FieldRef>)> {
        // Maps column id to the index and data type in the actual schema.
        let actual_schema_index: HashMap<_, _> = actual_schema
            .iter()
            .enumerate()
            .map(|(idx, (column_id, data_type))| (*column_id, (idx, data_type)))
            .collect();

        let mut index_or_defaults = Vec::with_capacity(expect_schema.len());
        let mut fields = Vec::with_capacity(expect_schema.len());
        for (column_id, expect_data_type) in expect_schema {
            // Safety: expect_schema comes from the same mapper.
            let column_index = expect_metadata.column_index_by_id(*column_id).unwrap();
            let expect_column = &expect_metadata.column_metadatas[column_index];
            let column_field = &expect_metadata.schema.arrow_schema().fields()[column_index];
            // For tag columns, we need to create a dictionary field.
            if expect_column.semantic_type == SemanticType::Tag {
                let field = tag_maybe_to_dictionary_field(
                    &expect_column.column_schema.data_type,
                    column_field,
                );
                fields.push(Arc::new(with_field_id(
                    (*field).clone(),
                    expect_column.column_id,
                )));
            } else {
                fields.push(Arc::new(with_field_id(
                    (**column_field).clone(),
                    expect_column.column_id,
                )));
            };

            if let Some((index, actual_data_type)) = actual_schema_index.get(column_id) {
                let mut cast_type = None;

                // Same column different type.
                if expect_data_type != *actual_data_type {
                    cast_type = Some(expect_data_type.clone())
                }
                // Source has this column.
                index_or_defaults.push(IndexOrDefault::Index {
                    pos: *index,
                    cast_type,
                });
            } else {
                // Create a default vector with 1 element for that column.
                let default_vector = expect_column
                    .column_schema
                    .create_default_vector(1)
                    .context(CreateDefaultSnafu {
                        region_id: expect_metadata.region_id,
                        column: &expect_column.column_schema.name,
                    })?
                    .with_context(|| CompatReaderSnafu {
                        region_id: expect_metadata.region_id,
                        reason: format!(
                            "column {} does not have a default value to read",
                            expect_column.column_schema.name
                        ),
                    })?;
                index_or_defaults.push(IndexOrDefault::DefaultValue {
                    default_vector,
                    semantic_type: expect_column.semantic_type,
                });
            };
        }
        fields.extend_from_slice(&internal_fields());

        Ok((index_or_defaults, fields))
    }

    fn try_new_compact_sparse(
        mapper: &FlatProjectionMapper,
        actual: &RegionMetadataRef,
    ) -> Result<Option<Self>> {
        // Currently, we don't support converting sparse encoding back to dense encoding in
        // flat format.
        ensure!(
            mapper.metadata().primary_key_encoding == PrimaryKeyEncoding::Sparse,
            UnsupportedOperationSnafu {
                err_msg: "Flat format doesn't support converting sparse encoding back to dense encoding"
            }
        );

        // For sparse encoding, we don't need to check the primary keys.
        // Since this is for compaction, we always read all columns.
        let actual_schema: Vec<_> = actual
            .field_columns()
            .chain([actual.time_index_column()])
            .map(|col| (col.column_id, col.column_schema.data_type.clone()))
            .collect();
        let expect_schema: Vec<_> = mapper
            .metadata()
            .field_columns()
            .chain([mapper.metadata().time_index_column()])
            .map(|col| (col.column_id, col.column_schema.data_type.clone()))
            .collect();

        let (index_or_defaults, fields) =
            Self::compute_index_and_fields(&actual_schema, &expect_schema, mapper.metadata())?;

        let compat_pk = FlatCompatPrimaryKey::default();

        Ok(Some(Self {
            index_or_defaults,
            arrow_schema: Arc::new(Schema::new(fields)),
            compat_pk,
        }))
    }

    /// Makes columns of the `batch` compatible.
    pub(crate) fn compat(&self, batch: RecordBatch) -> Result<RecordBatch> {
        let len = batch.num_rows();
        let columns = self
            .index_or_defaults
            .iter()
            .map(|index_or_default| match index_or_default {
                IndexOrDefault::Index { pos, cast_type } => {
                    let old_column = batch.column(*pos);

                    if let Some(ty) = cast_type {
                        let casted = if let Some(json_type) = ty.as_json()
                            && json_type.is_json2()
                        {
                            JsonArray::from(old_column)
                                .try_align(&json_type.as_arrow_type())
                                .context(ConvertValueSnafu)?
                        } else {
                            datatypes::arrow::compute::cast(old_column, &ty.as_arrow_type())
                                .context(ComputeArrowSnafu)?
                        };
                        Ok(casted)
                    } else {
                        Ok(old_column.clone())
                    }
                }
                IndexOrDefault::DefaultValue {
                    default_vector,
                    semantic_type,
                } => repeat_vector(default_vector, len, *semantic_type == SemanticType::Tag),
            })
            .chain(
                // Adds internal columns.
                batch.columns()[batch.num_columns() - INTERNAL_COLUMN_NUM..]
                    .iter()
                    .map(|col| Ok(col.clone())),
            )
            .collect::<Result<Vec<_>>>()?;

        let compat_batch = RecordBatch::try_new(self.arrow_schema.clone(), columns)
            .context(NewRecordBatchSnafu)?;

        // Handles primary keys.
        self.compat_pk.compat(compat_batch)
    }
}

/// Repeats the vector value `to_len` times.
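///
/// String tag columns are returned as dictionary arrays to match the dictionary-encoded
/// tag columns of the flat format; other columns repeat the single value via `take`.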
fn repeat_vector(vector: &VectorRef, to_len: usize, is_tag: bool) -> Result<ArrayRef> {
    assert_eq!(1, vector.len());
    let data_type = vector.data_type();
    if is_tag && data_type.is_string() {
        let values = vector.to_arrow_array();
        if values.is_null(0) {
            // Creates a dictionary array with `to_len` null keys.
            let keys = UInt32Array::new_null(to_len);
            Ok(Arc::new(DictionaryArray::new(keys, values.slice(0, 0))))
        } else {
            let keys = UInt32Array::from_value(0, to_len);
            Ok(Arc::new(DictionaryArray::new(keys, values)))
        }
    } else {
        let keys = UInt32Array::from_value(0, to_len);
        take(
            &vector.to_arrow_array(),
            &keys,
            Some(TakeOptions {
                check_bounds: false,
            }),
        )
        .context(ComputeArrowSnafu)
    }
}

/// Returns true if the actual primary key is the same as the expected one.
fn is_primary_key_same(expect: &RegionMetadata, actual: &RegionMetadata) -> Result<bool> {
    ensure!(
        actual.primary_key.len() <= expect.primary_key.len(),
        CompatReaderSnafu {
            region_id: expect.region_id,
            reason: format!(
                "primary key has more columns {} than expected {}",
                actual.primary_key.len(),
                expect.primary_key.len()
            ),
        }
    );
    ensure!(
        actual.primary_key == expect.primary_key[..actual.primary_key.len()],
        CompatReaderSnafu {
            region_id: expect.region_id,
            reason: format!(
                "primary key has different prefix, expect: {:?}, actual: {:?}",
                expect.primary_key, actual.primary_key
            ),
        }
    );

    Ok(actual.primary_key.len() == expect.primary_key.len())
}

/// Index in source batch or a default value to fill a column.
#[derive(Debug)]
enum IndexOrDefault {
    /// Index of the column in source batch.
    Index {
        pos: usize,
        cast_type: Option<ConcreteDataType>,
    },
    /// Default value for the column.
    DefaultValue {
        /// Default value. The vector has only 1 element.
        default_vector: VectorRef,
        /// Semantic type of the column.
        semantic_type: SemanticType,
    },
}

/// Helper to rewrite the primary key to another encoding for the flat format.
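///
/// It decodes every stored key with the original codec and re-encodes it with the
/// expected codec, appending any additional key values provided by the caller.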
struct FlatRewritePrimaryKey {
    /// New primary key encoder.
    codec: Arc<dyn PrimaryKeyCodec>,
    /// Metadata of the expected region.
    metadata: RegionMetadataRef,
    /// Original primary key codec, used when we need to rewrite the primary key.
    old_codec: Arc<dyn PrimaryKeyCodec>,
}

impl FlatRewritePrimaryKey {
    fn new(
        expect: &RegionMetadataRef,
        actual: &RegionMetadataRef,
    ) -> Option<FlatRewritePrimaryKey> {
        if expect.primary_key_encoding == actual.primary_key_encoding {
            return None;
        }
        let codec = build_primary_key_codec(expect);
        let old_codec = build_primary_key_codec(actual);

        Some(FlatRewritePrimaryKey {
            codec,
            metadata: expect.clone(),
            old_codec,
        })
    }

    /// Rewrites the primary key of the `batch`.
    /// It also appends the values to the primary key.
    fn rewrite_key(
        &self,
        append_values: &[(ColumnId, Value)],
        batch: RecordBatch,
    ) -> Result<RecordBatch> {
        let old_pk_dict_array = batch
            .column(primary_key_column_index(batch.num_columns()))
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .unwrap();
        let old_pk_values_array = old_pk_dict_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        let mut builder = BinaryBuilder::with_capacity(
            old_pk_values_array.len(),
            old_pk_values_array.value_data().len(),
        );

        // Binary buffer for the primary key.
        let mut buffer = Vec::with_capacity(
            old_pk_values_array.value_data().len() / old_pk_values_array.len().max(1),
        );
        let mut column_id_values = Vec::new();
        // Iterates the binary array and rewrites the primary key.
        for value in old_pk_values_array.iter() {
            let Some(old_pk) = value else {
                builder.append_null();
                continue;
            };
            // Decodes the old primary key.
            let mut pk_values = self.old_codec.decode(old_pk).context(DecodeSnafu)?;
            pk_values.extend(append_values);

            buffer.clear();
            column_id_values.clear();
            // Encodes the new primary key.
            match pk_values {
                CompositeValues::Dense(dense_values) => {
                    self.codec
                        .encode_values(dense_values.as_slice(), &mut buffer)
                        .context(EncodeSnafu)?;
                }
                CompositeValues::Sparse(sparse_values) => {
                    for id in &self.metadata.primary_key {
                        let value = sparse_values.get_or_null(*id);
                        column_id_values.push((*id, value.clone()));
                    }
                    self.codec
                        .encode_values(&column_id_values, &mut buffer)
                        .context(EncodeSnafu)?;
                }
            }
            builder.append_value(&buffer);
        }
        let new_pk_values_array = Arc::new(builder.finish());
        let new_pk_dict_array =
            PrimaryKeyArray::new(old_pk_dict_array.keys().clone(), new_pk_values_array);

        let mut columns = batch.columns().to_vec();
        columns[primary_key_column_index(batch.num_columns())] = Arc::new(new_pk_dict_array);

        RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
    }
}

/// Helper to make the primary key compatible for the flat format.
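///
/// When the primary key encodings differ, it rewrites every key with `FlatRewritePrimaryKey`.
/// Otherwise it appends encoded default values for key columns that the SST does not contain.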
#[derive(Default)]
struct FlatCompatPrimaryKey {
    /// Primary key rewriter.
    rewriter: Option<FlatRewritePrimaryKey>,
    /// Converter to append values to primary keys.
    converter: Option<Arc<dyn PrimaryKeyCodec>>,
    /// Default values to append.
    values: Vec<(ColumnId, Value)>,
}

impl FlatCompatPrimaryKey {
    fn new(expect: &RegionMetadataRef, actual: &RegionMetadataRef) -> Result<Self> {
        let rewriter = FlatRewritePrimaryKey::new(expect, actual);

        if is_primary_key_same(expect, actual)? {
            return Ok(Self {
                rewriter,
                converter: None,
                values: Vec::new(),
            });
        }

        // We need to append default values to the primary key.
        let to_add = &expect.primary_key[actual.primary_key.len()..];
        let mut values = Vec::with_capacity(to_add.len());
        let mut fields = Vec::with_capacity(to_add.len());
        for column_id in to_add {
            // Safety: The id comes from expect region metadata.
            let column = expect.column_by_id(*column_id).unwrap();
            fields.push((
                *column_id,
                SortField::new(column.column_schema.data_type.clone()),
            ));
            let default_value = column
                .column_schema
                .create_default()
                .context(CreateDefaultSnafu {
                    region_id: expect.region_id,
                    column: &column.column_schema.name,
                })?
                .with_context(|| CompatReaderSnafu {
                    region_id: expect.region_id,
                    reason: format!(
                        "key column {} does not have a default value to read",
                        column.column_schema.name
                    ),
                })?;
            values.push((*column_id, default_value));
        }
        // is_primary_key_same() is false, so we have a different number of primary key columns.
        debug_assert!(!fields.is_empty());

        // Create converter to append values.
        let converter = Some(build_primary_key_codec_with_fields(
            expect.primary_key_encoding,
            fields.into_iter(),
        ));

        Ok(Self {
            rewriter,
            converter,
            values,
        })
    }

    /// Makes the primary key of the `batch` compatible.
    ///
    /// Callers must ensure all columns other than the `__primary_key` column are already compatible.
    fn compat(&self, batch: RecordBatch) -> Result<RecordBatch> {
        if let Some(rewriter) = &self.rewriter {
            // If we have different encoding, rewrite the whole primary key.
            return rewriter.rewrite_key(&self.values, batch);
        }

        self.append_key(batch)
    }

    /// Appends values to the primary key of the `batch`.
    fn append_key(&self, batch: RecordBatch) -> Result<RecordBatch> {
        let Some(converter) = &self.converter else {
            return Ok(batch);
        };

        let old_pk_dict_array = batch
            .column(primary_key_column_index(batch.num_columns()))
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .unwrap();
        let old_pk_values_array = old_pk_dict_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        let mut builder = BinaryBuilder::with_capacity(
            old_pk_values_array.len(),
            old_pk_values_array.value_data().len()
                + converter.estimated_size().unwrap_or_default() * old_pk_values_array.len(),
        );

        // Binary buffer for the primary key.
        let mut buffer = Vec::with_capacity(
            old_pk_values_array.value_data().len() / old_pk_values_array.len().max(1)
                + converter.estimated_size().unwrap_or_default(),
        );

        // Iterates the binary array and appends values to the primary key.
        for value in old_pk_values_array.iter() {
            let Some(old_pk) = value else {
                builder.append_null();
                continue;
            };

            buffer.clear();
            buffer.extend_from_slice(old_pk);
            converter
                .encode_values(&self.values, &mut buffer)
                .context(EncodeSnafu)?;

            builder.append_value(&buffer);
        }

        let new_pk_values_array = Arc::new(builder.finish());
        let new_pk_dict_array =
            PrimaryKeyArray::new(old_pk_dict_array.keys().clone(), new_pk_values_array);

        // Overrides the primary key column.
        let mut columns = batch.columns().to_vec();
        columns[primary_key_column_index(batch.num_columns())] = Arc::new(new_pk_dict_array);

        RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::{OpType, SemanticType};
    use datatypes::arrow::array::{
        ArrayRef, BinaryDictionaryBuilder, Int64Array, StringDictionaryBuilder,
        TimestampMillisecondArray, UInt8Array, UInt64Array,
    };
    use datatypes::arrow::datatypes::UInt32Type;
    use datatypes::arrow::record_batch::RecordBatch;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::ValueRef;
    use mito_codec::row_converter::{
        DensePrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec,
    };
    use store_api::codec::PrimaryKeyEncoding;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;
    use crate::read::flat_projection::FlatProjectionMapper;
    use crate::read::read_columns::ReadColumns;
    use crate::sst::parquet::flat_format::FlatReadFormat;
    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};

    /// Creates a new [RegionMetadata].
    fn new_metadata(
        semantic_types: &[(ColumnId, SemanticType, ConcreteDataType)],
        primary_key: &[ColumnId],
    ) -> RegionMetadata {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        for (id, semantic_type, data_type) in semantic_types {
            let column_schema = match semantic_type {
                SemanticType::Tag => {
                    ColumnSchema::new(format!("tag_{id}"), data_type.clone(), true)
                }
                SemanticType::Field => {
                    ColumnSchema::new(format!("field_{id}"), data_type.clone(), true)
                }
                SemanticType::Timestamp => ColumnSchema::new("ts", data_type.clone(), false),
            };

            builder.push_column_metadata(ColumnMetadata {
                column_schema,
                semantic_type: *semantic_type,
                column_id: *id,
            });
        }
        builder.primary_key(primary_key.to_vec());
        builder.build().unwrap()
    }

    /// Encode primary key.
    fn encode_key(keys: &[Option<&str>]) -> Vec<u8> {
        let fields = (0..keys.len())
            .map(|_| (0, SortField::new(ConcreteDataType::string_datatype())))
            .collect();
        let converter = DensePrimaryKeyCodec::with_fields(fields);
        let row = keys.iter().map(|str_opt| match str_opt {
            Some(v) => ValueRef::String(v),
            None => ValueRef::Null,
        });

        converter.encode(row).unwrap()
    }

    /// Encode sparse primary key.
    fn encode_sparse_key(keys: &[(ColumnId, Option<&str>)]) -> Vec<u8> {
        let fields = (0..keys.len())
            .map(|_| (1, SortField::new(ConcreteDataType::string_datatype())))
            .collect();
        let converter = SparsePrimaryKeyCodec::with_fields(fields);
        let row = keys
            .iter()
            .map(|(id, str_opt)| match str_opt {
                Some(v) => (*id, ValueRef::String(v)),
                None => (*id, ValueRef::Null),
            })
            .collect::<Vec<_>>();
        let mut buffer = vec![];
        converter.encode_value_refs(&row, &mut buffer).unwrap();
        buffer
    }

    /// Creates a primary key array for flat format testing.
    fn build_flat_test_pk_array(primary_keys: &[&[u8]]) -> ArrayRef {
        let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        for &pk in primary_keys {
            builder.append(pk).unwrap();
        }
        Arc::new(builder.finish())
    }
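
    /// Illustrative sketch added alongside the original tests: `has_same_columns_and_pk_encoding`
    /// compares column ids, data types and the primary key encoding, while
    /// `is_primary_key_same` only accepts an actual key that is a prefix of the expected key.
    #[test]
    fn test_metadata_compat_checks_sketch() {
        let actual = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
            ],
            &[1],
        );
        // Identical columns and encoding: no adaptation is needed.
        assert!(has_same_columns_and_pk_encoding(&actual, &actual));

        // The expected region adds tag 3, so the columns differ but the keys share a prefix.
        let expect = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (3, SemanticType::Tag, ConcreteDataType::string_datatype()),
            ],
            &[1, 3],
        );
        assert!(!has_same_columns_and_pk_encoding(&expect, &actual));
        assert!(!is_primary_key_same(&expect, &actual).unwrap());
    }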

    #[test]
    fn test_flat_compat_batch_with_missing_columns() {
        let actual_metadata = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));

        let expected_metadata = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                // Adds a new field.
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));

        let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
        let read_format = FlatReadFormat::new(
            actual_metadata.clone(),
            ReadColumns::from_deduped_column_ids([0, 1, 2, 3]),
            None,
            "test",
            false,
        )
        .unwrap();
        let format_projection = read_format.format_projection();

        let compat_batch =
            FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection, false)
                .unwrap()
                .unwrap();

        let mut tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
        tag_builder.append_value("tag1");
        tag_builder.append_value("tag1");
        let tag_dict_array = Arc::new(tag_builder.finish());

        let k1 = encode_key(&[Some("tag1")]);
        let input_columns: Vec<ArrayRef> = vec![
            tag_dict_array.clone(),
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&k1, &k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let input_schema =
            to_flat_sst_arrow_schema(&actual_metadata, &FlatSchemaOptions::default());
        let input_batch = RecordBatch::try_new(input_schema, input_columns).unwrap();

        let result = compat_batch.compat(input_batch).unwrap();

        let expected_schema =
            to_flat_sst_arrow_schema(&expected_metadata, &FlatSchemaOptions::default());

        let expected_columns: Vec<ArrayRef> = vec![
            tag_dict_array.clone(),
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(Int64Array::from(vec![None::<i64>, None::<i64>])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&k1, &k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let expected_batch = RecordBatch::try_new(expected_schema, expected_columns).unwrap();

        assert_eq!(expected_batch, result);
    }

    #[test]
    fn test_flat_compat_batch_with_read_projection_superset() {
        let actual_metadata = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));

        let expected_metadata = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                // Adds a new field.
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));

        let mapper = FlatProjectionMapper::new_with_read_columns(
            &expected_metadata,
            vec![1, 2],
            ReadColumns::from_deduped_column_ids([1, 2, 3]),
        )
        .unwrap();
        let read_format = FlatReadFormat::new(
            actual_metadata.clone(),
            ReadColumns::from_deduped_column_ids([1, 2, 3]),
            None,
            "test",
            false,
        )
        .unwrap();
        let format_projection = read_format.format_projection();

        let compat_batch =
            FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection, false)
                .unwrap()
                .unwrap();

        let mut tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
        tag_builder.append_value("tag1");
        tag_builder.append_value("tag1");
        let tag_dict_array = Arc::new(tag_builder.finish());

        let k1 = encode_key(&[Some("tag1")]);
        let input_columns: Vec<ArrayRef> = vec![
            tag_dict_array.clone(),
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&k1, &k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let input_schema =
            to_flat_sst_arrow_schema(&actual_metadata, &FlatSchemaOptions::default());
        let input_batch = RecordBatch::try_new(input_schema, input_columns).unwrap();

        let result = compat_batch.compat(input_batch).unwrap();

        let expected_schema =
            to_flat_sst_arrow_schema(&expected_metadata, &FlatSchemaOptions::default());
        let expected_columns: Vec<ArrayRef> = vec![
            tag_dict_array.clone(),
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(Int64Array::from(vec![None::<i64>, None::<i64>])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&k1, &k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let expected_batch = RecordBatch::try_new(expected_schema, expected_columns).unwrap();

        assert_eq!(expected_batch, result);
    }

    #[test]
    fn test_flat_compat_batch_with_different_pk_encoding() {
        let mut actual_metadata = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        );
        actual_metadata.primary_key_encoding = PrimaryKeyEncoding::Dense;
        let actual_metadata = Arc::new(actual_metadata);

        let mut expected_metadata = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (3, SemanticType::Tag, ConcreteDataType::string_datatype()),
            ],
            &[1, 3],
        );
        expected_metadata.primary_key_encoding = PrimaryKeyEncoding::Sparse;
        let expected_metadata = Arc::new(expected_metadata);

        let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
        let read_format = FlatReadFormat::new(
            actual_metadata.clone(),
            ReadColumns::from_deduped_column_ids([0, 1, 2, 3]),
            None,
            "test",
            false,
        )
        .unwrap();
        let format_projection = read_format.format_projection();

        let compat_batch =
            FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection, false)
                .unwrap()
                .unwrap();

        // Tag array.
        let mut tag1_builder = StringDictionaryBuilder::<UInt32Type>::new();
        tag1_builder.append_value("tag1");
        tag1_builder.append_value("tag1");
        let tag1_dict_array = Arc::new(tag1_builder.finish());

        let k1 = encode_key(&[Some("tag1")]);
        let input_columns: Vec<ArrayRef> = vec![
            tag1_dict_array.clone(),
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&k1, &k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let input_schema =
            to_flat_sst_arrow_schema(&actual_metadata, &FlatSchemaOptions::default());
        let input_batch = RecordBatch::try_new(input_schema, input_columns).unwrap();

        let result = compat_batch.compat(input_batch).unwrap();

        let sparse_k1 = encode_sparse_key(&[(1, Some("tag1")), (3, None)]);
        let mut null_tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
        null_tag_builder.append_nulls(2);
        let null_tag_dict_array = Arc::new(null_tag_builder.finish());
        let expected_columns: Vec<ArrayRef> = vec![
            tag1_dict_array.clone(),
            null_tag_dict_array,
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&sparse_k1, &sparse_k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let output_schema =
            to_flat_sst_arrow_schema(&expected_metadata, &FlatSchemaOptions::default());
        let expected_batch = RecordBatch::try_new(output_schema, expected_columns).unwrap();

        assert_eq!(expected_batch, result);
    }

    #[test]
    fn test_flat_compat_batch_compact_sparse() {
        let mut actual_metadata = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[],
        );
        actual_metadata.primary_key_encoding = PrimaryKeyEncoding::Sparse;
        let actual_metadata = Arc::new(actual_metadata);

        let mut expected_metadata = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[],
        );
        expected_metadata.primary_key_encoding = PrimaryKeyEncoding::Sparse;
        let expected_metadata = Arc::new(expected_metadata);

        let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
        let read_format = FlatReadFormat::new(
            actual_metadata.clone(),
            ReadColumns::from_deduped_column_ids([0, 2, 3]),
            None,
            "test",
            true,
        )
        .unwrap();
        let format_projection = read_format.format_projection();

        let compat_batch =
            FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection, true)
                .unwrap()
                .unwrap();

        let sparse_k1 = encode_sparse_key(&[]);
        let input_columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&sparse_k1, &sparse_k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let input_schema =
            to_flat_sst_arrow_schema(&actual_metadata, &FlatSchemaOptions::default());
        let input_batch = RecordBatch::try_new(input_schema, input_columns).unwrap();

        let result = compat_batch.compat(input_batch).unwrap();

        let expected_columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(Int64Array::from(vec![None::<i64>, None::<i64>])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&sparse_k1, &sparse_k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let output_schema =
            to_flat_sst_arrow_schema(&expected_metadata, &FlatSchemaOptions::default());
        let expected_batch = RecordBatch::try_new(output_schema, expected_columns).unwrap();

        assert_eq!(expected_batch, result);
    }
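
    /// Illustrative sketch added alongside the original tests: `repeat_vector` expands the
    /// single-element default vector of a missing column to the requested number of rows.
    /// Assumes `Int64Vector::from_vec` is available in the `datatypes` crate.
    #[test]
    fn test_repeat_vector_sketch() {
        use datatypes::vectors::{Int64Vector, VectorRef};

        // Default vectors produced by `create_default_vector(1)` hold exactly one element.
        let default_vector: VectorRef = Arc::new(Int64Vector::from_vec(vec![42]));
        // Repeat the single value for three rows of a non-tag column.
        let repeated = repeat_vector(&default_vector, 3, false).unwrap();
        let values = repeated.as_any().downcast_ref::<Int64Array>().unwrap();
        assert_eq!(vec![42, 42, 42], values.values().to_vec());
    }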
}