// mito2/sst/parquet/format.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Format to store in parquet.
16//!
17//! We store three internal columns in parquet:
18//! - `__primary_key`, the primary key of the row (tags). Type: dictionary(uint32, binary)
19//! - `__sequence`, the sequence number of a row. Type: uint64
20//! - `__op_type`, the op type of the row. Type: uint8
21//!
22//! The schema of a parquet file is:
23//! ```text
24//! field 0, field 1, ..., field N, time index, primary key, sequence, op type
25//! ```
26//!
//! We store fields in the same order as [RegionMetadata::field_columns()](store_api::metadata::RegionMetadata::field_columns()).
28
29use std::borrow::Borrow;
30use std::collections::{HashMap, VecDeque};
31use std::sync::Arc;
32
33use api::v1::SemanticType;
34use common_time::Timestamp;
35use datafusion_common::ScalarValue;
36use datatypes::arrow::array::{
37    ArrayRef, BinaryArray, BinaryDictionaryBuilder, DictionaryArray, UInt64Array,
38};
39use datatypes::arrow::datatypes::{SchemaRef, UInt32Type};
40use datatypes::arrow::record_batch::RecordBatch;
41use datatypes::prelude::DataType;
42use datatypes::vectors::Helper;
43use mito_codec::row_converter::{
44    CompositeValues, PrimaryKeyCodec, SortField, build_primary_key_codec_with_fields,
45};
46use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
47use parquet::file::statistics::Statistics;
48use snafu::{OptionExt, ResultExt, ensure};
49use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
50use store_api::storage::{ColumnId, SequenceNumber};
51
52use crate::error::{
53    ConvertVectorSnafu, DecodeSnafu, InvalidRecordBatchSnafu, NewRecordBatchSnafu, Result,
54};
55use crate::read::read_columns::ReadColumns;
56use crate::read::{Batch, BatchBuilder, BatchColumn};
57use crate::sst::file::{FileMeta, FileTimeRange};
58use crate::sst::parquet::read_columns::{ParquetReadColumn, ParquetReadColumns};
59use crate::sst::to_sst_arrow_schema;
60
/// Arrow array type for the primary key dictionary.
///
/// Keys are `u32` dictionary indices; values are the encoded primary key bytes.
pub(crate) type PrimaryKeyArray = DictionaryArray<UInt32Type>;
/// Builder type for primary key dictionary array.
pub(crate) type PrimaryKeyArrayBuilder = BinaryDictionaryBuilder<UInt32Type>;

/// Number of columns that have fixed positions.
///
/// Contains: time index and internal columns
/// (`__primary_key`, `__sequence`, `__op_type`), always stored at the tail of
/// the SST schema in that order.
pub(crate) const FIXED_POS_COLUMN_NUM: usize = 4;
/// Number of internal columns (`__primary_key`, `__sequence`, `__op_type`).
pub(crate) const INTERNAL_COLUMN_NUM: usize = 3;
72
/// Helper for writing the SST format with primary key.
pub(crate) struct PrimaryKeyWriteFormat {
    /// SST file schema.
    arrow_schema: SchemaRef,
    /// If set, every row's sequence number is replaced by this value on write.
    override_sequence: Option<SequenceNumber>,
}
79
80impl PrimaryKeyWriteFormat {
81    /// Creates a new helper.
82    pub(crate) fn new(metadata: RegionMetadataRef) -> PrimaryKeyWriteFormat {
83        let arrow_schema = to_sst_arrow_schema(&metadata);
84        PrimaryKeyWriteFormat {
85            arrow_schema,
86            override_sequence: None,
87        }
88    }
89
90    /// Set override sequence.
91    pub(crate) fn with_override_sequence(
92        mut self,
93        override_sequence: Option<SequenceNumber>,
94    ) -> Self {
95        self.override_sequence = override_sequence;
96        self
97    }
98
99    /// Gets the arrow schema to store in parquet.
100    #[cfg(test)]
101    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
102        &self.arrow_schema
103    }
104
105    /// Convert a flat `RecordBatch` to primary-key format, retaining only
106    /// field columns, time index, and internal columns.
107    ///
108    /// `num_fields` is the number of field columns. The method strips
109    /// leading tag columns: `num_tag_columns = batch.num_columns() - num_fields - FIXED_POS_COLUMN_NUM`.
110    pub(crate) fn convert_flat_batch(
111        &self,
112        batch: &RecordBatch,
113        num_fields: usize,
114    ) -> Result<RecordBatch> {
115        let num_tag_columns = batch.num_columns() - num_fields - FIXED_POS_COLUMN_NUM;
116        let mut columns: Vec<ArrayRef> = batch.columns()[num_tag_columns..].to_vec();
117
118        if let Some(override_sequence) = self.override_sequence {
119            let num_cols = columns.len();
120            // sequence is at num_cols - 2 (before op_type)
121            columns[num_cols - 2] =
122                Arc::new(UInt64Array::from(vec![override_sequence; batch.num_rows()]));
123        }
124
125        RecordBatch::try_new(self.arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
126    }
127}
128
129/// Returns min/max values of specific columns.
130/// Returns None if the column does not have statistics.
131/// The column should not be encoded as a part of a primary key.
132pub(crate) fn column_values(
133    row_groups: &[impl Borrow<RowGroupMetaData>],
134    column: &ColumnMetadata,
135    column_index: usize,
136    is_min: bool,
137) -> Option<ArrayRef> {
138    let null_scalar: ScalarValue = column
139        .column_schema
140        .data_type
141        .as_arrow_type()
142        .try_into()
143        .ok()?;
144    let scalar_values = row_groups
145        .iter()
146        .map(|meta| {
147            let stats = meta.borrow().column(column_index).statistics()?;
148            match stats {
149                Statistics::Boolean(s) => Some(ScalarValue::Boolean(Some(if is_min {
150                    *s.min_opt()?
151                } else {
152                    *s.max_opt()?
153                }))),
154                Statistics::Int32(s) => Some(ScalarValue::Int32(Some(if is_min {
155                    *s.min_opt()?
156                } else {
157                    *s.max_opt()?
158                }))),
159                Statistics::Int64(s) => Some(ScalarValue::Int64(Some(if is_min {
160                    *s.min_opt()?
161                } else {
162                    *s.max_opt()?
163                }))),
164                Statistics::Int96(_) => None,
165                Statistics::Float(s) => Some(ScalarValue::Float32(Some(if is_min {
166                    *s.min_opt()?
167                } else {
168                    *s.max_opt()?
169                }))),
170                Statistics::Double(s) => Some(ScalarValue::Float64(Some(if is_min {
171                    *s.min_opt()?
172                } else {
173                    *s.max_opt()?
174                }))),
175                Statistics::ByteArray(s) => {
176                    let bytes = if is_min {
177                        s.min_bytes_opt()?
178                    } else {
179                        s.max_bytes_opt()?
180                    };
181                    let s = String::from_utf8(bytes.to_vec()).ok();
182                    Some(ScalarValue::Utf8(s))
183                }
184                Statistics::FixedLenByteArray(_) => None,
185            }
186        })
187        .map(|maybe_scalar| maybe_scalar.unwrap_or_else(|| null_scalar.clone()))
188        .collect::<Vec<ScalarValue>>();
189    debug_assert_eq!(scalar_values.len(), row_groups.len());
190    ScalarValue::iter_to_array(scalar_values).ok()
191}
192
193/// Returns null counts of specific columns.
194/// The column should not be encoded as a part of a primary key.
195pub(crate) fn column_null_counts(
196    row_groups: &[impl Borrow<RowGroupMetaData>],
197    column_index: usize,
198) -> Option<ArrayRef> {
199    let values = row_groups.iter().map(|meta| {
200        let col = meta.borrow().column(column_index);
201        let stat = col.statistics()?;
202        stat.null_count_opt()
203    });
204    Some(Arc::new(UInt64Array::from_iter(values)))
205}
206
/// Helper for reading the SST format.
pub struct PrimaryKeyReadFormat {
    /// The metadata stored in the SST.
    metadata: RegionMetadataRef,
    /// SST file schema.
    arrow_schema: SchemaRef,
    /// Field column id to its index in `schema` (SST schema).
    /// In SST schema, fields are stored in the front of the schema.
    field_id_to_index: HashMap<ColumnId, usize>,
    /// Indices of columns to read from the SST. It contains all internal columns.
    parquet_read_cols: ParquetReadColumns,
    /// Field column id to their index in the projected schema (
    /// the schema of [Batch]).
    field_id_to_projected_index: HashMap<ColumnId, usize>,
    /// Codec used to decode primary key values if eager decoding is enabled.
    /// `None` disables eager decoding; see `convert_record_batch`.
    primary_key_codec: Option<Arc<dyn PrimaryKeyCodec>>,
}
224
225impl PrimaryKeyReadFormat {
226    /// Creates a helper with existing `metadata` and `column_ids` to read.
227    pub fn new(metadata: RegionMetadataRef, read_cols: ReadColumns) -> PrimaryKeyReadFormat {
228        let field_id_to_index: HashMap<_, _> = metadata
229            .field_columns()
230            .enumerate()
231            .map(|(index, column)| (column.column_id, index))
232            .collect();
233        let arrow_schema = to_sst_arrow_schema(&metadata);
234
235        let format_projection = FormatProjection::compute_format_projection(
236            &field_id_to_index,
237            arrow_schema.fields.len(),
238            read_cols,
239        );
240
241        PrimaryKeyReadFormat {
242            metadata,
243            arrow_schema,
244            field_id_to_index,
245            parquet_read_cols: format_projection.parquet_read_cols,
246            field_id_to_projected_index: format_projection.column_id_to_projected_index,
247            primary_key_codec: None,
248        }
249    }
250
    /// Gets the arrow schema of the SST file.
    ///
    /// This schema is computed from the region metadata but should be the same
    /// as the arrow schema decoded from the file metadata.
    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
        &self.arrow_schema
    }

    /// Gets the metadata of the SST.
    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
        &self.metadata
    }

    /// Gets the columns to read from the SST, internal columns included.
    pub(crate) fn parquet_read_columns(&self) -> &ParquetReadColumns {
        &self.parquet_read_cols
    }

    /// Gets the map from field column id to its index in the projected schema
    /// (the schema of [Batch]).
    pub(crate) fn field_id_to_projected_index(&self) -> &HashMap<ColumnId, usize> {
        &self.field_id_to_projected_index
    }
