1use std::borrow::Borrow;
32use std::collections::HashMap;
33use std::sync::Arc;
34
35use api::v1::SemanticType;
36use datatypes::arrow::array::{
37 Array, ArrayRef, BinaryArray, DictionaryArray, UInt32Array, UInt64Array,
38};
39use datatypes::arrow::compute::kernels::take::take;
40use datatypes::arrow::datatypes::{Schema, SchemaRef};
41use datatypes::arrow::record_batch::RecordBatch;
42use datatypes::prelude::{ConcreteDataType, DataType};
43use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec, build_primary_key_codec};
44use parquet::file::metadata::RowGroupMetaData;
45use snafu::{OptionExt, ResultExt, ensure};
46use store_api::codec::PrimaryKeyEncoding;
47use store_api::metadata::{RegionMetadata, RegionMetadataRef};
48use store_api::storage::{ColumnId, SequenceNumber};
49
50use crate::error::{
51 ComputeArrowSnafu, DecodeSnafu, InvalidParquetSnafu, InvalidRecordBatchSnafu,
52 NewRecordBatchSnafu, Result,
53};
54use crate::sst::parquet::format::{
55 FIXED_POS_COLUMN_NUM, FormatProjection, INTERNAL_COLUMN_NUM, PrimaryKeyArray,
56 PrimaryKeyReadFormat, ReadFormat, StatValues,
57};
58use crate::sst::{
59 FlatSchemaOptions, flat_sst_arrow_schema_column_num, tag_maybe_to_dictionary_field,
60 to_flat_sst_arrow_schema,
61};
62
/// Helper to prepare [RecordBatch]es for writing in the flat SST format.
pub(crate) struct FlatWriteFormat {
    /// Arrow schema of the SST file to write.
    arrow_schema: SchemaRef,
    /// If set, replaces every row's sequence value on [FlatWriteFormat::convert_batch].
    override_sequence: Option<SequenceNumber>,
}
69
70impl FlatWriteFormat {
71 pub(crate) fn new(metadata: RegionMetadataRef, options: &FlatSchemaOptions) -> FlatWriteFormat {
73 let arrow_schema = to_flat_sst_arrow_schema(&metadata, options);
74 FlatWriteFormat {
75 arrow_schema,
76 override_sequence: None,
77 }
78 }
79
80 pub(crate) fn with_override_sequence(
82 mut self,
83 override_sequence: Option<SequenceNumber>,
84 ) -> Self {
85 self.override_sequence = override_sequence;
86 self
87 }
88
89 pub(crate) fn arrow_schema(&self) -> &SchemaRef {
91 &self.arrow_schema
92 }
93
94 pub(crate) fn convert_batch(&self, batch: &RecordBatch) -> Result<RecordBatch> {
96 debug_assert_eq!(batch.num_columns(), self.arrow_schema.fields().len());
97
98 let Some(override_sequence) = self.override_sequence else {
99 return Ok(batch.clone());
100 };
101
102 let mut columns = batch.columns().to_vec();
103 let sequence_array = Arc::new(UInt64Array::from(vec![override_sequence; batch.num_rows()]));
104 columns[sequence_column_index(batch.num_columns())] = sequence_array;
105
106 RecordBatch::try_new(self.arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
107 }
108}
109
/// Returns the index of the sequence column in a flat batch.
///
/// Internal columns sit at the tail of the batch; the sequence column is the
/// second column from the end.
pub(crate) fn sequence_column_index(num_columns: usize) -> usize {
    num_columns - 2
}
114
/// Returns the index of the time index column in a flat batch.
///
/// The time index column is the fourth column from the end.
pub(crate) fn time_index_column_index(num_columns: usize) -> usize {
    num_columns - 4
}
119
/// Returns the index of the encoded primary key column in a flat batch.
///
/// The primary key column is the third column from the end.
pub(crate) fn primary_key_column_index(num_columns: usize) -> usize {
    num_columns - 3
}
124
/// Returns the index of the op type column in a flat batch.
///
/// The op type column is always the last column.
pub(crate) fn op_type_column_index(num_columns: usize) -> usize {
    num_columns - 1
}
129
130pub(crate) fn field_column_start(metadata: &RegionMetadata, num_columns: usize) -> usize {
139 let field_column_count = metadata.column_metadatas.len() - 1 - metadata.primary_key.len();
142 num_columns - FIXED_POS_COLUMN_NUM - field_column_count
143}
144
/// Helper to read SST files (flat or legacy primary-key layout) as flat batches.
pub struct FlatReadFormat {
    /// Sequence number used to override the sequence column, if any.
    override_sequence: Option<SequenceNumber>,
    /// Adapter that handles the on-disk layout of the parquet file.
    parquet_adapter: ParquetAdapter,
}
157
impl FlatReadFormat {
    /// Creates a read format for the SST at `file_path`.
    ///
    /// `column_ids` is the set of columns to project. `num_columns` is the
    /// actual number of columns in the file when known; when `None`, the
    /// layout is inferred from the primary key encoding. `skip_auto_convert`
    /// keeps legacy sparse-encoded batches unconverted.
    pub fn new(
        metadata: RegionMetadataRef,
        column_ids: impl Iterator<Item = ColumnId>,
        num_columns: Option<usize>,
        file_path: &str,
        skip_auto_convert: bool,
    ) -> Result<FlatReadFormat> {
        // Decide whether the file uses the legacy primary-key layout.
        let is_legacy = match num_columns {
            Some(num) => Self::is_legacy_format(&metadata, num, file_path)?,
            None => metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse,
        };

        let parquet_adapter = if is_legacy {
            if metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse {
                ParquetAdapter::PrimaryKeyToFlat(ParquetPrimaryKeyToFlat::new(
                    metadata,
                    column_ids,
                    skip_auto_convert,
                ))
            } else {
                // Non-sparse legacy files always convert; skip_auto_convert is
                // only honored for the sparse encoding.
                ParquetAdapter::PrimaryKeyToFlat(ParquetPrimaryKeyToFlat::new(
                    metadata, column_ids, false,
                ))
            }
        } else {
            ParquetAdapter::Flat(ParquetFlat::new(metadata, column_ids))
        };

        Ok(FlatReadFormat {
            override_sequence: None,
            parquet_adapter,
        })
    }

    /// Sets the sequence number that overrides the sequence column on read.
    pub(crate) fn set_override_sequence(&mut self, sequence: Option<SequenceNumber>) {
        self.override_sequence = sequence;
    }

    /// Returns the projected index of `column_id`, or `None` if it is not projected.
    pub fn projected_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
        self.format_projection()
            .column_id_to_projected_index
            .get(&column_id)
            .copied()
    }

    /// Returns min values of `column_id` for each row group.
    pub fn min_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        match &self.parquet_adapter {
            ParquetAdapter::Flat(p) => p.min_values(row_groups, column_id),
            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.min_values(row_groups, column_id),
        }
    }

    /// Returns max values of `column_id` for each row group.
    pub fn max_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        match &self.parquet_adapter {
            ParquetAdapter::Flat(p) => p.max_values(row_groups, column_id),
            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.max_values(row_groups, column_id),
        }
    }

    /// Returns null counts of `column_id` for each row group.
    pub fn null_counts(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        match &self.parquet_adapter {
            ParquetAdapter::Flat(p) => p.null_counts(row_groups, column_id),
            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.null_counts(row_groups, column_id),
        }
    }

    /// Returns the arrow schema the reader produces.
    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
        match &self.parquet_adapter {
            ParquetAdapter::Flat(p) => &p.arrow_schema,
            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.arrow_schema(),
        }
    }

    /// Returns the region metadata this format was created from.
    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
        match &self.parquet_adapter {
            ParquetAdapter::Flat(p) => &p.metadata,
            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.metadata(),
        }
    }

    /// Returns the indices of the columns to read from the parquet file.
    pub(crate) fn projection_indices(&self) -> &[usize] {
        match &self.parquet_adapter {
            ParquetAdapter::Flat(p) => &p.format_projection.projection_indices,
            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.projection_indices(),
        }
    }

    /// Returns the computed projection of the output format.
    pub(crate) fn format_projection(&self) -> &FormatProjection {
        match &self.parquet_adapter {
            ParquetAdapter::Flat(p) => &p.format_projection,
            ParquetAdapter::PrimaryKeyToFlat(p) => &p.format_projection,
        }
    }

    /// Builds a sequence array of `length` from the override sequence, if set.
    pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option<ArrayRef> {
        self.override_sequence
            .map(|seq| Arc::new(UInt64Array::from_value(seq, length)) as ArrayRef)
    }

    /// Converts a batch read from parquet into the flat output format.
    ///
    /// Legacy batches are converted by the adapter first. When an override
    /// sequence array is given, it replaces the batch's sequence column
    /// (sliced down if it is longer than the batch).
    pub(crate) fn convert_batch(
        &self,
        record_batch: RecordBatch,
        override_sequence_array: Option<&ArrayRef>,
    ) -> Result<RecordBatch> {
        let batch = match &self.parquet_adapter {
            ParquetAdapter::Flat(_) => record_batch,
            ParquetAdapter::PrimaryKeyToFlat(p) => p.convert_batch(record_batch)?,
        };

        let Some(override_array) = override_sequence_array else {
            return Ok(batch);
        };

        let mut columns = batch.columns().to_vec();
        let sequence_column_idx = sequence_column_index(batch.num_columns());

        // The override array is sized for the largest batch; trim it when needed.
        let sequence_array = if override_array.len() > batch.num_rows() {
            override_array.slice(0, batch.num_rows())
        } else {
            override_array.clone()
        };

        columns[sequence_column_idx] = sequence_array;

        RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
    }

    /// Checks whether the file uses the legacy primary-key layout.
    ///
    /// A flat file has `column_metadatas.len() + INTERNAL_COLUMN_NUM` columns;
    /// a legacy file has exactly `primary_key.len()` fewer (tags are packed
    /// into the encoded primary key column). Any other column count is an
    /// invalid file.
    pub(crate) fn is_legacy_format(
        metadata: &RegionMetadata,
        num_columns: usize,
        file_path: &str,
    ) -> Result<bool> {
        // Without a primary key both layouts have the same column count.
        if metadata.primary_key.is_empty() {
            return Ok(false);
        }

        let expected_columns = metadata.column_metadatas.len() + INTERNAL_COLUMN_NUM;

        if expected_columns == num_columns {
            Ok(false)
        } else {
            ensure!(
                expected_columns >= num_columns,
                InvalidParquetSnafu {
                    file: file_path,
                    reason: format!(
                        "Expected columns {} should be >= actual columns {}",
                        expected_columns, num_columns
                    )
                }
            );

            let column_diff = expected_columns - num_columns;

            ensure!(
                column_diff == metadata.primary_key.len(),
                InvalidParquetSnafu {
                    file: file_path,
                    reason: format!(
                        "Column number difference {} does not match primary key count {}",
                        column_diff,
                        metadata.primary_key.len()
                    )
                }
            );

            Ok(true)
        }
    }
}
378
/// Dispatches reads according to the SST file's on-disk layout.
enum ParquetAdapter {
    /// The file already stores the flat layout.
    Flat(ParquetFlat),
    /// The file stores the legacy primary-key layout and may need conversion.
    PrimaryKeyToFlat(ParquetPrimaryKeyToFlat),
}
384
/// Reads a legacy primary-key format SST and optionally converts it to flat batches.
struct ParquetPrimaryKeyToFlat {
    /// Underlying primary-key format reader.
    format: PrimaryKeyReadFormat,
    /// Converter that decodes tag columns; `None` when auto conversion is skipped.
    convert_format: Option<FlatConvertFormat>,
    /// Projection computed for the output format.
    format_projection: FormatProjection,
}
394
395impl ParquetPrimaryKeyToFlat {
396 fn new(
398 metadata: RegionMetadataRef,
399 column_ids: impl Iterator<Item = ColumnId>,
400 skip_auto_convert: bool,
401 ) -> ParquetPrimaryKeyToFlat {
402 assert!(if skip_auto_convert {
403 metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse
404 } else {
405 true
406 });
407
408 let column_ids: Vec<_> = column_ids.collect();
409
410 let id_to_index = sst_column_id_indices(&metadata);
412 let sst_column_num =
413 flat_sst_arrow_schema_column_num(&metadata, &FlatSchemaOptions::default());
414
415 let codec = build_primary_key_codec(&metadata);
416 let format = PrimaryKeyReadFormat::new(metadata.clone(), column_ids.iter().copied());
417 let (convert_format, format_projection) = if skip_auto_convert {
418 (
419 None,
420 FormatProjection {
421 projection_indices: format.projection_indices().to_vec(),
422 column_id_to_projected_index: format.field_id_to_projected_index().clone(),
423 },
424 )
425 } else {
426 let format_projection = FormatProjection::compute_format_projection(
428 &id_to_index,
429 sst_column_num,
430 column_ids.iter().copied(),
431 );
432 (
433 FlatConvertFormat::new(Arc::clone(&metadata), &format_projection, codec),
434 format_projection,
435 )
436 };
437
438 Self {
439 format,
440 convert_format,
441 format_projection,
442 }
443 }
444
445 fn convert_batch(&self, record_batch: RecordBatch) -> Result<RecordBatch> {
446 if let Some(convert_format) = &self.convert_format {
447 convert_format.convert(record_batch)
448 } else {
449 Ok(record_batch)
450 }
451 }
452}
453
/// Reads an SST that is already stored in the flat layout.
struct ParquetFlat {
    /// Metadata of the region the SST belongs to.
    metadata: RegionMetadataRef,
    /// Arrow schema of the SST file.
    arrow_schema: SchemaRef,
    /// Projection computed for the flat format.
    format_projection: FormatProjection,
    /// Maps column ids to their positions in the SST.
    column_id_to_sst_index: HashMap<ColumnId, usize>,
}
465
466impl ParquetFlat {
467 fn new(metadata: RegionMetadataRef, column_ids: impl Iterator<Item = ColumnId>) -> ParquetFlat {
469 let id_to_index = sst_column_id_indices(&metadata);
471 let arrow_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
472 let sst_column_num =
473 flat_sst_arrow_schema_column_num(&metadata, &FlatSchemaOptions::default());
474 let format_projection =
475 FormatProjection::compute_format_projection(&id_to_index, sst_column_num, column_ids);
476
477 Self {
478 metadata,
479 arrow_schema,
480 format_projection,
481 column_id_to_sst_index: id_to_index,
482 }
483 }
484
485 fn min_values(
487 &self,
488 row_groups: &[impl Borrow<RowGroupMetaData>],
489 column_id: ColumnId,
490 ) -> StatValues {
491 self.get_stat_values(row_groups, column_id, true)
492 }
493
494 fn max_values(
496 &self,
497 row_groups: &[impl Borrow<RowGroupMetaData>],
498 column_id: ColumnId,
499 ) -> StatValues {
500 self.get_stat_values(row_groups, column_id, false)
501 }
502
503 fn null_counts(
505 &self,
506 row_groups: &[impl Borrow<RowGroupMetaData>],
507 column_id: ColumnId,
508 ) -> StatValues {
509 let Some(index) = self.column_id_to_sst_index.get(&column_id) else {
510 return StatValues::NoColumn;
512 };
513
514 let stats = ReadFormat::column_null_counts(row_groups, *index);
515 StatValues::from_stats_opt(stats)
516 }
517
518 fn get_stat_values(
519 &self,
520 row_groups: &[impl Borrow<RowGroupMetaData>],
521 column_id: ColumnId,
522 is_min: bool,
523 ) -> StatValues {
524 let Some(column) = self.metadata.column_by_id(column_id) else {
525 return StatValues::NoColumn;
527 };
528 let index = self.column_id_to_sst_index.get(&column_id).unwrap();
530
531 let stats = ReadFormat::column_values(row_groups, column, *index, is_min);
532 StatValues::from_stats_opt(stats)
533 }
534}
535
536pub(crate) fn sst_column_id_indices(metadata: &RegionMetadata) -> HashMap<ColumnId, usize> {
540 let mut id_to_index = HashMap::with_capacity(metadata.column_metadatas.len());
541 let mut column_index = 0;
542 for pk_id in &metadata.primary_key {
544 id_to_index.insert(*pk_id, column_index);
545 column_index += 1;
546 }
547 for column in &metadata.column_metadatas {
549 if column.semantic_type == SemanticType::Field {
550 id_to_index.insert(column.column_id, column_index);
551 column_index += 1;
552 }
553 }
554 id_to_index.insert(metadata.time_index_column().column_id, column_index);
556
557 id_to_index
558}
559
560pub(crate) fn decode_primary_keys(
564 codec: &dyn PrimaryKeyCodec,
565 batch: &RecordBatch,
566) -> Result<DecodedPrimaryKeys> {
567 let primary_key_index = primary_key_column_index(batch.num_columns());
568 let pk_dict_array = batch
569 .column(primary_key_index)
570 .as_any()
571 .downcast_ref::<PrimaryKeyArray>()
572 .with_context(|| InvalidRecordBatchSnafu {
573 reason: "Primary key column is not a dictionary array".to_string(),
574 })?;
575 let pk_values_array = pk_dict_array
576 .values()
577 .as_any()
578 .downcast_ref::<BinaryArray>()
579 .with_context(|| InvalidRecordBatchSnafu {
580 reason: "Primary key values are not binary array".to_string(),
581 })?;
582
583 let keys = pk_dict_array.keys();
584
585 let mut key_to_decoded_index = Vec::with_capacity(keys.len());
588 let mut decoded_pk_values = Vec::new();
589 let mut prev_key: Option<u32> = None;
590
591 let pk_indices = keys.values();
594 for ¤t_key in pk_indices.iter().take(keys.len()) {
595 if let Some(prev) = prev_key
597 && prev == current_key
598 {
599 key_to_decoded_index.push((decoded_pk_values.len() - 1) as u32);
601 continue;
602 }
603
604 let pk_bytes = pk_values_array.value(current_key as usize);
606 let decoded_value = codec.decode(pk_bytes).context(DecodeSnafu)?;
607
608 decoded_pk_values.push(decoded_value);
609 key_to_decoded_index.push((decoded_pk_values.len() - 1) as u32);
610 prev_key = Some(current_key);
611 }
612
613 let keys_array = UInt32Array::from(key_to_decoded_index);
615
616 Ok(DecodedPrimaryKeys {
617 decoded_pk_values,
618 keys_array,
619 })
620}
621
/// Result of decoding the primary key dictionary column of a batch.
pub(crate) struct DecodedPrimaryKeys {
    /// Decoded composite values, one per distinct run of dictionary keys.
    decoded_pk_values: Vec<CompositeValues>,
    /// Maps each row to the index of its decoded value in `decoded_pk_values`.
    keys_array: UInt32Array,
}
629
impl DecodedPrimaryKeys {
    /// Builds an array for the tag `column_id` from the decoded primary key values.
    ///
    /// `pk_index` is the tag's position within the primary key; it is required
    /// for the dense encoding, while sparse values are looked up by column id.
    /// String tags are returned as dictionary arrays reusing the per-key
    /// values; other types are expanded to per-row values with the `take`
    /// kernel.
    pub(crate) fn get_tag_column(
        &self,
        column_id: ColumnId,
        pk_index: Option<usize>,
        column_type: &ConcreteDataType,
    ) -> Result<ArrayRef> {
        // One slot per decoded (distinct) primary key value.
        let mut builder = column_type.create_mutable_vector(self.decoded_pk_values.len());
        for decoded in &self.decoded_pk_values {
            match decoded {
                CompositeValues::Dense(dense) => {
                    let pk_idx = pk_index.expect("pk_index required for dense encoding");
                    // Out-of-range positions become null (e.g. shorter keys).
                    if pk_idx < dense.len() {
                        builder.push_value_ref(&dense[pk_idx].1.as_value_ref());
                    } else {
                        builder.push_null();
                    }
                }
                CompositeValues::Sparse(sparse) => {
                    let value = sparse.get_or_null(column_id);
                    builder.push_value_ref(&value.as_value_ref());
                }
            };
        }

        let values_vector = builder.to_vector();
        let values_array = values_vector.to_arrow_array();

        if column_type.is_string() {
            // Keep strings dictionary-encoded: the per-key values become the
            // dictionary and `keys_array` maps rows onto them without copying.
            let dict_array = DictionaryArray::new(self.keys_array.clone(), values_array);
            Ok(Arc::new(dict_array))
        } else {
            // Materialize per-row values by gathering with `keys_array`.
            let taken_array =
                take(&values_array, &self.keys_array, None).context(ComputeArrowSnafu)?;
            Ok(taken_array)
        }
    }
}
677
/// Converts legacy primary-key format batches into flat batches by decoding
/// tag columns out of the encoded primary key.
pub(crate) struct FlatConvertFormat {
    /// Metadata of the region the batches belong to.
    metadata: RegionMetadataRef,
    /// Codec used to decode the primary key bytes.
    codec: Arc<dyn PrimaryKeyCodec>,
    /// Projected tags as `(column id, position in primary key, position in metadata)`.
    projected_primary_keys: Vec<(ColumnId, usize, usize)>,
}
688
impl FlatConvertFormat {
    /// Creates a converter that decodes the projected tag columns.
    ///
    /// Returns `None` if the region has no primary key. Only tags present in
    /// `format_projection` are decoded during conversion.
    pub(crate) fn new(
        metadata: RegionMetadataRef,
        format_projection: &FormatProjection,
        codec: Arc<dyn PrimaryKeyCodec>,
    ) -> Option<Self> {
        if metadata.primary_key.is_empty() {
            return None;
        }

        // Collect (column id, position in primary key, position in metadata)
        // for every projected tag column, in primary key order.
        let mut projected_primary_keys = Vec::new();
        for (pk_index, &column_id) in metadata.primary_key.iter().enumerate() {
            if format_projection
                .column_id_to_projected_index
                .contains_key(&column_id)
            {
                let column_index = metadata.column_index_by_id(column_id).unwrap();
                projected_primary_keys.push((column_id, pk_index, column_index));
            }
        }

        Some(Self {
            metadata,
            codec,
            projected_primary_keys,
        })
    }

