mito2/sst/parquet/
format.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Format to store in parquet.
16//!
17//! We store three internal columns in parquet:
18//! - `__primary_key`, the primary key of the row (tags). Type: dictionary(uint32, binary)
19//! - `__sequence`, the sequence number of a row. Type: uint64
20//! - `__op_type`, the op type of the row. Type: uint8
21//!
22//! The schema of a parquet file is:
23//! ```text
24//! field 0, field 1, ..., field N, time index, primary key, sequence, op type
25//! ```
26//!
//! We store fields in the same order as [RegionMetadata::field_columns()](store_api::metadata::RegionMetadata::field_columns()).
28
29use std::borrow::Borrow;
30use std::collections::{HashMap, VecDeque};
31use std::sync::Arc;
32
33use api::v1::SemanticType;
34use common_time::Timestamp;
35use datafusion_common::ScalarValue;
36use datatypes::arrow::array::{ArrayRef, BinaryArray, DictionaryArray, UInt32Array, UInt64Array};
37use datatypes::arrow::datatypes::{SchemaRef, UInt32Type};
38use datatypes::arrow::record_batch::RecordBatch;
39use datatypes::prelude::DataType;
40use datatypes::vectors::{Helper, Vector};
41use mito_codec::row_converter::{build_primary_key_codec_with_fields, SortField};
42use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
43use parquet::file::statistics::Statistics;
44use snafu::{ensure, OptionExt, ResultExt};
45use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
46use store_api::storage::{ColumnId, SequenceNumber};
47
48use crate::error::{
49    ConvertVectorSnafu, InvalidBatchSnafu, InvalidRecordBatchSnafu, NewRecordBatchSnafu, Result,
50};
51use crate::read::{Batch, BatchBuilder, BatchColumn};
52use crate::sst::file::{FileMeta, FileTimeRange};
53use crate::sst::to_sst_arrow_schema;
54
/// Arrow array type for the primary key dictionary.
///
/// Dictionary keys are `u32` indices into the dictionary values, which hold
/// the encoded primary keys.
pub(crate) type PrimaryKeyArray = DictionaryArray<UInt32Type>;
57
/// Number of columns that have fixed positions.
///
/// Contains: time index and internal columns (primary key, sequence, op type).
/// These are the trailing columns of the SST schema, after all field columns.
const FIXED_POS_COLUMN_NUM: usize = 4;
62
/// Helper for writing the SST format.
pub(crate) struct WriteFormat {
    /// Region metadata; its field columns are checked against the fields of
    /// each batch to write.
    metadata: RegionMetadataRef,
    /// SST file schema.
    arrow_schema: SchemaRef,
    /// If set, every row is written with this sequence number instead of the
    /// sequences carried by the batch.
    override_sequence: Option<SequenceNumber>,
}
70
71impl WriteFormat {
72    /// Creates a new helper.
73    pub(crate) fn new(metadata: RegionMetadataRef) -> WriteFormat {
74        let arrow_schema = to_sst_arrow_schema(&metadata);
75        WriteFormat {
76            metadata,
77            arrow_schema,
78            override_sequence: None,
79        }
80    }
81
82    /// Set override sequence.
83    pub(crate) fn with_override_sequence(
84        mut self,
85        override_sequence: Option<SequenceNumber>,
86    ) -> Self {
87        self.override_sequence = override_sequence;
88        self
89    }
90
    /// Gets the arrow schema to store in parquet.
    ///
    /// This is the schema computed from the region metadata when the helper
    /// was created.
    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
        &self.arrow_schema
    }