272
    /// Convert an arrow record batch into `batches`.
    ///
    /// Rows are split into one [Batch] per distinct primary key, preserving
    /// row order. If `primary_key_codec` is set, primary key values are
    /// eagerly decoded into each batch.
    ///
    /// The length of `override_sequence_array` must be larger than the length of the record batch.
    /// Note that the `record_batch` may only contain a subset of columns if it is projected.
    pub fn convert_record_batch(
        &self,
        record_batch: &RecordBatch,
        override_sequence_array: Option<&ArrayRef>,
        batches: &mut VecDeque<Batch>,
    ) -> Result<()> {
        debug_assert!(batches.is_empty());

        // The record batch must have the time index and internal columns.
        ensure!(
            record_batch.num_columns() >= FIXED_POS_COLUMN_NUM,
            InvalidRecordBatchSnafu {
                reason: format!(
                    "record batch only has {} columns",
                    record_batch.num_columns()
                ),
            }
        );

        // Fixed-position columns are at the tail of the batch in the order
        // `time index, __primary_key, __sequence, __op_type`, so iterating in
        // reverse yields op type first.
        let mut fixed_pos_columns = record_batch
            .columns()
            .iter()
            .rev()
            .take(FIXED_POS_COLUMN_NUM);
        // Safety: We have checked the column number.
        let op_type_array = fixed_pos_columns.next().unwrap();
        let mut sequence_array = fixed_pos_columns.next().unwrap().clone();
        let pk_array = fixed_pos_columns.next().unwrap();
        let ts_array = fixed_pos_columns.next().unwrap();
        let field_batch_columns = self.get_field_batch_columns(record_batch)?;

        // Override sequence array if provided.
        if let Some(override_array) = override_sequence_array {
            assert!(override_array.len() >= sequence_array.len());
            // It's fine to assign the override array directly, but we slice it to make
            // sure it matches the length of the original sequence array.
            sequence_array = if override_array.len() > sequence_array.len() {
                override_array.slice(0, sequence_array.len())
            } else {
                override_array.clone()
            };
        }

        // Compute primary key offsets (boundaries between runs of equal keys).
        let pk_dict_array = pk_array
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .with_context(|| InvalidRecordBatchSnafu {
                reason: format!("primary key array should not be {:?}", pk_array.data_type()),
            })?;
        let offsets = primary_key_offsets(pk_dict_array)?;
        if offsets.is_empty() {
            return Ok(());
        }

        // Split record batch according to pk offsets.
        let keys = pk_dict_array.keys();
        let pk_values = pk_dict_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .with_context(|| InvalidRecordBatchSnafu {
                reason: format!(
                    "values of primary key array should not be {:?}",
                    pk_dict_array.values().data_type()
                ),
            })?;
        // Each window [start, end) holds rows of a single primary key.
        for (i, start) in offsets[..offsets.len() - 1].iter().enumerate() {
            let end = offsets[i + 1];
            let rows_in_batch = end - start;
            let dict_key = keys.value(*start);
            let primary_key = pk_values.value(dict_key as usize).to_vec();

            let mut builder = BatchBuilder::new(primary_key);
            builder
                .timestamps_array(ts_array.slice(*start, rows_in_batch))?
                .sequences_array(sequence_array.slice(*start, rows_in_batch))?
                .op_types_array(op_type_array.slice(*start, rows_in_batch))?;
            // Push all fields
            for batch_column in &field_batch_columns {
                builder.push_field(BatchColumn {
                    column_id: batch_column.column_id,
                    data: batch_column.data.slice(*start, rows_in_batch),
                });
            }

            let mut batch = builder.build()?;
            // Eagerly decode the primary key when a codec is configured.
            if let Some(codec) = &self.primary_key_codec {
                let pk_values: CompositeValues =
                    codec.decode(batch.primary_key()).context(DecodeSnafu)?;
                batch.set_pk_values(pk_values);
            }
            batches.push_back(batch);
        }

        Ok(())
    }
