// mito2/sst.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Sorted strings tables.
16
17use std::sync::Arc;
18
19use api::v1::SemanticType;
20use common_base::readable_size::ReadableSize;
21use datatypes::arrow::datatypes::{
22    DataType as ArrowDataType, Field, FieldRef, Fields, Schema, SchemaRef,
23};
24use datatypes::arrow::record_batch::RecordBatch;
25use datatypes::prelude::ConcreteDataType;
26use datatypes::timestamp::timestamp_array_to_primitive;
27use serde::{Deserialize, Serialize};
28use store_api::codec::PrimaryKeyEncoding;
29use store_api::metadata::RegionMetadata;
30use store_api::storage::consts::{
31    OP_TYPE_COLUMN_NAME, PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME,
32};
33
34use crate::sst::parquet::flat_format::time_index_column_index;
35
pub mod file;
pub mod file_purger;
pub mod file_ref;
pub mod index;
pub mod location;
pub mod parquet;
pub(crate) mod version;

/// Default write buffer size, it should be greater than the default minimum upload part of S3 (5mb).
pub const DEFAULT_WRITE_BUFFER_SIZE: ReadableSize = ReadableSize::mb(8);

/// Default number of concurrent writes; only takes effect on object store backends (e.g., S3).
pub const DEFAULT_WRITE_CONCURRENCY: usize = 8;
49
/// Format type of the SST file.
///
/// Both the serde representation and the `FromStr` impl (via strum) use
/// snake_case, so the two encodings stay in sync.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, strum::EnumString)]
#[serde(rename_all = "snake_case")]
#[strum(serialize_all = "snake_case")]
pub enum FormatType {
    /// Parquet with primary key encoded.
    #[default]
    PrimaryKey,
    /// Flat Parquet format.
    Flat,
}
61
62/// Iceberg-compatible column field ID key stored in Parquet column metadata.
63pub const PARQUET_FIELD_ID_KEY: &str = "PARQUET:field_id";
64
65/// Adds `PARQUET:field_id` metadata to an Arrow field.
66pub fn with_field_id(mut field: Field, column_id: u32) -> Field {
67    field
68        .metadata_mut()
69        .insert(PARQUET_FIELD_ID_KEY.to_string(), column_id.to_string());
70    field
71}
72
73/// Gets the arrow schema to store in parquet.
74pub fn to_sst_arrow_schema(metadata: &RegionMetadata) -> SchemaRef {
75    let fields = Fields::from_iter(
76        metadata
77            .schema
78            .arrow_schema()
79            .fields()
80            .iter()
81            .zip(&metadata.column_metadatas)
82            .filter_map(|(field, column_meta)| {
83                if column_meta.semantic_type == SemanticType::Field {
84                    Some(Arc::new(with_field_id(
85                        (**field).clone(),
86                        column_meta.column_id,
87                    )))
88                } else {
89                    // We have fixed positions for tags (primary key) and time index.
90                    None
91                }
92            })
93            .chain([Arc::new(with_field_id(
94                (*metadata.time_index_field()).clone(),
95                metadata.time_index_column().column_id,
96            ))])
97            .chain(internal_fields()),
98    );
99
100    Arc::new(Schema::new(fields))
101}
102
/// Options of flat schema.
///
/// See [`flat_sst_arrow_schema_column_num`] for how these options affect the
/// number of columns in the resulting schema.
pub struct FlatSchemaOptions {
    /// Whether to store primary key columns additionally instead of an encoded column.
    pub raw_pk_columns: bool,
    /// Whether to use dictionary encoding for string primary key columns
    /// when storing primary key columns.
    /// Only takes effect when `raw_pk_columns` is true.
    pub string_pk_use_dict: bool,
}
112
113impl Default for FlatSchemaOptions {
114    fn default() -> Self {
115        Self {
116            raw_pk_columns: true,
117            string_pk_use_dict: true,
118        }
119    }
120}
121
122impl FlatSchemaOptions {
123    /// Creates a options according to the primary key encoding.
124    pub fn from_encoding(encoding: PrimaryKeyEncoding) -> Self {
125        if encoding == PrimaryKeyEncoding::Dense {
126            Self::default()
127        } else {
128            Self {
129                raw_pk_columns: false,
130                string_pk_use_dict: false,
131            }
132        }
133    }
134}
135
136/// Gets the arrow schema to store in parquet.
137///
138/// The schema is:
139/// ```text
140/// primary key columns, field columns, time index, __primary_key, __sequence, __op_type
141/// ```
142///
143/// # Panics
144/// Panics if the metadata is invalid.
145pub fn to_flat_sst_arrow_schema(
146    metadata: &RegionMetadata,
147    options: &FlatSchemaOptions,
148) -> SchemaRef {
149    let num_fields = flat_sst_arrow_schema_column_num(metadata, options);
150    let mut fields = Vec::with_capacity(num_fields);
151    let schema = metadata.schema.arrow_schema();
152    if options.raw_pk_columns {
153        for pk_id in &metadata.primary_key {
154            let pk_index = metadata.column_index_by_id(*pk_id).unwrap();
155            let column_id = metadata.column_metadatas[pk_index].column_id;
156            if options.string_pk_use_dict {
157                let old_field = &schema.fields[pk_index];
158                let new_field = tag_maybe_to_dictionary_field(
159                    &metadata.column_metadatas[pk_index].column_schema.data_type,
160                    old_field,
161                );
162                fields.push(Arc::new(with_field_id((*new_field).clone(), column_id)));
163            }
164        }
165    }
166    let remaining_fields = schema
167        .fields()
168        .iter()
169        .zip(&metadata.column_metadatas)
170        .filter_map(|(field, column_meta)| {
171            if column_meta.semantic_type == SemanticType::Field {
172                Some(Arc::new(with_field_id(
173                    (**field).clone(),
174                    column_meta.column_id,
175                )))
176            } else {
177                None
178            }
179        })
180        .chain([Arc::new(with_field_id(
181            (*metadata.time_index_field()).clone(),
182            metadata.time_index_column().column_id,
183        ))])
184        .chain(internal_fields());
185    for field in remaining_fields {
186        fields.push(field);
187    }
188
189    Arc::new(Schema::new(fields))
190}
191
192/// Returns the number of columns in the flat format.
193pub fn flat_sst_arrow_schema_column_num(
194    metadata: &RegionMetadata,
195    options: &FlatSchemaOptions,
196) -> usize {
197    if options.raw_pk_columns {
198        metadata.column_metadatas.len() + 3
199    } else {
200        metadata.column_metadatas.len() + 3 - metadata.primary_key.len()
201    }
202}
203
204/// Helper function to create a dictionary field from a field.
205fn to_dictionary_field(field: &Field) -> Field {
206    let mut new_field = Field::new_dictionary(
207        field.name(),
208        datatypes::arrow::datatypes::DataType::UInt32,
209        field.data_type().clone(),
210        field.is_nullable(),
211    );
212
213    // retain field_id metadata
214    if let Some(field_id) = field.metadata().get(PARQUET_FIELD_ID_KEY) {
215        new_field
216            .metadata_mut()
217            .insert(PARQUET_FIELD_ID_KEY.to_string(), field_id.clone());
218    }
219
220    new_field
221}
222
223/// Helper function to create a dictionary field from a field if it is a string column.
224pub(crate) fn tag_maybe_to_dictionary_field(
225    data_type: &ConcreteDataType,
226    field: &Arc<Field>,
227) -> Arc<Field> {
228    if data_type.is_string() {
229        Arc::new(to_dictionary_field(field))
230    } else {
231        field.clone()
232    }
233}
234
235/// Fields for internal columns.
236pub(crate) fn internal_fields() -> [FieldRef; 3] {
237    // Internal columns are always not null.
238    [
239        Arc::new(Field::new_dictionary(
240            PRIMARY_KEY_COLUMN_NAME,
241            ArrowDataType::UInt32,
242            ArrowDataType::Binary,
243            false,
244        )),
245        Arc::new(Field::new(
246            SEQUENCE_COLUMN_NAME,
247            ArrowDataType::UInt64,
248            false,
249        )),
250        Arc::new(Field::new(OP_TYPE_COLUMN_NAME, ArrowDataType::UInt8, false)),
251    ]
252}
253
/// Gets the estimated number of series from record batches.
///
/// This struct tracks the last timestamp value to detect series boundaries
/// by observing when timestamps decrease (indicating a new series).
/// Note: a repeated (equal) timestamp is also counted as a boundary, and the
/// estimate assumes timestamps within one series are fed in ascending order.
#[derive(Default)]
pub(crate) struct SeriesEstimator {
    /// The last timestamp value seen (last row of the previous batch).
    last_timestamp: Option<i64>,
    /// The estimated number of series
    series_count: u64,
}
265
266impl SeriesEstimator {
267    /// Updates the estimator with a new record batch in flat format.
268    ///
269    /// This method examines the time index column to detect series boundaries.
270    pub(crate) fn update_flat(&mut self, record_batch: &RecordBatch) {
271        let batch_rows = record_batch.num_rows();
272        if batch_rows == 0 {
273            return;
274        }
275
276        let time_index_pos = time_index_column_index(record_batch.num_columns());
277        let timestamps = record_batch.column(time_index_pos);
278        let Some((ts_values, _unit)) = timestamp_array_to_primitive(timestamps) else {
279            return;
280        };
281        let values = ts_values.values();
282
283        // Checks if there's a boundary between the last batch and this batch
284        if let Some(last_ts) = self.last_timestamp {
285            if values[0] <= last_ts {
286                self.series_count += 1;
287            }
288        } else {
289            // First batch, counts as first series
290            self.series_count = 1;
291        }
292
293        // Counts series boundaries within this batch.
294        for i in 0..batch_rows - 1 {
295            // We assumes the same timestamp as a new series, which is different from
296            // how we split batches.
297            if values[i] >= values[i + 1] {
298                self.series_count += 1;
299            }
300        }
301
302        // Updates the last timestamp
303        self.last_timestamp = Some(values[batch_rows - 1]);
304    }
305
306    /// Returns the estimated number of series.
307    pub(crate) fn finish(&mut self) -> u64 {
308        self.last_timestamp = None;
309        let count = self.series_count;
310        self.series_count = 0;
311
312        count
313    }
314}
315
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datatypes::arrow::array::{
        BinaryArray, DictionaryArray, TimestampMillisecondArray, UInt8Array, UInt32Array,
        UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema, TimeUnit};
    use datatypes::arrow::record_batch::RecordBatch;

    use super::*;

    /// Builds a minimal flat-format batch: one millisecond time index column
    /// followed by the three internal columns (`__primary_key`, `__sequence`,
    /// `__op_type`), all sharing a single dummy primary key.
    fn new_flat_record_batch(timestamps: &[i64]) -> RecordBatch {
        // Flat format has: [fields..., time_index, __primary_key, __sequence, __op_type]
        let num_cols = 4; // time_index + 3 internal columns
        let time_index_pos = time_index_column_index(num_cols);
        assert_eq!(time_index_pos, 0); // For 4 columns, time index should be at position 0

        let time_array = Arc::new(TimestampMillisecondArray::from(timestamps.to_vec()));
        let pk_array = Arc::new(DictionaryArray::new(
            UInt32Array::from(vec![0; timestamps.len()]),
            Arc::new(BinaryArray::from(vec![b"test".as_slice()])),
        ));
        let seq_array = Arc::new(UInt64Array::from(vec![1; timestamps.len()]));
        let op_array = Arc::new(UInt8Array::from(vec![1; timestamps.len()]));

        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "time",
                ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new_dictionary(
                "__primary_key",
                ArrowDataType::UInt32,
                ArrowDataType::Binary,
                false,
            ),
            Field::new("__sequence", ArrowDataType::UInt64, false),
            Field::new("__op_type", ArrowDataType::UInt8, false),
        ]));

        RecordBatch::try_new(schema, vec![time_array, pk_array, seq_array, op_array]).unwrap()
    }

    // An empty batch must not count as a series.
    #[test]
    fn test_series_estimator_flat_empty_batch() {
        let mut estimator = SeriesEstimator::default();
        let record_batch = new_flat_record_batch(&[]);
        estimator.update_flat(&record_batch);
        assert_eq!(0, estimator.finish());
    }

    // A single strictly-increasing batch is one series.
    #[test]
    fn test_series_estimator_flat_single_batch() {
        let mut estimator = SeriesEstimator::default();
        let record_batch = new_flat_record_batch(&[1, 2, 3]);
        estimator.update_flat(&record_batch);
        assert_eq!(1, estimator.finish());
    }

    #[test]
    fn test_series_estimator_flat_series_boundary_within_batch() {
        let mut estimator = SeriesEstimator::default();
        // Timestamps decrease from 3 to 2, indicating a series boundary
        let record_batch = new_flat_record_batch(&[1, 2, 3, 2, 4, 5]);
        estimator.update_flat(&record_batch);
        // Should detect boundary at position 3 (3 >= 2)
        assert_eq!(2, estimator.finish());
    }

    #[test]
    fn test_series_estimator_flat_multiple_boundaries_within_batch() {
        let mut estimator = SeriesEstimator::default();
        // Multiple series boundaries: 5>=4, 6>=3
        let record_batch = new_flat_record_batch(&[1, 2, 5, 4, 6, 3, 7]);
        estimator.update_flat(&record_batch);
        assert_eq!(3, estimator.finish());
    }

    #[test]
    fn test_series_estimator_flat_equal_timestamps() {
        let mut estimator = SeriesEstimator::default();
        // Equal timestamps are considered as new series
        let record_batch = new_flat_record_batch(&[1, 2, 2, 3, 3, 3, 4]);
        estimator.update_flat(&record_batch);
        // Boundaries at: 2>=2, 3>=3, 3>=3
        assert_eq!(4, estimator.finish());
    }

    #[test]
    fn test_series_estimator_flat_multiple_batches_continuation() {
        let mut estimator = SeriesEstimator::default();

        // First batch: timestamps 1, 2, 3
        let batch1 = new_flat_record_batch(&[1, 2, 3]);
        estimator.update_flat(&batch1);

        // Second batch: timestamps 4, 5, 6 (continuation)
        let batch2 = new_flat_record_batch(&[4, 5, 6]);
        estimator.update_flat(&batch2);

        assert_eq!(1, estimator.finish());
    }

    #[test]
    fn test_series_estimator_flat_multiple_batches_new_series() {
        let mut estimator = SeriesEstimator::default();

        // First batch: timestamps 1, 2, 3
        let batch1 = new_flat_record_batch(&[1, 2, 3]);
        estimator.update_flat(&batch1);

        // Second batch: timestamps 2, 3, 4 (goes back to 2, new series)
        let batch2 = new_flat_record_batch(&[2, 3, 4]);
        estimator.update_flat(&batch2);

        assert_eq!(2, estimator.finish());
    }

    #[test]
    fn test_series_estimator_flat_boundary_at_batch_edge_equal() {
        let mut estimator = SeriesEstimator::default();

        // First batch ending at 5
        let batch1 = new_flat_record_batch(&[1, 2, 5]);
        estimator.update_flat(&batch1);

        // Second batch starting at 5 (equal timestamp, new series)
        let batch2 = new_flat_record_batch(&[5, 6, 7]);
        estimator.update_flat(&batch2);

        assert_eq!(2, estimator.finish());
    }

    // Mixes cross-batch boundaries with an in-batch boundary.
    #[test]
    fn test_series_estimator_flat_mixed_batches() {
        let mut estimator = SeriesEstimator::default();

        // Batch 1: single series [10, 20, 30]
        let batch1 = new_flat_record_batch(&[10, 20, 30]);
        estimator.update_flat(&batch1);

        // Batch 2: starts new series [5, 15], boundary within batch [15, 10, 25]
        let batch2 = new_flat_record_batch(&[5, 15, 10, 25]);
        estimator.update_flat(&batch2);

        // Batch 3: continues from 25 to [30, 35]
        let batch3 = new_flat_record_batch(&[30, 35]);
        estimator.update_flat(&batch3);

        // Expected: 1 (batch1) + 1 (batch2 start) + 1 (within batch2) = 3
        assert_eq!(3, estimator.finish());
    }

    #[test]
    fn test_series_estimator_flat_descending_timestamps() {
        let mut estimator = SeriesEstimator::default();
        // Strictly descending timestamps - each pair creates a boundary
        let record_batch = new_flat_record_batch(&[10, 9, 8, 7, 6]);
        estimator.update_flat(&record_batch);
        // Boundaries: 10>=9, 9>=8, 8>=7, 7>=6 = 4 boundaries + 1 initial = 5 series
        assert_eq!(5, estimator.finish());
    }

    // `finish` must clear both the counter and the remembered last timestamp.
    #[test]
    fn test_series_estimator_flat_finish_resets_state() {
        let mut estimator = SeriesEstimator::default();

        let batch1 = new_flat_record_batch(&[1, 2, 3]);
        estimator.update_flat(&batch1);

        assert_eq!(1, estimator.finish());

        // After finish, state should be reset
        let batch2 = new_flat_record_batch(&[4, 5, 6]);
        estimator.update_flat(&batch2);

        assert_eq!(1, estimator.finish());
    }
}