1use std::borrow::Borrow;
32use std::collections::HashMap;
33use std::sync::Arc;
34
35use api::v1::SemanticType;
36use datatypes::arrow::array::{
37 Array, ArrayRef, BinaryArray, DictionaryArray, UInt32Array, UInt64Array,
38};
39use datatypes::arrow::compute::kernels::take::take;
40use datatypes::arrow::datatypes::{Schema, SchemaRef};
41use datatypes::arrow::record_batch::RecordBatch;
42use datatypes::prelude::{ConcreteDataType, DataType};
43use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec, build_primary_key_codec};
44use parquet::file::metadata::RowGroupMetaData;
45use snafu::{OptionExt, ResultExt, ensure};
46use store_api::codec::PrimaryKeyEncoding;
47use store_api::metadata::{RegionMetadata, RegionMetadataRef};
48use store_api::storage::{ColumnId, SequenceNumber};
49
50use crate::error::{
51 ComputeArrowSnafu, DecodeSnafu, InvalidParquetSnafu, InvalidRecordBatchSnafu,
52 NewRecordBatchSnafu, Result,
53};
54use crate::sst::parquet::format::{
55 FormatProjection, INTERNAL_COLUMN_NUM, PrimaryKeyArray, PrimaryKeyReadFormat, ReadFormat,
56 StatValues,
57};
58use crate::sst::{
59 FlatSchemaOptions, flat_sst_arrow_schema_column_num, tag_maybe_to_dictionary_field,
60 to_flat_sst_arrow_schema,
61};
62
/// Helper to write record batches in the flat SST format.
pub(crate) struct FlatWriteFormat {
    /// Arrow schema of the SST to write, including internal columns.
    arrow_schema: SchemaRef,
    /// Sequence number that overwrites the sequence column of written batches, if set.
    override_sequence: Option<SequenceNumber>,
}
69
70impl FlatWriteFormat {
71 pub(crate) fn new(metadata: RegionMetadataRef, options: &FlatSchemaOptions) -> FlatWriteFormat {
73 let arrow_schema = to_flat_sst_arrow_schema(&metadata, options);
74 FlatWriteFormat {
75 arrow_schema,
76 override_sequence: None,
77 }
78 }
79
80 pub(crate) fn with_override_sequence(
82 mut self,
83 override_sequence: Option<SequenceNumber>,
84 ) -> Self {
85 self.override_sequence = override_sequence;
86 self
87 }
88
89 pub(crate) fn arrow_schema(&self) -> &SchemaRef {
91 &self.arrow_schema
92 }
93
94 pub(crate) fn convert_batch(&self, batch: &RecordBatch) -> Result<RecordBatch> {
96 debug_assert_eq!(batch.num_columns(), self.arrow_schema.fields().len());
97
98 let Some(override_sequence) = self.override_sequence else {
99 return Ok(batch.clone());
100 };
101
102 let mut columns = batch.columns().to_vec();
103 let sequence_array = Arc::new(UInt64Array::from(vec![override_sequence; batch.num_rows()]));
104 columns[sequence_column_index(batch.num_columns())] = sequence_array;
105
106 RecordBatch::try_new(self.arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
107 }
108}
109
/// Index of the `__sequence` column in a flat batch with `num_columns` columns.
pub(crate) fn sequence_column_index(num_columns: usize) -> usize {
    // The flat layout ends with: time index, __primary_key, __sequence, __op_type.
    const OFFSET_FROM_END: usize = 2;
    num_columns - OFFSET_FROM_END
}
114
/// Index of the time index column in a flat batch with `num_columns` columns.
pub(crate) fn time_index_column_index(num_columns: usize) -> usize {
    // The flat layout ends with: time index, __primary_key, __sequence, __op_type.
    const OFFSET_FROM_END: usize = 4;
    num_columns - OFFSET_FROM_END
}
119
/// Index of the `__primary_key` column in a flat batch with `num_columns` columns.
pub(crate) fn primary_key_column_index(num_columns: usize) -> usize {
    // The flat layout ends with: time index, __primary_key, __sequence, __op_type.
    const OFFSET_FROM_END: usize = 3;
    num_columns - OFFSET_FROM_END
}
124
/// Index of the `__op_type` column in a flat batch with `num_columns` columns.
pub(crate) fn op_type_column_index(num_columns: usize) -> usize {
    // __op_type is always the last column of the flat layout.
    const OFFSET_FROM_END: usize = 1;
    num_columns - OFFSET_FROM_END
}
129
/// Helper to read parquet SSTs and output batches in the flat format.
///
/// Transparently adapts files written in the legacy primary-key format to the
/// flat layout.
pub struct FlatReadFormat {
    /// Sequence number that overrides the sequence column of read batches, if set.
    override_sequence: Option<SequenceNumber>,
    /// Adapter that handles the actual on-disk format of the file.
    parquet_adapter: ParquetAdapter,
}
140
141impl FlatReadFormat {
142 pub fn new(
146 metadata: RegionMetadataRef,
147 column_ids: impl Iterator<Item = ColumnId>,
148 num_columns: Option<usize>,
149 file_path: &str,
150 skip_auto_convert: bool,
151 ) -> Result<FlatReadFormat> {
152 let is_legacy = match num_columns {
153 Some(num) => Self::is_legacy_format(&metadata, num, file_path)?,
154 None => metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse,
155 };
156
157 let parquet_adapter = if is_legacy {
158 if metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse {
160 ParquetAdapter::PrimaryKeyToFlat(ParquetPrimaryKeyToFlat::new(
162 metadata,
163 column_ids,
164 skip_auto_convert,
165 ))
166 } else {
167 ParquetAdapter::PrimaryKeyToFlat(ParquetPrimaryKeyToFlat::new(
168 metadata, column_ids, false,
169 ))
170 }
171 } else {
172 ParquetAdapter::Flat(ParquetFlat::new(metadata, column_ids))
173 };
174
175 Ok(FlatReadFormat {
176 override_sequence: None,
177 parquet_adapter,
178 })
179 }
180
181 pub(crate) fn set_override_sequence(&mut self, sequence: Option<SequenceNumber>) {
183 self.override_sequence = sequence;
184 }
185
186 pub fn projected_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
188 self.format_projection()
189 .column_id_to_projected_index
190 .get(&column_id)
191 .copied()
192 }
193
194 pub fn min_values(
196 &self,
197 row_groups: &[impl Borrow<RowGroupMetaData>],
198 column_id: ColumnId,
199 ) -> StatValues {
200 match &self.parquet_adapter {
201 ParquetAdapter::Flat(p) => p.min_values(row_groups, column_id),
202 ParquetAdapter::PrimaryKeyToFlat(p) => p.format.min_values(row_groups, column_id),
203 }
204 }
205
206 pub fn max_values(
208 &self,
209 row_groups: &[impl Borrow<RowGroupMetaData>],
210 column_id: ColumnId,
211 ) -> StatValues {
212 match &self.parquet_adapter {
213 ParquetAdapter::Flat(p) => p.max_values(row_groups, column_id),
214 ParquetAdapter::PrimaryKeyToFlat(p) => p.format.max_values(row_groups, column_id),
215 }
216 }
217
218 pub fn null_counts(
220 &self,
221 row_groups: &[impl Borrow<RowGroupMetaData>],
222 column_id: ColumnId,
223 ) -> StatValues {
224 match &self.parquet_adapter {
225 ParquetAdapter::Flat(p) => p.null_counts(row_groups, column_id),
226 ParquetAdapter::PrimaryKeyToFlat(p) => p.format.null_counts(row_groups, column_id),
227 }
228 }
229
230 pub(crate) fn arrow_schema(&self) -> &SchemaRef {
235 match &self.parquet_adapter {
236 ParquetAdapter::Flat(p) => &p.arrow_schema,
237 ParquetAdapter::PrimaryKeyToFlat(p) => p.format.arrow_schema(),
238 }
239 }
240
241 pub(crate) fn metadata(&self) -> &RegionMetadataRef {
243 match &self.parquet_adapter {
244 ParquetAdapter::Flat(p) => &p.metadata,
245 ParquetAdapter::PrimaryKeyToFlat(p) => p.format.metadata(),
246 }
247 }
248
249 pub(crate) fn projection_indices(&self) -> &[usize] {
251 match &self.parquet_adapter {
252 ParquetAdapter::Flat(p) => &p.format_projection.projection_indices,
253 ParquetAdapter::PrimaryKeyToFlat(p) => p.format.projection_indices(),
254 }
255 }
256
257 pub(crate) fn format_projection(&self) -> &FormatProjection {
259 match &self.parquet_adapter {
260 ParquetAdapter::Flat(p) => &p.format_projection,
261 ParquetAdapter::PrimaryKeyToFlat(p) => &p.format_projection,
262 }
263 }
264
265 pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option<ArrayRef> {
267 self.override_sequence
268 .map(|seq| Arc::new(UInt64Array::from_value(seq, length)) as ArrayRef)
269 }
270
271 pub(crate) fn convert_batch(
276 &self,
277 record_batch: RecordBatch,
278 override_sequence_array: Option<&ArrayRef>,
279 ) -> Result<RecordBatch> {
280 let batch = match &self.parquet_adapter {
282 ParquetAdapter::Flat(_) => record_batch,
283 ParquetAdapter::PrimaryKeyToFlat(p) => p.convert_batch(record_batch)?,
284 };
285
286 let Some(override_array) = override_sequence_array else {
288 return Ok(batch);
289 };
290
291 let mut columns = batch.columns().to_vec();
292 let sequence_column_idx = sequence_column_index(batch.num_columns());
293
294 let sequence_array = if override_array.len() > batch.num_rows() {
296 override_array.slice(0, batch.num_rows())
297 } else {
298 override_array.clone()
299 };
300
301 columns[sequence_column_idx] = sequence_array;
302
303 RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
304 }
305
306 pub(crate) fn is_legacy_format(
312 metadata: &RegionMetadata,
313 num_columns: usize,
314 file_path: &str,
315 ) -> Result<bool> {
316 if metadata.primary_key.is_empty() {
317 return Ok(false);
318 }
319
320 let expected_columns = metadata.column_metadatas.len() + INTERNAL_COLUMN_NUM;
323
324 if expected_columns == num_columns {
325 Ok(false)
327 } else {
328 ensure!(
329 expected_columns >= num_columns,
330 InvalidParquetSnafu {
331 file: file_path,
332 reason: format!(
333 "Expected columns {} should be >= actual columns {}",
334 expected_columns, num_columns
335 )
336 }
337 );
338
339 let column_diff = expected_columns - num_columns;
341
342 ensure!(
343 column_diff == metadata.primary_key.len(),
344 InvalidParquetSnafu {
345 file: file_path,
346 reason: format!(
347 "Column number difference {} does not match primary key count {}",
348 column_diff,
349 metadata.primary_key.len()
350 )
351 }
352 );
353
354 Ok(true)
355 }
356 }
357}
358
/// On-disk format of a parquet file and how to adapt it to the flat layout.
enum ParquetAdapter {
    /// The file is already in the flat format.
    Flat(ParquetFlat),
    /// The file is in the legacy primary-key format and may need conversion.
    PrimaryKeyToFlat(ParquetPrimaryKeyToFlat),
}
364
/// Reads files in the primary-key format and converts batches to the flat format.
struct ParquetPrimaryKeyToFlat {
    /// Underlying reader for the primary-key format file.
    format: PrimaryKeyReadFormat,
    /// Converter that decodes primary key columns; `None` disables conversion.
    convert_format: Option<FlatConvertFormat>,
    /// Projection computed against the flat SST layout.
    format_projection: FormatProjection,
}
374
375impl ParquetPrimaryKeyToFlat {
376 fn new(
378 metadata: RegionMetadataRef,
379 column_ids: impl Iterator<Item = ColumnId>,
380 skip_auto_convert: bool,
381 ) -> ParquetPrimaryKeyToFlat {
382 assert!(if skip_auto_convert {
383 metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse
384 } else {
385 true
386 });
387
388 let column_ids: Vec<_> = column_ids.collect();
389
390 let id_to_index = sst_column_id_indices(&metadata);
392 let sst_column_num =
393 flat_sst_arrow_schema_column_num(&metadata, &FlatSchemaOptions::default());
394 let format_projection = FormatProjection::compute_format_projection(
396 &id_to_index,
397 sst_column_num,
398 column_ids.iter().copied(),
399 );
400 let codec = build_primary_key_codec(&metadata);
401 let convert_format = if skip_auto_convert {
402 None
403 } else {
404 FlatConvertFormat::new(Arc::clone(&metadata), &format_projection, codec)
405 };
406
407 let format = PrimaryKeyReadFormat::new(metadata.clone(), column_ids.iter().copied());
408
409 Self {
410 format,
411 convert_format,
412 format_projection,
413 }
414 }
415
416 fn convert_batch(&self, record_batch: RecordBatch) -> Result<RecordBatch> {
417 if let Some(convert_format) = &self.convert_format {
418 convert_format.convert(record_batch)
419 } else {
420 Ok(record_batch)
421 }
422 }
423}
424
/// Reads parquet files that are already in the flat format.
struct ParquetFlat {
    /// Metadata of the region that wrote the file.
    metadata: RegionMetadataRef,
    /// Arrow schema of the flat SST.
    arrow_schema: SchemaRef,
    /// Projection over the flat SST columns.
    format_projection: FormatProjection,
    /// Maps each column id to its index in the SST.
    column_id_to_sst_index: HashMap<ColumnId, usize>,
}
436
impl ParquetFlat {
    /// Creates a reader for a flat-format file, projecting `column_ids`.
    fn new(metadata: RegionMetadataRef, column_ids: impl Iterator<Item = ColumnId>) -> ParquetFlat {
        // Columns in the SST follow the flat layout order.
        let id_to_index = sst_column_id_indices(&metadata);
        let arrow_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        let sst_column_num =
            flat_sst_arrow_schema_column_num(&metadata, &FlatSchemaOptions::default());
        let format_projection =
            FormatProjection::compute_format_projection(&id_to_index, sst_column_num, column_ids);

        Self {
            metadata,
            arrow_schema,
            format_projection,
            column_id_to_sst_index: id_to_index,
        }
    }