374
375    /// Returns min values of specific column in row groups.
376    pub fn min_values(
377        &self,
378        row_groups: &[impl Borrow<RowGroupMetaData>],
379        column_id: ColumnId,
380    ) -> StatValues {
381        let Some(column) = self.metadata.column_by_id(column_id) else {
382            // No such column in the SST.
383            return StatValues::NoColumn;
384        };
385        match column.semantic_type {
386            SemanticType::Tag => self.tag_values(row_groups, column, true),
387            SemanticType::Field => {
388                // Safety: `field_id_to_index` is initialized by the semantic type.
389                let index = self.field_id_to_index.get(&column_id).unwrap();
390                let stats = column_values(row_groups, column, *index, true);
391                StatValues::from_stats_opt(stats)
392            }
393            SemanticType::Timestamp => {
394                let index = self.time_index_position();
395                let stats = column_values(row_groups, column, index, true);
396                StatValues::from_stats_opt(stats)
397            }
398        }
399    }
400
401    /// Returns max values of specific column in row groups.
402    pub fn max_values(
403        &self,
404        row_groups: &[impl Borrow<RowGroupMetaData>],
405        column_id: ColumnId,
406    ) -> StatValues {
407        let Some(column) = self.metadata.column_by_id(column_id) else {
408            // No such column in the SST.
409            return StatValues::NoColumn;
410        };
411        match column.semantic_type {
412            SemanticType::Tag => self.tag_values(row_groups, column, false),
413            SemanticType::Field => {
414                // Safety: `field_id_to_index` is initialized by the semantic type.
415                let index = self.field_id_to_index.get(&column_id).unwrap();
416                let stats = column_values(row_groups, column, *index, false);
417                StatValues::from_stats_opt(stats)
418            }
419            SemanticType::Timestamp => {
420                let index = self.time_index_position();
421                let stats = column_values(row_groups, column, index, false);
422                StatValues::from_stats_opt(stats)
423            }
424        }
425    }
426
427    /// Returns null counts of specific column in row groups.
428    pub fn null_counts(
429        &self,
430        row_groups: &[impl Borrow<RowGroupMetaData>],
431        column_id: ColumnId,
432    ) -> StatValues {
433        let Some(column) = self.metadata.column_by_id(column_id) else {
434            // No such column in the SST.
435            return StatValues::NoColumn;
436        };
437        match column.semantic_type {
438            SemanticType::Tag => StatValues::NoStats,
439            SemanticType::Field => {
440                // Safety: `field_id_to_index` is initialized by the semantic type.
441                let index = self.field_id_to_index.get(&column_id).unwrap();
442                let stats = column_null_counts(row_groups, *index);
443                StatValues::from_stats_opt(stats)
444            }
445            SemanticType::Timestamp => {
446                let index = self.time_index_position();
447                let stats = column_null_counts(row_groups, index);
448                StatValues::from_stats_opt(stats)
449            }
450        }
451    }
452
453    /// Get fields from `record_batch`.
454    fn get_field_batch_columns(&self, record_batch: &RecordBatch) -> Result<Vec<BatchColumn>> {
455        record_batch
456            .columns()
457            .iter()
458            .zip(record_batch.schema().fields())
459            .take(record_batch.num_columns() - FIXED_POS_COLUMN_NUM) // Take all field columns.
460            .map(|(array, field)| {
461                let vector = Helper::try_into_vector(array.clone()).context(ConvertVectorSnafu)?;
462                let column = self
463                    .metadata
464                    .column_by_name(field.name())
465                    .with_context(|| InvalidRecordBatchSnafu {
466                        reason: format!("column {} not found in metadata", field.name()),
467                    })?;
468
469                Ok(BatchColumn {
470                    column_id: column.column_id,
471                    data: vector,
472                })
473            })
474            .collect()
475    }
476
477    /// Returns min/max values of specific tag.
478    fn tag_values(
479        &self,
480        row_groups: &[impl Borrow<RowGroupMetaData>],
481        column: &ColumnMetadata,
482        is_min: bool,
483    ) -> StatValues {
484        let is_first_tag = self
485            .metadata
486            .primary_key
487            .first()
488            .map(|id| *id == column.column_id)
489            .unwrap_or(false);
490        if !is_first_tag {
491            // Only the min-max of the first tag is available in the primary key.
492            return StatValues::NoStats;
493        }
494
495        StatValues::from_stats_opt(self.first_tag_values(row_groups, column, is_min))
496    }
497
    /// Returns min/max values of the first tag.
    /// Returns None if the tag does not have statistics.
    ///
    /// The primary key is stored as encoded bytes, so the first tag's value is
    /// recovered by decoding the leftmost field of the byte-array statistics.
    fn first_tag_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column: &ColumnMetadata,
        is_min: bool,
    ) -> Option<ArrayRef> {
        // Caller must only pass the first tag of the primary key.
        debug_assert!(
            self.metadata
                .primary_key
                .first()
                .map(|id| *id == column.column_id)
                .unwrap_or(false)
        );

        // Build a codec that decodes only this single leading field.
        let primary_key_encoding = self.metadata.primary_key_encoding;
        let converter = build_primary_key_codec_with_fields(
            primary_key_encoding,
            [(
                column.column_id,
                SortField::new(column.column_schema.data_type.clone()),
            )]
            .into_iter(),
        );

        // Only byte-array statistics can hold an encoded primary key; any
        // other physical type (or a decode failure) yields a null.
        let values = row_groups.iter().map(|meta| {
            let stats = meta
                .borrow()
                .column(self.primary_key_position())
                .statistics()?;
            match stats {
                Statistics::Boolean(_) => None,
                Statistics::Int32(_) => None,
                Statistics::Int64(_) => None,
                Statistics::Int96(_) => None,
                Statistics::Float(_) => None,
                Statistics::Double(_) => None,
                Statistics::ByteArray(s) => {
                    let bytes = if is_min {
                        s.min_bytes_opt()?
                    } else {
                        s.max_bytes_opt()?
                    };
                    converter.decode_leftmost(bytes).ok()?
                }
                Statistics::FixedLenByteArray(_) => None,
            }
        });
        // Collect decoded values into a vector of the tag's data type; one
        // slot per row group, missing statistics become nulls.
        let mut builder = column
            .column_schema
            .data_type
            .create_mutable_vector(row_groups.len());
        for value_opt in values {
            match value_opt {
                // Safety: We use the same data type to create the converter.
                Some(v) => builder.push_value_ref(&v.as_value_ref()),
                None => builder.push_null(),
            }
        }
        let vector = builder.to_vector();

        Some(vector.to_arrow_array())
    }
562
563    /// Index in SST of the primary key.
564    fn primary_key_position(&self) -> usize {
565        self.arrow_schema.fields.len() - 3
566    }
567
    /// Index in SST of the time index.
    ///
    /// The time index is the first of the trailing `FIXED_POS_COLUMN_NUM`
    /// fixed-position columns.
    fn time_index_position(&self) -> usize {
        self.arrow_schema.fields.len() - FIXED_POS_COLUMN_NUM
    }

    /// Index of a field column by its column id.
    ///
    /// The returned index is relative to the projected schema (the schema of
    /// [Batch]), not to the SST schema.
    pub fn field_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
        self.field_id_to_projected_index.get(&column_id).copied()
    }