95
96    /// Convert `batch` to a arrow record batch to store in parquet.
97    pub(crate) fn convert_batch(&self, batch: &Batch) -> Result<RecordBatch> {
98        debug_assert_eq!(
99            batch.fields().len() + FIXED_POS_COLUMN_NUM,
100            self.arrow_schema.fields().len()
101        );
102        let mut columns = Vec::with_capacity(batch.fields().len() + FIXED_POS_COLUMN_NUM);
103        // Store all fields first.
104        for (column, column_metadata) in batch.fields().iter().zip(self.metadata.field_columns()) {
105            ensure!(
106                column.column_id == column_metadata.column_id,
107                InvalidBatchSnafu {
108                    reason: format!(
109                        "Batch has column {} but metadata has column {}",
110                        column.column_id, column_metadata.column_id
111                    ),
112                }
113            );
114
115            columns.push(column.data.to_arrow_array());
116        }
117        // Add time index column.
118        columns.push(batch.timestamps().to_arrow_array());
119        // Add internal columns: primary key, sequences, op types.
120        columns.push(new_primary_key_array(batch.primary_key(), batch.num_rows()));
121
122        if let Some(override_sequence) = self.override_sequence {
123            let sequence_array =
124                Arc::new(UInt64Array::from(vec![override_sequence; batch.num_rows()]));
125            columns.push(sequence_array);
126        } else {
127            columns.push(batch.sequences().to_arrow_array());
128        }
129        columns.push(batch.op_types().to_arrow_array());
130
131        RecordBatch::try_new(self.arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
132    }
133}
134
/// Helper for reading the SST format.
///
/// It keeps the mapping between field column ids and their positions, both in
/// the full SST schema and in the projected schema.
pub struct ReadFormat {
    /// The metadata stored in the SST.
    metadata: RegionMetadataRef,
    /// SST file schema.
    arrow_schema: SchemaRef,
    /// Field column id to its index in `schema` (SST schema).
    /// In SST schema, fields are stored in the front of the schema.
    field_id_to_index: HashMap<ColumnId, usize>,
    /// Indices of columns to read from the SST. It contains all internal columns.
    /// The indices are sorted in ascending order.
    projection_indices: Vec<usize>,
    /// Field column id to their index in the projected schema (
    /// the schema of [Batch]).
    field_id_to_projected_index: HashMap<ColumnId, usize>,
    /// Sequence number to override the sequence read from the SST.
    override_sequence: Option<SequenceNumber>,
}
152
153impl ReadFormat {
154    /// Creates a helper with existing `metadata` and `column_ids` to read.
155    pub fn new(
156        metadata: RegionMetadataRef,
157        column_ids: impl Iterator<Item = ColumnId>,
158    ) -> ReadFormat {
159        let field_id_to_index: HashMap<_, _> = metadata
160            .field_columns()
161            .enumerate()
162            .map(|(index, column)| (column.column_id, index))
163            .collect();
164        let arrow_schema = to_sst_arrow_schema(&metadata);
165
166        // Maps column id of a projected field to its index in SST.
167        let mut projected_field_id_index: Vec<_> = column_ids
168            .filter_map(|column_id| {
169                // Only apply projection to fields.
170                field_id_to_index
171                    .get(&column_id)
172                    .copied()
173                    .map(|index| (column_id, index))
174            })
175            .collect();
176        let mut projection_indices: Vec<_> = projected_field_id_index
177            .iter()
178            .map(|(_column_id, index)| *index)
179            // We need to add all fixed position columns.
180            .chain(arrow_schema.fields.len() - FIXED_POS_COLUMN_NUM..arrow_schema.fields.len())
181            .collect();
182        projection_indices.sort_unstable();
183
184        // Sort fields by their indices in the SST. Then the order of fields is their order
185        // in the Batch.
186        projected_field_id_index.sort_unstable_by_key(|x| x.1);
187        // Because the SST put fields before other columns, we don't need to consider other
188        // columns.
189        let field_id_to_projected_index = projected_field_id_index
190            .into_iter()
191            .map(|(column_id, _)| column_id)
192            .enumerate()
193            .map(|(index, column_id)| (column_id, index))
194            .collect();
195
196        ReadFormat {
197            metadata,
198            arrow_schema,
199            field_id_to_index,
200            projection_indices,
201            field_id_to_projected_index,
202            override_sequence: None,
203        }
204    }
205
    /// Sets the sequence number to override.
    ///
    /// Passing `None` clears any previously set override.
    pub(crate) fn set_override_sequence(&mut self, sequence: Option<SequenceNumber>) {
        self.override_sequence = sequence;
    }
210
    /// Gets the arrow schema of the SST file.
    ///
    /// This schema is computed from the region metadata but should be the same
    /// as the arrow schema decoded from the file metadata.
    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
        &self.arrow_schema
    }
218
    /// Gets the metadata of the SST.
    ///
    /// This is the region metadata the helper was created with.
    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
        &self.metadata
    }
223
    /// Gets sorted projection indices to read.
    ///
    /// The indices refer to columns of the SST schema and always include the
    /// time index and the internal columns.
    pub(crate) fn projection_indices(&self) -> &[usize] {
        &self.projection_indices
    }
