// mito2/src/sst.rs
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Sorted strings tables.
use std::sync::Arc;

use api::v1::SemanticType;
use common_base::readable_size::ReadableSize;
use datatypes::arrow::datatypes::{
    DataType as ArrowDataType, Field, FieldRef, Fields, Schema, SchemaRef,
};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::prelude::ConcreteDataType;
use datatypes::timestamp::timestamp_array_to_primitive;
use serde::{Deserialize, Serialize};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadata;
use store_api::storage::consts::{
    OP_TYPE_COLUMN_NAME, PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME,
};

use crate::sst::parquet::flat_format::time_index_column_index;
// Submodules of the SST layer.
pub mod file;
pub mod file_purger;
pub mod file_ref;
pub mod index;
pub mod location;
pub mod parquet;
pub(crate) mod version;
43
/// Default write buffer size, it should be greater than the default minimum upload part of S3 (5mb).
pub const DEFAULT_WRITE_BUFFER_SIZE: ReadableSize = ReadableSize::mb(8);

/// Default number of concurrent write, it only works on object store backend(e.g., S3).
pub const DEFAULT_WRITE_CONCURRENCY: usize = 8;
49
/// Format type of the SST file.
///
/// Both serde and `FromStr` (via strum) use the `snake_case` form of the
/// variant names, i.e. `primary_key` and `flat`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, strum::EnumString)]
#[serde(rename_all = "snake_case")]
#[strum(serialize_all = "snake_case")]
pub enum FormatType {
    /// Parquet with primary key encoded.
    #[default]
    PrimaryKey,
    /// Flat Parquet format.
    Flat,
}
61
62/// Gets the arrow schema to store in parquet.
63pub fn to_sst_arrow_schema(metadata: &RegionMetadata) -> SchemaRef {
64    let fields = Fields::from_iter(
65        metadata
66            .schema
67            .arrow_schema()
68            .fields()
69            .iter()
70            .zip(&metadata.column_metadatas)
71            .filter_map(|(field, column_meta)| {
72                if column_meta.semantic_type == SemanticType::Field {
73                    Some(field.clone())
74                } else {
75                    // We have fixed positions for tags (primary key) and time index.
76                    None
77                }
78            })
79            .chain([metadata.time_index_field()])
80            .chain(internal_fields()),
81    );
82
83    Arc::new(Schema::new(fields))
84}
85
/// Options of flat schema.
///
/// See [from_encoding](FlatSchemaOptions::from_encoding) for deriving options
/// from a primary key encoding.
pub struct FlatSchemaOptions {
    /// Whether to store primary key columns additionally instead of an encoded column.
    pub raw_pk_columns: bool,
    /// Whether to use dictionary encoding for string primary key columns
    /// when storing primary key columns.
    /// Only takes effect when `raw_pk_columns` is true.
    pub string_pk_use_dict: bool,
}
95
96impl Default for FlatSchemaOptions {
97    fn default() -> Self {
98        Self {
99            raw_pk_columns: true,
100            string_pk_use_dict: true,
101        }
102    }
103}
104
105impl FlatSchemaOptions {
106    /// Creates a options according to the primary key encoding.
107    pub fn from_encoding(encoding: PrimaryKeyEncoding) -> Self {
108        if encoding == PrimaryKeyEncoding::Dense {
109            Self::default()
110        } else {
111            Self {
112                raw_pk_columns: false,
113                string_pk_use_dict: false,
114            }
115        }
116    }
117}
118
119/// Gets the arrow schema to store in parquet.
120///
121/// The schema is:
122/// ```text
123/// primary key columns, field columns, time index, __primary_key, __sequence, __op_type
124/// ```
125///
126/// # Panics
127/// Panics if the metadata is invalid.
128pub fn to_flat_sst_arrow_schema(
129    metadata: &RegionMetadata,
130    options: &FlatSchemaOptions,
131) -> SchemaRef {
132    let num_fields = flat_sst_arrow_schema_column_num(metadata, options);
133    let mut fields = Vec::with_capacity(num_fields);
134    let schema = metadata.schema.arrow_schema();
135    if options.raw_pk_columns {
136        for pk_id in &metadata.primary_key {
137            let pk_index = metadata.column_index_by_id(*pk_id).unwrap();
138            if options.string_pk_use_dict {
139                let old_field = &schema.fields[pk_index];
140                let new_field = tag_maybe_to_dictionary_field(
141                    &metadata.column_metadatas[pk_index].column_schema.data_type,
142                    old_field,
143                );
144                fields.push(new_field);
145            }
146        }
147    }
148    let remaining_fields = schema
149        .fields()
150        .iter()
151        .zip(&metadata.column_metadatas)
152        .filter_map(|(field, column_meta)| {
153            if column_meta.semantic_type == SemanticType::Field {
154                Some(field.clone())
155            } else {
156                None
157            }
158        })
159        .chain([metadata.time_index_field()])
160        .chain(internal_fields());
161    for field in remaining_fields {
162        fields.push(field);
163    }
164
165    Arc::new(Schema::new(fields))
166}
167
168/// Returns the number of columns in the flat format.
169pub fn flat_sst_arrow_schema_column_num(
170    metadata: &RegionMetadata,
171    options: &FlatSchemaOptions,
172) -> usize {
173    if options.raw_pk_columns {
174        metadata.column_metadatas.len() + 3
175    } else {
176        metadata.column_metadatas.len() + 3 - metadata.primary_key.len()
177    }
178}
179
180/// Helper function to create a dictionary field from a field.
181fn to_dictionary_field(field: &Field) -> Field {
182    Field::new_dictionary(
183        field.name(),
184        datatypes::arrow::datatypes::DataType::UInt32,
185        field.data_type().clone(),
186        field.is_nullable(),
187    )
188}
189
190/// Helper function to create a dictionary field from a field if it is a string column.
191pub(crate) fn tag_maybe_to_dictionary_field(
192    data_type: &ConcreteDataType,
193    field: &Arc<Field>,
194) -> Arc<Field> {
195    if data_type.is_string() {
196        Arc::new(to_dictionary_field(field))
197    } else {
198        field.clone()
199    }
200}
201
202/// Fields for internal columns.
203pub(crate) fn internal_fields() -> [FieldRef; 3] {
204    // Internal columns are always not null.
205    [
206        Arc::new(Field::new_dictionary(
207            PRIMARY_KEY_COLUMN_NAME,
208            ArrowDataType::UInt32,
209            ArrowDataType::Binary,
210            false,
211        )),
212        Arc::new(Field::new(
213            SEQUENCE_COLUMN_NAME,
214            ArrowDataType::UInt64,
215            false,
216        )),
217        Arc::new(Field::new(OP_TYPE_COLUMN_NAME, ArrowDataType::UInt8, false)),
218    ]
219}
220
/// Gets the estimated number of series from record batches.
///
/// This struct tracks the last timestamp value to detect series boundaries
/// by observing when timestamps decrease (indicating a new series).
/// A non-increasing (equal or decreasing) timestamp is counted as a boundary;
/// see `update_flat`.
#[derive(Default)]
pub(crate) struct SeriesEstimator {
    /// The last timestamp value seen
    last_timestamp: Option<i64>,
    /// The estimated number of series
    series_count: u64,
}
232
233impl SeriesEstimator {
234    /// Updates the estimator with a new record batch in flat format.
235    ///
236    /// This method examines the time index column to detect series boundaries.
237    pub(crate) fn update_flat(&mut self, record_batch: &RecordBatch) {
238        let batch_rows = record_batch.num_rows();
239        if batch_rows == 0 {
240            return;
241        }
242
243        let time_index_pos = time_index_column_index(record_batch.num_columns());
244        let timestamps = record_batch.column(time_index_pos);
245        let Some((ts_values, _unit)) = timestamp_array_to_primitive(timestamps) else {
246            return;
247        };
248        let values = ts_values.values();
249
250        // Checks if there's a boundary between the last batch and this batch
251        if let Some(last_ts) = self.last_timestamp {
252            if values[0] <= last_ts {
253                self.series_count += 1;
254            }
255        } else {
256            // First batch, counts as first series
257            self.series_count = 1;
258        }
259
260        // Counts series boundaries within this batch.
261        for i in 0..batch_rows - 1 {
262            // We assumes the same timestamp as a new series, which is different from
263            // how we split batches.
264            if values[i] >= values[i + 1] {
265                self.series_count += 1;
266            }
267        }
268
269        // Updates the last timestamp
270        self.last_timestamp = Some(values[batch_rows - 1]);
271    }
272
273    /// Returns the estimated number of series.
274    pub(crate) fn finish(&mut self) -> u64 {
275        self.last_timestamp = None;
276        let count = self.series_count;
277        self.series_count = 0;
278
279        count
280    }
281}
282
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datatypes::arrow::array::{
        BinaryArray, DictionaryArray, TimestampMillisecondArray, UInt8Array, UInt32Array,
        UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema, TimeUnit};
    use datatypes::arrow::record_batch::RecordBatch;

    use super::*;

    /// Builds a minimal flat-format batch (time index + 3 internal columns)
    /// with the given millisecond timestamps.
    fn new_flat_record_batch(timestamps: &[i64]) -> RecordBatch {
        // Flat format has: [fields..., time_index, __primary_key, __sequence, __op_type]
        let num_cols = 4; // time_index + 3 internal columns
        let time_index_pos = time_index_column_index(num_cols);
        assert_eq!(time_index_pos, 0); // For 4 columns, time index should be at position 0

        let time_array = Arc::new(TimestampMillisecondArray::from(timestamps.to_vec()));
        // All rows share the single dictionary value b"test".
        let pk_array = Arc::new(DictionaryArray::new(
            UInt32Array::from(vec![0; timestamps.len()]),
            Arc::new(BinaryArray::from(vec![b"test".as_slice()])),
        ));
        let seq_array = Arc::new(UInt64Array::from(vec![1; timestamps.len()]));
        let op_array = Arc::new(UInt8Array::from(vec![1; timestamps.len()]));

        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "time",
                ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new_dictionary(
                "__primary_key",
                ArrowDataType::UInt32,
                ArrowDataType::Binary,
                false,
            ),
            Field::new("__sequence", ArrowDataType::UInt64, false),
            Field::new("__op_type", ArrowDataType::UInt8, false),
        ]));

        RecordBatch::try_new(schema, vec![time_array, pk_array, seq_array, op_array]).unwrap()
    }

    #[test]
    fn test_series_estimator_flat_empty_batch() {
        let mut estimator = SeriesEstimator::default();
        let record_batch = new_flat_record_batch(&[]);
        estimator.update_flat(&record_batch);
        assert_eq!(0, estimator.finish());
    }

    #[test]
    fn test_series_estimator_flat_single_batch() {
        let mut estimator = SeriesEstimator::default();
        let record_batch = new_flat_record_batch(&[1, 2, 3]);
        estimator.update_flat(&record_batch);
        assert_eq!(1, estimator.finish());
    }

    #[test]
    fn test_series_estimator_flat_series_boundary_within_batch() {
        let mut estimator = SeriesEstimator::default();
        // Timestamps decrease from 3 to 2, indicating a series boundary
        let record_batch = new_flat_record_batch(&[1, 2, 3, 2, 4, 5]);
        estimator.update_flat(&record_batch);
        // Should detect boundary at position 3 (3 >= 2)
        assert_eq!(2, estimator.finish());
    }

    #[test]
    fn test_series_estimator_flat_multiple_boundaries_within_batch() {
        let mut estimator = SeriesEstimator::default();
        // Multiple series boundaries: 5>=4, 6>=3
        let record_batch = new_flat_record_batch(&[1, 2, 5, 4, 6, 3, 7]);
        estimator.update_flat(&record_batch);
        assert_eq!(3, estimator.finish());
    }

    #[test]
    fn test_series_estimator_flat_equal_timestamps() {
        let mut estimator = SeriesEstimator::default();
        // Equal timestamps are considered as new series
        let record_batch = new_flat_record_batch(&[1, 2, 2, 3, 3, 3, 4]);
        estimator.update_flat(&record_batch);
        // Boundaries at: 2>=2, 3>=3, 3>=3
        assert_eq!(4, estimator.finish());
    }

    #[test]
    fn test_series_estimator_flat_multiple_batches_continuation() {
        let mut estimator = SeriesEstimator::default();

        // First batch: timestamps 1, 2, 3
        let batch1 = new_flat_record_batch(&[1, 2, 3]);
        estimator.update_flat(&batch1);

        // Second batch: timestamps 4, 5, 6 (continuation)
        let batch2 = new_flat_record_batch(&[4, 5, 6]);
        estimator.update_flat(&batch2);

        assert_eq!(1, estimator.finish());
    }

    #[test]
    fn test_series_estimator_flat_multiple_batches_new_series() {
        let mut estimator = SeriesEstimator::default();

        // First batch: timestamps 1, 2, 3
        let batch1 = new_flat_record_batch(&[1, 2, 3]);
        estimator.update_flat(&batch1);

        // Second batch: timestamps 2, 3, 4 (goes back to 2, new series)
        let batch2 = new_flat_record_batch(&[2, 3, 4]);
        estimator.update_flat(&batch2);

        assert_eq!(2, estimator.finish());
    }

    #[test]
    fn test_series_estimator_flat_boundary_at_batch_edge_equal() {
        let mut estimator = SeriesEstimator::default();

        // First batch ending at 5
        let batch1 = new_flat_record_batch(&[1, 2, 5]);
        estimator.update_flat(&batch1);

        // Second batch starting at 5 (equal timestamp, new series)
        let batch2 = new_flat_record_batch(&[5, 6, 7]);
        estimator.update_flat(&batch2);

        assert_eq!(2, estimator.finish());
    }

    #[test]
    fn test_series_estimator_flat_mixed_batches() {
        let mut estimator = SeriesEstimator::default();

        // Batch 1: single series [10, 20, 30]
        let batch1 = new_flat_record_batch(&[10, 20, 30]);
        estimator.update_flat(&batch1);

        // Batch 2: starts new series [5, 15], boundary within batch [15, 10, 25]
        let batch2 = new_flat_record_batch(&[5, 15, 10, 25]);
        estimator.update_flat(&batch2);

        // Batch 3: continues from 25 to [30, 35]
        let batch3 = new_flat_record_batch(&[30, 35]);
        estimator.update_flat(&batch3);

        // Expected: 1 (batch1) + 1 (batch2 start) + 1 (within batch2) = 3
        assert_eq!(3, estimator.finish());
    }

    #[test]
    fn test_series_estimator_flat_descending_timestamps() {
        let mut estimator = SeriesEstimator::default();
        // Strictly descending timestamps - each pair creates a boundary
        let record_batch = new_flat_record_batch(&[10, 9, 8, 7, 6]);
        estimator.update_flat(&record_batch);
        // Boundaries: 10>=9, 9>=8, 8>=7, 7>=6 = 4 boundaries + 1 initial = 5 series
        assert_eq!(5, estimator.finish());
    }

    #[test]
    fn test_series_estimator_flat_finish_resets_state() {
        let mut estimator = SeriesEstimator::default();

        let batch1 = new_flat_record_batch(&[1, 2, 3]);
        estimator.update_flat(&batch1);

        assert_eq!(1, estimator.finish());

        // After finish, state should be reset
        let batch2 = new_flat_record_batch(&[4, 5, 6]);
        estimator.update_flat(&batch2);

        assert_eq!(1, estimator.finish());
    }
}
464}