577}
578
/// Helper to compute the projection for the SST.
pub(crate) struct FormatProjection {
    /// The columns to read from the SST. It contains all internal columns.
    pub(crate) parquet_read_cols: ParquetReadColumns,
    /// Column id to their index in the projected schema (
    /// the schema after projection).
    ///
    /// It doesn't contain time index column if it is not present in the projection.
    pub(crate) column_id_to_projected_index: HashMap<ColumnId, usize>,
}
589
590impl FormatProjection {
    /// Computes the projection.
    ///
    /// `id_to_index` is a mapping from column id to the index of the column in the SST.
    /// Requested columns absent from `id_to_index` are silently dropped.
    pub(crate) fn compute_format_projection(
        id_to_index: &HashMap<ColumnId, usize>,
        sst_column_num: usize,
        cols: ReadColumns,
    ) -> Self {
        // Keep only columns that exist in this SST, carrying along their
        // SST index and nested paths.
        let mut projected_columns: Vec<_> = cols
            .cols
            .into_iter()
            .filter_map(|col| {
                id_to_index
                    .get(&col.column_id)
                    .copied()
                    .map(|index_of_sst| (col.column_id, index_of_sst, col.nested_paths))
            })
            .collect();
        // Sorts columns by their indices in the SST. SST uses a bitmap for projection.
        // This ensures the schema of `projected_columns` is the same as the batch returned from the SST.
        projected_columns.sort_unstable_by_key(|(_, index, _)| *index);

        let mut parquet_read_cols: Vec<ParquetReadColumn> =
            Vec::with_capacity(projected_columns.len() + FIXED_POS_COLUMN_NUM);
        // Creates a map from column id to the index of that column in the projected record batch.
        let mut column_id_to_projected_index = HashMap::with_capacity(projected_columns.len());

        for (col_id, index_of_sst, nested_paths) in projected_columns {
            Self::merge_or_push_parquet_column(&mut parquet_read_cols, index_of_sst, nested_paths);

            // After merge-or-push, this column's root is always the last
            // entry, so its projected index is `len() - 1`. A repeated column
            // id keeps its first recorded index.
            column_id_to_projected_index
                .entry(col_id)
                .or_insert_with(|| parquet_read_cols.len() - 1);
        }

        // In SST schema, fixed-position columns are always in the tail:
        // `time index, __primary_key, __sequence, __op_type`.
        Self::append_time_index_if_needed(&mut parquet_read_cols, sst_column_num);
        Self::append_fixed_internal_columns(&mut parquet_read_cols, sst_column_num);

        Self {
            parquet_read_cols: ParquetReadColumns::from_deduped(parquet_read_cols),
            column_id_to_projected_index,
        }
    }
636
637    fn merge_or_push_parquet_column(
638        parquet_read_cols: &mut Vec<ParquetReadColumn>,
639        index_of_sst: usize,
640        nested_paths: Vec<Vec<String>>,
641    ) {
642        // `projected_columns` is sorted by parquet root index, so repeated reads
643        // for the same root column are always adjacent.
644        if let Some(last_col) = parquet_read_cols.last_mut()
645            && last_col.root_index() == index_of_sst
646        {
647            last_col.merge_nested_paths(nested_paths);
648            return;
649        }
650
651        let parquet_col = ParquetReadColumn::new(index_of_sst).with_nested_paths(nested_paths);
652        parquet_read_cols.push(parquet_col);
653    }
654
655    fn append_time_index_if_needed(
656        parquet_read_cols: &mut Vec<ParquetReadColumn>,
657        sst_column_num: usize,
658    ) {
659        let time_index = sst_column_num - FIXED_POS_COLUMN_NUM;
660        // Existing projected roots are already sorted by SST root index, and may
661        // already include the time index, so we compare against the last root to
662        // decide whether we still need to append `time index`.
663        let needs_time_index = parquet_read_cols
664            .last()
665            .map(|col| col.root_index() != time_index)
666            .unwrap_or(true);
667        if needs_time_index {
668            parquet_read_cols.push(ParquetReadColumn::new(time_index));
669        }
670    }
671
672    // Append internal columns in fixed order: `__primary_key`, `__sequence`,
673    // `__op_type`.
674    fn append_fixed_internal_columns(
675        parquet_read_cols: &mut Vec<ParquetReadColumn>,
676        sst_column_num: usize,
677    ) {
678        for index in sst_column_num - INTERNAL_COLUMN_NUM..sst_column_num {
679            parquet_read_cols.push(ParquetReadColumn::new(index));
680        }
681    }
682}
683
/// Values of column statistics of the SST.
///
/// It also distinguishes the case that a column is not found and
/// the column exists but has no statistics.
pub enum StatValues {
    /// Values of each row group (one array entry per row group).
    Values(ArrayRef),
    /// No such column.
    NoColumn,
    /// Column exists but has no statistics.
    NoStats,
}
696
697impl StatValues {
698    /// Creates a new `StatValues` instance from optional statistics.
699    pub fn from_stats_opt(stats: Option<ArrayRef>) -> Self {
700        match stats {
701            Some(stats) => StatValues::Values(stats),
702            None => StatValues::NoStats,
703        }
704    }
705}
706
#[cfg(test)]
impl PrimaryKeyReadFormat {
    /// Creates a helper with existing `metadata` and all columns.
    ///
    /// Test-only convenience: projects every column in metadata order.
    pub fn new_with_all_columns(metadata: RegionMetadataRef) -> PrimaryKeyReadFormat {
        Self::new(
            Arc::clone(&metadata),
            ReadColumns::from_deduped_column_ids(
                metadata.column_metadatas.iter().map(|c| c.column_id),
            ),
        )
    }
}
719
720/// Compute offsets of different primary keys in the array.
721pub(crate) fn primary_key_offsets(pk_dict_array: &PrimaryKeyArray) -> Result<Vec<usize>> {
722    if pk_dict_array.is_empty() {
723        return Ok(Vec::new());
724    }
725
726    // Init offsets.
727    let mut offsets = vec![0];
728    let keys = pk_dict_array.keys();
729    // We know that primary keys are always not null so we iterate `keys.values()` directly.
730    let pk_indices = keys.values();
731    for (i, key) in pk_indices.iter().take(keys.len() - 1).enumerate() {
732        // Compare each key with next key
733        if *key != pk_indices[i + 1] {
734            // We meet a new key, push the next index as end of the offset.
735            offsets.push(i + 1);
736        }
737    }
738    offsets.push(keys.len());
739
740    Ok(offsets)
741}
742
/// Gets the min/max time index of the row group from the parquet meta.
/// It assumes the parquet is created by the mito engine.
///
/// Returns `None` when the time index column has no statistics or the
/// statistics have an unexpected physical type.
pub(crate) fn parquet_row_group_time_range(
    file_meta: &FileMeta,
    parquet_meta: &ParquetMetaData,
    row_group_idx: usize,
) -> Option<FileTimeRange> {
    let row_group_meta = parquet_meta.row_group(row_group_idx);
    let num_columns = parquet_meta.file_metadata().schema_descr().num_columns();
    assert!(
        num_columns >= FIXED_POS_COLUMN_NUM,
        "file only has {} columns",
        num_columns
    );
    // Time index is the first of the trailing fixed-position columns.
    let time_index_pos = num_columns - FIXED_POS_COLUMN_NUM;

    let stats = row_group_meta.column(time_index_pos).statistics()?;
    // The physical type for the timestamp should be i64.
    let (min, max) = match stats {
        Statistics::Int64(value_stats) => (*value_stats.min_opt()?, *value_stats.max_opt()?),
        Statistics::Int32(_)
        | Statistics::Boolean(_)
        | Statistics::Int96(_)
        | Statistics::Float(_)
        | Statistics::Double(_)
        | Statistics::ByteArray(_)
        | Statistics::FixedLenByteArray(_) => {
            common_telemetry::warn!(
                "Invalid statistics {:?} for time index in parquet in {}",
                stats,
                file_meta.file_id
            );
            return None;
        }
    };

    // Row group range must be contained in the file-level time range.
    debug_assert!(min >= file_meta.time_range.0.value() && min <= file_meta.time_range.1.value());
    debug_assert!(max >= file_meta.time_range.0.value() && max <= file_meta.time_range.1.value());
    // Reuse the unit of the file-level time range for the raw i64 values.
    let unit = file_meta.time_range.0.unit();

    Some((Timestamp::new(min, unit), Timestamp::new(max, unit)))
}
785
786/// Checks if sequence override is needed based on all row groups' statistics.
787/// Returns true if ALL row groups have sequence min-max values of 0.
788pub(crate) fn need_override_sequence(parquet_meta: &ParquetMetaData) -> bool {
789    let num_columns = parquet_meta.file_metadata().schema_descr().num_columns();
790    if num_columns < FIXED_POS_COLUMN_NUM {
791        return false;
792    }
793
794    // The sequence column is the second-to-last column (before op_type)
795    let sequence_pos = num_columns - 2;
796
797    // Check all row groups - all must have sequence min-max of 0
798    for row_group in parquet_meta.row_groups() {
799        if let Some(Statistics::Int64(value_stats)) = row_group.column(sequence_pos).statistics() {
800            if let (Some(min_val), Some(max_val)) = (value_stats.min_opt(), value_stats.max_opt()) {
801                // If any row group doesn't have min=0 and max=0, return false
802                if *min_val != 0 || *max_val != 0 {
803                    return false;
804                }
805            } else {
806                // If any row group doesn't have statistics, return false
807                return false;
808            }
809        } else {
810            // If any row group doesn't have Int64 statistics, return false
811            return false;
812        }
813    }
814
815    // All row groups have sequence min-max of 0, or there are no row groups
816    !parquet_meta.row_groups().is_empty()
817}
818
819#[cfg(test)]
820mod tests {
821    use std::sync::Arc;
822
823    use api::v1::OpType;
824    use datatypes::arrow::array::{
825        Int64Array, StringArray, TimestampMillisecondArray, UInt8Array, UInt32Array, UInt64Array,
826    };
827    use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema, TimeUnit};
828    use datatypes::prelude::ConcreteDataType;
829    use datatypes::schema::ColumnSchema;
830    use datatypes::value::ValueRef;
831    use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, UInt8Vector, UInt64Vector};
832    use mito_codec::row_converter::{
833        DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec,
834    };
835    use store_api::codec::PrimaryKeyEncoding;
836    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
837    use store_api::storage::RegionId;
838    use store_api::storage::consts::ReservedColumnId;
839
840    use super::*;
841    use crate::sst::parquet::flat_format::{
842        FlatReadFormat, FlatWriteFormat, sequence_column_index,
843    };
844    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema, with_field_id};
845
    /// Sequence number assigned to every row in the default test batches.
    const TEST_SEQUENCE: u64 = 1;
    /// Op type used for test rows; all rows are plain puts.
    const TEST_OP_TYPE: u8 = OpType::Put as u8;