228
229    /// Creates a sequence array to override.
230    pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option<ArrayRef> {
231        self.override_sequence
232            .map(|seq| Arc::new(UInt64Array::from_value(seq, length)) as ArrayRef)
233    }
234
    /// Converts an arrow record batch into `batches`.
    ///
    /// The record batch is split into one [Batch] per run of consecutive rows
    /// that share the same primary key dictionary key; each [Batch] is pushed
    /// to the back of `batches`. Nothing is pushed if the record batch has no rows.
    ///
    /// The length of `override_sequence_array` must not be shorter than the length of the
    /// record batch. If it is longer, only the leading rows are used.
    /// Note that the `record_batch` may only contain a subset of columns if it is projected.
    ///
    /// # Errors
    ///
    /// Returns an error if the record batch has fewer columns than the fixed
    /// position columns, if the primary key column is not a dictionary of
    /// binary values, or if a field column cannot be converted.
    ///
    /// # Panics
    ///
    /// Panics if `override_sequence_array` is shorter than the sequence column.
    pub fn convert_record_batch(
        &self,
        record_batch: &RecordBatch,
        override_sequence_array: Option<&ArrayRef>,
        batches: &mut VecDeque<Batch>,
    ) -> Result<()> {
        debug_assert!(batches.is_empty());

        // The record batch must have time index and internal columns.
        ensure!(
            record_batch.num_columns() >= FIXED_POS_COLUMN_NUM,
            InvalidRecordBatchSnafu {
                reason: format!(
                    "record batch only has {} columns",
                    record_batch.num_columns()
                ),
            }
        );

        // Walk the fixed position columns from the last one backwards:
        // op type, sequence, primary key, time index.
        let mut fixed_pos_columns = record_batch
            .columns()
            .iter()
            .rev()
            .take(FIXED_POS_COLUMN_NUM);
        // Safety: We have checked the column number.
        let op_type_array = fixed_pos_columns.next().unwrap();
        let mut sequence_array = fixed_pos_columns.next().unwrap().clone();
        let pk_array = fixed_pos_columns.next().unwrap();
        let ts_array = fixed_pos_columns.next().unwrap();
        let field_batch_columns = self.get_field_batch_columns(record_batch)?;

        // Override sequence array if provided.
        if let Some(override_array) = override_sequence_array {
            assert!(override_array.len() >= sequence_array.len());
            // It's fine to assign the override array directly, but we slice it to make
            // sure it matches the length of the original sequence array.
            sequence_array = if override_array.len() > sequence_array.len() {
                override_array.slice(0, sequence_array.len())
            } else {
                override_array.clone()
            };
        }

        // Compute primary key offsets.
        let pk_dict_array = pk_array
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .with_context(|| InvalidRecordBatchSnafu {
                reason: format!("primary key array should not be {:?}", pk_array.data_type()),
            })?;
        let offsets = primary_key_offsets(pk_dict_array)?;
        if offsets.is_empty() {
            // No rows in the record batch.
            return Ok(());
        }

        // Split record batch according to pk offsets.
        let keys = pk_dict_array.keys();
        let pk_values = pk_dict_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .with_context(|| InvalidRecordBatchSnafu {
                reason: format!(
                    "values of primary key array should not be {:?}",
                    pk_dict_array.values().data_type()
                ),
            })?;
        // Each pair of adjacent offsets `[start, end)` is a run of rows with the same key.
        for (i, start) in offsets[..offsets.len() - 1].iter().enumerate() {
            let end = offsets[i + 1];
            let rows_in_batch = end - start;
            // All rows in the run share the dictionary key of the first row.
            let dict_key = keys.value(*start);
            let primary_key = pk_values.value(dict_key as usize).to_vec();

            let mut builder = BatchBuilder::new(primary_key);
            builder
                .timestamps_array(ts_array.slice(*start, rows_in_batch))?
                .sequences_array(sequence_array.slice(*start, rows_in_batch))?
                .op_types_array(op_type_array.slice(*start, rows_in_batch))?;
            // Push all fields
            for batch_column in &field_batch_columns {
                builder.push_field(BatchColumn {
                    column_id: batch_column.column_id,
                    data: batch_column.data.slice(*start, rows_in_batch),
                });
            }

            let batch = builder.build()?;
            batches.push_back(batch);
        }

        Ok(())
    }