    /// Converts `batch` by prepending decoded tag columns.
    ///
    /// The decoded tag columns are inserted before all existing columns; the
    /// rest of the batch (and its field order) is left untouched.
    pub(crate) fn convert(&self, batch: RecordBatch) -> Result<RecordBatch> {
        if self.projected_primary_keys.is_empty() {
            return Ok(batch);
        }

        let decoded_pks = decode_primary_keys(self.codec.as_ref(), &batch)?;

        // Build one array per projected tag column.
        let mut decoded_columns = Vec::new();
        for (column_id, pk_index, column_index) in &self.projected_primary_keys {
            let column_metadata = &self.metadata.column_metadatas[*column_index];
            let tag_column = decoded_pks.get_tag_column(
                *column_id,
                Some(*pk_index),
                &column_metadata.column_schema.data_type,
            )?;
            decoded_columns.push(tag_column);
        }

        // Tag columns first, then the original columns.
        let mut new_columns = Vec::with_capacity(batch.num_columns() + decoded_columns.len());
        new_columns.extend(decoded_columns);
        new_columns.extend_from_slice(batch.columns());

        // Mirror the column layout in the schema: tag fields (possibly
        // dictionary-encoded) followed by the original fields.
        let mut new_fields =
            Vec::with_capacity(batch.schema().fields().len() + self.projected_primary_keys.len());
        for (_, _, column_index) in &self.projected_primary_keys {
            let column_metadata = &self.metadata.column_metadatas[*column_index];
            let old_field = &self.metadata.schema.arrow_schema().fields()[*column_index];
            let field =
                tag_maybe_to_dictionary_field(&column_metadata.column_schema.data_type, old_field);
            new_fields.push(field);
        }
        new_fields.extend(batch.schema().fields().iter().cloned());

        let new_schema = Arc::new(Schema::new(new_fields));
        RecordBatch::try_new(new_schema, new_columns).context(NewRecordBatchSnafu)
    }
}
768
#[cfg(test)]
impl FlatReadFormat {
    /// Creates a read format that projects every column. Test helper.
    pub fn new_with_all_columns(metadata: RegionMetadataRef) -> FlatReadFormat {
        let all_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|c| c.column_id)
            .collect();
        Self::new(Arc::clone(&metadata), all_ids.into_iter(), None, "test", false).unwrap()
    }
}
783
#[cfg(test)]
mod tests {
    use api::v1::SemanticType;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use store_api::codec::PrimaryKeyEncoding;
    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::field_column_start;
    use crate::sst::{FlatSchemaOptions, flat_sst_arrow_schema_column_num};