848
    /// Builds region metadata with tags `tag0`/`tag1` (ids 1/3), fields
    /// `field1`/`field0` (ids 4/2) and a millisecond time index `ts` (id 5).
    ///
    /// Field columns are deliberately pushed out of column-id order so the
    /// formats under test must handle reordering; the primary key is
    /// `[tag0, tag1]`.
    fn build_test_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("tag0", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field1",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 4, // We change the order of fields columns.
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("tag1", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field0",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 5,
            })
            .primary_key(vec![1, 3]);
        Arc::new(builder.build().unwrap())
    }
892
    /// Expected primary-key-format SST schema for the test region:
    /// fields (`field1`, `field0`) with field ids, the time index, then the
    /// internal `__primary_key`/`__sequence`/`__op_type` columns (no field id).
    /// Tags do not appear as columns; they are encoded in `__primary_key`.
    fn build_test_arrow_schema() -> SchemaRef {
        let fields = vec![
            make_field("field1", ArrowDataType::Int64, true, Some(4)),
            make_field("field0", ArrowDataType::Int64, true, Some(2)),
            make_field(
                "ts",
                ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
                false,
                Some(5),
            ),
            make_field(
                "__primary_key",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt32),
                    Box::new(ArrowDataType::Binary),
                ),
                false,
                None,
            ),
            make_field("__sequence", ArrowDataType::UInt64, false, None),
            make_field("__op_type", ArrowDataType::UInt8, false, None),
        ];
        Arc::new(Schema::new(fields))
    }
917
    /// Creates a test [Batch] with `num_rows` rows using [TEST_SEQUENCE] for every row.
    fn new_batch(primary_key: &[u8], start_ts: i64, start_field: i64, num_rows: usize) -> Batch {
        new_batch_with_sequence(primary_key, start_ts, start_field, num_rows, TEST_SEQUENCE)
    }
921
922    fn new_batch_with_sequence(
923        primary_key: &[u8],
924        start_ts: i64,
925        start_field: i64,
926        num_rows: usize,
927        sequence: u64,
928    ) -> Batch {
929        let ts_values = (0..num_rows).map(|i| start_ts + i as i64);
930        let timestamps = Arc::new(TimestampMillisecondVector::from_values(ts_values));
931        let sequences = Arc::new(UInt64Vector::from_vec(vec![sequence; num_rows]));
932        let op_types = Arc::new(UInt8Vector::from_vec(vec![TEST_OP_TYPE; num_rows]));
933        let fields = vec![
934            BatchColumn {
935                column_id: 4,
936                data: Arc::new(Int64Vector::from_vec(vec![start_field; num_rows])),
937            }, // field1
938            BatchColumn {
939                column_id: 2,
940                data: Arc::new(Int64Vector::from_vec(vec![start_field + 1; num_rows])),
941            }, // field0
942        ];
943
944        BatchBuilder::with_required_columns(primary_key.to_vec(), timestamps, sequences, op_types)
945            .with_fields(fields)
946            .build()
947            .unwrap()
948    }
949
950    #[test]
951    fn test_to_sst_arrow_schema() {
952        let metadata = build_test_region_metadata();
953        let write_format = PrimaryKeyWriteFormat::new(metadata);
954        assert_eq!(&build_test_arrow_schema(), write_format.arrow_schema());
955    }
956
957    fn build_test_pk_array(pk_row_nums: &[(Vec<u8>, usize)]) -> Arc<PrimaryKeyArray> {
958        let values = Arc::new(BinaryArray::from_iter_values(
959            pk_row_nums.iter().map(|v| &v.0),
960        ));
961        let mut keys = vec![];
962        for (index, num_rows) in pk_row_nums.iter().map(|v| v.1).enumerate() {
963            keys.extend(std::iter::repeat_n(index as u32, num_rows));
964        }
965        let keys = UInt32Array::from(keys);
966        Arc::new(DictionaryArray::new(keys, values))
967    }
968
    #[test]
    fn test_projection_indices() {
        // Parquet column order for this schema (see build_test_arrow_schema):
        // field1(0), field0(1), ts(2), __primary_key(3), __sequence(4), __op_type(5).
        // Every projection keeps the trailing fixed-position columns [2, 3, 4, 5];
        // tags live inside __primary_key so they add no extra index.
        let metadata = build_test_region_metadata();
        // Only read tag1
        let read_format =
            PrimaryKeyReadFormat::new(metadata.clone(), ReadColumns::from_deduped_column_ids([3]));
        assert_eq!(
            &[2, 3, 4, 5],
            read_format.parquet_read_columns().root_indices()
        );
        // Only read field1
        let read_format =
            PrimaryKeyReadFormat::new(metadata.clone(), ReadColumns::from_deduped_column_ids([4]));
        assert_eq!(
            &[0, 2, 3, 4, 5],
            read_format.parquet_read_columns().root_indices()
        );
        // Only read ts
        let read_format =
            PrimaryKeyReadFormat::new(metadata.clone(), ReadColumns::from_deduped_column_ids([5]));
        assert_eq!(
            &[2, 3, 4, 5],
            read_format.parquet_read_columns().root_indices()
        );
        // Read field0, tag0, ts
        let read_format =
            PrimaryKeyReadFormat::new(metadata, ReadColumns::from_deduped_column_ids([2, 1, 5]));
        assert_eq!(
            &[1, 2, 3, 4, 5],
            read_format.parquet_read_columns().root_indices()
        );
    }
1001
1002    #[test]
1003    fn test_empty_primary_key_offsets() {
1004        let array = build_test_pk_array(&[]);
1005        assert!(primary_key_offsets(&array).unwrap().is_empty());
1006    }
1007
1008    #[test]
1009    fn test_primary_key_offsets_one_series() {
1010        let array = build_test_pk_array(&[(b"one".to_vec(), 1)]);
1011        assert_eq!(vec![0, 1], primary_key_offsets(&array).unwrap());
1012
1013        let array = build_test_pk_array(&[(b"one".to_vec(), 1), (b"two".to_vec(), 1)]);
1014        assert_eq!(vec![0, 1, 2], primary_key_offsets(&array).unwrap());
1015
1016        let array = build_test_pk_array(&[
1017            (b"one".to_vec(), 1),
1018            (b"two".to_vec(), 1),
1019            (b"three".to_vec(), 1),
1020        ]);
1021        assert_eq!(vec![0, 1, 2, 3], primary_key_offsets(&array).unwrap());
1022    }
1023
1024    #[test]
1025    fn test_primary_key_offsets_multi_series() {
1026        let array = build_test_pk_array(&[(b"one".to_vec(), 1), (b"two".to_vec(), 3)]);
1027        assert_eq!(vec![0, 1, 4], primary_key_offsets(&array).unwrap());
1028
1029        let array = build_test_pk_array(&[(b"one".to_vec(), 3), (b"two".to_vec(), 1)]);
1030        assert_eq!(vec![0, 3, 4], primary_key_offsets(&array).unwrap());
1031
1032        let array = build_test_pk_array(&[(b"one".to_vec(), 3), (b"two".to_vec(), 3)]);
1033        assert_eq!(vec![0, 3, 6], primary_key_offsets(&array).unwrap());
1034    }
1035
    #[test]
    fn test_convert_empty_record_batch() {
        // Reading every column: the read format's schema must equal the
        // expected SST schema, and an empty record batch must yield no batches.
        let metadata = build_test_region_metadata();
        let arrow_schema = build_test_arrow_schema();
        let column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|col| col.column_id)
            .collect();
        let read_format =
            PrimaryKeyReadFormat::new(metadata, ReadColumns::from_deduped_column_ids(column_ids));
        assert_eq!(arrow_schema, *read_format.arrow_schema());

        let record_batch = RecordBatch::new_empty(arrow_schema);
        let mut batches = VecDeque::new();
        read_format
            .convert_record_batch(&record_batch, None, &mut batches)
            .unwrap();
        assert!(batches.is_empty());
    }