331
332    /// Returns min values of specific column in row groups.
333    pub fn min_values(
334        &self,
335        row_groups: &[impl Borrow<RowGroupMetaData>],
336        column_id: ColumnId,
337    ) -> StatValues {
338        let Some(column) = self.metadata.column_by_id(column_id) else {
339            // No such column in the SST.
340            return StatValues::NoColumn;
341        };
342        match column.semantic_type {
343            SemanticType::Tag => self.tag_values(row_groups, column, true),
344            SemanticType::Field => {
345                // Safety: `field_id_to_index` is initialized by the semantic type.
346                let index = self.field_id_to_index.get(&column_id).unwrap();
347                let stats = Self::column_values(row_groups, column, *index, true);
348                StatValues::from_stats_opt(stats)
349            }
350            SemanticType::Timestamp => {
351                let index = self.time_index_position();
352                let stats = Self::column_values(row_groups, column, index, true);
353                StatValues::from_stats_opt(stats)
354            }
355        }
356    }
357
358    /// Returns max values of specific column in row groups.
359    pub fn max_values(
360        &self,
361        row_groups: &[impl Borrow<RowGroupMetaData>],
362        column_id: ColumnId,
363    ) -> StatValues {
364        let Some(column) = self.metadata.column_by_id(column_id) else {
365            // No such column in the SST.
366            return StatValues::NoColumn;
367        };
368        match column.semantic_type {
369            SemanticType::Tag => self.tag_values(row_groups, column, false),
370            SemanticType::Field => {
371                // Safety: `field_id_to_index` is initialized by the semantic type.
372                let index = self.field_id_to_index.get(&column_id).unwrap();
373                let stats = Self::column_values(row_groups, column, *index, false);
374                StatValues::from_stats_opt(stats)
375            }
376            SemanticType::Timestamp => {
377                let index = self.time_index_position();
378                let stats = Self::column_values(row_groups, column, index, false);
379                StatValues::from_stats_opt(stats)
380            }
381        }
382    }
383
384    /// Returns null counts of specific column in row groups.
385    pub fn null_counts(
386        &self,
387        row_groups: &[impl Borrow<RowGroupMetaData>],
388        column_id: ColumnId,
389    ) -> StatValues {
390        let Some(column) = self.metadata.column_by_id(column_id) else {
391            // No such column in the SST.
392            return StatValues::NoColumn;
393        };
394        match column.semantic_type {
395            SemanticType::Tag => StatValues::NoStats,
396            SemanticType::Field => {
397                // Safety: `field_id_to_index` is initialized by the semantic type.
398                let index = self.field_id_to_index.get(&column_id).unwrap();
399                let stats = Self::column_null_counts(row_groups, *index);
400                StatValues::from_stats_opt(stats)
401            }
402            SemanticType::Timestamp => {
403                let index = self.time_index_position();
404                let stats = Self::column_null_counts(row_groups, index);
405                StatValues::from_stats_opt(stats)
406            }
407        }
408    }
409
410    /// Get fields from `record_batch`.
411    fn get_field_batch_columns(&self, record_batch: &RecordBatch) -> Result<Vec<BatchColumn>> {
412        record_batch
413            .columns()
414            .iter()
415            .zip(record_batch.schema().fields())
416            .take(record_batch.num_columns() - FIXED_POS_COLUMN_NUM) // Take all field columns.
417            .map(|(array, field)| {
418                let vector = Helper::try_into_vector(array.clone()).context(ConvertVectorSnafu)?;
419                let column = self
420                    .metadata
421                    .column_by_name(field.name())
422                    .with_context(|| InvalidRecordBatchSnafu {
423                        reason: format!("column {} not found in metadata", field.name()),
424                    })?;
425
426                Ok(BatchColumn {
427                    column_id: column.column_id,
428                    data: vector,
429                })
430            })
431            .collect()
432    }
433
434    /// Returns min/max values of specific tag.
435    fn tag_values(
436        &self,
437        row_groups: &[impl Borrow<RowGroupMetaData>],
438        column: &ColumnMetadata,
439        is_min: bool,
440    ) -> StatValues {
441        let is_first_tag = self
442            .metadata
443            .primary_key
444            .first()
445            .map(|id| *id == column.column_id)
446            .unwrap_or(false);
447        if !is_first_tag {
448            // Only the min-max of the first tag is available in the primary key.
449            return StatValues::NoStats;
450        }
451
452        StatValues::from_stats_opt(self.first_tag_values(row_groups, column, is_min))
453    }
454
455    /// Returns min/max values of the first tag.
456    /// Returns None if the tag does not have statistics.
457    fn first_tag_values(
458        &self,
459        row_groups: &[impl Borrow<RowGroupMetaData>],
460        column: &ColumnMetadata,
461        is_min: bool,
462    ) -> Option<ArrayRef> {
463        debug_assert!(self
464            .metadata
465            .primary_key
466            .first()
467            .map(|id| *id == column.column_id)
468            .unwrap_or(false));
469
470        let primary_key_encoding = self.metadata.primary_key_encoding;
471        let converter = build_primary_key_codec_with_fields(
472            primary_key_encoding,
473            [(
474                column.column_id,
475                SortField::new(column.column_schema.data_type.clone()),
476            )]
477            .into_iter(),
478        );
479
480        let values = row_groups.iter().map(|meta| {
481            let stats = meta
482                .borrow()
483                .column(self.primary_key_position())
484                .statistics()?;
485            match stats {
486                Statistics::Boolean(_) => None,
487                Statistics::Int32(_) => None,
488                Statistics::Int64(_) => None,
489                Statistics::Int96(_) => None,
490                Statistics::Float(_) => None,
491                Statistics::Double(_) => None,
492                Statistics::ByteArray(s) => {
493                    let bytes = if is_min {
494                        s.min_bytes_opt()?
495                    } else {
496                        s.max_bytes_opt()?
497                    };
498                    converter.decode_leftmost(bytes).ok()?
499                }
500                Statistics::FixedLenByteArray(_) => None,
501            }
502        });
503        let mut builder = column
504            .column_schema
505            .data_type
506            .create_mutable_vector(row_groups.len());
507        for value_opt in values {
508            match value_opt {
509                // Safety: We use the same data type to create the converter.
510                Some(v) => builder.push_value_ref(v.as_value_ref()),
511                None => builder.push_null(),
512            }
513        }
514        let vector = builder.to_vector();
515
516        Some(vector.to_arrow_array())
517    }
518
519    /// Returns min/max values of specific non-tag columns.
520    /// Returns None if the column does not have statistics.
521    fn column_values(
522        row_groups: &[impl Borrow<RowGroupMetaData>],
523        column: &ColumnMetadata,
524        column_index: usize,
525        is_min: bool,
526    ) -> Option<ArrayRef> {
527        let null_scalar: ScalarValue = column
528            .column_schema
529            .data_type
530            .as_arrow_type()
531            .try_into()
532            .ok()?;
533        let scalar_values = row_groups
534            .iter()
535            .map(|meta| {
536                let stats = meta.borrow().column(column_index).statistics()?;
537                match stats {
538                    Statistics::Boolean(s) => Some(ScalarValue::Boolean(Some(if is_min {
539                        *s.min_opt()?
540                    } else {
541                        *s.max_opt()?
542                    }))),
543                    Statistics::Int32(s) => Some(ScalarValue::Int32(Some(if is_min {
544                        *s.min_opt()?
545                    } else {
546                        *s.max_opt()?
547                    }))),
548                    Statistics::Int64(s) => Some(ScalarValue::Int64(Some(if is_min {
549                        *s.min_opt()?
550                    } else {
551                        *s.max_opt()?
552                    }))),
553
554                    Statistics::Int96(_) => None,
555                    Statistics::Float(s) => Some(ScalarValue::Float32(Some(if is_min {
556                        *s.min_opt()?
557                    } else {
558                        *s.max_opt()?
559                    }))),
560                    Statistics::Double(s) => Some(ScalarValue::Float64(Some(if is_min {
561                        *s.min_opt()?
562                    } else {
563                        *s.max_opt()?
564                    }))),
565                    Statistics::ByteArray(s) => {
566                        let bytes = if is_min {
567                            s.min_bytes_opt()?
568                        } else {
569                            s.max_bytes_opt()?
570                        };
571                        let s = String::from_utf8(bytes.to_vec()).ok();
572                        Some(ScalarValue::Utf8(s))
573                    }
574
575                    Statistics::FixedLenByteArray(_) => None,
576                }
577            })
578            .map(|maybe_scalar| maybe_scalar.unwrap_or_else(|| null_scalar.clone()))
579            .collect::<Vec<ScalarValue>>();
580        debug_assert_eq!(scalar_values.len(), row_groups.len());
581        ScalarValue::iter_to_array(scalar_values).ok()
582    }
583
584    /// Returns null counts of specific non-tag columns.
585    fn column_null_counts(
586        row_groups: &[impl Borrow<RowGroupMetaData>],
587        column_index: usize,
588    ) -> Option<ArrayRef> {
589        let values = row_groups.iter().map(|meta| {
590            let col = meta.borrow().column(column_index);
591            let stat = col.statistics()?;
592            stat.null_count_opt()
593        });
594        Some(Arc::new(UInt64Array::from_iter(values)))
595    }
596
    /// Index in SST of the primary key.
    ///
    /// The primary key is the third column from the end of the SST schema; it is
    /// followed by the sequence and op type columns.
    fn primary_key_position(&self) -> usize {
        self.arrow_schema.fields.len() - 3
    }
