1use std::collections::HashMap;
18use std::sync::Arc;
19
20use api::v1::SemanticType;
21use arrow_schema::DataType;
22use common_base::readable_size::ReadableSize;
23use datatypes::arrow::datatypes::{
24 DataType as ArrowDataType, Field, FieldRef, Fields, Schema, SchemaRef,
25};
26use datatypes::arrow::record_batch::RecordBatch;
27use datatypes::prelude::ConcreteDataType;
28use datatypes::timestamp::timestamp_array_to_primitive;
29use serde::{Deserialize, Serialize};
30use store_api::codec::PrimaryKeyEncoding;
31use store_api::metadata::RegionMetadata;
32use store_api::storage::consts::{
33 OP_TYPE_COLUMN_NAME, PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME,
34};
35
36use crate::sst::parquet::flat_format::time_index_column_index;
37
38pub mod file;
39pub mod file_purger;
40pub mod file_ref;
41pub mod index;
42pub mod location;
43pub mod parquet;
44pub(crate) mod version;
45
/// Default write buffer size: `ReadableSize::mb(8)`.
pub const DEFAULT_WRITE_BUFFER_SIZE: ReadableSize = ReadableSize::mb(8);

/// Default write concurrency: 8.
pub const DEFAULT_WRITE_CONCURRENCY: usize = 8;
51
/// Layout format of an SST file.
///
/// Serialized by serde and parsed by strum in snake_case (`primary_key` / `flat`).
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, strum::EnumString)]
#[serde(rename_all = "snake_case")]
#[strum(serialize_all = "snake_case")]
pub enum FormatType {
    /// The default format.
    /// NOTE(review): presumably the layout built by `to_sst_arrow_schema` — confirm.
    #[default]
    PrimaryKey,
    /// NOTE(review): presumably the layout built by `to_flat_sst_arrow_schema` — confirm.
    Flat,
}
63
/// Arrow field metadata key under which a column's parquet field id is stored.
pub const PARQUET_FIELD_ID_KEY: &str = "PARQUET:field_id";
66
67pub fn with_field_id(mut field: Field, column_id: u32) -> Field {
69 field
70 .metadata_mut()
71 .insert(PARQUET_FIELD_ID_KEY.to_string(), column_id.to_string());
72 field
73}
74
/// First parquet field id reserved for internal columns (`1 << 30`).
///
/// NOTE(review): presumably chosen to stay clear of user column ids — confirm.
pub(crate) const INTERNAL_PARQUET_FIELD_ID_BASE: u32 = 1 << 30;

