1use std::sync::Arc;
18
19use api::v1::SemanticType;
20use common_base::readable_size::ReadableSize;
21use datatypes::arrow::datatypes::{
22 DataType as ArrowDataType, Field, FieldRef, Fields, Schema, SchemaRef,
23};
24use datatypes::arrow::record_batch::RecordBatch;
25use datatypes::prelude::ConcreteDataType;
26use datatypes::timestamp::timestamp_array_to_primitive;
27use serde::{Deserialize, Serialize};
28use store_api::codec::PrimaryKeyEncoding;
29use store_api::metadata::RegionMetadata;
30use store_api::storage::consts::{
31 OP_TYPE_COLUMN_NAME, PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME,
32};
33
34use crate::sst::parquet::flat_format::time_index_column_index;
35
36pub mod file;
37pub mod file_purger;
38pub mod file_ref;
39pub mod index;
40pub mod location;
41pub mod parquet;
42pub(crate) mod version;
43
/// Default buffer size to use when writing SST files (8 MiB).
pub const DEFAULT_WRITE_BUFFER_SIZE: ReadableSize = ReadableSize::mb(8);

/// Default concurrency for SST write operations.
pub const DEFAULT_WRITE_CONCURRENCY: usize = 8;
49
/// Format of the SST files.
///
/// Serialized and parsed in `snake_case` for both serde and strum
/// (`"primary_key"`, `"flat"`).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, strum::EnumString)]
#[serde(rename_all = "snake_case")]
#[strum(serialize_all = "snake_case")]
pub enum FormatType {
    /// Format that encodes the primary key columns into a single internal
    /// `__primary_key` column (see [`to_sst_arrow_schema`]).
    #[default]
    PrimaryKey,
    /// Flat format that can keep primary key columns as individual columns
    /// (see [`to_flat_sst_arrow_schema`]).
    Flat,
}
61
62pub const PARQUET_FIELD_ID_KEY: &str = "PARQUET:field_id";
64
65pub fn with_field_id(mut field: Field, column_id: u32) -> Field {
67 field
68 .metadata_mut()
69 .insert(PARQUET_FIELD_ID_KEY.to_string(), column_id.to_string());
70 field
71}
72
73pub fn to_sst_arrow_schema(metadata: &RegionMetadata) -> SchemaRef {
75 let fields = Fields::from_iter(
76 metadata
77 .schema
78 .arrow_schema()
79 .fields()
80 .iter()
81 .zip(&metadata.column_metadatas)
82 .filter_map(|(field, column_meta)| {
83 if column_meta.semantic_type == SemanticType::Field {
84 Some(Arc::new(with_field_id(
85 (**field).clone(),
86 column_meta.column_id,
87 )))
88 } else {
89 None
91 }
92 })
93 .chain([Arc::new(with_field_id(
94 (*metadata.time_index_field()).clone(),
95 metadata.time_index_column().column_id,
96 ))])
97 .chain(internal_fields()),
98 );
99
100 Arc::new(Schema::new(fields))
101}
102
/// Options to create the arrow schema of the flat format.
pub struct FlatSchemaOptions {
    /// Whether to keep the primary key columns as individual (raw) columns.
    pub raw_pk_columns: bool,
    /// Whether string primary key columns use dictionary encoding
    /// (only takes effect when `raw_pk_columns` is true).
    pub string_pk_use_dict: bool,
}
112
113impl Default for FlatSchemaOptions {
114 fn default() -> Self {
115 Self {
116 raw_pk_columns: true,
117 string_pk_use_dict: true,
118 }
119 }
120}
121
122impl FlatSchemaOptions {
123 pub fn from_encoding(encoding: PrimaryKeyEncoding) -> Self {
125 if encoding == PrimaryKeyEncoding::Dense {
126 Self::default()
127 } else {
128 Self {
129 raw_pk_columns: false,
130 string_pk_use_dict: false,
131 }
132 }
133 }
134}
135
136pub fn to_flat_sst_arrow_schema(
146 metadata: &RegionMetadata,
147 options: &FlatSchemaOptions,
148) -> SchemaRef {
149 let num_fields = flat_sst_arrow_schema_column_num(metadata, options);
150 let mut fields = Vec::with_capacity(num_fields);
151 let schema = metadata.schema.arrow_schema();
152 if options.raw_pk_columns {
153 for pk_id in &metadata.primary_key {
154 let pk_index = metadata.column_index_by_id(*pk_id).unwrap();
155 let column_id = metadata.column_metadatas[pk_index].column_id;
156 if options.string_pk_use_dict {
157 let old_field = &schema.fields[pk_index];
158 let new_field = tag_maybe_to_dictionary_field(
159 &metadata.column_metadatas[pk_index].column_schema.data_type,
160 old_field,
161 );
162 fields.push(Arc::new(with_field_id((*new_field).clone(), column_id)));
163 }
164 }
165 }
166 let remaining_fields = schema
167 .fields()
168 .iter()
169 .zip(&metadata.column_metadatas)
170 .filter_map(|(field, column_meta)| {
171 if column_meta.semantic_type == SemanticType::Field {
172 Some(Arc::new(with_field_id(
173 (**field).clone(),
174 column_meta.column_id,
175 )))
176 } else {
177 None
178 }
179 })
180 .chain([Arc::new(with_field_id(
181 (*metadata.time_index_field()).clone(),
182 metadata.time_index_column().column_id,
183 ))])
184 .chain(internal_fields());
185 for field in remaining_fields {
186 fields.push(field);
187 }
188
189 Arc::new(Schema::new(fields))
190}
191
192pub fn flat_sst_arrow_schema_column_num(
194 metadata: &RegionMetadata,
195 options: &FlatSchemaOptions,
196) -> usize {
197 if options.raw_pk_columns {
198 metadata.column_metadatas.len() + 3
199 } else {
200 metadata.column_metadatas.len() + 3 - metadata.primary_key.len()
201 }
202}
203
204fn to_dictionary_field(field: &Field) -> Field {
206 let mut new_field = Field::new_dictionary(
207 field.name(),
208 datatypes::arrow::datatypes::DataType::UInt32,
209 field.data_type().clone(),
210 field.is_nullable(),
211 );
212
213 if let Some(field_id) = field.metadata().get(PARQUET_FIELD_ID_KEY) {
215 new_field
216 .metadata_mut()
217 .insert(PARQUET_FIELD_ID_KEY.to_string(), field_id.clone());
218 }
219
220 new_field
221}
222
223pub(crate) fn tag_maybe_to_dictionary_field(
225 data_type: &ConcreteDataType,
226 field: &Arc<Field>,
227) -> Arc<Field> {
228 if data_type.is_string() {
229 Arc::new(to_dictionary_field(field))
230 } else {
231 field.clone()
232 }
233}
234
235pub(crate) fn internal_fields() -> [FieldRef; 3] {
237 [
239 Arc::new(Field::new_dictionary(
240 PRIMARY_KEY_COLUMN_NAME,
241 ArrowDataType::UInt32,
242 ArrowDataType::Binary,
243 false,
244 )),
245 Arc::new(Field::new(
246 SEQUENCE_COLUMN_NAME,
247 ArrowDataType::UInt64,
248 false,
249 )),
250 Arc::new(Field::new(OP_TYPE_COLUMN_NAME, ArrowDataType::UInt8, false)),
251 ]
252}
253
/// Estimates the number of series from record batches by counting
/// non-strictly-increasing steps in the time index column (assumes input is
/// sorted so a timestamp that does not increase starts a new series).
#[derive(Default)]
pub(crate) struct SeriesEstimator {
    // Last timestamp of the previous batch; `None` until the first
    // non-empty batch is observed. Used to detect boundaries across batches.
    last_timestamp: Option<i64>,
    // Number of series counted so far.
    series_count: u64,
}
265
266impl SeriesEstimator {
267 pub(crate) fn update_flat(&mut self, record_batch: &RecordBatch) {
271 let batch_rows = record_batch.num_rows();
272 if batch_rows == 0 {
273 return;
274 }
275
276 let time_index_pos = time_index_column_index(record_batch.num_columns());
277 let timestamps = record_batch.column(time_index_pos);
278 let Some((ts_values, _unit)) = timestamp_array_to_primitive(timestamps) else {
279 return;
280 };
281 let values = ts_values.values();
282
283 if let Some(last_ts) = self.last_timestamp {
285 if values[0] <= last_ts {
286 self.series_count += 1;
287 }
288 } else {
289 self.series_count = 1;
291 }
292
293 for i in 0..batch_rows - 1 {
295 if values[i] >= values[i + 1] {
298 self.series_count += 1;
299 }
300 }
301
302 self.last_timestamp = Some(values[batch_rows - 1]);
304 }
305
306 pub(crate) fn finish(&mut self) -> u64 {
308 self.last_timestamp = None;
309 let count = self.series_count;
310 self.series_count = 0;
311
312 count
313 }
314}
315
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datatypes::arrow::array::{
        BinaryArray, DictionaryArray, TimestampMillisecondArray, UInt8Array, UInt32Array,
        UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema, TimeUnit};
    use datatypes::arrow::record_batch::RecordBatch;

    use super::*;

    /// Builds a 4-column flat-format record batch (time index, primary key,
    /// sequence, op type) with the given millisecond timestamps.
    fn new_flat_record_batch(timestamps: &[i64]) -> RecordBatch {
        let num_cols = 4;
        let time_index_pos = time_index_column_index(num_cols);
        assert_eq!(time_index_pos, 0);

        let num_rows = timestamps.len();
        let time_array = Arc::new(TimestampMillisecondArray::from(timestamps.to_vec()));
        let pk_array = Arc::new(DictionaryArray::new(
            UInt32Array::from(vec![0; num_rows]),
            Arc::new(BinaryArray::from(vec![b"test".as_slice()])),
        ));
        let seq_array = Arc::new(UInt64Array::from(vec![1; num_rows]));
        let op_array = Arc::new(UInt8Array::from(vec![1; num_rows]));

        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "time",
                ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new_dictionary(
                "__primary_key",
                ArrowDataType::UInt32,
                ArrowDataType::Binary,
                false,
            ),
            Field::new("__sequence", ArrowDataType::UInt64, false),
            Field::new("__op_type", ArrowDataType::UInt8, false),
        ]));

        RecordBatch::try_new(schema, vec![time_array, pk_array, seq_array, op_array]).unwrap()
    }

    /// Feeds each timestamp slice as one flat batch and returns the series
    /// estimate.
    fn estimate(batches: &[&[i64]]) -> u64 {
        let mut estimator = SeriesEstimator::default();
        for timestamps in batches {
            estimator.update_flat(&new_flat_record_batch(timestamps));
        }
        estimator.finish()
    }

    #[test]
    fn test_series_estimator_flat_empty_batch() {
        // An empty batch contributes nothing.
        assert_eq!(0, estimate(&[&[]]));
    }

    #[test]
    fn test_series_estimator_flat_single_batch() {
        // Strictly increasing timestamps form one series.
        assert_eq!(1, estimate(&[&[1, 2, 3]]));
    }

    #[test]
    fn test_series_estimator_flat_series_boundary_within_batch() {
        // 3 -> 2 starts a second series.
        assert_eq!(2, estimate(&[&[1, 2, 3, 2, 4, 5]]));
    }

    #[test]
    fn test_series_estimator_flat_multiple_boundaries_within_batch() {
        // 5 -> 4 and 6 -> 3 each start a new series.
        assert_eq!(3, estimate(&[&[1, 2, 5, 4, 6, 3, 7]]));
    }

    #[test]
    fn test_series_estimator_flat_equal_timestamps() {
        // Equal adjacent timestamps also count as boundaries.
        assert_eq!(4, estimate(&[&[1, 2, 2, 3, 3, 3, 4]]));
    }

    #[test]
    fn test_series_estimator_flat_multiple_batches_continuation() {
        // The second batch continues the same increasing run.
        assert_eq!(1, estimate(&[&[1, 2, 3], &[4, 5, 6]]));
    }

    #[test]
    fn test_series_estimator_flat_multiple_batches_new_series() {
        // 3 -> 2 across the batch boundary starts a new series.
        assert_eq!(2, estimate(&[&[1, 2, 3], &[2, 3, 4]]));
    }

    #[test]
    fn test_series_estimator_flat_boundary_at_batch_edge_equal() {
        // Equal timestamps at the batch boundary count as a new series.
        assert_eq!(2, estimate(&[&[1, 2, 5], &[5, 6, 7]]));
    }

    #[test]
    fn test_series_estimator_flat_mixed_batches() {
        // Boundaries: 30 -> 5 (cross-batch), 15 -> 10 (in-batch);
        // 25 -> 30 continues across the last boundary.
        assert_eq!(3, estimate(&[&[10, 20, 30], &[5, 15, 10, 25], &[30, 35]]));
    }

    #[test]
    fn test_series_estimator_flat_descending_timestamps() {
        // Every step is non-increasing, so each row is its own series.
        assert_eq!(5, estimate(&[&[10, 9, 8, 7, 6]]));
    }

    #[test]
    fn test_series_estimator_flat_finish_resets_state() {
        let mut estimator = SeriesEstimator::default();

        estimator.update_flat(&new_flat_record_batch(&[1, 2, 3]));
        assert_eq!(1, estimator.finish());

        // After finish(), the next batch is counted from scratch.
        estimator.update_flat(&new_flat_record_batch(&[4, 5, 6]));
        assert_eq!(1, estimator.finish());
    }
}