601
    /// Index in SST of the time index.
    ///
    /// The time index is the first of the fixed position columns at the end of
    /// the SST schema.
    fn time_index_position(&self) -> usize {
        self.arrow_schema.fields.len() - FIXED_POS_COLUMN_NUM
    }
606
    /// Index of a field column by its column id.
    ///
    /// The index is the position of the field in the projected schema (the
    /// schema of [Batch]), not in the SST. Returns `None` if `column_id` is not
    /// a projected field.
    pub fn field_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
        self.field_id_to_projected_index.get(&column_id).copied()
    }
611}
612
/// Values of column statistics of the SST.
///
/// It also distinguishes the case that a column is not found and
/// the column exists but has no statistics.
pub enum StatValues {
    /// Values of each row group.
    /// The array holds one entry per row group.
    Values(ArrayRef),
    /// No such column.
    NoColumn,
    /// Column exists but has no statistics.
    NoStats,
}
625
626impl StatValues {
627    /// Creates a new `StatValues` instance from optional statistics.
628    pub fn from_stats_opt(stats: Option<ArrayRef>) -> Self {
629        match stats {
630            Some(stats) => StatValues::Values(stats),
631            None => StatValues::NoStats,
632        }
633    }
634}
635
#[cfg(test)]
impl ReadFormat {
    /// Test-only constructor that requests every column of `metadata`.
    pub fn new_with_all_columns(metadata: RegionMetadataRef) -> ReadFormat {
        let all_column_ids = metadata
            .column_metadatas
            .iter()
            .map(|column| column.column_id);
        Self::new(Arc::clone(&metadata), all_column_ids)
    }
}
646
647/// Compute offsets of different primary keys in the array.
648fn primary_key_offsets(pk_dict_array: &PrimaryKeyArray) -> Result<Vec<usize>> {
649    if pk_dict_array.is_empty() {
650        return Ok(Vec::new());
651    }
652
653    // Init offsets.
654    let mut offsets = vec![0];
655    let keys = pk_dict_array.keys();
656    // We know that primary keys are always not null so we iterate `keys.values()` directly.
657    let pk_indices = keys.values();
658    for (i, key) in pk_indices.iter().take(keys.len() - 1).enumerate() {
659        // Compare each key with next key
660        if *key != pk_indices[i + 1] {
661            // We meet a new key, push the next index as end of the offset.
662            offsets.push(i + 1);
663        }
664    }
665    offsets.push(keys.len());
666
667    Ok(offsets)
668}
669
670/// Creates a new array for specific `primary_key`.
671fn new_primary_key_array(primary_key: &[u8], num_rows: usize) -> ArrayRef {
672    let values = Arc::new(BinaryArray::from_iter_values([primary_key]));
673    let keys = UInt32Array::from_value(0, num_rows);
674
675    // Safety: The key index is valid.
676    Arc::new(DictionaryArray::new(keys, values))
677}
678
679/// Gets the min/max time index of the row group from the parquet meta.
680/// It assumes the parquet is created by the mito engine.
681pub(crate) fn parquet_row_group_time_range(
682    file_meta: &FileMeta,
683    parquet_meta: &ParquetMetaData,
684    row_group_idx: usize,
685) -> Option<FileTimeRange> {
686    let row_group_meta = parquet_meta.row_group(row_group_idx);
687    let num_columns = parquet_meta.file_metadata().schema_descr().num_columns();
688    assert!(
689        num_columns >= FIXED_POS_COLUMN_NUM,
690        "file only has {} columns",
691        num_columns
692    );
693    let time_index_pos = num_columns - FIXED_POS_COLUMN_NUM;
694
695    let stats = row_group_meta.column(time_index_pos).statistics()?;
696    // The physical type for the timestamp should be i64.
697    let (min, max) = match stats {
698        Statistics::Int64(value_stats) => (*value_stats.min_opt()?, *value_stats.max_opt()?),
699        Statistics::Int32(_)
700        | Statistics::Boolean(_)
701        | Statistics::Int96(_)
702        | Statistics::Float(_)
703        | Statistics::Double(_)
704        | Statistics::ByteArray(_)
705        | Statistics::FixedLenByteArray(_) => {
706            common_telemetry::warn!(
707                "Invalid statistics {:?} for time index in parquet in {}",
708                stats,
709                file_meta.file_id
710            );
711            return None;
712        }
713    };
714
715    debug_assert!(min >= file_meta.time_range.0.value() && min <= file_meta.time_range.1.value());
716    debug_assert!(max >= file_meta.time_range.0.value() && max <= file_meta.time_range.1.value());
717    let unit = file_meta.time_range.0.unit();
718
719    Some((Timestamp::new(min, unit), Timestamp::new(max, unit)))
720}
721
722/// Checks if sequence override is needed based on all row groups' statistics.
723/// Returns true if ALL row groups have sequence min-max values of 0.
724pub(crate) fn need_override_sequence(parquet_meta: &ParquetMetaData) -> bool {
725    let num_columns = parquet_meta.file_metadata().schema_descr().num_columns();
726    if num_columns < FIXED_POS_COLUMN_NUM {
727        return false;
728    }
729
730    // The sequence column is the second-to-last column (before op_type)
731    let sequence_pos = num_columns - 2;
732
733    // Check all row groups - all must have sequence min-max of 0
734    for row_group in parquet_meta.row_groups() {
735        if let Some(Statistics::Int64(value_stats)) = row_group.column(sequence_pos).statistics() {
736            if let (Some(min_val), Some(max_val)) = (value_stats.min_opt(), value_stats.max_opt()) {
737                // If any row group doesn't have min=0 and max=0, return false
738                if *min_val != 0 || *max_val != 0 {
739                    return false;
740                }
741            } else {
742                // If any row group doesn't have statistics, return false
743                return false;
744            }
745        } else {
746            // If any row group doesn't have Int64 statistics, return false
747            return false;
748        }
749    }
750
751    // All row groups have sequence min-max of 0, or there are no row groups
752    !parquet_meta.row_groups().is_empty()
753}
754
755#[cfg(test)]
756mod tests {
757    use api::v1::OpType;
758    use datatypes::arrow::array::{Int64Array, TimestampMillisecondArray, UInt64Array, UInt8Array};
759    use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema, TimeUnit};
760    use datatypes::prelude::ConcreteDataType;
761    use datatypes::schema::ColumnSchema;
762    use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, UInt64Vector, UInt8Vector};
763    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
764    use store_api::storage::RegionId;
765
766    use super::*;
767
    /// Fixed sequence number for the tests in this module.
    const TEST_SEQUENCE: u64 = 1;
    /// Fixed op type for the tests in this module: `OpType::Put` as `u8`.
    const TEST_OP_TYPE: u8 = OpType::Put as u8;