    /// Builds a region metadata with `num_tags` string tags, `num_fields`
    /// uint64 fields, a millisecond timestamp column, and the given primary
    /// key encoding. Column ids are assigned sequentially starting at 0.
    fn build_metadata(
        num_tags: usize,
        num_fields: usize,
        encoding: PrimaryKeyEncoding,
    ) -> RegionMetadata {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(0, 0));
        let mut col_id = 0u32;

        // Tag columns come first and form the primary key.
        for i in 0..num_tags {
            builder.push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    format!("tag_{i}"),
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: col_id,
            });
            col_id += 1;
        }

        // Field columns follow the tags.
        for i in 0..num_fields {
            builder.push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    format!("field_{i}"),
                    ConcreteDataType::uint64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: col_id,
            });
            col_id += 1;
        }

        // Time index column is last.
        builder.push_column_metadata(ColumnMetadata {
            column_schema: ColumnSchema::new(
                "ts".to_string(),
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            ),
            semantic_type: SemanticType::Timestamp,
            column_id: col_id,
        });

        let primary_key: Vec<u32> = (0..num_tags as u32).collect();
        builder.primary_key(primary_key);
        builder.primary_key_encoding(encoding);
        builder.build().unwrap()
    }

    /// Checks `field_column_start` against schemas with different tag/field
    /// counts and primary key encodings.
    #[test]
    fn test_field_column_start() {
        // (num_tags, num_fields, encoding, expected first field index)
        let cases = [
            (1, 1, PrimaryKeyEncoding::Dense, 1),
            (2, 2, PrimaryKeyEncoding::Dense, 2),
            (0, 2, PrimaryKeyEncoding::Dense, 0),
            (2, 2, PrimaryKeyEncoding::Sparse, 0),
        ];

        for (num_tags, num_fields, encoding, expected) in cases {
            let metadata = build_metadata(num_tags, num_fields, encoding);
            let options = FlatSchemaOptions::from_encoding(encoding);
            let num_columns = flat_sst_arrow_schema_column_num(&metadata, &options);
            let result = field_column_start(&metadata, num_columns);
            assert_eq!(
                result, expected,
                "num_tags={num_tags}, num_fields={num_fields}, encoding={encoding:?}"
            );
        }
    }
}