    /// Returns min values of `column_id` for each row group.
    fn min_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        self.get_stat_values(row_groups, column_id, true)
    }

    /// Returns max values of `column_id` for each row group.
    fn max_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        self.get_stat_values(row_groups, column_id, false)
    }

    /// Returns null counts of `column_id` for each row group.
    fn null_counts(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        // Columns unknown to this SST have no stats.
        let Some(index) = self.column_id_to_sst_index.get(&column_id) else {
            return StatValues::NoColumn;
        };

        let stats = ReadFormat::column_null_counts(row_groups, *index);
        StatValues::from_stats_opt(stats)
    }

    /// Returns min (when `is_min`) or max values of `column_id` for each row group.
    fn get_stat_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
        is_min: bool,
    ) -> StatValues {
        let Some(column) = self.metadata.column_by_id(column_id) else {
            return StatValues::NoColumn;
        };
        // `sst_column_id_indices` covers every column in the metadata, so a
        // column found in the metadata always has an SST index.
        let index = self.column_id_to_sst_index.get(&column_id).unwrap();

        let stats = ReadFormat::column_values(row_groups, column, *index, is_min);
        StatValues::from_stats_opt(stats)
    }
}
506
507pub(crate) fn sst_column_id_indices(metadata: &RegionMetadata) -> HashMap<ColumnId, usize> {
511 let mut id_to_index = HashMap::with_capacity(metadata.column_metadatas.len());
512 let mut column_index = 0;
513 for pk_id in &metadata.primary_key {
515 id_to_index.insert(*pk_id, column_index);
516 column_index += 1;
517 }
518 for column in &metadata.column_metadatas {
520 if column.semantic_type == SemanticType::Field {
521 id_to_index.insert(column.column_id, column_index);
522 column_index += 1;
523 }
524 }
525 id_to_index.insert(metadata.time_index_column().column_id, column_index);
527
528 id_to_index
529}
530
/// Converts batches in the primary-key format to the flat format by decoding
/// the encoded primary key column into individual tag columns.
pub(crate) struct FlatConvertFormat {
    /// Metadata of the region.
    metadata: RegionMetadataRef,
    /// Codec to decode the encoded primary key bytes.
    codec: Arc<dyn PrimaryKeyCodec>,
    /// Projected primary key columns as
    /// `(column id, index in the primary key, index in column_metadatas)`,
    /// in primary key definition order.
    projected_primary_keys: Vec<(ColumnId, usize, usize)>,
}
541
impl FlatConvertFormat {
    /// Creates a converter for the primary key columns in `format_projection`.
    ///
    /// Returns `None` when the region has no primary key, in which case no
    /// conversion is needed.
    pub(crate) fn new(
        metadata: RegionMetadataRef,
        format_projection: &FormatProjection,
        codec: Arc<dyn PrimaryKeyCodec>,
    ) -> Option<Self> {
        if metadata.primary_key.is_empty() {
            return None;
        }

        // Collect primary key columns that appear in the projection, keeping
        // the primary key definition order.
        let mut projected_primary_keys = Vec::new();
        for (pk_index, &column_id) in metadata.primary_key.iter().enumerate() {
            if format_projection
                .column_id_to_projected_index
                .contains_key(&column_id)
            {
                // Primary key column ids always exist in the metadata.
                let column_index = metadata.column_index_by_id(column_id).unwrap();
                projected_primary_keys.push((column_id, pk_index, column_index));
            }
        }

        Some(Self {
            metadata,
            codec,
            projected_primary_keys,
        })
    }

