1use std::sync::Arc;
18
19use api::v1::SemanticType;
20use common_base::readable_size::ReadableSize;
21use datatypes::arrow::datatypes::{
22 DataType as ArrowDataType, Field, FieldRef, Fields, Schema, SchemaRef,
23};
24use datatypes::arrow::record_batch::RecordBatch;
25use datatypes::prelude::ConcreteDataType;
26use datatypes::timestamp::timestamp_array_to_primitive;
27use serde::{Deserialize, Serialize};
28use store_api::codec::PrimaryKeyEncoding;
29use store_api::metadata::RegionMetadata;
30use store_api::storage::consts::{
31 OP_TYPE_COLUMN_NAME, PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME,
32};
33
34use crate::sst::parquet::flat_format::time_index_column_index;
35
36pub mod file;
37pub mod file_purger;
38pub mod file_ref;
39pub mod index;
40pub mod location;
41pub mod parquet;
42pub(crate) mod version;
43
/// Default write buffer size, built with `ReadableSize::mb(8)`.
///
/// NOTE(review): the writers that consume this constant live outside this file; confirm
/// which buffer it sizes before changing it.
pub const DEFAULT_WRITE_BUFFER_SIZE: ReadableSize = ReadableSize::mb(8);

/// Default write concurrency.
///
/// NOTE(review): the consumer is outside this file; presumably the number of concurrent
/// write tasks/requests — confirm against callers.
pub const DEFAULT_WRITE_CONCURRENCY: usize = 8;
49
/// Storage format of an SST file.
///
/// Serialized/deserialized by serde and parsed by `strum::EnumString` using `snake_case`
/// names, i.e. `"primary_key"` and `"flat"`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, strum::EnumString)]
#[serde(rename_all = "snake_case")]
#[strum(serialize_all = "snake_case")]
pub enum FormatType {
    /// Primary key format; this is the default variant.
    ///
    /// NOTE(review): presumably the layout produced by `to_sst_arrow_schema` — confirm.
    #[default]
    PrimaryKey,
    /// Flat format.
    ///
    /// NOTE(review): presumably the layout produced by `to_flat_sst_arrow_schema` — confirm.
    Flat,
}
61
62pub fn to_sst_arrow_schema(metadata: &RegionMetadata) -> SchemaRef {
64 let fields = Fields::from_iter(
65 metadata
66 .schema
67 .arrow_schema()
68 .fields()
69 .iter()
70 .zip(&metadata.column_metadatas)
71 .filter_map(|(field, column_meta)| {
72 if column_meta.semantic_type == SemanticType::Field {
73 Some(field.clone())
74 } else {
75 None
77 }
78 })
79 .chain([metadata.time_index_field()])
80 .chain(internal_fields()),
81 );
82
83 Arc::new(Schema::new(fields))
84}
85
86pub struct FlatSchemaOptions {
88 pub raw_pk_columns: bool,
90 pub string_pk_use_dict: bool,
94}
95
96impl Default for FlatSchemaOptions {
97 fn default() -> Self {
98 Self {
99 raw_pk_columns: true,
100 string_pk_use_dict: true,
101 }
102 }
103}
104
105impl FlatSchemaOptions {
106 pub fn from_encoding(encoding: PrimaryKeyEncoding) -> Self {
108 if encoding == PrimaryKeyEncoding::Dense {
109 Self::default()
110 } else {
111 Self {
112 raw_pk_columns: false,
113 string_pk_use_dict: false,
114 }
115 }
116 }
117}
118
119pub fn to_flat_sst_arrow_schema(
129 metadata: &RegionMetadata,
130 options: &FlatSchemaOptions,
131) -> SchemaRef {
132 let num_fields = flat_sst_arrow_schema_column_num(metadata, options);
133 let mut fields = Vec::with_capacity(num_fields);
134 let schema = metadata.schema.arrow_schema();
135 if options.raw_pk_columns {
136 for pk_id in &metadata.primary_key {
137 let pk_index = metadata.column_index_by_id(*pk_id).unwrap();
138 if options.string_pk_use_dict {
139 let old_field = &schema.fields[pk_index];
140 let new_field = tag_maybe_to_dictionary_field(
141 &metadata.column_metadatas[pk_index].column_schema.data_type,
142 old_field,
143 );
144 fields.push(new_field);
145 }
146 }
147 }
148 let remaining_fields = schema
149 .fields()
150 .iter()
151 .zip(&metadata.column_metadatas)
152 .filter_map(|(field, column_meta)| {
153 if column_meta.semantic_type == SemanticType::Field {
154 Some(field.clone())
155 } else {
156 None
157 }
158 })
159 .chain([metadata.time_index_field()])
160 .chain(internal_fields());
161 for field in remaining_fields {
162 fields.push(field);
163 }
164
165 Arc::new(Schema::new(fields))
166}
167
168pub fn flat_sst_arrow_schema_column_num(
170 metadata: &RegionMetadata,
171 options: &FlatSchemaOptions,
172) -> usize {
173 if options.raw_pk_columns {
174 metadata.column_metadatas.len() + 3
175 } else {
176 metadata.column_metadatas.len() + 3 - metadata.primary_key.len()
177 }
178}
179
180fn to_dictionary_field(field: &Field) -> Field {
182 Field::new_dictionary(
183 field.name(),
184 datatypes::arrow::datatypes::DataType::UInt32,
185 field.data_type().clone(),
186 field.is_nullable(),
187 )
188}
189
190pub(crate) fn tag_maybe_to_dictionary_field(
192 data_type: &ConcreteDataType,
193 field: &Arc<Field>,
194) -> Arc<Field> {
195 if data_type.is_string() {
196 Arc::new(to_dictionary_field(field))
197 } else {
198 field.clone()
199 }
200}
201
202pub(crate) fn internal_fields() -> [FieldRef; 3] {
204 [
206 Arc::new(Field::new_dictionary(
207 PRIMARY_KEY_COLUMN_NAME,
208 ArrowDataType::UInt32,
209 ArrowDataType::Binary,
210 false,
211 )),
212 Arc::new(Field::new(
213 SEQUENCE_COLUMN_NAME,
214 ArrowDataType::UInt64,
215 false,
216 )),
217 Arc::new(Field::new(OP_TYPE_COLUMN_NAME, ArrowDataType::UInt8, false)),
218 ]
219}
220
221#[derive(Default)]
226pub(crate) struct SeriesEstimator {
227 last_timestamp: Option<i64>,
229 series_count: u64,
231}
232
233impl SeriesEstimator {
234 pub(crate) fn update_flat(&mut self, record_batch: &RecordBatch) {
238 let batch_rows = record_batch.num_rows();
239 if batch_rows == 0 {
240 return;
241 }
242
243 let time_index_pos = time_index_column_index(record_batch.num_columns());
244 let timestamps = record_batch.column(time_index_pos);
245 let Some((ts_values, _unit)) = timestamp_array_to_primitive(timestamps) else {
246 return;
247 };
248 let values = ts_values.values();
249
250 if let Some(last_ts) = self.last_timestamp {
252 if values[0] <= last_ts {
253 self.series_count += 1;
254 }
255 } else {
256 self.series_count = 1;
258 }
259
260 for i in 0..batch_rows - 1 {
262 if values[i] >= values[i + 1] {
265 self.series_count += 1;
266 }
267 }
268
269 self.last_timestamp = Some(values[batch_rows - 1]);
271 }
272
273 pub(crate) fn finish(&mut self) -> u64 {
275 self.last_timestamp = None;
276 let count = self.series_count;
277 self.series_count = 0;
278
279 count
280 }
281}
282
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datatypes::arrow::array::{
        ArrayRef, BinaryArray, DictionaryArray, Int64Array, TimestampMillisecondArray, UInt8Array,
        UInt32Array, UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema, TimeUnit};
    use datatypes::arrow::record_batch::RecordBatch;

    use super::*;

    /// Builds a flat-format batch whose time index holds `timestamps`.
    ///
    /// Layout, in order:
    /// - an optional leading `field` column (when `field_values` is `Some`);
    /// - the `time` index column;
    /// - the internal columns, whose schema comes from [`internal_fields`].
    ///
    /// This mirrors the tail of the flat SST layout.
    fn new_flat_record_batch_with_field(
        timestamps: &[i64],
        field_values: Option<&[i64]>,
    ) -> RecordBatch {
        let num_rows = timestamps.len();
        let mut fields: Vec<FieldRef> = Vec::new();
        let mut columns: Vec<ArrayRef> = Vec::new();

        if let Some(values) = field_values {
            assert_eq!(num_rows, values.len());
            fields.push(Arc::new(Field::new("field", ArrowDataType::Int64, false)));
            columns.push(Arc::new(Int64Array::from(values.to_vec())));
        }

        fields.push(Arc::new(Field::new(
            "time",
            ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
            false,
        )));
        columns.push(Arc::new(TimestampMillisecondArray::from(timestamps.to_vec())));

        // Internal columns, built to match the types declared by `internal_fields`.
        fields.extend(internal_fields());
        columns.push(Arc::new(DictionaryArray::new(
            UInt32Array::from(vec![0; num_rows]),
            Arc::new(BinaryArray::from(vec![b"test".as_slice()])),
        )));
        columns.push(Arc::new(UInt64Array::from(vec![1; num_rows])));
        columns.push(Arc::new(UInt8Array::from(vec![1; num_rows])));

        // The estimator must be able to locate the time index in this layout.
        let time_index_pos = time_index_column_index(columns.len());
        assert_eq!("time", fields[time_index_pos].name().as_str());

        RecordBatch::try_new(Arc::new(Schema::new(fields)), columns).unwrap()
    }

    /// Builds a flat-format batch that only has the time index and the internal columns.
    fn new_flat_record_batch(timestamps: &[i64]) -> RecordBatch {
        new_flat_record_batch_with_field(timestamps, None)
    }

    #[test]
    fn test_series_estimator_flat_empty_batch() {
        let mut estimator = SeriesEstimator::default();
        let record_batch = new_flat_record_batch(&[]);
        estimator.update_flat(&record_batch);
        assert_eq!(0, estimator.finish());
    }

    #[test]
    fn test_series_estimator_flat_single_batch() {
        let mut estimator = SeriesEstimator::default();
        let record_batch = new_flat_record_batch(&[1, 2, 3]);
        estimator.update_flat(&record_batch);
        assert_eq!(1, estimator.finish());
    }

    #[test]
    fn test_series_estimator_flat_series_boundary_within_batch() {
        let mut estimator = SeriesEstimator::default();
        let record_batch = new_flat_record_batch(&[1, 2, 3, 2, 4, 5]);
        estimator.update_flat(&record_batch);
        assert_eq!(2, estimator.finish());
    }

    #[test]
    fn test_series_estimator_flat_multiple_boundaries_within_batch() {
        let mut estimator = SeriesEstimator::default();
        let record_batch = new_flat_record_batch(&[1, 2, 5, 4, 6, 3, 7]);
        estimator.update_flat(&record_batch);
        assert_eq!(3, estimator.finish());
    }

    #[test]
    fn test_series_estimator_flat_equal_timestamps() {
        let mut estimator = SeriesEstimator::default();
        let record_batch = new_flat_record_batch(&[1, 2, 2, 3, 3, 3, 4]);
        estimator.update_flat(&record_batch);
        assert_eq!(4, estimator.finish());
    }

    #[test]
    fn test_series_estimator_flat_multiple_batches_continuation() {
        let mut estimator = SeriesEstimator::default();

        let batch1 = new_flat_record_batch(&[1, 2, 3]);
        estimator.update_flat(&batch1);

        let batch2 = new_flat_record_batch(&[4, 5, 6]);
        estimator.update_flat(&batch2);

        assert_eq!(1, estimator.finish());
    }

    #[test]
    fn test_series_estimator_flat_multiple_batches_new_series() {
        let mut estimator = SeriesEstimator::default();

        let batch1 = new_flat_record_batch(&[1, 2, 3]);
        estimator.update_flat(&batch1);

        let batch2 = new_flat_record_batch(&[2, 3, 4]);
        estimator.update_flat(&batch2);

        assert_eq!(2, estimator.finish());
    }

    #[test]
    fn test_series_estimator_flat_boundary_at_batch_edge_equal() {
        let mut estimator = SeriesEstimator::default();

        let batch1 = new_flat_record_batch(&[1, 2, 5]);
        estimator.update_flat(&batch1);

        let batch2 = new_flat_record_batch(&[5, 6, 7]);
        estimator.update_flat(&batch2);

        assert_eq!(2, estimator.finish());
    }

    #[test]
    fn test_series_estimator_flat_mixed_batches() {
        let mut estimator = SeriesEstimator::default();

        let batch1 = new_flat_record_batch(&[10, 20, 30]);
        estimator.update_flat(&batch1);

        let batch2 = new_flat_record_batch(&[5, 15, 10, 25]);
        estimator.update_flat(&batch2);

        let batch3 = new_flat_record_batch(&[30, 35]);
        estimator.update_flat(&batch3);

        assert_eq!(3, estimator.finish());
    }

    #[test]
    fn test_series_estimator_flat_descending_timestamps() {
        let mut estimator = SeriesEstimator::default();
        let record_batch = new_flat_record_batch(&[10, 9, 8, 7, 6]);
        estimator.update_flat(&record_batch);
        assert_eq!(5, estimator.finish());
    }

    #[test]
    fn test_series_estimator_flat_finish_resets_state() {
        let mut estimator = SeriesEstimator::default();

        let batch1 = new_flat_record_batch(&[1, 2, 3]);
        estimator.update_flat(&batch1);

        assert_eq!(1, estimator.finish());

        let batch2 = new_flat_record_batch(&[4, 5, 6]);
        estimator.update_flat(&batch2);

        assert_eq!(1, estimator.finish());
    }

    #[test]
    fn test_series_estimator_flat_reads_time_index_column() {
        let mut estimator = SeriesEstimator::default();
        // The leading field column is strictly descending, so reading any column other than
        // the time index would not yield a single series.
        let record_batch =
            new_flat_record_batch_with_field(&[1, 2, 3, 4, 5], Some(&[50, 40, 30, 20, 10][..]));
        estimator.update_flat(&record_batch);
        assert_eq!(1, estimator.finish());
    }
}