1use std::borrow::Borrow;
32use std::collections::HashMap;
33use std::sync::Arc;
34
35use api::v1::SemanticType;
36use datatypes::arrow::array::{
37 Array, ArrayRef, BinaryArray, DictionaryArray, UInt32Array, UInt64Array,
38};
39use datatypes::arrow::compute::kernels::take::take;
40use datatypes::arrow::datatypes::{Schema, SchemaRef};
41use datatypes::arrow::record_batch::RecordBatch;
42use datatypes::prelude::{ConcreteDataType, DataType};
43use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec, build_primary_key_codec};
44use parquet::file::metadata::RowGroupMetaData;
45use snafu::{OptionExt, ResultExt, ensure};
46use store_api::codec::PrimaryKeyEncoding;
47use store_api::metadata::{RegionMetadata, RegionMetadataRef};
48use store_api::storage::{ColumnId, SequenceNumber};
49
50use crate::error::{
51 ComputeArrowSnafu, DecodeSnafu, InvalidParquetSnafu, InvalidRecordBatchSnafu,
52 NewRecordBatchSnafu, Result,
53};
54use crate::sst::parquet::format::{
55 FormatProjection, INTERNAL_COLUMN_NUM, PrimaryKeyArray, PrimaryKeyReadFormat, ReadFormat,
56 StatValues,
57};
58use crate::sst::{
59 FlatSchemaOptions, flat_sst_arrow_schema_column_num, tag_maybe_to_dictionary_field,
60 to_flat_sst_arrow_schema,
61};
62
/// Helper to write record batches in the flat SST format.
pub(crate) struct FlatWriteFormat {
    /// Arrow schema of the SST to write (flat layout).
    arrow_schema: SchemaRef,
    /// If set, replaces the sequence column of every written batch.
    override_sequence: Option<SequenceNumber>,
}
69
70impl FlatWriteFormat {
71 pub(crate) fn new(metadata: RegionMetadataRef, options: &FlatSchemaOptions) -> FlatWriteFormat {
73 let arrow_schema = to_flat_sst_arrow_schema(&metadata, options);
74 FlatWriteFormat {
75 arrow_schema,
76 override_sequence: None,
77 }
78 }
79
80 pub(crate) fn with_override_sequence(
82 mut self,
83 override_sequence: Option<SequenceNumber>,
84 ) -> Self {
85 self.override_sequence = override_sequence;
86 self
87 }
88
89 pub(crate) fn arrow_schema(&self) -> &SchemaRef {
91 &self.arrow_schema
92 }
93
94 pub(crate) fn convert_batch(&self, batch: &RecordBatch) -> Result<RecordBatch> {
96 debug_assert_eq!(batch.num_columns(), self.arrow_schema.fields().len());
97
98 let Some(override_sequence) = self.override_sequence else {
99 return Ok(batch.clone());
100 };
101
102 let mut columns = batch.columns().to_vec();
103 let sequence_array = Arc::new(UInt64Array::from(vec![override_sequence; batch.num_rows()]));
104 columns[sequence_column_index(batch.num_columns())] = sequence_array;
105
106 RecordBatch::try_new(self.arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
107 }
108}
109
/// Returns the index of the sequence column.
///
/// Internal columns occupy fixed positions at the end of the flat schema, so
/// the index depends only on the total column count.
pub(crate) fn sequence_column_index(num_columns: usize) -> usize {
    // The sequence column is the second column from the end.
    const OFFSET_FROM_END: usize = 2;
    num_columns - OFFSET_FROM_END
}
114
/// Returns the index of the time index column.
///
/// Internal columns occupy fixed positions at the end of the flat schema, so
/// the index depends only on the total column count.
pub(crate) fn time_index_column_index(num_columns: usize) -> usize {
    // The time index column is the fourth column from the end.
    const OFFSET_FROM_END: usize = 4;
    num_columns - OFFSET_FROM_END
}
119
/// Returns the index of the primary key column.
///
/// Internal columns occupy fixed positions at the end of the flat schema, so
/// the index depends only on the total column count.
pub(crate) fn primary_key_column_index(num_columns: usize) -> usize {
    // The primary key column is the third column from the end.
    const OFFSET_FROM_END: usize = 3;
    num_columns - OFFSET_FROM_END
}
124
/// Returns the index of the op type column.
///
/// Internal columns occupy fixed positions at the end of the flat schema, so
/// the index depends only on the total column count.
pub(crate) fn op_type_column_index(num_columns: usize) -> usize {
    // The op type column is the last column.
    const OFFSET_FROM_END: usize = 1;
    num_columns - OFFSET_FROM_END
}
129
/// Helper to read record batches from SSTs in either the flat or the legacy
/// primary-key parquet layout.
pub struct FlatReadFormat {
    /// If set, replaces the sequence column of every read batch.
    override_sequence: Option<SequenceNumber>,
    /// Adapter that handles the actual on-disk layout of the parquet file.
    parquet_adapter: ParquetAdapter,
}
142
impl FlatReadFormat {
    /// Creates a read format for the projected `column_ids`.
    ///
    /// `num_columns` is the actual number of columns in the parquet file; it is
    /// used to detect whether the file was written in the legacy primary-key
    /// layout. When it is `None`, sparse primary key encoding is treated as
    /// legacy. `skip_auto_convert` keeps batches in the primary-key layout
    /// instead of decoding tags into individual columns; it is only honored
    /// for sparse primary key encoding.
    pub fn new(
        metadata: RegionMetadataRef,
        column_ids: impl Iterator<Item = ColumnId>,
        num_columns: Option<usize>,
        file_path: &str,
        skip_auto_convert: bool,
    ) -> Result<FlatReadFormat> {
        let is_legacy = match num_columns {
            Some(num) => Self::is_legacy_format(&metadata, num, file_path)?,
            None => metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse,
        };

        let parquet_adapter = if is_legacy {
            if metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse {
                ParquetAdapter::PrimaryKeyToFlat(ParquetPrimaryKeyToFlat::new(
                    metadata,
                    column_ids,
                    skip_auto_convert,
                ))
            } else {
                // Non-sparse encoding never skips conversion; pass `false`
                // so the adapter's sparse-only assertion holds.
                ParquetAdapter::PrimaryKeyToFlat(ParquetPrimaryKeyToFlat::new(
                    metadata, column_ids, false,
                ))
            }
        } else {
            ParquetAdapter::Flat(ParquetFlat::new(metadata, column_ids))
        };

        Ok(FlatReadFormat {
            override_sequence: None,
            parquet_adapter,
        })
    }

    /// Sets the sequence number that overrides the sequence column of read batches.
    pub(crate) fn set_override_sequence(&mut self, sequence: Option<SequenceNumber>) {
        self.override_sequence = sequence;
    }

    /// Returns the index of `column_id` in the projected output, or `None`
    /// when the column is not projected.
    pub fn projected_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
        self.format_projection()
            .column_id_to_projected_index
            .get(&column_id)
            .copied()
    }

    /// Returns min values of `column_id` in all `row_groups`.
    pub fn min_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        match &self.parquet_adapter {
            ParquetAdapter::Flat(p) => p.min_values(row_groups, column_id),
            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.min_values(row_groups, column_id),
        }
    }

    /// Returns max values of `column_id` in all `row_groups`.
    pub fn max_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        match &self.parquet_adapter {
            ParquetAdapter::Flat(p) => p.max_values(row_groups, column_id),
            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.max_values(row_groups, column_id),
        }
    }

    /// Returns null counts of `column_id` in all `row_groups`.
    pub fn null_counts(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        match &self.parquet_adapter {
            ParquetAdapter::Flat(p) => p.null_counts(row_groups, column_id),
            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.null_counts(row_groups, column_id),
        }
    }

    /// Returns the arrow schema of the SST.
    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
        match &self.parquet_adapter {
            ParquetAdapter::Flat(p) => &p.arrow_schema,
            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.arrow_schema(),
        }
    }

    /// Returns the region metadata this format reads with.
    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
        match &self.parquet_adapter {
            ParquetAdapter::Flat(p) => &p.metadata,
            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.metadata(),
        }
    }

    /// Returns the indices of the columns to read from the parquet file.
    pub(crate) fn projection_indices(&self) -> &[usize] {
        match &self.parquet_adapter {
            ParquetAdapter::Flat(p) => &p.format_projection.projection_indices,
            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.projection_indices(),
        }
    }

    /// Returns the projection computed for the output layout.
    pub(crate) fn format_projection(&self) -> &FormatProjection {
        match &self.parquet_adapter {
            ParquetAdapter::Flat(p) => &p.format_projection,
            ParquetAdapter::PrimaryKeyToFlat(p) => &p.format_projection,
        }
    }

    /// Builds an array of `length` copies of the override sequence, or `None`
    /// when no override is configured.
    pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option<ArrayRef> {
        self.override_sequence
            .map(|seq| Arc::new(UInt64Array::from_value(seq, length)) as ArrayRef)
    }

    /// Converts `record_batch` to the output layout and applies the override
    /// sequence array when provided.
    ///
    /// Legacy primary-key batches are converted to the flat layout first.
    /// `override_sequence_array` may be longer than the batch; it is sliced to
    /// the batch's row count.
    pub(crate) fn convert_batch(
        &self,
        record_batch: RecordBatch,
        override_sequence_array: Option<&ArrayRef>,
    ) -> Result<RecordBatch> {
        let batch = match &self.parquet_adapter {
            ParquetAdapter::Flat(_) => record_batch,
            ParquetAdapter::PrimaryKeyToFlat(p) => p.convert_batch(record_batch)?,
        };

        let Some(override_array) = override_sequence_array else {
            return Ok(batch);
        };

        let mut columns = batch.columns().to_vec();
        let sequence_column_idx = sequence_column_index(batch.num_columns());

        // Reuse the caller-provided array; slice it when it is longer than the batch.
        let sequence_array = if override_array.len() > batch.num_rows() {
            override_array.slice(0, batch.num_rows())
        } else {
            override_array.clone()
        };

        columns[sequence_column_idx] = sequence_array;

        RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
    }

    /// Returns whether a parquet file with `num_columns` columns uses the
    /// legacy primary-key layout.
    ///
    /// A flat file stores every column plus the internal columns; a legacy
    /// file omits the individual primary key columns, so its column count is
    /// smaller by exactly the primary key length. Any other count is invalid.
    pub(crate) fn is_legacy_format(
        metadata: &RegionMetadata,
        num_columns: usize,
        file_path: &str,
    ) -> Result<bool> {
        if metadata.primary_key.is_empty() {
            // Without a primary key both layouts have the same column count.
            return Ok(false);
        }

        let expected_columns = metadata.column_metadatas.len() + INTERNAL_COLUMN_NUM;

        if expected_columns == num_columns {
            Ok(false)
        } else {
            ensure!(
                expected_columns >= num_columns,
                InvalidParquetSnafu {
                    file: file_path,
                    reason: format!(
                        "Expected columns {} should be >= actual columns {}",
                        expected_columns, num_columns
                    )
                }
            );

            let column_diff = expected_columns - num_columns;

            ensure!(
                column_diff == metadata.primary_key.len(),
                InvalidParquetSnafu {
                    file: file_path,
                    reason: format!(
                        "Column number difference {} does not match primary key count {}",
                        column_diff,
                        metadata.primary_key.len()
                    )
                }
            );

            Ok(true)
        }
    }
}
363
/// Dispatches reads to the handler for the file's on-disk layout.
enum ParquetAdapter {
    /// The file already uses the flat layout.
    Flat(ParquetFlat),
    /// The file uses the legacy layout with an encoded primary key column.
    PrimaryKeyToFlat(ParquetPrimaryKeyToFlat),
}
369
/// Reads parquet files in the legacy primary-key layout and optionally
/// converts batches to the flat layout.
struct ParquetPrimaryKeyToFlat {
    /// Underlying reader for the primary-key layout.
    format: PrimaryKeyReadFormat,
    /// Converter that decodes tag columns out of the primary key; `None` when
    /// auto conversion is skipped or there is nothing to decode.
    convert_format: Option<FlatConvertFormat>,
    /// Projection for the output layout (the primary-key projection when
    /// conversion is skipped).
    format_projection: FormatProjection,
}
379
380impl ParquetPrimaryKeyToFlat {
381 fn new(
383 metadata: RegionMetadataRef,
384 column_ids: impl Iterator<Item = ColumnId>,
385 skip_auto_convert: bool,
386 ) -> ParquetPrimaryKeyToFlat {
387 assert!(if skip_auto_convert {
388 metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse
389 } else {
390 true
391 });
392
393 let column_ids: Vec<_> = column_ids.collect();
394
395 let id_to_index = sst_column_id_indices(&metadata);
397 let sst_column_num =
398 flat_sst_arrow_schema_column_num(&metadata, &FlatSchemaOptions::default());
399
400 let codec = build_primary_key_codec(&metadata);
401 let format = PrimaryKeyReadFormat::new(metadata.clone(), column_ids.iter().copied());
402 let (convert_format, format_projection) = if skip_auto_convert {
403 (
404 None,
405 FormatProjection {
406 projection_indices: format.projection_indices().to_vec(),
407 column_id_to_projected_index: format.field_id_to_projected_index().clone(),
408 },
409 )
410 } else {
411 let format_projection = FormatProjection::compute_format_projection(
413 &id_to_index,
414 sst_column_num,
415 column_ids.iter().copied(),
416 );
417 (
418 FlatConvertFormat::new(Arc::clone(&metadata), &format_projection, codec),
419 format_projection,
420 )
421 };
422
423 Self {
424 format,
425 convert_format,
426 format_projection,
427 }
428 }
429
430 fn convert_batch(&self, record_batch: RecordBatch) -> Result<RecordBatch> {
431 if let Some(convert_format) = &self.convert_format {
432 convert_format.convert(record_batch)
433 } else {
434 Ok(record_batch)
435 }
436 }
437}
438
/// Reads parquet files that are already stored in the flat layout.
struct ParquetFlat {
    /// Region metadata of the SST.
    metadata: RegionMetadataRef,
    /// Arrow schema of the flat SST.
    arrow_schema: SchemaRef,
    /// Projection computed for the requested column ids.
    format_projection: FormatProjection,
    /// Maps a column id to its index inside the SST.
    column_id_to_sst_index: HashMap<ColumnId, usize>,
}
450
451impl ParquetFlat {
452 fn new(metadata: RegionMetadataRef, column_ids: impl Iterator<Item = ColumnId>) -> ParquetFlat {
454 let id_to_index = sst_column_id_indices(&metadata);
456 let arrow_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
457 let sst_column_num =
458 flat_sst_arrow_schema_column_num(&metadata, &FlatSchemaOptions::default());
459 let format_projection =
460 FormatProjection::compute_format_projection(&id_to_index, sst_column_num, column_ids);
461
462 Self {
463 metadata,
464 arrow_schema,
465 format_projection,
466 column_id_to_sst_index: id_to_index,
467 }
468 }
469
470 fn min_values(
472 &self,
473 row_groups: &[impl Borrow<RowGroupMetaData>],
474 column_id: ColumnId,
475 ) -> StatValues {
476 self.get_stat_values(row_groups, column_id, true)
477 }
478
479 fn max_values(
481 &self,
482 row_groups: &[impl Borrow<RowGroupMetaData>],
483 column_id: ColumnId,
484 ) -> StatValues {
485 self.get_stat_values(row_groups, column_id, false)
486 }
487
488 fn null_counts(
490 &self,
491 row_groups: &[impl Borrow<RowGroupMetaData>],
492 column_id: ColumnId,
493 ) -> StatValues {
494 let Some(index) = self.column_id_to_sst_index.get(&column_id) else {
495 return StatValues::NoColumn;
497 };
498
499 let stats = ReadFormat::column_null_counts(row_groups, *index);
500 StatValues::from_stats_opt(stats)
501 }
502
503 fn get_stat_values(
504 &self,
505 row_groups: &[impl Borrow<RowGroupMetaData>],
506 column_id: ColumnId,
507 is_min: bool,
508 ) -> StatValues {
509 let Some(column) = self.metadata.column_by_id(column_id) else {
510 return StatValues::NoColumn;
512 };
513 let index = self.column_id_to_sst_index.get(&column_id).unwrap();
515
516 let stats = ReadFormat::column_values(row_groups, column, *index, is_min);
517 StatValues::from_stats_opt(stats)
518 }
519}
520
521pub(crate) fn sst_column_id_indices(metadata: &RegionMetadata) -> HashMap<ColumnId, usize> {
525 let mut id_to_index = HashMap::with_capacity(metadata.column_metadatas.len());
526 let mut column_index = 0;
527 for pk_id in &metadata.primary_key {
529 id_to_index.insert(*pk_id, column_index);
530 column_index += 1;
531 }
532 for column in &metadata.column_metadatas {
534 if column.semantic_type == SemanticType::Field {
535 id_to_index.insert(column.column_id, column_index);
536 column_index += 1;
537 }
538 }
539 id_to_index.insert(metadata.time_index_column().column_id, column_index);
541
542 id_to_index
543}
544
/// Decodes the primary key column of `batch` with `codec`.
///
/// Rows whose dictionary key equals the previous row's key reuse the previous
/// decoded value, so sorted batches decode each distinct key only once.
pub(crate) fn decode_primary_keys(
    codec: &dyn PrimaryKeyCodec,
    batch: &RecordBatch,
) -> Result<DecodedPrimaryKeys> {
    let primary_key_index = primary_key_column_index(batch.num_columns());
    // The primary key column must be a dictionary whose values are the
    // binary-encoded primary keys.
    let pk_dict_array = batch
        .column(primary_key_index)
        .as_any()
        .downcast_ref::<PrimaryKeyArray>()
        .with_context(|| InvalidRecordBatchSnafu {
            reason: "Primary key column is not a dictionary array".to_string(),
        })?;
    let pk_values_array = pk_dict_array
        .values()
        .as_any()
        .downcast_ref::<BinaryArray>()
        .with_context(|| InvalidRecordBatchSnafu {
            reason: "Primary key values are not binary array".to_string(),
        })?;

    let keys = pk_dict_array.keys();

    // Maps each row to an index into `decoded_pk_values`.
    let mut key_to_decoded_index = Vec::with_capacity(keys.len());
    let mut decoded_pk_values = Vec::new();
    let mut prev_key: Option<u32> = None;

    // NOTE(review): iterates the raw key buffer, which ignores validity —
    // assumes the key column contains no nulls; confirm against writers.
    let pk_indices = keys.values();
    for &current_key in pk_indices.iter().take(keys.len()) {
        // Only consecutive repeats are deduplicated.
        if let Some(prev) = prev_key
            && prev == current_key
        {
            key_to_decoded_index.push((decoded_pk_values.len() - 1) as u32);
            continue;
        }

        let pk_bytes = pk_values_array.value(current_key as usize);
        let decoded_value = codec.decode(pk_bytes).context(DecodeSnafu)?;

        decoded_pk_values.push(decoded_value);
        key_to_decoded_index.push((decoded_pk_values.len() - 1) as u32);
        prev_key = Some(current_key);
    }

    let keys_array = UInt32Array::from(key_to_decoded_index);

    Ok(DecodedPrimaryKeys {
        decoded_pk_values,
        keys_array,
    })
}
606
/// Primary keys decoded from a record batch.
pub(crate) struct DecodedPrimaryKeys {
    /// Decoded composite values, one per run of equal dictionary keys.
    decoded_pk_values: Vec<CompositeValues>,
    /// Maps each row of the batch to an index into `decoded_pk_values`.
    keys_array: UInt32Array,
}
614
impl DecodedPrimaryKeys {
    /// Builds the array of tag column `column_id` from the decoded values.
    ///
    /// `pk_index` is the column's position inside the primary key and is
    /// required for dense encoding. String tags are returned as a dictionary
    /// array that reuses `keys_array`; other types are expanded to one entry
    /// per row with the `take` kernel.
    ///
    /// # Panics
    /// Panics if `pk_index` is `None` while a decoded value uses dense encoding.
    pub(crate) fn get_tag_column(
        &self,
        column_id: ColumnId,
        pk_index: Option<usize>,
        column_type: &ConcreteDataType,
    ) -> Result<ArrayRef> {
        // One slot per decoded (distinct) primary key.
        let mut builder = column_type.create_mutable_vector(self.decoded_pk_values.len());
        for decoded in &self.decoded_pk_values {
            match decoded {
                CompositeValues::Dense(dense) => {
                    let pk_idx = pk_index.expect("pk_index required for dense encoding");
                    if pk_idx < dense.len() {
                        builder.push_value_ref(&dense[pk_idx].1.as_value_ref());
                    } else {
                        // Decoded key has fewer parts than expected; fill null.
                        builder.push_null();
                    }
                }
                CompositeValues::Sparse(sparse) => {
                    let value = sparse.get_or_null(column_id);
                    builder.push_value_ref(&value.as_value_ref());
                }
            };
        }

        let values_vector = builder.to_vector();
        let values_array = values_vector.to_arrow_array();

        if column_type.is_string() {
            // Keep string tags dictionary-encoded: each row indexes the
            // decoded values through `keys_array`.
            let dict_array = DictionaryArray::new(self.keys_array.clone(), values_array);
            Ok(Arc::new(dict_array))
        } else {
            // Expand the per-key values to one value per row.
            let taken_array =
                take(&values_array, &self.keys_array, None).context(ComputeArrowSnafu)?;
            Ok(taken_array)
        }
    }
}
662
/// Converts record batches from the primary-key layout to the flat layout by
/// decoding tag columns out of the encoded primary key column.
pub(crate) struct FlatConvertFormat {
    /// Region metadata of the SST.
    metadata: RegionMetadataRef,
    /// Codec used to decode primary keys.
    codec: Arc<dyn PrimaryKeyCodec>,
    /// Projected tag columns as (column id, index in the primary key, index in
    /// `metadata.column_metadatas`), in primary key order.
    projected_primary_keys: Vec<(ColumnId, usize, usize)>,
}
673
impl FlatConvertFormat {
    /// Creates a converter for the tag columns selected by `format_projection`.
    ///
    /// Returns `None` when the region has no primary key, i.e. there is
    /// nothing to decode.
    pub(crate) fn new(
        metadata: RegionMetadataRef,
        format_projection: &FormatProjection,
        codec: Arc<dyn PrimaryKeyCodec>,
    ) -> Option<Self> {
        if metadata.primary_key.is_empty() {
            return None;
        }

        // Collect (column id, primary key index, metadata index) for every
        // projected tag column, preserving primary key order.
        let mut projected_primary_keys = Vec::new();
        for (pk_index, &column_id) in metadata.primary_key.iter().enumerate() {
            if format_projection
                .column_id_to_projected_index
                .contains_key(&column_id)
            {
                // Primary key ids always exist in the metadata.
                let column_index = metadata.column_index_by_id(column_id).unwrap();
                projected_primary_keys.push((column_id, pk_index, column_index));
            }
        }

        Some(Self {
            metadata,
            codec,
            projected_primary_keys,
        })
    }

