Skip to main content

mito2/
sst.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Sorted strings tables.
16
17use std::collections::HashMap;
18use std::sync::Arc;
19
20use api::v1::SemanticType;
21use arrow_schema::DataType;
22use common_base::readable_size::ReadableSize;
23use datatypes::arrow::datatypes::{
24    DataType as ArrowDataType, Field, FieldRef, Fields, Schema, SchemaRef,
25};
26use datatypes::arrow::record_batch::RecordBatch;
27use datatypes::prelude::ConcreteDataType;
28use datatypes::timestamp::timestamp_array_to_primitive;
29use serde::{Deserialize, Serialize};
30use store_api::codec::PrimaryKeyEncoding;
31use store_api::metadata::RegionMetadata;
32use store_api::storage::consts::{
33    OP_TYPE_COLUMN_NAME, PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME,
34};
35
36use crate::sst::parquet::flat_format::time_index_column_index;
37
38pub mod file;
39pub mod file_purger;
40pub mod file_ref;
41pub mod index;
42pub mod location;
43pub mod parquet;
44pub(crate) mod version;
45
/// Default size of the write buffer.
///
/// It should be greater than the default minimum upload part size of S3 (5 MB).
pub const DEFAULT_WRITE_BUFFER_SIZE: ReadableSize = ReadableSize::mb(8);

/// Default number of concurrent writes.
///
/// It only takes effect on object store backends (e.g., S3).
pub const DEFAULT_WRITE_CONCURRENCY: usize = 8;
51
/// Format type of the SST file.
///
/// Variant names are serialized (serde) and parsed from strings (strum) in
/// `snake_case` form.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, strum::EnumString)]
#[serde(rename_all = "snake_case")]
#[strum(serialize_all = "snake_case")]
pub enum FormatType {
    /// Parquet with primary key encoded. This is the default format.
    #[default]
    PrimaryKey,
    /// Flat Parquet format.
    Flat,
}
63
/// Iceberg-compatible column field ID key stored in Parquet column metadata.
///
/// This is the metadata key under which the field ID is attached to an Arrow field.
pub const PARQUET_FIELD_ID_KEY: &str = "PARQUET:field_id";
66
67/// Adds `PARQUET:field_id` metadata to an Arrow field.
68pub fn with_field_id(mut field: Field, column_id: u32) -> Field {
69    field
70        .metadata_mut()
71        .insert(PARQUET_FIELD_ID_KEY.to_string(), column_id.to_string());
72    field
73}
74
/// Parquet field ID base for internal columns (`__primary_key`, `__sequence`, `__op_type`).
///
/// Uses bit 30 (`1 << 30`) to distinguish internal IDs from user column IDs
/// while still fitting in the positive `i32` range.
pub(crate) const INTERNAL_PARQUET_FIELD_ID_BASE: u32 = 1 << 30;

