mito2/sst/parquet/format.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Format to store in parquet.
//!
//! We store three internal columns in parquet:
//! - `__primary_key`, the primary key of the row (tags). Type: dictionary(uint32, binary)
//! - `__sequence`, the sequence number of a row. Type: uint64
//! - `__op_type`, the op type of the row. Type: uint8
//!
//! The schema of a parquet file is:
//! ```text
//! field 0, field 1, ..., field N, time index, primary key, sequence, op type
//! ```
//!
//! We store fields in the same order as [RegionMetadata::field_columns()](store_api::metadata::RegionMetadata::field_columns()).
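//!
//! For example, for a hypothetical region with two tags `host`/`dc` and two
//! fields `cpu`/`mem`, the file schema would be:
//! ```text
//! cpu, mem, ts, __primary_key, __sequence, __op_type
//! ```
//! The tag values are not materialized as columns; they are encoded together
//! into the `__primary_key` binary value.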

use std::borrow::Borrow;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;

use api::v1::SemanticType;
use common_time::Timestamp;
use datafusion_common::ScalarValue;
use datatypes::arrow::array::{
    ArrayRef, BinaryArray, BinaryDictionaryBuilder, DictionaryArray, UInt32Array, UInt64Array,
};
use datatypes::arrow::datatypes::{SchemaRef, UInt32Type};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::prelude::DataType;
use datatypes::vectors::{Helper, Vector};
use mito_codec::row_converter::{SortField, build_primary_key_codec_with_fields};
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
use parquet::file::statistics::Statistics;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
use store_api::storage::{ColumnId, SequenceNumber};

use crate::error::{
    ConvertVectorSnafu, InvalidBatchSnafu, InvalidRecordBatchSnafu, NewRecordBatchSnafu, Result,
};
use crate::read::{Batch, BatchBuilder, BatchColumn};
use crate::sst::file::{FileMeta, FileTimeRange};
use crate::sst::parquet::flat_format::FlatReadFormat;
use crate::sst::to_sst_arrow_schema;

/// Arrow array type for the primary key dictionary.
pub(crate) type PrimaryKeyArray = DictionaryArray<UInt32Type>;
/// Builder type for primary key dictionary array.
pub(crate) type PrimaryKeyArrayBuilder = BinaryDictionaryBuilder<UInt32Type>;

/// Number of columns that have fixed positions.
///
/// Contains: time index and internal columns.
pub(crate) const FIXED_POS_COLUMN_NUM: usize = 4;
/// Number of internal columns.
pub(crate) const INTERNAL_COLUMN_NUM: usize = 3;
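
// Note: with the fixed-position layout above, a file with `n` columns stores
// the time index at `n - FIXED_POS_COLUMN_NUM`, the primary key at `n - 3`,
// the sequence at `n - 2` and the op type at `n - 1` (see
// `primary_key_position()` / `time_index_position()` below).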

/// Helper for writing the SST format with primary key.
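///
/// A minimal usage sketch (marked `ignore` since it needs real region
/// metadata; `metadata` and `batch` are assumed to exist):
/// ```ignore
/// let format = PrimaryKeyWriteFormat::new(metadata).with_override_sequence(None);
/// let record_batch = format.convert_batch(&batch)?;
/// // `record_batch` now matches `format.arrow_schema()` and can be written to parquet.
/// ```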
pub(crate) struct PrimaryKeyWriteFormat {
    metadata: RegionMetadataRef,
    /// SST file schema.
    arrow_schema: SchemaRef,
    override_sequence: Option<SequenceNumber>,
}

impl PrimaryKeyWriteFormat {
    /// Creates a new helper.
    pub(crate) fn new(metadata: RegionMetadataRef) -> PrimaryKeyWriteFormat {
        let arrow_schema = to_sst_arrow_schema(&metadata);
        PrimaryKeyWriteFormat {
            metadata,
            arrow_schema,
            override_sequence: None,
        }
    }

    /// Sets the sequence number to override.
    pub(crate) fn with_override_sequence(
        mut self,
        override_sequence: Option<SequenceNumber>,
    ) -> Self {
        self.override_sequence = override_sequence;
        self
    }

    /// Gets the arrow schema to store in parquet.
    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
        &self.arrow_schema
    }

    /// Converts `batch` to an arrow record batch to store in parquet.
    pub(crate) fn convert_batch(&self, batch: &Batch) -> Result<RecordBatch> {
        debug_assert_eq!(
            batch.fields().len() + FIXED_POS_COLUMN_NUM,
            self.arrow_schema.fields().len()
        );
        let mut columns = Vec::with_capacity(batch.fields().len() + FIXED_POS_COLUMN_NUM);
        // Store all fields first.
        for (column, column_metadata) in batch.fields().iter().zip(self.metadata.field_columns()) {
            ensure!(
                column.column_id == column_metadata.column_id,
                InvalidBatchSnafu {
                    reason: format!(
                        "Batch has column {} but metadata has column {}",
                        column.column_id, column_metadata.column_id
                    ),
                }
            );

            columns.push(column.data.to_arrow_array());
        }
        // Add time index column.
        columns.push(batch.timestamps().to_arrow_array());
        // Add internal columns: primary key, sequences, op types.
        columns.push(new_primary_key_array(batch.primary_key(), batch.num_rows()));

        if let Some(override_sequence) = self.override_sequence {
            let sequence_array =
                Arc::new(UInt64Array::from(vec![override_sequence; batch.num_rows()]));
            columns.push(sequence_array);
        } else {
            columns.push(batch.sequences().to_arrow_array());
        }
        columns.push(batch.op_types().to_arrow_array());

        RecordBatch::try_new(self.arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
    }
}

/// Helper to read parquet formats.
pub enum ReadFormat {
    /// The parquet is in the old primary key format.
    PrimaryKey(PrimaryKeyReadFormat),
    /// The parquet is in the new flat format.
    Flat(FlatReadFormat),
}

impl ReadFormat {
    /// Creates a helper to read the primary key format.
    pub fn new_primary_key(
        metadata: RegionMetadataRef,
        column_ids: impl Iterator<Item = ColumnId>,
    ) -> Self {
        ReadFormat::PrimaryKey(PrimaryKeyReadFormat::new(metadata, column_ids))
    }

    /// Creates a helper to read the flat format.
    pub fn new_flat(
        metadata: RegionMetadataRef,
        column_ids: impl Iterator<Item = ColumnId>,
        num_columns: Option<usize>,
        file_path: &str,
        skip_auto_convert: bool,
    ) -> Result<Self> {
        Ok(ReadFormat::Flat(FlatReadFormat::new(
            metadata,
            column_ids,
            num_columns,
            file_path,
            skip_auto_convert,
        )?))
    }

    /// Creates a new read format.
    pub fn new(
        region_metadata: RegionMetadataRef,
        projection: Option<&[ColumnId]>,
        flat_format: bool,
        num_columns: Option<usize>,
        file_path: &str,
        skip_auto_convert: bool,
    ) -> Result<ReadFormat> {
        if flat_format {
            if let Some(column_ids) = projection {
                ReadFormat::new_flat(
                    region_metadata,
                    column_ids.iter().copied(),
                    num_columns,
                    file_path,
                    skip_auto_convert,
                )
            } else {
                // No projection, lists all column ids to read.
                ReadFormat::new_flat(
                    region_metadata.clone(),
                    region_metadata
                        .column_metadatas
                        .iter()
                        .map(|col| col.column_id),
                    num_columns,
                    file_path,
                    skip_auto_convert,
                )
            }
        } else if let Some(column_ids) = projection {
            Ok(ReadFormat::new_primary_key(
                region_metadata,
                column_ids.iter().copied(),
            ))
        } else {
            // No projection, lists all column ids to read.
            Ok(ReadFormat::new_primary_key(
                region_metadata.clone(),
                region_metadata
                    .column_metadatas
                    .iter()
                    .map(|col| col.column_id),
            ))
        }
    }

    pub(crate) fn as_primary_key(&self) -> Option<&PrimaryKeyReadFormat> {
        match self {
            ReadFormat::PrimaryKey(format) => Some(format),
            _ => None,
        }
    }

    pub(crate) fn as_flat(&self) -> Option<&FlatReadFormat> {
        match self {
            ReadFormat::Flat(format) => Some(format),
            _ => None,
        }
    }

    /// Gets the arrow schema of the SST file.
    ///
    /// This schema is computed from the region metadata but should be the same
    /// as the arrow schema decoded from the file metadata.
    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
        match self {
            ReadFormat::PrimaryKey(format) => format.arrow_schema(),
            ReadFormat::Flat(format) => format.arrow_schema(),
        }
    }

    /// Gets the metadata of the SST.
    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
        match self {
            ReadFormat::PrimaryKey(format) => format.metadata(),
            ReadFormat::Flat(format) => format.metadata(),
        }
    }

    /// Gets sorted projection indices to read.
    pub(crate) fn projection_indices(&self) -> &[usize] {
        match self {
            ReadFormat::PrimaryKey(format) => format.projection_indices(),
            ReadFormat::Flat(format) => format.projection_indices(),
        }
    }

    /// Returns min values of a specific column in row groups.
    pub fn min_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        match self {
            ReadFormat::PrimaryKey(format) => format.min_values(row_groups, column_id),
            ReadFormat::Flat(format) => format.min_values(row_groups, column_id),
        }
    }

    /// Returns max values of a specific column in row groups.
    pub fn max_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        match self {
            ReadFormat::PrimaryKey(format) => format.max_values(row_groups, column_id),
            ReadFormat::Flat(format) => format.max_values(row_groups, column_id),
        }
    }

    /// Returns null counts of a specific column in row groups.
    pub fn null_counts(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        match self {
            ReadFormat::PrimaryKey(format) => format.null_counts(row_groups, column_id),
            ReadFormat::Flat(format) => format.null_counts(row_groups, column_id),
        }
    }

    /// Returns min/max values of a specific column in row groups.
    ///
    /// Returns `None` if the column does not have statistics.
    /// The column should not be encoded as a part of a primary key.
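    ///
    /// The returned array has one entry per row group; a row group without
    /// usable statistics for this column contributes a null entry. For
    /// example, for three row groups with `is_min == true`, the result could
    /// be `[Some(min0), None, Some(min2)]` rendered as an arrow array.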
    pub(crate) fn column_values(
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column: &ColumnMetadata,
        column_index: usize,
        is_min: bool,
    ) -> Option<ArrayRef> {
        let null_scalar: ScalarValue = column
            .column_schema
            .data_type
            .as_arrow_type()
            .try_into()
            .ok()?;
        let scalar_values = row_groups
            .iter()
            .map(|meta| {
                let stats = meta.borrow().column(column_index).statistics()?;
                match stats {
                    Statistics::Boolean(s) => Some(ScalarValue::Boolean(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::Int32(s) => Some(ScalarValue::Int32(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::Int64(s) => Some(ScalarValue::Int64(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::Int96(_) => None,
                    Statistics::Float(s) => Some(ScalarValue::Float32(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::Double(s) => Some(ScalarValue::Float64(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::ByteArray(s) => {
                        let bytes = if is_min {
                            s.min_bytes_opt()?
                        } else {
                            s.max_bytes_opt()?
                        };
                        let s = String::from_utf8(bytes.to_vec()).ok();
                        Some(ScalarValue::Utf8(s))
                    }
                    Statistics::FixedLenByteArray(_) => None,
                }
            })
            .map(|maybe_scalar| maybe_scalar.unwrap_or_else(|| null_scalar.clone()))
            .collect::<Vec<ScalarValue>>();
        debug_assert_eq!(scalar_values.len(), row_groups.len());
        ScalarValue::iter_to_array(scalar_values).ok()
    }

    /// Returns null counts of a specific column in row groups.
    /// The column should not be encoded as a part of a primary key.
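    /// Row groups without statistics for the column yield null entries
    /// in the returned array.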
    pub(crate) fn column_null_counts(
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_index: usize,
    ) -> Option<ArrayRef> {
        let values = row_groups.iter().map(|meta| {
            let col = meta.borrow().column(column_index);
            let stat = col.statistics()?;
            stat.null_count_opt()
        });
        Some(Arc::new(UInt64Array::from_iter(values)))
    }

    /// Sets the sequence number to override.
    pub(crate) fn set_override_sequence(&mut self, sequence: Option<SequenceNumber>) {
        match self {
            ReadFormat::PrimaryKey(format) => format.set_override_sequence(sequence),
            ReadFormat::Flat(format) => format.set_override_sequence(sequence),
        }
    }

    /// Creates a sequence array to override.
    pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option<ArrayRef> {
        match self {
            ReadFormat::PrimaryKey(format) => format.new_override_sequence_array(length),
            ReadFormat::Flat(format) => format.new_override_sequence_array(length),
        }
    }
}

/// Helper for reading the SST format.
pub struct PrimaryKeyReadFormat {
    /// The metadata stored in the SST.
    metadata: RegionMetadataRef,
    /// SST file schema.
    arrow_schema: SchemaRef,
    /// Maps a field column id to its index in `schema` (SST schema).
    /// In the SST schema, fields are stored at the front of the schema.
    field_id_to_index: HashMap<ColumnId, usize>,
    /// Indices of columns to read from the SST. It contains all internal columns.
    projection_indices: Vec<usize>,
    /// Maps a field column id to its index in the projected schema
    /// (the schema of [Batch]).
    field_id_to_projected_index: HashMap<ColumnId, usize>,
    /// Sequence number to override the sequence read from the SST.
    override_sequence: Option<SequenceNumber>,
}

impl PrimaryKeyReadFormat {
    /// Creates a helper with existing `metadata` and `column_ids` to read.
    pub fn new(
        metadata: RegionMetadataRef,
        column_ids: impl Iterator<Item = ColumnId>,
    ) -> PrimaryKeyReadFormat {
        let field_id_to_index: HashMap<_, _> = metadata
            .field_columns()
            .enumerate()
            .map(|(index, column)| (column.column_id, index))
            .collect();
        let arrow_schema = to_sst_arrow_schema(&metadata);

        let format_projection = FormatProjection::compute_format_projection(
            &field_id_to_index,
            arrow_schema.fields.len(),
            column_ids,
        );

        PrimaryKeyReadFormat {
            metadata,
            arrow_schema,
            field_id_to_index,
            projection_indices: format_projection.projection_indices,
            field_id_to_projected_index: format_projection.column_id_to_projected_index,
            override_sequence: None,
        }
    }

    /// Sets the sequence number to override.
    pub(crate) fn set_override_sequence(&mut self, sequence: Option<SequenceNumber>) {
        self.override_sequence = sequence;
    }

    /// Gets the arrow schema of the SST file.
    ///
    /// This schema is computed from the region metadata but should be the same
    /// as the arrow schema decoded from the file metadata.
    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
        &self.arrow_schema
    }

    /// Gets the metadata of the SST.
    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
        &self.metadata
    }

    /// Gets sorted projection indices to read.
    pub(crate) fn projection_indices(&self) -> &[usize] {
        &self.projection_indices
    }

    /// Creates a sequence array to override.
    pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option<ArrayRef> {
        self.override_sequence
            .map(|seq| Arc::new(UInt64Array::from_value(seq, length)) as ArrayRef)
    }

    /// Converts an arrow record batch into `batches`.
    ///
    /// The length of `override_sequence_array`, if provided, must be at least
    /// the length of the record batch.
    /// Note that the `record_batch` may only contain a subset of columns if it is projected.
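    ///
    /// Rows are split by primary key: for a (hypothetical) record batch whose
    /// `__primary_key` dictionary keys are `[0, 0, 1, 1]`, this pushes two
    /// [Batch]es into `batches`, one per key, each covering two rows.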
    pub fn convert_record_batch(
        &self,
        record_batch: &RecordBatch,
        override_sequence_array: Option<&ArrayRef>,
        batches: &mut VecDeque<Batch>,
    ) -> Result<()> {
        debug_assert!(batches.is_empty());

        // The record batch must have the time index and internal columns.
        ensure!(
            record_batch.num_columns() >= FIXED_POS_COLUMN_NUM,
            InvalidRecordBatchSnafu {
                reason: format!(
                    "record batch only has {} columns",
                    record_batch.num_columns()
                ),
            }
        );

        let mut fixed_pos_columns = record_batch
            .columns()
            .iter()
            .rev()
            .take(FIXED_POS_COLUMN_NUM);
        // Safety: We have checked the column number.
        let op_type_array = fixed_pos_columns.next().unwrap();
        let mut sequence_array = fixed_pos_columns.next().unwrap().clone();
        let pk_array = fixed_pos_columns.next().unwrap();
        let ts_array = fixed_pos_columns.next().unwrap();
        let field_batch_columns = self.get_field_batch_columns(record_batch)?;

        // Override sequence array if provided.
        if let Some(override_array) = override_sequence_array {
            assert!(override_array.len() >= sequence_array.len());
            // It's fine to assign the override array directly, but we slice it to make
            // sure it matches the length of the original sequence array.
            sequence_array = if override_array.len() > sequence_array.len() {
                override_array.slice(0, sequence_array.len())
            } else {
                override_array.clone()
            };
        }

        // Compute primary key offsets.
        let pk_dict_array = pk_array
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .with_context(|| InvalidRecordBatchSnafu {
                reason: format!("primary key array should not be {:?}", pk_array.data_type()),
            })?;
        let offsets = primary_key_offsets(pk_dict_array)?;
        if offsets.is_empty() {
            return Ok(());
        }

        // Split record batch according to pk offsets.
        let keys = pk_dict_array.keys();
        let pk_values = pk_dict_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .with_context(|| InvalidRecordBatchSnafu {
                reason: format!(
                    "values of primary key array should not be {:?}",
                    pk_dict_array.values().data_type()
                ),
            })?;
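        // Each adjacent pair of offsets delimits the rows of one primary key,
        // so every offset except the last one starts a batch.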
        for (i, start) in offsets[..offsets.len() - 1].iter().enumerate() {
            let end = offsets[i + 1];
            let rows_in_batch = end - start;
            let dict_key = keys.value(*start);
            let primary_key = pk_values.value(dict_key as usize).to_vec();

            let mut builder = BatchBuilder::new(primary_key);
            builder
                .timestamps_array(ts_array.slice(*start, rows_in_batch))?
                .sequences_array(sequence_array.slice(*start, rows_in_batch))?
                .op_types_array(op_type_array.slice(*start, rows_in_batch))?;
            // Push all fields
            for batch_column in &field_batch_columns {
                builder.push_field(BatchColumn {
                    column_id: batch_column.column_id,
                    data: batch_column.data.slice(*start, rows_in_batch),
                });
            }

            let batch = builder.build()?;
            batches.push_back(batch);
        }

        Ok(())
    }

    /// Returns min values of a specific column in row groups.
    pub fn min_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        let Some(column) = self.metadata.column_by_id(column_id) else {
            // No such column in the SST.
            return StatValues::NoColumn;
        };
        match column.semantic_type {
            SemanticType::Tag => self.tag_values(row_groups, column, true),
            SemanticType::Field => {
                // Safety: `field_id_to_index` is initialized by the semantic type.
                let index = self.field_id_to_index.get(&column_id).unwrap();
                let stats = ReadFormat::column_values(row_groups, column, *index, true);
                StatValues::from_stats_opt(stats)
            }
            SemanticType::Timestamp => {
                let index = self.time_index_position();
                let stats = ReadFormat::column_values(row_groups, column, index, true);
                StatValues::from_stats_opt(stats)
            }
        }
    }

    /// Returns max values of a specific column in row groups.
    pub fn max_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        let Some(column) = self.metadata.column_by_id(column_id) else {
            // No such column in the SST.
            return StatValues::NoColumn;
        };
        match column.semantic_type {
            SemanticType::Tag => self.tag_values(row_groups, column, false),
            SemanticType::Field => {
                // Safety: `field_id_to_index` is initialized by the semantic type.
                let index = self.field_id_to_index.get(&column_id).unwrap();
                let stats = ReadFormat::column_values(row_groups, column, *index, false);
                StatValues::from_stats_opt(stats)
            }
            SemanticType::Timestamp => {
                let index = self.time_index_position();
                let stats = ReadFormat::column_values(row_groups, column, index, false);
                StatValues::from_stats_opt(stats)
            }
        }
    }

    /// Returns null counts of a specific column in row groups.
    pub fn null_counts(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        let Some(column) = self.metadata.column_by_id(column_id) else {
            // No such column in the SST.
            return StatValues::NoColumn;
        };
        match column.semantic_type {
            SemanticType::Tag => StatValues::NoStats,
            SemanticType::Field => {
                // Safety: `field_id_to_index` is initialized by the semantic type.
                let index = self.field_id_to_index.get(&column_id).unwrap();
                let stats = ReadFormat::column_null_counts(row_groups, *index);
                StatValues::from_stats_opt(stats)
            }
            SemanticType::Timestamp => {
                let index = self.time_index_position();
                let stats = ReadFormat::column_null_counts(row_groups, index);
                StatValues::from_stats_opt(stats)
            }
        }
    }

    /// Gets fields from `record_batch`.
    fn get_field_batch_columns(&self, record_batch: &RecordBatch) -> Result<Vec<BatchColumn>> {
        record_batch
            .columns()
            .iter()
            .zip(record_batch.schema().fields())
            .take(record_batch.num_columns() - FIXED_POS_COLUMN_NUM) // Take all field columns.
            .map(|(array, field)| {
                let vector = Helper::try_into_vector(array.clone()).context(ConvertVectorSnafu)?;
                let column = self
                    .metadata
                    .column_by_name(field.name())
                    .with_context(|| InvalidRecordBatchSnafu {
                        reason: format!("column {} not found in metadata", field.name()),
                    })?;

                Ok(BatchColumn {
                    column_id: column.column_id,
                    data: vector,
                })
            })
            .collect()
    }

    /// Returns min/max values of a specific tag.
    fn tag_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column: &ColumnMetadata,
        is_min: bool,
    ) -> StatValues {
        let is_first_tag = self
            .metadata
            .primary_key
            .first()
            .map(|id| *id == column.column_id)
            .unwrap_or(false);
        if !is_first_tag {
            // Only the min-max of the first tag is available in the primary key.
            return StatValues::NoStats;
        }

        StatValues::from_stats_opt(self.first_tag_values(row_groups, column, is_min))
    }

    /// Returns min/max values of the first tag.
    /// Returns `None` if the tag does not have statistics.
    fn first_tag_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column: &ColumnMetadata,
        is_min: bool,
    ) -> Option<ArrayRef> {
        debug_assert!(
            self.metadata
                .primary_key
                .first()
                .map(|id| *id == column.column_id)
                .unwrap_or(false)
        );

        let primary_key_encoding = self.metadata.primary_key_encoding;
        let converter = build_primary_key_codec_with_fields(
            primary_key_encoding,
            [(
                column.column_id,
                SortField::new(column.column_schema.data_type.clone()),
            )]
            .into_iter(),
        );

        let values = row_groups.iter().map(|meta| {
            let stats = meta
                .borrow()
                .column(self.primary_key_position())
                .statistics()?;
            match stats {
                Statistics::Boolean(_) => None,
                Statistics::Int32(_) => None,
                Statistics::Int64(_) => None,
                Statistics::Int96(_) => None,
                Statistics::Float(_) => None,
                Statistics::Double(_) => None,
                Statistics::ByteArray(s) => {
                    let bytes = if is_min {
                        s.min_bytes_opt()?
                    } else {
                        s.max_bytes_opt()?
                    };
                    converter.decode_leftmost(bytes).ok()?
                }
                Statistics::FixedLenByteArray(_) => None,
            }
        });
        let mut builder = column
            .column_schema
            .data_type
            .create_mutable_vector(row_groups.len());
        for value_opt in values {
            match value_opt {
                // Safety: We use the same data type to create the converter.
                Some(v) => builder.push_value_ref(&v.as_value_ref()),
                None => builder.push_null(),
            }
        }
        let vector = builder.to_vector();

        Some(vector.to_arrow_array())
    }

    /// Index in the SST of the primary key.
    fn primary_key_position(&self) -> usize {
        self.arrow_schema.fields.len() - 3
    }

    /// Index in the SST of the time index.
    fn time_index_position(&self) -> usize {
        self.arrow_schema.fields.len() - FIXED_POS_COLUMN_NUM
    }

    /// Index of a field column by its column id.
    pub fn field_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
        self.field_id_to_projected_index.get(&column_id).copied()
    }
}

/// Helper to compute the projection for the SST.
pub(crate) struct FormatProjection {
    /// Indices of columns to read from the SST. It contains all internal columns.
    pub(crate) projection_indices: Vec<usize>,
    /// Maps a column id to its index in the projected schema
    /// (the schema after projection).
    ///
    /// It doesn't contain the time index column if it is not present in the projection.
    pub(crate) column_id_to_projected_index: HashMap<ColumnId, usize>,
}

impl FormatProjection {
    /// Computes the projection.
    ///
    /// `id_to_index` is a mapping from column id to the index of the column in the SST.
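    ///
    /// A worked example (hypothetical ids): with `id_to_index = {1: 0, 2: 1}`,
    /// `sst_column_num = 6` and `column_ids = [2, 7]`, column 7 is not in the
    /// SST and is ignored, the fixed-position columns occupy indices `2..6`,
    /// so `projection_indices` is `[1, 2, 3, 4, 5]` and
    /// `column_id_to_projected_index` is `{2: 0}`.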
    pub(crate) fn compute_format_projection(
        id_to_index: &HashMap<ColumnId, usize>,
        sst_column_num: usize,
        column_ids: impl Iterator<Item = ColumnId>,
    ) -> Self {
        // Maps column id of a projected column to its index in SST.
        // It also ignores columns not in the SST.
        // [(column id, index in SST)]
        let mut projected_schema: Vec<_> = column_ids
            .filter_map(|column_id| {
                id_to_index
                    .get(&column_id)
                    .copied()
                    .map(|index| (column_id, index))
            })
            .collect();
        // Sorts columns by their indices in the SST. SST uses a bitmap for projection.
        // This ensures the schema of `projected_schema` is the same as the batch returned from the SST.
        projected_schema.sort_unstable_by_key(|x| x.1);
        // Dedups the entries to avoid the case that `column_ids` has duplicated columns.
        projected_schema.dedup_by_key(|x| x.1);

        // Collects all projected indices.
        // It contains the positions of all columns we need to read.
        let mut projection_indices: Vec<_> = projected_schema
            .iter()
            .map(|(_column_id, index)| *index)
            // We need to add all fixed position columns.
            .chain(sst_column_num - FIXED_POS_COLUMN_NUM..sst_column_num)
            .collect();
        projection_indices.sort_unstable();
        // Removes duplications.
        projection_indices.dedup();

        // Creates a map from column id to the index of that column in the projected record batch.
        let column_id_to_projected_index = projected_schema
            .into_iter()
            .map(|(column_id, _)| column_id)
            .enumerate()
            .map(|(index, column_id)| (column_id, index))
            .collect();

        Self {
            projection_indices,
            column_id_to_projected_index,
        }
    }
}

/// Values of column statistics of the SST.
///
/// It also distinguishes the case that a column is not found from
/// the case that the column exists but has no statistics.
pub enum StatValues {
    /// Values of each row group.
    Values(ArrayRef),
    /// No such column.
    NoColumn,
    /// Column exists but has no statistics.
    NoStats,
}

impl StatValues {
    /// Creates a new `StatValues` instance from optional statistics.
    pub fn from_stats_opt(stats: Option<ArrayRef>) -> Self {
        match stats {
            Some(stats) => StatValues::Values(stats),
            None => StatValues::NoStats,
        }
    }
}

#[cfg(test)]
impl PrimaryKeyReadFormat {
    /// Creates a helper with existing `metadata` and all columns.
    pub fn new_with_all_columns(metadata: RegionMetadataRef) -> PrimaryKeyReadFormat {
        Self::new(
            Arc::clone(&metadata),
            metadata.column_metadatas.iter().map(|c| c.column_id),
        )
    }
}

/// Computes offsets of different primary keys in the array.
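///
/// For example (a hypothetical key sequence), dictionary keys `[0, 0, 1, 1, 1]`
/// produce offsets `[0, 2, 5]`: rows `[0, 2)` belong to the first primary key
/// and rows `[2, 5)` to the second.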
fn primary_key_offsets(pk_dict_array: &PrimaryKeyArray) -> Result<Vec<usize>> {
    if pk_dict_array.is_empty() {
        return Ok(Vec::new());
    }

    // Init offsets.
    let mut offsets = vec![0];
    let keys = pk_dict_array.keys();
    // We know that primary keys are always not null so we iterate `keys.values()` directly.
    let pk_indices = keys.values();
    for (i, key) in pk_indices.iter().take(keys.len() - 1).enumerate() {
        // Compare each key with the next key.
        if *key != pk_indices[i + 1] {
            // We meet a new key, push the next index as the end of the offset.
            offsets.push(i + 1);
        }
    }
    offsets.push(keys.len());

    Ok(offsets)
}

/// Creates a new array for a specific `primary_key`.
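/// All `num_rows` dictionary keys point at the single `primary_key` value.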
fn new_primary_key_array(primary_key: &[u8], num_rows: usize) -> ArrayRef {
    let values = Arc::new(BinaryArray::from_iter_values([primary_key]));
    let keys = UInt32Array::from_value(0, num_rows);

    // Safety: The key index is valid.
    Arc::new(DictionaryArray::new(keys, values))
}

/// Gets the min/max time index of the row group from the parquet meta.
/// It assumes the parquet is created by the mito engine.
pub(crate) fn parquet_row_group_time_range(
    file_meta: &FileMeta,
    parquet_meta: &ParquetMetaData,
    row_group_idx: usize,
) -> Option<FileTimeRange> {
    let row_group_meta = parquet_meta.row_group(row_group_idx);
    let num_columns = parquet_meta.file_metadata().schema_descr().num_columns();
    assert!(
        num_columns >= FIXED_POS_COLUMN_NUM,
        "file only has {} columns",
        num_columns
    );
    let time_index_pos = num_columns - FIXED_POS_COLUMN_NUM;

    let stats = row_group_meta.column(time_index_pos).statistics()?;
    // The physical type for the timestamp should be i64.
    let (min, max) = match stats {
        Statistics::Int64(value_stats) => (*value_stats.min_opt()?, *value_stats.max_opt()?),
        Statistics::Int32(_)
        | Statistics::Boolean(_)
        | Statistics::Int96(_)
        | Statistics::Float(_)
        | Statistics::Double(_)
        | Statistics::ByteArray(_)
        | Statistics::FixedLenByteArray(_) => {
            common_telemetry::warn!(
                "Invalid statistics {:?} for time index in parquet in {}",
                stats,
                file_meta.file_id
            );
            return None;
        }
    };

    debug_assert!(min >= file_meta.time_range.0.value() && min <= file_meta.time_range.1.value());
    debug_assert!(max >= file_meta.time_range.0.value() && max <= file_meta.time_range.1.value());
    let unit = file_meta.time_range.0.unit();

    Some((Timestamp::new(min, unit), Timestamp::new(max, unit)))
}

/// Checks if sequence override is needed based on all row groups' statistics.
/// Returns true if ALL row groups have sequence min-max values of 0.
pub(crate) fn need_override_sequence(parquet_meta: &ParquetMetaData) -> bool {
    let num_columns = parquet_meta.file_metadata().schema_descr().num_columns();
    if num_columns < FIXED_POS_COLUMN_NUM {
        return false;
    }

    // The sequence column is the second-to-last column (before op_type).
    let sequence_pos = num_columns - 2;

    // Check all row groups - all must have sequence min-max of 0.
    for row_group in parquet_meta.row_groups() {
        if let Some(Statistics::Int64(value_stats)) = row_group.column(sequence_pos).statistics() {
            if let (Some(min_val), Some(max_val)) = (value_stats.min_opt(), value_stats.max_opt()) {
                // If any row group doesn't have min=0 and max=0, return false.
                if *min_val != 0 || *max_val != 0 {
                    return false;
                }
            } else {
                // If any row group doesn't have statistics, return false.
                return false;
            }
        } else {
            // If any row group doesn't have Int64 statistics, return false.
            return false;
        }
    }

    // Every row group (if any) passed the check above; still require at least
    // one row group before overriding.
    !parquet_meta.row_groups().is_empty()
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::OpType;
    use datatypes::arrow::array::{
        Int64Array, StringArray, TimestampMillisecondArray, UInt8Array, UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema, TimeUnit};
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::ValueRef;
    use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, UInt8Vector, UInt64Vector};
    use mito_codec::row_converter::{
        DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec,
    };
    use store_api::codec::PrimaryKeyEncoding;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;
    use store_api::storage::consts::ReservedColumnId;

    use super::*;
    use crate::sst::parquet::flat_format::{FlatWriteFormat, sequence_column_index};
    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};

    const TEST_SEQUENCE: u64 = 1;
    const TEST_OP_TYPE: u8 = OpType::Put as u8;

    fn build_test_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("tag0", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field1",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 4, // We change the order of field columns.
1025            })
1026            .push_column_metadata(ColumnMetadata {
1027                column_schema: ColumnSchema::new("tag1", ConcreteDataType::int64_datatype(), true),
1028                semantic_type: SemanticType::Tag,
1029                column_id: 3,
1030            })
1031            .push_column_metadata(ColumnMetadata {
1032                column_schema: ColumnSchema::new(
1033                    "field0",
1034                    ConcreteDataType::int64_datatype(),
1035                    true,
1036                ),
1037                semantic_type: SemanticType::Field,
1038                column_id: 2,
1039            })
1040            .push_column_metadata(ColumnMetadata {
1041                column_schema: ColumnSchema::new(
1042                    "ts",
1043                    ConcreteDataType::timestamp_millisecond_datatype(),
1044                    false,
1045                ),
1046                semantic_type: SemanticType::Timestamp,
1047                column_id: 5,
1048            })
1049            .primary_key(vec![1, 3]);
1050        Arc::new(builder.build().unwrap())
1051    }
1052
1053    fn build_test_arrow_schema() -> SchemaRef {
1054        let fields = vec![
1055            Field::new("field1", ArrowDataType::Int64, true),
1056            Field::new("field0", ArrowDataType::Int64, true),
1057            Field::new(
1058                "ts",
1059                ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
1060                false,
1061            ),
1062            Field::new(
1063                "__primary_key",
1064                ArrowDataType::Dictionary(
1065                    Box::new(ArrowDataType::UInt32),
1066                    Box::new(ArrowDataType::Binary),
1067                ),
1068                false,
1069            ),
1070            Field::new("__sequence", ArrowDataType::UInt64, false),
1071            Field::new("__op_type", ArrowDataType::UInt8, false),
1072        ];
1073        Arc::new(Schema::new(fields))
1074    }
1075
1076    fn new_batch(primary_key: &[u8], start_ts: i64, start_field: i64, num_rows: usize) -> Batch {
1077        new_batch_with_sequence(primary_key, start_ts, start_field, num_rows, TEST_SEQUENCE)
1078    }
1079
1080    fn new_batch_with_sequence(
1081        primary_key: &[u8],
1082        start_ts: i64,
1083        start_field: i64,
1084        num_rows: usize,
1085        sequence: u64,
1086    ) -> Batch {
1087        let ts_values = (0..num_rows).map(|i| start_ts + i as i64);
1088        let timestamps = Arc::new(TimestampMillisecondVector::from_values(ts_values));
1089        let sequences = Arc::new(UInt64Vector::from_vec(vec![sequence; num_rows]));
1090        let op_types = Arc::new(UInt8Vector::from_vec(vec![TEST_OP_TYPE; num_rows]));
1091        let fields = vec![
1092            BatchColumn {
1093                column_id: 4,
1094                data: Arc::new(Int64Vector::from_vec(vec![start_field; num_rows])),
1095            }, // field1
1096            BatchColumn {
1097                column_id: 2,
1098                data: Arc::new(Int64Vector::from_vec(vec![start_field + 1; num_rows])),
1099            }, // field0
1100        ];
1101
1102        BatchBuilder::with_required_columns(primary_key.to_vec(), timestamps, sequences, op_types)
1103            .with_fields(fields)
1104            .build()
1105            .unwrap()
1106    }
1107
1108    #[test]
1109    fn test_to_sst_arrow_schema() {
1110        let metadata = build_test_region_metadata();
1111        let write_format = PrimaryKeyWriteFormat::new(metadata);
1112        assert_eq!(&build_test_arrow_schema(), write_format.arrow_schema());
1113    }
1114
1115    #[test]
1116    fn test_new_primary_key_array() {
1117        let array = new_primary_key_array(b"test", 3);
1118        let expect = build_test_pk_array(&[(b"test".to_vec(), 3)]) as ArrayRef;
1119        assert_eq!(&expect, &array);
1120    }
1121
1122    fn build_test_pk_array(pk_row_nums: &[(Vec<u8>, usize)]) -> Arc<PrimaryKeyArray> {
1123        let values = Arc::new(BinaryArray::from_iter_values(
1124            pk_row_nums.iter().map(|v| &v.0),
1125        ));
1126        let mut keys = vec![];
1127        for (index, num_rows) in pk_row_nums.iter().map(|v| v.1).enumerate() {
1128            keys.extend(std::iter::repeat_n(index as u32, num_rows));
1129        }
1130        let keys = UInt32Array::from(keys);
1131        Arc::new(DictionaryArray::new(keys, values))
1132    }
1133
1134    #[test]
1135    fn test_convert_batch() {
1136        let metadata = build_test_region_metadata();
1137        let write_format = PrimaryKeyWriteFormat::new(metadata);
1138
1139        let num_rows = 4;
1140        let batch = new_batch(b"test", 1, 2, num_rows);
1141        let columns: Vec<ArrayRef> = vec![
1142            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
1143            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
1144            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
1145            build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // primary key
1146            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; num_rows])), // sequence
1147            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
1148        ];
1149        let expect_record = RecordBatch::try_new(build_test_arrow_schema(), columns).unwrap();
1150
1151        let actual = write_format.convert_batch(&batch).unwrap();
1152        assert_eq!(expect_record, actual);
1153    }
1154
1155    #[test]
1156    fn test_convert_batch_with_override_sequence() {
1157        let metadata = build_test_region_metadata();
1158        let write_format =
1159            PrimaryKeyWriteFormat::new(metadata).with_override_sequence(Some(415411));
1160
1161        let num_rows = 4;
1162        let batch = new_batch(b"test", 1, 2, num_rows);
1163        let columns: Vec<ArrayRef> = vec![
1164            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
1165            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
1166            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
1167            build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // primary key
1168            Arc::new(UInt64Array::from(vec![415411; num_rows])), // sequence
1169            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
1170        ];
1171        let expect_record = RecordBatch::try_new(build_test_arrow_schema(), columns).unwrap();
1172
1173        let actual = write_format.convert_batch(&batch).unwrap();
1174        assert_eq!(expect_record, actual);
1175    }
1176
1177    #[test]
1178    fn test_projection_indices() {
1179        let metadata = build_test_region_metadata();
1180        // Only read tag1
1181        let read_format = ReadFormat::new_primary_key(metadata.clone(), [3].iter().copied());
1182        assert_eq!(&[2, 3, 4, 5], read_format.projection_indices());
1183        // Only read field1
1184        let read_format = ReadFormat::new_primary_key(metadata.clone(), [4].iter().copied());
1185        assert_eq!(&[0, 2, 3, 4, 5], read_format.projection_indices());
1186        // Only read ts
1187        let read_format = ReadFormat::new_primary_key(metadata.clone(), [5].iter().copied());
1188        assert_eq!(&[2, 3, 4, 5], read_format.projection_indices());
1189        // Read field0, tag0, ts
1190        let read_format = ReadFormat::new_primary_key(metadata, [2, 1, 5].iter().copied());
1191        assert_eq!(&[1, 2, 3, 4, 5], read_format.projection_indices());
1192    }
1193
1194    #[test]
1195    fn test_empty_primary_key_offsets() {
1196        let array = build_test_pk_array(&[]);
1197        assert!(primary_key_offsets(&array).unwrap().is_empty());
1198    }
1199
1200    #[test]
1201    fn test_primary_key_offsets_one_series() {
1202        let array = build_test_pk_array(&[(b"one".to_vec(), 1)]);
1203        assert_eq!(vec![0, 1], primary_key_offsets(&array).unwrap());
1204
1205        let array = build_test_pk_array(&[(b"one".to_vec(), 1), (b"two".to_vec(), 1)]);
1206        assert_eq!(vec![0, 1, 2], primary_key_offsets(&array).unwrap());
1207
1208        let array = build_test_pk_array(&[
1209            (b"one".to_vec(), 1),
1210            (b"two".to_vec(), 1),
1211            (b"three".to_vec(), 1),
1212        ]);
1213        assert_eq!(vec![0, 1, 2, 3], primary_key_offsets(&array).unwrap());
1214    }
1215
1216    #[test]
1217    fn test_primary_key_offsets_multi_series() {
1218        let array = build_test_pk_array(&[(b"one".to_vec(), 1), (b"two".to_vec(), 3)]);
1219        assert_eq!(vec![0, 1, 4], primary_key_offsets(&array).unwrap());
1220
1221        let array = build_test_pk_array(&[(b"one".to_vec(), 3), (b"two".to_vec(), 1)]);
1222        assert_eq!(vec![0, 3, 4], primary_key_offsets(&array).unwrap());
1223
1224        let array = build_test_pk_array(&[(b"one".to_vec(), 3), (b"two".to_vec(), 3)]);
1225        assert_eq!(vec![0, 3, 6], primary_key_offsets(&array).unwrap());
1226    }
1227
1228    #[test]
1229    fn test_convert_empty_record_batch() {
1230        let metadata = build_test_region_metadata();
1231        let arrow_schema = build_test_arrow_schema();
1232        let column_ids: Vec<_> = metadata
1233            .column_metadatas
1234            .iter()
1235            .map(|col| col.column_id)
1236            .collect();
1237        let read_format = PrimaryKeyReadFormat::new(metadata, column_ids.iter().copied());
1238        assert_eq!(arrow_schema, *read_format.arrow_schema());
1239
1240        let record_batch = RecordBatch::new_empty(arrow_schema);
1241        let mut batches = VecDeque::new();
1242        read_format
1243            .convert_record_batch(&record_batch, None, &mut batches)
1244            .unwrap();
1245        assert!(batches.is_empty());
1246    }
1247
1248    #[test]
1249    fn test_convert_record_batch() {
1250        let metadata = build_test_region_metadata();
1251        let column_ids: Vec<_> = metadata
1252            .column_metadatas
1253            .iter()
1254            .map(|col| col.column_id)
1255            .collect();
1256        let read_format = PrimaryKeyReadFormat::new(metadata, column_ids.iter().copied());
1257
1258        let columns: Vec<ArrayRef> = vec![
1259            Arc::new(Int64Array::from(vec![1, 1, 10, 10])), // field1
1260            Arc::new(Int64Array::from(vec![2, 2, 11, 11])), // field0
1261            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 11, 12])), // ts
1262            build_test_pk_array(&[(b"one".to_vec(), 2), (b"two".to_vec(), 2)]), // primary key
1263            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; 4])), // sequence
1264            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; 4])), // op type
1265        ];
1266        let arrow_schema = build_test_arrow_schema();
1267        let record_batch = RecordBatch::try_new(arrow_schema, columns).unwrap();
1268        let mut batches = VecDeque::new();
1269        read_format
1270            .convert_record_batch(&record_batch, None, &mut batches)
1271            .unwrap();
1272
1273        assert_eq!(
1274            vec![new_batch(b"one", 1, 1, 2), new_batch(b"two", 11, 10, 2)],
1275            batches.into_iter().collect::<Vec<_>>(),
1276        );
1277    }
1278
1279    #[test]
1280    fn test_convert_record_batch_with_override_sequence() {
1281        let metadata = build_test_region_metadata();
1282        let column_ids: Vec<_> = metadata
1283            .column_metadatas
1284            .iter()
1285            .map(|col| col.column_id)
1286            .collect();
1287        let read_format =
1288            ReadFormat::new(metadata, Some(&column_ids), false, None, "test", false).unwrap();
1289
1290        let columns: Vec<ArrayRef> = vec![
1291            Arc::new(Int64Array::from(vec![1, 1, 10, 10])), // field1
1292            Arc::new(Int64Array::from(vec![2, 2, 11, 11])), // field0
1293            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 11, 12])), // ts
1294            build_test_pk_array(&[(b"one".to_vec(), 2), (b"two".to_vec(), 2)]), // primary key
1295            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; 4])), // sequence
1296            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; 4])), // op type
1297        ];
1298        let arrow_schema = build_test_arrow_schema();
1299        let record_batch = RecordBatch::try_new(arrow_schema, columns).unwrap();
1300
1301        // Create override sequence array with custom values
1302        let override_sequence: u64 = 12345;
1303        let override_sequence_array: ArrayRef =
1304            Arc::new(UInt64Array::from_value(override_sequence, 4));
1305
1306        let mut batches = VecDeque::new();
1307        read_format
1308            .as_primary_key()
1309            .unwrap()
1310            .convert_record_batch(&record_batch, Some(&override_sequence_array), &mut batches)
1311            .unwrap();
1312
1313        // Create expected batches with override sequence
1314        let expected_batch1 = new_batch_with_sequence(b"one", 1, 1, 2, override_sequence);
1315        let expected_batch2 = new_batch_with_sequence(b"two", 11, 10, 2, override_sequence);
1316
1317        assert_eq!(
1318            vec![expected_batch1, expected_batch2],
1319            batches.into_iter().collect::<Vec<_>>(),
1320        );
1321    }
1322
1323    fn build_test_flat_sst_schema() -> SchemaRef {
1324        let fields = vec![
1325            Field::new("tag0", ArrowDataType::Int64, true), // primary key columns first
1326            Field::new("tag1", ArrowDataType::Int64, true),
1327            Field::new("field1", ArrowDataType::Int64, true), // then field columns
1328            Field::new("field0", ArrowDataType::Int64, true),
1329            Field::new(
1330                "ts",
1331                ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
1332                false,
1333            ),
1334            Field::new(
1335                "__primary_key",
1336                ArrowDataType::Dictionary(
1337                    Box::new(ArrowDataType::UInt32),
1338                    Box::new(ArrowDataType::Binary),
1339                ),
1340                false,
1341            ),
1342            Field::new("__sequence", ArrowDataType::UInt64, false),
1343            Field::new("__op_type", ArrowDataType::UInt8, false),
1344        ];
1345        Arc::new(Schema::new(fields))
1346    }
1347
1348    #[test]
1349    fn test_flat_to_sst_arrow_schema() {
1350        let metadata = build_test_region_metadata();
1351        let format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default());
1352        assert_eq!(&build_test_flat_sst_schema(), format.arrow_schema());
1353    }
1354
    fn input_columns_for_flat_batch(num_rows: usize) -> Vec<ArrayRef> {
        vec![
            Arc::new(Int64Array::from(vec![1; num_rows])), // tag0
            Arc::new(Int64Array::from(vec![1; num_rows])), // tag1
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from_iter_values(
                1..=num_rows as i64,
            )), // ts
            build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // __primary_key
            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; num_rows])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ]
    }

    #[test]
    fn test_flat_convert_batch() {
        let metadata = build_test_region_metadata();
        let format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default());

        let num_rows = 4;
        let columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
        let batch = RecordBatch::try_new(build_test_flat_sst_schema(), columns.clone()).unwrap();
        let expect_record = RecordBatch::try_new(build_test_flat_sst_schema(), columns).unwrap();

        let actual = format.convert_batch(&batch).unwrap();
        assert_eq!(expect_record, actual);
    }

    #[test]
    fn test_flat_convert_with_override_sequence() {
        let metadata = build_test_region_metadata();
        let format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default())
            .with_override_sequence(Some(415411));

        let num_rows = 4;
        let columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
        let batch = RecordBatch::try_new(build_test_flat_sst_schema(), columns).unwrap();

        let expected_columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![1; num_rows])), // tag0
            Arc::new(Int64Array::from(vec![1; num_rows])), // tag1
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // __primary_key
            Arc::new(UInt64Array::from(vec![415411; num_rows])), // overridden sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ];
        let expected_record =
            RecordBatch::try_new(build_test_flat_sst_schema(), expected_columns).unwrap();

        let actual = format.convert_batch(&batch).unwrap();
        assert_eq!(expected_record, actual);
    }

    #[test]
    fn test_flat_projection_indices() {
        let metadata = build_test_region_metadata();
        // Based on flat format: tag0(0), tag1(1), field1(2), field0(3), ts(4), __primary_key(5), __sequence(6), __op_type(7)
        // The projection includes all "fixed position" columns: ts(4), __primary_key(5), __sequence(6), __op_type(7)

        // Only read tag1 (column_id=3, index=1) + fixed columns
        let read_format =
            ReadFormat::new_flat(metadata.clone(), [3].iter().copied(), None, "test", false)
                .unwrap();
        assert_eq!(&[1, 4, 5, 6, 7], read_format.projection_indices());

        // Only read field1 (column_id=4, index=2) + fixed columns
        let read_format =
            ReadFormat::new_flat(metadata.clone(), [4].iter().copied(), None, "test", false)
                .unwrap();
        assert_eq!(&[2, 4, 5, 6, 7], read_format.projection_indices());

        // Only read ts (column_id=5, index=4) + fixed columns (ts is already included in fixed)
        let read_format =
            ReadFormat::new_flat(metadata.clone(), [5].iter().copied(), None, "test", false)
                .unwrap();
        assert_eq!(&[4, 5, 6, 7], read_format.projection_indices());

        // Read field0(column_id=2, index=3), tag0(column_id=1, index=0), ts(column_id=5, index=4) + fixed columns
        let read_format =
            ReadFormat::new_flat(metadata, [2, 1, 5].iter().copied(), None, "test", false).unwrap();
        assert_eq!(&[0, 3, 4, 5, 6, 7], read_format.projection_indices());
    }

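    /// `convert_batch` keeps the stored sequence column untouched unless an
    /// override sequence array is supplied.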
    #[test]
    fn test_flat_read_format_convert_batch() {
        let metadata = build_test_region_metadata();
        let mut format = FlatReadFormat::new(
            metadata,
            std::iter::once(1), // Just read tag0
            Some(8),
            "test",
            false,
        )
        .unwrap();

        let num_rows = 4;
        let original_sequence = 100u64;
        let override_sequence = 200u64;

        // Create a test record batch
        let mut test_columns = input_columns_for_flat_batch(num_rows);
        // Replace sequence column with original sequence values
        test_columns[6] = Arc::new(UInt64Array::from(vec![original_sequence; num_rows]));
        let record_batch =
            RecordBatch::try_new(format.arrow_schema().clone(), test_columns).unwrap();

        // Test without override sequence - should return clone
        let result = format.convert_batch(record_batch.clone(), None).unwrap();
        let sequence_column = result.column(sequence_column_index(result.num_columns()));
        let sequence_array = sequence_column
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();

        let expected_original = UInt64Array::from(vec![original_sequence; num_rows]);
        assert_eq!(sequence_array, &expected_original);

        // Set override sequence and test with new_override_sequence_array
        format.set_override_sequence(Some(override_sequence));
        let override_sequence_array = format.new_override_sequence_array(num_rows).unwrap();
        let result = format
            .convert_batch(record_batch, Some(&override_sequence_array))
            .unwrap();
        let sequence_column = result.column(sequence_column_index(result.num_columns()));
        let sequence_array = sequence_column
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();

        let expected_override = UInt64Array::from(vec![override_sequence; num_rows]);
        assert_eq!(sequence_array, &expected_override);
    }

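    /// `is_legacy_format` infers the on-disk layout from the column count: a
    /// legacy file stores no decoded primary key columns, so it has exactly
    /// `primary_key.len()` fewer columns than a flat format file.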
    #[test]
    fn test_need_convert_to_flat() {
        let metadata = build_test_region_metadata();

        // Test case 1: Same number of columns, no conversion needed
        // For flat format: all columns (5) + internal columns (3)
        let expected_columns = metadata.column_metadatas.len() + 3;
        let result =
            FlatReadFormat::is_legacy_format(&metadata, expected_columns, "test.parquet").unwrap();
        assert!(
            !result,
            "Should not need conversion when column counts match"
        );

        // Test case 2: Different number of columns, need conversion
        // Missing primary key columns (2 primary keys in test metadata)
        let num_columns_without_pk = expected_columns - metadata.primary_key.len();
        let result =
            FlatReadFormat::is_legacy_format(&metadata, num_columns_without_pk, "test.parquet")
                .unwrap();
        assert!(
            result,
            "Should need conversion when primary key columns are missing"
        );

        // Test case 3: Invalid case - actual columns more than expected
        let too_many_columns = expected_columns + 1;
        let err = FlatReadFormat::is_legacy_format(&metadata, too_many_columns, "test.parquet")
            .unwrap_err();
        assert!(err.to_string().contains("Expected columns"), "{err:?}");

        // Test case 4: Invalid case - column difference doesn't match primary key count
        let wrong_diff_columns = expected_columns - 1; // Difference of 1, but we have 2 primary keys
        let err = FlatReadFormat::is_legacy_format(&metadata, wrong_diff_columns, "test.parquet")
            .unwrap_err();
        assert!(
            err.to_string().contains("Column number difference"),
            "{err:?}"
        );
    }

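    /// Encodes one dense primary key per row and collects the encoded keys
    /// into a dictionary array.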
    fn build_test_dense_pk_array(
        codec: &DensePrimaryKeyCodec,
        pk_values_per_row: &[&[Option<i64>]],
    ) -> Arc<PrimaryKeyArray> {
        let mut builder = PrimaryKeyArrayBuilder::with_capacity(pk_values_per_row.len(), 1024, 0);

        for pk_values_row in pk_values_per_row {
            let values: Vec<ValueRef> = pk_values_row
                .iter()
                .map(|opt| match opt {
                    Some(val) => ValueRef::Int64(*val),
                    None => ValueRef::Null,
                })
                .collect();

            let encoded = codec.encode(values.into_iter()).unwrap();
            builder.append_value(&encoded);
        }

        Arc::new(builder.finish())
    }

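    /// Region metadata that uses sparse primary key encoding: the reserved
    /// `__table_id` and `__tsid` columns followed by two string tags.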
    fn build_test_sparse_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "__table_id",
                    ConcreteDataType::uint32_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: ReservedColumnId::table_id(),
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "__tsid",
                    ConcreteDataType::uint64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: ReservedColumnId::tsid(),
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("tag0", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("tag1", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field1",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field0",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 5,
            })
            .primary_key(vec![
                ReservedColumnId::table_id(),
                ReservedColumnId::tsid(),
                1,
                3,
            ])
            .primary_key_encoding(PrimaryKeyEncoding::Sparse);
        Arc::new(builder.build().unwrap())
    }

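    /// Encodes `(column id, value)` pairs into sparse primary keys and
    /// collects the encoded keys into a dictionary array.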
    fn build_test_sparse_pk_array(
        codec: &SparsePrimaryKeyCodec,
        pk_values_per_row: &[SparseTestRow],
    ) -> Arc<PrimaryKeyArray> {
        let mut builder = PrimaryKeyArrayBuilder::with_capacity(pk_values_per_row.len(), 1024, 0);
        for row in pk_values_per_row {
            let values = vec![
                (ReservedColumnId::table_id(), ValueRef::UInt32(row.table_id)),
                (ReservedColumnId::tsid(), ValueRef::UInt64(row.tsid)),
                (1, ValueRef::String(&row.tag0)),
                (3, ValueRef::String(&row.tag1)),
            ];

            let mut buffer = Vec::new();
            codec.encode_value_refs(&values, &mut buffer).unwrap();
            builder.append_value(&buffer);
        }

        Arc::new(builder.finish())
    }

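    /// Primary key values of a single row under sparse encoding.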
    #[derive(Clone)]
    struct SparseTestRow {
        table_id: u32,
        tsid: u64,
        tag0: String,
        tag1: String,
    }

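    /// Converting a legacy batch (no decoded tag columns) to the flat format
    /// should prepend the tag columns decoded from the dense primary key while
    /// preserving `__primary_key` itself.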
    #[test]
    fn test_flat_read_format_convert_format_with_dense_encoding() {
        let metadata = build_test_region_metadata();

        let column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|c| c.column_id)
            .collect();
        let format = FlatReadFormat::new(
            metadata.clone(),
            column_ids.into_iter(),
            Some(6),
            "test",
            false,
        )
        .unwrap();

        let num_rows = 4;
        let original_sequence = 100u64;

        // Create primary key values for each row: tag0=1, tag1=1 for all rows
        let pk_values_per_row = vec![&[Some(1i64), Some(1i64)][..]; num_rows];

        // Create a test record batch in old format using dense encoding
        let codec = DensePrimaryKeyCodec::new(&metadata);
        let dense_pk_array = build_test_dense_pk_array(&codec, &pk_values_per_row);
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            dense_pk_array.clone(),                        // __primary_key (dense encoding)
            Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ];

        // Create schema for old format (without primary key columns)
        let old_format_fields = vec![
            Field::new("field1", ArrowDataType::Int64, true),
            Field::new("field0", ArrowDataType::Int64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt32),
                    Box::new(ArrowDataType::Binary),
                ),
                false,
            ),
            Field::new("__sequence", ArrowDataType::UInt64, false),
            Field::new("__op_type", ArrowDataType::UInt8, false),
        ];
        let old_schema = Arc::new(Schema::new(old_format_fields));
        let record_batch = RecordBatch::try_new(old_schema, columns).unwrap();

        // Test conversion with dense encoding
        let result = format.convert_batch(record_batch, None).unwrap();

        // Construct expected RecordBatch in flat format with decoded primary key columns
        let expected_columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![1; num_rows])), // tag0 (decoded from primary key)
            Arc::new(Int64Array::from(vec![1; num_rows])), // tag1 (decoded from primary key)
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            dense_pk_array,                                // __primary_key (preserved)
            Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ];
        let expected_record_batch =
            RecordBatch::try_new(build_test_flat_sst_schema(), expected_columns).unwrap();

        // Compare the actual result with the expected record batch
        assert_eq!(expected_record_batch, result);
    }

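    /// Same conversion as above, but for sparse encoding: decoded string tags
    /// come back as dictionary arrays, and conversion is a no-op when the
    /// reader is constructed to skip conversion (the trailing `true` flag).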
    #[test]
    fn test_flat_read_format_convert_format_with_sparse_encoding() {
        let metadata = build_test_sparse_region_metadata();

        let column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|c| c.column_id)
            .collect();
        let format = FlatReadFormat::new(
            metadata.clone(),
            column_ids.clone().into_iter(),
            None,
            "test",
            false,
        )
        .unwrap();

        let num_rows = 4;
        let original_sequence = 100u64;

        // Create sparse test data with table_id, tsid and string tags
        let pk_test_rows = vec![
            SparseTestRow {
                table_id: 1,
                tsid: 123,
                tag0: "frontend".to_string(),
                tag1: "pod1".to_string(),
            };
            num_rows
        ];

        let codec = SparsePrimaryKeyCodec::new(&metadata);
        let sparse_pk_array = build_test_sparse_pk_array(&codec, &pk_test_rows);
        // Create a test record batch in old format using sparse encoding
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            sparse_pk_array.clone(),                       // __primary_key (sparse encoding)
            Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ];

        // Create schema for old format (without primary key columns)
        let old_format_fields = vec![
            Field::new("field1", ArrowDataType::Int64, true),
            Field::new("field0", ArrowDataType::Int64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt32),
                    Box::new(ArrowDataType::Binary),
                ),
                false,
            ),
            Field::new("__sequence", ArrowDataType::UInt64, false),
            Field::new("__op_type", ArrowDataType::UInt8, false),
        ];
        let old_schema = Arc::new(Schema::new(old_format_fields));
        let record_batch = RecordBatch::try_new(old_schema, columns).unwrap();

        // Test conversion with sparse encoding
        let result = format.convert_batch(record_batch.clone(), None).unwrap();

        // Construct expected RecordBatch in flat format with decoded primary key columns
        let tag0_array = Arc::new(DictionaryArray::new(
            UInt32Array::from(vec![0; num_rows]),
            Arc::new(StringArray::from(vec!["frontend"])),
        ));
        let tag1_array = Arc::new(DictionaryArray::new(
            UInt32Array::from(vec![0; num_rows]),
            Arc::new(StringArray::from(vec!["pod1"])),
        ));
        let expected_columns: Vec<ArrayRef> = vec![
            Arc::new(UInt32Array::from(vec![1; num_rows])), // __table_id (decoded from primary key)
            Arc::new(UInt64Array::from(vec![123; num_rows])), // __tsid (decoded from primary key)
            tag0_array,                                     // tag0 (decoded from primary key)
            tag1_array,                                     // tag1 (decoded from primary key)
            Arc::new(Int64Array::from(vec![2; num_rows])),  // field1
            Arc::new(Int64Array::from(vec![3; num_rows])),  // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            sparse_pk_array,                                // __primary_key (preserved)
            Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ];
        let expected_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        let expected_record_batch =
            RecordBatch::try_new(expected_schema, expected_columns).unwrap();

        // Compare the actual result with the expected record batch
        assert_eq!(expected_record_batch, result);

        let format =
            FlatReadFormat::new(metadata.clone(), column_ids.into_iter(), None, "test", true)
                .unwrap();
        // Test conversion with sparse encoding and skip convert.
        let result = format.convert_batch(record_batch.clone(), None).unwrap();
        assert_eq!(record_batch, result);
    }
}