mito2/sst/parquet/format.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Format to store in parquet.
//!
//! We store three internal columns in parquet:
//! - `__primary_key`, the primary key of the row (tags). Type: dictionary(uint32, binary)
//! - `__sequence`, the sequence number of a row. Type: uint64
//! - `__op_type`, the op type of the row. Type: uint8
//!
//! The schema of a parquet file is:
//! ```text
//! field 0, field 1, ..., field N, time index, primary key, sequence, op type
//! ```
//!
//! We store fields in the same order as [RegionMetadata::field_columns()](store_api::metadata::RegionMetadata::field_columns()).
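//!
//! For example, a region with two fields `a` and `b` and time index `ts` stores
//! columns in this order:
//! ```text
//! a, b, ts, __primary_key, __sequence, __op_type
//! ```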

use std::borrow::Borrow;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;

use api::v1::SemanticType;
use common_time::Timestamp;
use datafusion_common::ScalarValue;
use datatypes::arrow::array::{ArrayRef, BinaryArray, DictionaryArray, UInt32Array, UInt64Array};
use datatypes::arrow::datatypes::{SchemaRef, UInt32Type};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::prelude::DataType;
use datatypes::vectors::{Helper, Vector};
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
use parquet::file::statistics::Statistics;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
use store_api::storage::{ColumnId, SequenceNumber};

use crate::error::{
    ConvertVectorSnafu, InvalidBatchSnafu, InvalidRecordBatchSnafu, NewRecordBatchSnafu, Result,
};
use crate::read::{Batch, BatchBuilder, BatchColumn};
use crate::row_converter::{build_primary_key_codec_with_fields, SortField};
use crate::sst::file::{FileMeta, FileTimeRange};
use crate::sst::to_sst_arrow_schema;

/// Arrow array type for the primary key dictionary.
pub(crate) type PrimaryKeyArray = DictionaryArray<UInt32Type>;

/// Number of columns that have fixed positions.
///
/// Contains the time index and the internal columns (primary key, sequence, op type).
const FIXED_POS_COLUMN_NUM: usize = 4;

/// Helper for writing the SST format.
pub(crate) struct WriteFormat {
    metadata: RegionMetadataRef,
    /// SST file schema.
    arrow_schema: SchemaRef,
    override_sequence: Option<SequenceNumber>,
}

impl WriteFormat {
    /// Creates a new helper.
    pub(crate) fn new(metadata: RegionMetadataRef) -> WriteFormat {
        let arrow_schema = to_sst_arrow_schema(&metadata);
        WriteFormat {
            metadata,
            arrow_schema,
            override_sequence: None,
        }
    }

    /// Sets the sequence number that overrides the one in batches.
    pub(crate) fn with_override_sequence(
        mut self,
        override_sequence: Option<SequenceNumber>,
    ) -> Self {
        self.override_sequence = override_sequence;
        self
    }

    /// Gets the arrow schema to store in parquet.
    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
        &self.arrow_schema
    }

    /// Converts `batch` to an arrow record batch to store in parquet.
    pub(crate) fn convert_batch(&self, batch: &Batch) -> Result<RecordBatch> {
        debug_assert_eq!(
            batch.fields().len() + FIXED_POS_COLUMN_NUM,
            self.arrow_schema.fields().len()
        );
        let mut columns = Vec::with_capacity(batch.fields().len() + FIXED_POS_COLUMN_NUM);
        // Store all fields first.
        for (column, column_metadata) in batch.fields().iter().zip(self.metadata.field_columns()) {
            ensure!(
                column.column_id == column_metadata.column_id,
                InvalidBatchSnafu {
                    reason: format!(
                        "Batch has column {} but metadata has column {}",
                        column.column_id, column_metadata.column_id
                    ),
                }
            );

            columns.push(column.data.to_arrow_array());
        }
        // Add time index column.
        columns.push(batch.timestamps().to_arrow_array());
        // Add internal columns: primary key, sequences, op types.
        columns.push(new_primary_key_array(batch.primary_key(), batch.num_rows()));

        if let Some(override_sequence) = self.override_sequence {
            let sequence_array =
                Arc::new(UInt64Array::from(vec![override_sequence; batch.num_rows()]));
            columns.push(sequence_array);
        } else {
            columns.push(batch.sequences().to_arrow_array());
        }
        columns.push(batch.op_types().to_arrow_array());

        RecordBatch::try_new(self.arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
    }
}

/// Helper for reading the SST format.
pub struct ReadFormat {
    /// The metadata stored in the SST.
    metadata: RegionMetadataRef,
    /// SST file schema.
    arrow_schema: SchemaRef,
    /// Field column id to its index in the SST schema.
    /// In the SST schema, fields are stored at the front.
    field_id_to_index: HashMap<ColumnId, usize>,
    /// Indices of columns to read from the SST. It contains all internal columns.
    projection_indices: Vec<usize>,
    /// Field column id to its index in the projected schema
    /// (the schema of [Batch]).
    field_id_to_projected_index: HashMap<ColumnId, usize>,
}

impl ReadFormat {
    /// Creates a helper with existing `metadata` and the `column_ids` to read.
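    ///
    /// Only field columns in `column_ids` are projected; the time index and the
    /// internal columns are always included in the projection.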
    pub fn new(
        metadata: RegionMetadataRef,
        column_ids: impl Iterator<Item = ColumnId>,
    ) -> ReadFormat {
        let field_id_to_index: HashMap<_, _> = metadata
            .field_columns()
            .enumerate()
            .map(|(index, column)| (column.column_id, index))
            .collect();
        let arrow_schema = to_sst_arrow_schema(&metadata);

        // Maps column id of a projected field to its index in SST.
        let mut projected_field_id_index: Vec<_> = column_ids
            .filter_map(|column_id| {
                // Only apply projection to fields.
                field_id_to_index
                    .get(&column_id)
                    .copied()
                    .map(|index| (column_id, index))
            })
            .collect();
        let mut projection_indices: Vec<_> = projected_field_id_index
            .iter()
            .map(|(_column_id, index)| *index)
            // We need to add all fixed position columns.
            .chain(arrow_schema.fields.len() - FIXED_POS_COLUMN_NUM..arrow_schema.fields.len())
            .collect();
        projection_indices.sort_unstable();

        // Sort fields by their indices in the SST so that the order of fields matches
        // their order in the Batch.
        projected_field_id_index.sort_unstable_by_key(|x| x.1);
        // Because the SST puts fields before other columns, we don't need to consider
        // other columns.
        let field_id_to_projected_index = projected_field_id_index
            .into_iter()
            .map(|(column_id, _)| column_id)
            .enumerate()
            .map(|(index, column_id)| (column_id, index))
            .collect();

        ReadFormat {
            metadata,
            arrow_schema,
            field_id_to_index,
            projection_indices,
            field_id_to_projected_index,
        }
    }

    /// Gets the arrow schema of the SST file.
    ///
    /// This schema is computed from the region metadata but should be the same
    /// as the arrow schema decoded from the file metadata.
    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
        &self.arrow_schema
    }

    /// Gets the metadata of the SST.
    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
        &self.metadata
    }

    /// Gets sorted projection indices to read.
    pub(crate) fn projection_indices(&self) -> &[usize] {
        &self.projection_indices
    }

    /// Converts an arrow record batch into `batches`.
    ///
    /// Note that the `record_batch` may only contain a subset of columns if it is projected.
    pub fn convert_record_batch(
        &self,
        record_batch: &RecordBatch,
        batches: &mut VecDeque<Batch>,
    ) -> Result<()> {
        debug_assert!(batches.is_empty());

        // The record batch must have the time index and internal columns.
        ensure!(
            record_batch.num_columns() >= FIXED_POS_COLUMN_NUM,
            InvalidRecordBatchSnafu {
                reason: format!(
                    "record batch only has {} columns",
                    record_batch.num_columns()
                ),
            }
        );

        let mut fixed_pos_columns = record_batch
            .columns()
            .iter()
            .rev()
            .take(FIXED_POS_COLUMN_NUM);
        // Safety: We have checked the column number.
        let op_type_array = fixed_pos_columns.next().unwrap();
        let sequence_array = fixed_pos_columns.next().unwrap();
        let pk_array = fixed_pos_columns.next().unwrap();
        let ts_array = fixed_pos_columns.next().unwrap();
        let field_batch_columns = self.get_field_batch_columns(record_batch)?;

        // Compute primary key offsets.
        let pk_dict_array = pk_array
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .with_context(|| InvalidRecordBatchSnafu {
                reason: format!("primary key array should not be {:?}", pk_array.data_type()),
            })?;
        let offsets = primary_key_offsets(pk_dict_array)?;
        if offsets.is_empty() {
            return Ok(());
        }

        // Split record batch according to pk offsets.
        let keys = pk_dict_array.keys();
        let pk_values = pk_dict_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .with_context(|| InvalidRecordBatchSnafu {
                reason: format!(
                    "values of primary key array should not be {:?}",
                    pk_dict_array.values().data_type()
                ),
            })?;
        for (i, start) in offsets[..offsets.len() - 1].iter().enumerate() {
            let end = offsets[i + 1];
            let rows_in_batch = end - start;
            let dict_key = keys.value(*start);
            let primary_key = pk_values.value(dict_key as usize).to_vec();

            let mut builder = BatchBuilder::new(primary_key);
            builder
                .timestamps_array(ts_array.slice(*start, rows_in_batch))?
                .sequences_array(sequence_array.slice(*start, rows_in_batch))?
                .op_types_array(op_type_array.slice(*start, rows_in_batch))?;
            // Push all fields.
            for batch_column in &field_batch_columns {
                builder.push_field(BatchColumn {
                    column_id: batch_column.column_id,
                    data: batch_column.data.slice(*start, rows_in_batch),
                });
            }

            let batch = builder.build()?;
            batches.push_back(batch);
        }

        Ok(())
    }

    /// Returns min values of a specific column in row groups.
    pub fn min_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        let Some(column) = self.metadata.column_by_id(column_id) else {
            // No such column in the SST.
            return StatValues::NoColumn;
        };
        match column.semantic_type {
            SemanticType::Tag => self.tag_values(row_groups, column, true),
            SemanticType::Field => {
                // Safety: `field_id_to_index` is initialized by the semantic type.
                let index = self.field_id_to_index.get(&column_id).unwrap();
                let stats = Self::column_values(row_groups, column, *index, true);
                StatValues::from_stats_opt(stats)
            }
            SemanticType::Timestamp => {
                let index = self.time_index_position();
                let stats = Self::column_values(row_groups, column, index, true);
                StatValues::from_stats_opt(stats)
            }
        }
    }

    /// Returns max values of a specific column in row groups.
    pub fn max_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        let Some(column) = self.metadata.column_by_id(column_id) else {
            // No such column in the SST.
            return StatValues::NoColumn;
        };
        match column.semantic_type {
            SemanticType::Tag => self.tag_values(row_groups, column, false),
            SemanticType::Field => {
                // Safety: `field_id_to_index` is initialized by the semantic type.
                let index = self.field_id_to_index.get(&column_id).unwrap();
                let stats = Self::column_values(row_groups, column, *index, false);
                StatValues::from_stats_opt(stats)
            }
            SemanticType::Timestamp => {
                let index = self.time_index_position();
                let stats = Self::column_values(row_groups, column, index, false);
                StatValues::from_stats_opt(stats)
            }
        }
    }

    /// Returns null counts of a specific column in row groups.
    pub fn null_counts(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        let Some(column) = self.metadata.column_by_id(column_id) else {
            // No such column in the SST.
            return StatValues::NoColumn;
        };
        match column.semantic_type {
            SemanticType::Tag => StatValues::NoStats,
            SemanticType::Field => {
                // Safety: `field_id_to_index` is initialized by the semantic type.
                let index = self.field_id_to_index.get(&column_id).unwrap();
                let stats = Self::column_null_counts(row_groups, *index);
                StatValues::from_stats_opt(stats)
            }
            SemanticType::Timestamp => {
                let index = self.time_index_position();
                let stats = Self::column_null_counts(row_groups, index);
                StatValues::from_stats_opt(stats)
            }
        }
    }

    /// Gets fields from `record_batch`.
    fn get_field_batch_columns(&self, record_batch: &RecordBatch) -> Result<Vec<BatchColumn>> {
        record_batch
            .columns()
            .iter()
            .zip(record_batch.schema().fields())
            .take(record_batch.num_columns() - FIXED_POS_COLUMN_NUM) // Take all field columns.
            .map(|(array, field)| {
                let vector = Helper::try_into_vector(array.clone()).context(ConvertVectorSnafu)?;
                let column = self
                    .metadata
                    .column_by_name(field.name())
                    .with_context(|| InvalidRecordBatchSnafu {
                        reason: format!("column {} not found in metadata", field.name()),
                    })?;

                Ok(BatchColumn {
                    column_id: column.column_id,
                    data: vector,
                })
            })
            .collect()
    }

    /// Returns min/max values of a specific tag.
    fn tag_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column: &ColumnMetadata,
        is_min: bool,
    ) -> StatValues {
        let is_first_tag = self
            .metadata
            .primary_key
            .first()
            .map(|id| *id == column.column_id)
            .unwrap_or(false);
        if !is_first_tag {
            // Only the min-max of the first tag is available in the primary key.
            return StatValues::NoStats;
        }

        StatValues::from_stats_opt(self.first_tag_values(row_groups, column, is_min))
    }

    /// Returns min/max values of the first tag.
    /// Returns None if the tag does not have statistics.
    fn first_tag_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column: &ColumnMetadata,
        is_min: bool,
    ) -> Option<ArrayRef> {
        debug_assert!(self
            .metadata
            .primary_key
            .first()
            .map(|id| *id == column.column_id)
            .unwrap_or(false));

        let primary_key_encoding = self.metadata.primary_key_encoding;
        let converter = build_primary_key_codec_with_fields(
            primary_key_encoding,
            [(
                column.column_id,
                SortField::new(column.column_schema.data_type.clone()),
            )]
            .into_iter(),
        );

        let values = row_groups.iter().map(|meta| {
            let stats = meta
                .borrow()
                .column(self.primary_key_position())
                .statistics()?;
            match stats {
                Statistics::Boolean(_) => None,
                Statistics::Int32(_) => None,
                Statistics::Int64(_) => None,
                Statistics::Int96(_) => None,
                Statistics::Float(_) => None,
                Statistics::Double(_) => None,
                Statistics::ByteArray(s) => {
                    let bytes = if is_min {
                        s.min_bytes_opt()?
                    } else {
                        s.max_bytes_opt()?
                    };
                    converter.decode_leftmost(bytes).ok()?
                }
                Statistics::FixedLenByteArray(_) => None,
            }
        });
        let mut builder = column
            .column_schema
            .data_type
            .create_mutable_vector(row_groups.len());
        for value_opt in values {
            match value_opt {
                // Safety: We use the same data type to create the converter.
                Some(v) => builder.push_value_ref(v.as_value_ref()),
                None => builder.push_null(),
            }
        }
        let vector = builder.to_vector();

        Some(vector.to_arrow_array())
    }

    /// Returns min/max values of a specific non-tag column.
    /// Returns None if the column does not have statistics.
    fn column_values(
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column: &ColumnMetadata,
        column_index: usize,
        is_min: bool,
    ) -> Option<ArrayRef> {
        let null_scalar: ScalarValue = column
            .column_schema
            .data_type
            .as_arrow_type()
            .try_into()
            .ok()?;
        let scalar_values = row_groups
            .iter()
            .map(|meta| {
                let stats = meta.borrow().column(column_index).statistics()?;
                match stats {
                    Statistics::Boolean(s) => Some(ScalarValue::Boolean(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::Int32(s) => Some(ScalarValue::Int32(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::Int64(s) => Some(ScalarValue::Int64(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::Int96(_) => None,
                    Statistics::Float(s) => Some(ScalarValue::Float32(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::Double(s) => Some(ScalarValue::Float64(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::ByteArray(s) => {
                        let bytes = if is_min {
                            s.min_bytes_opt()?
                        } else {
                            s.max_bytes_opt()?
                        };
                        let s = String::from_utf8(bytes.to_vec()).ok();
                        Some(ScalarValue::Utf8(s))
                    }
                    Statistics::FixedLenByteArray(_) => None,
                }
            })
            .map(|maybe_scalar| maybe_scalar.unwrap_or_else(|| null_scalar.clone()))
            .collect::<Vec<ScalarValue>>();
        debug_assert_eq!(scalar_values.len(), row_groups.len());
        ScalarValue::iter_to_array(scalar_values).ok()
    }

    /// Returns null counts of a specific non-tag column.
    fn column_null_counts(
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_index: usize,
    ) -> Option<ArrayRef> {
        let values = row_groups.iter().map(|meta| {
            let col = meta.borrow().column(column_index);
            let stat = col.statistics()?;
            stat.null_count_opt()
        });
        Some(Arc::new(UInt64Array::from_iter(values)))
    }

    /// Position of the primary key column in the SST.
    fn primary_key_position(&self) -> usize {
        self.arrow_schema.fields.len() - 3
    }

    /// Position of the time index column in the SST.
    fn time_index_position(&self) -> usize {
        self.arrow_schema.fields.len() - FIXED_POS_COLUMN_NUM
    }

    /// Returns the index of a field column in the projected batch by its column id.
    pub fn field_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
        self.field_id_to_projected_index.get(&column_id).copied()
    }
}

/// Values of column statistics of the SST.
///
/// It distinguishes the case that a column is not found from the case that
/// the column exists but has no statistics.
pub enum StatValues {
    /// Values of each row group.
    Values(ArrayRef),
    /// No such column.
    NoColumn,
    /// Column exists but has no statistics.
    NoStats,
}

impl StatValues {
    /// Creates a new `StatValues` instance from optional statistics.
    pub fn from_stats_opt(stats: Option<ArrayRef>) -> Self {
        match stats {
            Some(stats) => StatValues::Values(stats),
            None => StatValues::NoStats,
        }
    }
}

#[cfg(test)]
impl ReadFormat {
    /// Creates a helper with existing `metadata` and all columns.
    pub fn new_with_all_columns(metadata: RegionMetadataRef) -> ReadFormat {
        Self::new(
            Arc::clone(&metadata),
            metadata.column_metadatas.iter().map(|c| c.column_id),
        )
    }
}

/// Computes offsets of different primary keys in the array.
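///
/// For example, a dictionary array with keys `[0, 0, 1, 1, 1]` yields offsets
/// `[0, 2, 5]`: each adjacent pair of offsets delimits the rows of one primary key.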
fn primary_key_offsets(pk_dict_array: &PrimaryKeyArray) -> Result<Vec<usize>> {
    if pk_dict_array.is_empty() {
        return Ok(Vec::new());
    }

    // Init offsets.
    let mut offsets = vec![0];
    let keys = pk_dict_array.keys();
    // Primary keys are never null, so we iterate `keys.values()` directly.
    let pk_indices = keys.values();
    for (i, key) in pk_indices.iter().take(keys.len() - 1).enumerate() {
        // Compare each key with the next key.
        if *key != pk_indices[i + 1] {
            // We meet a new key, so push the index of the next row as the end offset.
            offsets.push(i + 1);
        }
    }
    offsets.push(keys.len());

    Ok(offsets)
}

/// Creates a new dictionary array where all `num_rows` rows map to the given `primary_key`.
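/// For example, `new_primary_key_array(b"k", 3)` yields a dictionary array whose
/// logical values are `["k", "k", "k"]`.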
fn new_primary_key_array(primary_key: &[u8], num_rows: usize) -> ArrayRef {
    let values = Arc::new(BinaryArray::from_iter_values([primary_key]));
    let keys = UInt32Array::from_value(0, num_rows);

    // Safety: The key index is valid.
    Arc::new(DictionaryArray::new(keys, values))
}

/// Gets the min/max timestamps of the row group from the parquet meta.
/// It assumes the parquet file is created by the mito engine.
pub(crate) fn parquet_row_group_time_range(
    file_meta: &FileMeta,
    parquet_meta: &ParquetMetaData,
    row_group_idx: usize,
) -> Option<FileTimeRange> {
    let row_group_meta = parquet_meta.row_group(row_group_idx);
    let num_columns = parquet_meta.file_metadata().schema_descr().num_columns();
    assert!(
        num_columns >= FIXED_POS_COLUMN_NUM,
        "file only has {} columns",
        num_columns
    );
    let time_index_pos = num_columns - FIXED_POS_COLUMN_NUM;

    let stats = row_group_meta.column(time_index_pos).statistics()?;
    // The physical type for the timestamp should be i64.
    let (min, max) = match stats {
        Statistics::Int64(value_stats) => (*value_stats.min_opt()?, *value_stats.max_opt()?),
        Statistics::Int32(_)
        | Statistics::Boolean(_)
        | Statistics::Int96(_)
        | Statistics::Float(_)
        | Statistics::Double(_)
        | Statistics::ByteArray(_)
        | Statistics::FixedLenByteArray(_) => {
            common_telemetry::warn!(
                "Invalid statistics {:?} for time index in parquet in {}",
                stats,
                file_meta.file_id
            );
            return None;
        }
    };

    debug_assert!(min >= file_meta.time_range.0.value() && min <= file_meta.time_range.1.value());
    debug_assert!(max >= file_meta.time_range.0.value() && max <= file_meta.time_range.1.value());
    let unit = file_meta.time_range.0.unit();

    Some((Timestamp::new(min, unit), Timestamp::new(max, unit)))
}

#[cfg(test)]
mod tests {
    use api::v1::OpType;
    use datatypes::arrow::array::{Int64Array, TimestampMillisecondArray, UInt64Array, UInt8Array};
    use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema, TimeUnit};
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, UInt64Vector, UInt8Vector};
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;

    const TEST_SEQUENCE: u64 = 1;
    const TEST_OP_TYPE: u8 = OpType::Put as u8;

    fn build_test_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("tag0", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field1",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 4, // We change the order of field columns.
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("tag1", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field0",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 5,
            })
            .primary_key(vec![1, 3]);
        Arc::new(builder.build().unwrap())
    }

    fn build_test_arrow_schema() -> SchemaRef {
        let fields = vec![
            Field::new("field1", ArrowDataType::Int64, true),
            Field::new("field0", ArrowDataType::Int64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt32),
                    Box::new(ArrowDataType::Binary),
                ),
                false,
            ),
            Field::new("__sequence", ArrowDataType::UInt64, false),
            Field::new("__op_type", ArrowDataType::UInt8, false),
        ];
        Arc::new(Schema::new(fields))
    }

    fn new_batch(primary_key: &[u8], start_ts: i64, start_field: i64, num_rows: usize) -> Batch {
        let ts_values = (0..num_rows).map(|i| start_ts + i as i64);
        let timestamps = Arc::new(TimestampMillisecondVector::from_values(ts_values));
        let sequences = Arc::new(UInt64Vector::from_vec(vec![TEST_SEQUENCE; num_rows]));
        let op_types = Arc::new(UInt8Vector::from_vec(vec![TEST_OP_TYPE; num_rows]));
        let fields = vec![
            BatchColumn {
                column_id: 4,
                data: Arc::new(Int64Vector::from_vec(vec![start_field; num_rows])),
            }, // field1
            BatchColumn {
                column_id: 2,
                data: Arc::new(Int64Vector::from_vec(vec![start_field + 1; num_rows])),
            }, // field0
        ];

        BatchBuilder::with_required_columns(primary_key.to_vec(), timestamps, sequences, op_types)
            .with_fields(fields)
            .build()
            .unwrap()
    }

    #[test]
    fn test_to_sst_arrow_schema() {
        let metadata = build_test_region_metadata();
        let write_format = WriteFormat::new(metadata);
        assert_eq!(&build_test_arrow_schema(), write_format.arrow_schema());
    }

    #[test]
    fn test_new_primary_key_array() {
        let array = new_primary_key_array(b"test", 3);
        let expect = build_test_pk_array(&[(b"test".to_vec(), 3)]) as ArrayRef;
        assert_eq!(&expect, &array);
    }

    fn build_test_pk_array(pk_row_nums: &[(Vec<u8>, usize)]) -> Arc<PrimaryKeyArray> {
        let values = Arc::new(BinaryArray::from_iter_values(
            pk_row_nums.iter().map(|v| &v.0),
        ));
        let mut keys = vec![];
        for (index, num_rows) in pk_row_nums.iter().map(|v| v.1).enumerate() {
            keys.extend(std::iter::repeat_n(index as u32, num_rows));
        }
        let keys = UInt32Array::from(keys);
        Arc::new(DictionaryArray::new(keys, values))
    }

    #[test]
    fn test_convert_batch() {
        let metadata = build_test_region_metadata();
        let write_format = WriteFormat::new(metadata);

        let num_rows = 4;
        let batch = new_batch(b"test", 1, 2, num_rows);
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // primary key
            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; num_rows])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ];
        let expect_record = RecordBatch::try_new(build_test_arrow_schema(), columns).unwrap();

        let actual = write_format.convert_batch(&batch).unwrap();
        assert_eq!(expect_record, actual);
    }

    #[test]
    fn test_convert_batch_with_override_sequence() {
        let metadata = build_test_region_metadata();
        let write_format = WriteFormat::new(metadata).with_override_sequence(Some(415411));

        let num_rows = 4;
        let batch = new_batch(b"test", 1, 2, num_rows);
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // primary key
            Arc::new(UInt64Array::from(vec![415411; num_rows])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ];
        let expect_record = RecordBatch::try_new(build_test_arrow_schema(), columns).unwrap();

        let actual = write_format.convert_batch(&batch).unwrap();
        assert_eq!(expect_record, actual);
    }

    #[test]
    fn test_projection_indices() {
        let metadata = build_test_region_metadata();
        // Only read tag1
        let read_format = ReadFormat::new(metadata.clone(), [3].iter().copied());
        assert_eq!(&[2, 3, 4, 5], read_format.projection_indices());
        // Only read field1
        let read_format = ReadFormat::new(metadata.clone(), [4].iter().copied());
        assert_eq!(&[0, 2, 3, 4, 5], read_format.projection_indices());
        // Only read ts
        let read_format = ReadFormat::new(metadata.clone(), [5].iter().copied());
        assert_eq!(&[2, 3, 4, 5], read_format.projection_indices());
        // Read field0, tag0, ts
        let read_format = ReadFormat::new(metadata, [2, 1, 5].iter().copied());
        assert_eq!(&[1, 2, 3, 4, 5], read_format.projection_indices());
    }

    #[test]
    fn test_empty_primary_key_offsets() {
        let array = build_test_pk_array(&[]);
        assert!(primary_key_offsets(&array).unwrap().is_empty());
    }

    #[test]
    fn test_primary_key_offsets_one_series() {
        let array = build_test_pk_array(&[(b"one".to_vec(), 1)]);
        assert_eq!(vec![0, 1], primary_key_offsets(&array).unwrap());

        let array = build_test_pk_array(&[(b"one".to_vec(), 1), (b"two".to_vec(), 1)]);
        assert_eq!(vec![0, 1, 2], primary_key_offsets(&array).unwrap());

        let array = build_test_pk_array(&[
            (b"one".to_vec(), 1),
            (b"two".to_vec(), 1),
            (b"three".to_vec(), 1),
        ]);
        assert_eq!(vec![0, 1, 2, 3], primary_key_offsets(&array).unwrap());
    }

    #[test]
    fn test_primary_key_offsets_multi_series() {
        let array = build_test_pk_array(&[(b"one".to_vec(), 1), (b"two".to_vec(), 3)]);
        assert_eq!(vec![0, 1, 4], primary_key_offsets(&array).unwrap());

        let array = build_test_pk_array(&[(b"one".to_vec(), 3), (b"two".to_vec(), 1)]);
        assert_eq!(vec![0, 3, 4], primary_key_offsets(&array).unwrap());

        let array = build_test_pk_array(&[(b"one".to_vec(), 3), (b"two".to_vec(), 3)]);
        assert_eq!(vec![0, 3, 6], primary_key_offsets(&array).unwrap());
    }

    #[test]
    fn test_convert_empty_record_batch() {
        let metadata = build_test_region_metadata();
        let arrow_schema = build_test_arrow_schema();
        let column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|col| col.column_id)
            .collect();
        let read_format = ReadFormat::new(metadata, column_ids.iter().copied());
        assert_eq!(arrow_schema, *read_format.arrow_schema());

        let record_batch = RecordBatch::new_empty(arrow_schema);
        let mut batches = VecDeque::new();
        read_format
            .convert_record_batch(&record_batch, &mut batches)
            .unwrap();
        assert!(batches.is_empty());
    }

    #[test]
    fn test_convert_record_batch() {
        let metadata = build_test_region_metadata();
        let column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|col| col.column_id)
            .collect();
        let read_format = ReadFormat::new(metadata, column_ids.iter().copied());

        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![1, 1, 10, 10])), // field1
            Arc::new(Int64Array::from(vec![2, 2, 11, 11])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 11, 12])), // ts
            build_test_pk_array(&[(b"one".to_vec(), 2), (b"two".to_vec(), 2)]), // primary key
            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; 4])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; 4])), // op type
        ];
        let arrow_schema = build_test_arrow_schema();
        let record_batch = RecordBatch::try_new(arrow_schema, columns).unwrap();
        let mut batches = VecDeque::new();
        read_format
            .convert_record_batch(&record_batch, &mut batches)
            .unwrap();

        assert_eq!(
            vec![new_batch(b"one", 1, 1, 2), new_batch(b"two", 11, 10, 2)],
            batches.into_iter().collect::<Vec<_>>(),
        );
    }
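
    // The tests below are illustrative sketches added alongside the original suite;
    // they exercise `StatValues::from_stats_opt`, `ReadFormat::field_index_by_id` and
    // the column id check in `WriteFormat::convert_batch` using the fixtures above.
    #[test]
    fn test_stat_values_from_stats_opt() {
        // `None` means the column exists but has no statistics.
        assert!(matches!(
            StatValues::from_stats_opt(None),
            StatValues::NoStats
        ));
        // `Some` wraps the per row group values.
        let array: ArrayRef = Arc::new(UInt64Array::from(vec![1, 2]));
        assert!(matches!(
            StatValues::from_stats_opt(Some(array)),
            StatValues::Values(_)
        ));
    }

    #[test]
    fn test_field_index_by_id() {
        let metadata = build_test_region_metadata();
        // Project field0 (id 2) and field1 (id 4). The SST stores field1 before
        // field0, so field1 takes projected index 0.
        let read_format = ReadFormat::new(metadata, [2, 4].iter().copied());
        assert_eq!(Some(0), read_format.field_index_by_id(4));
        assert_eq!(Some(1), read_format.field_index_by_id(2));
        // Tags have no projected field index.
        assert_eq!(None, read_format.field_index_by_id(1));
    }

    #[test]
    fn test_convert_batch_with_mismatched_column_id() {
        let metadata = build_test_region_metadata();
        let write_format = WriteFormat::new(metadata);

        // Column id 99 does not match field1 (id 4) in the metadata, so the
        // conversion must fail.
        let num_rows = 4;
        let timestamps = Arc::new(TimestampMillisecondVector::from_values(
            (0..num_rows).map(|i| i as i64),
        ));
        let sequences = Arc::new(UInt64Vector::from_vec(vec![TEST_SEQUENCE; num_rows]));
        let op_types = Arc::new(UInt8Vector::from_vec(vec![TEST_OP_TYPE; num_rows]));
        let fields = vec![
            BatchColumn {
                column_id: 99,
                data: Arc::new(Int64Vector::from_vec(vec![0; num_rows])),
            },
            BatchColumn {
                column_id: 2,
                data: Arc::new(Int64Vector::from_vec(vec![0; num_rows])),
            },
        ];
        let batch =
            BatchBuilder::with_required_columns(b"test".to_vec(), timestamps, sequences, op_types)
                .with_fields(fields)
                .build()
                .unwrap();
        assert!(write_format.convert_batch(&batch).is_err());
    }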
}