mito2/sst/parquet/format.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Format to store in parquet.
//!
//! We store three internal columns in parquet:
//! - `__primary_key`, the primary key of the row (tags). Type: dictionary(uint32, binary)
//! - `__sequence`, the sequence number of a row. Type: uint64
//! - `__op_type`, the op type of the row. Type: uint8
//!
//! The schema of a parquet file is:
//! ```text
//! field 0, field 1, ..., field N, time index, primary key, sequence, op type
//! ```
//!
//! We store fields in the same order as [RegionMetadata::field_columns()](store_api::metadata::RegionMetadata::field_columns()).
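//!
//! For example (hypothetical column names), a region with fields `f0` and `f1`
//! and time index `ts` is stored as:
//! ```text
//! f0, f1, ts, __primary_key, __sequence, __op_type
//! ```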

use std::borrow::Borrow;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;

use api::v1::SemanticType;
use common_time::Timestamp;
use datafusion_common::ScalarValue;
use datatypes::arrow::array::{
    ArrayRef, BinaryArray, BinaryDictionaryBuilder, DictionaryArray, UInt32Array, UInt64Array,
};
use datatypes::arrow::datatypes::{SchemaRef, UInt32Type};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::prelude::DataType;
use datatypes::vectors::{Helper, Vector};
use mito_codec::row_converter::{
    CompositeValues, PrimaryKeyCodec, SortField, build_primary_key_codec,
    build_primary_key_codec_with_fields,
};
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
use parquet::file::statistics::Statistics;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
use store_api::storage::{ColumnId, SequenceNumber};

use crate::error::{
    ConvertVectorSnafu, DecodeSnafu, InvalidBatchSnafu, InvalidRecordBatchSnafu,
    NewRecordBatchSnafu, Result,
};
use crate::read::{Batch, BatchBuilder, BatchColumn};
use crate::sst::file::{FileMeta, FileTimeRange};
use crate::sst::parquet::flat_format::FlatReadFormat;
use crate::sst::to_sst_arrow_schema;

/// Arrow array type for the primary key dictionary.
pub(crate) type PrimaryKeyArray = DictionaryArray<UInt32Type>;
/// Builder type for primary key dictionary array.
pub(crate) type PrimaryKeyArrayBuilder = BinaryDictionaryBuilder<UInt32Type>;

/// Number of columns that have fixed positions.
///
/// Contains: time index and internal columns.
pub(crate) const FIXED_POS_COLUMN_NUM: usize = 4;
/// Number of internal columns.
pub(crate) const INTERNAL_COLUMN_NUM: usize = 3;

/// Helper for writing the SST format with primary key.
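///
/// A minimal usage sketch (not a doc-test; `metadata` and `batch` are assumed
/// to be an existing `RegionMetadataRef` and `Batch`, and `100` is an arbitrary
/// sequence number):
///
/// ```ignore
/// let format = PrimaryKeyWriteFormat::new(metadata).with_override_sequence(Some(100));
/// let record_batch = format.convert_batch(&batch)?;
/// ```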
pub(crate) struct PrimaryKeyWriteFormat {
    metadata: RegionMetadataRef,
    /// SST file schema.
    arrow_schema: SchemaRef,
    override_sequence: Option<SequenceNumber>,
}

impl PrimaryKeyWriteFormat {
    /// Creates a new helper.
    pub(crate) fn new(metadata: RegionMetadataRef) -> PrimaryKeyWriteFormat {
        let arrow_schema = to_sst_arrow_schema(&metadata);
        PrimaryKeyWriteFormat {
            metadata,
            arrow_schema,
            override_sequence: None,
        }
    }

    /// Sets the sequence number to override.
    pub(crate) fn with_override_sequence(
        mut self,
        override_sequence: Option<SequenceNumber>,
    ) -> Self {
        self.override_sequence = override_sequence;
        self
    }

    /// Gets the arrow schema to store in parquet.
    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
        &self.arrow_schema
    }

    /// Converts `batch` to an arrow record batch to store in parquet.
    pub(crate) fn convert_batch(&self, batch: &Batch) -> Result<RecordBatch> {
        debug_assert_eq!(
            batch.fields().len() + FIXED_POS_COLUMN_NUM,
            self.arrow_schema.fields().len()
        );
        let mut columns = Vec::with_capacity(batch.fields().len() + FIXED_POS_COLUMN_NUM);
        // Store all fields first.
        for (column, column_metadata) in batch.fields().iter().zip(self.metadata.field_columns()) {
            ensure!(
                column.column_id == column_metadata.column_id,
                InvalidBatchSnafu {
                    reason: format!(
                        "Batch has column {} but metadata has column {}",
                        column.column_id, column_metadata.column_id
                    ),
                }
            );

            columns.push(column.data.to_arrow_array());
        }
        // Add time index column.
        columns.push(batch.timestamps().to_arrow_array());
        // Add internal columns: primary key, sequences, op types.
        columns.push(new_primary_key_array(batch.primary_key(), batch.num_rows()));

        if let Some(override_sequence) = self.override_sequence {
            let sequence_array =
                Arc::new(UInt64Array::from(vec![override_sequence; batch.num_rows()]));
            columns.push(sequence_array);
        } else {
            columns.push(batch.sequences().to_arrow_array());
        }
        columns.push(batch.op_types().to_arrow_array());

        RecordBatch::try_new(self.arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
    }
}

/// Helper to read parquet formats.
pub enum ReadFormat {
    /// The parquet is in the old primary key format.
    PrimaryKey(PrimaryKeyReadFormat),
    /// The parquet is in the new flat format.
    Flat(FlatReadFormat),
}

impl ReadFormat {
    /// Creates a helper to read the primary key format.
    pub fn new_primary_key(
        metadata: RegionMetadataRef,
        column_ids: impl Iterator<Item = ColumnId>,
    ) -> Self {
        ReadFormat::PrimaryKey(PrimaryKeyReadFormat::new(metadata, column_ids))
    }

    /// Creates a helper to read the flat format.
    pub fn new_flat(
        metadata: RegionMetadataRef,
        column_ids: impl Iterator<Item = ColumnId>,
        num_columns: Option<usize>,
        file_path: &str,
        skip_auto_convert: bool,
    ) -> Result<Self> {
        Ok(ReadFormat::Flat(FlatReadFormat::new(
            metadata,
            column_ids,
            num_columns,
            file_path,
            skip_auto_convert,
        )?))
    }

    /// Creates a new read format.
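    ///
    /// A minimal usage sketch (not a doc-test; `metadata` is an existing
    /// `RegionMetadataRef`, and the column ids and file path are hypothetical):
    ///
    /// ```ignore
    /// // Read columns 1 and 2 from an SST in the primary key format.
    /// let format = ReadFormat::new(metadata, Some(&[1, 2]), false, None, "a.parquet", false)?;
    /// let indices = format.projection_indices();
    /// ```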
    pub fn new(
        region_metadata: RegionMetadataRef,
        projection: Option<&[ColumnId]>,
        flat_format: bool,
        num_columns: Option<usize>,
        file_path: &str,
        skip_auto_convert: bool,
    ) -> Result<ReadFormat> {
        if flat_format {
            if let Some(column_ids) = projection {
                ReadFormat::new_flat(
                    region_metadata,
                    column_ids.iter().copied(),
                    num_columns,
                    file_path,
                    skip_auto_convert,
                )
            } else {
                // No projection; list all column ids to read.
                ReadFormat::new_flat(
                    region_metadata.clone(),
                    region_metadata
                        .column_metadatas
                        .iter()
                        .map(|col| col.column_id),
                    num_columns,
                    file_path,
                    skip_auto_convert,
                )
            }
        } else if let Some(column_ids) = projection {
            Ok(ReadFormat::new_primary_key(
                region_metadata,
                column_ids.iter().copied(),
            ))
        } else {
            // No projection; list all column ids to read.
            Ok(ReadFormat::new_primary_key(
                region_metadata.clone(),
                region_metadata
                    .column_metadatas
                    .iter()
                    .map(|col| col.column_id),
            ))
        }
    }

    pub(crate) fn as_primary_key(&self) -> Option<&PrimaryKeyReadFormat> {
        match self {
            ReadFormat::PrimaryKey(format) => Some(format),
            _ => None,
        }
    }

    pub(crate) fn as_flat(&self) -> Option<&FlatReadFormat> {
        match self {
            ReadFormat::Flat(format) => Some(format),
            _ => None,
        }
    }

    /// Gets the arrow schema of the SST file.
    ///
    /// This schema is computed from the region metadata but should be the same
    /// as the arrow schema decoded from the file metadata.
    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
        match self {
            ReadFormat::PrimaryKey(format) => format.arrow_schema(),
            ReadFormat::Flat(format) => format.arrow_schema(),
        }
    }

    /// Gets the metadata of the SST.
    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
        match self {
            ReadFormat::PrimaryKey(format) => format.metadata(),
            ReadFormat::Flat(format) => format.metadata(),
        }
    }

    /// Gets sorted projection indices to read.
    pub(crate) fn projection_indices(&self) -> &[usize] {
        match self {
            ReadFormat::PrimaryKey(format) => format.projection_indices(),
            ReadFormat::Flat(format) => format.projection_indices(),
        }
    }

    /// Returns min values of a specific column in row groups.
    pub fn min_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        match self {
            ReadFormat::PrimaryKey(format) => format.min_values(row_groups, column_id),
            ReadFormat::Flat(format) => format.min_values(row_groups, column_id),
        }
    }

    /// Returns max values of a specific column in row groups.
    pub fn max_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        match self {
            ReadFormat::PrimaryKey(format) => format.max_values(row_groups, column_id),
            ReadFormat::Flat(format) => format.max_values(row_groups, column_id),
        }
    }

    /// Returns null counts of a specific column in row groups.
    pub fn null_counts(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        match self {
            ReadFormat::PrimaryKey(format) => format.null_counts(row_groups, column_id),
            ReadFormat::Flat(format) => format.null_counts(row_groups, column_id),
        }
    }

    /// Returns min/max values of a specific column.
    /// Returns None if the column does not have statistics.
    /// The column must not be encoded as part of a primary key.
    pub(crate) fn column_values(
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column: &ColumnMetadata,
        column_index: usize,
        is_min: bool,
    ) -> Option<ArrayRef> {
        let null_scalar: ScalarValue = column
            .column_schema
            .data_type
            .as_arrow_type()
            .try_into()
            .ok()?;
        let scalar_values = row_groups
            .iter()
            .map(|meta| {
                let stats = meta.borrow().column(column_index).statistics()?;
                match stats {
                    Statistics::Boolean(s) => Some(ScalarValue::Boolean(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::Int32(s) => Some(ScalarValue::Int32(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::Int64(s) => Some(ScalarValue::Int64(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::Int96(_) => None,
                    Statistics::Float(s) => Some(ScalarValue::Float32(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::Double(s) => Some(ScalarValue::Float64(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::ByteArray(s) => {
                        let bytes = if is_min {
                            s.min_bytes_opt()?
                        } else {
                            s.max_bytes_opt()?
                        };
                        let s = String::from_utf8(bytes.to_vec()).ok();
                        Some(ScalarValue::Utf8(s))
                    }
                    Statistics::FixedLenByteArray(_) => None,
                }
            })
            .map(|maybe_scalar| maybe_scalar.unwrap_or_else(|| null_scalar.clone()))
            .collect::<Vec<ScalarValue>>();
        debug_assert_eq!(scalar_values.len(), row_groups.len());
        ScalarValue::iter_to_array(scalar_values).ok()
    }

    /// Returns null counts of a specific column.
    /// The column must not be encoded as part of a primary key.
    pub(crate) fn column_null_counts(
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_index: usize,
    ) -> Option<ArrayRef> {
        let values = row_groups.iter().map(|meta| {
            let col = meta.borrow().column(column_index);
            let stat = col.statistics()?;
            stat.null_count_opt()
        });
        Some(Arc::new(UInt64Array::from_iter(values)))
    }

    /// Sets the sequence number to override.
    pub(crate) fn set_override_sequence(&mut self, sequence: Option<SequenceNumber>) {
        match self {
            ReadFormat::PrimaryKey(format) => format.set_override_sequence(sequence),
            ReadFormat::Flat(format) => format.set_override_sequence(sequence),
        }
    }

    /// Enables or disables eager decoding of primary key values into batches.
    pub(crate) fn set_decode_primary_key_values(&mut self, decode: bool) {
        if let ReadFormat::PrimaryKey(format) = self {
            format.set_decode_primary_key_values(decode);
        }
    }

    /// Creates a sequence array to override.
    pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option<ArrayRef> {
        match self {
            ReadFormat::PrimaryKey(format) => format.new_override_sequence_array(length),
            ReadFormat::Flat(format) => format.new_override_sequence_array(length),
        }
    }
}

/// Helper for reading the SST format.
pub struct PrimaryKeyReadFormat {
    /// The metadata stored in the SST.
    metadata: RegionMetadataRef,
    /// SST file schema.
    arrow_schema: SchemaRef,
    /// Field column id to its index in `schema` (SST schema).
    /// In SST schema, fields are stored in the front of the schema.
    field_id_to_index: HashMap<ColumnId, usize>,
    /// Indices of columns to read from the SST. It contains all internal columns.
    projection_indices: Vec<usize>,
    /// Field column id to its index in the projected schema
    /// (the schema of [Batch]).
    field_id_to_projected_index: HashMap<ColumnId, usize>,
    /// Sequence number to override the sequence read from the SST.
    override_sequence: Option<SequenceNumber>,
    /// Codec used to decode primary key values if eager decoding is enabled.
    primary_key_codec: Option<Arc<dyn PrimaryKeyCodec>>,
}

impl PrimaryKeyReadFormat {
    /// Creates a helper with existing `metadata` and `column_ids` to read.
    pub fn new(
        metadata: RegionMetadataRef,
        column_ids: impl Iterator<Item = ColumnId>,
    ) -> PrimaryKeyReadFormat {
        let field_id_to_index: HashMap<_, _> = metadata
            .field_columns()
            .enumerate()
            .map(|(index, column)| (column.column_id, index))
            .collect();
        let arrow_schema = to_sst_arrow_schema(&metadata);

        let format_projection = FormatProjection::compute_format_projection(
            &field_id_to_index,
            arrow_schema.fields.len(),
            column_ids,
        );

        PrimaryKeyReadFormat {
            metadata,
            arrow_schema,
            field_id_to_index,
            projection_indices: format_projection.projection_indices,
            field_id_to_projected_index: format_projection.column_id_to_projected_index,
            override_sequence: None,
            primary_key_codec: None,
        }
    }

    /// Sets the sequence number to override.
    pub(crate) fn set_override_sequence(&mut self, sequence: Option<SequenceNumber>) {
        self.override_sequence = sequence;
    }

    /// Enables or disables eager decoding of primary key values into batches.
    pub(crate) fn set_decode_primary_key_values(&mut self, decode: bool) {
        self.primary_key_codec = if decode {
            Some(build_primary_key_codec(&self.metadata))
        } else {
            None
        };
    }

    /// Gets the arrow schema of the SST file.
    ///
    /// This schema is computed from the region metadata but should be the same
    /// as the arrow schema decoded from the file metadata.
    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
        &self.arrow_schema
    }

    /// Gets the metadata of the SST.
    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
        &self.metadata
    }

    /// Gets sorted projection indices to read.
    pub(crate) fn projection_indices(&self) -> &[usize] {
        &self.projection_indices
    }

    /// Creates a sequence array to override.
    pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option<ArrayRef> {
        self.override_sequence
            .map(|seq| Arc::new(UInt64Array::from_value(seq, length)) as ArrayRef)
    }

    /// Converts an arrow record batch into `batches`.
    ///
    /// The length of `override_sequence_array` must be at least the number of rows
    /// in the record batch. Note that the `record_batch` may only contain a subset
    /// of columns if it is projected.
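    ///
    /// A usage sketch (not a doc-test; `format` and `record_batch` are assumed
    /// to exist and the record batch follows the SST schema):
    ///
    /// ```ignore
    /// let mut batches = VecDeque::new();
    /// format.convert_record_batch(&record_batch, None, &mut batches)?;
    /// // `batches` now holds one `Batch` per run of rows sharing a primary key.
    /// ```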
    pub fn convert_record_batch(
        &self,
        record_batch: &RecordBatch,
        override_sequence_array: Option<&ArrayRef>,
        batches: &mut VecDeque<Batch>,
    ) -> Result<()> {
        debug_assert!(batches.is_empty());

        // The record batch must have time index and internal columns.
        ensure!(
            record_batch.num_columns() >= FIXED_POS_COLUMN_NUM,
            InvalidRecordBatchSnafu {
                reason: format!(
                    "record batch only has {} columns",
                    record_batch.num_columns()
                ),
            }
        );

        let mut fixed_pos_columns = record_batch
            .columns()
            .iter()
            .rev()
            .take(FIXED_POS_COLUMN_NUM);
        // Safety: We have checked the column number.
        let op_type_array = fixed_pos_columns.next().unwrap();
        let mut sequence_array = fixed_pos_columns.next().unwrap().clone();
        let pk_array = fixed_pos_columns.next().unwrap();
        let ts_array = fixed_pos_columns.next().unwrap();
        let field_batch_columns = self.get_field_batch_columns(record_batch)?;

        // Override sequence array if provided.
        if let Some(override_array) = override_sequence_array {
            assert!(override_array.len() >= sequence_array.len());
            // It's fine to assign the override array directly, but we slice it to make
            // sure it matches the length of the original sequence array.
            sequence_array = if override_array.len() > sequence_array.len() {
                override_array.slice(0, sequence_array.len())
            } else {
                override_array.clone()
            };
        }

        // Compute primary key offsets.
        let pk_dict_array = pk_array
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .with_context(|| InvalidRecordBatchSnafu {
                reason: format!("primary key array should not be {:?}", pk_array.data_type()),
            })?;
        let offsets = primary_key_offsets(pk_dict_array)?;
        if offsets.is_empty() {
            return Ok(());
        }

        // Split record batch according to pk offsets.
        let keys = pk_dict_array.keys();
        let pk_values = pk_dict_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .with_context(|| InvalidRecordBatchSnafu {
                reason: format!(
                    "values of primary key array should not be {:?}",
                    pk_dict_array.values().data_type()
                ),
            })?;
        for (i, start) in offsets[..offsets.len() - 1].iter().enumerate() {
            let end = offsets[i + 1];
            let rows_in_batch = end - start;
            let dict_key = keys.value(*start);
            let primary_key = pk_values.value(dict_key as usize).to_vec();

            let mut builder = BatchBuilder::new(primary_key);
            builder
                .timestamps_array(ts_array.slice(*start, rows_in_batch))?
                .sequences_array(sequence_array.slice(*start, rows_in_batch))?
                .op_types_array(op_type_array.slice(*start, rows_in_batch))?;
            // Push all fields.
            for batch_column in &field_batch_columns {
                builder.push_field(BatchColumn {
                    column_id: batch_column.column_id,
                    data: batch_column.data.slice(*start, rows_in_batch),
                });
            }

            let mut batch = builder.build()?;
            if let Some(codec) = &self.primary_key_codec {
                let pk_values: CompositeValues =
                    codec.decode(batch.primary_key()).context(DecodeSnafu)?;
                batch.set_pk_values(pk_values);
            }
            batches.push_back(batch);
        }

        Ok(())
    }

    /// Returns min values of a specific column in row groups.
    pub fn min_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        let Some(column) = self.metadata.column_by_id(column_id) else {
            // No such column in the SST.
            return StatValues::NoColumn;
        };
        match column.semantic_type {
            SemanticType::Tag => self.tag_values(row_groups, column, true),
            SemanticType::Field => {
                // Safety: `field_id_to_index` is initialized by the semantic type.
                let index = self.field_id_to_index.get(&column_id).unwrap();
                let stats = ReadFormat::column_values(row_groups, column, *index, true);
                StatValues::from_stats_opt(stats)
            }
            SemanticType::Timestamp => {
                let index = self.time_index_position();
                let stats = ReadFormat::column_values(row_groups, column, index, true);
                StatValues::from_stats_opt(stats)
            }
        }
    }

    /// Returns max values of a specific column in row groups.
    pub fn max_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        let Some(column) = self.metadata.column_by_id(column_id) else {
            // No such column in the SST.
            return StatValues::NoColumn;
        };
        match column.semantic_type {
            SemanticType::Tag => self.tag_values(row_groups, column, false),
            SemanticType::Field => {
                // Safety: `field_id_to_index` is initialized by the semantic type.
                let index = self.field_id_to_index.get(&column_id).unwrap();
                let stats = ReadFormat::column_values(row_groups, column, *index, false);
                StatValues::from_stats_opt(stats)
            }
            SemanticType::Timestamp => {
                let index = self.time_index_position();
                let stats = ReadFormat::column_values(row_groups, column, index, false);
                StatValues::from_stats_opt(stats)
            }
        }
    }

    /// Returns null counts of a specific column in row groups.
    pub fn null_counts(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        let Some(column) = self.metadata.column_by_id(column_id) else {
            // No such column in the SST.
            return StatValues::NoColumn;
        };
        match column.semantic_type {
            SemanticType::Tag => StatValues::NoStats,
            SemanticType::Field => {
                // Safety: `field_id_to_index` is initialized by the semantic type.
                let index = self.field_id_to_index.get(&column_id).unwrap();
                let stats = ReadFormat::column_null_counts(row_groups, *index);
                StatValues::from_stats_opt(stats)
            }
            SemanticType::Timestamp => {
                let index = self.time_index_position();
                let stats = ReadFormat::column_null_counts(row_groups, index);
                StatValues::from_stats_opt(stats)
            }
        }
    }

    /// Gets fields from `record_batch`.
    fn get_field_batch_columns(&self, record_batch: &RecordBatch) -> Result<Vec<BatchColumn>> {
        record_batch
            .columns()
            .iter()
            .zip(record_batch.schema().fields())
            .take(record_batch.num_columns() - FIXED_POS_COLUMN_NUM) // Take all field columns.
            .map(|(array, field)| {
                let vector = Helper::try_into_vector(array.clone()).context(ConvertVectorSnafu)?;
                let column = self
                    .metadata
                    .column_by_name(field.name())
                    .with_context(|| InvalidRecordBatchSnafu {
                        reason: format!("column {} not found in metadata", field.name()),
                    })?;

                Ok(BatchColumn {
                    column_id: column.column_id,
                    data: vector,
                })
            })
            .collect()
    }

    /// Returns min/max values of a specific tag.
    fn tag_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column: &ColumnMetadata,
        is_min: bool,
    ) -> StatValues {
        let is_first_tag = self
            .metadata
            .primary_key
            .first()
            .map(|id| *id == column.column_id)
            .unwrap_or(false);
        if !is_first_tag {
            // Only the min-max of the first tag is available in the primary key.
            return StatValues::NoStats;
        }

        StatValues::from_stats_opt(self.first_tag_values(row_groups, column, is_min))
    }

    /// Returns min/max values of the first tag.
    /// Returns None if the tag does not have statistics.
    fn first_tag_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column: &ColumnMetadata,
        is_min: bool,
    ) -> Option<ArrayRef> {
        debug_assert!(
            self.metadata
                .primary_key
                .first()
                .map(|id| *id == column.column_id)
                .unwrap_or(false)
        );

        let primary_key_encoding = self.metadata.primary_key_encoding;
        let converter = build_primary_key_codec_with_fields(
            primary_key_encoding,
            [(
                column.column_id,
                SortField::new(column.column_schema.data_type.clone()),
            )]
            .into_iter(),
        );

        let values = row_groups.iter().map(|meta| {
            let stats = meta
                .borrow()
                .column(self.primary_key_position())
                .statistics()?;
            match stats {
                Statistics::Boolean(_) => None,
                Statistics::Int32(_) => None,
                Statistics::Int64(_) => None,
                Statistics::Int96(_) => None,
                Statistics::Float(_) => None,
                Statistics::Double(_) => None,
                Statistics::ByteArray(s) => {
                    let bytes = if is_min {
                        s.min_bytes_opt()?
                    } else {
                        s.max_bytes_opt()?
                    };
                    converter.decode_leftmost(bytes).ok()?
                }
                Statistics::FixedLenByteArray(_) => None,
            }
        });
        let mut builder = column
            .column_schema
            .data_type
            .create_mutable_vector(row_groups.len());
        for value_opt in values {
            match value_opt {
                // Safety: We use the same data type to create the converter.
                Some(v) => builder.push_value_ref(&v.as_value_ref()),
                None => builder.push_null(),
            }
        }
        let vector = builder.to_vector();

        Some(vector.to_arrow_array())
    }

    /// Index in SST of the primary key.
    fn primary_key_position(&self) -> usize {
        self.arrow_schema.fields.len() - 3
    }

    /// Index in SST of the time index.
    fn time_index_position(&self) -> usize {
        self.arrow_schema.fields.len() - FIXED_POS_COLUMN_NUM
    }

    /// Index of a field column by its column id.
    pub fn field_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
        self.field_id_to_projected_index.get(&column_id).copied()
    }
}

/// Helper to compute the projection for the SST.
pub(crate) struct FormatProjection {
    /// Indices of columns to read from the SST. It contains all internal columns.
    pub(crate) projection_indices: Vec<usize>,
    /// Column id to its index in the projected schema
    /// (the schema after projection).
    ///
    /// It doesn't contain the time index column if it is not present in the projection.
    pub(crate) column_id_to_projected_index: HashMap<ColumnId, usize>,
}

impl FormatProjection {
    /// Computes the projection.
    ///
    /// `id_to_index` is a mapping from column id to the index of the column in the SST.
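    ///
    /// For example (hypothetical values): with `id_to_index = {1: 0, 2: 1}`,
    /// `sst_column_num = 6` (so the fixed-position columns occupy indices 2..6)
    /// and `column_ids = [2]`, the result has
    /// `projection_indices = [1, 2, 3, 4, 5]` and
    /// `column_id_to_projected_index = {2: 0}`.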
    pub(crate) fn compute_format_projection(
        id_to_index: &HashMap<ColumnId, usize>,
        sst_column_num: usize,
        column_ids: impl Iterator<Item = ColumnId>,
    ) -> Self {
        // Maps column id of a projected column to its index in SST.
        // It also ignores columns not in the SST.
        // [(column id, index in SST)]
        let mut projected_schema: Vec<_> = column_ids
            .filter_map(|column_id| {
                id_to_index
                    .get(&column_id)
                    .copied()
                    .map(|index| (column_id, index))
            })
            .collect();
        // Sorts columns by their indices in the SST. SST uses a bitmap for projection.
        // This ensures the schema of `projected_schema` is the same as the batch returned from the SST.
        projected_schema.sort_unstable_by_key(|x| x.1);
        // Dedups the entries to avoid the case that `column_ids` has duplicated columns.
        projected_schema.dedup_by_key(|x| x.1);

        // Collects all projected indices.
        // It contains the positions of all columns we need to read.
        let mut projection_indices: Vec<_> = projected_schema
            .iter()
            .map(|(_column_id, index)| *index)
            // We need to add all fixed position columns.
            .chain(sst_column_num - FIXED_POS_COLUMN_NUM..sst_column_num)
            .collect();
        projection_indices.sort_unstable();
        // Removes duplications.
        projection_indices.dedup();

        // Creates a map from column id to the index of that column in the projected record batch.
        let column_id_to_projected_index = projected_schema
            .into_iter()
            .map(|(column_id, _)| column_id)
            .enumerate()
            .map(|(index, column_id)| (column_id, index))
            .collect();

        Self {
            projection_indices,
            column_id_to_projected_index,
        }
    }
}

/// Values of column statistics of the SST.
///
/// It distinguishes between the case where a column is not found and
/// the case where the column exists but has no statistics.
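///
/// A matching sketch (not a doc-test; `stats` is a hypothetical `StatValues`):
///
/// ```ignore
/// match stats {
///     StatValues::Values(array) => { /* one entry per row group */ }
///     StatValues::NoColumn => { /* the column is absent from the SST */ }
///     StatValues::NoStats => { /* present, but statistics are unavailable */ }
/// }
/// ```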
pub enum StatValues {
    /// Values of each row group.
    Values(ArrayRef),
    /// No such column.
    NoColumn,
    /// Column exists but has no statistics.
    NoStats,
}

impl StatValues {
    /// Creates a new `StatValues` instance from optional statistics.
    pub fn from_stats_opt(stats: Option<ArrayRef>) -> Self {
        match stats {
            Some(stats) => StatValues::Values(stats),
            None => StatValues::NoStats,
        }
    }
}

#[cfg(test)]
impl PrimaryKeyReadFormat {
    /// Creates a helper with existing `metadata` and all columns.
    pub fn new_with_all_columns(metadata: RegionMetadataRef) -> PrimaryKeyReadFormat {
        Self::new(
            Arc::clone(&metadata),
            metadata.column_metadatas.iter().map(|c| c.column_id),
        )
    }
}

/// Computes offsets of different primary keys in the array.
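///
/// For example (hypothetical keys), a dictionary key array `[0, 0, 1, 1, 2]`
/// yields offsets `[0, 2, 4, 5]`: each offset marks where a run of rows sharing
/// one primary key starts, plus the total length at the end.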
fn primary_key_offsets(pk_dict_array: &PrimaryKeyArray) -> Result<Vec<usize>> {
    if pk_dict_array.is_empty() {
        return Ok(Vec::new());
    }

    // Init offsets.
    let mut offsets = vec![0];
    let keys = pk_dict_array.keys();
    // We know that primary keys are never null, so we iterate `keys.values()` directly.
    let pk_indices = keys.values();
    for (i, key) in pk_indices.iter().take(keys.len() - 1).enumerate() {
        // Compare each key with the next key.
        if *key != pk_indices[i + 1] {
            // We meet a new key; push the next index as its start offset.
            offsets.push(i + 1);
        }
    }
    offsets.push(keys.len());

    Ok(offsets)
}

/// Creates a new array for the given `primary_key`.
fn new_primary_key_array(primary_key: &[u8], num_rows: usize) -> ArrayRef {
    let values = Arc::new(BinaryArray::from_iter_values([primary_key]));
    let keys = UInt32Array::from_value(0, num_rows);

    // Safety: The key index is valid.
    Arc::new(DictionaryArray::new(keys, values))
}

/// Gets the min/max time index of the row group from the parquet meta.
/// It assumes the parquet is created by the mito engine.
pub(crate) fn parquet_row_group_time_range(
    file_meta: &FileMeta,
    parquet_meta: &ParquetMetaData,
    row_group_idx: usize,
) -> Option<FileTimeRange> {
    let row_group_meta = parquet_meta.row_group(row_group_idx);
    let num_columns = parquet_meta.file_metadata().schema_descr().num_columns();
    assert!(
        num_columns >= FIXED_POS_COLUMN_NUM,
        "file only has {} columns",
        num_columns
    );
    let time_index_pos = num_columns - FIXED_POS_COLUMN_NUM;

    let stats = row_group_meta.column(time_index_pos).statistics()?;
    // The physical type for the timestamp should be i64.
    let (min, max) = match stats {
        Statistics::Int64(value_stats) => (*value_stats.min_opt()?, *value_stats.max_opt()?),
        Statistics::Int32(_)
        | Statistics::Boolean(_)
        | Statistics::Int96(_)
        | Statistics::Float(_)
        | Statistics::Double(_)
        | Statistics::ByteArray(_)
        | Statistics::FixedLenByteArray(_) => {
            common_telemetry::warn!(
                "Invalid statistics {:?} for time index in parquet in {}",
                stats,
                file_meta.file_id
            );
            return None;
        }
    };

    debug_assert!(min >= file_meta.time_range.0.value() && min <= file_meta.time_range.1.value());
    debug_assert!(max >= file_meta.time_range.0.value() && max <= file_meta.time_range.1.value());
    let unit = file_meta.time_range.0.unit();

    Some((Timestamp::new(min, unit), Timestamp::new(max, unit)))
}

/// Checks if sequence override is needed based on all row groups' statistics.
/// Returns true only if there is at least one row group and ALL row groups
/// have sequence min-max values of 0.
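///
/// A usage sketch (not a doc-test; `parquet_meta`, `read_format` and `seq` are
/// hypothetical caller state):
///
/// ```ignore
/// if need_override_sequence(&parquet_meta) {
///     read_format.set_override_sequence(Some(seq));
/// }
/// ```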
pub(crate) fn need_override_sequence(parquet_meta: &ParquetMetaData) -> bool {
    let num_columns = parquet_meta.file_metadata().schema_descr().num_columns();
    if num_columns < FIXED_POS_COLUMN_NUM {
        return false;
    }

    // The sequence column is the second-to-last column (before op_type).
    let sequence_pos = num_columns - 2;

    // Check all row groups: every one must have a sequence min-max of 0.
    for row_group in parquet_meta.row_groups() {
        if let Some(Statistics::Int64(value_stats)) = row_group.column(sequence_pos).statistics() {
            if let (Some(min_val), Some(max_val)) = (value_stats.min_opt(), value_stats.max_opt()) {
                // If any row group doesn't have min=0 and max=0, return false.
                if *min_val != 0 || *max_val != 0 {
                    return false;
                }
            } else {
                // If any row group doesn't have statistics, return false.
                return false;
            }
        } else {
            // If any row group doesn't have Int64 statistics, return false.
            return false;
        }
    }

    // All row groups have a sequence min-max of 0; also require at least one row group.
    !parquet_meta.row_groups().is_empty()
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::OpType;
    use datatypes::arrow::array::{
        Int64Array, StringArray, TimestampMillisecondArray, UInt8Array, UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema, TimeUnit};
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::ValueRef;
    use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, UInt8Vector, UInt64Vector};
    use mito_codec::row_converter::{
        DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec,
    };
    use store_api::codec::PrimaryKeyEncoding;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;
    use store_api::storage::consts::ReservedColumnId;

    use super::*;
    use crate::sst::parquet::flat_format::{FlatWriteFormat, sequence_column_index};
    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};

    const TEST_SEQUENCE: u64 = 1;
    const TEST_OP_TYPE: u8 = OpType::Put as u8;

    fn build_test_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("tag0", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field1",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 4, // We change the order of field columns.
1053            })
1054            .push_column_metadata(ColumnMetadata {
1055                column_schema: ColumnSchema::new("tag1", ConcreteDataType::int64_datatype(), true),
1056                semantic_type: SemanticType::Tag,
1057                column_id: 3,
1058            })
1059            .push_column_metadata(ColumnMetadata {
1060                column_schema: ColumnSchema::new(
1061                    "field0",
1062                    ConcreteDataType::int64_datatype(),
1063                    true,
1064                ),
1065                semantic_type: SemanticType::Field,
1066                column_id: 2,
1067            })
1068            .push_column_metadata(ColumnMetadata {
1069                column_schema: ColumnSchema::new(
1070                    "ts",
1071                    ConcreteDataType::timestamp_millisecond_datatype(),
1072                    false,
1073                ),
1074                semantic_type: SemanticType::Timestamp,
1075                column_id: 5,
1076            })
1077            .primary_key(vec![1, 3]);
1078        Arc::new(builder.build().unwrap())
1079    }
1080
1081    fn build_test_arrow_schema() -> SchemaRef {
1082        let fields = vec![
1083            Field::new("field1", ArrowDataType::Int64, true),
1084            Field::new("field0", ArrowDataType::Int64, true),
1085            Field::new(
1086                "ts",
1087                ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
1088                false,
1089            ),
1090            Field::new(
1091                "__primary_key",
1092                ArrowDataType::Dictionary(
1093                    Box::new(ArrowDataType::UInt32),
1094                    Box::new(ArrowDataType::Binary),
1095                ),
1096                false,
1097            ),
1098            Field::new("__sequence", ArrowDataType::UInt64, false),
1099            Field::new("__op_type", ArrowDataType::UInt8, false),
1100        ];
1101        Arc::new(Schema::new(fields))
1102    }
1103
1104    fn new_batch(primary_key: &[u8], start_ts: i64, start_field: i64, num_rows: usize) -> Batch {
1105        new_batch_with_sequence(primary_key, start_ts, start_field, num_rows, TEST_SEQUENCE)
1106    }
1107
1108    fn new_batch_with_sequence(
1109        primary_key: &[u8],
1110        start_ts: i64,
1111        start_field: i64,
1112        num_rows: usize,
1113        sequence: u64,
1114    ) -> Batch {
1115        let ts_values = (0..num_rows).map(|i| start_ts + i as i64);
1116        let timestamps = Arc::new(TimestampMillisecondVector::from_values(ts_values));
1117        let sequences = Arc::new(UInt64Vector::from_vec(vec![sequence; num_rows]));
1118        let op_types = Arc::new(UInt8Vector::from_vec(vec![TEST_OP_TYPE; num_rows]));
1119        let fields = vec![
1120            BatchColumn {
1121                column_id: 4,
1122                data: Arc::new(Int64Vector::from_vec(vec![start_field; num_rows])),
1123            }, // field1
1124            BatchColumn {
1125                column_id: 2,
1126                data: Arc::new(Int64Vector::from_vec(vec![start_field + 1; num_rows])),
1127            }, // field0
1128        ];
1129
1130        BatchBuilder::with_required_columns(primary_key.to_vec(), timestamps, sequences, op_types)
1131            .with_fields(fields)
1132            .build()
1133            .unwrap()
1134    }
1135
1136    #[test]
1137    fn test_to_sst_arrow_schema() {
1138        let metadata = build_test_region_metadata();
1139        let write_format = PrimaryKeyWriteFormat::new(metadata);
1140        assert_eq!(&build_test_arrow_schema(), write_format.arrow_schema());
1141    }
1142
1143    #[test]
1144    fn test_new_primary_key_array() {
1145        let array = new_primary_key_array(b"test", 3);
1146        let expect = build_test_pk_array(&[(b"test".to_vec(), 3)]) as ArrayRef;
1147        assert_eq!(&expect, &array);
1148    }
1149
1150    fn build_test_pk_array(pk_row_nums: &[(Vec<u8>, usize)]) -> Arc<PrimaryKeyArray> {
1151        let values = Arc::new(BinaryArray::from_iter_values(
1152            pk_row_nums.iter().map(|v| &v.0),
1153        ));
1154        let mut keys = vec![];
1155        for (index, num_rows) in pk_row_nums.iter().map(|v| v.1).enumerate() {
1156            keys.extend(std::iter::repeat_n(index as u32, num_rows));
1157        }
1158        let keys = UInt32Array::from(keys);
1159        Arc::new(DictionaryArray::new(keys, values))
1160    }
1161
1162    #[test]
1163    fn test_convert_batch() {
1164        let metadata = build_test_region_metadata();
1165        let write_format = PrimaryKeyWriteFormat::new(metadata);
1166
1167        let num_rows = 4;
1168        let batch = new_batch(b"test", 1, 2, num_rows);
1169        let columns: Vec<ArrayRef> = vec![
1170            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
1171            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
1172            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
1173            build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // primary key
1174            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; num_rows])), // sequence
1175            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
1176        ];
1177        let expect_record = RecordBatch::try_new(build_test_arrow_schema(), columns).unwrap();
1178
1179        let actual = write_format.convert_batch(&batch).unwrap();
1180        assert_eq!(expect_record, actual);
1181    }
1182
1183    #[test]
1184    fn test_convert_batch_with_override_sequence() {
1185        let metadata = build_test_region_metadata();
1186        let write_format =
1187            PrimaryKeyWriteFormat::new(metadata).with_override_sequence(Some(415411));
1188
1189        let num_rows = 4;
1190        let batch = new_batch(b"test", 1, 2, num_rows);
1191        let columns: Vec<ArrayRef> = vec![
1192            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
1193            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
1194            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
1195            build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // primary key
1196            Arc::new(UInt64Array::from(vec![415411; num_rows])), // sequence
1197            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
1198        ];
1199        let expect_record = RecordBatch::try_new(build_test_arrow_schema(), columns).unwrap();
1200
1201        let actual = write_format.convert_batch(&batch).unwrap();
1202        assert_eq!(expect_record, actual);
1203    }
1204
1205    #[test]
1206    fn test_projection_indices() {
1207        let metadata = build_test_region_metadata();
1208        // Only read tag1
1209        let read_format = ReadFormat::new_primary_key(metadata.clone(), [3].iter().copied());
1210        assert_eq!(&[2, 3, 4, 5], read_format.projection_indices());
1211        // Only read field1
1212        let read_format = ReadFormat::new_primary_key(metadata.clone(), [4].iter().copied());
1213        assert_eq!(&[0, 2, 3, 4, 5], read_format.projection_indices());
1214        // Only read ts
1215        let read_format = ReadFormat::new_primary_key(metadata.clone(), [5].iter().copied());
1216        assert_eq!(&[2, 3, 4, 5], read_format.projection_indices());
1217        // Read field0, tag0, ts
1218        let read_format = ReadFormat::new_primary_key(metadata, [2, 1, 5].iter().copied());
1219        assert_eq!(&[1, 2, 3, 4, 5], read_format.projection_indices());
1220    }
1221
1222    #[test]
1223    fn test_empty_primary_key_offsets() {
1224        let array = build_test_pk_array(&[]);
1225        assert!(primary_key_offsets(&array).unwrap().is_empty());
1226    }
1227
1228    #[test]
1229    fn test_primary_key_offsets_one_series() {
1230        let array = build_test_pk_array(&[(b"one".to_vec(), 1)]);
1231        assert_eq!(vec![0, 1], primary_key_offsets(&array).unwrap());
1232
1233        let array = build_test_pk_array(&[(b"one".to_vec(), 1), (b"two".to_vec(), 1)]);
1234        assert_eq!(vec![0, 1, 2], primary_key_offsets(&array).unwrap());
1235
1236        let array = build_test_pk_array(&[
1237            (b"one".to_vec(), 1),
1238            (b"two".to_vec(), 1),
1239            (b"three".to_vec(), 1),
1240        ]);
1241        assert_eq!(vec![0, 1, 2, 3], primary_key_offsets(&array).unwrap());
1242    }
1243
1244    #[test]
1245    fn test_primary_key_offsets_multi_series() {
1246        let array = build_test_pk_array(&[(b"one".to_vec(), 1), (b"two".to_vec(), 3)]);
1247        assert_eq!(vec![0, 1, 4], primary_key_offsets(&array).unwrap());
1248
1249        let array = build_test_pk_array(&[(b"one".to_vec(), 3), (b"two".to_vec(), 1)]);
1250        assert_eq!(vec![0, 3, 4], primary_key_offsets(&array).unwrap());
1251
1252        let array = build_test_pk_array(&[(b"one".to_vec(), 3), (b"two".to_vec(), 3)]);
1253        assert_eq!(vec![0, 3, 6], primary_key_offsets(&array).unwrap());
1254    }
1255
1256    #[test]
1257    fn test_convert_empty_record_batch() {
1258        let metadata = build_test_region_metadata();
1259        let arrow_schema = build_test_arrow_schema();
1260        let column_ids: Vec<_> = metadata
1261            .column_metadatas
1262            .iter()
1263            .map(|col| col.column_id)
1264            .collect();
1265        let read_format = PrimaryKeyReadFormat::new(metadata, column_ids.iter().copied());
1266        assert_eq!(arrow_schema, *read_format.arrow_schema());
1267
1268        let record_batch = RecordBatch::new_empty(arrow_schema);
1269        let mut batches = VecDeque::new();
1270        read_format
1271            .convert_record_batch(&record_batch, None, &mut batches)
1272            .unwrap();
1273        assert!(batches.is_empty());
1274    }
1275
1276    #[test]
1277    fn test_convert_record_batch() {
1278        let metadata = build_test_region_metadata();
1279        let column_ids: Vec<_> = metadata
1280            .column_metadatas
1281            .iter()
1282            .map(|col| col.column_id)
1283            .collect();
1284        let read_format = PrimaryKeyReadFormat::new(metadata, column_ids.iter().copied());
1285
1286        let columns: Vec<ArrayRef> = vec![
1287            Arc::new(Int64Array::from(vec![1, 1, 10, 10])), // field1
1288            Arc::new(Int64Array::from(vec![2, 2, 11, 11])), // field0
1289            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 11, 12])), // ts
1290            build_test_pk_array(&[(b"one".to_vec(), 2), (b"two".to_vec(), 2)]), // primary key
1291            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; 4])), // sequence
1292            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; 4])), // op type
1293        ];
1294        let arrow_schema = build_test_arrow_schema();
1295        let record_batch = RecordBatch::try_new(arrow_schema, columns).unwrap();
1296        let mut batches = VecDeque::new();
1297        read_format
1298            .convert_record_batch(&record_batch, None, &mut batches)
1299            .unwrap();
1300
1301        assert_eq!(
1302            vec![new_batch(b"one", 1, 1, 2), new_batch(b"two", 11, 10, 2)],
1303            batches.into_iter().collect::<Vec<_>>(),
1304        );
1305    }
1306
1307    #[test]
1308    fn test_convert_record_batch_with_override_sequence() {
1309        let metadata = build_test_region_metadata();
1310        let column_ids: Vec<_> = metadata
1311            .column_metadatas
1312            .iter()
1313            .map(|col| col.column_id)
1314            .collect();
1315        let read_format =
1316            ReadFormat::new(metadata, Some(&column_ids), false, None, "test", false).unwrap();
1317
1318        let columns: Vec<ArrayRef> = vec![
1319            Arc::new(Int64Array::from(vec![1, 1, 10, 10])), // field1
1320            Arc::new(Int64Array::from(vec![2, 2, 11, 11])), // field0
1321            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 11, 12])), // ts
1322            build_test_pk_array(&[(b"one".to_vec(), 2), (b"two".to_vec(), 2)]), // primary key
1323            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; 4])), // sequence
1324            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; 4])), // op type
1325        ];
1326        let arrow_schema = build_test_arrow_schema();
1327        let record_batch = RecordBatch::try_new(arrow_schema, columns).unwrap();
1328
1329        // Create override sequence array with custom values
1330        let override_sequence: u64 = 12345;
1331        let override_sequence_array: ArrayRef =
1332            Arc::new(UInt64Array::from_value(override_sequence, 4));
1333
1334        let mut batches = VecDeque::new();
1335        read_format
1336            .as_primary_key()
1337            .unwrap()
1338            .convert_record_batch(&record_batch, Some(&override_sequence_array), &mut batches)
1339            .unwrap();
1340
1341        // Create expected batches with override sequence
1342        let expected_batch1 = new_batch_with_sequence(b"one", 1, 1, 2, override_sequence);
1343        let expected_batch2 = new_batch_with_sequence(b"two", 11, 10, 2, override_sequence);
1344
1345        assert_eq!(
1346            vec![expected_batch1, expected_batch2],
1347            batches.into_iter().collect::<Vec<_>>(),
1348        );
1349    }
1350
    fn build_test_flat_sst_schema() -> SchemaRef {
        let fields = vec![
            Field::new("tag0", ArrowDataType::Int64, true), // primary key columns first
            Field::new("tag1", ArrowDataType::Int64, true),
            Field::new("field1", ArrowDataType::Int64, true), // then field columns
            Field::new("field0", ArrowDataType::Int64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt32),
                    Box::new(ArrowDataType::Binary),
                ),
                false,
            ),
            Field::new("__sequence", ArrowDataType::UInt64, false),
            Field::new("__op_type", ArrowDataType::UInt8, false),
        ];
        Arc::new(Schema::new(fields))
    }

    #[test]
    fn test_flat_to_sst_arrow_schema() {
        let metadata = build_test_region_metadata();
        let format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default());
        assert_eq!(&build_test_flat_sst_schema(), format.arrow_schema());
    }

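    /// Builds input columns for a flat-format batch. Note that the `ts`
    /// column is hardcoded to four values, so callers must pass
    /// `num_rows == 4`.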
    fn input_columns_for_flat_batch(num_rows: usize) -> Vec<ArrayRef> {
        vec![
            Arc::new(Int64Array::from(vec![1; num_rows])), // tag0
            Arc::new(Int64Array::from(vec![1; num_rows])), // tag1
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts (assumes num_rows == 4)
            build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // __primary_key
            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; num_rows])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ]
    }

    #[test]
    fn test_flat_convert_batch() {
        let metadata = build_test_region_metadata();
        let format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default());

        let num_rows = 4;
        let columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
        let batch = RecordBatch::try_new(build_test_flat_sst_schema(), columns.clone()).unwrap();
        let expect_record = RecordBatch::try_new(build_test_flat_sst_schema(), columns).unwrap();

        let actual = format.convert_batch(&batch).unwrap();
        assert_eq!(expect_record, actual);
    }

    #[test]
    fn test_flat_convert_with_override_sequence() {
        let metadata = build_test_region_metadata();
        let format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default())
            .with_override_sequence(Some(415411));

        let num_rows = 4;
        let columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
        let batch = RecordBatch::try_new(build_test_flat_sst_schema(), columns).unwrap();

        let expected_columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![1; num_rows])), // tag0
            Arc::new(Int64Array::from(vec![1; num_rows])), // tag1
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // __primary_key
            Arc::new(UInt64Array::from(vec![415411; num_rows])), // overridden sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ];
        let expected_record =
            RecordBatch::try_new(build_test_flat_sst_schema(), expected_columns).unwrap();

        let actual = format.convert_batch(&batch).unwrap();
        assert_eq!(expected_record, actual);
    }

    #[test]
    fn test_flat_projection_indices() {
        let metadata = build_test_region_metadata();
        // Based on flat format: tag0(0), tag1(1), field1(2), field0(3), ts(4), __primary_key(5), __sequence(6), __op_type(7)
        // The projection includes all "fixed position" columns: ts(4), __primary_key(5), __sequence(6), __op_type(7)

        // Only read tag1 (column_id=3, index=1) + fixed columns
        let read_format =
            ReadFormat::new_flat(metadata.clone(), [3].iter().copied(), None, "test", false)
                .unwrap();
        assert_eq!(&[1, 4, 5, 6, 7], read_format.projection_indices());

        // Only read field1 (column_id=4, index=2) + fixed columns
        let read_format =
            ReadFormat::new_flat(metadata.clone(), [4].iter().copied(), None, "test", false)
                .unwrap();
        assert_eq!(&[2, 4, 5, 6, 7], read_format.projection_indices());

        // Only read ts (column_id=5, index=4) + fixed columns (ts is already included in fixed)
        let read_format =
            ReadFormat::new_flat(metadata.clone(), [5].iter().copied(), None, "test", false)
                .unwrap();
        assert_eq!(&[4, 5, 6, 7], read_format.projection_indices());

        // Read field0(column_id=2, index=3), tag0(column_id=1, index=0), ts(column_id=5, index=4) + fixed columns
        let read_format =
            ReadFormat::new_flat(metadata, [2, 1, 5].iter().copied(), None, "test", false).unwrap();
        assert_eq!(&[0, 3, 4, 5, 6, 7], read_format.projection_indices());
    }

    #[test]
    fn test_flat_read_format_convert_batch() {
        let metadata = build_test_region_metadata();
        let mut format = FlatReadFormat::new(
            metadata,
            std::iter::once(1), // Just read tag0
            Some(8),
            "test",
            false,
        )
        .unwrap();

        let num_rows = 4;
        let original_sequence = 100u64;
        let override_sequence = 200u64;

        // Create a test record batch
        let columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
        let mut test_columns = columns.clone();
        // Replace the sequence column with the original sequence values
        test_columns[6] = Arc::new(UInt64Array::from(vec![original_sequence; num_rows]));
        let record_batch =
            RecordBatch::try_new(format.arrow_schema().clone(), test_columns).unwrap();

        // Without an override sequence, the batch should keep its original sequence values
        let result = format.convert_batch(record_batch.clone(), None).unwrap();
        let sequence_column = result.column(sequence_column_index(result.num_columns()));
        let sequence_array = sequence_column
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();

        let expected_original = UInt64Array::from(vec![original_sequence; num_rows]);
        assert_eq!(sequence_array, &expected_original);

        // Set an override sequence and test with new_override_sequence_array
        format.set_override_sequence(Some(override_sequence));
        let override_sequence_array = format.new_override_sequence_array(num_rows).unwrap();
        let result = format
            .convert_batch(record_batch, Some(&override_sequence_array))
            .unwrap();
        let sequence_column = result.column(sequence_column_index(result.num_columns()));
        let sequence_array = sequence_column
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();

        let expected_override = UInt64Array::from(vec![override_sequence; num_rows]);
        assert_eq!(sequence_array, &expected_override);
    }

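    /// `is_legacy_format` should report, from the column count alone, whether
    /// an SST was written without the leading primary key columns.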
    #[test]
    fn test_need_convert_to_flat() {
        let metadata = build_test_region_metadata();

        // Test case 1: Same number of columns, no conversion needed
        // For flat format: all columns (5) + internal columns (3)
        let expected_columns = metadata.column_metadatas.len() + 3;
        let result =
            FlatReadFormat::is_legacy_format(&metadata, expected_columns, "test.parquet").unwrap();
        assert!(
            !result,
            "Should not need conversion when column counts match"
        );

        // Test case 2: Fewer columns, conversion needed
        // Missing primary key columns (2 primary keys in test metadata)
        let num_columns_without_pk = expected_columns - metadata.primary_key.len();
        let result =
            FlatReadFormat::is_legacy_format(&metadata, num_columns_without_pk, "test.parquet")
                .unwrap();
        assert!(
            result,
            "Should need conversion when primary key columns are missing"
        );

        // Test case 3: Invalid case - more actual columns than expected
        let too_many_columns = expected_columns + 1;
        let err = FlatReadFormat::is_legacy_format(&metadata, too_many_columns, "test.parquet")
            .unwrap_err();
        assert!(err.to_string().contains("Expected columns"), "{err:?}");

        // Test case 4: Invalid case - column difference doesn't match primary key count
        let wrong_diff_columns = expected_columns - 1; // Difference of 1, but we have 2 primary keys
        let err = FlatReadFormat::is_legacy_format(&metadata, wrong_diff_columns, "test.parquet")
            .unwrap_err();
        assert!(
            err.to_string().contains("Column number difference"),
            "{err:?}"
        );
    }

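    /// Encodes one dense primary key per row and collects the encoded keys
    /// into a dictionary array, mirroring the `__primary_key` column layout.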
    fn build_test_dense_pk_array(
        codec: &DensePrimaryKeyCodec,
        pk_values_per_row: &[&[Option<i64>]],
    ) -> Arc<PrimaryKeyArray> {
        let mut builder = PrimaryKeyArrayBuilder::with_capacity(pk_values_per_row.len(), 1024, 0);

        for pk_values_row in pk_values_per_row {
            let values: Vec<ValueRef> = pk_values_row
                .iter()
                .map(|opt| match opt {
                    Some(val) => ValueRef::Int64(*val),
                    None => ValueRef::Null,
                })
                .collect();

            let encoded = codec.encode(values.into_iter()).unwrap();
            builder.append_value(&encoded);
        }

        Arc::new(builder.finish())
    }

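    /// Builds region metadata whose primary key uses the sparse encoding:
    /// the reserved `__table_id` and `__tsid` columns plus two string tags.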
    fn build_test_sparse_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "__table_id",
                    ConcreteDataType::uint32_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: ReservedColumnId::table_id(),
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "__tsid",
                    ConcreteDataType::uint64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: ReservedColumnId::tsid(),
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("tag0", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("tag1", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field1",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field0",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 5,
            })
            .primary_key(vec![
                ReservedColumnId::table_id(),
                ReservedColumnId::tsid(),
                1,
                3,
            ])
            .primary_key_encoding(PrimaryKeyEncoding::Sparse);
        Arc::new(builder.build().unwrap())
    }

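    /// Encodes one sparse primary key per row and collects the encoded keys
    /// into a dictionary array.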
    fn build_test_sparse_pk_array(
        codec: &SparsePrimaryKeyCodec,
        pk_values_per_row: &[SparseTestRow],
    ) -> Arc<PrimaryKeyArray> {
        let mut builder = PrimaryKeyArrayBuilder::with_capacity(pk_values_per_row.len(), 1024, 0);
        for row in pk_values_per_row {
            let values = vec![
                (ReservedColumnId::table_id(), ValueRef::UInt32(row.table_id)),
                (ReservedColumnId::tsid(), ValueRef::UInt64(row.tsid)),
                (1, ValueRef::String(&row.tag0)),
                (3, ValueRef::String(&row.tag1)),
            ];

            let mut buffer = Vec::new();
            codec.encode_value_refs(&values, &mut buffer).unwrap();
            builder.append_value(&buffer);
        }

        Arc::new(builder.finish())
    }

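    /// Primary key values of a single row under the sparse encoding.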
    #[derive(Clone)]
    struct SparseTestRow {
        table_id: u32,
        tsid: u64,
        tag0: String,
        tag1: String,
    }

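    /// Converting a legacy batch with densely encoded primary keys should
    /// prepend the decoded tag columns while preserving the `__primary_key`
    /// column itself.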
    #[test]
    fn test_flat_read_format_convert_format_with_dense_encoding() {
        let metadata = build_test_region_metadata();

        let column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|c| c.column_id)
            .collect();
        let format = FlatReadFormat::new(
            metadata.clone(),
            column_ids.into_iter(),
            Some(6),
            "test",
            false,
        )
        .unwrap();

        let num_rows = 4;
        let original_sequence = 100u64;

        // Create primary key values for each row: tag0=1, tag1=1 for all rows
        let pk_values_per_row = vec![&[Some(1i64), Some(1i64)][..]; num_rows];

        // Create a test record batch in the old format using dense encoding
        let codec = DensePrimaryKeyCodec::new(&metadata);
        let dense_pk_array = build_test_dense_pk_array(&codec, &pk_values_per_row);
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            dense_pk_array.clone(),                        // __primary_key (dense encoding)
            Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ];

        // Create a schema for the old format (without primary key columns)
        let old_format_fields = vec![
            Field::new("field1", ArrowDataType::Int64, true),
            Field::new("field0", ArrowDataType::Int64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt32),
                    Box::new(ArrowDataType::Binary),
                ),
                false,
            ),
            Field::new("__sequence", ArrowDataType::UInt64, false),
            Field::new("__op_type", ArrowDataType::UInt8, false),
        ];
        let old_schema = Arc::new(Schema::new(old_format_fields));
        let record_batch = RecordBatch::try_new(old_schema, columns).unwrap();

        // Test conversion with dense encoding
        let result = format.convert_batch(record_batch, None).unwrap();

        // Construct the expected RecordBatch in flat format with decoded primary key columns
        let expected_columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![1; num_rows])), // tag0 (decoded from primary key)
            Arc::new(Int64Array::from(vec![1; num_rows])), // tag1 (decoded from primary key)
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            dense_pk_array,                                // __primary_key (preserved)
            Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ];
        let expected_record_batch =
            RecordBatch::try_new(build_test_flat_sst_schema(), expected_columns).unwrap();

        // Compare the actual result with the expected record batch
        assert_eq!(expected_record_batch, result);
    }

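    /// Same as the dense case above, but the decoded `__table_id`, `__tsid`
    /// and dictionary-encoded string tag columns come from the sparse
    /// encoding; when conversion is skipped (the last constructor flag), the
    /// batch is returned unchanged.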
    #[test]
    fn test_flat_read_format_convert_format_with_sparse_encoding() {
        let metadata = build_test_sparse_region_metadata();

        let column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|c| c.column_id)
            .collect();
        let format = FlatReadFormat::new(
            metadata.clone(),
            column_ids.clone().into_iter(),
            None,
            "test",
            false,
        )
        .unwrap();

        let num_rows = 4;
        let original_sequence = 100u64;

        // Create sparse test data with table_id, tsid and string tags
        let pk_test_rows = vec![
            SparseTestRow {
                table_id: 1,
                tsid: 123,
                tag0: "frontend".to_string(),
                tag1: "pod1".to_string(),
            };
            num_rows
        ];

        let codec = SparsePrimaryKeyCodec::new(&metadata);
        let sparse_pk_array = build_test_sparse_pk_array(&codec, &pk_test_rows);
        // Create a test record batch in the old format using sparse encoding
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            sparse_pk_array.clone(),                       // __primary_key (sparse encoding)
            Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ];

        // Create a schema for the old format (without primary key columns)
        let old_format_fields = vec![
            Field::new("field1", ArrowDataType::Int64, true),
            Field::new("field0", ArrowDataType::Int64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt32),
                    Box::new(ArrowDataType::Binary),
                ),
                false,
            ),
            Field::new("__sequence", ArrowDataType::UInt64, false),
            Field::new("__op_type", ArrowDataType::UInt8, false),
        ];
        let old_schema = Arc::new(Schema::new(old_format_fields));
        let record_batch = RecordBatch::try_new(old_schema, columns).unwrap();

        // Test conversion with sparse encoding
        let result = format.convert_batch(record_batch.clone(), None).unwrap();

        // Construct the expected RecordBatch in flat format with decoded primary key columns
        let tag0_array = Arc::new(DictionaryArray::new(
            UInt32Array::from(vec![0; num_rows]),
            Arc::new(StringArray::from(vec!["frontend"])),
        ));
        let tag1_array = Arc::new(DictionaryArray::new(
            UInt32Array::from(vec![0; num_rows]),
            Arc::new(StringArray::from(vec!["pod1"])),
        ));
        let expected_columns: Vec<ArrayRef> = vec![
            Arc::new(UInt32Array::from(vec![1; num_rows])), // __table_id (decoded from primary key)
            Arc::new(UInt64Array::from(vec![123; num_rows])), // __tsid (decoded from primary key)
            tag0_array,                                     // tag0 (decoded from primary key)
            tag1_array,                                     // tag1 (decoded from primary key)
            Arc::new(Int64Array::from(vec![2; num_rows])),  // field1
            Arc::new(Int64Array::from(vec![3; num_rows])),  // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            sparse_pk_array,                                // __primary_key (preserved)
            Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ];
        let expected_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        let expected_record_batch =
            RecordBatch::try_new(expected_schema, expected_columns).unwrap();

        // Compare the actual result with the expected record batch
        assert_eq!(expected_record_batch, result);

        let format =
            FlatReadFormat::new(metadata.clone(), column_ids.into_iter(), None, "test", true)
                .unwrap();
        // Test conversion with sparse encoding and skip convert.
        let result = format.convert_batch(record_batch.clone(), None).unwrap();
        assert_eq!(record_batch, result);
    }
}