    /// Converts `batch` by prepending the decoded tag columns.
    ///
    /// Decoded tag columns are placed before the existing columns in primary
    /// key order; the original columns keep their relative order.
    pub(crate) fn convert(&self, batch: RecordBatch) -> Result<RecordBatch> {
        if self.projected_primary_keys.is_empty() {
            return Ok(batch);
        }

        let decoded_pks = decode_primary_keys(self.codec.as_ref(), &batch)?;

        // Build one array per projected tag column.
        let mut decoded_columns = Vec::new();
        for (column_id, pk_index, column_index) in &self.projected_primary_keys {
            let column_metadata = &self.metadata.column_metadatas[*column_index];
            let tag_column = decoded_pks.get_tag_column(
                *column_id,
                Some(*pk_index),
                &column_metadata.column_schema.data_type,
            )?;
            decoded_columns.push(tag_column);
        }

        let mut new_columns = Vec::with_capacity(batch.num_columns() + decoded_columns.len());
        new_columns.extend(decoded_columns);
        new_columns.extend_from_slice(batch.columns());

        // Build the output schema: decoded tag fields first, then the batch's
        // original fields.
        let mut new_fields =
            Vec::with_capacity(batch.schema().fields().len() + self.projected_primary_keys.len());
        for (_, _, column_index) in &self.projected_primary_keys {
            let column_metadata = &self.metadata.column_metadatas[*column_index];
            // NOTE(review): assumes the region's arrow schema fields align
            // one-to-one with `column_metadatas` order — confirm.
            let old_field = &self.metadata.schema.arrow_schema().fields()[*column_index];
            let field =
                tag_maybe_to_dictionary_field(&column_metadata.column_schema.data_type, old_field);
            new_fields.push(field);
        }
        new_fields.extend(batch.schema().fields().iter().cloned());

        let new_schema = Arc::new(Schema::new(new_fields));
        RecordBatch::try_new(new_schema, new_columns).context(NewRecordBatchSnafu)
    }
}
753
#[cfg(test)]
impl FlatReadFormat {
    /// Creates a [FlatReadFormat] that reads every column, for tests.
    pub fn new_with_all_columns(metadata: RegionMetadataRef) -> FlatReadFormat {
        let all_column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|column| column.column_id)
            .collect();
        Self::new(metadata, all_column_ids.into_iter(), None, "test", false).unwrap()
    }
}