1use std::sync::Arc;
18
19use api::v1::SemanticType;
20use common_base::readable_size::ReadableSize;
21use datatypes::arrow::datatypes::{
22 DataType as ArrowDataType, Field, FieldRef, Fields, Schema, SchemaRef,
23};
24use datatypes::arrow::record_batch::RecordBatch;
25use datatypes::prelude::ConcreteDataType;
26use datatypes::timestamp::timestamp_array_to_primitive;
27use serde::{Deserialize, Serialize};
28use store_api::codec::PrimaryKeyEncoding;
29use store_api::metadata::RegionMetadata;
30use store_api::storage::consts::{
31 OP_TYPE_COLUMN_NAME, PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME,
32};
33
34use crate::read::Batch;
35use crate::sst::parquet::flat_format::time_index_column_index;
36
37pub mod file;
38pub mod file_purger;
39pub mod file_ref;
40pub mod index;
41pub mod location;
42pub mod parquet;
43pub(crate) mod version;
44
/// Default size of the write buffer used when writing SSTs: `ReadableSize::mb(8)`.
pub const DEFAULT_WRITE_BUFFER_SIZE: ReadableSize = ReadableSize::mb(8);

/// Default write concurrency (8).
///
/// NOTE(review): presumably the number of concurrent upload/write tasks per
/// SST write; confirm at the use sites.
pub const DEFAULT_WRITE_CONCURRENCY: usize = 8;
50
/// Format of the SST files.
///
/// Serialized, deserialized and parsed from strings using `snake_case` variant
/// names, i.e. `"primary_key"` and `"flat"`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, strum::EnumString)]
#[serde(rename_all = "snake_case")]
#[strum(serialize_all = "snake_case")]
pub enum FormatType {
    /// Primary key format. This is the default format.
    #[default]
    PrimaryKey,
    /// Flat format.
    Flat,
}
62
63pub fn to_sst_arrow_schema(metadata: &RegionMetadata) -> SchemaRef {
65 let fields = Fields::from_iter(
66 metadata
67 .schema
68 .arrow_schema()
69 .fields()
70 .iter()
71 .zip(&metadata.column_metadatas)
72 .filter_map(|(field, column_meta)| {
73 if column_meta.semantic_type == SemanticType::Field {
74 Some(field.clone())
75 } else {
76 None
78 }
79 })
80 .chain([metadata.time_index_field()])
81 .chain(internal_fields()),
82 );
83
84 Arc::new(Schema::new(fields))
85}
86
/// Options controlling the arrow schema built by `to_flat_sst_arrow_schema`.
pub struct FlatSchemaOptions {
    /// Whether each primary key column is also stored as its own column, in
    /// addition to the internal primary key column.
    pub raw_pk_columns: bool,
    /// Whether string primary key columns are stored with a dictionary type
    /// (`UInt32` keys). Only takes effect when `raw_pk_columns` is true.
    pub string_pk_use_dict: bool,
}
96
97impl Default for FlatSchemaOptions {
98 fn default() -> Self {
99 Self {
100 raw_pk_columns: true,
101 string_pk_use_dict: true,
102 }
103 }
104}
105
106impl FlatSchemaOptions {
107 pub fn from_encoding(encoding: PrimaryKeyEncoding) -> Self {
109 if encoding == PrimaryKeyEncoding::Dense {
110 Self::default()
111 } else {
112 Self {
113 raw_pk_columns: false,
114 string_pk_use_dict: false,
115 }
116 }
117 }
118}
119
120pub fn to_flat_sst_arrow_schema(
130 metadata: &RegionMetadata,
131 options: &FlatSchemaOptions,
132) -> SchemaRef {
133 let num_fields = flat_sst_arrow_schema_column_num(metadata, options);
134 let mut fields = Vec::with_capacity(num_fields);
135 let schema = metadata.schema.arrow_schema();
136 if options.raw_pk_columns {
137 for pk_id in &metadata.primary_key {
138 let pk_index = metadata.column_index_by_id(*pk_id).unwrap();
139 if options.string_pk_use_dict {
140 let old_field = &schema.fields[pk_index];
141 let new_field = tag_maybe_to_dictionary_field(
142 &metadata.column_metadatas[pk_index].column_schema.data_type,
143 old_field,
144 );
145 fields.push(new_field);
146 }
147 }
148 }
149 let remaining_fields = schema
150 .fields()
151 .iter()
152 .zip(&metadata.column_metadatas)
153 .filter_map(|(field, column_meta)| {
154 if column_meta.semantic_type == SemanticType::Field {
155 Some(field.clone())
156 } else {
157 None
158 }
159 })
160 .chain([metadata.time_index_field()])
161 .chain(internal_fields());
162 for field in remaining_fields {
163 fields.push(field);
164 }
165
166 Arc::new(Schema::new(fields))
167}
168
169pub fn flat_sst_arrow_schema_column_num(
171 metadata: &RegionMetadata,
172 options: &FlatSchemaOptions,
173) -> usize {
174 if options.raw_pk_columns {
175 metadata.column_metadatas.len() + 3
176 } else {
177 metadata.column_metadatas.len() + 3 - metadata.primary_key.len()
178 }
179}
180
181fn to_dictionary_field(field: &Field) -> Field {
183 Field::new_dictionary(
184 field.name(),
185 datatypes::arrow::datatypes::DataType::UInt32,
186 field.data_type().clone(),
187 field.is_nullable(),
188 )
189}
190
191pub(crate) fn tag_maybe_to_dictionary_field(
193 data_type: &ConcreteDataType,
194 field: &Arc<Field>,
195) -> Arc<Field> {
196 if data_type.is_string() {
197 Arc::new(to_dictionary_field(field))
198 } else {
199 field.clone()
200 }
201}
202
203pub(crate) fn internal_fields() -> [FieldRef; 3] {
205 [
207 Arc::new(Field::new_dictionary(
208 PRIMARY_KEY_COLUMN_NAME,
209 ArrowDataType::UInt32,
210 ArrowDataType::Binary,
211 false,
212 )),
213 Arc::new(Field::new(
214 SEQUENCE_COLUMN_NAME,
215 ArrowDataType::UInt64,
216 false,
217 )),
218 Arc::new(Field::new(OP_TYPE_COLUMN_NAME, ArrowDataType::UInt8, false)),
219 ]
220}
221
222pub fn to_plain_sst_arrow_schema(metadata: &RegionMetadata) -> SchemaRef {
224 let fields = Fields::from_iter(
225 metadata
226 .schema
227 .arrow_schema()
228 .fields()
229 .iter()
230 .cloned()
231 .chain(plain_internal_fields()),
232 );
233
234 Arc::new(Schema::new(fields))
235}
236
237fn plain_internal_fields() -> [FieldRef; 2] {
239 [
241 Arc::new(Field::new(
242 SEQUENCE_COLUMN_NAME,
243 ArrowDataType::UInt64,
244 false,
245 )),
246 Arc::new(Field::new(OP_TYPE_COLUMN_NAME, ArrowDataType::UInt8, false)),
247 ]
248}
249
/// Estimates the number of time series in a stream of rows.
///
/// A new series is counted whenever a timestamp is not greater than the
/// timestamp seen just before it. The result is an estimate: a series change
/// is missed when the next series happens to start at a larger timestamp.
///
/// NOTE(review): this presumably assumes rows arrive sorted by primary key and
/// then by timestamp; confirm with callers.
#[derive(Default)]
pub(crate) struct SeriesEstimator {
    /// Last timestamp seen so far, or `None` before any row (and after `finish`).
    last_timestamp: Option<i64>,
    /// Number of series counted so far.
    series_count: u64,
}
261
262impl SeriesEstimator {
263 pub(crate) fn update(&mut self, batch: &Batch) {
268 let Some(last_ts) = batch.last_timestamp() else {
269 return;
270 };
271
272 if let Some(prev_last_ts) = self.last_timestamp {
274 if let Some(first_ts) = batch.first_timestamp()
277 && first_ts.value() <= prev_last_ts
278 {
279 self.series_count += 1;
280 }
281 } else {
282 self.series_count = 1;
284 }
285
286 self.last_timestamp = Some(last_ts.value());
288 }
289
290 pub(crate) fn update_flat(&mut self, record_batch: &RecordBatch) {
294 let batch_rows = record_batch.num_rows();
295 if batch_rows == 0 {
296 return;
297 }
298
299 let time_index_pos = time_index_column_index(record_batch.num_columns());
300 let timestamps = record_batch.column(time_index_pos);
301 let Some((ts_values, _unit)) = timestamp_array_to_primitive(timestamps) else {
302 return;
303 };
304 let values = ts_values.values();
305
306 if let Some(last_ts) = self.last_timestamp {
308 if values[0] <= last_ts {
309 self.series_count += 1;
310 }
311 } else {
312 self.series_count = 1;
314 }
315
316 for i in 0..batch_rows - 1 {
318 if values[i] >= values[i + 1] {
321 self.series_count += 1;
322 }
323 }
324
325 self.last_timestamp = Some(values[batch_rows - 1]);
327 }
328
329 pub(crate) fn finish(&mut self) -> u64 {
331 self.last_timestamp = None;
332 let count = self.series_count;
333 self.series_count = 0;
334
335 count
336 }
337}
338
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::OpType;
    use datatypes::arrow::array::{
        BinaryArray, DictionaryArray, TimestampMillisecondArray, UInt8Array, UInt32Array,
        UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema, TimeUnit};
    use datatypes::arrow::record_batch::RecordBatch;

    use super::*;
    use crate::read::{Batch, BatchBuilder};

    /// Builds a [`Batch`] for `primary_key` from parallel timestamp (ms),
    /// sequence and op type slices.
    fn new_batch(
        primary_key: &[u8],
        timestamps: &[i64],
        sequences: &[u64],
        op_types: &[OpType],
    ) -> Batch {
        let timestamps = Arc::new(TimestampMillisecondArray::from(timestamps.to_vec()));
        let sequences = Arc::new(UInt64Array::from(sequences.to_vec()));
        let op_types = Arc::new(UInt8Array::from(
            op_types.iter().map(|op| *op as u8).collect::<Vec<_>>(),
        ));

        let mut builder = BatchBuilder::new(primary_key.to_vec());
        builder
            .timestamps_array(timestamps)
            .unwrap()
            .sequences_array(sequences)
            .unwrap()
            .op_types_array(op_types)
            .unwrap();
        builder.build().unwrap()
    }

    /// Builds a flat-format [`RecordBatch`] with the given timestamps (ms).
    ///
    /// The batch has a single primary key, and every row has sequence 1 and
    /// op type 1. The batch holds only the time index plus the three internal
    /// columns, so the time index is the first column.
    fn new_flat_record_batch(timestamps: &[i64]) -> RecordBatch {
        let num_cols = 4;
        let time_index_pos = time_index_column_index(num_cols);
        // The columns below are laid out assuming the time index comes first.
        assert_eq!(time_index_pos, 0);

        let time_array = Arc::new(TimestampMillisecondArray::from(timestamps.to_vec()));
        let pk_array = Arc::new(DictionaryArray::new(
            UInt32Array::from(vec![0; timestamps.len()]),
            Arc::new(BinaryArray::from(vec![b"test".as_slice()])),
        ));
        let seq_array = Arc::new(UInt64Array::from(vec![1; timestamps.len()]));
        let op_array = Arc::new(UInt8Array::from(vec![1; timestamps.len()]));

        let schema = Arc::new(Schema::new(vec![
            Field::new(
                "time",
                ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new_dictionary(
                "__primary_key",
                ArrowDataType::UInt32,
                ArrowDataType::Binary,
                false,
            ),
            Field::new("__sequence", ArrowDataType::UInt64, false),
            Field::new("__op_type", ArrowDataType::UInt8, false),
        ]));

        RecordBatch::try_new(schema, vec![time_array, pk_array, seq_array, op_array]).unwrap()
    }

    #[test]
    fn test_series_estimator_empty_batch() {
        let mut estimator = SeriesEstimator::default();
        let batch = new_batch(b"test", &[], &[], &[]);
        estimator.update(&batch);
        assert_eq!(0, estimator.finish());
    }

    #[test]
    fn test_series_estimator_single_batch() {
        let mut estimator = SeriesEstimator::default();
        let batch = new_batch(
            b"test",
            &[1, 2, 3],
            &[1, 2, 3],
            &[OpType::Put, OpType::Put, OpType::Put],
        );
        estimator.update(&batch);
        assert_eq!(1, estimator.finish());
    }

    #[test]
    fn test_series_estimator_multiple_batches_same_series() {
        let mut estimator = SeriesEstimator::default();

        let batch1 = new_batch(
            b"test",
            &[1, 2, 3],
            &[1, 2, 3],
            &[OpType::Put, OpType::Put, OpType::Put],
        );
        estimator.update(&batch1);

        // 4 > 3: continues the same series.
        let batch2 = new_batch(
            b"test",
            &[4, 5, 6],
            &[4, 5, 6],
            &[OpType::Put, OpType::Put, OpType::Put],
        );
        estimator.update(&batch2);

        assert_eq!(1, estimator.finish());
    }

    #[test]
    fn test_series_estimator_new_series_detected() {
        let mut estimator = SeriesEstimator::default();

        let batch1 = new_batch(
            b"pk0",
            &[1, 2, 3],
            &[1, 2, 3],
            &[OpType::Put, OpType::Put, OpType::Put],
        );
        estimator.update(&batch1);

        // 2 <= 3: starts a new series.
        let batch2 = new_batch(
            b"pk1",
            &[2, 3, 4],
            &[4, 5, 6],
            &[OpType::Put, OpType::Put, OpType::Put],
        );
        estimator.update(&batch2);

        assert_eq!(2, estimator.finish());
    }

    #[test]
    fn test_series_estimator_equal_timestamp_boundary() {
        let mut estimator = SeriesEstimator::default();

        let batch1 = new_batch(
            b"test",
            &[1, 2, 5],
            &[1, 2, 3],
            &[OpType::Put, OpType::Put, OpType::Put],
        );
        estimator.update(&batch1);

        // An equal timestamp (5 == 5) also starts a new series.
        let batch2 = new_batch(
            b"test",
            &[5, 6, 7],
            &[4, 5, 6],
            &[OpType::Put, OpType::Put, OpType::Put],
        );
        estimator.update(&batch2);

        assert_eq!(2, estimator.finish());
    }

    #[test]
    fn test_series_estimator_finish_resets_state() {
        let mut estimator = SeriesEstimator::default();

        let batch1 = new_batch(
            b"test",
            &[1, 2, 3],
            &[1, 2, 3],
            &[OpType::Put, OpType::Put, OpType::Put],
        );
        estimator.update(&batch1);

        assert_eq!(1, estimator.finish());

        // After `finish`, counting starts over from scratch.
        let batch2 = new_batch(
            b"test",
            &[4, 5, 6],
            &[4, 5, 6],
            &[OpType::Put, OpType::Put, OpType::Put],
        );
        estimator.update(&batch2);

        assert_eq!(1, estimator.finish());
    }

    #[test]
    fn test_series_estimator_flat_empty_batch() {
        let mut estimator = SeriesEstimator::default();
        let record_batch = new_flat_record_batch(&[]);
        estimator.update_flat(&record_batch);
        assert_eq!(0, estimator.finish());
    }

    #[test]
    fn test_series_estimator_flat_single_batch() {
        let mut estimator = SeriesEstimator::default();
        let record_batch = new_flat_record_batch(&[1, 2, 3]);
        estimator.update_flat(&record_batch);
        assert_eq!(1, estimator.finish());
    }

    #[test]
    fn test_series_estimator_flat_series_boundary_within_batch() {
        let mut estimator = SeriesEstimator::default();
        // 3 -> 2 starts a second series.
        let record_batch = new_flat_record_batch(&[1, 2, 3, 2, 4, 5]);
        estimator.update_flat(&record_batch);
        assert_eq!(2, estimator.finish());
    }

    #[test]
    fn test_series_estimator_flat_multiple_boundaries_within_batch() {
        let mut estimator = SeriesEstimator::default();
        // 5 -> 4 and 6 -> 3 each start a new series.
        let record_batch = new_flat_record_batch(&[1, 2, 5, 4, 6, 3, 7]);
        estimator.update_flat(&record_batch);
        assert_eq!(3, estimator.finish());
    }

    #[test]
    fn test_series_estimator_flat_equal_timestamps() {
        let mut estimator = SeriesEstimator::default();
        // Each repeated timestamp (2->2, 3->3, 3->3) starts a new series.
        let record_batch = new_flat_record_batch(&[1, 2, 2, 3, 3, 3, 4]);
        estimator.update_flat(&record_batch);
        assert_eq!(4, estimator.finish());
    }

    #[test]
    fn test_series_estimator_flat_multiple_batches_continuation() {
        let mut estimator = SeriesEstimator::default();

        let batch1 = new_flat_record_batch(&[1, 2, 3]);
        estimator.update_flat(&batch1);

        // 4 > 3: the series continues across batches.
        let batch2 = new_flat_record_batch(&[4, 5, 6]);
        estimator.update_flat(&batch2);

        assert_eq!(1, estimator.finish());
    }

    #[test]
    fn test_series_estimator_flat_multiple_batches_new_series() {
        let mut estimator = SeriesEstimator::default();

        let batch1 = new_flat_record_batch(&[1, 2, 3]);
        estimator.update_flat(&batch1);

        // 2 <= 3: a new series starts at the batch boundary.
        let batch2 = new_flat_record_batch(&[2, 3, 4]);
        estimator.update_flat(&batch2);

        assert_eq!(2, estimator.finish());
    }

    #[test]
    fn test_series_estimator_flat_boundary_at_batch_edge_equal() {
        let mut estimator = SeriesEstimator::default();

        let batch1 = new_flat_record_batch(&[1, 2, 5]);
        estimator.update_flat(&batch1);

        // 5 == 5 at the batch boundary starts a new series.
        let batch2 = new_flat_record_batch(&[5, 6, 7]);
        estimator.update_flat(&batch2);

        assert_eq!(2, estimator.finish());
    }

    #[test]
    fn test_series_estimator_flat_mixed_batches() {
        let mut estimator = SeriesEstimator::default();

        let batch1 = new_flat_record_batch(&[10, 20, 30]);
        estimator.update_flat(&batch1);

        // 5 <= 30 at the boundary, then 15 -> 10 inside the batch.
        let batch2 = new_flat_record_batch(&[5, 15, 10, 25]);
        estimator.update_flat(&batch2);

        // 30 > 25: continues the last series.
        let batch3 = new_flat_record_batch(&[30, 35]);
        estimator.update_flat(&batch3);

        assert_eq!(3, estimator.finish());
    }

    #[test]
    fn test_series_estimator_flat_descending_timestamps() {
        let mut estimator = SeriesEstimator::default();
        // Every row starts a new series.
        let record_batch = new_flat_record_batch(&[10, 9, 8, 7, 6]);
        estimator.update_flat(&record_batch);
        assert_eq!(5, estimator.finish());
    }

    #[test]
    fn test_series_estimator_flat_finish_resets_state() {
        let mut estimator = SeriesEstimator::default();

        let batch1 = new_flat_record_batch(&[1, 2, 3]);
        estimator.update_flat(&batch1);

        assert_eq!(1, estimator.finish());

        // After `finish`, counting starts over from scratch.
        let batch2 = new_flat_record_batch(&[4, 5, 6]);
        estimator.update_flat(&batch2);

        assert_eq!(1, estimator.finish());
    }
}