1use std::borrow::Borrow;
32use std::collections::HashMap;
33use std::sync::Arc;
34
35use api::v1::SemanticType;
36use arrow_schema::{DataType as ArrowDataType, FieldRef};
37use datatypes::arrow::array::{
38 Array, ArrayRef, BinaryArray, DictionaryArray, UInt32Array, UInt64Array,
39};
40use datatypes::arrow::compute::kernels::take::take;
41use datatypes::arrow::datatypes::{Schema, SchemaRef};
42use datatypes::arrow::record_batch::RecordBatch;
43use datatypes::prelude::{ConcreteDataType, DataType};
44use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec, build_primary_key_codec};
45use parquet::file::metadata::RowGroupMetaData;
46use snafu::{OptionExt, ResultExt, ensure};
47use store_api::codec::PrimaryKeyEncoding;
48use store_api::metadata::{RegionMetadata, RegionMetadataRef};
49use store_api::storage::{ColumnId, NestedPath, SequenceNumber};
50
51use crate::error::{
52 ComputeArrowSnafu, DecodeSnafu, InvalidParquetSnafu, InvalidRecordBatchSnafu,
53 NewRecordBatchSnafu, Result,
54};
55use crate::read::read_columns::ReadColumns;
56use crate::sst::parquet::format::{
57 FIXED_POS_COLUMN_NUM, FormatProjection, INTERNAL_COLUMN_NUM, PrimaryKeyArray,
58 PrimaryKeyReadFormat, StatValues, column_null_counts, column_values,
59};
60use crate::sst::parquet::read_columns::ParquetReadColumns;
61use crate::sst::{
62 FlatSchemaOptions, flat_sst_arrow_schema_column_num, tag_maybe_to_dictionary_field,
63 to_flat_sst_arrow_schema, with_field_id,
64};
65
66pub(crate) struct FlatWriteFormat {
68 arrow_schema: SchemaRef,
70 override_sequence: Option<SequenceNumber>,
71}
72
73impl FlatWriteFormat {
74 pub(crate) fn new(metadata: RegionMetadataRef, options: &FlatSchemaOptions) -> FlatWriteFormat {
76 let arrow_schema = to_flat_sst_arrow_schema(&metadata, options);
77 FlatWriteFormat {
78 arrow_schema,
79 override_sequence: None,
80 }
81 }
82
83 pub(crate) fn with_override_sequence(
85 mut self,
86 override_sequence: Option<SequenceNumber>,
87 ) -> Self {
88 self.override_sequence = override_sequence;
89 self
90 }
91
92 #[cfg(test)]
94 pub(crate) fn arrow_schema(&self) -> &SchemaRef {
95 &self.arrow_schema
96 }
97
98 pub(crate) fn convert_batch(&self, batch: &RecordBatch) -> Result<RecordBatch> {
100 debug_assert_eq!(batch.num_columns(), self.arrow_schema.fields().len());
101
102 let Some(override_sequence) = self.override_sequence else {
103 return Ok(batch.clone());
104 };
105
106 let mut columns = batch.columns().to_vec();
107 let sequence_array = Arc::new(UInt64Array::from(vec![override_sequence; batch.num_rows()]));
108 columns[sequence_column_index(batch.num_columns())] = sequence_array;
109
110 RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
111 }
112}
113
/// Returns the index of the sequence column in a flat batch with
/// `num_columns` columns: it is the second-to-last column.
///
/// Subtracting underflows (and panics in debug builds) if `num_columns < 2`.
pub(crate) fn sequence_column_index(num_columns: usize) -> usize {
    const POSITION_FROM_END: usize = 2;
    num_columns - POSITION_FROM_END
}
118
/// Returns the index of the time index column in a flat batch with
/// `num_columns` columns: it is the fourth column from the end.
///
/// Subtracting underflows (and panics in debug builds) if `num_columns < 4`.
pub(crate) fn time_index_column_index(num_columns: usize) -> usize {
    const POSITION_FROM_END: usize = 4;
    num_columns - POSITION_FROM_END
}
123
/// Returns the index of the encoded primary key column in a flat batch with
/// `num_columns` columns: it is the third column from the end.
///
/// Subtracting underflows (and panics in debug builds) if `num_columns < 3`.
pub(crate) fn primary_key_column_index(num_columns: usize) -> usize {
    const POSITION_FROM_END: usize = 3;
    num_columns - POSITION_FROM_END
}
128
129pub(crate) fn wrap_pk_binary_to_dict(
131 record_batch: RecordBatch,
132 dict_schema: &SchemaRef,
133) -> Result<RecordBatch> {
134 let pk_idx = primary_key_column_index(record_batch.num_columns());
135 let pk_column = record_batch.column(pk_idx);
136 let binary_array = pk_column
137 .as_any()
138 .downcast_ref::<BinaryArray>()
139 .with_context(|| InvalidRecordBatchSnafu {
140 reason: format!(
141 "expected BinaryArray for __primary_key, got {:?}",
142 pk_column.data_type()
143 ),
144 })?;
145 let n = binary_array.len();
146 let keys = UInt32Array::from_iter_values(0..n as u32);
147 let dict_array: ArrayRef = Arc::new(DictionaryArray::new(keys, pk_column.clone()));
148
149 let mut columns = record_batch.columns().to_vec();
150 columns[pk_idx] = dict_array;
151
152 RecordBatch::try_new(dict_schema.clone(), columns).context(NewRecordBatchSnafu)
153}
154
/// Returns the index of the op type column in a flat batch with
/// `num_columns` columns: it is always the last column.
///
/// Subtracting underflows (and panics in debug builds) if `num_columns == 0`.
pub(crate) fn op_type_column_index(num_columns: usize) -> usize {
    const POSITION_FROM_END: usize = 1;
    num_columns - POSITION_FROM_END
}
159
160pub(crate) fn field_column_start(metadata: &RegionMetadata, num_columns: usize) -> usize {
169 let field_column_count = metadata.column_metadatas.len() - 1 - metadata.primary_key.len();
172 num_columns - FIXED_POS_COLUMN_NUM - field_column_count
173}
174
175pub struct FlatReadFormat {
182 override_sequence: Option<SequenceNumber>,
184 parquet_adapter: ParquetAdapter,
186 pk_dict_wrap_schema: Option<SchemaRef>,
188}
189
190impl FlatReadFormat {
191 pub fn new(
195 metadata: RegionMetadataRef,
196 read_cols: ReadColumns,
197 file_schema: Option<SchemaRef>,
198 file_path: &str,
199 skip_auto_convert: bool,
200 ) -> Result<FlatReadFormat> {
201 let num_columns = file_schema.as_ref().map(|x| x.fields().len());
202 let is_legacy = match num_columns {
203 Some(num) => Self::is_legacy_format(&metadata, num, file_path)?,
204 None => metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse,
205 };
206
207 let parquet_adapter = if is_legacy {
208 if metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse {
210 ParquetAdapter::PrimaryKeyToFlat(ParquetPrimaryKeyToFlat::new(
212 metadata,
213 read_cols,
214 skip_auto_convert,
215 ))
216 } else {
217 ParquetAdapter::PrimaryKeyToFlat(ParquetPrimaryKeyToFlat::new(
218 metadata, read_cols, false,
219 ))
220 }
221 } else {
222 let file_schema = file_schema
223 .unwrap_or_else(|| to_flat_sst_arrow_schema(&metadata, &Default::default()));
224 ParquetAdapter::Flat(ParquetFlat::new(metadata, read_cols, file_schema))
225 };
226
227 Ok(FlatReadFormat {
228 override_sequence: None,
229 parquet_adapter,
230 pk_dict_wrap_schema: None,
231 })
232 }
233
234 pub(crate) fn set_override_sequence(&mut self, sequence: Option<SequenceNumber>) {
236 self.override_sequence = sequence;
237 }
238
239 pub(crate) fn set_pk_as_binary(&mut self) -> Result<()> {
241 self.pk_dict_wrap_schema = Some(self.output_arrow_schema()?);
242 Ok(())
243 }
244
245 pub fn projected_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
247 self.format_projection()
248 .column_id_to_projected_index
249 .get(&column_id)
250 .copied()
251 }
252
253 pub fn min_values(
255 &self,
256 row_groups: &[impl Borrow<RowGroupMetaData>],
257 column_id: ColumnId,
258 ) -> StatValues {
259 match &self.parquet_adapter {
260 ParquetAdapter::Flat(p) => p.min_values(row_groups, column_id),
261 ParquetAdapter::PrimaryKeyToFlat(p) => p.format.min_values(row_groups, column_id),
262 }
263 }
264
265 pub fn max_values(
267 &self,
268 row_groups: &[impl Borrow<RowGroupMetaData>],
269 column_id: ColumnId,
270 ) -> StatValues {
271 match &self.parquet_adapter {
272 ParquetAdapter::Flat(p) => p.max_values(row_groups, column_id),
273 ParquetAdapter::PrimaryKeyToFlat(p) => p.format.max_values(row_groups, column_id),
274 }
275 }
276
277 pub fn null_counts(
279 &self,
280 row_groups: &[impl Borrow<RowGroupMetaData>],
281 column_id: ColumnId,
282 ) -> StatValues {
283 match &self.parquet_adapter {
284 ParquetAdapter::Flat(p) => p.null_counts(row_groups, column_id),
285 ParquetAdapter::PrimaryKeyToFlat(p) => p.format.null_counts(row_groups, column_id),
286 }
287 }
288
289 pub(crate) fn arrow_schema(&self) -> &SchemaRef {
291 match &self.parquet_adapter {
292 ParquetAdapter::Flat(p) => &p.arrow_schema,
293 ParquetAdapter::PrimaryKeyToFlat(p) => p.format.arrow_schema(),
294 }
295 }
296
297 pub(crate) fn output_arrow_schema(&self) -> Result<SchemaRef> {
299 let read_columns = self.parquet_read_columns();
300 let projection = read_columns.root_indices();
301 let mut schema = self
302 .arrow_schema()
303 .project(projection)
304 .context(ComputeArrowSnafu)?;
305 if read_columns.has_nested() {
306 debug_assert_eq!(schema.fields().len(), read_columns.columns().len());
307 let nested_paths = read_columns.columns().iter().map(|x| x.nested_paths());
308 prune_schema_by_nested_paths(&mut schema, nested_paths);
309 }
310 Ok(Arc::new(schema))
311 }
312
313 pub(crate) fn metadata(&self) -> &RegionMetadataRef {
315 match &self.parquet_adapter {
316 ParquetAdapter::Flat(p) => &p.metadata,
317 ParquetAdapter::PrimaryKeyToFlat(p) => p.format.metadata(),
318 }
319 }
320
321 pub(crate) fn parquet_read_columns(&self) -> &ParquetReadColumns {
323 match &self.parquet_adapter {
324 ParquetAdapter::Flat(p) => &p.format_projection.parquet_read_cols,
325 ParquetAdapter::PrimaryKeyToFlat(p) => p.format.parquet_read_columns(),
326 }
327 }
328
329 pub(crate) fn format_projection(&self) -> &FormatProjection {
334 match &self.parquet_adapter {
335 ParquetAdapter::Flat(p) => &p.format_projection,
336 ParquetAdapter::PrimaryKeyToFlat(p) => &p.format_projection,
337 }
338 }
339
340 pub(crate) fn batch_has_raw_pk_columns(&self) -> bool {
344 matches!(&self.parquet_adapter, ParquetAdapter::Flat(_))
345 }
346
347 pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option<ArrayRef> {
349 self.override_sequence
350 .map(|seq| Arc::new(UInt64Array::from_value(seq, length)) as ArrayRef)
351 }
352
353 pub(crate) fn convert_batch(
358 &self,
359 record_batch: RecordBatch,
360 override_sequence_array: Option<&ArrayRef>,
361 ) -> Result<RecordBatch> {
362 let record_batch = if let Some(dict_schema) = &self.pk_dict_wrap_schema {
363 wrap_pk_binary_to_dict(record_batch, dict_schema)?
364 } else {
365 record_batch
366 };
367
368 let batch = match &self.parquet_adapter {
370 ParquetAdapter::Flat(_) => record_batch,
371 ParquetAdapter::PrimaryKeyToFlat(p) => p.convert_batch(record_batch)?,
372 };
373
374 let Some(override_array) = override_sequence_array else {
376 return Ok(batch);
377 };
378
379 let mut columns = batch.columns().to_vec();
380 let sequence_column_idx = sequence_column_index(batch.num_columns());
381
382 let sequence_array = if override_array.len() > batch.num_rows() {
384 override_array.slice(0, batch.num_rows())
385 } else {
386 override_array.clone()
387 };
388
389 columns[sequence_column_idx] = sequence_array;
390
391 RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
392 }
393
394 pub(crate) fn is_legacy_format(
400 metadata: &RegionMetadata,
401 num_columns: usize,
402 file_path: &str,
403 ) -> Result<bool> {
404 if metadata.primary_key.is_empty() {
405 return Ok(false);
406 }
407
408 let expected_columns = metadata.column_metadatas.len() + INTERNAL_COLUMN_NUM;
411
412 if expected_columns == num_columns {
413 Ok(false)
415 } else {
416 ensure!(
417 expected_columns >= num_columns,
418 InvalidParquetSnafu {
419 file: file_path,
420 reason: format!(
421 "Expected columns {} should be >= actual columns {}",
422 expected_columns, num_columns
423 )
424 }
425 );
426
427 let column_diff = expected_columns - num_columns;
429
430 ensure!(
431 column_diff == metadata.primary_key.len(),
432 InvalidParquetSnafu {
433 file: file_path,
434 reason: format!(
435 "Column number difference {} does not match primary key count {}",
436 column_diff,
437 metadata.primary_key.len()
438 )
439 }
440 );
441
442 Ok(true)
443 }
444 }
445}
446
447fn prune_schema_by_nested_paths<'a, I>(schema: &mut Schema, nested_paths: I)
448where
449 I: IntoIterator<Item = &'a [NestedPath]>,
450{
451 let fields = schema
452 .fields
453 .into_iter()
454 .zip(nested_paths)
455 .map(|(field, paths)| {
456 if matches!(field.data_type(), ArrowDataType::Struct(_)) && !paths.is_empty() {
457 let child_paths = paths
458 .iter()
459 .map(|path| {
460 if path.first().is_some_and(|root| root == field.name()) {
461 &path[1..]
462 } else {
463 path
464 }
465 })
466 .collect::<Vec<_>>();
467 prune_field_by_nested_paths(field, &child_paths)
468 } else {
469 field.clone()
470 }
471 })
472 .collect::<Vec<_>>();
473 schema.fields = fields.into()
474}
475
476fn prune_field_by_nested_paths(field: &FieldRef, nested_paths: &[&[String]]) -> FieldRef {
477 let ArrowDataType::Struct(fields) = field.data_type() else {
478 return field.clone();
479 };
480
481 let pruned_fields = fields
482 .iter()
483 .filter_map(|field| {
484 let child_paths = nested_paths
485 .iter()
486 .filter_map(|path| {
487 path.first()
488 .is_some_and(|name| name == field.name())
489 .then_some(&path[1..])
490 })
491 .collect::<Vec<_>>();
492
493 if child_paths.is_empty() {
494 None
495 } else if child_paths.iter().any(|path| path.is_empty()) {
496 Some(field.clone())
497 } else {
498 Some(prune_field_by_nested_paths(field, &child_paths))
499 }
500 })
501 .collect::<Vec<_>>();
502
503 Arc::new(
504 field
505 .as_ref()
506 .clone()
507 .with_data_type(ArrowDataType::Struct(pruned_fields.into())),
508 )
509}
510
/// How the columns stored in a parquet file are adapted to the flat layout.
enum ParquetAdapter {
    /// The file already uses the flat layout.
    Flat(ParquetFlat),
    /// The file uses the legacy primary-key layout and is converted on read.
    PrimaryKeyToFlat(ParquetPrimaryKeyToFlat),
}
516
517struct ParquetPrimaryKeyToFlat {
519 format: PrimaryKeyReadFormat,
521 convert_format: Option<FlatConvertFormat>,
523 format_projection: FormatProjection,
525}
526
527impl ParquetPrimaryKeyToFlat {
528 fn new(
530 metadata: RegionMetadataRef,
531 read_cols: ReadColumns,
532 skip_auto_convert: bool,
533 ) -> ParquetPrimaryKeyToFlat {
534 assert!(if skip_auto_convert {
535 metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse
536 } else {
537 true
538 });
539
540 let id_to_index = sst_column_id_indices(&metadata);
542 let sst_column_num =
543 flat_sst_arrow_schema_column_num(&metadata, &FlatSchemaOptions::default());
544
545 let codec = build_primary_key_codec(&metadata);
546 let format = PrimaryKeyReadFormat::new(metadata.clone(), read_cols.clone());
547 let (convert_format, format_projection) = if skip_auto_convert {
548 (
549 None,
550 FormatProjection {
551 parquet_read_cols: format.parquet_read_columns().clone(),
552 column_id_to_projected_index: format.field_id_to_projected_index().clone(),
553 },
554 )
555 } else {
556 let format_projection = FormatProjection::compute_format_projection(
558 &id_to_index,
559 sst_column_num,
560 read_cols.clone(),
561 );
562 (
563 FlatConvertFormat::new(Arc::clone(&metadata), &format_projection, codec),
564 format_projection,
565 )
566 };
567
568 Self {
569 format,
570 convert_format,
571 format_projection,
572 }
573 }
574
575 fn convert_batch(&self, record_batch: RecordBatch) -> Result<RecordBatch> {
576 if let Some(convert_format) = &self.convert_format {
577 convert_format.convert(record_batch)
578 } else {
579 Ok(record_batch)
580 }
581 }
582}
583
584struct ParquetFlat {
586 metadata: RegionMetadataRef,
588 arrow_schema: SchemaRef,
590 format_projection: FormatProjection,
592 column_id_to_sst_index: HashMap<ColumnId, usize>,
594}
595
596impl ParquetFlat {
597 fn new(
599 metadata: RegionMetadataRef,
600 read_cols: ReadColumns,
601 arrow_schema: SchemaRef,
602 ) -> ParquetFlat {
603 let id_to_index = sst_column_id_indices(&metadata);
605 let sst_column_num =
606 flat_sst_arrow_schema_column_num(&metadata, &FlatSchemaOptions::default());
607 let format_projection =
608 FormatProjection::compute_format_projection(&id_to_index, sst_column_num, read_cols);
609
610 Self {
611 metadata,
612 arrow_schema,
613 format_projection,
614 column_id_to_sst_index: id_to_index,
615 }
616 }
617
618 fn min_values(
620 &self,
621 row_groups: &[impl Borrow<RowGroupMetaData>],
622 column_id: ColumnId,
623 ) -> StatValues {
624 self.get_stat_values(row_groups, column_id, true)
625 }
626
627 fn max_values(
629 &self,
630 row_groups: &[impl Borrow<RowGroupMetaData>],
631 column_id: ColumnId,
632 ) -> StatValues {
633 self.get_stat_values(row_groups, column_id, false)
634 }
635
636 fn null_counts(
638 &self,
639 row_groups: &[impl Borrow<RowGroupMetaData>],
640 column_id: ColumnId,
641 ) -> StatValues {
642 let Some(index) = self.column_id_to_sst_index.get(&column_id) else {
643 return StatValues::NoColumn;
645 };
646
647 let stats = column_null_counts(row_groups, *index);
648 StatValues::from_stats_opt(stats)
649 }
650
651 fn get_stat_values(
652 &self,
653 row_groups: &[impl Borrow<RowGroupMetaData>],
654 column_id: ColumnId,
655 is_min: bool,
656 ) -> StatValues {
657 let Some(column) = self.metadata.column_by_id(column_id) else {
658 return StatValues::NoColumn;
660 };
661 let index = self.column_id_to_sst_index.get(&column_id).unwrap();
663
664 let stats = column_values(row_groups, column, *index, is_min);
665 StatValues::from_stats_opt(stats)
666 }
667}
668
669pub(crate) fn sst_column_id_indices(metadata: &RegionMetadata) -> HashMap<ColumnId, usize> {
673 let mut id_to_index = HashMap::with_capacity(metadata.column_metadatas.len());
674 let mut column_index = 0;
675 for pk_id in &metadata.primary_key {
677 id_to_index.insert(*pk_id, column_index);
678 column_index += 1;
679 }
680 for column in &metadata.column_metadatas {
682 if column.semantic_type == SemanticType::Field {
683 id_to_index.insert(column.column_id, column_index);
684 column_index += 1;
685 }
686 }
687 id_to_index.insert(metadata.time_index_column().column_id, column_index);
689
690 id_to_index
691}
692
693pub(crate) fn decode_primary_keys(
697 codec: &dyn PrimaryKeyCodec,
698 batch: &RecordBatch,
699) -> Result<DecodedPrimaryKeys> {
700 let primary_key_index = primary_key_column_index(batch.num_columns());
701 let pk_dict_array = batch
702 .column(primary_key_index)
703 .as_any()
704 .downcast_ref::<PrimaryKeyArray>()
705 .with_context(|| InvalidRecordBatchSnafu {
706 reason: "Primary key column is not a dictionary array".to_string(),
707 })?;
708 let pk_values_array = pk_dict_array
709 .values()
710 .as_any()
711 .downcast_ref::<BinaryArray>()
712 .with_context(|| InvalidRecordBatchSnafu {
713 reason: "Primary key values are not binary array".to_string(),
714 })?;
715
716 let keys = pk_dict_array.keys();
717
718 let mut key_to_decoded_index = Vec::with_capacity(keys.len());
721 let mut decoded_pk_values = Vec::new();
722 let mut prev_key: Option<u32> = None;
723
724 let pk_indices = keys.values();
727 for ¤t_key in pk_indices.iter().take(keys.len()) {
728 if let Some(prev) = prev_key
730 && prev == current_key
731 {
732 key_to_decoded_index.push((decoded_pk_values.len() - 1) as u32);
734 continue;
735 }
736
737 let pk_bytes = pk_values_array.value(current_key as usize);
739 let decoded_value = codec.decode(pk_bytes).context(DecodeSnafu)?;
740
741 decoded_pk_values.push(decoded_value);
742 key_to_decoded_index.push((decoded_pk_values.len() - 1) as u32);
743 prev_key = Some(current_key);
744 }
745
746 let keys_array = UInt32Array::from(key_to_decoded_index);
748
749 Ok(DecodedPrimaryKeys {
750 decoded_pk_values,
751 keys_array,
752 })
753}
754
755pub(crate) struct DecodedPrimaryKeys {
757 decoded_pk_values: Vec<CompositeValues>,
759 keys_array: UInt32Array,
761}
762
763impl DecodedPrimaryKeys {
764 pub(crate) fn get_tag_column(
769 &self,
770 column_id: ColumnId,
771 pk_index: Option<usize>,
772 column_type: &ConcreteDataType,
773 ) -> Result<ArrayRef> {
774 let mut builder = column_type.create_mutable_vector(self.decoded_pk_values.len());
776 for decoded in &self.decoded_pk_values {
777 match decoded {
778 CompositeValues::Dense(dense) => {
779 let pk_idx = pk_index.expect("pk_index required for dense encoding");
780 if pk_idx < dense.len() {
781 builder.push_value_ref(&dense[pk_idx].1.as_value_ref());
782 } else {
783 builder.push_null();
784 }
785 }
786 CompositeValues::Sparse(sparse) => {
787 let value = sparse.get_or_null(column_id);
788 builder.push_value_ref(&value.as_value_ref());
789 }
790 };
791 }
792
793 let values_vector = builder.to_vector();
794 let values_array = values_vector.to_arrow_array();
795
796 if column_type.is_string() {
798 let dict_array = DictionaryArray::new(self.keys_array.clone(), values_array);
801 Ok(Arc::new(dict_array))
802 } else {
803 let taken_array =
805 take(&values_array, &self.keys_array, None).context(ComputeArrowSnafu)?;
806 Ok(taken_array)
807 }
808 }
809}
810
811pub(crate) struct FlatConvertFormat {
814 metadata: RegionMetadataRef,
816 codec: Arc<dyn PrimaryKeyCodec>,
818 projected_primary_keys: Vec<(ColumnId, usize, usize)>,
820}
821
822impl FlatConvertFormat {
823 pub(crate) fn new(
830 metadata: RegionMetadataRef,
831 format_projection: &FormatProjection,
832 codec: Arc<dyn PrimaryKeyCodec>,
833 ) -> Option<Self> {
834 if metadata.primary_key.is_empty() {
835 return None;
836 }
837
838 let mut projected_primary_keys = Vec::new();
840 for (pk_index, &column_id) in metadata.primary_key.iter().enumerate() {
841 if format_projection
842 .column_id_to_projected_index
843 .contains_key(&column_id)
844 {
845 let column_index = metadata.column_index_by_id(column_id).unwrap();
847 projected_primary_keys.push((column_id, pk_index, column_index));
848 }
849 }
850
851 Some(Self {
852 metadata,
853 codec,
854 projected_primary_keys,
855 })
856 }
857
858 pub(crate) fn convert(&self, batch: RecordBatch) -> Result<RecordBatch> {
862 if self.projected_primary_keys.is_empty() {
863 return Ok(batch);
864 }
865
866 let decoded_pks = decode_primary_keys(self.codec.as_ref(), &batch)?;
867
868 let mut decoded_columns = Vec::new();
870 for (column_id, pk_index, column_index) in &self.projected_primary_keys {
871 let column_metadata = &self.metadata.column_metadatas[*column_index];
872 let tag_column = decoded_pks.get_tag_column(
873 *column_id,
874 Some(*pk_index),
875 &column_metadata.column_schema.data_type,
876 )?;
877 decoded_columns.push(tag_column);
878 }
879
880 let mut new_columns = Vec::with_capacity(batch.num_columns() + decoded_columns.len());
882 new_columns.extend(decoded_columns);
883 new_columns.extend_from_slice(batch.columns());
884
885 let mut new_fields =
887 Vec::with_capacity(batch.schema().fields().len() + self.projected_primary_keys.len());
888 for (column_id, _, column_index) in &self.projected_primary_keys {
889 let column_metadata = &self.metadata.column_metadatas[*column_index];
890 let old_field = &self.metadata.schema.arrow_schema().fields()[*column_index];
891 let field =
892 tag_maybe_to_dictionary_field(&column_metadata.column_schema.data_type, old_field);
893 new_fields.push(Arc::new(with_field_id((*field).clone(), *column_id)));
894 }
895 new_fields.extend(batch.schema().fields().iter().cloned());
896
897 let new_schema = Arc::new(Schema::new(new_fields));
898 RecordBatch::try_new(new_schema, new_columns).context(NewRecordBatchSnafu)
899 }
900}
901
#[cfg(test)]
impl FlatReadFormat {
    /// Test helper that builds a read format projecting every column of
    /// `metadata`, with no file schema, the file path `"test"` and auto
    /// conversion enabled.
    ///
    /// # Panics
    /// Panics if `FlatReadFormat::new` fails.
    pub fn new_with_all_columns(metadata: RegionMetadataRef) -> FlatReadFormat {
        let all_column_ids = metadata
            .column_metadatas
            .iter()
            .map(|column| column.column_id);
        let read_cols = ReadColumns::from_deduped_column_ids(all_column_ids);
        Self::new(Arc::clone(&metadata), read_cols, None, "test", false).unwrap()
    }
}
918
/// Unit tests for the flat read/write format helpers in this module.
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::SemanticType;
    use arrow_schema::Field;
    use datatypes::arrow::array::{
        ArrayRef, BinaryArray, TimestampMillisecondArray, UInt8Array, UInt32Array, UInt64Array,
    };
    use datatypes::arrow::datatypes::DataType as ArrowDataType;
    use datatypes::arrow::record_batch::RecordBatch;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use store_api::codec::PrimaryKeyEncoding;
    use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;
    use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;

    use super::*;
    use crate::read::read_columns::ReadColumns;
    use crate::sst::{
        FlatSchemaOptions, PARQUET_FIELD_ID_KEY, PRIMARY_KEY_PARQUET_FIELD_ID,
        flat_sst_arrow_schema_column_num, override_pk_field_to_binary, to_flat_sst_arrow_schema,
    };

    /// Builds region metadata with `num_tags` nullable string tags named
    /// `tag_{i}` (all forming the primary key), `num_fields` nullable u64 fields
    /// named `field_{i}`, and a non-null millisecond `ts` time index.
    ///
    /// Column ids are assigned sequentially from 0 in that order, and the
    /// primary key uses `encoding`.
    fn build_metadata(
        num_tags: usize,
        num_fields: usize,
        encoding: PrimaryKeyEncoding,
    ) -> RegionMetadata {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(0, 0));
        let mut col_id = 0u32;

        for i in 0..num_tags {
            builder.push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    format!("tag_{i}"),
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: col_id,
            });
            col_id += 1;
        }

        for i in 0..num_fields {
            builder.push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    format!("field_{i}"),
                    ConcreteDataType::uint64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: col_id,
            });
            col_id += 1;
        }

        builder.push_column_metadata(ColumnMetadata {
            column_schema: ColumnSchema::new(
                "ts".to_string(),
                ConcreteDataType::timestamp_millisecond_datatype(),
                false,
            ),
            semantic_type: SemanticType::Timestamp,
            column_id: col_id,
        });

        // Tag column ids are 0..num_tags, so they form the primary key in order.
        let primary_key: Vec<u32> = (0..num_tags as u32).collect();
        builder.primary_key(primary_key);
        builder.primary_key_encoding(encoding);
        builder.build().unwrap()
    }

    /// `field_column_start` should locate the first field column for dense and
    /// sparse schemas.
    #[test]
    fn test_field_column_start() {
        // (num_tags, num_fields, encoding, expected field column start)
        let cases = [
            (1, 1, PrimaryKeyEncoding::Dense, 1),
            (2, 2, PrimaryKeyEncoding::Dense, 2),
            (0, 2, PrimaryKeyEncoding::Dense, 0),
            // An expected start of 0 implies the sparse flat schema has no raw
            // tag columns ahead of the fields.
            (2, 2, PrimaryKeyEncoding::Sparse, 0),
        ];

        for (num_tags, num_fields, encoding, expected) in cases {
            let metadata = build_metadata(num_tags, num_fields, encoding);
            let options = FlatSchemaOptions::from_encoding(encoding);
            let num_columns = flat_sst_arrow_schema_column_num(&metadata, &options);
            let result = field_column_start(&metadata, num_columns);
            assert_eq!(
                result, expected,
                "num_tags={num_tags}, num_fields={num_fields}, encoding={encoding:?}"
            );
        }
    }

    /// After `set_pk_as_binary`, `convert_batch` should accept a batch whose
    /// `__primary_key` column is plain binary and wrap it back into a
    /// dictionary matching the output schema.
    #[test]
    fn test_convert_batch_wraps_binary_pk_to_dict() {
        use datatypes::arrow::array::{Array, DictionaryArray, StringArray};
        use datatypes::arrow::datatypes::UInt32Type;

        let metadata = Arc::new(build_metadata(1, 1, PrimaryKeyEncoding::Dense));
        // Read every column of the region.
        let column_ids: Vec<u32> = metadata
            .column_metadatas
            .iter()
            .map(|c| c.column_id)
            .collect();
        let mut read_format = FlatReadFormat::new(
            metadata.clone(),
            ReadColumns::from_deduped_column_ids(column_ids),
            None,
            "test",
            false,
        )
        .unwrap();
        read_format.set_pk_as_binary().unwrap();

        let output_schema = read_format.output_arrow_schema().unwrap();
        // Schema of incoming batches whose primary key column is plain binary.
        let binary_schema = override_pk_field_to_binary(&output_schema);

        // The binary override must not lose the primary key's parquet field id.
        let pk_field = binary_schema
            .field_with_name(PRIMARY_KEY_COLUMN_NAME)
            .unwrap();
        assert_eq!(
            pk_field.metadata().get(PARQUET_FIELD_ID_KEY),
            Some(&PRIMARY_KEY_PARQUET_FIELD_ID.to_string()),
            "__primary_key field must retain its PARQUET:field_id after override_pk_field_to_binary"
        );

        // Three rows in layout order: tag, field, ts, __primary_key, sequence, op type.
        let tag_keys = UInt32Array::from(vec![0u32, 1, 1]);
        let tag_values = Arc::new(StringArray::from(vec!["t0", "t1"]));
        let tag_array: ArrayRef =
            Arc::new(DictionaryArray::<UInt32Type>::new(tag_keys, tag_values));
        let field_array: ArrayRef = Arc::new(UInt64Array::from(vec![10u64, 11, 12]));
        let ts_array: ArrayRef = Arc::new(TimestampMillisecondArray::from(vec![1i64, 2, 3]));
        let pk_array: ArrayRef = Arc::new(BinaryArray::from_iter_values(
            [b"alpha".as_ref(), b"beta", b"beta"].iter().copied(),
        ));
        let seq_array: ArrayRef = Arc::new(UInt64Array::from(vec![100u64, 101, 102]));
        let op_array: ArrayRef = Arc::new(UInt8Array::from(vec![1u8, 1, 1]));

        let batch = RecordBatch::try_new(
            binary_schema,
            vec![
                tag_array,
                field_array,
                ts_array,
                pk_array,
                seq_array,
                op_array,
            ],
        )
        .unwrap();

        let wrapped = read_format.convert_batch(batch, None).unwrap();
        assert_eq!(wrapped.schema(), output_schema);

        let pk_idx = primary_key_column_index(wrapped.num_columns());
        let pk_col = wrapped.column(pk_idx);
        assert_eq!(
            pk_col.data_type(),
            &ArrowDataType::Dictionary(
                Box::new(ArrowDataType::UInt32),
                Box::new(ArrowDataType::Binary)
            )
        );
        let dict = pk_col
            .as_any()
            .downcast_ref::<DictionaryArray<UInt32Type>>()
            .unwrap();
        // One key per row and no deduplication: the repeated "beta" stays twice.
        assert_eq!(dict.keys().values(), &[0, 1, 2]);
        let values = dict
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        assert_eq!(values.value(0), b"alpha");
        assert_eq!(values.value(1), b"beta");
        assert_eq!(values.value(2), b"beta");
    }

    /// `output_arrow_schema` should equal the flat SST schema projected to the
    /// parquet read root indices.
    #[test]
    fn test_output_arrow_schema_uses_projection() {
        let metadata = Arc::new(build_metadata(1, 2, PrimaryKeyEncoding::Dense));
        let read_format = FlatReadFormat::new(
            metadata.clone(),
            ReadColumns::from_deduped_column_ids([0_u32, 2_u32]),
            None,
            "test",
            false,
        )
        .unwrap();

        let output_schema = read_format.output_arrow_schema().unwrap();
        let projection = read_format.parquet_read_columns().root_indices();
        let expected = Arc::new(
            to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default())
                .project(projection)
                .unwrap(),
        );

        assert_eq!(expected, output_schema);
    }

    /// Struct columns with nested paths are pruned to the selected children;
    /// columns without paths are kept as-is.
    #[test]
    fn test_prune_schema_by_nested_paths() {
        fn new_field(name: &str, data_type: ArrowDataType) -> FieldRef {
            Arc::new(Field::new(name, data_type, true))
        }

        fn struct_field(name: &str, fields: impl IntoIterator<Item = FieldRef>) -> FieldRef {
            new_field(name, ArrowDataType::Struct(fields.into_iter().collect()))
        }

        let mut schema = Schema::new([
            struct_field(
                "j",
                [
                    struct_field(
                        "a",
                        [
                            new_field("x", ArrowDataType::Int64),
                            new_field("y", ArrowDataType::Utf8),
                            struct_field(
                                "z",
                                [
                                    new_field("q", ArrowDataType::Boolean),
                                    new_field("r", ArrowDataType::Float64),
                                ],
                            ),
                        ],
                    ),
                    new_field("b", ArrowDataType::Utf8),
                    struct_field(
                        "c",
                        vec![
                            new_field("d", ArrowDataType::Int64),
                            new_field("e", ArrowDataType::Utf8),
                        ],
                    ),
                ],
            ),
            new_field("tag", ArrowDataType::Utf8),
            struct_field(
                "k",
                [
                    new_field("k_0", ArrowDataType::Int64),
                    new_field("k_1", ArrowDataType::Utf8),
                ],
            ),
        ]);

        // One path list per schema field: `j` keeps `a.x`, `a.z.q` and all of
        // `c`; `tag` and `k` have no paths and stay untouched.
        let nested_paths = [
            vec![
                ["j", "a", "x"].iter().map(|x| x.to_string()).collect(),
                ["j", "a", "z", "q"].iter().map(|x| x.to_string()).collect(),
                ["j", "c"].iter().map(|x| x.to_string()).collect(),
            ],
            vec![],
            vec![],
        ];

        prune_schema_by_nested_paths(
            &mut schema,
            nested_paths.iter().map(|paths| paths.as_slice()),
        );

        let expected = Schema::new([
            struct_field(
                "j",
                [
                    struct_field(
                        "a",
                        [
                            new_field("x", ArrowDataType::Int64),
                            struct_field("z", vec![new_field("q", ArrowDataType::Boolean)]),
                        ],
                    ),
                    struct_field(
                        "c",
                        [
                            new_field("d", ArrowDataType::Int64),
                            new_field("e", ArrowDataType::Utf8),
                        ],
                    ),
                ],
            ),
            new_field("tag", ArrowDataType::Utf8),
            struct_field(
                "k",
                [
                    new_field("k_0", ArrowDataType::Int64),
                    new_field("k_1", ArrowDataType::Utf8),
                ],
            ),
        ]);

        assert_eq!(schema, expected);
    }
}