/// Parquet field ID for the `__primary_key` column (the base itself).
pub(crate) const PRIMARY_KEY_PARQUET_FIELD_ID: u32 = INTERNAL_PARQUET_FIELD_ID_BASE;
/// Parquet field ID for the `__sequence` column (base + 1).
pub(crate) const SEQUENCE_PARQUET_FIELD_ID: u32 = INTERNAL_PARQUET_FIELD_ID_BASE + 1;
/// Parquet field ID for the `__op_type` column (base + 2).
pub(crate) const OP_TYPE_PARQUET_FIELD_ID: u32 = INTERNAL_PARQUET_FIELD_ID_BASE + 2;
85
86/// Gets the arrow schema to store in parquet.
87pub fn to_sst_arrow_schema(metadata: &RegionMetadata) -> SchemaRef {
88    let fields = Fields::from_iter(
89        metadata
90            .schema
91            .arrow_schema()
92            .fields()
93            .iter()
94            .zip(&metadata.column_metadatas)
95            .filter_map(|(field, column_meta)| {
96                if column_meta.semantic_type == SemanticType::Field {
97                    Some(Arc::new(with_field_id(
98                        (**field).clone(),
99                        column_meta.column_id,
100                    )))
101                } else {
102                    // We have fixed positions for tags (primary key) and time index.
103                    None
104                }
105            })
106            .chain([Arc::new(with_field_id(
107                (*metadata.time_index_field()).clone(),
108                metadata.time_index_column().column_id,
109            ))])
110            .chain(internal_fields()),
111    );
112
113    Arc::new(Schema::new(fields))
114}
115
/// Options that control how the flat SST schema is built.
pub struct FlatSchemaOptions {
    /// Whether to store primary key columns additionally instead of an encoded column.
    pub raw_pk_columns: bool,
    /// Whether to use dictionary encoding for string primary key columns
    /// when storing primary key columns.
    /// Only takes effect when `raw_pk_columns` is true.
    pub string_pk_use_dict: bool,
    /// Concretized JSON types of columns, keyed by column name, to be set into
    /// the Arrow schema. A field whose name appears here has its data type
    /// replaced by the mapped type.
    /// Otherwise it's an empty struct in the Arrow schema.
    pub concretized_json_types: HashMap<String, DataType>,
}
128
129impl Default for FlatSchemaOptions {
130    fn default() -> Self {
131        Self {
132            raw_pk_columns: true,
133            string_pk_use_dict: true,
134            concretized_json_types: HashMap::new(),
135        }
136    }
137}
138
139impl FlatSchemaOptions {
140    /// Creates a options according to the primary key encoding.
141    pub fn from_encoding(encoding: PrimaryKeyEncoding) -> Self {
142        if encoding == PrimaryKeyEncoding::Dense {
143            Self::default()
144        } else {
145            Self {
146                raw_pk_columns: false,
147                string_pk_use_dict: false,
148                concretized_json_types: HashMap::new(),
149            }
150        }
151    }
152}
153
154/// Gets the arrow schema to store in parquet.
155///
156/// The schema is:
157/// ```text
158/// primary key columns, field columns, time index, __primary_key, __sequence, __op_type
159/// ```
160///
161/// # Panics
162/// Panics if the metadata is invalid.
163pub fn to_flat_sst_arrow_schema(
164    metadata: &RegionMetadata,
165    options: &FlatSchemaOptions,
166) -> SchemaRef {
167    let num_fields = flat_sst_arrow_schema_column_num(metadata, options);
168    let mut fields = Vec::with_capacity(num_fields);
169    let schema = metadata.schema.arrow_schema();
170    if options.raw_pk_columns {
171        for pk_id in &metadata.primary_key {
172            let pk_index = metadata.column_index_by_id(*pk_id).unwrap();
173            let column_id = metadata.column_metadatas[pk_index].column_id;
174            if options.string_pk_use_dict {
175                let old_field = &schema.fields[pk_index];
176                let new_field = tag_maybe_to_dictionary_field(
177                    &metadata.column_metadatas[pk_index].column_schema.data_type,
178                    old_field,
179                );
180                let new_field = concretize_json_type(new_field, options);
181                fields.push(Arc::new(with_field_id((*new_field).clone(), column_id)));
182            }
183        }
184    }
185    let remaining_fields = schema
186        .fields()
187        .iter()
188        .zip(&metadata.column_metadatas)
189        .filter_map(|(field, column_meta)| {
190            if column_meta.semantic_type == SemanticType::Field {
191                let field = concretize_json_type(field.clone(), options);
192                Some(Arc::new(with_field_id(
193                    Arc::unwrap_or_clone(field),
194                    column_meta.column_id,
195                )))
196            } else {
197                None
198            }
199        })
200        .chain([Arc::new(with_field_id(
201            (*metadata.time_index_field()).clone(),
202            metadata.time_index_column().column_id,
203        ))])
204        .chain(internal_fields());
205    for field in remaining_fields {
206        fields.push(field);
207    }
208
209    Arc::new(Schema::new(fields))
210}
211
212fn concretize_json_type(field: Arc<Field>, options: &FlatSchemaOptions) -> Arc<Field> {
213    if let Some(data_type) = options.concretized_json_types.get(field.name()) {
214        let mut field = Arc::unwrap_or_clone(field);
215        field.set_data_type(data_type.clone());
216        Arc::new(field)
217    } else {
218        field
219    }
220}
221
222/// Returns the number of columns in the flat format.
223pub fn flat_sst_arrow_schema_column_num(
224    metadata: &RegionMetadata,
225    options: &FlatSchemaOptions,
226) -> usize {
227    if options.raw_pk_columns {
228        metadata.column_metadatas.len() + 3
229    } else {
230        metadata.column_metadatas.len() + 3 - metadata.primary_key.len()
231    }
232}
233
234/// Helper function to create a dictionary field from a field.
235fn to_dictionary_field(field: &Field) -> Field {
236    let mut new_field = Field::new_dictionary(
237        field.name(),
238        datatypes::arrow::datatypes::DataType::UInt32,
239        field.data_type().clone(),
240        field.is_nullable(),
241    );
242
243    // retain field_id metadata
244    if let Some(field_id) = field.metadata().get(PARQUET_FIELD_ID_KEY) {
245        new_field
246            .metadata_mut()
247            .insert(PARQUET_FIELD_ID_KEY.to_string(), field_id.clone());
248    }
249
250    new_field
251}
252
253/// Helper function to create a dictionary field from a field if it is a string column.
254pub(crate) fn tag_maybe_to_dictionary_field(
255    data_type: &ConcreteDataType,
256    field: &Arc<Field>,
257) -> Arc<Field> {
258    if data_type.is_string() {
259        Arc::new(to_dictionary_field(field))
260    } else {
261        field.clone()
262    }
263}
264
265/// Fields for internal columns.
266pub(crate) fn internal_fields() -> [FieldRef; 3] {
267    // Internal columns are always not null.
268    [
269        Arc::new(with_field_id(
270            Field::new_dictionary(
271                PRIMARY_KEY_COLUMN_NAME,
272                ArrowDataType::UInt32,
273                ArrowDataType::Binary,
274                false,
275            ),
276            PRIMARY_KEY_PARQUET_FIELD_ID,
277        )),
278        Arc::new(with_field_id(
279            Field::new(SEQUENCE_COLUMN_NAME, ArrowDataType::UInt64, false),
280            SEQUENCE_PARQUET_FIELD_ID,
281        )),
282        Arc::new(with_field_id(
283            Field::new(OP_TYPE_COLUMN_NAME, ArrowDataType::UInt8, false),
284            OP_TYPE_PARQUET_FIELD_ID,
285        )),
286    ]
287}
288
289/// Returns a copy of `schema` with the `__primary_key` field replaced by a plain `Binary` field.
290pub(crate) fn override_pk_field_to_binary(schema: &SchemaRef) -> SchemaRef {
291    let new_fields = schema
292        .fields()
293        .iter()
294        .map(|field| {
295            if field.name() == PRIMARY_KEY_COLUMN_NAME {
296                let mut new_field = Field::new(
297                    PRIMARY_KEY_COLUMN_NAME,
298                    ArrowDataType::Binary,
299                    field.is_nullable(),
300                );
301                // Preserve the field_id metadata so parquet readers that require
302                // all columns to carry a field_id don't fail.
303                if let Some(field_id) = field.metadata().get(PARQUET_FIELD_ID_KEY) {
304                    new_field
305                        .metadata_mut()
306                        .insert(PARQUET_FIELD_ID_KEY.to_string(), field_id.clone());
307                }
308                Arc::new(new_field)
309            } else {
310                field.clone()
311            }
312        })
313        .collect::<Vec<_>>();
314    Arc::new(Schema::new(new_fields))
315}
316
/// Estimates the number of series from record batches.
///
/// This struct tracks the last timestamp value to detect series boundaries
/// by observing when timestamps do not increase: a timestamp that is less than
/// or equal to the previous one is counted as the start of a new series.
#[derive(Default)]
pub(crate) struct SeriesEstimator {
    /// The last timestamp value seen, or `None` if no non-empty batch has been
    /// processed since creation or the last `finish()`.
    last_timestamp: Option<i64>,
    /// The estimated number of series
    series_count: u64,
}
328
329impl SeriesEstimator {
330    /// Updates the estimator with a new record batch in flat format.
331    ///
332    /// This method examines the time index column to detect series boundaries.
333    pub(crate) fn update_flat(&mut self, record_batch: &RecordBatch) {
334        let batch_rows = record_batch.num_rows();
335        if batch_rows == 0 {
336            return;
337        }
338
339        let time_index_pos = time_index_column_index(record_batch.num_columns());
340        let timestamps = record_batch.column(time_index_pos);
341        let Some((ts_values, _unit)) = timestamp_array_to_primitive(timestamps) else {
342            return;
343        };
344        let values = ts_values.values();
345
346        // Checks if there's a boundary between the last batch and this batch
347        if let Some(last_ts) = self.last_timestamp {
348            if values[0] <= last_ts {
349                self.series_count += 1;
350            }
351        } else {
352            // First batch, counts as first series
353            self.series_count = 1;
354        }
355
356        // Counts series boundaries within this batch.
357        for i in 0..batch_rows - 1 {
358            // We assumes the same timestamp as a new series, which is different from
359            // how we split batches.
360            if values[i] >= values[i + 1] {
361                self.series_count += 1;
362            }
363        }
364
365        // Updates the last timestamp
366        self.last_timestamp = Some(values[batch_rows - 1]);
367    }
368
369    /// Returns the estimated number of series.
370    pub(crate) fn finish(&mut self) -> u64 {
371        self.last_timestamp = None;
372        let count = self.series_count;
373        self.series_count = 0;
374
375        count
376    }
377}
378
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datatypes::arrow::array::{
        BinaryArray, DictionaryArray, TimestampMillisecondArray, UInt8Array, UInt32Array,
        UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema, TimeUnit};
    use datatypes::arrow::record_batch::RecordBatch;

    use super::*;

    /// Builds a flat-format batch without field columns from `timestamps`.
    ///
    /// All rows share the same primary key, sequence and op type.
    fn new_flat_record_batch(timestamps: &[i64]) -> RecordBatch {
        // Flat format has: [fields..., time_index, __primary_key, __sequence, __op_type]
        let num_cols = 4; // time_index + 3 internal columns
        // For 4 columns, time index should be at position 0
        assert_eq!(time_index_column_index(num_cols), 0);

        let row_count = timestamps.len();
        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "time",
                ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new_dictionary(
                "__primary_key",
                ArrowDataType::UInt32,
                ArrowDataType::Binary,
                false,
            ),
            Field::new("__sequence", ArrowDataType::UInt64, false),
            Field::new("__op_type", ArrowDataType::UInt8, false),
        ]));

        let ts_column = Arc::new(TimestampMillisecondArray::from(timestamps.to_vec()));
        let pk_column = Arc::new(DictionaryArray::new(
            UInt32Array::from(vec![0; row_count]),
            Arc::new(BinaryArray::from(vec![b"test".as_slice()])),
        ));
        let seq_column = Arc::new(UInt64Array::from(vec![1; row_count]));
        let op_column = Arc::new(UInt8Array::from(vec![1; row_count]));

        RecordBatch::try_new(schema, vec![ts_column, pk_column, seq_column, op_column]).unwrap()
    }

    /// Feeds `batches` in order into a fresh estimator and returns its estimate.
    fn estimate_series(batches: &[&[i64]]) -> u64 {
        let mut estimator = SeriesEstimator::default();
        for timestamps in batches {
            estimator.update_flat(&new_flat_record_batch(timestamps));
        }
        estimator.finish()
    }

    #[test]
    fn test_series_estimator_flat_empty_batch() {
        assert_eq!(0, estimate_series(&[&[]]));
    }

    #[test]
    fn test_series_estimator_flat_single_batch() {
        assert_eq!(1, estimate_series(&[&[1, 2, 3]]));
    }

    #[test]
    fn test_series_estimator_flat_series_boundary_within_batch() {
        // Timestamps decrease from 3 to 2, indicating a series boundary (3 >= 2).
        assert_eq!(2, estimate_series(&[&[1, 2, 3, 2, 4, 5]]));
    }

    #[test]
    fn test_series_estimator_flat_multiple_boundaries_within_batch() {
        // Multiple series boundaries: 5>=4, 6>=3
        assert_eq!(3, estimate_series(&[&[1, 2, 5, 4, 6, 3, 7]]));
    }

    #[test]
    fn test_series_estimator_flat_equal_timestamps() {
        // Equal timestamps are considered as new series.
        // Boundaries at: 2>=2, 3>=3, 3>=3
        assert_eq!(4, estimate_series(&[&[1, 2, 2, 3, 3, 3, 4]]));
    }

    #[test]
    fn test_series_estimator_flat_multiple_batches_continuation() {
        // The second batch (4, 5, 6) continues the series of the first (1, 2, 3).
        assert_eq!(1, estimate_series(&[&[1, 2, 3], &[4, 5, 6]]));
    }

    #[test]
    fn test_series_estimator_flat_multiple_batches_new_series() {
        // The second batch goes back to 2, which starts a new series.
        assert_eq!(2, estimate_series(&[&[1, 2, 3], &[2, 3, 4]]));
    }

    #[test]
    fn test_series_estimator_flat_boundary_at_batch_edge_equal() {
        // The first batch ends at 5 and the second starts at 5
        // (equal timestamp, new series).
        assert_eq!(2, estimate_series(&[&[1, 2, 5], &[5, 6, 7]]));
    }

    #[test]
    fn test_series_estimator_flat_mixed_batches() {
        // Batch 1: single series [10, 20, 30]
        // Batch 2: starts new series [5, 15], boundary within batch [15, 10, 25]
        // Batch 3: continues from 25 to [30, 35]
        // Expected: 1 (batch1) + 1 (batch2 start) + 1 (within batch2) = 3
        assert_eq!(
            3,
            estimate_series(&[&[10, 20, 30], &[5, 15, 10, 25], &[30, 35]])
        );
    }

    #[test]
    fn test_series_estimator_flat_descending_timestamps() {
        // Strictly descending timestamps - each pair creates a boundary.
        // Boundaries: 10>=9, 9>=8, 8>=7, 7>=6 = 4 boundaries + 1 initial = 5 series
        assert_eq!(5, estimate_series(&[&[10, 9, 8, 7, 6]]));
    }

    #[test]
    fn test_series_estimator_flat_finish_resets_state() {
        let mut estimator = SeriesEstimator::default();

        estimator.update_flat(&new_flat_record_batch(&[1, 2, 3]));
        assert_eq!(1, estimator.finish());

        // After finish, state should be reset: the next batch is counted as
        // the first series again instead of a continuation.
        estimator.update_flat(&new_flat_record_batch(&[4, 5, 6]));
        assert_eq!(1, estimator.finish());
    }
}