mito2/sst/parquet/format.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Format to store in parquet.
//!
//! We store three internal columns in parquet:
//! - `__primary_key`, the primary key of the row (tags). Type: dictionary(uint32, binary)
//! - `__sequence`, the sequence number of a row. Type: uint64
//! - `__op_type`, the op type of the row. Type: uint8
//!
//! The schema of a parquet file is:
//! ```text
//! field 0, field 1, ..., field N, time index, primary key, sequence, op type
//! ```
//!
//! We store fields in the same order as [RegionMetadata::field_columns()](store_api::metadata::RegionMetadata::field_columns()).
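//!
//! For example, a region whose field columns (in [RegionMetadata::field_columns()](store_api::metadata::RegionMetadata::field_columns()) order)
//! are `field0` and `field1`, with time index `ts`, is stored as:
//! ```text
//! field0, field1, ts, __primary_key, __sequence, __op_type
//! ```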

use std::borrow::Borrow;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;

use api::v1::SemanticType;
use common_time::Timestamp;
use datafusion_common::ScalarValue;
use datatypes::arrow::array::{
    ArrayRef, BinaryArray, BinaryDictionaryBuilder, DictionaryArray, UInt32Array, UInt64Array,
};
use datatypes::arrow::datatypes::{SchemaRef, UInt32Type};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::prelude::DataType;
use datatypes::vectors::{Helper, Vector};
use mito_codec::row_converter::{
    CompositeValues, PrimaryKeyCodec, SortField, build_primary_key_codec,
    build_primary_key_codec_with_fields,
};
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
use parquet::file::statistics::Statistics;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
use store_api::storage::{ColumnId, SequenceNumber};

use crate::error::{
    ConvertVectorSnafu, DecodeSnafu, InvalidBatchSnafu, InvalidRecordBatchSnafu,
    NewRecordBatchSnafu, Result,
};
use crate::read::{Batch, BatchBuilder, BatchColumn};
use crate::sst::file::{FileMeta, FileTimeRange};
use crate::sst::parquet::flat_format::FlatReadFormat;
use crate::sst::to_sst_arrow_schema;

/// Arrow array type for the primary key dictionary.
pub(crate) type PrimaryKeyArray = DictionaryArray<UInt32Type>;
/// Builder type for primary key dictionary array.
pub(crate) type PrimaryKeyArrayBuilder = BinaryDictionaryBuilder<UInt32Type>;

/// Number of columns that have fixed positions.
///
/// Contains: time index and internal columns.
pub(crate) const FIXED_POS_COLUMN_NUM: usize = 4;
/// Number of internal columns.
pub(crate) const INTERNAL_COLUMN_NUM: usize = 3;
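// The four fixed-position columns sit at the end of the schema, in order:
// time index, `__primary_key`, `__sequence`, `__op_type`. The last three are
// the internal columns counted by `INTERNAL_COLUMN_NUM`.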

/// Helper for writing the SST format with primary key.
pub(crate) struct PrimaryKeyWriteFormat {
    metadata: RegionMetadataRef,
    /// SST file schema.
    arrow_schema: SchemaRef,
    override_sequence: Option<SequenceNumber>,
}

impl PrimaryKeyWriteFormat {
    /// Creates a new helper.
    pub(crate) fn new(metadata: RegionMetadataRef) -> PrimaryKeyWriteFormat {
        let arrow_schema = to_sst_arrow_schema(&metadata);
        PrimaryKeyWriteFormat {
            metadata,
            arrow_schema,
            override_sequence: None,
        }
    }

    /// Sets the override sequence.
    pub(crate) fn with_override_sequence(
        mut self,
        override_sequence: Option<SequenceNumber>,
    ) -> Self {
        self.override_sequence = override_sequence;
        self
    }

    /// Gets the arrow schema to store in parquet.
    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
        &self.arrow_schema
    }

    /// Converts `batch` to an arrow record batch to store in parquet.
    pub(crate) fn convert_batch(&self, batch: &Batch) -> Result<RecordBatch> {
        debug_assert_eq!(
            batch.fields().len() + FIXED_POS_COLUMN_NUM,
            self.arrow_schema.fields().len()
        );
        let mut columns = Vec::with_capacity(batch.fields().len() + FIXED_POS_COLUMN_NUM);
        // Store all fields first.
        for (column, column_metadata) in batch.fields().iter().zip(self.metadata.field_columns()) {
            ensure!(
                column.column_id == column_metadata.column_id,
                InvalidBatchSnafu {
                    reason: format!(
                        "Batch has column {} but metadata has column {}",
                        column.column_id, column_metadata.column_id
                    ),
                }
            );

            columns.push(column.data.to_arrow_array());
        }
        // Add time index column.
        columns.push(batch.timestamps().to_arrow_array());
        // Add internal columns: primary key, sequences, op types.
        columns.push(new_primary_key_array(batch.primary_key(), batch.num_rows()));

        if let Some(override_sequence) = self.override_sequence {
            let sequence_array =
                Arc::new(UInt64Array::from(vec![override_sequence; batch.num_rows()]));
            columns.push(sequence_array);
        } else {
            columns.push(batch.sequences().to_arrow_array());
        }
        columns.push(batch.op_types().to_arrow_array());

        RecordBatch::try_new(self.arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
    }
}
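
// A minimal write-path sketch. The `metadata` (a `RegionMetadataRef`) and
// `batch` (a `Batch`) values are hypothetical and assumed to be in scope:
//
//     let format = PrimaryKeyWriteFormat::new(metadata).with_override_sequence(Some(1));
//     let record_batch = format.convert_batch(&batch)?;
//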

/// Helper to read parquet formats.
pub enum ReadFormat {
    /// The parquet is in the old primary key format.
    PrimaryKey(PrimaryKeyReadFormat),
    /// The parquet is in the new flat format.
    Flat(FlatReadFormat),
}

impl ReadFormat {
    /// Creates a helper to read the primary key format.
    pub fn new_primary_key(
        metadata: RegionMetadataRef,
        column_ids: impl Iterator<Item = ColumnId>,
    ) -> Self {
        ReadFormat::PrimaryKey(PrimaryKeyReadFormat::new(metadata, column_ids))
    }

    /// Creates a helper to read the flat format.
    pub fn new_flat(
        metadata: RegionMetadataRef,
        column_ids: impl Iterator<Item = ColumnId>,
        num_columns: Option<usize>,
        file_path: &str,
        skip_auto_convert: bool,
    ) -> Result<Self> {
        Ok(ReadFormat::Flat(FlatReadFormat::new(
            metadata,
            column_ids,
            num_columns,
            file_path,
            skip_auto_convert,
        )?))
    }

    /// Creates a new read format.
    pub fn new(
        region_metadata: RegionMetadataRef,
        projection: Option<&[ColumnId]>,
        flat_format: bool,
        num_columns: Option<usize>,
        file_path: &str,
        skip_auto_convert: bool,
    ) -> Result<ReadFormat> {
        if flat_format {
            if let Some(column_ids) = projection {
                ReadFormat::new_flat(
                    region_metadata,
                    column_ids.iter().copied(),
                    num_columns,
                    file_path,
                    skip_auto_convert,
                )
            } else {
                // No projection, lists all column ids to read.
                ReadFormat::new_flat(
                    region_metadata.clone(),
                    region_metadata
                        .column_metadatas
                        .iter()
                        .map(|col| col.column_id),
                    num_columns,
                    file_path,
                    skip_auto_convert,
                )
            }
        } else if let Some(column_ids) = projection {
            Ok(ReadFormat::new_primary_key(
                region_metadata,
                column_ids.iter().copied(),
            ))
        } else {
            // No projection, lists all column ids to read.
            Ok(ReadFormat::new_primary_key(
                region_metadata.clone(),
                region_metadata
                    .column_metadatas
                    .iter()
                    .map(|col| col.column_id),
            ))
        }
    }

    pub(crate) fn as_primary_key(&self) -> Option<&PrimaryKeyReadFormat> {
        match self {
            ReadFormat::PrimaryKey(format) => Some(format),
            _ => None,
        }
    }

    pub(crate) fn as_flat(&self) -> Option<&FlatReadFormat> {
        match self {
            ReadFormat::Flat(format) => Some(format),
            _ => None,
        }
    }

    /// Gets the arrow schema of the SST file.
    ///
    /// This schema is computed from the region metadata but should be the same
    /// as the arrow schema decoded from the file metadata.
    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
        match self {
            ReadFormat::PrimaryKey(format) => format.arrow_schema(),
            ReadFormat::Flat(format) => format.arrow_schema(),
        }
    }

    /// Gets the metadata of the SST.
    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
        match self {
            ReadFormat::PrimaryKey(format) => format.metadata(),
            ReadFormat::Flat(format) => format.metadata(),
        }
    }

    /// Gets sorted projection indices to read.
    pub(crate) fn projection_indices(&self) -> &[usize] {
        match self {
            ReadFormat::PrimaryKey(format) => format.projection_indices(),
            ReadFormat::Flat(format) => format.projection_indices(),
        }
    }

    /// Returns min values of a specific column in row groups.
    pub fn min_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        match self {
            ReadFormat::PrimaryKey(format) => format.min_values(row_groups, column_id),
            ReadFormat::Flat(format) => format.min_values(row_groups, column_id),
        }
    }

    /// Returns max values of a specific column in row groups.
    pub fn max_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        match self {
            ReadFormat::PrimaryKey(format) => format.max_values(row_groups, column_id),
            ReadFormat::Flat(format) => format.max_values(row_groups, column_id),
        }
    }

    /// Returns null counts of a specific column in row groups.
    pub fn null_counts(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        match self {
            ReadFormat::PrimaryKey(format) => format.null_counts(row_groups, column_id),
            ReadFormat::Flat(format) => format.null_counts(row_groups, column_id),
        }
    }

    /// Returns min/max values of a specific column in each row group.
    /// Returns `None` if the column does not have statistics.
    /// The column should not be encoded as part of a primary key.
    pub(crate) fn column_values(
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column: &ColumnMetadata,
        column_index: usize,
        is_min: bool,
    ) -> Option<ArrayRef> {
        let null_scalar: ScalarValue = column
            .column_schema
            .data_type
            .as_arrow_type()
            .try_into()
            .ok()?;
        let scalar_values = row_groups
            .iter()
            .map(|meta| {
                let stats = meta.borrow().column(column_index).statistics()?;
                match stats {
                    Statistics::Boolean(s) => Some(ScalarValue::Boolean(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::Int32(s) => Some(ScalarValue::Int32(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::Int64(s) => Some(ScalarValue::Int64(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),

                    Statistics::Int96(_) => None,
                    Statistics::Float(s) => Some(ScalarValue::Float32(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::Double(s) => Some(ScalarValue::Float64(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::ByteArray(s) => {
                        let bytes = if is_min {
                            s.min_bytes_opt()?
                        } else {
                            s.max_bytes_opt()?
                        };
                        let s = String::from_utf8(bytes.to_vec()).ok();
                        Some(ScalarValue::Utf8(s))
                    }

                    Statistics::FixedLenByteArray(_) => None,
                }
            })
            .map(|maybe_scalar| maybe_scalar.unwrap_or_else(|| null_scalar.clone()))
            .collect::<Vec<ScalarValue>>();
        debug_assert_eq!(scalar_values.len(), row_groups.len());
        ScalarValue::iter_to_array(scalar_values).ok()
    }

    /// Returns null counts of a specific column in each row group.
    /// The column should not be encoded as part of a primary key.
    pub(crate) fn column_null_counts(
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_index: usize,
    ) -> Option<ArrayRef> {
        let values = row_groups.iter().map(|meta| {
            let col = meta.borrow().column(column_index);
            let stat = col.statistics()?;
            stat.null_count_opt()
        });
        Some(Arc::new(UInt64Array::from_iter(values)))
    }

    /// Sets the sequence number to override.
    pub(crate) fn set_override_sequence(&mut self, sequence: Option<SequenceNumber>) {
        match self {
            ReadFormat::PrimaryKey(format) => format.set_override_sequence(sequence),
            ReadFormat::Flat(format) => format.set_override_sequence(sequence),
        }
    }

    /// Enables or disables eager decoding of primary key values into batches.
    pub(crate) fn set_decode_primary_key_values(&mut self, decode: bool) {
        if let ReadFormat::PrimaryKey(format) = self {
            format.set_decode_primary_key_values(decode);
        }
    }

    /// Creates a sequence array to override.
    pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option<ArrayRef> {
        match self {
            ReadFormat::PrimaryKey(format) => format.new_override_sequence_array(length),
            ReadFormat::Flat(format) => format.new_override_sequence_array(length),
        }
    }
}
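
// A minimal read-path sketch. The `metadata` (a `RegionMetadataRef`) and `path`
// values are hypothetical; a `None` projection reads all columns:
//
//     let format = ReadFormat::new(metadata, None, false, None, path, false)?;
//     let indices = format.projection_indices();
//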

/// Helper for reading the SST format.
pub struct PrimaryKeyReadFormat {
    /// The metadata stored in the SST.
    metadata: RegionMetadataRef,
    /// SST file schema.
    arrow_schema: SchemaRef,
    /// Field column id to its index in `schema` (SST schema).
    /// In the SST schema, fields are stored at the front of the schema.
    field_id_to_index: HashMap<ColumnId, usize>,
    /// Indices of columns to read from the SST. It contains all internal columns.
    projection_indices: Vec<usize>,
    /// Field column id to its index in the projected schema
    /// (the schema of [Batch]).
    field_id_to_projected_index: HashMap<ColumnId, usize>,
    /// Sequence number to override the sequence read from the SST.
    override_sequence: Option<SequenceNumber>,
    /// Codec used to decode primary key values if eager decoding is enabled.
    primary_key_codec: Option<Arc<dyn PrimaryKeyCodec>>,
}

impl PrimaryKeyReadFormat {
    /// Creates a helper with existing `metadata` and `column_ids` to read.
    pub fn new(
        metadata: RegionMetadataRef,
        column_ids: impl Iterator<Item = ColumnId>,
    ) -> PrimaryKeyReadFormat {
        let field_id_to_index: HashMap<_, _> = metadata
            .field_columns()
            .enumerate()
            .map(|(index, column)| (column.column_id, index))
            .collect();
        let arrow_schema = to_sst_arrow_schema(&metadata);

        let format_projection = FormatProjection::compute_format_projection(
            &field_id_to_index,
            arrow_schema.fields.len(),
            column_ids,
        );

        PrimaryKeyReadFormat {
            metadata,
            arrow_schema,
            field_id_to_index,
            projection_indices: format_projection.projection_indices,
            field_id_to_projected_index: format_projection.column_id_to_projected_index,
            override_sequence: None,
            primary_key_codec: None,
        }
    }

    /// Sets the sequence number to override.
    pub(crate) fn set_override_sequence(&mut self, sequence: Option<SequenceNumber>) {
        self.override_sequence = sequence;
    }

    /// Enables or disables eager decoding of primary key values into batches.
    pub(crate) fn set_decode_primary_key_values(&mut self, decode: bool) {
        self.primary_key_codec = if decode {
            Some(build_primary_key_codec(&self.metadata))
        } else {
            None
        };
    }

    /// Gets the arrow schema of the SST file.
    ///
    /// This schema is computed from the region metadata but should be the same
    /// as the arrow schema decoded from the file metadata.
    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
        &self.arrow_schema
    }

    /// Gets the metadata of the SST.
    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
        &self.metadata
    }

    /// Gets sorted projection indices to read.
    pub(crate) fn projection_indices(&self) -> &[usize] {
        &self.projection_indices
    }

    /// Gets the map from field column id to projected index.
    pub(crate) fn field_id_to_projected_index(&self) -> &HashMap<ColumnId, usize> {
        &self.field_id_to_projected_index
    }

    /// Creates a sequence array to override.
    pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option<ArrayRef> {
        self.override_sequence
            .map(|seq| Arc::new(UInt64Array::from_value(seq, length)) as ArrayRef)
    }

    /// Converts an arrow record batch into `batches`.
    ///
    /// The length of `override_sequence_array` must be at least the length of the record batch.
    /// Note that the `record_batch` may only contain a subset of columns if it is projected.
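    ///
    /// Rows are grouped by consecutive runs of the same primary key; each run becomes
    /// one [Batch] (see the worked example on `primary_key_offsets`).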
    pub fn convert_record_batch(
        &self,
        record_batch: &RecordBatch,
        override_sequence_array: Option<&ArrayRef>,
        batches: &mut VecDeque<Batch>,
    ) -> Result<()> {
        debug_assert!(batches.is_empty());

        // The record batch must have the time index and internal columns.
        ensure!(
            record_batch.num_columns() >= FIXED_POS_COLUMN_NUM,
            InvalidRecordBatchSnafu {
                reason: format!(
                    "record batch only has {} columns",
                    record_batch.num_columns()
                ),
            }
        );

        let mut fixed_pos_columns = record_batch
            .columns()
            .iter()
            .rev()
            .take(FIXED_POS_COLUMN_NUM);
        // Safety: We have checked the column number.
        let op_type_array = fixed_pos_columns.next().unwrap();
        let mut sequence_array = fixed_pos_columns.next().unwrap().clone();
        let pk_array = fixed_pos_columns.next().unwrap();
        let ts_array = fixed_pos_columns.next().unwrap();
        let field_batch_columns = self.get_field_batch_columns(record_batch)?;

        // Override sequence array if provided.
        if let Some(override_array) = override_sequence_array {
            assert!(override_array.len() >= sequence_array.len());
            // It's fine to assign the override array directly, but we slice it to make
            // sure it matches the length of the original sequence array.
            sequence_array = if override_array.len() > sequence_array.len() {
                override_array.slice(0, sequence_array.len())
            } else {
                override_array.clone()
            };
        }

        // Compute primary key offsets.
        let pk_dict_array = pk_array
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .with_context(|| InvalidRecordBatchSnafu {
                reason: format!("primary key array should not be {:?}", pk_array.data_type()),
            })?;
        let offsets = primary_key_offsets(pk_dict_array)?;
        if offsets.is_empty() {
            return Ok(());
        }

        // Split record batch according to pk offsets.
        let keys = pk_dict_array.keys();
        let pk_values = pk_dict_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .with_context(|| InvalidRecordBatchSnafu {
                reason: format!(
                    "values of primary key array should not be {:?}",
                    pk_dict_array.values().data_type()
                ),
            })?;
        for (i, start) in offsets[..offsets.len() - 1].iter().enumerate() {
            let end = offsets[i + 1];
            let rows_in_batch = end - start;
            let dict_key = keys.value(*start);
            let primary_key = pk_values.value(dict_key as usize).to_vec();

            let mut builder = BatchBuilder::new(primary_key);
            builder
                .timestamps_array(ts_array.slice(*start, rows_in_batch))?
                .sequences_array(sequence_array.slice(*start, rows_in_batch))?
                .op_types_array(op_type_array.slice(*start, rows_in_batch))?;
            // Push all fields
            for batch_column in &field_batch_columns {
                builder.push_field(BatchColumn {
                    column_id: batch_column.column_id,
                    data: batch_column.data.slice(*start, rows_in_batch),
                });
            }

            let mut batch = builder.build()?;
            if let Some(codec) = &self.primary_key_codec {
                let pk_values: CompositeValues =
                    codec.decode(batch.primary_key()).context(DecodeSnafu)?;
                batch.set_pk_values(pk_values);
            }
            batches.push_back(batch);
        }

        Ok(())
    }

    /// Returns min values of a specific column in row groups.
    pub fn min_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        let Some(column) = self.metadata.column_by_id(column_id) else {
            // No such column in the SST.
            return StatValues::NoColumn;
        };
        match column.semantic_type {
            SemanticType::Tag => self.tag_values(row_groups, column, true),
            SemanticType::Field => {
                // Safety: `field_id_to_index` is initialized by the semantic type.
                let index = self.field_id_to_index.get(&column_id).unwrap();
                let stats = ReadFormat::column_values(row_groups, column, *index, true);
                StatValues::from_stats_opt(stats)
            }
            SemanticType::Timestamp => {
                let index = self.time_index_position();
                let stats = ReadFormat::column_values(row_groups, column, index, true);
                StatValues::from_stats_opt(stats)
            }
        }
    }

    /// Returns max values of a specific column in row groups.
    pub fn max_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        let Some(column) = self.metadata.column_by_id(column_id) else {
            // No such column in the SST.
            return StatValues::NoColumn;
        };
        match column.semantic_type {
            SemanticType::Tag => self.tag_values(row_groups, column, false),
            SemanticType::Field => {
                // Safety: `field_id_to_index` is initialized by the semantic type.
                let index = self.field_id_to_index.get(&column_id).unwrap();
                let stats = ReadFormat::column_values(row_groups, column, *index, false);
                StatValues::from_stats_opt(stats)
            }
            SemanticType::Timestamp => {
                let index = self.time_index_position();
                let stats = ReadFormat::column_values(row_groups, column, index, false);
                StatValues::from_stats_opt(stats)
            }
        }
    }

    /// Returns null counts of a specific column in row groups.
    pub fn null_counts(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        let Some(column) = self.metadata.column_by_id(column_id) else {
            // No such column in the SST.
            return StatValues::NoColumn;
        };
        match column.semantic_type {
            SemanticType::Tag => StatValues::NoStats,
            SemanticType::Field => {
                // Safety: `field_id_to_index` is initialized by the semantic type.
                let index = self.field_id_to_index.get(&column_id).unwrap();
                let stats = ReadFormat::column_null_counts(row_groups, *index);
                StatValues::from_stats_opt(stats)
            }
            SemanticType::Timestamp => {
                let index = self.time_index_position();
                let stats = ReadFormat::column_null_counts(row_groups, index);
                StatValues::from_stats_opt(stats)
            }
        }
    }

    /// Gets the field columns from `record_batch`.
    fn get_field_batch_columns(&self, record_batch: &RecordBatch) -> Result<Vec<BatchColumn>> {
        record_batch
            .columns()
            .iter()
            .zip(record_batch.schema().fields())
            .take(record_batch.num_columns() - FIXED_POS_COLUMN_NUM) // Take all field columns.
            .map(|(array, field)| {
                let vector = Helper::try_into_vector(array.clone()).context(ConvertVectorSnafu)?;
                let column = self
                    .metadata
                    .column_by_name(field.name())
                    .with_context(|| InvalidRecordBatchSnafu {
                        reason: format!("column {} not found in metadata", field.name()),
                    })?;

                Ok(BatchColumn {
                    column_id: column.column_id,
                    data: vector,
                })
            })
            .collect()
    }

    /// Returns min/max values of a specific tag.
    fn tag_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column: &ColumnMetadata,
        is_min: bool,
    ) -> StatValues {
        let is_first_tag = self
            .metadata
            .primary_key
            .first()
            .map(|id| *id == column.column_id)
            .unwrap_or(false);
        if !is_first_tag {
            // Only the min-max of the first tag is available in the primary key.
            return StatValues::NoStats;
        }

        StatValues::from_stats_opt(self.first_tag_values(row_groups, column, is_min))
    }

    /// Returns min/max values of the first tag.
    /// Returns None if the tag does not have statistics.
    fn first_tag_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column: &ColumnMetadata,
        is_min: bool,
    ) -> Option<ArrayRef> {
        debug_assert!(
            self.metadata
                .primary_key
                .first()
                .map(|id| *id == column.column_id)
                .unwrap_or(false)
        );

        let primary_key_encoding = self.metadata.primary_key_encoding;
        let converter = build_primary_key_codec_with_fields(
            primary_key_encoding,
            [(
                column.column_id,
                SortField::new(column.column_schema.data_type.clone()),
            )]
            .into_iter(),
        );

        let values = row_groups.iter().map(|meta| {
            let stats = meta
                .borrow()
                .column(self.primary_key_position())
                .statistics()?;
            match stats {
                Statistics::Boolean(_) => None,
                Statistics::Int32(_) => None,
                Statistics::Int64(_) => None,
                Statistics::Int96(_) => None,
                Statistics::Float(_) => None,
                Statistics::Double(_) => None,
                Statistics::ByteArray(s) => {
                    let bytes = if is_min {
                        s.min_bytes_opt()?
                    } else {
                        s.max_bytes_opt()?
                    };
                    converter.decode_leftmost(bytes).ok()?
                }
                Statistics::FixedLenByteArray(_) => None,
            }
        });
        let mut builder = column
            .column_schema
            .data_type
            .create_mutable_vector(row_groups.len());
        for value_opt in values {
            match value_opt {
                // Safety: We use the same data type to create the converter.
                Some(v) => builder.push_value_ref(&v.as_value_ref()),
                None => builder.push_null(),
            }
        }
        let vector = builder.to_vector();

        Some(vector.to_arrow_array())
    }

    /// Index in SST of the primary key.
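    ///
    /// The trailing columns are time index, primary key, sequence and op type,
    /// so the primary key sits three columns from the end.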
    fn primary_key_position(&self) -> usize {
        self.arrow_schema.fields.len() - 3
    }

    /// Index in SST of the time index.
    fn time_index_position(&self) -> usize {
        self.arrow_schema.fields.len() - FIXED_POS_COLUMN_NUM
    }

    /// Index of a field column by its column id.
    pub fn field_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
        self.field_id_to_projected_index.get(&column_id).copied()
    }
}

/// Helper to compute the projection for the SST.
pub(crate) struct FormatProjection {
    /// Indices of columns to read from the SST. It contains all internal columns.
    pub(crate) projection_indices: Vec<usize>,
    /// Column id to its index in the projected schema
    /// (the schema after projection).
    ///
    /// It doesn't contain the time index column if that column is not in the projection.
    pub(crate) column_id_to_projected_index: HashMap<ColumnId, usize>,
}

impl FormatProjection {
    /// Computes the projection.
    ///
    /// `id_to_index` is a mapping from column id to the index of the column in the SST.
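    ///
    /// For example, with `id_to_index = {4: 0, 2: 1}`, `sst_column_num = 6` and
    /// `column_ids = [2]`, the result has `projection_indices = [1, 2, 3, 4, 5]`
    /// (the field's index plus the four fixed-position columns) and
    /// `column_id_to_projected_index = {2: 0}`.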
    pub(crate) fn compute_format_projection(
        id_to_index: &HashMap<ColumnId, usize>,
        sst_column_num: usize,
        column_ids: impl Iterator<Item = ColumnId>,
    ) -> Self {
        // Maps column id of a projected column to its index in SST.
        // It also ignores columns not in the SST.
        // [(column id, index in SST)]
        let mut projected_schema: Vec<_> = column_ids
            .filter_map(|column_id| {
                id_to_index
                    .get(&column_id)
                    .copied()
                    .map(|index| (column_id, index))
            })
            .collect();
        // Sorts columns by their indices in the SST. The SST uses a bitmap for projection.
        // This ensures the order of `projected_schema` matches the batch returned from the SST.
        projected_schema.sort_unstable_by_key(|x| x.1);
        // Dedups the entries in case `column_ids` contains duplicated columns.
        projected_schema.dedup_by_key(|x| x.1);

        // Collects all projected indices.
        // It contains the positions of all columns we need to read.
        let mut projection_indices: Vec<_> = projected_schema
            .iter()
            .map(|(_column_id, index)| *index)
            // We need to add all fixed position columns.
            .chain(sst_column_num - FIXED_POS_COLUMN_NUM..sst_column_num)
            .collect();
        projection_indices.sort_unstable();
        // Removes duplicates.
        projection_indices.dedup();

        // Creates a map from column id to the index of that column in the projected record batch.
        let column_id_to_projected_index = projected_schema
            .into_iter()
            .map(|(column_id, _)| column_id)
            .enumerate()
            .map(|(index, column_id)| (column_id, index))
            .collect();

        Self {
            projection_indices,
            column_id_to_projected_index,
        }
    }
}

/// Values of column statistics of the SST.
///
/// It also distinguishes between the case where a column is not found and
/// the case where the column exists but has no statistics.
pub enum StatValues {
    /// Values of each row group.
    Values(ArrayRef),
    /// No such column.
    NoColumn,
    /// Column exists but has no statistics.
    NoStats,
}

impl StatValues {
    /// Creates a new `StatValues` instance from optional statistics.
    pub fn from_stats_opt(stats: Option<ArrayRef>) -> Self {
        match stats {
            Some(stats) => StatValues::Values(stats),
            None => StatValues::NoStats,
        }
    }
}

#[cfg(test)]
impl PrimaryKeyReadFormat {
    /// Creates a helper with existing `metadata` and all columns.
    pub fn new_with_all_columns(metadata: RegionMetadataRef) -> PrimaryKeyReadFormat {
        Self::new(
            Arc::clone(&metadata),
            metadata.column_metadatas.iter().map(|c| c.column_id),
        )
    }
}

/// Computes the offsets of different primary keys in the array.
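///
/// Each adjacent pair of offsets delimits one run of rows with the same primary key.
/// For example, dictionary keys `[0, 0, 1, 1, 2]` yield offsets `[0, 2, 4, 5]`.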
fn primary_key_offsets(pk_dict_array: &PrimaryKeyArray) -> Result<Vec<usize>> {
    if pk_dict_array.is_empty() {
        return Ok(Vec::new());
    }

    // Init offsets.
    let mut offsets = vec![0];
    let keys = pk_dict_array.keys();
    // Primary keys are never null, so we iterate `keys.values()` directly.
    let pk_indices = keys.values();
    for (i, key) in pk_indices.iter().take(keys.len() - 1).enumerate() {
        // Compare each key with the next key.
        if *key != pk_indices[i + 1] {
            // We meet a new key; push the next index as the end of the current run.
            offsets.push(i + 1);
        }
    }
    offsets.push(keys.len());

    Ok(offsets)
}

/// Creates a new array for a specific `primary_key`.
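///
/// For example, `new_primary_key_array(b"test", 3)` builds a dictionary array with
/// values `["test"]` and keys `[0, 0, 0]`.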
fn new_primary_key_array(primary_key: &[u8], num_rows: usize) -> ArrayRef {
    let values = Arc::new(BinaryArray::from_iter_values([primary_key]));
    let keys = UInt32Array::from_value(0, num_rows);

    // Safety: The key index is valid.
    Arc::new(DictionaryArray::new(keys, values))
}

/// Gets the min/max time index of the row group from the parquet meta.
/// It assumes the parquet file is created by the mito engine.
pub(crate) fn parquet_row_group_time_range(
    file_meta: &FileMeta,
    parquet_meta: &ParquetMetaData,
    row_group_idx: usize,
) -> Option<FileTimeRange> {
    let row_group_meta = parquet_meta.row_group(row_group_idx);
    let num_columns = parquet_meta.file_metadata().schema_descr().num_columns();
    assert!(
        num_columns >= FIXED_POS_COLUMN_NUM,
        "file only has {} columns",
        num_columns
    );
    let time_index_pos = num_columns - FIXED_POS_COLUMN_NUM;

    let stats = row_group_meta.column(time_index_pos).statistics()?;
    // The physical type for the timestamp should be i64.
    let (min, max) = match stats {
        Statistics::Int64(value_stats) => (*value_stats.min_opt()?, *value_stats.max_opt()?),
        Statistics::Int32(_)
        | Statistics::Boolean(_)
        | Statistics::Int96(_)
        | Statistics::Float(_)
        | Statistics::Double(_)
        | Statistics::ByteArray(_)
        | Statistics::FixedLenByteArray(_) => {
            common_telemetry::warn!(
                "Invalid statistics {:?} for time index in parquet in {}",
                stats,
                file_meta.file_id
            );
            return None;
        }
    };

    debug_assert!(min >= file_meta.time_range.0.value() && min <= file_meta.time_range.1.value());
    debug_assert!(max >= file_meta.time_range.0.value() && max <= file_meta.time_range.1.value());
    let unit = file_meta.time_range.0.unit();

    Some((Timestamp::new(min, unit), Timestamp::new(max, unit)))
}

/// Checks if sequence override is needed based on all row groups' statistics.
/// Returns true if ALL row groups have sequence min-max values of 0.
pub(crate) fn need_override_sequence(parquet_meta: &ParquetMetaData) -> bool {
    let num_columns = parquet_meta.file_metadata().schema_descr().num_columns();
    if num_columns < FIXED_POS_COLUMN_NUM {
        return false;
    }

    // The sequence column is the second-to-last column (before op_type).
    let sequence_pos = num_columns - 2;

    // Check all row groups - all must have sequence min-max of 0.
    for row_group in parquet_meta.row_groups() {
        if let Some(Statistics::Int64(value_stats)) = row_group.column(sequence_pos).statistics() {
            if let (Some(min_val), Some(max_val)) = (value_stats.min_opt(), value_stats.max_opt()) {
                // If any row group doesn't have min=0 and max=0, return false.
                if *min_val != 0 || *max_val != 0 {
                    return false;
                }
            } else {
                // If any row group doesn't have statistics, return false.
                return false;
            }
        } else {
            // If any row group doesn't have Int64 statistics, return false.
            return false;
        }
    }

    // All row groups have sequence min-max of 0; return false when there are no row groups.
    !parquet_meta.row_groups().is_empty()
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::OpType;
    use datatypes::arrow::array::{
        Int64Array, StringArray, TimestampMillisecondArray, UInt8Array, UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema, TimeUnit};
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::ValueRef;
    use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, UInt8Vector, UInt64Vector};
    use mito_codec::row_converter::{
        DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec,
    };
    use store_api::codec::PrimaryKeyEncoding;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;
    use store_api::storage::consts::ReservedColumnId;

    use super::*;
    use crate::sst::parquet::flat_format::{FlatWriteFormat, sequence_column_index};
    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};

    const TEST_SEQUENCE: u64 = 1;
    const TEST_OP_TYPE: u8 = OpType::Put as u8;

    fn build_test_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("tag0", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field1",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 4, // We change the order of field columns.
1058            })
1059            .push_column_metadata(ColumnMetadata {
1060                column_schema: ColumnSchema::new("tag1", ConcreteDataType::int64_datatype(), true),
1061                semantic_type: SemanticType::Tag,
1062                column_id: 3,
1063            })
1064            .push_column_metadata(ColumnMetadata {
1065                column_schema: ColumnSchema::new(
1066                    "field0",
1067                    ConcreteDataType::int64_datatype(),
1068                    true,
1069                ),
1070                semantic_type: SemanticType::Field,
1071                column_id: 2,
1072            })
1073            .push_column_metadata(ColumnMetadata {
1074                column_schema: ColumnSchema::new(
1075                    "ts",
1076                    ConcreteDataType::timestamp_millisecond_datatype(),
1077                    false,
1078                ),
1079                semantic_type: SemanticType::Timestamp,
1080                column_id: 5,
1081            })
1082            .primary_key(vec![1, 3]);
1083        Arc::new(builder.build().unwrap())
1084    }
1085
1086    fn build_test_arrow_schema() -> SchemaRef {
1087        let fields = vec![
1088            Field::new("field1", ArrowDataType::Int64, true),
1089            Field::new("field0", ArrowDataType::Int64, true),
1090            Field::new(
1091                "ts",
1092                ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
1093                false,
1094            ),
1095            Field::new(
1096                "__primary_key",
1097                ArrowDataType::Dictionary(
1098                    Box::new(ArrowDataType::UInt32),
1099                    Box::new(ArrowDataType::Binary),
1100                ),
1101                false,
1102            ),
1103            Field::new("__sequence", ArrowDataType::UInt64, false),
1104            Field::new("__op_type", ArrowDataType::UInt8, false),
1105        ];
1106        Arc::new(Schema::new(fields))
1107    }
1108
1109    fn new_batch(primary_key: &[u8], start_ts: i64, start_field: i64, num_rows: usize) -> Batch {
1110        new_batch_with_sequence(primary_key, start_ts, start_field, num_rows, TEST_SEQUENCE)
1111    }
1112
1113    fn new_batch_with_sequence(
1114        primary_key: &[u8],
1115        start_ts: i64,
1116        start_field: i64,
1117        num_rows: usize,
1118        sequence: u64,
1119    ) -> Batch {
1120        let ts_values = (0..num_rows).map(|i| start_ts + i as i64);
1121        let timestamps = Arc::new(TimestampMillisecondVector::from_values(ts_values));
1122        let sequences = Arc::new(UInt64Vector::from_vec(vec![sequence; num_rows]));
1123        let op_types = Arc::new(UInt8Vector::from_vec(vec![TEST_OP_TYPE; num_rows]));
1124        let fields = vec![
1125            BatchColumn {
1126                column_id: 4,
1127                data: Arc::new(Int64Vector::from_vec(vec![start_field; num_rows])),
1128            }, // field1
1129            BatchColumn {
1130                column_id: 2,
1131                data: Arc::new(Int64Vector::from_vec(vec![start_field + 1; num_rows])),
1132            }, // field0
1133        ];
1134
1135        BatchBuilder::with_required_columns(primary_key.to_vec(), timestamps, sequences, op_types)
1136            .with_fields(fields)
1137            .build()
1138            .unwrap()
1139    }
1140
1141    #[test]
1142    fn test_to_sst_arrow_schema() {
1143        let metadata = build_test_region_metadata();
1144        let write_format = PrimaryKeyWriteFormat::new(metadata);
1145        assert_eq!(&build_test_arrow_schema(), write_format.arrow_schema());
1146    }
1147
1148    #[test]
1149    fn test_new_primary_key_array() {
1150        let array = new_primary_key_array(b"test", 3);
1151        let expect = build_test_pk_array(&[(b"test".to_vec(), 3)]) as ArrayRef;
1152        assert_eq!(&expect, &array);
1153    }
1154
1155    fn build_test_pk_array(pk_row_nums: &[(Vec<u8>, usize)]) -> Arc<PrimaryKeyArray> {
1156        let values = Arc::new(BinaryArray::from_iter_values(
1157            pk_row_nums.iter().map(|v| &v.0),
1158        ));
1159        let mut keys = vec![];
1160        for (index, num_rows) in pk_row_nums.iter().map(|v| v.1).enumerate() {
1161            keys.extend(std::iter::repeat_n(index as u32, num_rows));
1162        }
1163        let keys = UInt32Array::from(keys);
1164        Arc::new(DictionaryArray::new(keys, values))
1165    }
1166
1167    #[test]
1168    fn test_convert_batch() {
1169        let metadata = build_test_region_metadata();
1170        let write_format = PrimaryKeyWriteFormat::new(metadata);
1171
1172        let num_rows = 4;
1173        let batch = new_batch(b"test", 1, 2, num_rows);
1174        let columns: Vec<ArrayRef> = vec![
1175            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
1176            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
1177            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
1178            build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // primary key
1179            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; num_rows])), // sequence
1180            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
1181        ];
1182        let expect_record = RecordBatch::try_new(build_test_arrow_schema(), columns).unwrap();
1183
1184        let actual = write_format.convert_batch(&batch).unwrap();
1185        assert_eq!(expect_record, actual);
1186    }
1187
1188    #[test]
1189    fn test_convert_batch_with_override_sequence() {
1190        let metadata = build_test_region_metadata();
1191        let write_format =
1192            PrimaryKeyWriteFormat::new(metadata).with_override_sequence(Some(415411));
1193
1194        let num_rows = 4;
1195        let batch = new_batch(b"test", 1, 2, num_rows);
1196        let columns: Vec<ArrayRef> = vec![
1197            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
1198            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
1199            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
1200            build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // primary key
1201            Arc::new(UInt64Array::from(vec![415411; num_rows])), // sequence
1202            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
1203        ];
1204        let expect_record = RecordBatch::try_new(build_test_arrow_schema(), columns).unwrap();
1205
1206        let actual = write_format.convert_batch(&batch).unwrap();
1207        assert_eq!(expect_record, actual);
1208    }
1209
1210    #[test]
1211    fn test_projection_indices() {
1212        let metadata = build_test_region_metadata();
1213        // Only read tag1
1214        let read_format = ReadFormat::new_primary_key(metadata.clone(), [3].iter().copied());
1215        assert_eq!(&[2, 3, 4, 5], read_format.projection_indices());
1216        // Only read field1
1217        let read_format = ReadFormat::new_primary_key(metadata.clone(), [4].iter().copied());
1218        assert_eq!(&[0, 2, 3, 4, 5], read_format.projection_indices());
1219        // Only read ts
1220        let read_format = ReadFormat::new_primary_key(metadata.clone(), [5].iter().copied());
1221        assert_eq!(&[2, 3, 4, 5], read_format.projection_indices());
1222        // Read field0, tag0, ts
1223        let read_format = ReadFormat::new_primary_key(metadata, [2, 1, 5].iter().copied());
1224        assert_eq!(&[1, 2, 3, 4, 5], read_format.projection_indices());
1225    }
1226
1227    #[test]
1228    fn test_empty_primary_key_offsets() {
1229        let array = build_test_pk_array(&[]);
1230        assert!(primary_key_offsets(&array).unwrap().is_empty());
1231    }
1232
1233    #[test]
1234    fn test_primary_key_offsets_one_series() {
1235        let array = build_test_pk_array(&[(b"one".to_vec(), 1)]);
1236        assert_eq!(vec![0, 1], primary_key_offsets(&array).unwrap());
1237
1238        let array = build_test_pk_array(&[(b"one".to_vec(), 1), (b"two".to_vec(), 1)]);
1239        assert_eq!(vec![0, 1, 2], primary_key_offsets(&array).unwrap());
1240
1241        let array = build_test_pk_array(&[
1242            (b"one".to_vec(), 1),
1243            (b"two".to_vec(), 1),
1244            (b"three".to_vec(), 1),
1245        ]);
1246        assert_eq!(vec![0, 1, 2, 3], primary_key_offsets(&array).unwrap());
1247    }
1248
1249    #[test]
1250    fn test_primary_key_offsets_multi_series() {
1251        let array = build_test_pk_array(&[(b"one".to_vec(), 1), (b"two".to_vec(), 3)]);
1252        assert_eq!(vec![0, 1, 4], primary_key_offsets(&array).unwrap());
1253
1254        let array = build_test_pk_array(&[(b"one".to_vec(), 3), (b"two".to_vec(), 1)]);
1255        assert_eq!(vec![0, 3, 4], primary_key_offsets(&array).unwrap());
1256
1257        let array = build_test_pk_array(&[(b"one".to_vec(), 3), (b"two".to_vec(), 3)]);
1258        assert_eq!(vec![0, 3, 6], primary_key_offsets(&array).unwrap());
1259    }
1260
1261    #[test]
1262    fn test_convert_empty_record_batch() {
1263        let metadata = build_test_region_metadata();
1264        let arrow_schema = build_test_arrow_schema();
1265        let column_ids: Vec<_> = metadata
1266            .column_metadatas
1267            .iter()
1268            .map(|col| col.column_id)
1269            .collect();
1270        let read_format = PrimaryKeyReadFormat::new(metadata, column_ids.iter().copied());
1271        assert_eq!(arrow_schema, *read_format.arrow_schema());
1272
1273        let record_batch = RecordBatch::new_empty(arrow_schema);
1274        let mut batches = VecDeque::new();
1275        read_format
1276            .convert_record_batch(&record_batch, None, &mut batches)
1277            .unwrap();
1278        assert!(batches.is_empty());
1279    }
1280
1281    #[test]
1282    fn test_convert_record_batch() {
1283        let metadata = build_test_region_metadata();
1284        let column_ids: Vec<_> = metadata
1285            .column_metadatas
1286            .iter()
1287            .map(|col| col.column_id)
1288            .collect();
1289        let read_format = PrimaryKeyReadFormat::new(metadata, column_ids.iter().copied());
1290
1291        let columns: Vec<ArrayRef> = vec![
1292            Arc::new(Int64Array::from(vec![1, 1, 10, 10])), // field1
1293            Arc::new(Int64Array::from(vec![2, 2, 11, 11])), // field0
1294            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 11, 12])), // ts
1295            build_test_pk_array(&[(b"one".to_vec(), 2), (b"two".to_vec(), 2)]), // primary key
1296            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; 4])), // sequence
1297            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; 4])), // op type
1298        ];
1299        let arrow_schema = build_test_arrow_schema();
1300        let record_batch = RecordBatch::try_new(arrow_schema, columns).unwrap();
1301        let mut batches = VecDeque::new();
1302        read_format
1303            .convert_record_batch(&record_batch, None, &mut batches)
1304            .unwrap();
1305
1306        assert_eq!(
1307            vec![new_batch(b"one", 1, 1, 2), new_batch(b"two", 11, 10, 2)],
1308            batches.into_iter().collect::<Vec<_>>(),
1309        );
1310    }
1311
1312    #[test]
1313    fn test_convert_record_batch_with_override_sequence() {
1314        let metadata = build_test_region_metadata();
1315        let column_ids: Vec<_> = metadata
1316            .column_metadatas
1317            .iter()
1318            .map(|col| col.column_id)
1319            .collect();
1320        let read_format =
1321            ReadFormat::new(metadata, Some(&column_ids), false, None, "test", false).unwrap();
1322
1323        let columns: Vec<ArrayRef> = vec![
1324            Arc::new(Int64Array::from(vec![1, 1, 10, 10])), // field1
1325            Arc::new(Int64Array::from(vec![2, 2, 11, 11])), // field0
1326            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 11, 12])), // ts
1327            build_test_pk_array(&[(b"one".to_vec(), 2), (b"two".to_vec(), 2)]), // primary key
1328            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; 4])), // sequence
1329            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; 4])), // op type
1330        ];
1331        let arrow_schema = build_test_arrow_schema();
1332        let record_batch = RecordBatch::try_new(arrow_schema, columns).unwrap();
1333
1334        // Create override sequence array with custom values
1335        let override_sequence: u64 = 12345;
1336        let override_sequence_array: ArrayRef =
1337            Arc::new(UInt64Array::from_value(override_sequence, 4));
1338
1339        let mut batches = VecDeque::new();
1340        read_format
1341            .as_primary_key()
1342            .unwrap()
1343            .convert_record_batch(&record_batch, Some(&override_sequence_array), &mut batches)
1344            .unwrap();
1345
1346        // Create expected batches with override sequence
1347        let expected_batch1 = new_batch_with_sequence(b"one", 1, 1, 2, override_sequence);
1348        let expected_batch2 = new_batch_with_sequence(b"two", 11, 10, 2, override_sequence);
1349
1350        assert_eq!(
1351            vec![expected_batch1, expected_batch2],
1352            batches.into_iter().collect::<Vec<_>>(),
1353        );
1354    }
1355
    fn build_test_flat_sst_schema() -> SchemaRef {
        let fields = vec![
            Field::new("tag0", ArrowDataType::Int64, true), // primary key columns first
            Field::new("tag1", ArrowDataType::Int64, true),
            Field::new("field1", ArrowDataType::Int64, true), // then field columns
            Field::new("field0", ArrowDataType::Int64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt32),
                    Box::new(ArrowDataType::Binary),
                ),
                false,
            ),
            Field::new("__sequence", ArrowDataType::UInt64, false),
            Field::new("__op_type", ArrowDataType::UInt8, false),
        ];
        Arc::new(Schema::new(fields))
    }

    #[test]
    fn test_flat_to_sst_arrow_schema() {
        let metadata = build_test_region_metadata();
        let format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default());
        assert_eq!(&build_test_flat_sst_schema(), format.arrow_schema());
    }

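    /// Builds input columns for a flat format batch that share the primary key
    /// `test`. The timestamp column is hardcoded to four values, so callers
    /// are expected to pass `num_rows == 4`.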
    fn input_columns_for_flat_batch(num_rows: usize) -> Vec<ArrayRef> {
        vec![
            Arc::new(Int64Array::from(vec![1; num_rows])), // tag0
            Arc::new(Int64Array::from(vec![1; num_rows])), // tag1
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // __primary_key
            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; num_rows])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ]
    }

    #[test]
    fn test_flat_convert_batch() {
        let metadata = build_test_region_metadata();
        let format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default());

        let num_rows = 4;
        let columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
        let batch = RecordBatch::try_new(build_test_flat_sst_schema(), columns.clone()).unwrap();
        let expect_record = RecordBatch::try_new(build_test_flat_sst_schema(), columns).unwrap();

        let actual = format.convert_batch(&batch).unwrap();
        assert_eq!(expect_record, actual);
    }

    #[test]
    fn test_flat_convert_with_override_sequence() {
        let metadata = build_test_region_metadata();
        let format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default())
            .with_override_sequence(Some(415411));

        let num_rows = 4;
        let columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
        let batch = RecordBatch::try_new(build_test_flat_sst_schema(), columns).unwrap();

        let expected_columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![1; num_rows])), // tag0
            Arc::new(Int64Array::from(vec![1; num_rows])), // tag1
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // __primary_key
            Arc::new(UInt64Array::from(vec![415411; num_rows])), // overridden sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ];
        let expected_record =
            RecordBatch::try_new(build_test_flat_sst_schema(), expected_columns).unwrap();

        let actual = format.convert_batch(&batch).unwrap();
        assert_eq!(expected_record, actual);
    }

    #[test]
    fn test_flat_projection_indices() {
        let metadata = build_test_region_metadata();
        // Based on flat format: tag0(0), tag1(1), field1(2), field0(3), ts(4), __primary_key(5), __sequence(6), __op_type(7)
        // The projection includes all "fixed position" columns: ts(4), __primary_key(5), __sequence(6), __op_type(7)

        // Only read tag1 (column_id=3, index=1) + fixed columns
        let read_format =
            ReadFormat::new_flat(metadata.clone(), [3].iter().copied(), None, "test", false)
                .unwrap();
        assert_eq!(&[1, 4, 5, 6, 7], read_format.projection_indices());

        // Only read field1 (column_id=4, index=2) + fixed columns
        let read_format =
            ReadFormat::new_flat(metadata.clone(), [4].iter().copied(), None, "test", false)
                .unwrap();
        assert_eq!(&[2, 4, 5, 6, 7], read_format.projection_indices());

        // Only read ts (column_id=5, index=4) + fixed columns (ts is already included in fixed)
        let read_format =
            ReadFormat::new_flat(metadata.clone(), [5].iter().copied(), None, "test", false)
                .unwrap();
        assert_eq!(&[4, 5, 6, 7], read_format.projection_indices());

        // Read field0(column_id=2, index=3), tag0(column_id=1, index=0), ts(column_id=5, index=4) + fixed columns
        let read_format =
            ReadFormat::new_flat(metadata, [2, 1, 5].iter().copied(), None, "test", false).unwrap();
        assert_eq!(&[0, 3, 4, 5, 6, 7], read_format.projection_indices());
    }

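    // Exercises `FlatReadFormat::convert_batch()` both without an override
    // sequence and with an array built by `new_override_sequence_array()`.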
    #[test]
    fn test_flat_read_format_convert_batch() {
        let metadata = build_test_region_metadata();
        let mut format = FlatReadFormat::new(
            metadata,
            std::iter::once(1), // Just read tag0
            Some(8),
            "test",
            false,
        )
        .unwrap();

        let num_rows = 4;
        let original_sequence = 100u64;
        let override_sequence = 200u64;

        // Create a test record batch
        let columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
        let mut test_columns = columns.clone();
        // Replace sequence column with original sequence values
        test_columns[6] = Arc::new(UInt64Array::from(vec![original_sequence; num_rows]));
        let record_batch =
            RecordBatch::try_new(format.arrow_schema().clone(), test_columns).unwrap();
        // Without an override sequence, the batch should keep its original sequence values
        let result = format.convert_batch(record_batch.clone(), None).unwrap();
        let sequence_column = result.column(sequence_column_index(result.num_columns()));
        let sequence_array = sequence_column
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();

        let expected_original = UInt64Array::from(vec![original_sequence; num_rows]);
        assert_eq!(sequence_array, &expected_original);

        // Set override sequence and test with new_override_sequence_array
        format.set_override_sequence(Some(override_sequence));
        let override_sequence_array = format.new_override_sequence_array(num_rows).unwrap();
        let result = format
            .convert_batch(record_batch, Some(&override_sequence_array))
            .unwrap();
        let sequence_column = result.column(sequence_column_index(result.num_columns()));
        let sequence_array = sequence_column
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();

        let expected_override = UInt64Array::from(vec![override_sequence; num_rows]);
        assert_eq!(sequence_array, &expected_override);
    }

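    // `is_legacy_format()` decides from a file's column count whether it was
    // written in the legacy format, i.e. without individual primary key columns.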
    #[test]
    fn test_need_convert_to_flat() {
        let metadata = build_test_region_metadata();

        // Test case 1: Same number of columns, no conversion needed
        // For flat format: all columns (5) + internal columns (3)
        let expected_columns = metadata.column_metadatas.len() + 3;
        let result =
            FlatReadFormat::is_legacy_format(&metadata, expected_columns, "test.parquet").unwrap();
        assert!(
            !result,
            "Should not need conversion when column counts match"
        );

        // Test case 2: Different number of columns, need conversion
        // Missing primary key columns (2 primary keys in test metadata)
        let num_columns_without_pk = expected_columns - metadata.primary_key.len();
        let result =
            FlatReadFormat::is_legacy_format(&metadata, num_columns_without_pk, "test.parquet")
                .unwrap();
        assert!(
            result,
            "Should need conversion when primary key columns are missing"
        );

        // Test case 3: Invalid case - actual columns more than expected
        let too_many_columns = expected_columns + 1;
        let err = FlatReadFormat::is_legacy_format(&metadata, too_many_columns, "test.parquet")
            .unwrap_err();
        assert!(err.to_string().contains("Expected columns"), "{err:?}");

        // Test case 4: Invalid case - column difference doesn't match primary key count
        let wrong_diff_columns = expected_columns - 1; // Difference of 1, but we have 2 primary keys
        let err = FlatReadFormat::is_legacy_format(&metadata, wrong_diff_columns, "test.parquet")
            .unwrap_err();
        assert!(
            err.to_string().contains("Column number difference"),
            "{err:?}"
        );
    }

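    /// Encodes each row's tag values with the dense primary key codec and
    /// collects the encoded keys into a dictionary array.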
    fn build_test_dense_pk_array(
        codec: &DensePrimaryKeyCodec,
        pk_values_per_row: &[&[Option<i64>]],
    ) -> Arc<PrimaryKeyArray> {
        let mut builder = PrimaryKeyArrayBuilder::with_capacity(pk_values_per_row.len(), 1024, 0);

        for pk_values_row in pk_values_per_row {
            let values: Vec<ValueRef> = pk_values_row
                .iter()
                .map(|opt| match opt {
                    Some(val) => ValueRef::Int64(*val),
                    None => ValueRef::Null,
                })
                .collect();

            let encoded = codec.encode(values.into_iter()).unwrap();
            builder.append_value(&encoded);
        }

        Arc::new(builder.finish())
    }

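    /// Builds region metadata whose primary key uses the sparse encoding: the
    /// reserved `__table_id` and `__tsid` columns followed by two string tags.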
    fn build_test_sparse_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "__table_id",
                    ConcreteDataType::uint32_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: ReservedColumnId::table_id(),
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "__tsid",
                    ConcreteDataType::uint64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: ReservedColumnId::tsid(),
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("tag0", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("tag1", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field1",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field0",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 5,
            })
            .primary_key(vec![
                ReservedColumnId::table_id(),
                ReservedColumnId::tsid(),
                1,
                3,
            ])
            .primary_key_encoding(PrimaryKeyEncoding::Sparse);
        Arc::new(builder.build().unwrap())
    }

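    /// Encodes each row's values with the sparse primary key codec and
    /// collects the encoded keys into a dictionary array.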
    fn build_test_sparse_pk_array(
        codec: &SparsePrimaryKeyCodec,
        pk_values_per_row: &[SparseTestRow],
    ) -> Arc<PrimaryKeyArray> {
        let mut builder = PrimaryKeyArrayBuilder::with_capacity(pk_values_per_row.len(), 1024, 0);
        for row in pk_values_per_row {
            let values = vec![
                (ReservedColumnId::table_id(), ValueRef::UInt32(row.table_id)),
                (ReservedColumnId::tsid(), ValueRef::UInt64(row.tsid)),
                (1, ValueRef::String(&row.tag0)),
                (3, ValueRef::String(&row.tag1)),
            ];

            let mut buffer = Vec::new();
            codec.encode_value_refs(&values, &mut buffer).unwrap();
            builder.append_value(&buffer);
        }

        Arc::new(builder.finish())
    }

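    /// Primary key values of a single row under the sparse encoding.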
    #[derive(Clone)]
    struct SparseTestRow {
        table_id: u32,
        tsid: u64,
        tag0: String,
        tag1: String,
    }

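    // Converting a legacy format batch with dense primary key encoding should
    // decode the tags into leading columns while preserving the original
    // `__primary_key` column.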
    #[test]
    fn test_flat_read_format_convert_format_with_dense_encoding() {
        let metadata = build_test_region_metadata();

        let column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|c| c.column_id)
            .collect();
        let format = FlatReadFormat::new(
            metadata.clone(),
            column_ids.into_iter(),
            Some(6),
            "test",
            false,
        )
        .unwrap();

        let num_rows = 4;
        let original_sequence = 100u64;

        // Create primary key values for each row: tag0=1, tag1=1 for all rows
        let pk_values_per_row = vec![&[Some(1i64), Some(1i64)][..]; num_rows];

        // Create a test record batch in old format using dense encoding
        let codec = DensePrimaryKeyCodec::new(&metadata);
        let dense_pk_array = build_test_dense_pk_array(&codec, &pk_values_per_row);
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            dense_pk_array.clone(),                        // __primary_key (dense encoding)
            Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ];

        // Create schema for old format (without primary key columns)
        let old_format_fields = vec![
            Field::new("field1", ArrowDataType::Int64, true),
            Field::new("field0", ArrowDataType::Int64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt32),
                    Box::new(ArrowDataType::Binary),
                ),
                false,
            ),
            Field::new("__sequence", ArrowDataType::UInt64, false),
            Field::new("__op_type", ArrowDataType::UInt8, false),
        ];
        let old_schema = Arc::new(Schema::new(old_format_fields));
        let record_batch = RecordBatch::try_new(old_schema, columns).unwrap();

        // Test conversion with dense encoding
        let result = format.convert_batch(record_batch, None).unwrap();

        // Construct expected RecordBatch in flat format with decoded primary key columns
        let expected_columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![1; num_rows])), // tag0 (decoded from primary key)
            Arc::new(Int64Array::from(vec![1; num_rows])), // tag1 (decoded from primary key)
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            dense_pk_array,                                // __primary_key (preserved)
            Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ];
        let expected_record_batch =
            RecordBatch::try_new(build_test_flat_sst_schema(), expected_columns).unwrap();

        // Compare the actual result with the expected record batch
        assert_eq!(expected_record_batch, result);
    }

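    // Same as the dense-encoding test above, but using the sparse encoding.
    // Also checks that passing `true` as the last argument to
    // `FlatReadFormat::new()` skips the conversion and returns the batch as-is.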
    #[test]
    fn test_flat_read_format_convert_format_with_sparse_encoding() {
        let metadata = build_test_sparse_region_metadata();

        let column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|c| c.column_id)
            .collect();
        let format = FlatReadFormat::new(
            metadata.clone(),
            column_ids.clone().into_iter(),
            None,
            "test",
            false,
        )
        .unwrap();

        let num_rows = 4;
        let original_sequence = 100u64;

        // Create sparse test data with table_id, tsid and string tags
        let pk_test_rows = vec![
            SparseTestRow {
                table_id: 1,
                tsid: 123,
                tag0: "frontend".to_string(),
                tag1: "pod1".to_string(),
            };
            num_rows
        ];

        let codec = SparsePrimaryKeyCodec::new(&metadata);
        let sparse_pk_array = build_test_sparse_pk_array(&codec, &pk_test_rows);
        // Create a test record batch in old format using sparse encoding
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            sparse_pk_array.clone(),                       // __primary_key (sparse encoding)
            Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ];

        // Create schema for old format (without primary key columns)
        let old_format_fields = vec![
            Field::new("field1", ArrowDataType::Int64, true),
            Field::new("field0", ArrowDataType::Int64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt32),
                    Box::new(ArrowDataType::Binary),
                ),
                false,
            ),
            Field::new("__sequence", ArrowDataType::UInt64, false),
            Field::new("__op_type", ArrowDataType::UInt8, false),
        ];
        let old_schema = Arc::new(Schema::new(old_format_fields));
        let record_batch = RecordBatch::try_new(old_schema, columns).unwrap();

        // Test conversion with sparse encoding
        let result = format.convert_batch(record_batch.clone(), None).unwrap();

        // Construct expected RecordBatch in flat format with decoded primary key columns
        let tag0_array = Arc::new(DictionaryArray::new(
            UInt32Array::from(vec![0; num_rows]),
            Arc::new(StringArray::from(vec!["frontend"])),
        ));
        let tag1_array = Arc::new(DictionaryArray::new(
            UInt32Array::from(vec![0; num_rows]),
            Arc::new(StringArray::from(vec!["pod1"])),
        ));
        let expected_columns: Vec<ArrayRef> = vec![
            Arc::new(UInt32Array::from(vec![1; num_rows])), // __table_id (decoded from primary key)
            Arc::new(UInt64Array::from(vec![123; num_rows])), // __tsid (decoded from primary key)
            tag0_array,                                     // tag0 (decoded from primary key)
            tag1_array,                                     // tag1 (decoded from primary key)
            Arc::new(Int64Array::from(vec![2; num_rows])),  // field1
            Arc::new(Int64Array::from(vec![3; num_rows])),  // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            sparse_pk_array,                                // __primary_key (preserved)
            Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ];
        let expected_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        let expected_record_batch =
            RecordBatch::try_new(expected_schema, expected_columns).unwrap();

        // Compare the actual result with the expected record batch
        assert_eq!(expected_record_batch, result);

        let format =
            FlatReadFormat::new(metadata.clone(), column_ids.into_iter(), None, "test", true)
                .unwrap();
        // With conversion skipped, the sparse-encoded batch is returned unchanged.
        let result = format.convert_batch(record_batch.clone(), None).unwrap();
        assert_eq!(record_batch, result);
    }
}