// mito2/sst/parquet/format.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Format to store in parquet.
//!
//! We store three internal columns in parquet:
//! - `__primary_key`, the primary key of the row (tags). Type: dictionary(uint32, binary)
//! - `__sequence`, the sequence number of a row. Type: uint64
//! - `__op_type`, the op type of the row. Type: uint8
//!
//! The schema of a parquet file is:
//! ```text
//! field 0, field 1, ..., field N, time index, primary key, sequence, op type
//! ```
//!
//! We store fields in the same order as [RegionMetadata::field_columns()](store_api::metadata::RegionMetadata::field_columns()).
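//!
//! For example (an illustrative sketch, not the layout of any particular
//! region): a region with fields `cpu` and `memory` and time index `ts` is
//! stored as:
//! ```text
//! cpu, memory, ts, __primary_key, __sequence, __op_type
//! ```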

use std::borrow::Borrow;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;

use api::v1::SemanticType;
use common_time::Timestamp;
use datafusion_common::ScalarValue;
use datatypes::arrow::array::{
    ArrayRef, BinaryArray, BinaryDictionaryBuilder, DictionaryArray, UInt32Array, UInt64Array,
};
use datatypes::arrow::datatypes::{SchemaRef, UInt32Type};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::prelude::DataType;
use datatypes::vectors::{Helper, Vector};
use mito_codec::row_converter::{build_primary_key_codec_with_fields, SortField};
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
use parquet::file::statistics::Statistics;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
use store_api::storage::{ColumnId, SequenceNumber};

use crate::error::{
    ConvertVectorSnafu, InvalidBatchSnafu, InvalidRecordBatchSnafu, NewRecordBatchSnafu, Result,
};
use crate::read::{Batch, BatchBuilder, BatchColumn};
use crate::sst::file::{FileMeta, FileTimeRange};
use crate::sst::parquet::flat_format::FlatReadFormat;
use crate::sst::to_sst_arrow_schema;
/// Arrow array type for the primary key dictionary.
pub(crate) type PrimaryKeyArray = DictionaryArray<UInt32Type>;
/// Builder type for primary key dictionary array.
pub(crate) type PrimaryKeyArrayBuilder = BinaryDictionaryBuilder<UInt32Type>;

/// Number of columns that have fixed positions.
///
/// Contains: time index and internal columns.
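/// For a region whose time index column is named `ts`, these are the trailing
/// columns `ts, __primary_key, __sequence, __op_type` of the SST schema.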
pub(crate) const FIXED_POS_COLUMN_NUM: usize = 4;
/// Number of internal columns.
pub(crate) const INTERNAL_COLUMN_NUM: usize = 3;

/// Helper for writing the SST format with primary key.
pub(crate) struct PrimaryKeyWriteFormat {
    metadata: RegionMetadataRef,
    /// SST file schema.
    arrow_schema: SchemaRef,
    override_sequence: Option<SequenceNumber>,
}

impl PrimaryKeyWriteFormat {
    /// Creates a new helper.
    pub(crate) fn new(metadata: RegionMetadataRef) -> PrimaryKeyWriteFormat {
        let arrow_schema = to_sst_arrow_schema(&metadata);
        PrimaryKeyWriteFormat {
            metadata,
            arrow_schema,
            override_sequence: None,
        }
    }

    /// Sets the override sequence.
    pub(crate) fn with_override_sequence(
        mut self,
        override_sequence: Option<SequenceNumber>,
    ) -> Self {
        self.override_sequence = override_sequence;
        self
    }

    /// Gets the arrow schema to store in parquet.
    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
        &self.arrow_schema
    }

    /// Converts `batch` to an arrow record batch to store in parquet.
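    ///
    /// A minimal usage sketch (ignored in doc tests; assumes `metadata` and a
    /// matching `batch` already exist):
    /// ```ignore
    /// let format = PrimaryKeyWriteFormat::new(metadata);
    /// // Output columns follow the SST layout: fields..., time index,
    /// // __primary_key, __sequence, __op_type.
    /// let record_batch = format.convert_batch(&batch)?;
    /// assert_eq!(format.arrow_schema(), &record_batch.schema());
    /// ```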
    pub(crate) fn convert_batch(&self, batch: &Batch) -> Result<RecordBatch> {
        debug_assert_eq!(
            batch.fields().len() + FIXED_POS_COLUMN_NUM,
            self.arrow_schema.fields().len()
        );
        let mut columns = Vec::with_capacity(batch.fields().len() + FIXED_POS_COLUMN_NUM);
        // Store all fields first.
        for (column, column_metadata) in batch.fields().iter().zip(self.metadata.field_columns()) {
            ensure!(
                column.column_id == column_metadata.column_id,
                InvalidBatchSnafu {
                    reason: format!(
                        "Batch has column {} but metadata has column {}",
                        column.column_id, column_metadata.column_id
                    ),
                }
            );

            columns.push(column.data.to_arrow_array());
        }
        // Add time index column.
        columns.push(batch.timestamps().to_arrow_array());
        // Add internal columns: primary key, sequences, op types.
        columns.push(new_primary_key_array(batch.primary_key(), batch.num_rows()));

        if let Some(override_sequence) = self.override_sequence {
            let sequence_array =
                Arc::new(UInt64Array::from(vec![override_sequence; batch.num_rows()]));
            columns.push(sequence_array);
        } else {
            columns.push(batch.sequences().to_arrow_array());
        }
        columns.push(batch.op_types().to_arrow_array());

        RecordBatch::try_new(self.arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
    }
}

/// Helper to read parquet formats.
pub enum ReadFormat {
    PrimaryKey(PrimaryKeyReadFormat),
    Flat(FlatReadFormat),
}

impl ReadFormat {
    /// Creates a helper to read either the flat or the primary key format.
    pub(crate) fn new(
        metadata: RegionMetadataRef,
        column_ids: impl Iterator<Item = ColumnId>,
        flat_format: bool,
    ) -> Self {
        if flat_format {
            Self::new_flat(metadata, column_ids, false)
        } else {
            Self::new_primary_key(metadata, column_ids)
        }
    }

    /// Creates a helper to read the primary key format.
    pub fn new_primary_key(
        metadata: RegionMetadataRef,
        column_ids: impl Iterator<Item = ColumnId>,
    ) -> Self {
        ReadFormat::PrimaryKey(PrimaryKeyReadFormat::new(metadata, column_ids))
    }

    /// Creates a helper to read the flat format.
    pub fn new_flat(
        metadata: RegionMetadataRef,
        column_ids: impl Iterator<Item = ColumnId>,
        convert_to_flat: bool,
    ) -> Self {
        ReadFormat::Flat(FlatReadFormat::new(metadata, column_ids, convert_to_flat))
    }

    pub(crate) fn as_primary_key(&self) -> Option<&PrimaryKeyReadFormat> {
        match self {
            ReadFormat::PrimaryKey(format) => Some(format),
            _ => None,
        }
    }

    pub(crate) fn as_flat(&self) -> Option<&FlatReadFormat> {
        match self {
            ReadFormat::Flat(format) => Some(format),
            _ => None,
        }
    }

    /// Gets the arrow schema of the SST file.
    ///
    /// This schema is computed from the region metadata but should be the same
    /// as the arrow schema decoded from the file metadata.
    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
        match self {
            ReadFormat::PrimaryKey(format) => format.arrow_schema(),
            ReadFormat::Flat(format) => format.arrow_schema(),
        }
    }

    /// Gets the metadata of the SST.
    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
        match self {
            ReadFormat::PrimaryKey(format) => format.metadata(),
            ReadFormat::Flat(format) => format.metadata(),
        }
    }

    /// Gets sorted projection indices to read.
    pub(crate) fn projection_indices(&self) -> &[usize] {
        match self {
            ReadFormat::PrimaryKey(format) => format.projection_indices(),
            ReadFormat::Flat(format) => format.projection_indices(),
        }
    }

    /// Returns min values of a specific column in row groups.
    pub fn min_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        match self {
            ReadFormat::PrimaryKey(format) => format.min_values(row_groups, column_id),
            ReadFormat::Flat(format) => format.min_values(row_groups, column_id),
        }
    }

    /// Returns max values of a specific column in row groups.
    pub fn max_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        match self {
            ReadFormat::PrimaryKey(format) => format.max_values(row_groups, column_id),
            ReadFormat::Flat(format) => format.max_values(row_groups, column_id),
        }
    }

    /// Returns null counts of a specific column in row groups.
    pub fn null_counts(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        match self {
            ReadFormat::PrimaryKey(format) => format.null_counts(row_groups, column_id),
            ReadFormat::Flat(format) => format.null_counts(row_groups, column_id),
        }
    }

    /// Returns min/max values of a specific column in each row group.
    /// Returns `None` if the column does not have statistics.
    /// The column should not be encoded as a part of a primary key.
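    ///
    /// For example, for two row groups whose Int64 min statistics are `1` and
    /// `5`, `is_min == true` yields an Int64 array `[1, 5]`; a row group
    /// without statistics contributes a typed null entry instead.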
    pub(crate) fn column_values(
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column: &ColumnMetadata,
        column_index: usize,
        is_min: bool,
    ) -> Option<ArrayRef> {
        let null_scalar: ScalarValue = column
            .column_schema
            .data_type
            .as_arrow_type()
            .try_into()
            .ok()?;
        let scalar_values = row_groups
            .iter()
            .map(|meta| {
                let stats = meta.borrow().column(column_index).statistics()?;
                match stats {
                    Statistics::Boolean(s) => Some(ScalarValue::Boolean(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::Int32(s) => Some(ScalarValue::Int32(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::Int64(s) => Some(ScalarValue::Int64(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::Int96(_) => None,
                    Statistics::Float(s) => Some(ScalarValue::Float32(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::Double(s) => Some(ScalarValue::Float64(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::ByteArray(s) => {
                        let bytes = if is_min {
                            s.min_bytes_opt()?
                        } else {
                            s.max_bytes_opt()?
                        };
                        let s = String::from_utf8(bytes.to_vec()).ok();
                        Some(ScalarValue::Utf8(s))
                    }
                    Statistics::FixedLenByteArray(_) => None,
                }
            })
            .map(|maybe_scalar| maybe_scalar.unwrap_or_else(|| null_scalar.clone()))
            .collect::<Vec<ScalarValue>>();
        debug_assert_eq!(scalar_values.len(), row_groups.len());
        ScalarValue::iter_to_array(scalar_values).ok()
    }

    /// Returns null counts of a specific column in each row group.
    /// The column should not be encoded as a part of a primary key.
    pub(crate) fn column_null_counts(
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_index: usize,
    ) -> Option<ArrayRef> {
        let values = row_groups.iter().map(|meta| {
            let col = meta.borrow().column(column_index);
            let stat = col.statistics()?;
            stat.null_count_opt()
        });
        Some(Arc::new(UInt64Array::from_iter(values)))
    }

    /// Sets the sequence number to override.
    pub(crate) fn set_override_sequence(&mut self, sequence: Option<SequenceNumber>) {
        match self {
            ReadFormat::PrimaryKey(format) => format.set_override_sequence(sequence),
            ReadFormat::Flat(format) => format.set_override_sequence(sequence),
        }
    }

    /// Creates a sequence array to override.
    pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option<ArrayRef> {
        match self {
            ReadFormat::PrimaryKey(format) => format.new_override_sequence_array(length),
            ReadFormat::Flat(format) => format.new_override_sequence_array(length),
        }
    }
}

/// Helper for reading the SST format.
pub struct PrimaryKeyReadFormat {
    /// The metadata stored in the SST.
    metadata: RegionMetadataRef,
    /// SST file schema.
    arrow_schema: SchemaRef,
    /// Field column id to its index in `schema` (SST schema).
    /// In the SST schema, fields are stored at the front of the schema.
    field_id_to_index: HashMap<ColumnId, usize>,
    /// Indices of columns to read from the SST. It contains all internal columns.
    projection_indices: Vec<usize>,
    /// Field column id to its index in the projected schema (the schema of [Batch]).
    field_id_to_projected_index: HashMap<ColumnId, usize>,
    /// Sequence number to override the sequence read from the SST.
    override_sequence: Option<SequenceNumber>,
}

impl PrimaryKeyReadFormat {
    /// Creates a helper with existing `metadata` and `column_ids` to read.
    pub fn new(
        metadata: RegionMetadataRef,
        column_ids: impl Iterator<Item = ColumnId>,
    ) -> PrimaryKeyReadFormat {
        let field_id_to_index: HashMap<_, _> = metadata
            .field_columns()
            .enumerate()
            .map(|(index, column)| (column.column_id, index))
            .collect();
        let arrow_schema = to_sst_arrow_schema(&metadata);

        let format_projection = FormatProjection::compute_format_projection(
            &field_id_to_index,
            arrow_schema.fields.len(),
            column_ids,
        );

        PrimaryKeyReadFormat {
            metadata,
            arrow_schema,
            field_id_to_index,
            projection_indices: format_projection.projection_indices,
            field_id_to_projected_index: format_projection.column_id_to_projected_index,
            override_sequence: None,
        }
    }

    /// Sets the sequence number to override.
    pub(crate) fn set_override_sequence(&mut self, sequence: Option<SequenceNumber>) {
        self.override_sequence = sequence;
    }

    /// Gets the arrow schema of the SST file.
    ///
    /// This schema is computed from the region metadata but should be the same
    /// as the arrow schema decoded from the file metadata.
    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
        &self.arrow_schema
    }

    /// Gets the metadata of the SST.
    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
        &self.metadata
    }

    /// Gets sorted projection indices to read.
    pub(crate) fn projection_indices(&self) -> &[usize] {
        &self.projection_indices
    }

    /// Creates a sequence array to override.
    pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option<ArrayRef> {
        self.override_sequence
            .map(|seq| Arc::new(UInt64Array::from_value(seq, length)) as ArrayRef)
    }

    /// Converts an arrow record batch into `batches`.
    ///
    /// The length of `override_sequence_array` must be at least the length of the record batch.
    /// Note that the `record_batch` may contain only a subset of columns if it is projected.
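    ///
    /// A minimal sketch of the call pattern (ignored in doc tests; assumes a
    /// `read_format` and a `record_batch` in the SST layout):
    /// ```ignore
    /// let mut batches = VecDeque::new();
    /// read_format.convert_record_batch(&record_batch, None, &mut batches)?;
    /// // `batches` now holds one `Batch` per contiguous primary key run.
    /// ```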
    pub fn convert_record_batch(
        &self,
        record_batch: &RecordBatch,
        override_sequence_array: Option<&ArrayRef>,
        batches: &mut VecDeque<Batch>,
    ) -> Result<()> {
        debug_assert!(batches.is_empty());

        // The record batch must have the time index and internal columns.
        ensure!(
            record_batch.num_columns() >= FIXED_POS_COLUMN_NUM,
            InvalidRecordBatchSnafu {
                reason: format!(
                    "record batch only has {} columns",
                    record_batch.num_columns()
                ),
            }
        );

        let mut fixed_pos_columns = record_batch
            .columns()
            .iter()
            .rev()
            .take(FIXED_POS_COLUMN_NUM);
        // Safety: We have checked the column number.
        let op_type_array = fixed_pos_columns.next().unwrap();
        let mut sequence_array = fixed_pos_columns.next().unwrap().clone();
        let pk_array = fixed_pos_columns.next().unwrap();
        let ts_array = fixed_pos_columns.next().unwrap();
        let field_batch_columns = self.get_field_batch_columns(record_batch)?;

        // Override sequence array if provided.
        if let Some(override_array) = override_sequence_array {
            assert!(override_array.len() >= sequence_array.len());
            // It's fine to assign the override array directly, but we slice it to make
            // sure it matches the length of the original sequence array.
            sequence_array = if override_array.len() > sequence_array.len() {
                override_array.slice(0, sequence_array.len())
            } else {
                override_array.clone()
            };
        }

        // Compute primary key offsets.
        let pk_dict_array = pk_array
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .with_context(|| InvalidRecordBatchSnafu {
                reason: format!("primary key array should not be {:?}", pk_array.data_type()),
            })?;
        let offsets = primary_key_offsets(pk_dict_array)?;
        if offsets.is_empty() {
            return Ok(());
        }

        // Split record batch according to pk offsets.
        let keys = pk_dict_array.keys();
        let pk_values = pk_dict_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .with_context(|| InvalidRecordBatchSnafu {
                reason: format!(
                    "values of primary key array should not be {:?}",
                    pk_dict_array.values().data_type()
                ),
            })?;
        for (i, start) in offsets[..offsets.len() - 1].iter().enumerate() {
            let end = offsets[i + 1];
            let rows_in_batch = end - start;
            let dict_key = keys.value(*start);
            let primary_key = pk_values.value(dict_key as usize).to_vec();

            let mut builder = BatchBuilder::new(primary_key);
            builder
                .timestamps_array(ts_array.slice(*start, rows_in_batch))?
                .sequences_array(sequence_array.slice(*start, rows_in_batch))?
                .op_types_array(op_type_array.slice(*start, rows_in_batch))?;
            // Push all fields.
            for batch_column in &field_batch_columns {
                builder.push_field(BatchColumn {
                    column_id: batch_column.column_id,
                    data: batch_column.data.slice(*start, rows_in_batch),
                });
            }

            let batch = builder.build()?;
            batches.push_back(batch);
        }

        Ok(())
    }

    /// Returns min values of a specific column in row groups.
    pub fn min_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        let Some(column) = self.metadata.column_by_id(column_id) else {
            // No such column in the SST.
            return StatValues::NoColumn;
        };
        match column.semantic_type {
            SemanticType::Tag => self.tag_values(row_groups, column, true),
            SemanticType::Field => {
                // Safety: `field_id_to_index` is initialized by the semantic type.
                let index = self.field_id_to_index.get(&column_id).unwrap();
                let stats = ReadFormat::column_values(row_groups, column, *index, true);
                StatValues::from_stats_opt(stats)
            }
            SemanticType::Timestamp => {
                let index = self.time_index_position();
                let stats = ReadFormat::column_values(row_groups, column, index, true);
                StatValues::from_stats_opt(stats)
            }
        }
    }

    /// Returns max values of a specific column in row groups.
    pub fn max_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        let Some(column) = self.metadata.column_by_id(column_id) else {
            // No such column in the SST.
            return StatValues::NoColumn;
        };
        match column.semantic_type {
            SemanticType::Tag => self.tag_values(row_groups, column, false),
            SemanticType::Field => {
                // Safety: `field_id_to_index` is initialized by the semantic type.
                let index = self.field_id_to_index.get(&column_id).unwrap();
                let stats = ReadFormat::column_values(row_groups, column, *index, false);
                StatValues::from_stats_opt(stats)
            }
            SemanticType::Timestamp => {
                let index = self.time_index_position();
                let stats = ReadFormat::column_values(row_groups, column, index, false);
                StatValues::from_stats_opt(stats)
            }
        }
    }

    /// Returns null counts of a specific column in row groups.
    pub fn null_counts(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        let Some(column) = self.metadata.column_by_id(column_id) else {
            // No such column in the SST.
            return StatValues::NoColumn;
        };
        match column.semantic_type {
            SemanticType::Tag => StatValues::NoStats,
            SemanticType::Field => {
                // Safety: `field_id_to_index` is initialized by the semantic type.
                let index = self.field_id_to_index.get(&column_id).unwrap();
                let stats = ReadFormat::column_null_counts(row_groups, *index);
                StatValues::from_stats_opt(stats)
            }
            SemanticType::Timestamp => {
                let index = self.time_index_position();
                let stats = ReadFormat::column_null_counts(row_groups, index);
                StatValues::from_stats_opt(stats)
            }
        }
    }

    /// Gets fields from `record_batch`.
    fn get_field_batch_columns(&self, record_batch: &RecordBatch) -> Result<Vec<BatchColumn>> {
        record_batch
            .columns()
            .iter()
            .zip(record_batch.schema().fields())
            .take(record_batch.num_columns() - FIXED_POS_COLUMN_NUM) // Take all field columns.
            .map(|(array, field)| {
                let vector = Helper::try_into_vector(array.clone()).context(ConvertVectorSnafu)?;
                let column = self
                    .metadata
                    .column_by_name(field.name())
                    .with_context(|| InvalidRecordBatchSnafu {
                        reason: format!("column {} not found in metadata", field.name()),
                    })?;

                Ok(BatchColumn {
                    column_id: column.column_id,
                    data: vector,
                })
            })
            .collect()
    }

    /// Returns min/max values of a specific tag.
    fn tag_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column: &ColumnMetadata,
        is_min: bool,
    ) -> StatValues {
        let is_first_tag = self
            .metadata
            .primary_key
            .first()
            .map(|id| *id == column.column_id)
            .unwrap_or(false);
        if !is_first_tag {
            // Only the min-max of the first tag is available in the primary key.
            return StatValues::NoStats;
        }

        StatValues::from_stats_opt(self.first_tag_values(row_groups, column, is_min))
    }

    /// Returns min/max values of the first tag.
    /// Returns `None` if the tag does not have statistics.
    fn first_tag_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column: &ColumnMetadata,
        is_min: bool,
    ) -> Option<ArrayRef> {
        debug_assert!(self
            .metadata
            .primary_key
            .first()
            .map(|id| *id == column.column_id)
            .unwrap_or(false));

        let primary_key_encoding = self.metadata.primary_key_encoding;
        let converter = build_primary_key_codec_with_fields(
            primary_key_encoding,
            [(
                column.column_id,
                SortField::new(column.column_schema.data_type.clone()),
            )]
            .into_iter(),
        );

        let values = row_groups.iter().map(|meta| {
            let stats = meta
                .borrow()
                .column(self.primary_key_position())
                .statistics()?;
            match stats {
                Statistics::Boolean(_) => None,
                Statistics::Int32(_) => None,
                Statistics::Int64(_) => None,
                Statistics::Int96(_) => None,
                Statistics::Float(_) => None,
                Statistics::Double(_) => None,
                Statistics::ByteArray(s) => {
                    let bytes = if is_min {
                        s.min_bytes_opt()?
                    } else {
                        s.max_bytes_opt()?
                    };
                    converter.decode_leftmost(bytes).ok()?
                }
                Statistics::FixedLenByteArray(_) => None,
            }
        });
        let mut builder = column
            .column_schema
            .data_type
            .create_mutable_vector(row_groups.len());
        for value_opt in values {
            match value_opt {
                // Safety: We use the same data type to create the converter.
                Some(v) => builder.push_value_ref(v.as_value_ref()),
                None => builder.push_null(),
            }
        }
        let vector = builder.to_vector();

        Some(vector.to_arrow_array())
    }

    /// Index of the primary key column in the SST.
    fn primary_key_position(&self) -> usize {
        self.arrow_schema.fields.len() - INTERNAL_COLUMN_NUM
    }

    /// Index of the time index column in the SST.
    fn time_index_position(&self) -> usize {
        self.arrow_schema.fields.len() - FIXED_POS_COLUMN_NUM
    }

    /// Index of a field column in the projected schema by its column id.
    pub fn field_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
        self.field_id_to_projected_index.get(&column_id).copied()
    }
}

/// Helper to compute the projection for the SST.
pub(crate) struct FormatProjection {
    /// Indices of columns to read from the SST. It contains all internal columns.
    pub(crate) projection_indices: Vec<usize>,
    /// Column id to its index in the projected schema (the schema after projection).
    pub(crate) column_id_to_projected_index: HashMap<ColumnId, usize>,
}

impl FormatProjection {
    /// Computes the projection.
    ///
    /// `id_to_index` is a mapping from column id to the index of the column in the SST.
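    ///
    /// For example (a hypothetical layout): with `id_to_index = {1: 0, 2: 1}`,
    /// `sst_column_num = 6` and `column_ids = [2]`, the result reads column 1
    /// plus the four fixed position columns, i.e. `projection_indices = [1, 2, 3, 4, 5]`
    /// and `column_id_to_projected_index = {2: 0}`.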
    pub(crate) fn compute_format_projection(
        id_to_index: &HashMap<ColumnId, usize>,
        sst_column_num: usize,
        column_ids: impl Iterator<Item = ColumnId>,
    ) -> Self {
        // Maps column id of a projected column to its index in the SST.
        // It also ignores columns not in the SST.
        // [(column id, index in SST)]
        let mut projected_schema: Vec<_> = column_ids
            .filter_map(|column_id| {
                id_to_index
                    .get(&column_id)
                    .copied()
                    .map(|index| (column_id, index))
            })
            .collect();
        // Sorts columns by their indices in the SST. The SST uses a bitmap for projection.
        // This ensures the schema of `projected_schema` is the same as the batch returned from the SST.
        projected_schema.sort_unstable_by_key(|x| x.1);
        // Dedups the entries in case `column_ids` contains duplicated columns.
        projected_schema.dedup_by_key(|x| x.1);

        // Collects all projected indices.
        // It contains the positions of all columns we need to read.
        let mut projection_indices: Vec<_> = projected_schema
            .iter()
            .map(|(_column_id, index)| *index)
            // We need to add all fixed position columns.
            .chain(sst_column_num - FIXED_POS_COLUMN_NUM..sst_column_num)
            .collect();
        projection_indices.sort_unstable();
        // Removes duplicates.
        projection_indices.dedup();

        // Creates a map from column id to the index of that column in the projected record batch.
        let column_id_to_projected_index = projected_schema
            .into_iter()
            .map(|(column_id, _)| column_id)
            .enumerate()
            .map(|(index, column_id)| (column_id, index))
            .collect();

        Self {
            projection_indices,
            column_id_to_projected_index,
        }
    }
}

/// Values of column statistics of the SST.
///
/// It also distinguishes the case that a column is not found from
/// the case that the column exists but has no statistics.
pub enum StatValues {
    /// Values of each row group.
    Values(ArrayRef),
    /// No such column.
    NoColumn,
    /// Column exists but has no statistics.
    NoStats,
}

impl StatValues {
    /// Creates a new `StatValues` instance from optional statistics.
    pub fn from_stats_opt(stats: Option<ArrayRef>) -> Self {
        match stats {
            Some(stats) => StatValues::Values(stats),
            None => StatValues::NoStats,
        }
    }
}

#[cfg(test)]
impl PrimaryKeyReadFormat {
    /// Creates a helper with existing `metadata` and all columns.
    pub fn new_with_all_columns(metadata: RegionMetadataRef) -> PrimaryKeyReadFormat {
        Self::new(
            Arc::clone(&metadata),
            metadata.column_metadatas.iter().map(|c| c.column_id),
        )
    }
}

/// Computes offsets of different primary keys in the array.
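///
/// For example (illustrative): for dictionary keys `[0, 0, 1, 1, 2]` the
/// returned offsets are `[0, 2, 4, 5]`; each adjacent pair of offsets
/// delimits the rows of one primary key.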
fn primary_key_offsets(pk_dict_array: &PrimaryKeyArray) -> Result<Vec<usize>> {
    if pk_dict_array.is_empty() {
        return Ok(Vec::new());
    }

    // Init offsets.
    let mut offsets = vec![0];
    let keys = pk_dict_array.keys();
    // We know that primary keys are never null so we iterate `keys.values()` directly.
    let pk_indices = keys.values();
    for (i, key) in pk_indices.iter().take(keys.len() - 1).enumerate() {
        // Compare each key with the next key.
        if *key != pk_indices[i + 1] {
            // We meet a new key, so push the next index as the end of the current run.
            offsets.push(i + 1);
        }
    }
    offsets.push(keys.len());

    Ok(offsets)
}

/// Creates a new array for the specific `primary_key`.
fn new_primary_key_array(primary_key: &[u8], num_rows: usize) -> ArrayRef {
    let values = Arc::new(BinaryArray::from_iter_values([primary_key]));
    let keys = UInt32Array::from_value(0, num_rows);

    // Safety: The key index is valid.
    Arc::new(DictionaryArray::new(keys, values))
}

/// Gets the min/max time index of the row group from the parquet meta.
/// It assumes the parquet is created by the mito engine.
pub(crate) fn parquet_row_group_time_range(
    file_meta: &FileMeta,
    parquet_meta: &ParquetMetaData,
    row_group_idx: usize,
) -> Option<FileTimeRange> {
    let row_group_meta = parquet_meta.row_group(row_group_idx);
    let num_columns = parquet_meta.file_metadata().schema_descr().num_columns();
    assert!(
        num_columns >= FIXED_POS_COLUMN_NUM,
        "file only has {} columns",
        num_columns
    );
    let time_index_pos = num_columns - FIXED_POS_COLUMN_NUM;

    let stats = row_group_meta.column(time_index_pos).statistics()?;
    // The physical type for the timestamp should be i64.
    let (min, max) = match stats {
        Statistics::Int64(value_stats) => (*value_stats.min_opt()?, *value_stats.max_opt()?),
        Statistics::Int32(_)
        | Statistics::Boolean(_)
        | Statistics::Int96(_)
        | Statistics::Float(_)
        | Statistics::Double(_)
        | Statistics::ByteArray(_)
        | Statistics::FixedLenByteArray(_) => {
            common_telemetry::warn!(
                "Invalid statistics {:?} for time index in parquet in {}",
                stats,
                file_meta.file_id
            );
            return None;
        }
    };

    debug_assert!(min >= file_meta.time_range.0.value() && min <= file_meta.time_range.1.value());
    debug_assert!(max >= file_meta.time_range.0.value() && max <= file_meta.time_range.1.value());
    let unit = file_meta.time_range.0.unit();

    Some((Timestamp::new(min, unit), Timestamp::new(max, unit)))
}

/// Checks if sequence override is needed based on all row groups' statistics.
/// Returns true if ALL row groups have sequence min-max values of 0.
pub(crate) fn need_override_sequence(parquet_meta: &ParquetMetaData) -> bool {
    let num_columns = parquet_meta.file_metadata().schema_descr().num_columns();
    if num_columns < FIXED_POS_COLUMN_NUM {
        return false;
    }

    // The sequence column is the second-to-last column (before op_type).
    let sequence_pos = num_columns - 2;

    // Check all row groups - all must have sequence min-max of 0.
    for row_group in parquet_meta.row_groups() {
        if let Some(Statistics::Int64(value_stats)) = row_group.column(sequence_pos).statistics() {
            if let (Some(min_val), Some(max_val)) = (value_stats.min_opt(), value_stats.max_opt()) {
                // If any row group doesn't have min=0 and max=0, return false.
                if *min_val != 0 || *max_val != 0 {
                    return false;
                }
            } else {
                // If any row group doesn't have statistics, return false.
                return false;
            }
        } else {
            // If any row group doesn't have Int64 statistics, return false.
            return false;
        }
    }

    // All row groups have sequence min-max of 0; also require at least one row group.
    !parquet_meta.row_groups().is_empty()
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::OpType;
    use datatypes::arrow::array::{
        Int64Array, StringArray, TimestampMillisecondArray, UInt64Array, UInt8Array,
    };
    use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema, TimeUnit};
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::ValueRef;
    use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, UInt64Vector, UInt8Vector};
    use mito_codec::row_converter::{
        DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec,
    };
    use store_api::codec::PrimaryKeyEncoding;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::consts::ReservedColumnId;
    use store_api::storage::RegionId;

    use super::*;
    use crate::sst::parquet::flat_format::{sequence_column_index, FlatWriteFormat};
    use crate::sst::{to_flat_sst_arrow_schema, FlatSchemaOptions};

    const TEST_SEQUENCE: u64 = 1;
    const TEST_OP_TYPE: u8 = OpType::Put as u8;

    fn build_test_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("tag0", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field1",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 4, // We change the order of field columns.
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("tag1", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field0",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 5,
            })
            .primary_key(vec![1, 3]);
        Arc::new(builder.build().unwrap())
    }

    fn build_test_arrow_schema() -> SchemaRef {
        let fields = vec![
            Field::new("field1", ArrowDataType::Int64, true),
            Field::new("field0", ArrowDataType::Int64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt32),
                    Box::new(ArrowDataType::Binary),
                ),
                false,
            ),
            Field::new("__sequence", ArrowDataType::UInt64, false),
            Field::new("__op_type", ArrowDataType::UInt8, false),
        ];
        Arc::new(Schema::new(fields))
    }

    fn new_batch(primary_key: &[u8], start_ts: i64, start_field: i64, num_rows: usize) -> Batch {
        new_batch_with_sequence(primary_key, start_ts, start_field, num_rows, TEST_SEQUENCE)
    }

    fn new_batch_with_sequence(
        primary_key: &[u8],
        start_ts: i64,
        start_field: i64,
        num_rows: usize,
        sequence: u64,
    ) -> Batch {
        let ts_values = (0..num_rows).map(|i| start_ts + i as i64);
        let timestamps = Arc::new(TimestampMillisecondVector::from_values(ts_values));
        let sequences = Arc::new(UInt64Vector::from_vec(vec![sequence; num_rows]));
        let op_types = Arc::new(UInt8Vector::from_vec(vec![TEST_OP_TYPE; num_rows]));
        let fields = vec![
            BatchColumn {
                column_id: 4,
                data: Arc::new(Int64Vector::from_vec(vec![start_field; num_rows])),
            }, // field1
            BatchColumn {
                column_id: 2,
                data: Arc::new(Int64Vector::from_vec(vec![start_field + 1; num_rows])),
            }, // field0
        ];

        BatchBuilder::with_required_columns(primary_key.to_vec(), timestamps, sequences, op_types)
            .with_fields(fields)
            .build()
            .unwrap()
    }

    #[test]
    fn test_to_sst_arrow_schema() {
        let metadata = build_test_region_metadata();
        let write_format = PrimaryKeyWriteFormat::new(metadata);
        assert_eq!(&build_test_arrow_schema(), write_format.arrow_schema());
    }

    #[test]
    fn test_new_primary_key_array() {
        let array = new_primary_key_array(b"test", 3);
        let expect = build_test_pk_array(&[(b"test".to_vec(), 3)]) as ArrayRef;
        assert_eq!(&expect, &array);
    }

    fn build_test_pk_array(pk_row_nums: &[(Vec<u8>, usize)]) -> Arc<PrimaryKeyArray> {
        let values = Arc::new(BinaryArray::from_iter_values(
            pk_row_nums.iter().map(|v| &v.0),
        ));
        let mut keys = vec![];
        for (index, num_rows) in pk_row_nums.iter().map(|v| v.1).enumerate() {
            keys.extend(std::iter::repeat_n(index as u32, num_rows));
        }
        let keys = UInt32Array::from(keys);
        Arc::new(DictionaryArray::new(keys, values))
    }

    #[test]
    fn test_convert_batch() {
        let metadata = build_test_region_metadata();
        let write_format = PrimaryKeyWriteFormat::new(metadata);

        let num_rows = 4;
        let batch = new_batch(b"test", 1, 2, num_rows);
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // primary key
            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; num_rows])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ];
        let expect_record = RecordBatch::try_new(build_test_arrow_schema(), columns).unwrap();

        let actual = write_format.convert_batch(&batch).unwrap();
        assert_eq!(expect_record, actual);
    }

    #[test]
    fn test_convert_batch_with_override_sequence() {
        let metadata = build_test_region_metadata();
        let write_format =
            PrimaryKeyWriteFormat::new(metadata).with_override_sequence(Some(415411));

        let num_rows = 4;
        let batch = new_batch(b"test", 1, 2, num_rows);
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // primary key
            Arc::new(UInt64Array::from(vec![415411; num_rows])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ];
        let expect_record = RecordBatch::try_new(build_test_arrow_schema(), columns).unwrap();

        let actual = write_format.convert_batch(&batch).unwrap();
        assert_eq!(expect_record, actual);
    }

    #[test]
    fn test_projection_indices() {
        let metadata = build_test_region_metadata();
        // Only read tag1
        let read_format = ReadFormat::new_primary_key(metadata.clone(), [3].iter().copied());
        assert_eq!(&[2, 3, 4, 5], read_format.projection_indices());
        // Only read field1
        let read_format = ReadFormat::new_primary_key(metadata.clone(), [4].iter().copied());
        assert_eq!(&[0, 2, 3, 4, 5], read_format.projection_indices());
        // Only read ts
        let read_format = ReadFormat::new_primary_key(metadata.clone(), [5].iter().copied());
        assert_eq!(&[2, 3, 4, 5], read_format.projection_indices());
        // Read field0, tag0, ts
        let read_format = ReadFormat::new_primary_key(metadata, [2, 1, 5].iter().copied());
        assert_eq!(&[1, 2, 3, 4, 5], read_format.projection_indices());
    }

    #[test]
    fn test_empty_primary_key_offsets() {
        let array = build_test_pk_array(&[]);
        assert!(primary_key_offsets(&array).unwrap().is_empty());
    }

    #[test]
    fn test_primary_key_offsets_one_series() {
        let array = build_test_pk_array(&[(b"one".to_vec(), 1)]);
        assert_eq!(vec![0, 1], primary_key_offsets(&array).unwrap());

        let array = build_test_pk_array(&[(b"one".to_vec(), 1), (b"two".to_vec(), 1)]);
        assert_eq!(vec![0, 1, 2], primary_key_offsets(&array).unwrap());

        let array = build_test_pk_array(&[
            (b"one".to_vec(), 1),
            (b"two".to_vec(), 1),
            (b"three".to_vec(), 1),
        ]);
        assert_eq!(vec![0, 1, 2, 3], primary_key_offsets(&array).unwrap());
    }

    #[test]
    fn test_primary_key_offsets_multi_series() {
        let array = build_test_pk_array(&[(b"one".to_vec(), 1), (b"two".to_vec(), 3)]);
        assert_eq!(vec![0, 1, 4], primary_key_offsets(&array).unwrap());

        let array = build_test_pk_array(&[(b"one".to_vec(), 3), (b"two".to_vec(), 1)]);
        assert_eq!(vec![0, 3, 4], primary_key_offsets(&array).unwrap());

        let array = build_test_pk_array(&[(b"one".to_vec(), 3), (b"two".to_vec(), 3)]);
        assert_eq!(vec![0, 3, 6], primary_key_offsets(&array).unwrap());
    }

    #[test]
    fn test_convert_empty_record_batch() {
        let metadata = build_test_region_metadata();
        let arrow_schema = build_test_arrow_schema();
        let column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|col| col.column_id)
            .collect();
        let read_format = PrimaryKeyReadFormat::new(metadata, column_ids.iter().copied());
        assert_eq!(arrow_schema, *read_format.arrow_schema());

        let record_batch = RecordBatch::new_empty(arrow_schema);
        let mut batches = VecDeque::new();
        read_format
            .convert_record_batch(&record_batch, None, &mut batches)
            .unwrap();
        assert!(batches.is_empty());
    }

    #[test]
    fn test_convert_record_batch() {
        let metadata = build_test_region_metadata();
        let column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|col| col.column_id)
            .collect();
        let read_format = PrimaryKeyReadFormat::new(metadata, column_ids.iter().copied());

        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![1, 1, 10, 10])), // field1
            Arc::new(Int64Array::from(vec![2, 2, 11, 11])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 11, 12])), // ts
            build_test_pk_array(&[(b"one".to_vec(), 2), (b"two".to_vec(), 2)]), // primary key
            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; 4])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; 4])), // op type
        ];
        let arrow_schema = build_test_arrow_schema();
        let record_batch = RecordBatch::try_new(arrow_schema, columns).unwrap();
        let mut batches = VecDeque::new();
        read_format
            .convert_record_batch(&record_batch, None, &mut batches)
            .unwrap();

        assert_eq!(
            vec![new_batch(b"one", 1, 1, 2), new_batch(b"two", 11, 10, 2)],
            batches.into_iter().collect::<Vec<_>>(),
        );
    }

    #[test]
    fn test_convert_record_batch_with_override_sequence() {
        let metadata = build_test_region_metadata();
        let column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|col| col.column_id)
            .collect();
        let read_format = ReadFormat::new(metadata, column_ids.iter().copied(), false);

        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![1, 1, 10, 10])), // field1
            Arc::new(Int64Array::from(vec![2, 2, 11, 11])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 11, 12])), // ts
            build_test_pk_array(&[(b"one".to_vec(), 2), (b"two".to_vec(), 2)]), // primary key
            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; 4])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; 4])), // op type
        ];
        let arrow_schema = build_test_arrow_schema();
        let record_batch = RecordBatch::try_new(arrow_schema, columns).unwrap();

        // Create override sequence array with custom values
        let override_sequence: u64 = 12345;
        let override_sequence_array: ArrayRef =
            Arc::new(UInt64Array::from_value(override_sequence, 4));

        let mut batches = VecDeque::new();
        read_format
            .as_primary_key()
            .unwrap()
            .convert_record_batch(&record_batch, Some(&override_sequence_array), &mut batches)
            .unwrap();

        // Create expected batches with override sequence
        let expected_batch1 = new_batch_with_sequence(b"one", 1, 1, 2, override_sequence);
        let expected_batch2 = new_batch_with_sequence(b"two", 11, 10, 2, override_sequence);

        assert_eq!(
            vec![expected_batch1, expected_batch2],
            batches.into_iter().collect::<Vec<_>>(),
        );
    }

    fn build_test_flat_sst_schema() -> SchemaRef {
        let fields = vec![
            Field::new("tag0", ArrowDataType::Int64, true), // primary key columns first
            Field::new("tag1", ArrowDataType::Int64, true),
            Field::new("field1", ArrowDataType::Int64, true), // then field columns
            Field::new("field0", ArrowDataType::Int64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt32),
                    Box::new(ArrowDataType::Binary),
                ),
                false,
            ),
            Field::new("__sequence", ArrowDataType::UInt64, false),
            Field::new("__op_type", ArrowDataType::UInt8, false),
        ];
        Arc::new(Schema::new(fields))
    }

    #[test]
    fn test_flat_to_sst_arrow_schema() {
        let metadata = build_test_region_metadata();
        let format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default());
        assert_eq!(&build_test_flat_sst_schema(), format.arrow_schema());
    }

    fn input_columns_for_flat_batch(num_rows: usize) -> Vec<ArrayRef> {
        vec![
            Arc::new(Int64Array::from(vec![1; num_rows])), // tag0
            Arc::new(Int64Array::from(vec![1; num_rows])), // tag1
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // __primary_key
            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; num_rows])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ]
    }

1318    #[test]
1319    fn test_flat_convert_batch() {
1320        let metadata = build_test_region_metadata();
1321        let format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default());
1322
1323        let num_rows = 4;
1324        let columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
1325        let batch = RecordBatch::try_new(build_test_flat_sst_schema(), columns.clone()).unwrap();
1326        let expect_record = RecordBatch::try_new(build_test_flat_sst_schema(), columns).unwrap();
1327
1328        let actual = format.convert_batch(&batch).unwrap();
1329        assert_eq!(expect_record, actual);
1330    }
1331
    #[test]
    fn test_flat_convert_with_override_sequence() {
        let metadata = build_test_region_metadata();
        let format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default())
            .with_override_sequence(Some(415411));

        let num_rows = 4;
        let columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
        let batch = RecordBatch::try_new(build_test_flat_sst_schema(), columns).unwrap();

        let expected_columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![1; num_rows])), // tag0
            Arc::new(Int64Array::from(vec![1; num_rows])), // tag1
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // __primary_key
            Arc::new(UInt64Array::from(vec![415411; num_rows])), // overridden sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ];
        let expected_record =
            RecordBatch::try_new(build_test_flat_sst_schema(), expected_columns).unwrap();

        let actual = format.convert_batch(&batch).unwrap();
        assert_eq!(expected_record, actual);
    }

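    // Projection indices should contain the requested columns plus the
    // fixed-position columns (time index and the three internal columns).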
    #[test]
    fn test_flat_projection_indices() {
        let metadata = build_test_region_metadata();
        // Flat format column order: tag0(0), tag1(1), field1(2), field0(3), ts(4),
        // __primary_key(5), __sequence(6), __op_type(7).
        // Every projection includes the fixed-position columns:
        // ts(4), __primary_key(5), __sequence(6), __op_type(7).

        // Only read tag1 (column_id=3, index=1) + fixed columns.
        let read_format = ReadFormat::new_flat(metadata.clone(), [3].iter().copied(), false);
        assert_eq!(&[1, 4, 5, 6, 7], read_format.projection_indices());

        // Only read field1 (column_id=4, index=2) + fixed columns.
        let read_format = ReadFormat::new_flat(metadata.clone(), [4].iter().copied(), false);
        assert_eq!(&[2, 4, 5, 6, 7], read_format.projection_indices());

        // Only read ts (column_id=5, index=4); ts is already a fixed column.
        let read_format = ReadFormat::new_flat(metadata.clone(), [5].iter().copied(), false);
        assert_eq!(&[4, 5, 6, 7], read_format.projection_indices());

        // Read field0 (column_id=2, index=3), tag0 (column_id=1, index=0),
        // ts (column_id=5, index=4) + fixed columns.
        let read_format = ReadFormat::new_flat(metadata, [2, 1, 5].iter().copied(), false);
        assert_eq!(&[0, 3, 4, 5, 6, 7], read_format.projection_indices());
    }

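    // convert_batch should keep the original __sequence values when no override
    // array is given, and replace them when one is supplied.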
    #[test]
    fn test_flat_read_format_convert_batch() {
        let metadata = build_test_region_metadata();
        let mut format = FlatReadFormat::new(
            metadata,
            std::iter::once(1), // Only read tag0.
            false,
        );

        let num_rows = 4;
        let original_sequence = 100u64;
        let override_sequence = 200u64;

        // Create a test record batch.
        let columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
        let mut test_columns = columns.clone();
        // Replace the sequence column with the original sequence values.
        test_columns[6] = Arc::new(UInt64Array::from(vec![original_sequence; num_rows]));
        let record_batch =
            RecordBatch::try_new(format.arrow_schema().clone(), test_columns).unwrap();

        // Without an override sequence, the batch keeps its original sequence values.
        let result = format.convert_batch(record_batch.clone(), None).unwrap();
        let sequence_column = result.column(sequence_column_index(result.num_columns()));
        let sequence_array = sequence_column
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();

        let expected_original = UInt64Array::from(vec![original_sequence; num_rows]);
        assert_eq!(sequence_array, &expected_original);

        // Set an override sequence and convert with a new_override_sequence_array.
        format.set_override_sequence(Some(override_sequence));
        let override_sequence_array = format.new_override_sequence_array(num_rows).unwrap();
        let result = format
            .convert_batch(record_batch, Some(&override_sequence_array))
            .unwrap();
        let sequence_column = result.column(sequence_column_index(result.num_columns()));
        let sequence_array = sequence_column
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();

        let expected_override = UInt64Array::from(vec![override_sequence; num_rows]);
        assert_eq!(sequence_array, &expected_override);
    }

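    // need_convert_to_flat compares the file's column count against the flat
    // schema: an equal count needs no conversion, a shortfall equal to the
    // number of primary key columns marks the old format, and any other
    // difference is an error.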
    #[test]
    fn test_need_convert_to_flat() {
        let metadata = build_test_region_metadata();

        // Case 1: same number of columns, no conversion needed.
        // Flat format: all columns (5) + internal columns (3).
        let expected_columns = metadata.column_metadatas.len() + 3;
        let result =
            FlatReadFormat::need_convert_to_flat("test.parquet", expected_columns, &metadata)
                .unwrap();
        assert!(
            !result,
            "Should not need conversion when column counts match"
        );

        // Case 2: fewer columns, conversion needed.
        // The primary key columns (2 in the test metadata) are missing.
        let num_columns_without_pk = expected_columns - metadata.primary_key.len();
        let result =
            FlatReadFormat::need_convert_to_flat("test.parquet", num_columns_without_pk, &metadata)
                .unwrap();
        assert!(
            result,
            "Should need conversion when primary key columns are missing"
        );

        // Case 3: invalid, more columns than expected.
        let too_many_columns = expected_columns + 1;
        let err = FlatReadFormat::need_convert_to_flat("test.parquet", too_many_columns, &metadata)
            .unwrap_err();
        assert!(err.to_string().contains("Expected columns"), "{err:?}");

        // Case 4: invalid, the column difference doesn't match the primary key count.
        let wrong_diff_columns = expected_columns - 1; // A difference of 1, but there are 2 primary key columns.
        let err =
            FlatReadFormat::need_convert_to_flat("test.parquet", wrong_diff_columns, &metadata)
                .unwrap_err();
        assert!(
            err.to_string().contains("Column number difference"),
            "{err:?}"
        );
    }

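    /// Encodes one dense primary key per row and collects the encoded keys into
    /// a dictionary array, mirroring the `__primary_key` column layout.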
    fn build_test_dense_pk_array(
        codec: &DensePrimaryKeyCodec,
        pk_values_per_row: &[&[Option<i64>]],
    ) -> Arc<PrimaryKeyArray> {
        let mut builder = PrimaryKeyArrayBuilder::with_capacity(pk_values_per_row.len(), 1024, 0);

        for pk_values_row in pk_values_per_row {
            let values: Vec<ValueRef> = pk_values_row
                .iter()
                .map(|opt| match opt {
                    Some(val) => ValueRef::Int64(*val),
                    None => ValueRef::Null,
                })
                .collect();

            let encoded = codec.encode(values.into_iter()).unwrap();
            builder.append_value(&encoded);
        }

        Arc::new(builder.finish())
    }

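    /// Builds region metadata that uses sparse primary key encoding, with the
    /// reserved `__table_id`/`__tsid` columns followed by two string tags.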
    fn build_test_sparse_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "__table_id",
                    ConcreteDataType::uint32_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: ReservedColumnId::table_id(),
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "__tsid",
                    ConcreteDataType::uint64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: ReservedColumnId::tsid(),
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("tag0", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("tag1", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field1",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field0",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 5,
            })
            .primary_key(vec![
                ReservedColumnId::table_id(),
                ReservedColumnId::tsid(),
                1,
                3,
            ])
            .primary_key_encoding(PrimaryKeyEncoding::Sparse);
        Arc::new(builder.build().unwrap())
    }

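    /// Encodes one sparse primary key per row into a dictionary array via
    /// `encode_value_refs`, pairing each value with its column id.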
    fn build_test_sparse_pk_array(
        codec: &SparsePrimaryKeyCodec,
        pk_values_per_row: &[SparseTestRow],
    ) -> Arc<PrimaryKeyArray> {
        let mut builder = PrimaryKeyArrayBuilder::with_capacity(pk_values_per_row.len(), 1024, 0);
        for row in pk_values_per_row {
            let values = vec![
                (ReservedColumnId::table_id(), ValueRef::UInt32(row.table_id)),
                (ReservedColumnId::tsid(), ValueRef::UInt64(row.tsid)),
                (1, ValueRef::String(&row.tag0)),
                (3, ValueRef::String(&row.tag1)),
            ];

            let mut buffer = Vec::new();
            codec.encode_value_refs(&values, &mut buffer).unwrap();
            builder.append_value(&buffer);
        }

        Arc::new(builder.finish())
    }

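    /// Primary key values for one row under sparse encoding.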
    #[derive(Clone)]
    struct SparseTestRow {
        table_id: u32,
        tsid: u64,
        tag0: String,
        tag1: String,
    }

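    // Converting an old-format batch (no decoded tag columns) with dense primary
    // key encoding should prepend the decoded tag columns and preserve the
    // encoded __primary_key column.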
    #[test]
    fn test_flat_read_format_convert_format_with_dense_encoding() {
        let metadata = build_test_region_metadata();

        let column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|c| c.column_id)
            .collect();
        let format = FlatReadFormat::new(metadata.clone(), column_ids.into_iter(), true);

        let num_rows = 4;
        let original_sequence = 100u64;

        // All rows share the same primary key values: tag0=1, tag1=1.
        let pk_values_per_row = vec![&[Some(1i64), Some(1i64)][..]; num_rows];

        // Create a test record batch in the old format using dense encoding.
        let codec = DensePrimaryKeyCodec::new(&metadata);
        let dense_pk_array = build_test_dense_pk_array(&codec, &pk_values_per_row);
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            dense_pk_array.clone(),                        // __primary_key (dense encoding)
            Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ];

        // Schema of the old format, which has no decoded primary key columns.
        let old_format_fields = vec![
            Field::new("field1", ArrowDataType::Int64, true),
            Field::new("field0", ArrowDataType::Int64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt32),
                    Box::new(ArrowDataType::Binary),
                ),
                false,
            ),
            Field::new("__sequence", ArrowDataType::UInt64, false),
            Field::new("__op_type", ArrowDataType::UInt8, false),
        ];
        let old_schema = Arc::new(Schema::new(old_format_fields));
        let record_batch = RecordBatch::try_new(old_schema, columns).unwrap();

        // Convert the old-format batch with dense encoding.
        let result = format.convert_batch(record_batch, None).unwrap();

        // The expected flat-format batch contains the decoded primary key columns.
        let expected_columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![1; num_rows])), // tag0 (decoded from primary key)
            Arc::new(Int64Array::from(vec![1; num_rows])), // tag1 (decoded from primary key)
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            dense_pk_array,                                // __primary_key (preserved)
            Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ];
        let expected_record_batch =
            RecordBatch::try_new(build_test_flat_sst_schema(), expected_columns).unwrap();

        assert_eq!(expected_record_batch, result);
    }

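    // Same conversion with sparse encoding: the decoded __table_id, __tsid, and
    // dictionary-encoded string tags are prepended ahead of the field columns.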
    #[test]
    fn test_flat_read_format_convert_format_with_sparse_encoding() {
        let metadata = build_test_sparse_region_metadata();

        let column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|c| c.column_id)
            .collect();
        let format = FlatReadFormat::new(metadata.clone(), column_ids.into_iter(), true);

        let num_rows = 4;
        let original_sequence = 100u64;

        // Sparse test data with a table id, tsid, and string tags.
        let pk_test_rows = vec![
            SparseTestRow {
                table_id: 1,
                tsid: 123,
                tag0: "frontend".to_string(),
                tag1: "pod1".to_string(),
            };
            num_rows
        ];

        let codec = SparsePrimaryKeyCodec::new(&metadata);
        let sparse_pk_array = build_test_sparse_pk_array(&codec, &pk_test_rows);
        // Create a test record batch in the old format using sparse encoding.
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            sparse_pk_array.clone(),                       // __primary_key (sparse encoding)
            Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ];

        // Schema of the old format, which has no decoded primary key columns.
        let old_format_fields = vec![
            Field::new("field1", ArrowDataType::Int64, true),
            Field::new("field0", ArrowDataType::Int64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt32),
                    Box::new(ArrowDataType::Binary),
                ),
                false,
            ),
            Field::new("__sequence", ArrowDataType::UInt64, false),
            Field::new("__op_type", ArrowDataType::UInt8, false),
        ];
        let old_schema = Arc::new(Schema::new(old_format_fields));
        let record_batch = RecordBatch::try_new(old_schema, columns).unwrap();

        // Convert the old-format batch with sparse encoding.
        let result = format.convert_batch(record_batch, None).unwrap();

        // The expected flat-format batch contains the decoded primary key columns;
        // string tags are decoded into dictionary arrays.
        let tag0_array = Arc::new(DictionaryArray::new(
            UInt32Array::from(vec![0; num_rows]),
            Arc::new(StringArray::from(vec!["frontend"])),
        ));
        let tag1_array = Arc::new(DictionaryArray::new(
            UInt32Array::from(vec![0; num_rows]),
            Arc::new(StringArray::from(vec!["pod1"])),
        ));
        let expected_columns: Vec<ArrayRef> = vec![
            Arc::new(UInt32Array::from(vec![1; num_rows])), // __table_id (decoded from primary key)
            Arc::new(UInt64Array::from(vec![123; num_rows])), // __tsid (decoded from primary key)
            tag0_array,                                     // tag0 (decoded from primary key)
            tag1_array,                                     // tag1 (decoded from primary key)
            Arc::new(Int64Array::from(vec![2; num_rows])),  // field1
            Arc::new(Int64Array::from(vec![3; num_rows])),  // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            sparse_pk_array,                                // __primary_key (preserved)
            Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ];
        let expected_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        let expected_record_batch =
            RecordBatch::try_new(expected_schema, expected_columns).unwrap();

        assert_eq!(expected_record_batch, result);
    }
}