1056
    #[test]
    fn test_convert_record_batch() {
        // Read every column of the test region.
        let metadata = build_test_region_metadata();
        let column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|col| col.column_id)
            .collect();
        let read_format =
            PrimaryKeyReadFormat::new(metadata, ReadColumns::from_deduped_column_ids(column_ids));

        // Two series ("one" and "two"), two contiguous rows each.
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![1, 1, 10, 10])), // field1
            Arc::new(Int64Array::from(vec![2, 2, 11, 11])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 11, 12])), // ts
            build_test_pk_array(&[(b"one".to_vec(), 2), (b"two".to_vec(), 2)]), // primary key
            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; 4])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; 4])), // op type
        ];
        let arrow_schema = build_test_arrow_schema();
        let record_batch = RecordBatch::try_new(arrow_schema, columns).unwrap();
        let mut batches = VecDeque::new();
        read_format
            .convert_record_batch(&record_batch, None, &mut batches)
            .unwrap();

        // Conversion splits the record batch into one Batch per primary key.
        assert_eq!(
            vec![new_batch(b"one", 1, 1, 2), new_batch(b"two", 11, 10, 2)],
            batches.into_iter().collect::<Vec<_>>(),
        );
    }
1088
    #[test]
    fn test_convert_record_batch_with_override_sequence() {
        // Read every column of the test region.
        let metadata = build_test_region_metadata();
        let read_format = PrimaryKeyReadFormat::new(
            metadata.clone(),
            ReadColumns::from_deduped_column_ids(
                metadata.column_metadatas.iter().map(|c| c.column_id),
            ),
        );

        // Same input as test_convert_record_batch; rows store TEST_SEQUENCE.
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![1, 1, 10, 10])), // field1
            Arc::new(Int64Array::from(vec![2, 2, 11, 11])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 11, 12])), // ts
            build_test_pk_array(&[(b"one".to_vec(), 2), (b"two".to_vec(), 2)]), // primary key
            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; 4])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; 4])), // op type
        ];
        let arrow_schema = build_test_arrow_schema();
        let record_batch = RecordBatch::try_new(arrow_schema, columns).unwrap();

        // Create override sequence array with custom values
        let override_sequence: u64 = 12345;
        let override_sequence_array: ArrayRef =
            Arc::new(UInt64Array::from_value(override_sequence, 4));

        let mut batches = VecDeque::new();
        read_format
            .convert_record_batch(&record_batch, Some(&override_sequence_array), &mut batches)
            .unwrap();

        // Create expected batches with override sequence: the stored
        // TEST_SEQUENCE values must be replaced by the override everywhere.
        let expected_batch1 = new_batch_with_sequence(b"one", 1, 1, 2, override_sequence);
        let expected_batch2 = new_batch_with_sequence(b"two", 11, 10, 2, override_sequence);

        assert_eq!(
            vec![expected_batch1, expected_batch2],
            batches.into_iter().collect::<Vec<_>>(),
        );
    }
1129
1130    fn make_field(name: &str, dt: ArrowDataType, nullable: bool, field_id: Option<u32>) -> Field {
1131        let mut field = Field::new(name, dt, nullable);
1132        if let Some(id) = field_id {
1133            field = with_field_id(field, id);
1134        }
1135        field
1136    }
1137
    /// Flat-format SST schema for the test region, without field-id metadata:
    /// primary key columns first (tag0, tag1), then fields (field1, field0),
    /// the time index, and finally the three internal columns.
    fn build_test_flat_sst_schema() -> SchemaRef {
        let fields = vec![
            Field::new("tag0", ArrowDataType::Int64, true), // primary key columns first
            Field::new("tag1", ArrowDataType::Int64, true),
            Field::new("field1", ArrowDataType::Int64, true), // then field columns
            Field::new("field0", ArrowDataType::Int64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt32),
                    Box::new(ArrowDataType::Binary),
                ),
                false,
            ),
            Field::new("__sequence", ArrowDataType::UInt64, false),
            Field::new("__op_type", ArrowDataType::UInt8, false),
        ];
        Arc::new(Schema::new(fields))
    }
1162
    /// Same as [build_test_flat_sst_schema] but with field-id metadata attached
    /// to the region columns (tag0=1, tag1=3, field1=4, field0=2, ts=5);
    /// the three internal columns carry no field id.
    fn build_test_flat_sst_schema_with_field_ids() -> SchemaRef {
        let ids = [
            Some(1u32),
            Some(3),
            Some(4),
            Some(2),
            Some(5),
            None,
            None,
            None,
        ];
        let fields: Vec<_> = build_test_flat_sst_schema()
            .fields()
            .iter()
            .zip(ids)
            .map(|(f, id)| match id {
                Some(id) => Arc::new(with_field_id((**f).clone(), id)) as _,
                None => f.clone(),
            })
            .collect();
        Arc::new(Schema::new(fields))
    }
1185
1186    #[test]
1187    fn test_flat_to_sst_arrow_schema() {
1188        let metadata = build_test_region_metadata();
1189        let format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default());
1190        assert_eq!(
1191            &build_test_flat_sst_schema_with_field_ids(),
1192            format.arrow_schema()
1193        );
1194    }
1195
1196    fn input_columns_for_flat_batch(num_rows: usize) -> Vec<ArrayRef> {
1197        vec![
1198            Arc::new(Int64Array::from(vec![1; num_rows])), // tag0
1199            Arc::new(Int64Array::from(vec![1; num_rows])), // tag1
1200            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
1201            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
1202            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
1203            build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // __primary_key
1204            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; num_rows])), // sequence
1205            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
1206        ]
1207    }
1208
    #[test]
    fn test_flat_convert_batch() {
        let metadata = build_test_region_metadata();
        let format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default());

        let num_rows = 4;
        let columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
        let batch =
            RecordBatch::try_new(build_test_flat_sst_schema_with_field_ids(), columns.clone())
                .unwrap();
        let expect_record =
            RecordBatch::try_new(build_test_flat_sst_schema_with_field_ids(), columns).unwrap();

        // A batch already in flat format must convert to an identical record batch.
        let actual = format.convert_batch(&batch).unwrap();
        assert_eq!(expect_record, actual);
    }
1225
    #[test]
    fn test_flat_convert_with_override_sequence() {
        let metadata = build_test_region_metadata();
        let format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default())
            .with_override_sequence(Some(415411));

        let num_rows = 4;
        let columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
        let batch =
            RecordBatch::try_new(build_test_flat_sst_schema_with_field_ids(), columns).unwrap();

        // Expected output: identical to the input except the sequence column,
        // which must carry the configured override value in every row.
        let expected_columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![1; num_rows])), // tag0
            Arc::new(Int64Array::from(vec![1; num_rows])), // tag1
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // __primary_key
            Arc::new(UInt64Array::from(vec![415411; num_rows])), // overridden sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ];
        let expected_record = RecordBatch::try_new(
            build_test_flat_sst_schema_with_field_ids(),
            expected_columns,
        )
        .unwrap();

        let actual = format.convert_batch(&batch).unwrap();
        assert_eq!(expected_record, actual);
    }
1256
    #[test]
    fn test_flat_projection_indices() {
        let metadata = build_test_region_metadata();
        // Based on flat format: tag0(0), tag1(1), field1(2), field0(3), ts(4), __primary_key(5), __sequence(6), __op_type(7)
        // The projection includes all "fixed position" columns: ts(4), __primary_key(5), __sequence(6), __op_type(7)
        // Requested indices are merged with the fixed trailing columns; ts is
        // never duplicated since it is already one of the fixed columns.

        // Only read tag1 (column_id=3, index=1) + fixed columns
        let read_format = FlatReadFormat::new(
            metadata.clone(),
            ReadColumns::from_deduped_column_ids([3]),
            None,
            "test",
            false,
        )
        .unwrap();
        assert_eq!(
            &[1, 4, 5, 6, 7],
            read_format.parquet_read_columns().root_indices()
        );

        // Only read field1 (column_id=4, index=2) + fixed columns
        let read_format = FlatReadFormat::new(
            metadata.clone(),
            ReadColumns::from_deduped_column_ids([4]),
            None,
            "test",
            false,
        )
        .unwrap();
        assert_eq!(
            &[2, 4, 5, 6, 7],
            read_format.parquet_read_columns().root_indices()
        );

        // Only read ts (column_id=5, index=4) + fixed columns (ts is already included in fixed)
        let read_format = FlatReadFormat::new(
            metadata.clone(),
            ReadColumns::from_deduped_column_ids([5]),
            None,
            "test",
            false,
        )
        .unwrap();
        assert_eq!(
            &[4, 5, 6, 7],
            read_format.parquet_read_columns().root_indices()
        );

        // Read field0(column_id=2, index=3), tag0(column_id=1, index=0), ts(column_id=5, index=4) + fixed columns
        let read_format = FlatReadFormat::new(
            metadata,
            ReadColumns::from_deduped_column_ids([2, 1, 5]),
            None,
            "test",
            false,
        )
        .unwrap();
        assert_eq!(
            &[0, 3, 4, 5, 6, 7],
            read_format.parquet_read_columns().root_indices()
        );
    }
