mito2/sst/parquet/format.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Format to store in parquet.
//!
//! We store three internal columns in parquet:
//! - `__primary_key`, the primary key of the row (tags). Type: dictionary(uint32, binary)
//! - `__sequence`, the sequence number of a row. Type: uint64
//! - `__op_type`, the op type of the row. Type: uint8
//!
//! The schema of a parquet file is:
//! ```text
//! field 0, field 1, ..., field N, time index, primary key, sequence, op type
//! ```
//!
//! We store fields in the same order as [RegionMetadata::field_columns()](store_api::metadata::RegionMetadata::field_columns()).
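//!
//! For example (a hypothetical region): with tag `host`, fields `cpu` and
//! `mem`, and time index `ts`, the parquet schema is:
//! ```text
//! cpu, mem, ts, __primary_key, __sequence, __op_type
//! ```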

use std::borrow::Borrow;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;

use api::v1::SemanticType;
use common_time::Timestamp;
use datafusion_common::ScalarValue;
use datatypes::arrow::array::{
    ArrayRef, BinaryArray, BinaryDictionaryBuilder, DictionaryArray, UInt64Array,
};
use datatypes::arrow::datatypes::{SchemaRef, UInt32Type};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::prelude::DataType;
use datatypes::vectors::Helper;
use mito_codec::row_converter::{
    CompositeValues, PrimaryKeyCodec, SortField, build_primary_key_codec,
    build_primary_key_codec_with_fields,
};
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
use parquet::file::statistics::Statistics;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
use store_api::storage::{ColumnId, SequenceNumber};

use crate::error::{
    ConvertVectorSnafu, DecodeSnafu, InvalidRecordBatchSnafu, NewRecordBatchSnafu, Result,
};
use crate::read::{Batch, BatchBuilder, BatchColumn};
use crate::sst::file::{FileMeta, FileTimeRange};
use crate::sst::parquet::flat_format::FlatReadFormat;
use crate::sst::to_sst_arrow_schema;

/// Arrow array type for the primary key dictionary.
pub(crate) type PrimaryKeyArray = DictionaryArray<UInt32Type>;
/// Builder type for primary key dictionary array.
pub(crate) type PrimaryKeyArrayBuilder = BinaryDictionaryBuilder<UInt32Type>;

/// Number of columns that have fixed positions.
///
/// Contains: time index and internal columns.
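///
/// These are the time index column followed by the internal columns
/// `__primary_key`, `__sequence`, and `__op_type`, always at the end of the
/// schema.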
pub(crate) const FIXED_POS_COLUMN_NUM: usize = 4;
/// Number of internal columns.
pub(crate) const INTERNAL_COLUMN_NUM: usize = 3;

/// Helper for writing the SST format with primary key.
pub(crate) struct PrimaryKeyWriteFormat {
    /// SST file schema.
    arrow_schema: SchemaRef,
    override_sequence: Option<SequenceNumber>,
}

impl PrimaryKeyWriteFormat {
    /// Creates a new helper.
    pub(crate) fn new(metadata: RegionMetadataRef) -> PrimaryKeyWriteFormat {
        let arrow_schema = to_sst_arrow_schema(&metadata);
        PrimaryKeyWriteFormat {
            arrow_schema,
            override_sequence: None,
        }
    }

    /// Sets the override sequence.
    pub(crate) fn with_override_sequence(
        mut self,
        override_sequence: Option<SequenceNumber>,
    ) -> Self {
        self.override_sequence = override_sequence;
        self
    }

    /// Gets the arrow schema to store in parquet.
    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
        &self.arrow_schema
    }

    /// Converts a flat `RecordBatch` to primary-key format, retaining only
    /// field columns, the time index, and internal columns.
    ///
    /// `num_fields` is the number of field columns. The method strips the
    /// leading tag columns: `num_tag_columns = batch.num_columns() - num_fields - FIXED_POS_COLUMN_NUM`.
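    ///
    /// For example (a hypothetical layout with two tags and two fields):
    /// ```text
    /// input:  tag0, tag1, field0, field1, ts, __primary_key, __sequence, __op_type
    /// output:             field0, field1, ts, __primary_key, __sequence, __op_type
    /// ```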
    pub(crate) fn convert_flat_batch(
        &self,
        batch: &RecordBatch,
        num_fields: usize,
    ) -> Result<RecordBatch> {
        let num_tag_columns = batch.num_columns() - num_fields - FIXED_POS_COLUMN_NUM;
        let mut columns: Vec<ArrayRef> = batch.columns()[num_tag_columns..].to_vec();

        if let Some(override_sequence) = self.override_sequence {
            let num_cols = columns.len();
            // sequence is at num_cols - 2 (before op_type)
            columns[num_cols - 2] =
                Arc::new(UInt64Array::from(vec![override_sequence; batch.num_rows()]));
        }

        RecordBatch::try_new(self.arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
    }
}

/// Helper to read parquet formats.
pub enum ReadFormat {
    /// The parquet is in the old primary key format.
    PrimaryKey(PrimaryKeyReadFormat),
    /// The parquet is in the new flat format.
    Flat(FlatReadFormat),
}

impl ReadFormat {
    /// Creates a helper to read the primary key format.
    pub fn new_primary_key(
        metadata: RegionMetadataRef,
        column_ids: impl Iterator<Item = ColumnId>,
    ) -> Self {
        ReadFormat::PrimaryKey(PrimaryKeyReadFormat::new(metadata, column_ids))
    }

    /// Creates a helper to read the flat format.
    pub fn new_flat(
        metadata: RegionMetadataRef,
        column_ids: impl Iterator<Item = ColumnId>,
        num_columns: Option<usize>,
        file_path: &str,
        skip_auto_convert: bool,
    ) -> Result<Self> {
        Ok(ReadFormat::Flat(FlatReadFormat::new(
            metadata,
            column_ids,
            num_columns,
            file_path,
            skip_auto_convert,
        )?))
    }

    /// Creates a new read format.
    pub fn new(
        region_metadata: RegionMetadataRef,
        projection: Option<&[ColumnId]>,
        flat_format: bool,
        num_columns: Option<usize>,
        file_path: &str,
        skip_auto_convert: bool,
    ) -> Result<ReadFormat> {
        if flat_format {
            if let Some(column_ids) = projection {
                ReadFormat::new_flat(
                    region_metadata,
                    column_ids.iter().copied(),
                    num_columns,
                    file_path,
                    skip_auto_convert,
                )
            } else {
                // No projection, lists all column ids to read.
                ReadFormat::new_flat(
                    region_metadata.clone(),
                    region_metadata
                        .column_metadatas
                        .iter()
                        .map(|col| col.column_id),
                    num_columns,
                    file_path,
                    skip_auto_convert,
                )
            }
        } else if let Some(column_ids) = projection {
            Ok(ReadFormat::new_primary_key(
                region_metadata,
                column_ids.iter().copied(),
            ))
        } else {
            // No projection, lists all column ids to read.
            Ok(ReadFormat::new_primary_key(
                region_metadata.clone(),
                region_metadata
                    .column_metadatas
                    .iter()
                    .map(|col| col.column_id),
            ))
        }
    }

    /// Returns the inner primary key format helper, if this is the primary key format.
    pub(crate) fn as_primary_key(&self) -> Option<&PrimaryKeyReadFormat> {
        match self {
            ReadFormat::PrimaryKey(format) => Some(format),
            _ => None,
        }
    }

    /// Returns the inner flat format helper, if this is the flat format.
    pub(crate) fn as_flat(&self) -> Option<&FlatReadFormat> {
        match self {
            ReadFormat::Flat(format) => Some(format),
            _ => None,
        }
    }

    /// Gets the arrow schema of the SST file.
    ///
    /// This schema is computed from the region metadata but should be the same
    /// as the arrow schema decoded from the file metadata.
    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
        match self {
            ReadFormat::PrimaryKey(format) => format.arrow_schema(),
            ReadFormat::Flat(format) => format.arrow_schema(),
        }
    }

    /// Gets the metadata of the SST.
    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
        match self {
            ReadFormat::PrimaryKey(format) => format.metadata(),
            ReadFormat::Flat(format) => format.metadata(),
        }
    }

    /// Gets sorted projection indices to read.
    pub(crate) fn projection_indices(&self) -> &[usize] {
        match self {
            ReadFormat::PrimaryKey(format) => format.projection_indices(),
            ReadFormat::Flat(format) => format.projection_indices(),
        }
    }

    /// Returns min values of a specific column in row groups.
    pub fn min_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        match self {
            ReadFormat::PrimaryKey(format) => format.min_values(row_groups, column_id),
            ReadFormat::Flat(format) => format.min_values(row_groups, column_id),
        }
    }

    /// Returns max values of a specific column in row groups.
    pub fn max_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        match self {
            ReadFormat::PrimaryKey(format) => format.max_values(row_groups, column_id),
            ReadFormat::Flat(format) => format.max_values(row_groups, column_id),
        }
    }

    /// Returns null counts of a specific column in row groups.
    pub fn null_counts(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        match self {
            ReadFormat::PrimaryKey(format) => format.null_counts(row_groups, column_id),
            ReadFormat::Flat(format) => format.null_counts(row_groups, column_id),
        }
    }

    /// Returns min/max values of a specific column.
    /// Returns `None` if the column does not have statistics.
    /// The column should not be encoded as part of a primary key.
    pub(crate) fn column_values(
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column: &ColumnMetadata,
        column_index: usize,
        is_min: bool,
    ) -> Option<ArrayRef> {
        let null_scalar: ScalarValue = column
            .column_schema
            .data_type
            .as_arrow_type()
            .try_into()
            .ok()?;
        let scalar_values = row_groups
            .iter()
            .map(|meta| {
                let stats = meta.borrow().column(column_index).statistics()?;
                match stats {
                    Statistics::Boolean(s) => Some(ScalarValue::Boolean(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::Int32(s) => Some(ScalarValue::Int32(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::Int64(s) => Some(ScalarValue::Int64(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),

                    Statistics::Int96(_) => None,
                    Statistics::Float(s) => Some(ScalarValue::Float32(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::Double(s) => Some(ScalarValue::Float64(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::ByteArray(s) => {
                        let bytes = if is_min {
                            s.min_bytes_opt()?
                        } else {
                            s.max_bytes_opt()?
                        };
                        let s = String::from_utf8(bytes.to_vec()).ok();
                        Some(ScalarValue::Utf8(s))
                    }

                    Statistics::FixedLenByteArray(_) => None,
                }
            })
            .map(|maybe_scalar| maybe_scalar.unwrap_or_else(|| null_scalar.clone()))
            .collect::<Vec<ScalarValue>>();
        debug_assert_eq!(scalar_values.len(), row_groups.len());
        ScalarValue::iter_to_array(scalar_values).ok()
    }

    /// Returns null counts of a specific column.
    /// The column should not be encoded as part of a primary key.
    pub(crate) fn column_null_counts(
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_index: usize,
    ) -> Option<ArrayRef> {
        let values = row_groups.iter().map(|meta| {
            let col = meta.borrow().column(column_index);
            let stat = col.statistics()?;
            stat.null_count_opt()
        });
        Some(Arc::new(UInt64Array::from_iter(values)))
    }

    /// Sets the sequence number to override.
    pub(crate) fn set_override_sequence(&mut self, sequence: Option<SequenceNumber>) {
        match self {
            ReadFormat::PrimaryKey(format) => format.set_override_sequence(sequence),
            ReadFormat::Flat(format) => format.set_override_sequence(sequence),
        }
    }

    /// Enables or disables eager decoding of primary key values into batches.
    pub(crate) fn set_decode_primary_key_values(&mut self, decode: bool) {
        if let ReadFormat::PrimaryKey(format) = self {
            format.set_decode_primary_key_values(decode);
        }
    }

    /// Creates a sequence array to override.
    pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option<ArrayRef> {
        match self {
            ReadFormat::PrimaryKey(format) => format.new_override_sequence_array(length),
            ReadFormat::Flat(format) => format.new_override_sequence_array(length),
        }
    }
}

/// Helper for reading the SST format.
pub struct PrimaryKeyReadFormat {
    /// The metadata stored in the SST.
    metadata: RegionMetadataRef,
    /// SST file schema.
    arrow_schema: SchemaRef,
    /// Field column id to its index in `schema` (SST schema).
    /// In the SST schema, fields are stored at the front of the schema.
    field_id_to_index: HashMap<ColumnId, usize>,
    /// Indices of columns to read from the SST. It contains all internal columns.
    projection_indices: Vec<usize>,
    /// Field column id to its index in the projected schema (the schema of [Batch]).
    field_id_to_projected_index: HashMap<ColumnId, usize>,
    /// Sequence number to override the sequence read from the SST.
    override_sequence: Option<SequenceNumber>,
    /// Codec used to decode primary key values if eager decoding is enabled.
    primary_key_codec: Option<Arc<dyn PrimaryKeyCodec>>,
}

impl PrimaryKeyReadFormat {
    /// Creates a helper with existing `metadata` and `column_ids` to read.
    pub fn new(
        metadata: RegionMetadataRef,
        column_ids: impl Iterator<Item = ColumnId>,
    ) -> PrimaryKeyReadFormat {
        let field_id_to_index: HashMap<_, _> = metadata
            .field_columns()
            .enumerate()
            .map(|(index, column)| (column.column_id, index))
            .collect();
        let arrow_schema = to_sst_arrow_schema(&metadata);

        let format_projection = FormatProjection::compute_format_projection(
            &field_id_to_index,
            arrow_schema.fields.len(),
            column_ids,
        );

        PrimaryKeyReadFormat {
            metadata,
            arrow_schema,
            field_id_to_index,
            projection_indices: format_projection.projection_indices,
            field_id_to_projected_index: format_projection.column_id_to_projected_index,
            override_sequence: None,
            primary_key_codec: None,
        }
    }

    /// Sets the sequence number to override.
    pub(crate) fn set_override_sequence(&mut self, sequence: Option<SequenceNumber>) {
        self.override_sequence = sequence;
    }

    /// Enables or disables eager decoding of primary key values into batches.
    pub(crate) fn set_decode_primary_key_values(&mut self, decode: bool) {
        self.primary_key_codec = if decode {
            Some(build_primary_key_codec(&self.metadata))
        } else {
            None
        };
    }

    /// Gets the arrow schema of the SST file.
    ///
    /// This schema is computed from the region metadata but should be the same
    /// as the arrow schema decoded from the file metadata.
    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
        &self.arrow_schema
    }

    /// Gets the metadata of the SST.
    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
        &self.metadata
    }

    /// Gets sorted projection indices to read.
    pub(crate) fn projection_indices(&self) -> &[usize] {
        &self.projection_indices
    }

    /// Gets the map from field column id to projected index.
    pub(crate) fn field_id_to_projected_index(&self) -> &HashMap<ColumnId, usize> {
        &self.field_id_to_projected_index
    }

    /// Creates a sequence array to override.
    pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option<ArrayRef> {
        self.override_sequence
            .map(|seq| Arc::new(UInt64Array::from_value(seq, length)) as ArrayRef)
    }

    /// Converts an arrow record batch into `batches`.
    ///
    /// The length of `override_sequence_array` must be at least the length of the record batch.
    /// Note that the `record_batch` may only contain a subset of columns if it is projected.
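    ///
    /// For example (hypothetical data): a record batch whose primary key
    /// column contains the keys `[A, A, B]` is split into two [Batch]es, one
    /// for key `A` with two rows and one for key `B` with one row.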
    pub fn convert_record_batch(
        &self,
        record_batch: &RecordBatch,
        override_sequence_array: Option<&ArrayRef>,
        batches: &mut VecDeque<Batch>,
    ) -> Result<()> {
        debug_assert!(batches.is_empty());

        // The record batch must have the time index and internal columns.
        ensure!(
            record_batch.num_columns() >= FIXED_POS_COLUMN_NUM,
            InvalidRecordBatchSnafu {
                reason: format!(
                    "record batch only has {} columns",
                    record_batch.num_columns()
                ),
            }
        );

        let mut fixed_pos_columns = record_batch
            .columns()
            .iter()
            .rev()
            .take(FIXED_POS_COLUMN_NUM);
        // Safety: We have checked the column number.
        let op_type_array = fixed_pos_columns.next().unwrap();
        let mut sequence_array = fixed_pos_columns.next().unwrap().clone();
        let pk_array = fixed_pos_columns.next().unwrap();
        let ts_array = fixed_pos_columns.next().unwrap();
        let field_batch_columns = self.get_field_batch_columns(record_batch)?;

        // Override sequence array if provided.
        if let Some(override_array) = override_sequence_array {
            assert!(override_array.len() >= sequence_array.len());
            // It's fine to assign the override array directly, but we slice it to make
            // sure it matches the length of the original sequence array.
            sequence_array = if override_array.len() > sequence_array.len() {
                override_array.slice(0, sequence_array.len())
            } else {
                override_array.clone()
            };
        }

        // Compute primary key offsets.
        let pk_dict_array = pk_array
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .with_context(|| InvalidRecordBatchSnafu {
                reason: format!("primary key array should not be {:?}", pk_array.data_type()),
            })?;
        let offsets = primary_key_offsets(pk_dict_array)?;
        if offsets.is_empty() {
            return Ok(());
        }

        // Split record batch according to pk offsets.
        let keys = pk_dict_array.keys();
        let pk_values = pk_dict_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .with_context(|| InvalidRecordBatchSnafu {
                reason: format!(
                    "values of primary key array should not be {:?}",
                    pk_dict_array.values().data_type()
                ),
            })?;
        for (i, start) in offsets[..offsets.len() - 1].iter().enumerate() {
            let end = offsets[i + 1];
            let rows_in_batch = end - start;
            let dict_key = keys.value(*start);
            let primary_key = pk_values.value(dict_key as usize).to_vec();

            let mut builder = BatchBuilder::new(primary_key);
            builder
                .timestamps_array(ts_array.slice(*start, rows_in_batch))?
                .sequences_array(sequence_array.slice(*start, rows_in_batch))?
                .op_types_array(op_type_array.slice(*start, rows_in_batch))?;
            // Push all fields
            for batch_column in &field_batch_columns {
                builder.push_field(BatchColumn {
                    column_id: batch_column.column_id,
                    data: batch_column.data.slice(*start, rows_in_batch),
                });
            }

            let mut batch = builder.build()?;
            if let Some(codec) = &self.primary_key_codec {
                let pk_values: CompositeValues =
                    codec.decode(batch.primary_key()).context(DecodeSnafu)?;
                batch.set_pk_values(pk_values);
            }
            batches.push_back(batch);
        }

        Ok(())
    }

    /// Returns min values of a specific column in row groups.
    pub fn min_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        let Some(column) = self.metadata.column_by_id(column_id) else {
            // No such column in the SST.
            return StatValues::NoColumn;
        };
        match column.semantic_type {
            SemanticType::Tag => self.tag_values(row_groups, column, true),
            SemanticType::Field => {
                // Safety: `field_id_to_index` is initialized by the semantic type.
                let index = self.field_id_to_index.get(&column_id).unwrap();
                let stats = ReadFormat::column_values(row_groups, column, *index, true);
                StatValues::from_stats_opt(stats)
            }
            SemanticType::Timestamp => {
                let index = self.time_index_position();
                let stats = ReadFormat::column_values(row_groups, column, index, true);
                StatValues::from_stats_opt(stats)
            }
        }
    }

    /// Returns max values of a specific column in row groups.
    pub fn max_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        let Some(column) = self.metadata.column_by_id(column_id) else {
            // No such column in the SST.
            return StatValues::NoColumn;
        };
        match column.semantic_type {
            SemanticType::Tag => self.tag_values(row_groups, column, false),
            SemanticType::Field => {
                // Safety: `field_id_to_index` is initialized by the semantic type.
                let index = self.field_id_to_index.get(&column_id).unwrap();
                let stats = ReadFormat::column_values(row_groups, column, *index, false);
                StatValues::from_stats_opt(stats)
            }
            SemanticType::Timestamp => {
                let index = self.time_index_position();
                let stats = ReadFormat::column_values(row_groups, column, index, false);
                StatValues::from_stats_opt(stats)
            }
        }
    }

    /// Returns null counts of a specific column in row groups.
    pub fn null_counts(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        let Some(column) = self.metadata.column_by_id(column_id) else {
            // No such column in the SST.
            return StatValues::NoColumn;
        };
        match column.semantic_type {
            SemanticType::Tag => StatValues::NoStats,
            SemanticType::Field => {
                // Safety: `field_id_to_index` is initialized by the semantic type.
                let index = self.field_id_to_index.get(&column_id).unwrap();
                let stats = ReadFormat::column_null_counts(row_groups, *index);
                StatValues::from_stats_opt(stats)
            }
            SemanticType::Timestamp => {
                let index = self.time_index_position();
                let stats = ReadFormat::column_null_counts(row_groups, index);
                StatValues::from_stats_opt(stats)
            }
        }
    }

    /// Gets field columns from `record_batch`.
    fn get_field_batch_columns(&self, record_batch: &RecordBatch) -> Result<Vec<BatchColumn>> {
        record_batch
            .columns()
            .iter()
            .zip(record_batch.schema().fields())
            .take(record_batch.num_columns() - FIXED_POS_COLUMN_NUM) // Take all field columns.
            .map(|(array, field)| {
                let vector = Helper::try_into_vector(array.clone()).context(ConvertVectorSnafu)?;
                let column = self
                    .metadata
                    .column_by_name(field.name())
                    .with_context(|| InvalidRecordBatchSnafu {
                        reason: format!("column {} not found in metadata", field.name()),
                    })?;

                Ok(BatchColumn {
                    column_id: column.column_id,
                    data: vector,
                })
            })
            .collect()
    }

    /// Returns min/max values of a specific tag.
    fn tag_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column: &ColumnMetadata,
        is_min: bool,
    ) -> StatValues {
        let is_first_tag = self
            .metadata
            .primary_key
            .first()
            .map(|id| *id == column.column_id)
            .unwrap_or(false);
        if !is_first_tag {
            // Only the min-max of the first tag is available in the primary key.
            return StatValues::NoStats;
        }

        StatValues::from_stats_opt(self.first_tag_values(row_groups, column, is_min))
    }

    /// Returns min/max values of the first tag.
    /// Returns `None` if the tag does not have statistics.
    fn first_tag_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column: &ColumnMetadata,
        is_min: bool,
    ) -> Option<ArrayRef> {
        debug_assert!(
            self.metadata
                .primary_key
                .first()
                .map(|id| *id == column.column_id)
                .unwrap_or(false)
        );

        let primary_key_encoding = self.metadata.primary_key_encoding;
        let converter = build_primary_key_codec_with_fields(
            primary_key_encoding,
            [(
                column.column_id,
                SortField::new(column.column_schema.data_type.clone()),
            )]
            .into_iter(),
        );

        let values = row_groups.iter().map(|meta| {
            let stats = meta
                .borrow()
                .column(self.primary_key_position())
                .statistics()?;
            match stats {
                Statistics::Boolean(_) => None,
                Statistics::Int32(_) => None,
                Statistics::Int64(_) => None,
                Statistics::Int96(_) => None,
                Statistics::Float(_) => None,
                Statistics::Double(_) => None,
                Statistics::ByteArray(s) => {
                    let bytes = if is_min {
                        s.min_bytes_opt()?
                    } else {
                        s.max_bytes_opt()?
                    };
                    converter.decode_leftmost(bytes).ok()?
                }
                Statistics::FixedLenByteArray(_) => None,
            }
        });
        let mut builder = column
            .column_schema
            .data_type
            .create_mutable_vector(row_groups.len());
        for value_opt in values {
            match value_opt {
                // Safety: We use the same data type to create the converter.
                Some(v) => builder.push_value_ref(&v.as_value_ref()),
                None => builder.push_null(),
            }
        }
        let vector = builder.to_vector();

        Some(vector.to_arrow_array())
    }

    /// Index in SST of the primary key.
    fn primary_key_position(&self) -> usize {
        self.arrow_schema.fields.len() - 3
    }

    /// Index in SST of the time index.
    fn time_index_position(&self) -> usize {
        self.arrow_schema.fields.len() - FIXED_POS_COLUMN_NUM
    }

    /// Index of a field column by its column id.
    pub fn field_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
        self.field_id_to_projected_index.get(&column_id).copied()
    }
}

/// Helper to compute the projection for the SST.
pub(crate) struct FormatProjection {
    /// Indices of columns to read from the SST. It contains all internal columns.
    pub(crate) projection_indices: Vec<usize>,
    /// Column id to its index in the projected schema (the schema after projection).
    ///
    /// It doesn't contain the time index column if it is not present in the projection.
    pub(crate) column_id_to_projected_index: HashMap<ColumnId, usize>,
}

impl FormatProjection {
    /// Computes the projection.
    ///
    /// `id_to_index` is a mapping from column id to the index of the column in the SST.
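    ///
    /// For example (hypothetical values): with `id_to_index = {1: 0, 2: 1}`,
    /// `sst_column_num = 6` and `column_ids = [2]`, the result has
    /// `projection_indices = [1, 2, 3, 4, 5]` (index 1 plus the four fixed
    /// position columns) and `column_id_to_projected_index = {2: 0}`.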
    pub(crate) fn compute_format_projection(
        id_to_index: &HashMap<ColumnId, usize>,
        sst_column_num: usize,
        column_ids: impl Iterator<Item = ColumnId>,
    ) -> Self {
        // Maps column id of a projected column to its index in SST.
        // It also ignores columns not in the SST.
        // [(column id, index in SST)]
        let mut projected_schema: Vec<_> = column_ids
            .filter_map(|column_id| {
                id_to_index
                    .get(&column_id)
                    .copied()
                    .map(|index| (column_id, index))
            })
            .collect();
        // Sorts columns by their indices in the SST. SST uses a bitmap for projection.
        // This ensures the order of `projected_schema` matches the batch returned from the SST.
        projected_schema.sort_unstable_by_key(|x| x.1);
        // Dedups the entries to avoid the case that `column_ids` has duplicated columns.
        projected_schema.dedup_by_key(|x| x.1);

        // Collects all projected indices.
        // It contains the positions of all columns we need to read.
        let mut projection_indices: Vec<_> = projected_schema
            .iter()
            .map(|(_column_id, index)| *index)
            // We need to add all fixed position columns.
            .chain(sst_column_num - FIXED_POS_COLUMN_NUM..sst_column_num)
            .collect();
        projection_indices.sort_unstable();
        // Removes duplications.
        projection_indices.dedup();

        // Creates a map from column id to the index of that column in the projected record batch.
        let column_id_to_projected_index = projected_schema
            .into_iter()
            .map(|(column_id, _)| column_id)
            .enumerate()
            .map(|(index, column_id)| (column_id, index))
            .collect();

        Self {
            projection_indices,
            column_id_to_projected_index,
        }
    }
}

/// Values of column statistics of the SST.
///
/// It also distinguishes the case that a column is not found and
/// the column exists but has no statistics.
pub enum StatValues {
    /// Values of each row group.
    Values(ArrayRef),
    /// No such column.
    NoColumn,
    /// Column exists but has no statistics.
    NoStats,
}

impl StatValues {
    /// Creates a new `StatValues` instance from optional statistics.
    pub fn from_stats_opt(stats: Option<ArrayRef>) -> Self {
        match stats {
            Some(stats) => StatValues::Values(stats),
            None => StatValues::NoStats,
        }
    }
}

#[cfg(test)]
impl PrimaryKeyReadFormat {
    /// Creates a helper with existing `metadata` and all columns.
    pub fn new_with_all_columns(metadata: RegionMetadataRef) -> PrimaryKeyReadFormat {
        Self::new(
            Arc::clone(&metadata),
            metadata.column_metadatas.iter().map(|c| c.column_id),
        )
    }
}

/// Computes offsets of different primary keys in the array.
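///
/// For example (hypothetical keys): dictionary keys `[0, 0, 1, 1, 1, 2]`
/// produce offsets `[0, 2, 5, 6]`.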
pub(crate) fn primary_key_offsets(pk_dict_array: &PrimaryKeyArray) -> Result<Vec<usize>> {
    if pk_dict_array.is_empty() {
        return Ok(Vec::new());
    }

    // Init offsets.
    let mut offsets = vec![0];
    let keys = pk_dict_array.keys();
    // We know that primary keys are never null, so we iterate `keys.values()` directly.
    let pk_indices = keys.values();
    for (i, key) in pk_indices.iter().take(keys.len() - 1).enumerate() {
        // Compare each key with the next key.
        if *key != pk_indices[i + 1] {
            // We meet a new key; push the next index as the end of the current key's range.
            offsets.push(i + 1);
        }
    }
    offsets.push(keys.len());

    Ok(offsets)
}

/// Gets the min/max time index of the row group from the parquet meta.
/// It assumes the parquet is created by the mito engine.
pub(crate) fn parquet_row_group_time_range(
    file_meta: &FileMeta,
    parquet_meta: &ParquetMetaData,
    row_group_idx: usize,
) -> Option<FileTimeRange> {
    let row_group_meta = parquet_meta.row_group(row_group_idx);
    let num_columns = parquet_meta.file_metadata().schema_descr().num_columns();
    assert!(
        num_columns >= FIXED_POS_COLUMN_NUM,
        "file only has {} columns",
        num_columns
    );
    let time_index_pos = num_columns - FIXED_POS_COLUMN_NUM;

    let stats = row_group_meta.column(time_index_pos).statistics()?;
    // The physical type for the timestamp should be i64.
    let (min, max) = match stats {
        Statistics::Int64(value_stats) => (*value_stats.min_opt()?, *value_stats.max_opt()?),
        Statistics::Int32(_)
        | Statistics::Boolean(_)
        | Statistics::Int96(_)
        | Statistics::Float(_)
        | Statistics::Double(_)
        | Statistics::ByteArray(_)
        | Statistics::FixedLenByteArray(_) => {
            common_telemetry::warn!(
                "Invalid statistics {:?} for time index in parquet in {}",
                stats,
                file_meta.file_id
            );
            return None;
        }
    };

    debug_assert!(min >= file_meta.time_range.0.value() && min <= file_meta.time_range.1.value());
    debug_assert!(max >= file_meta.time_range.0.value() && max <= file_meta.time_range.1.value());
    let unit = file_meta.time_range.0.unit();

    Some((Timestamp::new(min, unit), Timestamp::new(max, unit)))
}

/// Checks if sequence override is needed based on all row groups' statistics.
/// Returns true if ALL row groups have sequence min-max values of 0.
pub(crate) fn need_override_sequence(parquet_meta: &ParquetMetaData) -> bool {
    let num_columns = parquet_meta.file_metadata().schema_descr().num_columns();
    if num_columns < FIXED_POS_COLUMN_NUM {
        return false;
    }

    // The sequence column is the second-to-last column (before op_type).
    let sequence_pos = num_columns - 2;

    // Check all row groups - all must have sequence min-max of 0.
    for row_group in parquet_meta.row_groups() {
        if let Some(Statistics::Int64(value_stats)) = row_group.column(sequence_pos).statistics() {
            if let (Some(min_val), Some(max_val)) = (value_stats.min_opt(), value_stats.max_opt()) {
                // If any row group doesn't have min=0 and max=0, return false.
                if *min_val != 0 || *max_val != 0 {
                    return false;
                }
            } else {
                // If any row group doesn't have statistics, return false.
                return false;
            }
        } else {
            // If any row group doesn't have Int64 statistics, return false.
            return false;
        }
    }

    // All row groups have sequence min-max of 0. Also require at least one
    // row group, otherwise there is nothing to override.
    !parquet_meta.row_groups().is_empty()
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::OpType;
    use datatypes::arrow::array::{
        Int64Array, StringArray, TimestampMillisecondArray, UInt8Array, UInt32Array, UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema, TimeUnit};
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::ValueRef;
    use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, UInt8Vector, UInt64Vector};
    use mito_codec::row_converter::{
        DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec,
    };
    use store_api::codec::PrimaryKeyEncoding;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;
    use store_api::storage::consts::ReservedColumnId;

    use super::*;
    use crate::sst::parquet::flat_format::{FlatWriteFormat, sequence_column_index};
    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};

    const TEST_SEQUENCE: u64 = 1;
    const TEST_OP_TYPE: u8 = OpType::Put as u8;

    fn build_test_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("tag0", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field1",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 4, // We change the order of field columns.
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("tag1", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field0",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 5,
            })
            .primary_key(vec![1, 3]);
        Arc::new(builder.build().unwrap())
    }

    fn build_test_arrow_schema() -> SchemaRef {
        let fields = vec![
            Field::new("field1", ArrowDataType::Int64, true),
            Field::new("field0", ArrowDataType::Int64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt32),
                    Box::new(ArrowDataType::Binary),
                ),
                false,
            ),
            Field::new("__sequence", ArrowDataType::UInt64, false),
            Field::new("__op_type", ArrowDataType::UInt8, false),
        ];
        Arc::new(Schema::new(fields))
    }

    fn new_batch(primary_key: &[u8], start_ts: i64, start_field: i64, num_rows: usize) -> Batch {
        new_batch_with_sequence(primary_key, start_ts, start_field, num_rows, TEST_SEQUENCE)
    }

    fn new_batch_with_sequence(
        primary_key: &[u8],
        start_ts: i64,
        start_field: i64,
        num_rows: usize,
        sequence: u64,
    ) -> Batch {
        let ts_values = (0..num_rows).map(|i| start_ts + i as i64);
        let timestamps = Arc::new(TimestampMillisecondVector::from_values(ts_values));
        let sequences = Arc::new(UInt64Vector::from_vec(vec![sequence; num_rows]));
        let op_types = Arc::new(UInt8Vector::from_vec(vec![TEST_OP_TYPE; num_rows]));
        let fields = vec![
            BatchColumn {
                column_id: 4,
                data: Arc::new(Int64Vector::from_vec(vec![start_field; num_rows])),
            }, // field1
            BatchColumn {
                column_id: 2,
                data: Arc::new(Int64Vector::from_vec(vec![start_field + 1; num_rows])),
            }, // field0
        ];

        BatchBuilder::with_required_columns(primary_key.to_vec(), timestamps, sequences, op_types)
            .with_fields(fields)
            .build()
            .unwrap()
    }

    #[test]
    fn test_to_sst_arrow_schema() {
        let metadata = build_test_region_metadata();
        let write_format = PrimaryKeyWriteFormat::new(metadata);
        assert_eq!(&build_test_arrow_schema(), write_format.arrow_schema());
    }

    fn build_test_pk_array(pk_row_nums: &[(Vec<u8>, usize)]) -> Arc<PrimaryKeyArray> {
        let values = Arc::new(BinaryArray::from_iter_values(
            pk_row_nums.iter().map(|v| &v.0),
        ));
        let mut keys = vec![];
        for (index, num_rows) in pk_row_nums.iter().map(|v| v.1).enumerate() {
            keys.extend(std::iter::repeat_n(index as u32, num_rows));
        }
        let keys = UInt32Array::from(keys);
        Arc::new(DictionaryArray::new(keys, values))
    }

    #[test]
    fn test_projection_indices() {
        let metadata = build_test_region_metadata();
        // Only read tag1
        let read_format = ReadFormat::new_primary_key(metadata.clone(), [3].iter().copied());
        assert_eq!(&[2, 3, 4, 5], read_format.projection_indices());
        // Only read field1
        let read_format = ReadFormat::new_primary_key(metadata.clone(), [4].iter().copied());
        assert_eq!(&[0, 2, 3, 4, 5], read_format.projection_indices());
        // Only read ts
        let read_format = ReadFormat::new_primary_key(metadata.clone(), [5].iter().copied());
        assert_eq!(&[2, 3, 4, 5], read_format.projection_indices());
        // Read field0, tag0, ts
        let read_format = ReadFormat::new_primary_key(metadata, [2, 1, 5].iter().copied());
        assert_eq!(&[1, 2, 3, 4, 5], read_format.projection_indices());
    }

    #[test]
    fn test_empty_primary_key_offsets() {
        let array = build_test_pk_array(&[]);
        assert!(primary_key_offsets(&array).unwrap().is_empty());
    }

    #[test]
    fn test_primary_key_offsets_one_series() {
        let array = build_test_pk_array(&[(b"one".to_vec(), 1)]);
        assert_eq!(vec![0, 1], primary_key_offsets(&array).unwrap());

        let array = build_test_pk_array(&[(b"one".to_vec(), 1), (b"two".to_vec(), 1)]);
        assert_eq!(vec![0, 1, 2], primary_key_offsets(&array).unwrap());

        let array = build_test_pk_array(&[
            (b"one".to_vec(), 1),
            (b"two".to_vec(), 1),
            (b"three".to_vec(), 1),
        ]);
        assert_eq!(vec![0, 1, 2, 3], primary_key_offsets(&array).unwrap());
    }

    #[test]
    fn test_primary_key_offsets_multi_series() {
        let array = build_test_pk_array(&[(b"one".to_vec(), 1), (b"two".to_vec(), 3)]);
        assert_eq!(vec![0, 1, 4], primary_key_offsets(&array).unwrap());

        let array = build_test_pk_array(&[(b"one".to_vec(), 3), (b"two".to_vec(), 1)]);
        assert_eq!(vec![0, 3, 4], primary_key_offsets(&array).unwrap());

        let array = build_test_pk_array(&[(b"one".to_vec(), 3), (b"two".to_vec(), 3)]);
        assert_eq!(vec![0, 3, 6], primary_key_offsets(&array).unwrap());
    }

    #[test]
    fn test_convert_empty_record_batch() {
        let metadata = build_test_region_metadata();
        let arrow_schema = build_test_arrow_schema();
        let column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|col| col.column_id)
            .collect();
        let read_format = PrimaryKeyReadFormat::new(metadata, column_ids.iter().copied());
        assert_eq!(arrow_schema, *read_format.arrow_schema());

        let record_batch = RecordBatch::new_empty(arrow_schema);
        let mut batches = VecDeque::new();
        read_format
            .convert_record_batch(&record_batch, None, &mut batches)
            .unwrap();
        assert!(batches.is_empty());
    }

    #[test]
    fn test_convert_record_batch() {
        let metadata = build_test_region_metadata();
        let column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|col| col.column_id)
            .collect();
        let read_format = PrimaryKeyReadFormat::new(metadata, column_ids.iter().copied());

        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![1, 1, 10, 10])), // field1
            Arc::new(Int64Array::from(vec![2, 2, 11, 11])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 11, 12])), // ts
            build_test_pk_array(&[(b"one".to_vec(), 2), (b"two".to_vec(), 2)]), // primary key
            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; 4])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; 4])), // op type
        ];
        let arrow_schema = build_test_arrow_schema();
        let record_batch = RecordBatch::try_new(arrow_schema, columns).unwrap();
        let mut batches = VecDeque::new();
        read_format
            .convert_record_batch(&record_batch, None, &mut batches)
            .unwrap();

        assert_eq!(
            vec![new_batch(b"one", 1, 1, 2), new_batch(b"two", 11, 10, 2)],
            batches.into_iter().collect::<Vec<_>>(),
        );
    }

    #[test]
    fn test_convert_record_batch_with_override_sequence() {
        let metadata = build_test_region_metadata();
        let column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|col| col.column_id)
            .collect();
        let read_format =
            ReadFormat::new(metadata, Some(&column_ids), false, None, "test", false).unwrap();

        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![1, 1, 10, 10])), // field1
            Arc::new(Int64Array::from(vec![2, 2, 11, 11])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 11, 12])), // ts
            build_test_pk_array(&[(b"one".to_vec(), 2), (b"two".to_vec(), 2)]), // primary key
            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; 4])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; 4])), // op type
        ];
        let arrow_schema = build_test_arrow_schema();
        let record_batch = RecordBatch::try_new(arrow_schema, columns).unwrap();

        // Create override sequence array with custom values
        let override_sequence: u64 = 12345;
        let override_sequence_array: ArrayRef =
            Arc::new(UInt64Array::from_value(override_sequence, 4));

        let mut batches = VecDeque::new();
        read_format
            .as_primary_key()
            .unwrap()
            .convert_record_batch(&record_batch, Some(&override_sequence_array), &mut batches)
            .unwrap();

        // Create expected batches with override sequence
        let expected_batch1 = new_batch_with_sequence(b"one", 1, 1, 2, override_sequence);
        let expected_batch2 = new_batch_with_sequence(b"two", 11, 10, 2, override_sequence);

        assert_eq!(
            vec![expected_batch1, expected_batch2],
            batches.into_iter().collect::<Vec<_>>(),
        );
    }

    fn build_test_flat_sst_schema() -> SchemaRef {
        let fields = vec![
            Field::new("tag0", ArrowDataType::Int64, true), // primary key columns first
            Field::new("tag1", ArrowDataType::Int64, true),
            Field::new("field1", ArrowDataType::Int64, true), // then field columns
            Field::new("field0", ArrowDataType::Int64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt32),
                    Box::new(ArrowDataType::Binary),
                ),
                false,
            ),
            Field::new("__sequence", ArrowDataType::UInt64, false),
            Field::new("__op_type", ArrowDataType::UInt8, false),
        ];
        Arc::new(Schema::new(fields))
    }
1303
1304    #[test]
1305    fn test_flat_to_sst_arrow_schema() {
1306        let metadata = build_test_region_metadata();
1307        let format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default());
1308        assert_eq!(&build_test_flat_sst_schema(), format.arrow_schema());
1309    }
1310
1311    fn input_columns_for_flat_batch(num_rows: usize) -> Vec<ArrayRef> {
1312        vec![
1313            Arc::new(Int64Array::from(vec![1; num_rows])), // tag0
1314            Arc::new(Int64Array::from(vec![1; num_rows])), // tag1
1315            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
1316            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
1317            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
1318            build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // __primary_key
1319            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; num_rows])), // sequence
1320            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
1321        ]
1322    }
1323
1324    #[test]
1325    fn test_flat_convert_batch() {
1326        let metadata = build_test_region_metadata();
1327        let format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default());
1328
1329        let num_rows = 4;
1330        let columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
1331        let batch = RecordBatch::try_new(build_test_flat_sst_schema(), columns.clone()).unwrap();
1332        let expect_record = RecordBatch::try_new(build_test_flat_sst_schema(), columns).unwrap();
1333
1334        let actual = format.convert_batch(&batch).unwrap();
1335        assert_eq!(expect_record, actual);
1336    }
1337
1338    #[test]
1339    fn test_flat_convert_with_override_sequence() {
1340        let metadata = build_test_region_metadata();
1341        let format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default())
1342            .with_override_sequence(Some(415411));
1343
1344        let num_rows = 4;
1345        let columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
1346        let batch = RecordBatch::try_new(build_test_flat_sst_schema(), columns).unwrap();
1347
1348        let expected_columns: Vec<ArrayRef> = vec![
1349            Arc::new(Int64Array::from(vec![1; num_rows])), // tag0
1350            Arc::new(Int64Array::from(vec![1; num_rows])), // tag1
1351            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
1352            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
1353            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
1354            build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // __primary_key
1355            Arc::new(UInt64Array::from(vec![415411; num_rows])), // overridden sequence
1356            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
1357        ];
1358        let expected_record =
1359            RecordBatch::try_new(build_test_flat_sst_schema(), expected_columns).unwrap();
1360
1361        let actual = format.convert_batch(&batch).unwrap();
1362        assert_eq!(expected_record, actual);
1363    }
1364
1365    #[test]
1366    fn test_flat_projection_indices() {
1367        let metadata = build_test_region_metadata();
        // Flat format column order: tag0(0), tag1(1), field1(2), field0(3), ts(4), __primary_key(5), __sequence(6), __op_type(7).
        // Every projection always includes the fixed-position columns: ts(4), __primary_key(5), __sequence(6), __op_type(7).

        // Only read tag1 (column_id=3, index=1) + fixed columns
        let read_format =
            ReadFormat::new_flat(metadata.clone(), [3].iter().copied(), None, "test", false)
                .unwrap();
        assert_eq!(&[1, 4, 5, 6, 7], read_format.projection_indices());

        // Only read field1 (column_id=4, index=2) + fixed columns
        let read_format =
            ReadFormat::new_flat(metadata.clone(), [4].iter().copied(), None, "test", false)
                .unwrap();
        assert_eq!(&[2, 4, 5, 6, 7], read_format.projection_indices());

        // Only read ts (column_id=5, index=4) + fixed columns (ts is already included in fixed)
        let read_format =
            ReadFormat::new_flat(metadata.clone(), [5].iter().copied(), None, "test", false)
                .unwrap();
        assert_eq!(&[4, 5, 6, 7], read_format.projection_indices());

        // Read field0(column_id=2, index=3), tag0(column_id=1, index=0), ts(column_id=5, index=4) + fixed columns
        let read_format =
            ReadFormat::new_flat(metadata, [2, 1, 5].iter().copied(), None, "test", false).unwrap();
        assert_eq!(&[0, 3, 4, 5, 6, 7], read_format.projection_indices());
    }

    #[test]
    fn test_flat_read_format_convert_batch() {
        let metadata = build_test_region_metadata();
        let mut format = FlatReadFormat::new(
            metadata,
            std::iter::once(1), // Just read tag0
            Some(8),
            "test",
            false,
        )
        .unwrap();

        let num_rows = 4;
        let original_sequence = 100u64;
        let override_sequence = 200u64;

        // Create a test record batch
        let mut test_columns = input_columns_for_flat_batch(num_rows);
        // Replace sequence column with original sequence values
        test_columns[6] = Arc::new(UInt64Array::from(vec![original_sequence; num_rows]));
        let record_batch =
            RecordBatch::try_new(format.arrow_schema().clone(), test_columns).unwrap();

        // Without an override sequence, the sequence column should keep its original values.
        let result = format.convert_batch(record_batch.clone(), None).unwrap();
        let sequence_column = result.column(sequence_column_index(result.num_columns()));
        let sequence_array = sequence_column
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();

        let expected_original = UInt64Array::from(vec![original_sequence; num_rows]);
        assert_eq!(sequence_array, &expected_original);

        // Set override sequence and test with new_override_sequence_array
        format.set_override_sequence(Some(override_sequence));
        let override_sequence_array = format.new_override_sequence_array(num_rows).unwrap();
        let result = format
            .convert_batch(record_batch, Some(&override_sequence_array))
            .unwrap();
        let sequence_column = result.column(sequence_column_index(result.num_columns()));
        let sequence_array = sequence_column
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();

        let expected_override = UInt64Array::from(vec![override_sequence; num_rows]);
        assert_eq!(sequence_array, &expected_override);
    }

    #[test]
    fn test_is_legacy_format() {
        let metadata = build_test_region_metadata();

        // Test case 1: Same number of columns, no conversion needed
        // For flat format: all columns (5) + internal columns (3)
        let expected_columns = metadata.column_metadatas.len() + INTERNAL_COLUMN_NUM;
        let result =
            FlatReadFormat::is_legacy_format(&metadata, expected_columns, "test.parquet").unwrap();
        assert!(
            !result,
            "Should not need conversion when column counts match"
        );

        // Test case 2: Different number of columns, need conversion
        // Missing primary key columns (2 primary keys in test metadata)
        let num_columns_without_pk = expected_columns - metadata.primary_key.len();
        let result =
            FlatReadFormat::is_legacy_format(&metadata, num_columns_without_pk, "test.parquet")
                .unwrap();
        assert!(
            result,
            "Should need conversion when primary key columns are missing"
        );

        // Test case 3: Invalid case - more actual columns than expected
        let too_many_columns = expected_columns + 1;
        let err = FlatReadFormat::is_legacy_format(&metadata, too_many_columns, "test.parquet")
            .unwrap_err();
        assert!(err.to_string().contains("Expected columns"), "{err:?}");

        // Test case 4: Invalid case - column difference doesn't match primary key count
        let wrong_diff_columns = expected_columns - 1; // Difference of 1, but we have 2 primary keys
        let err = FlatReadFormat::is_legacy_format(&metadata, wrong_diff_columns, "test.parquet")
            .unwrap_err();
        assert!(
            err.to_string().contains("Column number difference"),
            "{err:?}"
        );
    }

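    /// Encodes one dense primary key per row with `codec` and collects the
    /// encoded keys into a dictionary-encoded `__primary_key` array.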
    fn build_test_dense_pk_array(
        codec: &DensePrimaryKeyCodec,
        pk_values_per_row: &[&[Option<i64>]],
    ) -> Arc<PrimaryKeyArray> {
        let mut builder = PrimaryKeyArrayBuilder::with_capacity(pk_values_per_row.len(), 1024, 0);

        for pk_values_row in pk_values_per_row {
            let values: Vec<ValueRef> = pk_values_row
                .iter()
                .map(|opt| match opt {
                    Some(val) => ValueRef::Int64(*val),
                    None => ValueRef::Null,
                })
                .collect();

            let encoded = codec.encode(values.into_iter()).unwrap();
            builder.append_value(&encoded);
        }

        Arc::new(builder.finish())
    }

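    /// Builds a region whose primary key uses sparse encoding: the reserved
    /// `__table_id` and `__tsid` columns plus two string tags (`tag0`, `tag1`).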
    fn build_test_sparse_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "__table_id",
                    ConcreteDataType::uint32_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: ReservedColumnId::table_id(),
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "__tsid",
                    ConcreteDataType::uint64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: ReservedColumnId::tsid(),
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("tag0", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("tag1", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field1",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field0",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 5,
            })
            .primary_key(vec![
                ReservedColumnId::table_id(),
                ReservedColumnId::tsid(),
                1,
                3,
            ])
            .primary_key_encoding(PrimaryKeyEncoding::Sparse);
        Arc::new(builder.build().unwrap())
    }

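    /// Encodes one sparse primary key per row with `codec` and collects the
    /// encoded keys into a dictionary-encoded `__primary_key` array.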
    fn build_test_sparse_pk_array(
        codec: &SparsePrimaryKeyCodec,
        pk_values_per_row: &[SparseTestRow],
    ) -> Arc<PrimaryKeyArray> {
        let mut builder = PrimaryKeyArrayBuilder::with_capacity(pk_values_per_row.len(), 1024, 0);
        for row in pk_values_per_row {
            let values = vec![
                (ReservedColumnId::table_id(), ValueRef::UInt32(row.table_id)),
                (ReservedColumnId::tsid(), ValueRef::UInt64(row.tsid)),
                (1, ValueRef::String(&row.tag0)),
                (3, ValueRef::String(&row.tag1)),
            ];

            let mut buffer = Vec::new();
            codec.encode_value_refs(&values, &mut buffer).unwrap();
            builder.append_value(&buffer);
        }

        Arc::new(builder.finish())
    }

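    /// Primary key values for a single row in the sparse-encoding tests.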
    #[derive(Clone)]
    struct SparseTestRow {
        table_id: u32,
        tsid: u64,
        tag0: String,
        tag1: String,
    }

    #[test]
    fn test_flat_read_format_convert_format_with_dense_encoding() {
        let metadata = build_test_region_metadata();

        let column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|c| c.column_id)
            .collect();
        let format = FlatReadFormat::new(
            metadata.clone(),
            column_ids.into_iter(),
            Some(6),
            "test",
            false,
        )
        .unwrap();

        let num_rows = 4;
        let original_sequence = 100u64;

        // Create primary key values for each row: tag0=1, tag1=1 for all rows
        let pk_values_per_row = vec![&[Some(1i64), Some(1i64)][..]; num_rows];

        // Create a test record batch in old format using dense encoding
        let codec = DensePrimaryKeyCodec::new(&metadata);
        let dense_pk_array = build_test_dense_pk_array(&codec, &pk_values_per_row);
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            dense_pk_array.clone(),                        // __primary_key (dense encoding)
            Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ];

        // Create schema for old format (without primary key columns)
        let old_format_fields = vec![
            Field::new("field1", ArrowDataType::Int64, true),
            Field::new("field0", ArrowDataType::Int64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt32),
                    Box::new(ArrowDataType::Binary),
                ),
                false,
            ),
            Field::new("__sequence", ArrowDataType::UInt64, false),
            Field::new("__op_type", ArrowDataType::UInt8, false),
        ];
        let old_schema = Arc::new(Schema::new(old_format_fields));
        let record_batch = RecordBatch::try_new(old_schema, columns).unwrap();

        // Test conversion with dense encoding
        let result = format.convert_batch(record_batch, None).unwrap();

        // Construct expected RecordBatch in flat format with decoded primary key columns
        let expected_columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![1; num_rows])), // tag0 (decoded from primary key)
            Arc::new(Int64Array::from(vec![1; num_rows])), // tag1 (decoded from primary key)
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            dense_pk_array,                                // __primary_key (preserved)
            Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ];
        let expected_record_batch =
            RecordBatch::try_new(build_test_flat_sst_schema(), expected_columns).unwrap();

        // Compare the actual result with the expected record batch
        assert_eq!(expected_record_batch, result);
    }

    #[test]
    fn test_flat_read_format_convert_format_with_sparse_encoding() {
        let metadata = build_test_sparse_region_metadata();

        let column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|c| c.column_id)
            .collect();
        let format = FlatReadFormat::new(
            metadata.clone(),
            column_ids.clone().into_iter(),
            None,
            "test",
            false,
        )
        .unwrap();

        let num_rows = 4;
        let original_sequence = 100u64;

        // Create sparse test data with table_id, tsid and string tags
        let pk_test_rows = vec![
            SparseTestRow {
                table_id: 1,
                tsid: 123,
                tag0: "frontend".to_string(),
                tag1: "pod1".to_string(),
            };
            num_rows
        ];

        let codec = SparsePrimaryKeyCodec::new(&metadata);
        let sparse_pk_array = build_test_sparse_pk_array(&codec, &pk_test_rows);
        // Create a test record batch in old format using sparse encoding
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            sparse_pk_array.clone(),                       // __primary_key (sparse encoding)
            Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ];

        // Create schema for old format (without primary key columns)
        let old_format_fields = vec![
            Field::new("field1", ArrowDataType::Int64, true),
            Field::new("field0", ArrowDataType::Int64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt32),
                    Box::new(ArrowDataType::Binary),
                ),
                false,
            ),
            Field::new("__sequence", ArrowDataType::UInt64, false),
            Field::new("__op_type", ArrowDataType::UInt8, false),
        ];
        let old_schema = Arc::new(Schema::new(old_format_fields));
        let record_batch = RecordBatch::try_new(old_schema, columns).unwrap();

        // Test conversion with sparse encoding
        let result = format.convert_batch(record_batch.clone(), None).unwrap();

        // Construct expected RecordBatch in flat format with decoded primary key columns
        let tag0_array = Arc::new(DictionaryArray::new(
            UInt32Array::from(vec![0; num_rows]),
            Arc::new(StringArray::from(vec!["frontend"])),
        ));
        let tag1_array = Arc::new(DictionaryArray::new(
            UInt32Array::from(vec![0; num_rows]),
            Arc::new(StringArray::from(vec!["pod1"])),
        ));
        let expected_columns: Vec<ArrayRef> = vec![
            Arc::new(UInt32Array::from(vec![1; num_rows])), // __table_id (decoded from primary key)
            Arc::new(UInt64Array::from(vec![123; num_rows])), // __tsid (decoded from primary key)
            tag0_array,                                     // tag0 (decoded from primary key)
            tag1_array,                                     // tag1 (decoded from primary key)
            Arc::new(Int64Array::from(vec![2; num_rows])),  // field1
            Arc::new(Int64Array::from(vec![3; num_rows])),  // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            sparse_pk_array,                                // __primary_key (preserved)
            Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ];
        let expected_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        let expected_record_batch =
            RecordBatch::try_new(expected_schema, expected_columns).unwrap();

        // Compare the actual result with the expected record batch
        assert_eq!(expected_record_batch, result);

        let format =
            FlatReadFormat::new(metadata.clone(), column_ids.into_iter(), None, "test", true)
                .unwrap();
        // Test sparse encoding again with conversion skipped; the batch should pass through unchanged.
        let result = format.convert_batch(record_batch.clone(), None).unwrap();
        assert_eq!(record_batch, result);
    }

    #[test]
    fn test_convert_flat_batch() {
        let metadata = build_test_region_metadata();
        let write_format = PrimaryKeyWriteFormat::new(metadata);

        let num_rows = 4;
        // Build a flat record batch: tag0, tag1, field1, field0, ts, __primary_key, __sequence, __op_type
        let flat_columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
        let flat_batch = RecordBatch::try_new(build_test_flat_sst_schema(), flat_columns).unwrap();

        // num_fields = 2 (field1, field0)
        let result = write_format.convert_flat_batch(&flat_batch, 2).unwrap();

        // Expected: tag columns stripped, only field1, field0, ts, __primary_key, __sequence, __op_type
        let expected_columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // __primary_key
            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; num_rows])), // __sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // __op_type
        ];
        let expected = RecordBatch::try_new(build_test_arrow_schema(), expected_columns).unwrap();

        assert_eq!(expected, result);
    }

    #[test]
    fn test_convert_flat_batch_with_override_sequence() {
        let metadata = build_test_region_metadata();
        let write_format = PrimaryKeyWriteFormat::new(metadata).with_override_sequence(Some(999));

        let num_rows = 4;
        let flat_columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
        let flat_batch = RecordBatch::try_new(build_test_flat_sst_schema(), flat_columns).unwrap();

        let result = write_format.convert_flat_batch(&flat_batch, 2).unwrap();

        let expected_columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // __primary_key
            Arc::new(UInt64Array::from(vec![999; num_rows])), // overridden __sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // __op_type
        ];
        let expected = RecordBatch::try_new(build_test_arrow_schema(), expected_columns).unwrap();

        assert_eq!(expected, result);
    }

    #[test]
    fn test_convert_flat_batch_no_tags() {
        // Test with a region that has no primary key columns (no tags to strip).
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field0",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            });
        let metadata = Arc::new(builder.build().unwrap());
        let write_format = PrimaryKeyWriteFormat::new(metadata);

        let num_rows = 3;
        // No tag columns, so flat batch is: field0, ts, __primary_key, __sequence, __op_type
        let sst_schema = write_format.arrow_schema().clone();
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![10; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3])), // ts
            build_test_pk_array(&[(b"".to_vec(), num_rows)]), // __primary_key
            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; num_rows])), // __sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // __op_type
        ];
        let flat_batch = RecordBatch::try_new(sst_schema.clone(), columns.clone()).unwrap();

        // num_fields = 1, num_tag_columns = 5 - 1 - 4 = 0, so nothing is stripped
        let result = write_format.convert_flat_batch(&flat_batch, 1).unwrap();
        let expected = RecordBatch::try_new(sst_schema, columns).unwrap();

        assert_eq!(expected, result);
    }
}