    /// Decodes the projected primary key columns from the encoded primary key
    /// dictionary column of `batch` and prepends them to the batch.
    pub(crate) fn convert(&self, batch: RecordBatch) -> Result<RecordBatch> {
        if self.projected_primary_keys.is_empty() {
            return Ok(batch);
        }

        // The encoded primary key column is a dictionary of binary values.
        let primary_key_index = primary_key_column_index(batch.num_columns());
        let pk_dict_array = batch
            .column(primary_key_index)
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .with_context(|| InvalidRecordBatchSnafu {
                reason: "Primary key column is not a dictionary array".to_string(),
            })?;

        let pk_values_array = pk_dict_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .with_context(|| InvalidRecordBatchSnafu {
                reason: "Primary key values are not binary array".to_string(),
            })?;

        // Decode each distinct primary key once; rows reference the decoded
        // values through the dictionary keys.
        let mut decoded_pk_values = Vec::with_capacity(pk_values_array.len());
        for i in 0..pk_values_array.len() {
            if pk_values_array.is_null(i) {
                decoded_pk_values.push(None);
            } else {
                let pk_bytes = pk_values_array.value(i);
                let decoded = self.codec.decode(pk_bytes).context(DecodeSnafu)?;
                decoded_pk_values.push(Some(decoded));
            }
        }

        // Build one array per projected primary key column.
        let mut decoded_columns = Vec::new();
        for (column_id, pk_index, column_index) in &self.projected_primary_keys {
            let column_metadata = &self.metadata.column_metadatas[*column_index];
            let tag_column = self.build_primary_key_column(
                *column_id,
                *pk_index,
                &column_metadata.column_schema.data_type,
                pk_dict_array.keys(),
                &decoded_pk_values,
            )?;
            decoded_columns.push(tag_column);
        }

        // Decoded primary key columns go before the original columns.
        let mut new_columns = Vec::with_capacity(batch.num_columns() + decoded_columns.len());
        new_columns.extend(decoded_columns);
        new_columns.extend_from_slice(batch.columns());

        // Build the schema with the decoded fields prepended, mirroring the
        // column order above.
        let mut new_fields =
            Vec::with_capacity(batch.schema().fields().len() + self.projected_primary_keys.len());
        for (_, _, column_index) in &self.projected_primary_keys {
            let column_metadata = &self.metadata.column_metadatas[*column_index];
            let old_field = &self.metadata.schema.arrow_schema().fields()[*column_index];
            let field =
                tag_maybe_to_dictionary_field(&column_metadata.column_schema.data_type, old_field);
            new_fields.push(field);
        }
        new_fields.extend(batch.schema().fields().iter().cloned());

        let new_schema = Arc::new(Schema::new(new_fields));
        RecordBatch::try_new(new_schema, new_columns).context(NewRecordBatchSnafu)
    }

