1use std::borrow::Borrow;
32use std::collections::HashMap;
33use std::sync::Arc;
34
35use api::v1::SemanticType;
36use datatypes::arrow::array::{
37 Array, ArrayRef, BinaryArray, DictionaryArray, UInt32Array, UInt64Array,
38};
39use datatypes::arrow::compute::kernels::take::take;
40use datatypes::arrow::datatypes::{Schema, SchemaRef};
41use datatypes::arrow::record_batch::RecordBatch;
42use datatypes::prelude::{ConcreteDataType, DataType};
43use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec, build_primary_key_codec};
44use parquet::file::metadata::RowGroupMetaData;
45use snafu::{OptionExt, ResultExt, ensure};
46use store_api::codec::PrimaryKeyEncoding;
47use store_api::metadata::{RegionMetadata, RegionMetadataRef};
48use store_api::storage::{ColumnId, SequenceNumber};
49
50use crate::error::{
51 ComputeArrowSnafu, DecodeSnafu, InvalidParquetSnafu, InvalidRecordBatchSnafu,
52 NewRecordBatchSnafu, Result,
53};
54use crate::sst::parquet::format::{
55 FormatProjection, INTERNAL_COLUMN_NUM, PrimaryKeyArray, PrimaryKeyReadFormat, ReadFormat,
56 StatValues,
57};
58use crate::sst::{
59 FlatSchemaOptions, flat_sst_arrow_schema_column_num, tag_maybe_to_dictionary_field,
60 to_flat_sst_arrow_schema,
61};
62
/// Helper to convert batches into the flat SST arrow layout on write.
pub(crate) struct FlatWriteFormat {
    /// Arrow schema of the flat SST to write.
    arrow_schema: SchemaRef,
    /// When set, replaces the sequence column of every batch with this value.
    override_sequence: Option<SequenceNumber>,
}
69
70impl FlatWriteFormat {
71 pub(crate) fn new(metadata: RegionMetadataRef, options: &FlatSchemaOptions) -> FlatWriteFormat {
73 let arrow_schema = to_flat_sst_arrow_schema(&metadata, options);
74 FlatWriteFormat {
75 arrow_schema,
76 override_sequence: None,
77 }
78 }
79
80 pub(crate) fn with_override_sequence(
82 mut self,
83 override_sequence: Option<SequenceNumber>,
84 ) -> Self {
85 self.override_sequence = override_sequence;
86 self
87 }
88
89 pub(crate) fn arrow_schema(&self) -> &SchemaRef {
91 &self.arrow_schema
92 }
93
94 pub(crate) fn convert_batch(&self, batch: &RecordBatch) -> Result<RecordBatch> {
96 debug_assert_eq!(batch.num_columns(), self.arrow_schema.fields().len());
97
98 let Some(override_sequence) = self.override_sequence else {
99 return Ok(batch.clone());
100 };
101
102 let mut columns = batch.columns().to_vec();
103 let sequence_array = Arc::new(UInt64Array::from(vec![override_sequence; batch.num_rows()]));
104 columns[sequence_column_index(batch.num_columns())] = sequence_array;
105
106 RecordBatch::try_new(self.arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
107 }
108}
109
/// Index of the sequence column in a flat batch: second column from the end.
pub(crate) fn sequence_column_index(column_count: usize) -> usize {
    column_count - 2
}
114
/// Index of the time index column in a flat batch: fourth column from the end.
pub(crate) fn time_index_column_index(column_count: usize) -> usize {
    column_count - 4
}
119
/// Index of the encoded primary key column in a flat batch: third column from the end.
pub(crate) fn primary_key_column_index(column_count: usize) -> usize {
    column_count - 3
}
124
/// Index of the op type column in a flat batch: the last column.
pub(crate) fn op_type_column_index(column_count: usize) -> usize {
    column_count - 1
}
129
/// Helper to read SSTs in the flat layout, adapting legacy primary-key SSTs on the fly.
pub struct FlatReadFormat {
    /// Sequence number that overrides the sequence column read from the file, if any.
    override_sequence: Option<SequenceNumber>,
    /// Adapter matching the on-disk layout of the parquet file.
    parquet_adapter: ParquetAdapter,
}
142
impl FlatReadFormat {
    /// Creates a read format for the SST at `file_path`.
    ///
    /// - `column_ids`: columns the caller wants to read.
    /// - `num_columns`: actual column count of the parquet file, when known; used to
    ///   detect files written in the legacy primary-key layout.
    /// - `skip_auto_convert`: keeps sparse-encoded primary keys as-is instead of
    ///   decoding them into individual tag columns.
    pub fn new(
        metadata: RegionMetadataRef,
        column_ids: impl Iterator<Item = ColumnId>,
        num_columns: Option<usize>,
        file_path: &str,
        skip_auto_convert: bool,
    ) -> Result<FlatReadFormat> {
        // Without the file's column count we cannot compare layouts; fall back to
        // treating sparse primary key encoding as the legacy format.
        let is_legacy = match num_columns {
            Some(num) => Self::is_legacy_format(&metadata, num, file_path)?,
            None => metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse,
        };

        let parquet_adapter = if is_legacy {
            if metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse {
                ParquetAdapter::PrimaryKeyToFlat(ParquetPrimaryKeyToFlat::new(
                    metadata,
                    column_ids,
                    skip_auto_convert,
                ))
            } else {
                // `skip_auto_convert` only applies to sparse encoding; dense legacy
                // files are always converted.
                ParquetAdapter::PrimaryKeyToFlat(ParquetPrimaryKeyToFlat::new(
                    metadata, column_ids, false,
                ))
            }
        } else {
            ParquetAdapter::Flat(ParquetFlat::new(metadata, column_ids))
        };

        Ok(FlatReadFormat {
            override_sequence: None,
            parquet_adapter,
        })
    }

    /// Sets a sequence number that overrides the sequence column on read.
    pub(crate) fn set_override_sequence(&mut self, sequence: Option<SequenceNumber>) {
        self.override_sequence = sequence;
    }

    /// Index of `column_id` in the projected batch, if the column is projected.
    pub fn projected_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
        self.format_projection()
            .column_id_to_projected_index
            .get(&column_id)
            .copied()
    }

    /// Returns min values of `column_id` for each row group.
    pub fn min_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        match &self.parquet_adapter {
            ParquetAdapter::Flat(p) => p.min_values(row_groups, column_id),
            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.min_values(row_groups, column_id),
        }
    }

    /// Returns max values of `column_id` for each row group.
    pub fn max_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        match &self.parquet_adapter {
            ParquetAdapter::Flat(p) => p.max_values(row_groups, column_id),
            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.max_values(row_groups, column_id),
        }
    }

    /// Returns null counts of `column_id` for each row group.
    pub fn null_counts(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        match &self.parquet_adapter {
            ParquetAdapter::Flat(p) => p.null_counts(row_groups, column_id),
            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.null_counts(row_groups, column_id),
        }
    }

    /// Arrow schema the reader exposes (flat layout in both cases).
    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
        match &self.parquet_adapter {
            ParquetAdapter::Flat(p) => &p.arrow_schema,
            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.arrow_schema(),
        }
    }

    /// Region metadata this format was created with.
    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
        match &self.parquet_adapter {
            ParquetAdapter::Flat(p) => &p.metadata,
            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.metadata(),
        }
    }

    /// Indices of the columns to read from the parquet file.
    pub(crate) fn projection_indices(&self) -> &[usize] {
        match &self.parquet_adapter {
            ParquetAdapter::Flat(p) => &p.format_projection.projection_indices,
            ParquetAdapter::PrimaryKeyToFlat(p) => p.format.projection_indices(),
        }
    }

    /// Projection computed against the flat schema.
    pub(crate) fn format_projection(&self) -> &FormatProjection {
        match &self.parquet_adapter {
            ParquetAdapter::Flat(p) => &p.format_projection,
            ParquetAdapter::PrimaryKeyToFlat(p) => &p.format_projection,
        }
    }

    /// Builds a constant array of length `length` filled with the override sequence,
    /// or `None` when no override is set.
    pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option<ArrayRef> {
        self.override_sequence
            .map(|seq| Arc::new(UInt64Array::from_value(seq, length)) as ArrayRef)
    }

    /// Converts a batch read from the file into the flat layout and applies the
    /// sequence override when `override_sequence_array` is provided.
    ///
    /// The override array may be longer than the batch; it is sliced to fit.
    pub(crate) fn convert_batch(
        &self,
        record_batch: RecordBatch,
        override_sequence_array: Option<&ArrayRef>,
    ) -> Result<RecordBatch> {
        // Legacy primary-key batches are first converted into the flat layout.
        let batch = match &self.parquet_adapter {
            ParquetAdapter::Flat(_) => record_batch,
            ParquetAdapter::PrimaryKeyToFlat(p) => p.convert_batch(record_batch)?,
        };

        let Some(override_array) = override_sequence_array else {
            return Ok(batch);
        };

        let mut columns = batch.columns().to_vec();
        let sequence_column_idx = sequence_column_index(batch.num_columns());

        let sequence_array = if override_array.len() > batch.num_rows() {
            override_array.slice(0, batch.num_rows())
        } else {
            override_array.clone()
        };

        columns[sequence_column_idx] = sequence_array;

        RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
    }

    /// Returns whether the parquet file at `file_path` uses the legacy primary-key
    /// layout.
    ///
    /// A legacy file stores the encoded primary key as a single column instead of
    /// individual tag columns, so it has exactly `primary_key.len()` fewer columns
    /// than the flat layout expects. Any other column-count mismatch is an error.
    pub(crate) fn is_legacy_format(
        metadata: &RegionMetadata,
        num_columns: usize,
        file_path: &str,
    ) -> Result<bool> {
        // Without a primary key both layouts have identical columns.
        if metadata.primary_key.is_empty() {
            return Ok(false);
        }

        // Flat layout column count: all metadata columns plus internal columns.
        let expected_columns = metadata.column_metadatas.len() + INTERNAL_COLUMN_NUM;

        if expected_columns == num_columns {
            Ok(false)
        } else {
            ensure!(
                expected_columns >= num_columns,
                InvalidParquetSnafu {
                    file: file_path,
                    reason: format!(
                        "Expected columns {} should be >= actual columns {}",
                        expected_columns, num_columns
                    )
                }
            );

            let column_diff = expected_columns - num_columns;

            ensure!(
                column_diff == metadata.primary_key.len(),
                InvalidParquetSnafu {
                    file: file_path,
                    reason: format!(
                        "Column number difference {} does not match primary key count {}",
                        column_diff,
                        metadata.primary_key.len()
                    )
                }
            );

            Ok(true)
        }
    }
}
360
/// Dispatches between reading a native flat SST and adapting a legacy
/// primary-key encoded SST into the flat layout.
enum ParquetAdapter {
    /// SST already stored in the flat layout.
    Flat(ParquetFlat),
    /// Legacy primary-key SST converted to the flat layout on read.
    PrimaryKeyToFlat(ParquetPrimaryKeyToFlat),
}
366
/// Reads a legacy primary-key SST and exposes its batches in the flat layout.
struct ParquetPrimaryKeyToFlat {
    /// Underlying reader for the primary-key format.
    format: PrimaryKeyReadFormat,
    /// Converter that decodes tag columns from primary keys;
    /// `None` when conversion is skipped or the region has no primary key.
    convert_format: Option<FlatConvertFormat>,
    /// Projection computed against the flat schema.
    format_projection: FormatProjection,
}
376
377impl ParquetPrimaryKeyToFlat {
378 fn new(
380 metadata: RegionMetadataRef,
381 column_ids: impl Iterator<Item = ColumnId>,
382 skip_auto_convert: bool,
383 ) -> ParquetPrimaryKeyToFlat {
384 assert!(if skip_auto_convert {
385 metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse
386 } else {
387 true
388 });
389
390 let column_ids: Vec<_> = column_ids.collect();
391
392 let id_to_index = sst_column_id_indices(&metadata);
394 let sst_column_num =
395 flat_sst_arrow_schema_column_num(&metadata, &FlatSchemaOptions::default());
396 let format_projection = FormatProjection::compute_format_projection(
398 &id_to_index,
399 sst_column_num,
400 column_ids.iter().copied(),
401 );
402 let codec = build_primary_key_codec(&metadata);
403 let convert_format = if skip_auto_convert {
404 None
405 } else {
406 FlatConvertFormat::new(Arc::clone(&metadata), &format_projection, codec)
407 };
408
409 let format = PrimaryKeyReadFormat::new(metadata.clone(), column_ids.iter().copied());
410
411 Self {
412 format,
413 convert_format,
414 format_projection,
415 }
416 }
417
418 fn convert_batch(&self, record_batch: RecordBatch) -> Result<RecordBatch> {
419 if let Some(convert_format) = &self.convert_format {
420 convert_format.convert(record_batch)
421 } else {
422 Ok(record_batch)
423 }
424 }
425}
426
/// Reads an SST that is already stored in the flat layout.
struct ParquetFlat {
    /// Region metadata the format was created with.
    metadata: RegionMetadataRef,
    /// Arrow schema of the flat SST.
    arrow_schema: SchemaRef,
    /// Projection of the requested columns.
    format_projection: FormatProjection,
    /// Maps column ids to their column index in the SST.
    column_id_to_sst_index: HashMap<ColumnId, usize>,
}
438
439impl ParquetFlat {
440 fn new(metadata: RegionMetadataRef, column_ids: impl Iterator<Item = ColumnId>) -> ParquetFlat {
442 let id_to_index = sst_column_id_indices(&metadata);
444 let arrow_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
445 let sst_column_num =
446 flat_sst_arrow_schema_column_num(&metadata, &FlatSchemaOptions::default());
447 let format_projection =
448 FormatProjection::compute_format_projection(&id_to_index, sst_column_num, column_ids);
449
450 Self {
451 metadata,
452 arrow_schema,
453 format_projection,
454 column_id_to_sst_index: id_to_index,
455 }
456 }
457
458 fn min_values(
460 &self,
461 row_groups: &[impl Borrow<RowGroupMetaData>],
462 column_id: ColumnId,
463 ) -> StatValues {
464 self.get_stat_values(row_groups, column_id, true)
465 }
466
467 fn max_values(
469 &self,
470 row_groups: &[impl Borrow<RowGroupMetaData>],
471 column_id: ColumnId,
472 ) -> StatValues {
473 self.get_stat_values(row_groups, column_id, false)
474 }
475
476 fn null_counts(
478 &self,
479 row_groups: &[impl Borrow<RowGroupMetaData>],
480 column_id: ColumnId,
481 ) -> StatValues {
482 let Some(index) = self.column_id_to_sst_index.get(&column_id) else {
483 return StatValues::NoColumn;
485 };
486
487 let stats = ReadFormat::column_null_counts(row_groups, *index);
488 StatValues::from_stats_opt(stats)
489 }
490
491 fn get_stat_values(
492 &self,
493 row_groups: &[impl Borrow<RowGroupMetaData>],
494 column_id: ColumnId,
495 is_min: bool,
496 ) -> StatValues {
497 let Some(column) = self.metadata.column_by_id(column_id) else {
498 return StatValues::NoColumn;
500 };
501 let index = self.column_id_to_sst_index.get(&column_id).unwrap();
503
504 let stats = ReadFormat::column_values(row_groups, column, *index, is_min);
505 StatValues::from_stats_opt(stats)
506 }
507}
508
509pub(crate) fn sst_column_id_indices(metadata: &RegionMetadata) -> HashMap<ColumnId, usize> {
513 let mut id_to_index = HashMap::with_capacity(metadata.column_metadatas.len());
514 let mut column_index = 0;
515 for pk_id in &metadata.primary_key {
517 id_to_index.insert(*pk_id, column_index);
518 column_index += 1;
519 }
520 for column in &metadata.column_metadatas {
522 if column.semantic_type == SemanticType::Field {
523 id_to_index.insert(column.column_id, column_index);
524 column_index += 1;
525 }
526 }
527 id_to_index.insert(metadata.time_index_column().column_id, column_index);
529
530 id_to_index
531}
532
/// Decodes the primary key dictionary column of `batch` using `codec`.
///
/// Consecutive rows sharing the same dictionary key are decoded only once; the
/// returned [DecodedPrimaryKeys] maps every row back to its decoded value.
pub(crate) fn decode_primary_keys(
    codec: &dyn PrimaryKeyCodec,
    batch: &RecordBatch,
) -> Result<DecodedPrimaryKeys> {
    let primary_key_index = primary_key_column_index(batch.num_columns());
    // The primary key column must be a dictionary whose values are encoded binary keys.
    let pk_dict_array = batch
        .column(primary_key_index)
        .as_any()
        .downcast_ref::<PrimaryKeyArray>()
        .with_context(|| InvalidRecordBatchSnafu {
            reason: "Primary key column is not a dictionary array".to_string(),
        })?;
    let pk_values_array = pk_dict_array
        .values()
        .as_any()
        .downcast_ref::<BinaryArray>()
        .with_context(|| InvalidRecordBatchSnafu {
            reason: "Primary key values are not binary array".to_string(),
        })?;

    let keys = pk_dict_array.keys();

    // Maps each row to an index into `decoded_pk_values`.
    let mut key_to_decoded_index = Vec::with_capacity(keys.len());
    // Decoded values, one per run of equal consecutive keys.
    let mut decoded_pk_values = Vec::new();
    let mut prev_key: Option<u32> = None;

    // NOTE(review): assumes the key array has no null slots — `keys.value(i)` does
    // not check validity; confirm the primary key column is always non-null.
    for i in 0..keys.len() {
        let current_key = keys.value(i);

        // A run of equal consecutive keys reuses the previously decoded value.
        if let Some(prev) = prev_key
            && prev == current_key
        {
            key_to_decoded_index.push((decoded_pk_values.len() - 1) as u32);
            continue;
        }

        let pk_bytes = pk_values_array.value(current_key as usize);
        let decoded_value = codec.decode(pk_bytes).context(DecodeSnafu)?;

        decoded_pk_values.push(decoded_value);
        key_to_decoded_index.push((decoded_pk_values.len() - 1) as u32);
        prev_key = Some(current_key);
    }

    let keys_array = UInt32Array::from(key_to_decoded_index);

    Ok(DecodedPrimaryKeys {
        decoded_pk_values,
        keys_array,
    })
}
595
/// Primary key values decoded from a batch's primary key column.
pub(crate) struct DecodedPrimaryKeys {
    /// Decoded composite values, one per run of equal consecutive dictionary keys.
    decoded_pk_values: Vec<CompositeValues>,
    /// Maps each row of the batch to an index into `decoded_pk_values`.
    keys_array: UInt32Array,
}
603
impl DecodedPrimaryKeys {
    /// Builds an arrow array for one tag column from the decoded primary keys.
    ///
    /// `pk_index` is the column's position inside a dense-encoded key; it is required
    /// (enforced by `expect`) for dense encoding and unused for sparse encoding.
    /// String tags are returned as dictionary arrays keyed per row; other types are
    /// materialized per row via the `take` kernel.
    pub(crate) fn get_tag_column(
        &self,
        column_id: ColumnId,
        pk_index: Option<usize>,
        column_type: &ConcreteDataType,
    ) -> Result<ArrayRef> {
        // One slot per decoded primary key (not per row).
        let mut builder = column_type.create_mutable_vector(self.decoded_pk_values.len());
        for decoded in &self.decoded_pk_values {
            match decoded {
                CompositeValues::Dense(dense) => {
                    let pk_idx = pk_index.expect("pk_index required for dense encoding");
                    if pk_idx < dense.len() {
                        builder.push_value_ref(&dense[pk_idx].1.as_value_ref());
                    } else {
                        // Dense key has fewer values than expected; pad with null.
                        builder.push_null();
                    }
                }
                CompositeValues::Sparse(sparse) => {
                    let value = sparse.get_or_null(column_id);
                    builder.push_value_ref(&value.as_value_ref());
                }
            };
        }

        let values_vector = builder.to_vector();
        let values_array = values_vector.to_arrow_array();

        if column_type.is_string() {
            // Keep strings dictionary-encoded: `keys_array` maps rows to decoded values.
            let dict_array = DictionaryArray::new(self.keys_array.clone(), values_array);
            Ok(Arc::new(dict_array))
        } else {
            // Expand the per-key values into one entry per row.
            let taken_array =
                take(&values_array, &self.keys_array, None).context(ComputeArrowSnafu)?;
            Ok(taken_array)
        }
    }
}
651
/// Converts batches from the legacy primary-key format to the flat format.
pub(crate) struct FlatConvertFormat {
    /// Region metadata for column lookups.
    metadata: RegionMetadataRef,
    /// Codec used to decode encoded primary keys.
    codec: Arc<dyn PrimaryKeyCodec>,
    /// Projected primary key columns as `(column_id, pk_index, column_index)`,
    /// in primary-key order.
    projected_primary_keys: Vec<(ColumnId, usize, usize)>,
}
662
impl FlatConvertFormat {
    /// Creates a converter that prepends decoded tag columns to primary-key batches.
    ///
    /// Returns `None` when the region has no primary key (nothing to decode).
    /// Only primary key columns present in `format_projection` are decoded.
    pub(crate) fn new(
        metadata: RegionMetadataRef,
        format_projection: &FormatProjection,
        codec: Arc<dyn PrimaryKeyCodec>,
    ) -> Option<Self> {
        if metadata.primary_key.is_empty() {
            return None;
        }

        // Collect projected primary key columns, preserving primary-key order.
        let mut projected_primary_keys = Vec::new();
        for (pk_index, &column_id) in metadata.primary_key.iter().enumerate() {
            if format_projection
                .column_id_to_projected_index
                .contains_key(&column_id)
            {
                // Primary key ids always exist in the metadata.
                let column_index = metadata.column_index_by_id(column_id).unwrap();
                projected_primary_keys.push((column_id, pk_index, column_index));
            }
        }

        Some(Self {
            metadata,
            codec,
            projected_primary_keys,
        })
    }

