//! Format to store in parquet.
//!
//! In the primary key format, an SST stores field columns first, then the time
//! index column, followed by three internal columns:
//! - `__primary_key`, the dictionary-encoded primary key of the row.
//! - `__sequence`, the sequence number of the row.
//! - `__op_type`, the op type of the row.

use std::borrow::Borrow;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;

use api::v1::SemanticType;
use common_time::Timestamp;
use datafusion_common::ScalarValue;
use datatypes::arrow::array::{
    ArrayRef, BinaryArray, BinaryDictionaryBuilder, DictionaryArray, UInt32Array, UInt64Array,
};
use datatypes::arrow::datatypes::{SchemaRef, UInt32Type};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::prelude::DataType;
use datatypes::vectors::{Helper, Vector};
use mito_codec::row_converter::{build_primary_key_codec_with_fields, SortField};
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
use parquet::file::statistics::Statistics;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
use store_api::storage::{ColumnId, SequenceNumber};

use crate::error::{
    ConvertVectorSnafu, InvalidBatchSnafu, InvalidRecordBatchSnafu, NewRecordBatchSnafu, Result,
};
use crate::read::{Batch, BatchBuilder, BatchColumn};
use crate::sst::file::{FileMeta, FileTimeRange};
use crate::sst::parquet::flat_format::FlatReadFormat;
use crate::sst::to_sst_arrow_schema;
pub(crate) type PrimaryKeyArray = DictionaryArray<UInt32Type>;
/// Builder for [PrimaryKeyArray].
pub(crate) type PrimaryKeyArrayBuilder = BinaryDictionaryBuilder<UInt32Type>;
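
/// Number of columns at fixed positions at the end of the SST schema:
/// time index, primary key, sequence and op type.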
pub(crate) const FIXED_POS_COLUMN_NUM: usize = 4;
/// Number of internal columns: primary key, sequence and op type.
pub(crate) const INTERNAL_COLUMN_NUM: usize = 3;
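
/// Helper to write an SST in the primary key format.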
pub(crate) struct PrimaryKeyWriteFormat {
    metadata: RegionMetadataRef,
    /// SST file schema.
    arrow_schema: SchemaRef,
    /// Sequence number to use instead of the sequences in the batches.
    override_sequence: Option<SequenceNumber>,
}

impl PrimaryKeyWriteFormat {
    /// Creates a helper for the region defined by `metadata`.
    pub(crate) fn new(metadata: RegionMetadataRef) -> PrimaryKeyWriteFormat {
        let arrow_schema = to_sst_arrow_schema(&metadata);
        PrimaryKeyWriteFormat {
            metadata,
            arrow_schema,
            override_sequence: None,
        }
    }

    /// Sets the sequence number that overrides the sequences in the batches.
    pub(crate) fn with_override_sequence(
        mut self,
        override_sequence: Option<SequenceNumber>,
    ) -> Self {
        self.override_sequence = override_sequence;
        self
    }

    /// Gets the arrow schema to store in parquet.
    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
        &self.arrow_schema
    }
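
    /// Converts a [Batch] into a [RecordBatch] in the SST schema: field
    /// columns first, then the time index and the internal columns.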
    pub(crate) fn convert_batch(&self, batch: &Batch) -> Result<RecordBatch> {
        debug_assert_eq!(
            batch.fields().len() + FIXED_POS_COLUMN_NUM,
            self.arrow_schema.fields().len()
        );
        let mut columns = Vec::with_capacity(batch.fields().len() + FIXED_POS_COLUMN_NUM);
        // Stores all field columns first and checks that they match the metadata.
        for (column, column_metadata) in batch.fields().iter().zip(self.metadata.field_columns()) {
            ensure!(
                column.column_id == column_metadata.column_id,
                InvalidBatchSnafu {
                    reason: format!(
                        "Batch has column {} but metadata has column {}",
                        column.column_id, column_metadata.column_id
                    ),
                }
            );

            columns.push(column.data.to_arrow_array());
        }
        // Appends the time index column.
        columns.push(batch.timestamps().to_arrow_array());
        // Appends the internal columns: primary key, sequence, op type.
        columns.push(new_primary_key_array(batch.primary_key(), batch.num_rows()));

        if let Some(override_sequence) = self.override_sequence {
            let sequence_array =
                Arc::new(UInt64Array::from(vec![override_sequence; batch.num_rows()]));
            columns.push(sequence_array);
        } else {
            columns.push(batch.sequences().to_arrow_array());
        }
        columns.push(batch.op_types().to_arrow_array());

        RecordBatch::try_new(self.arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
    }
}
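
/// Helper to read an SST, in either the primary key format or the flat format.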
pub enum ReadFormat {
    PrimaryKey(PrimaryKeyReadFormat),
    Flat(FlatReadFormat),
}

impl ReadFormat {
    /// Creates a helper that reads columns in `column_ids`. Uses the flat
    /// format if `flat_format` is true.
    pub(crate) fn new(
        metadata: RegionMetadataRef,
        column_ids: impl Iterator<Item = ColumnId>,
        flat_format: bool,
    ) -> Self {
        if flat_format {
            Self::new_flat(metadata, column_ids, false)
        } else {
            Self::new_primary_key(metadata, column_ids)
        }
    }

    /// Creates a helper to read the primary key format.
    pub fn new_primary_key(
        metadata: RegionMetadataRef,
        column_ids: impl Iterator<Item = ColumnId>,
    ) -> Self {
        ReadFormat::PrimaryKey(PrimaryKeyReadFormat::new(metadata, column_ids))
    }

    /// Creates a helper to read the flat format.
    pub fn new_flat(
        metadata: RegionMetadataRef,
        column_ids: impl Iterator<Item = ColumnId>,
        convert_to_flat: bool,
    ) -> Self {
        ReadFormat::Flat(FlatReadFormat::new(metadata, column_ids, convert_to_flat))
    }

    /// Returns the primary key format helper, if this is one.
    pub(crate) fn as_primary_key(&self) -> Option<&PrimaryKeyReadFormat> {
        match self {
            ReadFormat::PrimaryKey(format) => Some(format),
            _ => None,
        }
    }

    /// Returns the flat format helper, if this is one.
    pub(crate) fn as_flat(&self) -> Option<&FlatReadFormat> {
        match self {
            ReadFormat::Flat(format) => Some(format),
            _ => None,
        }
    }

    /// Gets the arrow schema of the SST file.
    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
        match self {
            ReadFormat::PrimaryKey(format) => format.arrow_schema(),
            ReadFormat::Flat(format) => format.arrow_schema(),
        }
    }

    /// Gets the metadata of the region that wrote the SST.
    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
        match self {
            ReadFormat::PrimaryKey(format) => format.metadata(),
            ReadFormat::Flat(format) => format.metadata(),
        }
    }

    /// Gets the sorted indices of the columns to read from the SST.
    pub(crate) fn projection_indices(&self) -> &[usize] {
        match self {
            ReadFormat::PrimaryKey(format) => format.projection_indices(),
            ReadFormat::Flat(format) => format.projection_indices(),
        }
    }

    /// Returns the min values of the column in the row groups.
    pub fn min_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        match self {
            ReadFormat::PrimaryKey(format) => format.min_values(row_groups, column_id),
            ReadFormat::Flat(format) => format.min_values(row_groups, column_id),
        }
    }

    /// Returns the max values of the column in the row groups.
    pub fn max_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        match self {
            ReadFormat::PrimaryKey(format) => format.max_values(row_groups, column_id),
            ReadFormat::Flat(format) => format.max_values(row_groups, column_id),
        }
    }

    /// Returns the null counts of the column in the row groups.
    pub fn null_counts(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        match self {
            ReadFormat::PrimaryKey(format) => format.null_counts(row_groups, column_id),
            ReadFormat::Flat(format) => format.null_counts(row_groups, column_id),
        }
    }
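
    /// Returns the min or max values of the column at `column_index` in each
    /// row group, using a null value for row groups without statistics.
    /// `is_min` selects min values; otherwise max values.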
    pub(crate) fn column_values(
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column: &ColumnMetadata,
        column_index: usize,
        is_min: bool,
    ) -> Option<ArrayRef> {
        let null_scalar: ScalarValue = column
            .column_schema
            .data_type
            .as_arrow_type()
            .try_into()
            .ok()?;
        let scalar_values = row_groups
            .iter()
            .map(|meta| {
                let stats = meta.borrow().column(column_index).statistics()?;
                match stats {
                    Statistics::Boolean(s) => Some(ScalarValue::Boolean(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::Int32(s) => Some(ScalarValue::Int32(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::Int64(s) => Some(ScalarValue::Int64(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::Int96(_) => None,
                    Statistics::Float(s) => Some(ScalarValue::Float32(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::Double(s) => Some(ScalarValue::Float64(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::ByteArray(s) => {
                        let bytes = if is_min {
                            s.min_bytes_opt()?
                        } else {
                            s.max_bytes_opt()?
                        };
                        let s = String::from_utf8(bytes.to_vec()).ok();
                        Some(ScalarValue::Utf8(s))
                    }
                    Statistics::FixedLenByteArray(_) => None,
                }
            })
            .map(|maybe_scalar| maybe_scalar.unwrap_or_else(|| null_scalar.clone()))
            .collect::<Vec<ScalarValue>>();
        debug_assert_eq!(scalar_values.len(), row_groups.len());
        ScalarValue::iter_to_array(scalar_values).ok()
    }
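
    /// Returns the null count of the column at `column_index` in each row
    /// group, reading the row group statistics.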
    pub(crate) fn column_null_counts(
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_index: usize,
    ) -> Option<ArrayRef> {
        let values = row_groups.iter().map(|meta| {
            let col = meta.borrow().column(column_index);
            let stat = col.statistics()?;
            stat.null_count_opt()
        });
        Some(Arc::new(UInt64Array::from_iter(values)))
    }

    /// Sets the sequence number to override the sequences in the SST.
    pub(crate) fn set_override_sequence(&mut self, sequence: Option<SequenceNumber>) {
        match self {
            ReadFormat::PrimaryKey(format) => format.set_override_sequence(sequence),
            ReadFormat::Flat(format) => format.set_override_sequence(sequence),
        }
    }
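
    /// Creates an array of `length` copies of the override sequence, or `None`
    /// if no override sequence is set.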
    pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option<ArrayRef> {
        match self {
            ReadFormat::PrimaryKey(format) => format.new_override_sequence_array(length),
            ReadFormat::Flat(format) => format.new_override_sequence_array(length),
        }
    }
}
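
/// Helper to read an SST in the primary key format.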
pub struct PrimaryKeyReadFormat {
    metadata: RegionMetadataRef,
    /// SST file schema.
    arrow_schema: SchemaRef,
    /// Field column id to its index in the SST schema.
    field_id_to_index: HashMap<ColumnId, usize>,
    /// Sorted indices of the columns to read from the SST.
    projection_indices: Vec<usize>,
    /// Field column id to its index in the projected schema.
    field_id_to_projected_index: HashMap<ColumnId, usize>,
    /// Sequence number to use instead of the sequences in the SST.
    override_sequence: Option<SequenceNumber>,
}

impl PrimaryKeyReadFormat {
    /// Creates a helper that reads columns in `column_ids`.
    pub fn new(
        metadata: RegionMetadataRef,
        column_ids: impl Iterator<Item = ColumnId>,
    ) -> PrimaryKeyReadFormat {
        let field_id_to_index: HashMap<_, _> = metadata
            .field_columns()
            .enumerate()
            .map(|(index, column)| (column.column_id, index))
            .collect();
        let arrow_schema = to_sst_arrow_schema(&metadata);

        let format_projection = FormatProjection::compute_format_projection(
            &field_id_to_index,
            arrow_schema.fields.len(),
            column_ids,
        );

        PrimaryKeyReadFormat {
            metadata,
            arrow_schema,
            field_id_to_index,
            projection_indices: format_projection.projection_indices,
            field_id_to_projected_index: format_projection.column_id_to_projected_index,
            override_sequence: None,
        }
    }

    /// Sets the sequence number to override the sequences in the SST.
    pub(crate) fn set_override_sequence(&mut self, sequence: Option<SequenceNumber>) {
        self.override_sequence = sequence;
    }

    /// Gets the arrow schema of the SST file.
    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
        &self.arrow_schema
    }

    /// Gets the metadata of the region that wrote the SST.
    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
        &self.metadata
    }

    /// Gets the sorted indices of the columns to read from the SST.
    pub(crate) fn projection_indices(&self) -> &[usize] {
        &self.projection_indices
    }

    /// Creates an array of `length` copies of the override sequence, or `None`
    /// if no override sequence is set.
    pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option<ArrayRef> {
        self.override_sequence
            .map(|seq| Arc::new(UInt64Array::from_value(seq, length)) as ArrayRef)
    }
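
    /// Converts a [RecordBatch] read from the SST into [Batch]es, one per
    /// contiguous run of the same primary key, and appends them to `batches`.
    /// If `override_sequence_array` is given, it replaces the sequence column.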
    pub fn convert_record_batch(
        &self,
        record_batch: &RecordBatch,
        override_sequence_array: Option<&ArrayRef>,
        batches: &mut VecDeque<Batch>,
    ) -> Result<()> {
        debug_assert!(batches.is_empty());

        // The record batch must contain the fixed position columns.
        ensure!(
            record_batch.num_columns() >= FIXED_POS_COLUMN_NUM,
            InvalidRecordBatchSnafu {
                reason: format!(
                    "record batch only has {} columns",
                    record_batch.num_columns()
                ),
            }
        );

        let mut fixed_pos_columns = record_batch
            .columns()
            .iter()
            .rev()
            .take(FIXED_POS_COLUMN_NUM);
        // Fixed position columns in reverse order: op type, sequence,
        // primary key, time index.
        let op_type_array = fixed_pos_columns.next().unwrap();
        let mut sequence_array = fixed_pos_columns.next().unwrap().clone();
        let pk_array = fixed_pos_columns.next().unwrap();
        let ts_array = fixed_pos_columns.next().unwrap();
        let field_batch_columns = self.get_field_batch_columns(record_batch)?;

        if let Some(override_array) = override_sequence_array {
            assert!(override_array.len() >= sequence_array.len());
            // The override array may be longer than the batch, so truncates it
            // to the batch length.
            sequence_array = if override_array.len() > sequence_array.len() {
                override_array.slice(0, sequence_array.len())
            } else {
                override_array.clone()
            };
        }

        let pk_dict_array = pk_array
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .with_context(|| InvalidRecordBatchSnafu {
                reason: format!("primary key array should not be {:?}", pk_array.data_type()),
            })?;
        let offsets = primary_key_offsets(pk_dict_array)?;
        if offsets.is_empty() {
            return Ok(());
        }

        let keys = pk_dict_array.keys();
        let pk_values = pk_dict_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .with_context(|| InvalidRecordBatchSnafu {
                reason: format!(
                    "values of primary key array should not be {:?}",
                    pk_dict_array.values().data_type()
                ),
            })?;
        // Builds one batch for each contiguous run of the same primary key.
        for (i, start) in offsets[..offsets.len() - 1].iter().enumerate() {
            let end = offsets[i + 1];
            let rows_in_batch = end - start;
            let dict_key = keys.value(*start);
            let primary_key = pk_values.value(dict_key as usize).to_vec();

            let mut builder = BatchBuilder::new(primary_key);
            builder
                .timestamps_array(ts_array.slice(*start, rows_in_batch))?
                .sequences_array(sequence_array.slice(*start, rows_in_batch))?
                .op_types_array(op_type_array.slice(*start, rows_in_batch))?;
            for batch_column in &field_batch_columns {
                builder.push_field(BatchColumn {
                    column_id: batch_column.column_id,
                    data: batch_column.data.slice(*start, rows_in_batch),
                });
            }

            let batch = builder.build()?;
            batches.push_back(batch);
        }

        Ok(())
    }

    /// Returns the min values of the column in the row groups.
    pub fn min_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        let Some(column) = self.metadata.column_by_id(column_id) else {
            return StatValues::NoColumn;
        };
        match column.semantic_type {
            SemanticType::Tag => self.tag_values(row_groups, column, true),
            SemanticType::Field => {
                let index = self.field_id_to_index.get(&column_id).unwrap();
                let stats = ReadFormat::column_values(row_groups, column, *index, true);
                StatValues::from_stats_opt(stats)
            }
            SemanticType::Timestamp => {
                let index = self.time_index_position();
                let stats = ReadFormat::column_values(row_groups, column, index, true);
                StatValues::from_stats_opt(stats)
            }
        }
    }

    /// Returns the max values of the column in the row groups.
    pub fn max_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        let Some(column) = self.metadata.column_by_id(column_id) else {
            return StatValues::NoColumn;
        };
        match column.semantic_type {
            SemanticType::Tag => self.tag_values(row_groups, column, false),
            SemanticType::Field => {
                let index = self.field_id_to_index.get(&column_id).unwrap();
                let stats = ReadFormat::column_values(row_groups, column, *index, false);
                StatValues::from_stats_opt(stats)
            }
            SemanticType::Timestamp => {
                let index = self.time_index_position();
                let stats = ReadFormat::column_values(row_groups, column, index, false);
                StatValues::from_stats_opt(stats)
            }
        }
    }

    /// Returns the null counts of the column in the row groups.
    pub fn null_counts(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        let Some(column) = self.metadata.column_by_id(column_id) else {
            return StatValues::NoColumn;
        };
        match column.semantic_type {
            SemanticType::Tag => StatValues::NoStats,
            SemanticType::Field => {
                let index = self.field_id_to_index.get(&column_id).unwrap();
                let stats = ReadFormat::column_null_counts(row_groups, *index);
                StatValues::from_stats_opt(stats)
            }
            SemanticType::Timestamp => {
                let index = self.time_index_position();
                let stats = ReadFormat::column_null_counts(row_groups, index);
                StatValues::from_stats_opt(stats)
            }
        }
    }

    /// Takes the field columns from the record batch and pairs them with their
    /// column ids from the metadata.
    fn get_field_batch_columns(&self, record_batch: &RecordBatch) -> Result<Vec<BatchColumn>> {
        record_batch
            .columns()
            .iter()
            .zip(record_batch.schema().fields())
            .take(record_batch.num_columns() - FIXED_POS_COLUMN_NUM)
            .map(|(array, field)| {
                let vector = Helper::try_into_vector(array.clone()).context(ConvertVectorSnafu)?;
                let column = self
                    .metadata
                    .column_by_name(field.name())
                    .with_context(|| InvalidRecordBatchSnafu {
                        reason: format!("column {} not found in metadata", field.name()),
                    })?;

                Ok(BatchColumn {
                    column_id: column.column_id,
                    data: vector,
                })
            })
            .collect()
    }

    /// Returns the min or max values of the tag column. Only the first tag in
    /// the primary key has statistics.
    fn tag_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column: &ColumnMetadata,
        is_min: bool,
    ) -> StatValues {
        let is_first_tag = self
            .metadata
            .primary_key
            .first()
            .map(|id| *id == column.column_id)
            .unwrap_or(false);
        if !is_first_tag {
            return StatValues::NoStats;
        }

        StatValues::from_stats_opt(self.first_tag_values(row_groups, column, is_min))
    }
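
    /// Decodes the min or max values of the first tag from the statistics of
    /// the encoded primary key column.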
    fn first_tag_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column: &ColumnMetadata,
        is_min: bool,
    ) -> Option<ArrayRef> {
        debug_assert!(self
            .metadata
            .primary_key
            .first()
            .map(|id| *id == column.column_id)
            .unwrap_or(false));

        let primary_key_encoding = self.metadata.primary_key_encoding;
        let converter = build_primary_key_codec_with_fields(
            primary_key_encoding,
            [(
                column.column_id,
                SortField::new(column.column_schema.data_type.clone()),
            )]
            .into_iter(),
        );

        let values = row_groups.iter().map(|meta| {
            let stats = meta
                .borrow()
                .column(self.primary_key_position())
                .statistics()?;
            match stats {
                Statistics::Boolean(_) => None,
                Statistics::Int32(_) => None,
                Statistics::Int64(_) => None,
                Statistics::Int96(_) => None,
                Statistics::Float(_) => None,
                Statistics::Double(_) => None,
                Statistics::ByteArray(s) => {
                    let bytes = if is_min {
                        s.min_bytes_opt()?
                    } else {
                        s.max_bytes_opt()?
                    };
                    converter.decode_leftmost(bytes).ok()?
                }
                Statistics::FixedLenByteArray(_) => None,
            }
        });
        let mut builder = column
            .column_schema
            .data_type
            .create_mutable_vector(row_groups.len());
        for value_opt in values {
            match value_opt {
                Some(v) => builder.push_value_ref(v.as_value_ref()),
                None => builder.push_null(),
            }
        }
        let vector = builder.to_vector();

        Some(vector.to_arrow_array())
    }

    /// Index of the primary key column in the SST schema.
    fn primary_key_position(&self) -> usize {
        self.arrow_schema.fields.len() - 3
    }

    /// Index of the time index column in the SST schema.
    fn time_index_position(&self) -> usize {
        self.arrow_schema.fields.len() - FIXED_POS_COLUMN_NUM
    }

    /// Index of the field column by its id in the projected schema.
    pub fn field_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
        self.field_id_to_projected_index.get(&column_id).copied()
    }
}
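
/// Projection computed for reading specific columns from an SST.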
pub(crate) struct FormatProjection {
    /// Sorted indices of the columns to read from the SST, always including
    /// the fixed position columns.
    pub(crate) projection_indices: Vec<usize>,
    /// Column id to its index in the projected schema.
    pub(crate) column_id_to_projected_index: HashMap<ColumnId, usize>,
}

impl FormatProjection {
    /// Computes the projection for the columns in `column_ids`, given the map
    /// from column id to index in the SST schema and the total number of
    /// columns in the SST.
    pub(crate) fn compute_format_projection(
        id_to_index: &HashMap<ColumnId, usize>,
        sst_column_num: usize,
        column_ids: impl Iterator<Item = ColumnId>,
    ) -> Self {
        // Collects (column id, index) pairs for the requested columns that
        // exist in the SST.
        let mut projected_schema: Vec<_> = column_ids
            .filter_map(|column_id| {
                id_to_index
                    .get(&column_id)
                    .copied()
                    .map(|index| (column_id, index))
            })
            .collect();
        // Sorts and deduplicates the pairs by index in the SST schema.
        projected_schema.sort_unstable_by_key(|x| x.1);
        projected_schema.dedup_by_key(|x| x.1);

        // Appends the fixed position columns and sorts all indices.
        let mut projection_indices: Vec<_> = projected_schema
            .iter()
            .map(|(_column_id, index)| *index)
            .chain(sst_column_num - FIXED_POS_COLUMN_NUM..sst_column_num)
            .collect();
        projection_indices.sort_unstable();
        projection_indices.dedup();

        let column_id_to_projected_index = projected_schema
            .into_iter()
            .map(|(column_id, _)| column_id)
            .enumerate()
            .map(|(index, column_id)| (column_id, index))
            .collect();

        Self {
            projection_indices,
            column_id_to_projected_index,
        }
    }
}
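
/// Statistics of a column across row groups.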
pub enum StatValues {
    /// Values of the statistics, one entry per row group.
    Values(ArrayRef),
    /// The column doesn't exist in the SST.
    NoColumn,
    /// The column exists but has no statistics.
    NoStats,
}

impl StatValues {
    /// Creates [StatValues] from optional statistics.
    pub fn from_stats_opt(stats: Option<ArrayRef>) -> Self {
        match stats {
            Some(stats) => StatValues::Values(stats),
            None => StatValues::NoStats,
        }
    }
}

#[cfg(test)]
impl PrimaryKeyReadFormat {
    /// Creates a helper that reads all columns in the metadata.
    pub fn new_with_all_columns(metadata: RegionMetadataRef) -> PrimaryKeyReadFormat {
        Self::new(
            Arc::clone(&metadata),
            metadata.column_metadatas.iter().map(|c| c.column_id),
        )
    }
}
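
/// Returns the index of the first row of each contiguous run of primary keys
/// in the dictionary array, with the total row count appended. For example,
/// keys `[0, 0, 1, 1, 1]` yield offsets `[0, 2, 5]`.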
fn primary_key_offsets(pk_dict_array: &PrimaryKeyArray) -> Result<Vec<usize>> {
    if pk_dict_array.is_empty() {
        return Ok(Vec::new());
    }

    // Finds the positions where the dictionary key changes.
    let mut offsets = vec![0];
    let keys = pk_dict_array.keys();
    let pk_indices = keys.values();
    for (i, key) in pk_indices.iter().take(keys.len() - 1).enumerate() {
        if *key != pk_indices[i + 1] {
            offsets.push(i + 1);
        }
    }
    offsets.push(keys.len());

    Ok(offsets)
}
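
/// Creates a dictionary array with `primary_key` as the single value, repeated
/// for `num_rows` rows.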
fn new_primary_key_array(primary_key: &[u8], num_rows: usize) -> ArrayRef {
    let values = Arc::new(BinaryArray::from_iter_values([primary_key]));
    let keys = UInt32Array::from_value(0, num_rows);

    Arc::new(DictionaryArray::new(keys, values))
}
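
/// Gets the time range of the row group from the statistics of the time index
/// column, or `None` if the column has no statistics.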
pub(crate) fn parquet_row_group_time_range(
    file_meta: &FileMeta,
    parquet_meta: &ParquetMetaData,
    row_group_idx: usize,
) -> Option<FileTimeRange> {
    let row_group_meta = parquet_meta.row_group(row_group_idx);
    let num_columns = parquet_meta.file_metadata().schema_descr().num_columns();
    assert!(
        num_columns >= FIXED_POS_COLUMN_NUM,
        "file only has {} columns",
        num_columns
    );
    let time_index_pos = num_columns - FIXED_POS_COLUMN_NUM;

    let stats = row_group_meta.column(time_index_pos).statistics()?;
    // The physical type of the time index column must be int64.
    let (min, max) = match stats {
        Statistics::Int64(value_stats) => (*value_stats.min_opt()?, *value_stats.max_opt()?),
        Statistics::Int32(_)
        | Statistics::Boolean(_)
        | Statistics::Int96(_)
        | Statistics::Float(_)
        | Statistics::Double(_)
        | Statistics::ByteArray(_)
        | Statistics::FixedLenByteArray(_) => {
            common_telemetry::warn!(
                "Invalid statistics {:?} for time index in parquet in {}",
                stats,
                file_meta.file_id
            );
            return None;
        }
    };

    debug_assert!(min >= file_meta.time_range.0.value() && min <= file_meta.time_range.1.value());
    debug_assert!(max >= file_meta.time_range.0.value() && max <= file_meta.time_range.1.value());
    let unit = file_meta.time_range.0.unit();

    Some((Timestamp::new(min, unit), Timestamp::new(max, unit)))
}
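
/// Returns true if the file has row groups and the sequence statistics of
/// every row group are present and all zero, so readers should override the
/// sequences.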
pub(crate) fn need_override_sequence(parquet_meta: &ParquetMetaData) -> bool {
    let num_columns = parquet_meta.file_metadata().schema_descr().num_columns();
    if num_columns < FIXED_POS_COLUMN_NUM {
        return false;
    }

    // The sequence column is the second to last column.
    let sequence_pos = num_columns - 2;

    // Checks that every row group has all-zero sequences.
    for row_group in parquet_meta.row_groups() {
        if let Some(Statistics::Int64(value_stats)) = row_group.column(sequence_pos).statistics() {
            if let (Some(min_val), Some(max_val)) = (value_stats.min_opt(), value_stats.max_opt()) {
                if *min_val != 0 || *max_val != 0 {
                    return false;
                }
            } else {
                return false;
            }
        } else {
            return false;
        }
    }

    !parquet_meta.row_groups().is_empty()
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::OpType;
    use datatypes::arrow::array::{
        Int64Array, StringArray, TimestampMillisecondArray, UInt64Array, UInt8Array,
    };
    use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema, TimeUnit};
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::ValueRef;
    use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, UInt64Vector, UInt8Vector};
    use mito_codec::row_converter::{
        DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec,
    };
    use store_api::codec::PrimaryKeyEncoding;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::consts::ReservedColumnId;
    use store_api::storage::RegionId;

    use super::*;
    use crate::sst::parquet::flat_format::{sequence_column_index, FlatWriteFormat};
    use crate::sst::{to_flat_sst_arrow_schema, FlatSchemaOptions};

    const TEST_SEQUENCE: u64 = 1;
    const TEST_OP_TYPE: u8 = OpType::Put as u8;

    fn build_test_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("tag0", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field1",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("tag1", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field0",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 5,
            })
            .primary_key(vec![1, 3]);
        Arc::new(builder.build().unwrap())
    }

    fn build_test_arrow_schema() -> SchemaRef {
        let fields = vec![
            Field::new("field1", ArrowDataType::Int64, true),
            Field::new("field0", ArrowDataType::Int64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt32),
                    Box::new(ArrowDataType::Binary),
                ),
                false,
            ),
            Field::new("__sequence", ArrowDataType::UInt64, false),
            Field::new("__op_type", ArrowDataType::UInt8, false),
        ];
        Arc::new(Schema::new(fields))
    }

    fn new_batch(primary_key: &[u8], start_ts: i64, start_field: i64, num_rows: usize) -> Batch {
        new_batch_with_sequence(primary_key, start_ts, start_field, num_rows, TEST_SEQUENCE)
    }

    fn new_batch_with_sequence(
        primary_key: &[u8],
        start_ts: i64,
        start_field: i64,
        num_rows: usize,
        sequence: u64,
    ) -> Batch {
        let ts_values = (0..num_rows).map(|i| start_ts + i as i64);
        let timestamps = Arc::new(TimestampMillisecondVector::from_values(ts_values));
        let sequences = Arc::new(UInt64Vector::from_vec(vec![sequence; num_rows]));
        let op_types = Arc::new(UInt8Vector::from_vec(vec![TEST_OP_TYPE; num_rows]));
        let fields = vec![
            BatchColumn {
                column_id: 4,
                data: Arc::new(Int64Vector::from_vec(vec![start_field; num_rows])),
            },
            BatchColumn {
                column_id: 2,
                data: Arc::new(Int64Vector::from_vec(vec![start_field + 1; num_rows])),
            },
        ];

        BatchBuilder::with_required_columns(primary_key.to_vec(), timestamps, sequences, op_types)
            .with_fields(fields)
            .build()
            .unwrap()
    }

    #[test]
    fn test_to_sst_arrow_schema() {
        let metadata = build_test_region_metadata();
        let write_format = PrimaryKeyWriteFormat::new(metadata);
        assert_eq!(&build_test_arrow_schema(), write_format.arrow_schema());
    }

    #[test]
    fn test_new_primary_key_array() {
        let array = new_primary_key_array(b"test", 3);
        let expect = build_test_pk_array(&[(b"test".to_vec(), 3)]) as ArrayRef;
        assert_eq!(&expect, &array);
    }

    fn build_test_pk_array(pk_row_nums: &[(Vec<u8>, usize)]) -> Arc<PrimaryKeyArray> {
        let values = Arc::new(BinaryArray::from_iter_values(
            pk_row_nums.iter().map(|v| &v.0),
        ));
        let mut keys = vec![];
        for (index, num_rows) in pk_row_nums.iter().map(|v| v.1).enumerate() {
            keys.extend(std::iter::repeat_n(index as u32, num_rows));
        }
        let keys = UInt32Array::from(keys);
        Arc::new(DictionaryArray::new(keys, values))
    }

    #[test]
    fn test_convert_batch() {
        let metadata = build_test_region_metadata();
        let write_format = PrimaryKeyWriteFormat::new(metadata);

        let num_rows = 4;
        let batch = new_batch(b"test", 1, 2, num_rows);
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // __primary_key
            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; num_rows])), // __sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // __op_type
        ];
        let expect_record = RecordBatch::try_new(build_test_arrow_schema(), columns).unwrap();

        let actual = write_format.convert_batch(&batch).unwrap();
        assert_eq!(expect_record, actual);
    }

    #[test]
    fn test_convert_batch_with_override_sequence() {
        let metadata = build_test_region_metadata();
        let write_format =
            PrimaryKeyWriteFormat::new(metadata).with_override_sequence(Some(415411));

        let num_rows = 4;
        let batch = new_batch(b"test", 1, 2, num_rows);
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // __primary_key
            Arc::new(UInt64Array::from(vec![415411; num_rows])), // overridden __sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // __op_type
        ];
        let expect_record = RecordBatch::try_new(build_test_arrow_schema(), columns).unwrap();

        let actual = write_format.convert_batch(&batch).unwrap();
        assert_eq!(expect_record, actual);
    }

    #[test]
    fn test_projection_indices() {
        let metadata = build_test_region_metadata();
        // The SST columns are: field1, field0, ts, __primary_key, __sequence,
        // __op_type. Tags are only stored in the primary key, so projecting a
        // tag only reads the fixed position columns.
        let read_format = ReadFormat::new_primary_key(metadata.clone(), [3].iter().copied());
        assert_eq!(&[2, 3, 4, 5], read_format.projection_indices());
        // Field column field1 (index 0) plus the fixed position columns.
        let read_format = ReadFormat::new_primary_key(metadata.clone(), [4].iter().copied());
        assert_eq!(&[0, 2, 3, 4, 5], read_format.projection_indices());
        // The time index is already a fixed position column.
        let read_format = ReadFormat::new_primary_key(metadata.clone(), [5].iter().copied());
        assert_eq!(&[2, 3, 4, 5], read_format.projection_indices());
        // Field column field0 (index 1), a tag and the time index.
        let read_format = ReadFormat::new_primary_key(metadata, [2, 1, 5].iter().copied());
        assert_eq!(&[1, 2, 3, 4, 5], read_format.projection_indices());
    }

    #[test]
    fn test_empty_primary_key_offsets() {
        let array = build_test_pk_array(&[]);
        assert!(primary_key_offsets(&array).unwrap().is_empty());
    }

    #[test]
    fn test_primary_key_offsets_one_series() {
        let array = build_test_pk_array(&[(b"one".to_vec(), 1)]);
        assert_eq!(vec![0, 1], primary_key_offsets(&array).unwrap());

        let array = build_test_pk_array(&[(b"one".to_vec(), 1), (b"two".to_vec(), 1)]);
        assert_eq!(vec![0, 1, 2], primary_key_offsets(&array).unwrap());

        let array = build_test_pk_array(&[
            (b"one".to_vec(), 1),
            (b"two".to_vec(), 1),
            (b"three".to_vec(), 1),
        ]);
        assert_eq!(vec![0, 1, 2, 3], primary_key_offsets(&array).unwrap());
    }

    #[test]
    fn test_primary_key_offsets_multi_series() {
        let array = build_test_pk_array(&[(b"one".to_vec(), 1), (b"two".to_vec(), 3)]);
        assert_eq!(vec![0, 1, 4], primary_key_offsets(&array).unwrap());

        let array = build_test_pk_array(&[(b"one".to_vec(), 3), (b"two".to_vec(), 1)]);
        assert_eq!(vec![0, 3, 4], primary_key_offsets(&array).unwrap());

        let array = build_test_pk_array(&[(b"one".to_vec(), 3), (b"two".to_vec(), 3)]);
        assert_eq!(vec![0, 3, 6], primary_key_offsets(&array).unwrap());
    }

    #[test]
    fn test_convert_empty_record_batch() {
        let metadata = build_test_region_metadata();
        let arrow_schema = build_test_arrow_schema();
        let column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|col| col.column_id)
            .collect();
        let read_format = PrimaryKeyReadFormat::new(metadata, column_ids.iter().copied());
        assert_eq!(arrow_schema, *read_format.arrow_schema());

        let record_batch = RecordBatch::new_empty(arrow_schema);
        let mut batches = VecDeque::new();
        read_format
            .convert_record_batch(&record_batch, None, &mut batches)
            .unwrap();
        assert!(batches.is_empty());
    }

    #[test]
    fn test_convert_record_batch() {
        let metadata = build_test_region_metadata();
        let column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|col| col.column_id)
            .collect();
        let read_format = PrimaryKeyReadFormat::new(metadata, column_ids.iter().copied());

        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![1, 1, 10, 10])), // field1
            Arc::new(Int64Array::from(vec![2, 2, 11, 11])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 11, 12])), // ts
            build_test_pk_array(&[(b"one".to_vec(), 2), (b"two".to_vec(), 2)]), // __primary_key
            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; 4])), // __sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; 4])), // __op_type
        ];
        let arrow_schema = build_test_arrow_schema();
        let record_batch = RecordBatch::try_new(arrow_schema, columns).unwrap();
        let mut batches = VecDeque::new();
        read_format
            .convert_record_batch(&record_batch, None, &mut batches)
            .unwrap();

        assert_eq!(
            vec![new_batch(b"one", 1, 1, 2), new_batch(b"two", 11, 10, 2)],
            batches.into_iter().collect::<Vec<_>>(),
        );
    }

    #[test]
    fn test_convert_record_batch_with_override_sequence() {
        let metadata = build_test_region_metadata();
        let column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|col| col.column_id)
            .collect();
        let read_format = ReadFormat::new(metadata, column_ids.iter().copied(), false);

        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![1, 1, 10, 10])), // field1
            Arc::new(Int64Array::from(vec![2, 2, 11, 11])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 11, 12])), // ts
            build_test_pk_array(&[(b"one".to_vec(), 2), (b"two".to_vec(), 2)]), // __primary_key
            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; 4])), // __sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; 4])), // __op_type
        ];
        let arrow_schema = build_test_arrow_schema();
        let record_batch = RecordBatch::try_new(arrow_schema, columns).unwrap();

        let override_sequence: u64 = 12345;
        let override_sequence_array: ArrayRef =
            Arc::new(UInt64Array::from_value(override_sequence, 4));

        let mut batches = VecDeque::new();
        read_format
            .as_primary_key()
            .unwrap()
            .convert_record_batch(&record_batch, Some(&override_sequence_array), &mut batches)
            .unwrap();

        let expected_batch1 = new_batch_with_sequence(b"one", 1, 1, 2, override_sequence);
        let expected_batch2 = new_batch_with_sequence(b"two", 11, 10, 2, override_sequence);

        assert_eq!(
            vec![expected_batch1, expected_batch2],
            batches.into_iter().collect::<Vec<_>>(),
        );
    }

    fn build_test_flat_sst_schema() -> SchemaRef {
        let fields = vec![
            Field::new("tag0", ArrowDataType::Int64, true),
            Field::new("tag1", ArrowDataType::Int64, true),
            Field::new("field1", ArrowDataType::Int64, true),
            Field::new("field0", ArrowDataType::Int64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt32),
                    Box::new(ArrowDataType::Binary),
                ),
                false,
            ),
            Field::new("__sequence", ArrowDataType::UInt64, false),
            Field::new("__op_type", ArrowDataType::UInt8, false),
        ];
        Arc::new(Schema::new(fields))
    }

    #[test]
    fn test_flat_to_sst_arrow_schema() {
        let metadata = build_test_region_metadata();
        let format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default());
        assert_eq!(&build_test_flat_sst_schema(), format.arrow_schema());
    }

    fn input_columns_for_flat_batch(num_rows: usize) -> Vec<ArrayRef> {
        vec![
            Arc::new(Int64Array::from(vec![1; num_rows])), // tag0
            Arc::new(Int64Array::from(vec![1; num_rows])), // tag1
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // __primary_key
            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; num_rows])), // __sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // __op_type
        ]
    }

    #[test]
    fn test_flat_convert_batch() {
        let metadata = build_test_region_metadata();
        let format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default());

        let num_rows = 4;
        let columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
        let batch = RecordBatch::try_new(build_test_flat_sst_schema(), columns.clone()).unwrap();
        let expect_record = RecordBatch::try_new(build_test_flat_sst_schema(), columns).unwrap();

        let actual = format.convert_batch(&batch).unwrap();
        assert_eq!(expect_record, actual);
    }

    #[test]
    fn test_flat_convert_with_override_sequence() {
        let metadata = build_test_region_metadata();
        let format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default())
            .with_override_sequence(Some(415411));

        let num_rows = 4;
        let columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
        let batch = RecordBatch::try_new(build_test_flat_sst_schema(), columns).unwrap();

        let expected_columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![1; num_rows])), // tag0
            Arc::new(Int64Array::from(vec![1; num_rows])), // tag1
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // __primary_key
            Arc::new(UInt64Array::from(vec![415411; num_rows])), // overridden __sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // __op_type
        ];
        let expected_record =
            RecordBatch::try_new(build_test_flat_sst_schema(), expected_columns).unwrap();

        let actual = format.convert_batch(&batch).unwrap();
        assert_eq!(expected_record, actual);
    }

    #[test]
    fn test_flat_projection_indices() {
        let metadata = build_test_region_metadata();
        // The flat SST columns are: tag0, tag1, field1, field0, ts,
        // __primary_key, __sequence, __op_type.
        let read_format = ReadFormat::new_flat(metadata.clone(), [3].iter().copied(), false);
        assert_eq!(&[1, 4, 5, 6, 7], read_format.projection_indices());

        let read_format = ReadFormat::new_flat(metadata.clone(), [4].iter().copied(), false);
        assert_eq!(&[2, 4, 5, 6, 7], read_format.projection_indices());

        let read_format = ReadFormat::new_flat(metadata.clone(), [5].iter().copied(), false);
        assert_eq!(&[4, 5, 6, 7], read_format.projection_indices());

        let read_format = ReadFormat::new_flat(metadata, [2, 1, 5].iter().copied(), false);
        assert_eq!(&[0, 3, 4, 5, 6, 7], read_format.projection_indices());
    }

    #[test]
    fn test_flat_read_format_convert_batch() {
        let metadata = build_test_region_metadata();
        let mut format = FlatReadFormat::new(metadata, std::iter::once(1), false);

        let num_rows = 4;
        let original_sequence = 100u64;
        let override_sequence = 200u64;

        let columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
        // Replaces the sequence column with the original sequence.
        let mut test_columns = columns.clone();
        test_columns[6] = Arc::new(UInt64Array::from(vec![original_sequence; num_rows]));
        let record_batch =
            RecordBatch::try_new(format.arrow_schema().clone(), test_columns).unwrap();

        // Without an override array, the batch keeps the original sequence.
        let result = format.convert_batch(record_batch.clone(), None).unwrap();
        let sequence_column = result.column(sequence_column_index(result.num_columns()));
        let sequence_array = sequence_column
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();

        let expected_original = UInt64Array::from(vec![original_sequence; num_rows]);
        assert_eq!(sequence_array, &expected_original);

        // With an override array, the batch gets the override sequence.
        format.set_override_sequence(Some(override_sequence));
        let override_sequence_array = format.new_override_sequence_array(num_rows).unwrap();
        let result = format
            .convert_batch(record_batch, Some(&override_sequence_array))
            .unwrap();
        let sequence_column = result.column(sequence_column_index(result.num_columns()));
        let sequence_array = sequence_column
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();

        let expected_override = UInt64Array::from(vec![override_sequence; num_rows]);
        assert_eq!(sequence_array, &expected_override);
    }

    #[test]
    fn test_need_convert_to_flat() {
        let metadata = build_test_region_metadata();

        // All columns plus the internal columns.
        let expected_columns = metadata.column_metadatas.len() + 3;
        let result =
            FlatReadFormat::need_convert_to_flat("test.parquet", expected_columns, &metadata)
                .unwrap();
        assert!(
            !result,
            "Should not need conversion when column counts match"
        );

        // A file in the primary key format doesn't store the tag columns.
        let num_columns_without_pk = expected_columns - metadata.primary_key.len();
        let result =
            FlatReadFormat::need_convert_to_flat("test.parquet", num_columns_without_pk, &metadata)
                .unwrap();
        assert!(
            result,
            "Should need conversion when primary key columns are missing"
        );

        // More columns than expected is an error.
        let too_many_columns = expected_columns + 1;
        let err = FlatReadFormat::need_convert_to_flat("test.parquet", too_many_columns, &metadata)
            .unwrap_err();
        assert!(err.to_string().contains("Expected columns"), "{err:?}");

        // A column count that matches neither format is an error.
        let wrong_diff_columns = expected_columns - 1;
        let err =
            FlatReadFormat::need_convert_to_flat("test.parquet", wrong_diff_columns, &metadata)
                .unwrap_err();
        assert!(
            err.to_string().contains("Column number difference"),
            "{err:?}"
        );
    }

    fn build_test_dense_pk_array(
        codec: &DensePrimaryKeyCodec,
        pk_values_per_row: &[&[Option<i64>]],
    ) -> Arc<PrimaryKeyArray> {
        let mut builder = PrimaryKeyArrayBuilder::with_capacity(pk_values_per_row.len(), 1024, 0);

        for pk_values_row in pk_values_per_row {
            let values: Vec<ValueRef> = pk_values_row
                .iter()
                .map(|opt| match opt {
                    Some(val) => ValueRef::Int64(*val),
                    None => ValueRef::Null,
                })
                .collect();

            let encoded = codec.encode(values.into_iter()).unwrap();
            builder.append_value(&encoded);
        }

        Arc::new(builder.finish())
    }

    fn build_test_sparse_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "__table_id",
                    ConcreteDataType::uint32_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: ReservedColumnId::table_id(),
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "__tsid",
                    ConcreteDataType::uint64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: ReservedColumnId::tsid(),
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("tag0", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("tag1", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field1",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field0",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 5,
            })
            .primary_key(vec![
                ReservedColumnId::table_id(),
                ReservedColumnId::tsid(),
                1,
                3,
            ])
            .primary_key_encoding(PrimaryKeyEncoding::Sparse);
        Arc::new(builder.build().unwrap())
    }

    fn build_test_sparse_pk_array(
        codec: &SparsePrimaryKeyCodec,
        pk_values_per_row: &[SparseTestRow],
    ) -> Arc<PrimaryKeyArray> {
        let mut builder = PrimaryKeyArrayBuilder::with_capacity(pk_values_per_row.len(), 1024, 0);
        for row in pk_values_per_row {
            let values = vec![
                (ReservedColumnId::table_id(), ValueRef::UInt32(row.table_id)),
                (ReservedColumnId::tsid(), ValueRef::UInt64(row.tsid)),
                (1, ValueRef::String(&row.tag0)),
                (3, ValueRef::String(&row.tag1)),
            ];

            let mut buffer = Vec::new();
            codec.encode_value_refs(&values, &mut buffer).unwrap();
            builder.append_value(&buffer);
        }

        Arc::new(builder.finish())
    }

    #[derive(Clone)]
    struct SparseTestRow {
        table_id: u32,
        tsid: u64,
        tag0: String,
        tag1: String,
    }

    #[test]
    fn test_flat_read_format_convert_format_with_dense_encoding() {
        let metadata = build_test_region_metadata();

        let column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|c| c.column_id)
            .collect();
        let format = FlatReadFormat::new(metadata.clone(), column_ids.into_iter(), true);

        let num_rows = 4;
        let original_sequence = 100u64;

        // Every row has the same primary key: tag0 = 1, tag1 = 1.
        let pk_values_per_row = vec![&[Some(1i64), Some(1i64)][..]; num_rows];

        let codec = DensePrimaryKeyCodec::new(&metadata);
        let dense_pk_array = build_test_dense_pk_array(&codec, &pk_values_per_row);
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            dense_pk_array.clone(), // __primary_key
            Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), // __sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // __op_type
        ];

        // Schema of the primary key format, which doesn't store tag columns.
        let old_format_fields = vec![
            Field::new("field1", ArrowDataType::Int64, true),
            Field::new("field0", ArrowDataType::Int64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt32),
                    Box::new(ArrowDataType::Binary),
                ),
                false,
            ),
            Field::new("__sequence", ArrowDataType::UInt64, false),
            Field::new("__op_type", ArrowDataType::UInt8, false),
        ];
        let old_schema = Arc::new(Schema::new(old_format_fields));
        let record_batch = RecordBatch::try_new(old_schema, columns).unwrap();

        let result = format.convert_batch(record_batch, None).unwrap();

        // The converted batch contains the tag columns decoded from the
        // primary key.
        let expected_columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![1; num_rows])), // tag0
            Arc::new(Int64Array::from(vec![1; num_rows])), // tag1
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            dense_pk_array, // __primary_key
            Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), // __sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // __op_type
        ];
        let expected_record_batch =
            RecordBatch::try_new(build_test_flat_sst_schema(), expected_columns).unwrap();

        assert_eq!(expected_record_batch, result);
    }

    #[test]
    fn test_flat_read_format_convert_format_with_sparse_encoding() {
        let metadata = build_test_sparse_region_metadata();

        let column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|c| c.column_id)
            .collect();
        let format = FlatReadFormat::new(metadata.clone(), column_ids.into_iter(), true);

        let num_rows = 4;
        let original_sequence = 100u64;

        // Every row has the same primary key.
        let pk_test_rows = vec![
            SparseTestRow {
                table_id: 1,
                tsid: 123,
                tag0: "frontend".to_string(),
                tag1: "pod1".to_string(),
            };
            num_rows
        ];

        let codec = SparsePrimaryKeyCodec::new(&metadata);
        let sparse_pk_array = build_test_sparse_pk_array(&codec, &pk_test_rows);
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            sparse_pk_array.clone(), // __primary_key
            Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), // __sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // __op_type
        ];

        // Schema of the primary key format, which doesn't store tag columns.
        let old_format_fields = vec![
            Field::new("field1", ArrowDataType::Int64, true),
            Field::new("field0", ArrowDataType::Int64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt32),
                    Box::new(ArrowDataType::Binary),
                ),
                false,
            ),
            Field::new("__sequence", ArrowDataType::UInt64, false),
            Field::new("__op_type", ArrowDataType::UInt8, false),
        ];
        let old_schema = Arc::new(Schema::new(old_format_fields));
        let record_batch = RecordBatch::try_new(old_schema, columns).unwrap();

        let result = format.convert_batch(record_batch, None).unwrap();

        // String tags are decoded into dictionary arrays.
        let tag0_array = Arc::new(DictionaryArray::new(
            UInt32Array::from(vec![0; num_rows]),
            Arc::new(StringArray::from(vec!["frontend"])),
        ));
        let tag1_array = Arc::new(DictionaryArray::new(
            UInt32Array::from(vec![0; num_rows]),
            Arc::new(StringArray::from(vec!["pod1"])),
        ));
        let expected_columns: Vec<ArrayRef> = vec![
            Arc::new(UInt32Array::from(vec![1; num_rows])), // __table_id
            Arc::new(UInt64Array::from(vec![123; num_rows])), // __tsid
            tag0_array, // tag0
            tag1_array, // tag1
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            sparse_pk_array, // __primary_key
            Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), // __sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // __op_type
        ];
        let expected_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        let expected_record_batch =
            RecordBatch::try_new(expected_schema, expected_columns).unwrap();

        assert_eq!(expected_record_batch, result);
    }
}