    /// Builds the array for one primary key column from the decoded values.
    ///
    /// `keys` maps each row of the batch to the index of its decoded primary
    /// key in `decoded_pk_values`.
    fn build_primary_key_column(
        &self,
        column_id: ColumnId,
        pk_index: usize,
        column_type: &ConcreteDataType,
        keys: &UInt32Array,
        decoded_pk_values: &[Option<CompositeValues>],
    ) -> Result<ArrayRef> {
        // Build the per-distinct-key values first (one entry per dictionary value).
        let mut builder = column_type.create_mutable_vector(decoded_pk_values.len());
        for decoded_opt in decoded_pk_values {
            match decoded_opt {
                Some(decoded) => {
                    match decoded {
                        CompositeValues::Dense(dense) => {
                            // Dense values are positional; a short vector means
                            // this column is absent from the key.
                            if pk_index < dense.len() {
                                builder.push_value_ref(&dense[pk_index].1.as_value_ref());
                            } else {
                                builder.push_null();
                            }
                        }
                        CompositeValues::Sparse(sparse) => {
                            // Sparse values are looked up by column id.
                            let value = sparse.get_or_null(column_id);
                            builder.push_value_ref(&value.as_value_ref());
                        }
                    };
                }
                None => builder.push_null(),
            }
        }

        let values_vector = builder.to_vector();
        let values_array = values_vector.to_arrow_array();

        if matches!(column_type, ConcreteDataType::String(_)) {
            // String tags stay dictionary-encoded: reuse the row keys over the
            // decoded values instead of expanding them per row.
            let dict_array = DictionaryArray::new(keys.clone(), values_array);
            Ok(Arc::new(dict_array))
        } else {
            // Other types are materialized per row via the take kernel.
            let taken_array = take(&values_array, keys, None).context(ComputeArrowSnafu)?;
            Ok(taken_array)
        }
    }
}
703
#[cfg(test)]
impl FlatReadFormat {
    /// Creates a [FlatReadFormat] that projects every column in `metadata`.
    pub fn new_with_all_columns(metadata: RegionMetadataRef) -> FlatReadFormat {
        let all_column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|c| c.column_id)
            .collect();
        FlatReadFormat::new(metadata, all_column_ids.into_iter(), None, "test", false).unwrap()
    }
}