/// Parquet field id of the `PRIMARY_KEY_COLUMN_NAME` internal column.
pub(crate) const PRIMARY_KEY_PARQUET_FIELD_ID: u32 = INTERNAL_PARQUET_FIELD_ID_BASE;
/// Parquet field id of the `SEQUENCE_COLUMN_NAME` internal column.
pub(crate) const SEQUENCE_PARQUET_FIELD_ID: u32 = INTERNAL_PARQUET_FIELD_ID_BASE + 1;
/// Parquet field id of the `OP_TYPE_COLUMN_NAME` internal column.
pub(crate) const OP_TYPE_PARQUET_FIELD_ID: u32 = INTERNAL_PARQUET_FIELD_ID_BASE + 2;
85
86pub fn to_sst_arrow_schema(metadata: &RegionMetadata) -> SchemaRef {
88 let fields = Fields::from_iter(
89 metadata
90 .schema
91 .arrow_schema()
92 .fields()
93 .iter()
94 .zip(&metadata.column_metadatas)
95 .filter_map(|(field, column_meta)| {
96 if column_meta.semantic_type == SemanticType::Field {
97 Some(Arc::new(with_field_id(
98 (**field).clone(),
99 column_meta.column_id,
100 )))
101 } else {
102 None
104 }
105 })
106 .chain([Arc::new(with_field_id(
107 (*metadata.time_index_field()).clone(),
108 metadata.time_index_column().column_id,
109 ))])
110 .chain(internal_fields()),
111 );
112
113 Arc::new(Schema::new(fields))
114}
115
/// Options controlling the columns produced by `to_flat_sst_arrow_schema`.
pub struct FlatSchemaOptions {
    /// Whether primary key (tag) columns are emitted as their own columns in the flat schema.
    pub raw_pk_columns: bool,
    /// Whether string primary key columns are dictionary-encoded (`UInt32` keys).
    /// Only consulted when `raw_pk_columns` is set.
    pub string_pk_use_dict: bool,
    /// Maps a column name to the data type that replaces the column's declared type.
    /// NOTE(review): presumably used to give JSON columns a concrete inferred type — confirm.
    pub concretized_json_types: HashMap<String, DataType>,
}
128
129impl Default for FlatSchemaOptions {
130 fn default() -> Self {
131 Self {
132 raw_pk_columns: true,
133 string_pk_use_dict: true,
134 concretized_json_types: HashMap::new(),
135 }
136 }
137}
138
139impl FlatSchemaOptions {
140 pub fn from_encoding(encoding: PrimaryKeyEncoding) -> Self {
142 if encoding == PrimaryKeyEncoding::Dense {
143 Self::default()
144 } else {
145 Self {
146 raw_pk_columns: false,
147 string_pk_use_dict: false,
148 concretized_json_types: HashMap::new(),
149 }
150 }
151 }
152}
153
154pub fn to_flat_sst_arrow_schema(
164 metadata: &RegionMetadata,
165 options: &FlatSchemaOptions,
166) -> SchemaRef {
167 let num_fields = flat_sst_arrow_schema_column_num(metadata, options);
168 let mut fields = Vec::with_capacity(num_fields);
169 let schema = metadata.schema.arrow_schema();
170 if options.raw_pk_columns {
171 for pk_id in &metadata.primary_key {
172 let pk_index = metadata.column_index_by_id(*pk_id).unwrap();
173 let column_id = metadata.column_metadatas[pk_index].column_id;
174 if options.string_pk_use_dict {
175 let old_field = &schema.fields[pk_index];
176 let new_field = tag_maybe_to_dictionary_field(
177 &metadata.column_metadatas[pk_index].column_schema.data_type,
178 old_field,
179 );
180 let new_field = concretize_json_type(new_field, options);
181 fields.push(Arc::new(with_field_id((*new_field).clone(), column_id)));
182 }
183 }
184 }
185 let remaining_fields = schema
186 .fields()
187 .iter()
188 .zip(&metadata.column_metadatas)
189 .filter_map(|(field, column_meta)| {
190 if column_meta.semantic_type == SemanticType::Field {
191 let field = concretize_json_type(field.clone(), options);
192 Some(Arc::new(with_field_id(
193 Arc::unwrap_or_clone(field),
194 column_meta.column_id,
195 )))
196 } else {
197 None
198 }
199 })
200 .chain([Arc::new(with_field_id(
201 (*metadata.time_index_field()).clone(),
202 metadata.time_index_column().column_id,
203 ))])
204 .chain(internal_fields());
205 for field in remaining_fields {
206 fields.push(field);
207 }
208
209 Arc::new(Schema::new(fields))
210}
211
212fn concretize_json_type(field: Arc<Field>, options: &FlatSchemaOptions) -> Arc<Field> {
213 if let Some(data_type) = options.concretized_json_types.get(field.name()) {
214 let mut field = Arc::unwrap_or_clone(field);
215 field.set_data_type(data_type.clone());
216 Arc::new(field)
217 } else {
218 field
219 }
220}
221
222pub fn flat_sst_arrow_schema_column_num(
224 metadata: &RegionMetadata,
225 options: &FlatSchemaOptions,
226) -> usize {
227 if options.raw_pk_columns {
228 metadata.column_metadatas.len() + 3
229 } else {
230 metadata.column_metadatas.len() + 3 - metadata.primary_key.len()
231 }
232}
233
234fn to_dictionary_field(field: &Field) -> Field {
236 let mut new_field = Field::new_dictionary(
237 field.name(),
238 datatypes::arrow::datatypes::DataType::UInt32,
239 field.data_type().clone(),
240 field.is_nullable(),
241 );
242
243 if let Some(field_id) = field.metadata().get(PARQUET_FIELD_ID_KEY) {
245 new_field
246 .metadata_mut()
247 .insert(PARQUET_FIELD_ID_KEY.to_string(), field_id.clone());
248 }
249
250 new_field
251}
252
253pub(crate) fn tag_maybe_to_dictionary_field(
255 data_type: &ConcreteDataType,
256 field: &Arc<Field>,
257) -> Arc<Field> {
258 if data_type.is_string() {
259 Arc::new(to_dictionary_field(field))
260 } else {
261 field.clone()
262 }
263}
264
265pub(crate) fn internal_fields() -> [FieldRef; 3] {
267 [
269 Arc::new(with_field_id(
270 Field::new_dictionary(
271 PRIMARY_KEY_COLUMN_NAME,
272 ArrowDataType::UInt32,
273 ArrowDataType::Binary,
274 false,
275 ),
276 PRIMARY_KEY_PARQUET_FIELD_ID,
277 )),
278 Arc::new(with_field_id(
279 Field::new(SEQUENCE_COLUMN_NAME, ArrowDataType::UInt64, false),
280 SEQUENCE_PARQUET_FIELD_ID,
281 )),
282 Arc::new(with_field_id(
283 Field::new(OP_TYPE_COLUMN_NAME, ArrowDataType::UInt8, false),
284 OP_TYPE_PARQUET_FIELD_ID,
285 )),
286 ]
287}
288
289pub(crate) fn override_pk_field_to_binary(schema: &SchemaRef) -> SchemaRef {
291 let new_fields = schema
292 .fields()
293 .iter()
294 .map(|field| {
295 if field.name() == PRIMARY_KEY_COLUMN_NAME {
296 let mut new_field = Field::new(
297 PRIMARY_KEY_COLUMN_NAME,
298 ArrowDataType::Binary,
299 field.is_nullable(),
300 );
301 if let Some(field_id) = field.metadata().get(PARQUET_FIELD_ID_KEY) {
304 new_field
305 .metadata_mut()
306 .insert(PARQUET_FIELD_ID_KEY.to_string(), field_id.clone());
307 }
308 Arc::new(new_field)
309 } else {
310 field.clone()
311 }
312 })
313 .collect::<Vec<_>>();
314 Arc::new(Schema::new(new_fields))
315}
316
/// Estimates the number of time series in a stream of flat-format record batches.
///
/// A new series is counted whenever a row's timestamp is not strictly greater than the
/// previous row's, including across batch boundaries.
/// NOTE(review): this presumes rows of one series arrive with increasing timestamps — confirm.
#[derive(Default)]
pub(crate) struct SeriesEstimator {
    /// Timestamp of the last row seen. `None` before any row or right after `finish`.
    last_timestamp: Option<i64>,
    /// Number of series counted since the last `finish`.
    series_count: u64,
}
328
329impl SeriesEstimator {
330 pub(crate) fn update_flat(&mut self, record_batch: &RecordBatch) {
334 let batch_rows = record_batch.num_rows();
335 if batch_rows == 0 {
336 return;
337 }
338
339 let time_index_pos = time_index_column_index(record_batch.num_columns());
340 let timestamps = record_batch.column(time_index_pos);
341 let Some((ts_values, _unit)) = timestamp_array_to_primitive(timestamps) else {
342 return;
343 };
344 let values = ts_values.values();
345
346 if let Some(last_ts) = self.last_timestamp {
348 if values[0] <= last_ts {
349 self.series_count += 1;
350 }
351 } else {
352 self.series_count = 1;
354 }
355
356 for i in 0..batch_rows - 1 {
358 if values[i] >= values[i + 1] {
361 self.series_count += 1;
362 }
363 }
364
365 self.last_timestamp = Some(values[batch_rows - 1]);
367 }
368
369 pub(crate) fn finish(&mut self) -> u64 {
371 self.last_timestamp = None;
372 let count = self.series_count;
373 self.series_count = 0;
374
375 count
376 }
377}
378
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datatypes::arrow::array::{
        BinaryArray, DictionaryArray, TimestampMillisecondArray, UInt8Array, UInt32Array,
        UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema, TimeUnit};
    use datatypes::arrow::record_batch::RecordBatch;

    use super::*;

    /// Builds a minimal flat-format batch with columns `time`, `__primary_key`, `__sequence`
    /// and `__op_type`. Every row shares a single primary key.
    fn new_flat_record_batch(timestamps: &[i64]) -> RecordBatch {
        let num_rows = timestamps.len();
        // Without tag or field columns, the time index must be the first of the 4 columns.
        assert_eq!(time_index_column_index(4), 0);

        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "time",
                ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new_dictionary(
                "__primary_key",
                ArrowDataType::UInt32,
                ArrowDataType::Binary,
                false,
            ),
            Field::new("__sequence", ArrowDataType::UInt64, false),
            Field::new("__op_type", ArrowDataType::UInt8, false),
        ]));

        let ts_column = Arc::new(TimestampMillisecondArray::from(timestamps.to_vec()));
        let pk_column = Arc::new(DictionaryArray::new(
            UInt32Array::from(vec![0; num_rows]),
            Arc::new(BinaryArray::from(vec![b"test".as_slice()])),
        ));
        let sequence_column = Arc::new(UInt64Array::from(vec![1; num_rows]));
        let op_type_column = Arc::new(UInt8Array::from(vec![1; num_rows]));

        RecordBatch::try_new(
            schema,
            vec![ts_column, pk_column, sequence_column, op_type_column],
        )
        .unwrap()
    }

    /// Feeds `batches` in order into a fresh estimator and returns the estimated series count.
    fn estimate_series(batches: &[&[i64]]) -> u64 {
        let mut estimator = SeriesEstimator::default();
        for timestamps in batches {
            estimator.update_flat(&new_flat_record_batch(timestamps));
        }
        estimator.finish()
    }

    #[test]
    fn test_series_estimator_flat_empty_batch() {
        assert_eq!(0, estimate_series(&[&[]]));
    }

    #[test]
    fn test_series_estimator_flat_single_batch() {
        assert_eq!(1, estimate_series(&[&[1, 2, 3]]));
    }

    #[test]
    fn test_series_estimator_flat_series_boundary_within_batch() {
        // 3 -> 2 starts a second series.
        assert_eq!(2, estimate_series(&[&[1, 2, 3, 2, 4, 5]]));
    }

    #[test]
    fn test_series_estimator_flat_multiple_boundaries_within_batch() {
        // 5 -> 4 and 6 -> 3 each start a new series.
        assert_eq!(3, estimate_series(&[&[1, 2, 5, 4, 6, 3, 7]]));
    }

    #[test]
    fn test_series_estimator_flat_equal_timestamps() {
        // Repeated timestamps (2 = 2, 3 = 3, 3 = 3) are treated as series boundaries.
        assert_eq!(4, estimate_series(&[&[1, 2, 2, 3, 3, 3, 4]]));
    }

    #[test]
    fn test_series_estimator_flat_multiple_batches_continuation() {
        assert_eq!(1, estimate_series(&[&[1, 2, 3], &[4, 5, 6]]));
    }

    #[test]
    fn test_series_estimator_flat_multiple_batches_new_series() {
        // The second batch restarts below the last timestamp of the first one.
        assert_eq!(2, estimate_series(&[&[1, 2, 3], &[2, 3, 4]]));
    }

    #[test]
    fn test_series_estimator_flat_boundary_at_batch_edge_equal() {
        // The second batch starts at the same timestamp the first one ended with.
        assert_eq!(2, estimate_series(&[&[1, 2, 5], &[5, 6, 7]]));
    }

    #[test]
    fn test_series_estimator_flat_mixed_batches() {
        assert_eq!(
            3,
            estimate_series(&[&[10, 20, 30], &[5, 15, 10, 25], &[30, 35]])
        );
    }

    #[test]
    fn test_series_estimator_flat_descending_timestamps() {
        assert_eq!(5, estimate_series(&[&[10, 9, 8, 7, 6]]));
    }

    #[test]
    fn test_series_estimator_flat_finish_resets_state() {
        let mut estimator = SeriesEstimator::default();

        estimator.update_flat(&new_flat_record_batch(&[1, 2, 3]));
        assert_eq!(1, estimator.finish());

        // If `finish` kept the last timestamp, 4 > 3 would not open a series and this would
        // return 0.
        estimator.update_flat(&new_flat_record_batch(&[4, 5, 6]));
        assert_eq!(1, estimator.finish());
    }
}