770
771    fn build_test_region_metadata() -> RegionMetadataRef {
772        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
773        builder
774            .push_column_metadata(ColumnMetadata {
775                column_schema: ColumnSchema::new("tag0", ConcreteDataType::int64_datatype(), true),
776                semantic_type: SemanticType::Tag,
777                column_id: 1,
778            })
779            .push_column_metadata(ColumnMetadata {
780                column_schema: ColumnSchema::new(
781                    "field1",
782                    ConcreteDataType::int64_datatype(),
783                    true,
784                ),
785                semantic_type: SemanticType::Field,
786                column_id: 4, // We change the order of fields columns.
787            })
788            .push_column_metadata(ColumnMetadata {
789                column_schema: ColumnSchema::new("tag1", ConcreteDataType::int64_datatype(), true),
790                semantic_type: SemanticType::Tag,
791                column_id: 3,
792            })
793            .push_column_metadata(ColumnMetadata {
794                column_schema: ColumnSchema::new(
795                    "field0",
796                    ConcreteDataType::int64_datatype(),
797                    true,
798                ),
799                semantic_type: SemanticType::Field,
800                column_id: 2,
801            })
802            .push_column_metadata(ColumnMetadata {
803                column_schema: ColumnSchema::new(
804                    "ts",
805                    ConcreteDataType::timestamp_millisecond_datatype(),
806                    false,
807                ),
808                semantic_type: SemanticType::Timestamp,
809                column_id: 5,
810            })
811            .primary_key(vec![1, 3]);
812        Arc::new(builder.build().unwrap())
813    }
814
815    fn build_test_arrow_schema() -> SchemaRef {
816        let fields = vec![
817            Field::new("field1", ArrowDataType::Int64, true),
818            Field::new("field0", ArrowDataType::Int64, true),
819            Field::new(
820                "ts",
821                ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
822                false,
823            ),
824            Field::new(
825                "__primary_key",
826                ArrowDataType::Dictionary(
827                    Box::new(ArrowDataType::UInt32),
828                    Box::new(ArrowDataType::Binary),
829                ),
830                false,
831            ),
832            Field::new("__sequence", ArrowDataType::UInt64, false),
833            Field::new("__op_type", ArrowDataType::UInt8, false),
834        ];
835        Arc::new(Schema::new(fields))
836    }
837
838    fn new_batch(primary_key: &[u8], start_ts: i64, start_field: i64, num_rows: usize) -> Batch {
839        new_batch_with_sequence(primary_key, start_ts, start_field, num_rows, TEST_SEQUENCE)
840    }
841
842    fn new_batch_with_sequence(
843        primary_key: &[u8],
844        start_ts: i64,
845        start_field: i64,
846        num_rows: usize,
847        sequence: u64,
848    ) -> Batch {
849        let ts_values = (0..num_rows).map(|i| start_ts + i as i64);
850        let timestamps = Arc::new(TimestampMillisecondVector::from_values(ts_values));
851        let sequences = Arc::new(UInt64Vector::from_vec(vec![sequence; num_rows]));
852        let op_types = Arc::new(UInt8Vector::from_vec(vec![TEST_OP_TYPE; num_rows]));
853        let fields = vec![
854            BatchColumn {
855                column_id: 4,
856                data: Arc::new(Int64Vector::from_vec(vec![start_field; num_rows])),
857            }, // field1
858            BatchColumn {
859                column_id: 2,
860                data: Arc::new(Int64Vector::from_vec(vec![start_field + 1; num_rows])),
861            }, // field0
862        ];
863
864        BatchBuilder::with_required_columns(primary_key.to_vec(), timestamps, sequences, op_types)
865            .with_fields(fields)
866            .build()
867            .unwrap()
868    }
869
870    #[test]
871    fn test_to_sst_arrow_schema() {
872        let metadata = build_test_region_metadata();
873        let write_format = WriteFormat::new(metadata);
874        assert_eq!(&build_test_arrow_schema(), write_format.arrow_schema());
875    }
876
877    #[test]
878    fn test_new_primary_key_array() {
879        let array = new_primary_key_array(b"test", 3);
880        let expect = build_test_pk_array(&[(b"test".to_vec(), 3)]) as ArrayRef;
881        assert_eq!(&expect, &array);
882    }
883
884    fn build_test_pk_array(pk_row_nums: &[(Vec<u8>, usize)]) -> Arc<PrimaryKeyArray> {
885        let values = Arc::new(BinaryArray::from_iter_values(
886            pk_row_nums.iter().map(|v| &v.0),
887        ));
888        let mut keys = vec![];
889        for (index, num_rows) in pk_row_nums.iter().map(|v| v.1).enumerate() {
890            keys.extend(std::iter::repeat_n(index as u32, num_rows));
891        }
892        let keys = UInt32Array::from(keys);
893        Arc::new(DictionaryArray::new(keys, values))
894    }
895
896    #[test]
897    fn test_convert_batch() {
898        let metadata = build_test_region_metadata();
899        let write_format = WriteFormat::new(metadata);
900
901        let num_rows = 4;
902        let batch = new_batch(b"test", 1, 2, num_rows);
903        let columns: Vec<ArrayRef> = vec![
904            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
905            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
906            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
907            build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // primary key
908            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; num_rows])), // sequence
909            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
910        ];
911        let expect_record = RecordBatch::try_new(build_test_arrow_schema(), columns).unwrap();
912
913        let actual = write_format.convert_batch(&batch).unwrap();
914        assert_eq!(expect_record, actual);
915    }
916
917    #[test]
918    fn test_convert_batch_with_override_sequence() {
919        let metadata = build_test_region_metadata();
920        let write_format = WriteFormat::new(metadata).with_override_sequence(Some(415411));
921
922        let num_rows = 4;
923        let batch = new_batch(b"test", 1, 2, num_rows);
924        let columns: Vec<ArrayRef> = vec![
925            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
926            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
927            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
928            build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // primary key
929            Arc::new(UInt64Array::from(vec![415411; num_rows])), // sequence
930            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
931        ];
932        let expect_record = RecordBatch::try_new(build_test_arrow_schema(), columns).unwrap();
933
934        let actual = write_format.convert_batch(&batch).unwrap();
935        assert_eq!(expect_record, actual);
936    }
937
938    #[test]
939    fn test_projection_indices() {
940        let metadata = build_test_region_metadata();
941        // Only read tag1
942        let read_format = ReadFormat::new(metadata.clone(), [3].iter().copied());
943        assert_eq!(&[2, 3, 4, 5], read_format.projection_indices());
944        // Only read field1
945        let read_format = ReadFormat::new(metadata.clone(), [4].iter().copied());
946        assert_eq!(&[0, 2, 3, 4, 5], read_format.projection_indices());
947        // Only read ts
948        let read_format = ReadFormat::new(metadata.clone(), [5].iter().copied());
949        assert_eq!(&[2, 3, 4, 5], read_format.projection_indices());
950        // Read field0, tag0, ts
951        let read_format = ReadFormat::new(metadata, [2, 1, 5].iter().copied());
952        assert_eq!(&[1, 2, 3, 4, 5], read_format.projection_indices());
953    }
954
955    #[test]
956    fn test_empty_primary_key_offsets() {
957        let array = build_test_pk_array(&[]);
958        assert!(primary_key_offsets(&array).unwrap().is_empty());
959    }
960
961    #[test]
962    fn test_primary_key_offsets_one_series() {
963        let array = build_test_pk_array(&[(b"one".to_vec(), 1)]);
964        assert_eq!(vec![0, 1], primary_key_offsets(&array).unwrap());
965
966        let array = build_test_pk_array(&[(b"one".to_vec(), 1), (b"two".to_vec(), 1)]);
967        assert_eq!(vec![0, 1, 2], primary_key_offsets(&array).unwrap());
968
969        let array = build_test_pk_array(&[
970            (b"one".to_vec(), 1),
971            (b"two".to_vec(), 1),
972            (b"three".to_vec(), 1),
973        ]);
974        assert_eq!(vec![0, 1, 2, 3], primary_key_offsets(&array).unwrap());
975    }
976
977    #[test]
978    fn test_primary_key_offsets_multi_series() {
979        let array = build_test_pk_array(&[(b"one".to_vec(), 1), (b"two".to_vec(), 3)]);
980        assert_eq!(vec![0, 1, 4], primary_key_offsets(&array).unwrap());
981
982        let array = build_test_pk_array(&[(b"one".to_vec(), 3), (b"two".to_vec(), 1)]);
983        assert_eq!(vec![0, 3, 4], primary_key_offsets(&array).unwrap());
984
985        let array = build_test_pk_array(&[(b"one".to_vec(), 3), (b"two".to_vec(), 3)]);
986        assert_eq!(vec![0, 3, 6], primary_key_offsets(&array).unwrap());
987    }
988
989    #[test]
990    fn test_convert_empty_record_batch() {
991        let metadata = build_test_region_metadata();
992        let arrow_schema = build_test_arrow_schema();
993        let column_ids: Vec<_> = metadata
994            .column_metadatas
995            .iter()
996            .map(|col| col.column_id)
997            .collect();
998        let read_format = ReadFormat::new(metadata, column_ids.iter().copied());
999        assert_eq!(arrow_schema, *read_format.arrow_schema());
1000
1001        let record_batch = RecordBatch::new_empty(arrow_schema);
1002        let mut batches = VecDeque::new();
1003        read_format
1004            .convert_record_batch(&record_batch, None, &mut batches)
1005            .unwrap();
1006        assert!(batches.is_empty());
1007    }
1008
1009    #[test]
1010    fn test_convert_record_batch() {
1011        let metadata = build_test_region_metadata();
1012        let column_ids: Vec<_> = metadata
1013            .column_metadatas
1014            .iter()
1015            .map(|col| col.column_id)
1016            .collect();
1017        let read_format = ReadFormat::new(metadata, column_ids.iter().copied());
1018
1019        let columns: Vec<ArrayRef> = vec![
1020            Arc::new(Int64Array::from(vec![1, 1, 10, 10])), // field1
1021            Arc::new(Int64Array::from(vec![2, 2, 11, 11])), // field0
1022            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 11, 12])), // ts
1023            build_test_pk_array(&[(b"one".to_vec(), 2), (b"two".to_vec(), 2)]), // primary key
1024            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; 4])), // sequence
1025            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; 4])), // op type
1026        ];
1027        let arrow_schema = build_test_arrow_schema();
1028        let record_batch = RecordBatch::try_new(arrow_schema, columns).unwrap();
1029        let mut batches = VecDeque::new();
1030        read_format
1031            .convert_record_batch(&record_batch, None, &mut batches)
1032            .unwrap();
1033
1034        assert_eq!(
1035            vec![new_batch(b"one", 1, 1, 2), new_batch(b"two", 11, 10, 2)],
1036            batches.into_iter().collect::<Vec<_>>(),
1037        );
1038    }
1039
1040    #[test]
1041    fn test_convert_record_batch_with_override_sequence() {
1042        let metadata = build_test_region_metadata();
1043        let column_ids: Vec<_> = metadata
1044            .column_metadatas
1045            .iter()
1046            .map(|col| col.column_id)
1047            .collect();
1048        let read_format = ReadFormat::new(metadata, column_ids.iter().copied());
1049
1050        let columns: Vec<ArrayRef> = vec![
1051            Arc::new(Int64Array::from(vec![1, 1, 10, 10])), // field1
1052            Arc::new(Int64Array::from(vec![2, 2, 11, 11])), // field0
1053            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 11, 12])), // ts
1054            build_test_pk_array(&[(b"one".to_vec(), 2), (b"two".to_vec(), 2)]), // primary key
1055            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; 4])), // sequence
1056            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; 4])), // op type
1057        ];
1058        let arrow_schema = build_test_arrow_schema();
1059        let record_batch = RecordBatch::try_new(arrow_schema, columns).unwrap();
1060
1061        // Create override sequence array with custom values
1062        let override_sequence: u64 = 12345;
1063        let override_sequence_array: ArrayRef =
1064            Arc::new(UInt64Array::from_value(override_sequence, 4));
1065
1066        let mut batches = VecDeque::new();
1067        read_format
1068            .convert_record_batch(&record_batch, Some(&override_sequence_array), &mut batches)
1069            .unwrap();
1070
1071        // Create expected batches with override sequence
1072        let expected_batch1 = new_batch_with_sequence(b"one", 1, 1, 2, override_sequence);
1073        let expected_batch2 = new_batch_with_sequence(b"two", 11, 10, 2, override_sequence);
1074
1075        assert_eq!(
1076            vec![expected_batch1, expected_batch2],
1077            batches.into_iter().collect::<Vec<_>>(),
1078        );
1079    }
1080}