    /// Converts `batch` by decoding the projected tag columns from its primary key
    /// column and prepending them (in primary-key order) to the batch and its schema.
    pub(crate) fn convert(&self, batch: RecordBatch) -> Result<RecordBatch> {
        // No projected tags: nothing to decode, return the batch unchanged.
        if self.projected_primary_keys.is_empty() {
            return Ok(batch);
        }

        let decoded_pks = decode_primary_keys(self.codec.as_ref(), &batch)?;

        // One decoded array per projected tag column.
        let mut decoded_columns = Vec::new();
        for (column_id, pk_index, column_index) in &self.projected_primary_keys {
            let column_metadata = &self.metadata.column_metadatas[*column_index];
            let tag_column = decoded_pks.get_tag_column(
                *column_id,
                Some(*pk_index),
                &column_metadata.column_schema.data_type,
            )?;
            decoded_columns.push(tag_column);
        }

        // Tag columns go first, followed by the original batch columns.
        let mut new_columns = Vec::with_capacity(batch.num_columns() + decoded_columns.len());
        new_columns.extend(decoded_columns);
        new_columns.extend_from_slice(batch.columns());

        // Build the matching schema; fields for the decoded tags come first.
        let mut new_fields =
            Vec::with_capacity(batch.schema().fields().len() + self.projected_primary_keys.len());
        for (_, _, column_index) in &self.projected_primary_keys {
            let column_metadata = &self.metadata.column_metadatas[*column_index];
            let old_field = &self.metadata.schema.arrow_schema().fields()[*column_index];
            let field =
                tag_maybe_to_dictionary_field(&column_metadata.column_schema.data_type, old_field);
            new_fields.push(field);
        }
        new_fields.extend(batch.schema().fields().iter().cloned());

        let new_schema = Arc::new(Schema::new(new_fields));
        RecordBatch::try_new(new_schema, new_columns).context(NewRecordBatchSnafu)
    }
}
742
#[cfg(test)]
impl FlatReadFormat {
    /// Test helper that builds a read format projecting every column in `metadata`.
    pub fn new_with_all_columns(metadata: RegionMetadataRef) -> FlatReadFormat {
        let all_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|col| col.column_id)
            .collect();
        Self::new(metadata, all_ids.into_iter(), None, "test", false).unwrap()
    }
}