1319
    #[test]
    fn test_flat_read_format_convert_batch() {
        let metadata = build_test_region_metadata();
        let mut format = FlatReadFormat::new(
            metadata,
            ReadColumns::from_deduped_column_ids(std::iter::once(1)), // Just read tag0
            Some(8),
            "test",
            false,
        )
        .unwrap();

        let num_rows = 4;
        let original_sequence = 100u64;
        let override_sequence = 200u64;

        // Create a test record batch
        let columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
        let mut test_columns = columns.clone();
        // Replace sequence column with original sequence values
        // (index 6 in the flat layout; see input_columns_for_flat_batch).
        test_columns[6] = Arc::new(UInt64Array::from(vec![original_sequence; num_rows]));
        let record_batch =
            RecordBatch::try_new(format.arrow_schema().clone(), test_columns).unwrap();

        // Test without override sequence - should return clone:
        // the stored sequence values must survive conversion untouched.
        let result = format.convert_batch(record_batch.clone(), None).unwrap();
        let sequence_column = result.column(sequence_column_index(result.num_columns()));
        let sequence_array = sequence_column
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();

        let expected_original = UInt64Array::from(vec![original_sequence; num_rows]);
        assert_eq!(sequence_array, &expected_original);

        // Set override sequence and test with new_override_sequence_array:
        // now the output sequence column must carry the override value.
        format.set_override_sequence(Some(override_sequence));
        let override_sequence_array = format.new_override_sequence_array(num_rows).unwrap();
        let result = format
            .convert_batch(record_batch, Some(&override_sequence_array))
            .unwrap();
        let sequence_column = result.column(sequence_column_index(result.num_columns()));
        let sequence_array = sequence_column
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();

        let expected_override = UInt64Array::from(vec![override_sequence; num_rows]);
        assert_eq!(sequence_array, &expected_override);
    }
1370
    #[test]
    fn test_need_convert_to_flat() {
        // `is_legacy_format` decides by column count: a legacy file lacks the
        // decoded primary key columns, so it has exactly `primary_key.len()`
        // fewer columns than the flat format expects.
        let metadata = build_test_region_metadata();

        // Test case 1: Same number of columns, no conversion needed
        // For flat format: all columns (5) + internal columns (3)
        let expected_columns = metadata.column_metadatas.len() + 3;
        let result =
            FlatReadFormat::is_legacy_format(&metadata, expected_columns, "test.parquet").unwrap();
        assert!(
            !result,
            "Should not need conversion when column counts match"
        );

        // Test case 2: Different number of columns, need conversion
        // Missing primary key columns (2 primary keys in test metadata)
        let num_columns_without_pk = expected_columns - metadata.primary_key.len();
        let result =
            FlatReadFormat::is_legacy_format(&metadata, num_columns_without_pk, "test.parquet")
                .unwrap();
        assert!(
            result,
            "Should need conversion when primary key columns are missing"
        );

        // Test case 3: Invalid case - actual columns more than expected
        let too_many_columns = expected_columns + 1;
        let err = FlatReadFormat::is_legacy_format(&metadata, too_many_columns, "test.parquet")
            .unwrap_err();
        assert!(err.to_string().contains("Expected columns"), "{err:?}");

        // Test case 4: Invalid case - column difference doesn't match primary key count
        let wrong_diff_columns = expected_columns - 1; // Difference of 1, but we have 2 primary keys
        let err = FlatReadFormat::is_legacy_format(&metadata, wrong_diff_columns, "test.parquet")
            .unwrap_err();
        assert!(
            err.to_string().contains("Column number difference"),
            "{err:?}"
        );
    }
1411
1412    fn build_test_dense_pk_array(
1413        codec: &DensePrimaryKeyCodec,
1414        pk_values_per_row: &[&[Option<i64>]],
1415    ) -> Arc<PrimaryKeyArray> {
1416        let mut builder = PrimaryKeyArrayBuilder::with_capacity(pk_values_per_row.len(), 1024, 0);
1417
1418        for pk_values_row in pk_values_per_row {
1419            let values: Vec<ValueRef> = pk_values_row
1420                .iter()
1421                .map(|opt| match opt {
1422                    Some(val) => ValueRef::Int64(*val),
1423                    None => ValueRef::Null,
1424                })
1425                .collect();
1426
1427            let encoded = codec.encode(values.into_iter()).unwrap();
1428            builder.append_value(&encoded);
1429        }
1430
1431        Arc::new(builder.finish())
1432    }
1433
    /// Builds region metadata that uses sparse primary key encoding: the
    /// reserved `__table_id`/`__tsid` tags plus string tags `tag0` (id 1) and
    /// `tag1` (id 3), fields `field1`/`field0`, and a millisecond `ts` index.
    fn build_test_sparse_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "__table_id",
                    ConcreteDataType::uint32_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: ReservedColumnId::table_id(),
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "__tsid",
                    ConcreteDataType::uint64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: ReservedColumnId::tsid(),
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("tag0", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("tag1", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field1",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field0",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 5,
            })
            .primary_key(vec![
                ReservedColumnId::table_id(),
                ReservedColumnId::tsid(),
                1,
                3,
            ])
            .primary_key_encoding(PrimaryKeyEncoding::Sparse);
        Arc::new(builder.build().unwrap())
    }
1501
    /// Encodes each [SparseTestRow] with the sparse codec and collects the
    /// encoded keys into a dictionary-encoded primary key array.
    fn build_test_sparse_pk_array(
        codec: &SparsePrimaryKeyCodec,
        pk_values_per_row: &[SparseTestRow],
    ) -> Arc<PrimaryKeyArray> {
        let mut builder = PrimaryKeyArrayBuilder::with_capacity(pk_values_per_row.len(), 1024, 0);
        for row in pk_values_per_row {
            // Sparse encoding pairs each value with its column id, including
            // the reserved table id and tsid tags.
            let values = vec![
                (ReservedColumnId::table_id(), ValueRef::UInt32(row.table_id)),
                (ReservedColumnId::tsid(), ValueRef::UInt64(row.tsid)),
                (1, ValueRef::String(&row.tag0)),
                (3, ValueRef::String(&row.tag1)),
            ];

            let mut buffer = Vec::new();
            codec.encode_value_refs(&values, &mut buffer).unwrap();
            builder.append_value(&buffer);
        }

        Arc::new(builder.finish())
    }
1522
    /// Primary key values for one row in the sparse-encoding tests.
    #[derive(Clone)]
    struct SparseTestRow {
        /// Value for the reserved `__table_id` tag.
        table_id: u32,
        /// Value for the reserved `__tsid` tag.
        tsid: u64,
        /// User-defined tag columns `tag0` (id 1) and `tag1` (id 3).
        tag0: String,
        tag1: String,
    }
