mito2/sst.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Sorted string tables.

use std::sync::Arc;

use api::v1::SemanticType;
use common_base::readable_size::ReadableSize;
use datatypes::arrow::datatypes::{
    DataType as ArrowDataType, Field, FieldRef, Fields, Schema, SchemaRef,
};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::prelude::ConcreteDataType;
use datatypes::timestamp::timestamp_array_to_primitive;
use serde::{Deserialize, Serialize};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadata;
use store_api::storage::consts::{
    OP_TYPE_COLUMN_NAME, PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME,
};

use crate::read::Batch;
use crate::sst::parquet::flat_format::time_index_column_index;

pub mod file;
pub mod file_purger;
pub mod file_ref;
pub mod index;
pub mod location;
pub mod parquet;
pub(crate) mod version;

/// Default write buffer size. It should be greater than the minimum multipart
/// upload part size of S3 (5MB).
pub const DEFAULT_WRITE_BUFFER_SIZE: ReadableSize = ReadableSize::mb(8);

/// Default number of concurrent writes. It only takes effect on object store backends (e.g., S3).
pub const DEFAULT_WRITE_CONCURRENCY: usize = 8;

/// Format type of the SST file.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, strum::EnumString)]
#[serde(rename_all = "snake_case")]
#[strum(serialize_all = "snake_case")]
pub enum FormatType {
    /// Parquet with the primary key encoded.
    #[default]
    PrimaryKey,
    /// Flat Parquet format.
    Flat,
}

/// Gets the arrow schema of the primary key format to store in parquet.
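///
/// The schema is:
/// ```text
/// field columns, time index, __primary_key, __sequence, __op_type
/// ```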
pub fn to_sst_arrow_schema(metadata: &RegionMetadata) -> SchemaRef {
    let fields = Fields::from_iter(
        metadata
            .schema
            .arrow_schema()
            .fields()
            .iter()
            .zip(&metadata.column_metadatas)
            .filter_map(|(field, column_meta)| {
                if column_meta.semantic_type == SemanticType::Field {
                    Some(field.clone())
                } else {
                    // We have fixed positions for tags (primary key) and time index.
                    None
                }
            })
            .chain([metadata.time_index_field()])
            .chain(internal_fields()),
    );

    Arc::new(Schema::new(fields))
}

/// Options of the flat schema.
pub struct FlatSchemaOptions {
    /// Whether to also store the raw primary key columns in addition to the
    /// encoded `__primary_key` column.
    pub raw_pk_columns: bool,
    /// Whether to use dictionary encoding for string primary key columns
    /// when storing raw primary key columns.
    /// Only takes effect when `raw_pk_columns` is true.
    pub string_pk_use_dict: bool,
}

impl Default for FlatSchemaOptions {
    fn default() -> Self {
        Self {
            raw_pk_columns: true,
            string_pk_use_dict: true,
        }
    }
}

impl FlatSchemaOptions {
    /// Creates options according to the primary key encoding.
    pub fn from_encoding(encoding: PrimaryKeyEncoding) -> Self {
        if encoding == PrimaryKeyEncoding::Dense {
            Self::default()
        } else {
            Self {
                raw_pk_columns: false,
                string_pk_use_dict: false,
            }
        }
    }
}

/// Gets the arrow schema of the flat format to store in parquet.
///
/// The schema is:
/// ```text
/// primary key columns, field columns, time index, __primary_key, __sequence, __op_type
/// ```
///
/// # Panics
/// Panics if the metadata is invalid.
pub fn to_flat_sst_arrow_schema(
    metadata: &RegionMetadata,
    options: &FlatSchemaOptions,
) -> SchemaRef {
    let num_fields = flat_sst_arrow_schema_column_num(metadata, options);
    let mut fields = Vec::with_capacity(num_fields);
    let schema = metadata.schema.arrow_schema();
    if options.raw_pk_columns {
        for pk_id in &metadata.primary_key {
            let pk_index = metadata.column_index_by_id(*pk_id).unwrap();
            let old_field = &schema.fields[pk_index];
            if options.string_pk_use_dict {
                let new_field = tag_maybe_to_dictionary_field(
                    &metadata.column_metadatas[pk_index].column_schema.data_type,
                    old_field,
                );
                fields.push(new_field);
            } else {
                // Stores the raw primary key column without dictionary encoding.
                fields.push(old_field.clone());
            }
        }
    }
    let remaining_fields = schema
        .fields()
        .iter()
        .zip(&metadata.column_metadatas)
        .filter_map(|(field, column_meta)| {
            if column_meta.semantic_type == SemanticType::Field {
                Some(field.clone())
            } else {
                None
            }
        })
        .chain([metadata.time_index_field()])
        .chain(internal_fields());
    for field in remaining_fields {
        fields.push(field);
    }

    Arc::new(Schema::new(fields))
}

/// Returns the number of columns in the flat format.
pub fn flat_sst_arrow_schema_column_num(
    metadata: &RegionMetadata,
    options: &FlatSchemaOptions,
) -> usize {
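    // Besides the columns in the region metadata, the flat format stores three
    // internal columns: __primary_key, __sequence, __op_type. Without raw
    // primary key columns, tags only exist in the encoded __primary_key column
    // and don't occupy extra columns.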
    if options.raw_pk_columns {
        metadata.column_metadatas.len() + 3
    } else {
        metadata.column_metadatas.len() + 3 - metadata.primary_key.len()
    }
}

/// Helper function to create a dictionary field from a field.
fn to_dictionary_field(field: &Field) -> Field {
    Field::new_dictionary(
        field.name(),
        ArrowDataType::UInt32,
        field.data_type().clone(),
        field.is_nullable(),
    )
}

/// Helper function to create a dictionary field from a field if it is a string column.
pub(crate) fn tag_maybe_to_dictionary_field(
    data_type: &ConcreteDataType,
    field: &Arc<Field>,
) -> Arc<Field> {
    if data_type.is_string() {
        Arc::new(to_dictionary_field(field))
    } else {
        field.clone()
    }
}

/// Fields for internal columns.
pub(crate) fn internal_fields() -> [FieldRef; 3] {
    // Internal columns are always not null.
    [
        Arc::new(Field::new_dictionary(
            PRIMARY_KEY_COLUMN_NAME,
            ArrowDataType::UInt32,
            ArrowDataType::Binary,
            false,
        )),
        Arc::new(Field::new(
            SEQUENCE_COLUMN_NAME,
            ArrowDataType::UInt64,
            false,
        )),
        Arc::new(Field::new(OP_TYPE_COLUMN_NAME, ArrowDataType::UInt8, false)),
    ]
}

/// Gets the arrow schema of the plain format to store in parquet.
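///
/// The schema is:
/// ```text
/// all columns in the region schema, __sequence, __op_type
/// ```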
pub fn to_plain_sst_arrow_schema(metadata: &RegionMetadata) -> SchemaRef {
    let fields = Fields::from_iter(
        metadata
            .schema
            .arrow_schema()
            .fields()
            .iter()
            .cloned()
            .chain(plain_internal_fields()),
    );

    Arc::new(Schema::new(fields))
}

/// Fields for internal columns of the plain format.
fn plain_internal_fields() -> [FieldRef; 2] {
    // Internal columns are always not null.
    [
        Arc::new(Field::new(
            SEQUENCE_COLUMN_NAME,
            ArrowDataType::UInt64,
            false,
        )),
        Arc::new(Field::new(OP_TYPE_COLUMN_NAME, ArrowDataType::UInt8, false)),
    ]
}

/// Estimates the number of series from record batches.
///
/// This struct tracks the last timestamp value to detect series boundaries
/// by observing when timestamps decrease (indicating a new series).
#[derive(Default)]
pub(crate) struct SeriesEstimator {
    /// The last timestamp value seen.
    last_timestamp: Option<i64>,
    /// The estimated number of series.
    series_count: u64,
}

impl SeriesEstimator {
    /// Updates the estimator with a new [`Batch`].
    ///
    /// Since each [`Batch`] contains only one series, it is enough to check the
    /// boundary between batches: the series count is incremented when the
    /// timestamp goes back (or repeats) across batches.
    pub(crate) fn update(&mut self, batch: &Batch) {
        let Some(last_ts) = batch.last_timestamp() else {
            return;
        };

        // Checks if there's a boundary between the last batch and this batch.
        if let Some(prev_last_ts) = self.last_timestamp {
            // If the first timestamp of this batch is less than or equal to the
            // last timestamp we've seen, it indicates a new series.
            if let Some(first_ts) = batch.first_timestamp()
                && first_ts.value() <= prev_last_ts
            {
                self.series_count += 1;
            }
        } else {
            // First batch, counts as the first series.
            self.series_count = 1;
        }

        // Updates the last timestamp.
        self.last_timestamp = Some(last_ts.value());
    }

    /// Updates the estimator with a new record batch in the flat format.
    ///
    /// This method examines the time index column to detect series boundaries.
    pub(crate) fn update_flat(&mut self, record_batch: &RecordBatch) {
        let batch_rows = record_batch.num_rows();
        if batch_rows == 0 {
            return;
        }

        let time_index_pos = time_index_column_index(record_batch.num_columns());
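        // In the flat format, the time index column is located right before the
        // three internal columns.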
        let timestamps = record_batch.column(time_index_pos);
        let Some((ts_values, _unit)) = timestamp_array_to_primitive(timestamps) else {
            return;
        };
        let values = ts_values.values();

        // Checks if there's a boundary between the last batch and this batch.
        if let Some(last_ts) = self.last_timestamp {
            if values[0] <= last_ts {
                self.series_count += 1;
            }
        } else {
            // First batch, counts as the first series.
            self.series_count = 1;
        }

        // Counts series boundaries within this batch.
        for i in 0..batch_rows - 1 {
            // We treat an equal timestamp as a new series boundary, which is
            // different from how we split batches.
            if values[i] >= values[i + 1] {
                self.series_count += 1;
            }
        }

        // Updates the last timestamp.
        self.last_timestamp = Some(values[batch_rows - 1]);
    }

    /// Returns the estimated number of series and resets the estimator.
    pub(crate) fn finish(&mut self) -> u64 {
        self.last_timestamp = None;
        let count = self.series_count;
        self.series_count = 0;

        count
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::OpType;
    use datatypes::arrow::array::{
        BinaryArray, DictionaryArray, TimestampMillisecondArray, UInt8Array, UInt32Array,
        UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema, TimeUnit};
    use datatypes::arrow::record_batch::RecordBatch;

    use super::*;
    use crate::read::{Batch, BatchBuilder};

    fn new_batch(
        primary_key: &[u8],
        timestamps: &[i64],
        sequences: &[u64],
        op_types: &[OpType],
    ) -> Batch {
        let timestamps = Arc::new(TimestampMillisecondArray::from(timestamps.to_vec()));
        let sequences = Arc::new(UInt64Array::from(sequences.to_vec()));
        let op_types = Arc::new(UInt8Array::from(
            op_types.iter().map(|op| *op as u8).collect::<Vec<_>>(),
        ));

        let mut builder = BatchBuilder::new(primary_key.to_vec());
        builder
            .timestamps_array(timestamps)
            .unwrap()
            .sequences_array(sequences)
            .unwrap()
            .op_types_array(op_types)
            .unwrap();
        builder.build().unwrap()
    }

    fn new_flat_record_batch(timestamps: &[i64]) -> RecordBatch {
        // Flat format has: [fields..., time_index, __primary_key, __sequence, __op_type]
        let num_cols = 4; // time_index + 3 internal columns
        let time_index_pos = time_index_column_index(num_cols);
        assert_eq!(time_index_pos, 0); // For 4 columns, the time index should be at position 0.

        let time_array = Arc::new(TimestampMillisecondArray::from(timestamps.to_vec()));
        let pk_array = Arc::new(DictionaryArray::new(
            UInt32Array::from(vec![0; timestamps.len()]),
            Arc::new(BinaryArray::from(vec![b"test".as_slice()])),
        ));
        let seq_array = Arc::new(UInt64Array::from(vec![1; timestamps.len()]));
        let op_array = Arc::new(UInt8Array::from(vec![1; timestamps.len()]));

        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "time",
                ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new_dictionary(
                "__primary_key",
                ArrowDataType::UInt32,
                ArrowDataType::Binary,
                false,
            ),
            Field::new("__sequence", ArrowDataType::UInt64, false),
            Field::new("__op_type", ArrowDataType::UInt8, false),
        ]));

        RecordBatch::try_new(schema, vec![time_array, pk_array, seq_array, op_array]).unwrap()
    }

    #[test]
    fn test_series_estimator_empty_batch() {
        let mut estimator = SeriesEstimator::default();
        let batch = new_batch(b"test", &[], &[], &[]);
        estimator.update(&batch);
        assert_eq!(0, estimator.finish());
    }

    #[test]
    fn test_series_estimator_single_batch() {
        let mut estimator = SeriesEstimator::default();
        let batch = new_batch(
            b"test",
            &[1, 2, 3],
            &[1, 2, 3],
            &[OpType::Put, OpType::Put, OpType::Put],
        );
        estimator.update(&batch);
        assert_eq!(1, estimator.finish());
    }

    #[test]
    fn test_series_estimator_multiple_batches_same_series() {
        let mut estimator = SeriesEstimator::default();

        // First batch with timestamps 1, 2, 3
        let batch1 = new_batch(
            b"test",
            &[1, 2, 3],
            &[1, 2, 3],
            &[OpType::Put, OpType::Put, OpType::Put],
        );
        estimator.update(&batch1);

        // Second batch with timestamps 4, 5, 6 (continuation)
        let batch2 = new_batch(
            b"test",
            &[4, 5, 6],
            &[4, 5, 6],
            &[OpType::Put, OpType::Put, OpType::Put],
        );
        estimator.update(&batch2);

        assert_eq!(1, estimator.finish());
    }

    #[test]
    fn test_series_estimator_new_series_detected() {
        let mut estimator = SeriesEstimator::default();

        // First batch with timestamps 1, 2, 3
        let batch1 = new_batch(
            b"pk0",
            &[1, 2, 3],
            &[1, 2, 3],
            &[OpType::Put, OpType::Put, OpType::Put],
        );
        estimator.update(&batch1);

        // Second batch with timestamps 2, 3, 4 (timestamp goes back, new series)
        let batch2 = new_batch(
            b"pk1",
            &[2, 3, 4],
            &[4, 5, 6],
            &[OpType::Put, OpType::Put, OpType::Put],
        );
        estimator.update(&batch2);

        assert_eq!(2, estimator.finish());
    }

    #[test]
    fn test_series_estimator_equal_timestamp_boundary() {
        let mut estimator = SeriesEstimator::default();

        // First batch ending at timestamp 5
        let batch1 = new_batch(
            b"test",
            &[1, 2, 5],
            &[1, 2, 3],
            &[OpType::Put, OpType::Put, OpType::Put],
        );
        estimator.update(&batch1);

        // Second batch starting at timestamp 5 (equal, indicates new series)
        let batch2 = new_batch(
            b"test",
            &[5, 6, 7],
            &[4, 5, 6],
            &[OpType::Put, OpType::Put, OpType::Put],
        );
        estimator.update(&batch2);

        assert_eq!(2, estimator.finish());
    }

    #[test]
    fn test_series_estimator_finish_resets_state() {
        let mut estimator = SeriesEstimator::default();

        let batch1 = new_batch(
            b"test",
            &[1, 2, 3],
            &[1, 2, 3],
            &[OpType::Put, OpType::Put, OpType::Put],
        );
        estimator.update(&batch1);

        assert_eq!(1, estimator.finish());

        // After finish, state should be reset
        let batch2 = new_batch(
            b"test",
            &[4, 5, 6],
            &[4, 5, 6],
            &[OpType::Put, OpType::Put, OpType::Put],
        );
        estimator.update(&batch2);

        assert_eq!(1, estimator.finish());
    }

    #[test]
    fn test_series_estimator_flat_empty_batch() {
        let mut estimator = SeriesEstimator::default();
        let record_batch = new_flat_record_batch(&[]);
        estimator.update_flat(&record_batch);
        assert_eq!(0, estimator.finish());
    }

    #[test]
    fn test_series_estimator_flat_single_batch() {
        let mut estimator = SeriesEstimator::default();
        let record_batch = new_flat_record_batch(&[1, 2, 3]);
        estimator.update_flat(&record_batch);
        assert_eq!(1, estimator.finish());
    }

    #[test]
    fn test_series_estimator_flat_series_boundary_within_batch() {
        let mut estimator = SeriesEstimator::default();
        // Timestamps decrease from 3 to 2, indicating a series boundary
        let record_batch = new_flat_record_batch(&[1, 2, 3, 2, 4, 5]);
        estimator.update_flat(&record_batch);
        // Should detect boundary at position 3 (3 >= 2)
        assert_eq!(2, estimator.finish());
    }

    #[test]
    fn test_series_estimator_flat_multiple_boundaries_within_batch() {
        let mut estimator = SeriesEstimator::default();
        // Multiple series boundaries: 5>=4, 6>=3
        let record_batch = new_flat_record_batch(&[1, 2, 5, 4, 6, 3, 7]);
        estimator.update_flat(&record_batch);
        assert_eq!(3, estimator.finish());
    }

    #[test]
    fn test_series_estimator_flat_equal_timestamps() {
        let mut estimator = SeriesEstimator::default();
        // Equal timestamps are considered new series boundaries
        let record_batch = new_flat_record_batch(&[1, 2, 2, 3, 3, 3, 4]);
        estimator.update_flat(&record_batch);
        // Boundaries at: 2>=2, 3>=3, 3>=3
        assert_eq!(4, estimator.finish());
    }

    #[test]
    fn test_series_estimator_flat_multiple_batches_continuation() {
        let mut estimator = SeriesEstimator::default();

        // First batch: timestamps 1, 2, 3
        let batch1 = new_flat_record_batch(&[1, 2, 3]);
        estimator.update_flat(&batch1);

        // Second batch: timestamps 4, 5, 6 (continuation)
        let batch2 = new_flat_record_batch(&[4, 5, 6]);
        estimator.update_flat(&batch2);

        assert_eq!(1, estimator.finish());
    }

    #[test]
    fn test_series_estimator_flat_multiple_batches_new_series() {
        let mut estimator = SeriesEstimator::default();

        // First batch: timestamps 1, 2, 3
        let batch1 = new_flat_record_batch(&[1, 2, 3]);
        estimator.update_flat(&batch1);

        // Second batch: timestamps 2, 3, 4 (goes back to 2, new series)
        let batch2 = new_flat_record_batch(&[2, 3, 4]);
        estimator.update_flat(&batch2);

        assert_eq!(2, estimator.finish());
    }

    #[test]
    fn test_series_estimator_flat_boundary_at_batch_edge_equal() {
        let mut estimator = SeriesEstimator::default();

        // First batch ending at 5
        let batch1 = new_flat_record_batch(&[1, 2, 5]);
        estimator.update_flat(&batch1);

        // Second batch starting at 5 (equal timestamp, new series)
        let batch2 = new_flat_record_batch(&[5, 6, 7]);
        estimator.update_flat(&batch2);

        assert_eq!(2, estimator.finish());
    }

    #[test]
    fn test_series_estimator_flat_mixed_batches() {
        let mut estimator = SeriesEstimator::default();

        // Batch 1: single series [10, 20, 30]
        let batch1 = new_flat_record_batch(&[10, 20, 30]);
        estimator.update_flat(&batch1);

        // Batch 2: starts a new series (5 goes back from 30) and has one
        // boundary within the batch (15 >= 10)
        let batch2 = new_flat_record_batch(&[5, 15, 10, 25]);
        estimator.update_flat(&batch2);

        // Batch 3: continues from 25 to [30, 35]
        let batch3 = new_flat_record_batch(&[30, 35]);
        estimator.update_flat(&batch3);

        // Expected: 1 (batch1) + 1 (batch2 start) + 1 (within batch2) = 3
        assert_eq!(3, estimator.finish());
    }

    #[test]
    fn test_series_estimator_flat_descending_timestamps() {
        let mut estimator = SeriesEstimator::default();
        // Strictly descending timestamps: each adjacent pair creates a boundary
        let record_batch = new_flat_record_batch(&[10, 9, 8, 7, 6]);
        estimator.update_flat(&record_batch);
        // Boundaries: 10>=9, 9>=8, 8>=7, 7>=6 = 4 boundaries + 1 initial = 5 series
        assert_eq!(5, estimator.finish());
    }

    #[test]
    fn test_series_estimator_flat_finish_resets_state() {
        let mut estimator = SeriesEstimator::default();

        let batch1 = new_flat_record_batch(&[1, 2, 3]);
        estimator.update_flat(&batch1);

        assert_eq!(1, estimator.finish());

        // After finish, state should be reset
        let batch2 = new_flat_record_batch(&[4, 5, 6]);
        estimator.update_flat(&batch2);

        assert_eq!(1, estimator.finish());
    }
}