1530
    /// Converts an old-format batch (no tag columns, densely encoded
    /// `__primary_key`) into the flat format and checks that the decoded tag
    /// columns are prepended while all original columns are preserved.
    #[test]
    fn test_flat_read_format_convert_format_with_dense_encoding() {
        let metadata = build_test_region_metadata();

        // Project all columns of the region.
        let column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|c| c.column_id)
            .collect();
        let format = FlatReadFormat::new(
            metadata.clone(),
            ReadColumns::from_deduped_column_ids(column_ids),
            Some(6),
            "test",
            false,
        )
        .unwrap();

        let num_rows = 4;
        let original_sequence = 100u64;

        // Create primary key values for each row: tag0=1, tag1=1 for all rows
        let pk_values_per_row = vec![
                &[Some(1i64), Some(1i64)][..]; num_rows  // All rows have same primary key values
            ];

        // Create a test record batch in old format using dense encoding
        let codec = DensePrimaryKeyCodec::new(&metadata);
        let dense_pk_array = build_test_dense_pk_array(&codec, &pk_values_per_row);
        // Old-format column order: fields, time index, then internal columns.
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            dense_pk_array.clone(),                        // __primary_key (dense encoding)
            Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ];

        // Create schema for old format (without primary key columns)
        let old_schema = build_test_arrow_schema();
        let record_batch = RecordBatch::try_new(old_schema, columns).unwrap();

        // Test conversion with dense encoding
        let result = format.convert_batch(record_batch, None).unwrap();

        // Construct expected RecordBatch in flat format with decoded primary key columns
        let expected_columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![1; num_rows])), // tag0 (decoded from primary key)
            Arc::new(Int64Array::from(vec![1; num_rows])), // tag1 (decoded from primary key)
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            dense_pk_array,                                // __primary_key (preserved)
            Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ];
        let expected_record_batch = RecordBatch::try_new(
            build_test_flat_sst_schema_with_field_ids(),
            expected_columns,
        )
        .unwrap();

        // Compare the actual result with the expected record batch
        assert_eq!(expected_record_batch, result);
    }
1596
    /// Converts an old-format batch with a sparsely encoded `__primary_key`
    /// into the flat format: decoded `__table_id`/`__tsid` and dictionary tag
    /// columns must be prepended. Also checks that with `skip_convert` enabled
    /// the batch passes through unchanged.
    #[test]
    fn test_flat_read_format_convert_format_with_sparse_encoding() {
        let metadata = build_test_sparse_region_metadata();

        // Project all columns of the region.
        let column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|c| c.column_id)
            .collect();
        let format = FlatReadFormat::new(
            metadata.clone(),
            ReadColumns::from_deduped_column_ids(column_ids.clone()),
            None,
            "test",
            false,
        )
        .unwrap();

        let num_rows = 4;
        let original_sequence = 100u64;

        // Create sparse test data with table_id, tsid and string tags
        let pk_test_rows = vec![
            SparseTestRow {
                table_id: 1,
                tsid: 123,
                tag0: "frontend".to_string(),
                tag1: "pod1".to_string(),
            };
            num_rows
        ];

        let codec = SparsePrimaryKeyCodec::new(&metadata);
        let sparse_pk_array = build_test_sparse_pk_array(&codec, &pk_test_rows);
        // Create a test record batch in old format using sparse encoding
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            sparse_pk_array.clone(),                       // __primary_key (sparse encoding)
            Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ];

        // Create schema for old format (without primary key columns)
        let old_schema = build_test_arrow_schema();
        let record_batch = RecordBatch::try_new(old_schema, columns).unwrap();

        // Test conversion with sparse encoding
        let result = format.convert_batch(record_batch.clone(), None).unwrap();

        // Construct expected RecordBatch in flat format with decoded primary key columns.
        // All rows share one key, so each tag dictionary has a single value
        // referenced by every row.
        let tag0_array = Arc::new(DictionaryArray::new(
            UInt32Array::from(vec![0; num_rows]),
            Arc::new(StringArray::from(vec!["frontend"])),
        ));
        let tag1_array = Arc::new(DictionaryArray::new(
            UInt32Array::from(vec![0; num_rows]),
            Arc::new(StringArray::from(vec!["pod1"])),
        ));
        let expected_columns: Vec<ArrayRef> = vec![
            Arc::new(UInt32Array::from(vec![1; num_rows])), // __table_id (decoded from primary key)
            Arc::new(UInt64Array::from(vec![123; num_rows])), // __tsid (decoded from primary key)
            tag0_array,                                     // tag0 (decoded from primary key)
            tag1_array,                                     // tag1 (decoded from primary key)
            Arc::new(Int64Array::from(vec![2; num_rows])),  // field1
            Arc::new(Int64Array::from(vec![3; num_rows])),  // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            sparse_pk_array,                                // __primary_key (preserved)
            Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ];
        let expected_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        let expected_record_batch =
            RecordBatch::try_new(expected_schema, expected_columns).unwrap();

        // Compare the actual result with the expected record batch
        assert_eq!(expected_record_batch, result);

        // Rebuild the format with skip-convert enabled (last argument true).
        let format = FlatReadFormat::new(
            metadata.clone(),
            ReadColumns::from_deduped_column_ids(column_ids),
            None,
            "test",
            true,
        )
        .unwrap();
        // Test conversion with sparse encoding and skip convert.
        let result = format.convert_batch(record_batch.clone(), None).unwrap();
        assert_eq!(record_batch, result);
    }
1688
1689    #[test]
1690    fn test_convert_flat_batch() {
1691        let metadata = build_test_region_metadata();
1692        let write_format = PrimaryKeyWriteFormat::new(metadata);
1693
1694        let num_rows = 4;
1695        // Build a flat record batch: tag0, tag1, field1, field0, ts, __primary_key, __sequence, __op_type
1696        let flat_columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
1697        let flat_batch = RecordBatch::try_new(build_test_flat_sst_schema(), flat_columns).unwrap();
1698
1699        // num_fields = 2 (field1, field0)
1700        let result = write_format.convert_flat_batch(&flat_batch, 2).unwrap();
1701
1702        // Expected: tag columns stripped, only field1, field0, ts, __primary_key, __sequence, __op_type
1703        let expected_columns: Vec<ArrayRef> = vec![
1704            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
1705            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
1706            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
1707            build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // __primary_key
1708            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; num_rows])), // __sequence
1709            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // __op_type
1710        ];
1711        let expected = RecordBatch::try_new(build_test_arrow_schema(), expected_columns).unwrap();
1712
1713        assert_eq!(expected, result);
1714    }
1715
1716    #[test]
1717    fn test_convert_flat_batch_with_override_sequence() {
1718        let metadata = build_test_region_metadata();
1719        let write_format = PrimaryKeyWriteFormat::new(metadata).with_override_sequence(Some(999));
1720
1721        let num_rows = 4;
1722        let flat_columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
1723        let flat_batch = RecordBatch::try_new(build_test_flat_sst_schema(), flat_columns).unwrap();
1724
1725        let result = write_format.convert_flat_batch(&flat_batch, 2).unwrap();
1726
1727        let expected_columns: Vec<ArrayRef> = vec![
1728            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
1729            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
1730            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
1731            build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // __primary_key
1732            Arc::new(UInt64Array::from(vec![999; num_rows])), // overridden __sequence
1733            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // __op_type
1734        ];
1735        let expected = RecordBatch::try_new(build_test_arrow_schema(), expected_columns).unwrap();
1736
1737        assert_eq!(expected, result);
1738    }
1739
    /// Converting a flat batch for a region without primary key columns is a
    /// no-op: there are no tag columns to strip, so the batch passes through
    /// unchanged.
    #[test]
    fn test_convert_flat_batch_no_tags() {
        // Test with a region that has no primary key columns (no tags to strip).
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field0",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            });
        let metadata = Arc::new(builder.build().unwrap());
        let write_format = PrimaryKeyWriteFormat::new(metadata);

        let num_rows = 3;
        // No tag columns, so flat batch is: field0, ts, __primary_key, __sequence, __op_type
        let sst_schema = write_format.arrow_schema().clone();
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![10; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3])), // ts
            // Empty primary key bytes: the region declares no tag columns.
            build_test_pk_array(&[(b"".to_vec(), num_rows)]), // __primary_key
            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; num_rows])), // __sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // __op_type
        ];
        let flat_batch = RecordBatch::try_new(sst_schema.clone(), columns.clone()).unwrap();

        // num_fields = 1, num_tag_columns = 5 - 1 - 4 = 0, so nothing is stripped
        let result = write_format.convert_flat_batch(&flat_batch, 1).unwrap();
        let expected = RecordBatch::try_new(sst_schema, columns).unwrap();

        assert_eq!(expected, result);
    }
1784}