use std::borrow::Borrow;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;

use api::v1::SemanticType;
use common_time::Timestamp;
use datafusion_common::ScalarValue;
use datatypes::arrow::array::{
    ArrayRef, BinaryArray, BinaryDictionaryBuilder, DictionaryArray, UInt32Array, UInt64Array,
};
use datatypes::arrow::datatypes::{SchemaRef, UInt32Type};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::prelude::DataType;
use datatypes::vectors::{Helper, Vector};
use mito_codec::row_converter::{SortField, build_primary_key_codec_with_fields};
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
use parquet::file::statistics::Statistics;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
use store_api::storage::{ColumnId, SequenceNumber};

use crate::error::{
    ConvertVectorSnafu, InvalidBatchSnafu, InvalidRecordBatchSnafu, NewRecordBatchSnafu, Result,
};
use crate::read::{Batch, BatchBuilder, BatchColumn};
use crate::sst::file::{FileMeta, FileTimeRange};
use crate::sst::parquet::flat_format::FlatReadFormat;
use crate::sst::to_sst_arrow_schema;
pub(crate) type PrimaryKeyArray = DictionaryArray<UInt32Type>;
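
/// Builder of the primary key dictionary array.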
pub(crate) type PrimaryKeyArrayBuilder = BinaryDictionaryBuilder<UInt32Type>;

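/// Number of columns at fixed positions at the end of the SST schema:
/// time index, primary key, sequence and op type.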
pub(crate) const FIXED_POS_COLUMN_NUM: usize = 4;
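
/// Number of internal columns at the end of the SST schema:
/// primary key, sequence and op type.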
pub(crate) const INTERNAL_COLUMN_NUM: usize = 3;

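/// Helper that converts [Batch]es into [RecordBatch]es in the primary key
/// SST format.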
pub(crate) struct PrimaryKeyWriteFormat {
    metadata: RegionMetadataRef,
    arrow_schema: SchemaRef,
    override_sequence: Option<SequenceNumber>,
}

impl PrimaryKeyWriteFormat {
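    /// Creates a write helper whose arrow schema is derived from `metadata`.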
    pub(crate) fn new(metadata: RegionMetadataRef) -> PrimaryKeyWriteFormat {
        let arrow_schema = to_sst_arrow_schema(&metadata);
        PrimaryKeyWriteFormat {
            metadata,
            arrow_schema,
            override_sequence: None,
        }
    }

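    /// Sets a sequence number that replaces the sequences of all batches.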
    pub(crate) fn with_override_sequence(
        mut self,
        override_sequence: Option<SequenceNumber>,
    ) -> Self {
        self.override_sequence = override_sequence;
        self
    }

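    /// Returns the arrow schema of the SST to write.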
    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
        &self.arrow_schema
    }

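    /// Converts a [Batch] into a [RecordBatch] that matches the SST schema.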
    pub(crate) fn convert_batch(&self, batch: &Batch) -> Result<RecordBatch> {
        debug_assert_eq!(
            batch.fields().len() + FIXED_POS_COLUMN_NUM,
            self.arrow_schema.fields().len()
        );
        let mut columns = Vec::with_capacity(batch.fields().len() + FIXED_POS_COLUMN_NUM);
        for (column, column_metadata) in batch.fields().iter().zip(self.metadata.field_columns()) {
            ensure!(
                column.column_id == column_metadata.column_id,
                InvalidBatchSnafu {
                    reason: format!(
                        "Batch has column {} but metadata has column {}",
                        column.column_id, column_metadata.column_id
                    ),
                }
            );

            columns.push(column.data.to_arrow_array());
        }
        columns.push(batch.timestamps().to_arrow_array());
        columns.push(new_primary_key_array(batch.primary_key(), batch.num_rows()));

        if let Some(override_sequence) = self.override_sequence {
            let sequence_array =
                Arc::new(UInt64Array::from(vec![override_sequence; batch.num_rows()]));
            columns.push(sequence_array);
        } else {
            columns.push(batch.sequences().to_arrow_array());
        }
        columns.push(batch.op_types().to_arrow_array());

        RecordBatch::try_new(self.arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
    }
}

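/// Helper to read the SST format, dispatching to either the primary key
/// format or the flat format.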
pub enum ReadFormat {
    PrimaryKey(PrimaryKeyReadFormat),
    Flat(FlatReadFormat),
}

impl ReadFormat {
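    /// Creates a helper to read the primary key format.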
    pub fn new_primary_key(
        metadata: RegionMetadataRef,
        column_ids: impl Iterator<Item = ColumnId>,
    ) -> Self {
        ReadFormat::PrimaryKey(PrimaryKeyReadFormat::new(metadata, column_ids))
    }

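    /// Creates a helper to read the flat format.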
    pub fn new_flat(
        metadata: RegionMetadataRef,
        column_ids: impl Iterator<Item = ColumnId>,
        num_columns: Option<usize>,
        file_path: &str,
        skip_auto_convert: bool,
    ) -> Result<Self> {
        Ok(ReadFormat::Flat(FlatReadFormat::new(
            metadata,
            column_ids,
            num_columns,
            file_path,
            skip_auto_convert,
        )?))
    }

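    /// Creates a read format, choosing the flat format when `flat_format` is
    /// true. Reads all columns when `projection` is `None`.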
    pub fn new(
        region_metadata: RegionMetadataRef,
        projection: Option<&[ColumnId]>,
        flat_format: bool,
        num_columns: Option<usize>,
        file_path: &str,
        skip_auto_convert: bool,
    ) -> Result<ReadFormat> {
        if flat_format {
            if let Some(column_ids) = projection {
                ReadFormat::new_flat(
                    region_metadata,
                    column_ids.iter().copied(),
                    num_columns,
                    file_path,
                    skip_auto_convert,
                )
            } else {
                ReadFormat::new_flat(
                    region_metadata.clone(),
                    region_metadata
                        .column_metadatas
                        .iter()
                        .map(|col| col.column_id),
                    num_columns,
                    file_path,
                    skip_auto_convert,
                )
            }
        } else if let Some(column_ids) = projection {
            Ok(ReadFormat::new_primary_key(
                region_metadata,
                column_ids.iter().copied(),
            ))
        } else {
            Ok(ReadFormat::new_primary_key(
                region_metadata.clone(),
                region_metadata
                    .column_metadatas
                    .iter()
                    .map(|col| col.column_id),
            ))
        }
    }

    pub(crate) fn as_primary_key(&self) -> Option<&PrimaryKeyReadFormat> {
        match self {
            ReadFormat::PrimaryKey(format) => Some(format),
            _ => None,
        }
    }

    pub(crate) fn as_flat(&self) -> Option<&FlatReadFormat> {
        match self {
            ReadFormat::Flat(format) => Some(format),
            _ => None,
        }
    }

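    /// Returns the arrow schema of the SST to read.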
    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
        match self {
            ReadFormat::PrimaryKey(format) => format.arrow_schema(),
            ReadFormat::Flat(format) => format.arrow_schema(),
        }
    }

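    /// Returns the region metadata used to create this format.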
    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
        match self {
            ReadFormat::PrimaryKey(format) => format.metadata(),
            ReadFormat::Flat(format) => format.metadata(),
        }
    }

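    /// Returns the indices of columns to read from the SST, in ascending order.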
    pub(crate) fn projection_indices(&self) -> &[usize] {
        match self {
            ReadFormat::PrimaryKey(format) => format.projection_indices(),
            ReadFormat::Flat(format) => format.projection_indices(),
        }
    }

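    /// Returns the min values of the column in all row groups.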
    pub fn min_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        match self {
            ReadFormat::PrimaryKey(format) => format.min_values(row_groups, column_id),
            ReadFormat::Flat(format) => format.min_values(row_groups, column_id),
        }
    }

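    /// Returns the max values of the column in all row groups.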
    pub fn max_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        match self {
            ReadFormat::PrimaryKey(format) => format.max_values(row_groups, column_id),
            ReadFormat::Flat(format) => format.max_values(row_groups, column_id),
        }
    }

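    /// Returns the null counts of the column in all row groups.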
    pub fn null_counts(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        match self {
            ReadFormat::PrimaryKey(format) => format.null_counts(row_groups, column_id),
            ReadFormat::Flat(format) => format.null_counts(row_groups, column_id),
        }
    }

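    /// Reads the min or max value of a column from the statistics of each
    /// row group, substituting a typed null when statistics are missing.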
    pub(crate) fn column_values(
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column: &ColumnMetadata,
        column_index: usize,
        is_min: bool,
    ) -> Option<ArrayRef> {
        let null_scalar: ScalarValue = column
            .column_schema
            .data_type
            .as_arrow_type()
            .try_into()
            .ok()?;
        let scalar_values = row_groups
            .iter()
            .map(|meta| {
                let stats = meta.borrow().column(column_index).statistics()?;
                match stats {
                    Statistics::Boolean(s) => Some(ScalarValue::Boolean(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::Int32(s) => Some(ScalarValue::Int32(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::Int64(s) => Some(ScalarValue::Int64(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::Int96(_) => None,
                    Statistics::Float(s) => Some(ScalarValue::Float32(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::Double(s) => Some(ScalarValue::Float64(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::ByteArray(s) => {
                        let bytes = if is_min {
                            s.min_bytes_opt()?
                        } else {
                            s.max_bytes_opt()?
                        };
                        let s = String::from_utf8(bytes.to_vec()).ok();
                        Some(ScalarValue::Utf8(s))
                    }
                    Statistics::FixedLenByteArray(_) => None,
                }
            })
            .map(|maybe_scalar| maybe_scalar.unwrap_or_else(|| null_scalar.clone()))
            .collect::<Vec<ScalarValue>>();
        debug_assert_eq!(scalar_values.len(), row_groups.len());
        ScalarValue::iter_to_array(scalar_values).ok()
    }

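    /// Reads the null count of a column from the statistics of each row group.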
    pub(crate) fn column_null_counts(
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_index: usize,
    ) -> Option<ArrayRef> {
        let values = row_groups.iter().map(|meta| {
            let col = meta.borrow().column(column_index);
            let stat = col.statistics()?;
            stat.null_count_opt()
        });
        Some(Arc::new(UInt64Array::from_iter(values)))
    }

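    /// Sets the sequence number that overrides the sequences read from the SST.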
    pub(crate) fn set_override_sequence(&mut self, sequence: Option<SequenceNumber>) {
        match self {
            ReadFormat::PrimaryKey(format) => format.set_override_sequence(sequence),
            ReadFormat::Flat(format) => format.set_override_sequence(sequence),
        }
    }

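    /// Creates an array of `length` override sequence values if an override
    /// sequence is set.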
    pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option<ArrayRef> {
        match self {
            ReadFormat::PrimaryKey(format) => format.new_override_sequence_array(length),
            ReadFormat::Flat(format) => format.new_override_sequence_array(length),
        }
    }
}

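/// Helper to read the primary key SST format.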
pub struct PrimaryKeyReadFormat {
    metadata: RegionMetadataRef,
    arrow_schema: SchemaRef,
    field_id_to_index: HashMap<ColumnId, usize>,
    projection_indices: Vec<usize>,
    field_id_to_projected_index: HashMap<ColumnId, usize>,
    override_sequence: Option<SequenceNumber>,
}

impl PrimaryKeyReadFormat {
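    /// Creates a read helper for the SST with `metadata` that projects the
    /// columns in `column_ids`.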
    pub fn new(
        metadata: RegionMetadataRef,
        column_ids: impl Iterator<Item = ColumnId>,
    ) -> PrimaryKeyReadFormat {
        let field_id_to_index: HashMap<_, _> = metadata
            .field_columns()
            .enumerate()
            .map(|(index, column)| (column.column_id, index))
            .collect();
        let arrow_schema = to_sst_arrow_schema(&metadata);

        let format_projection = FormatProjection::compute_format_projection(
            &field_id_to_index,
            arrow_schema.fields.len(),
            column_ids,
        );

        PrimaryKeyReadFormat {
            metadata,
            arrow_schema,
            field_id_to_index,
            projection_indices: format_projection.projection_indices,
            field_id_to_projected_index: format_projection.column_id_to_projected_index,
            override_sequence: None,
        }
    }

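    /// Sets the sequence number that overrides the sequences read from the SST.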
    pub(crate) fn set_override_sequence(&mut self, sequence: Option<SequenceNumber>) {
        self.override_sequence = sequence;
    }

    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
        &self.arrow_schema
    }

    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
        &self.metadata
    }

    pub(crate) fn projection_indices(&self) -> &[usize] {
        &self.projection_indices
    }

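    /// Creates an array of `length` override sequence values if an override
    /// sequence is set.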
    pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option<ArrayRef> {
        self.override_sequence
            .map(|seq| Arc::new(UInt64Array::from_value(seq, length)) as ArrayRef)
    }

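    /// Converts a [RecordBatch] read from an SST into [Batch]es, one batch
    /// per primary key, and appends them to `batches`.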
    pub fn convert_record_batch(
        &self,
        record_batch: &RecordBatch,
        override_sequence_array: Option<&ArrayRef>,
        batches: &mut VecDeque<Batch>,
    ) -> Result<()> {
        debug_assert!(batches.is_empty());

        ensure!(
            record_batch.num_columns() >= FIXED_POS_COLUMN_NUM,
            InvalidRecordBatchSnafu {
                reason: format!(
                    "record batch only has {} columns",
                    record_batch.num_columns()
                ),
            }
        );

        let mut fixed_pos_columns = record_batch
            .columns()
            .iter()
            .rev()
            .take(FIXED_POS_COLUMN_NUM);
        let op_type_array = fixed_pos_columns.next().unwrap();
        let mut sequence_array = fixed_pos_columns.next().unwrap().clone();
        let pk_array = fixed_pos_columns.next().unwrap();
        let ts_array = fixed_pos_columns.next().unwrap();
        let field_batch_columns = self.get_field_batch_columns(record_batch)?;

        if let Some(override_array) = override_sequence_array {
            assert!(override_array.len() >= sequence_array.len());
            sequence_array = if override_array.len() > sequence_array.len() {
                override_array.slice(0, sequence_array.len())
            } else {
                override_array.clone()
            };
        }

        let pk_dict_array = pk_array
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .with_context(|| InvalidRecordBatchSnafu {
                reason: format!("primary key array should not be {:?}", pk_array.data_type()),
            })?;
        let offsets = primary_key_offsets(pk_dict_array)?;
        if offsets.is_empty() {
            return Ok(());
        }

        let keys = pk_dict_array.keys();
        let pk_values = pk_dict_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .with_context(|| InvalidRecordBatchSnafu {
                reason: format!(
                    "values of primary key array should not be {:?}",
                    pk_dict_array.values().data_type()
                ),
            })?;
        for (i, start) in offsets[..offsets.len() - 1].iter().enumerate() {
            let end = offsets[i + 1];
            let rows_in_batch = end - start;
            let dict_key = keys.value(*start);
            let primary_key = pk_values.value(dict_key as usize).to_vec();

            let mut builder = BatchBuilder::new(primary_key);
            builder
                .timestamps_array(ts_array.slice(*start, rows_in_batch))?
                .sequences_array(sequence_array.slice(*start, rows_in_batch))?
                .op_types_array(op_type_array.slice(*start, rows_in_batch))?;
            for batch_column in &field_batch_columns {
                builder.push_field(BatchColumn {
                    column_id: batch_column.column_id,
                    data: batch_column.data.slice(*start, rows_in_batch),
                });
            }

            let batch = builder.build()?;
            batches.push_back(batch);
        }

        Ok(())
    }

    pub fn min_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        let Some(column) = self.metadata.column_by_id(column_id) else {
            return StatValues::NoColumn;
        };
        match column.semantic_type {
            SemanticType::Tag => self.tag_values(row_groups, column, true),
            SemanticType::Field => {
                let index = self.field_id_to_index.get(&column_id).unwrap();
                let stats = ReadFormat::column_values(row_groups, column, *index, true);
                StatValues::from_stats_opt(stats)
            }
            SemanticType::Timestamp => {
                let index = self.time_index_position();
                let stats = ReadFormat::column_values(row_groups, column, index, true);
                StatValues::from_stats_opt(stats)
            }
        }
    }

    pub fn max_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        let Some(column) = self.metadata.column_by_id(column_id) else {
            return StatValues::NoColumn;
        };
        match column.semantic_type {
            SemanticType::Tag => self.tag_values(row_groups, column, false),
            SemanticType::Field => {
                let index = self.field_id_to_index.get(&column_id).unwrap();
                let stats = ReadFormat::column_values(row_groups, column, *index, false);
                StatValues::from_stats_opt(stats)
            }
            SemanticType::Timestamp => {
                let index = self.time_index_position();
                let stats = ReadFormat::column_values(row_groups, column, index, false);
                StatValues::from_stats_opt(stats)
            }
        }
    }

    pub fn null_counts(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        let Some(column) = self.metadata.column_by_id(column_id) else {
            return StatValues::NoColumn;
        };
        match column.semantic_type {
            SemanticType::Tag => StatValues::NoStats,
            SemanticType::Field => {
                let index = self.field_id_to_index.get(&column_id).unwrap();
                let stats = ReadFormat::column_null_counts(row_groups, *index);
                StatValues::from_stats_opt(stats)
            }
            SemanticType::Timestamp => {
                let index = self.time_index_position();
                let stats = ReadFormat::column_null_counts(row_groups, index);
                StatValues::from_stats_opt(stats)
            }
        }
    }

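    /// Collects the field columns of `record_batch` into [BatchColumn]s.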
    fn get_field_batch_columns(&self, record_batch: &RecordBatch) -> Result<Vec<BatchColumn>> {
        record_batch
            .columns()
            .iter()
            .zip(record_batch.schema().fields())
            .take(record_batch.num_columns() - FIXED_POS_COLUMN_NUM)
            .map(|(array, field)| {
                let vector = Helper::try_into_vector(array.clone()).context(ConvertVectorSnafu)?;
                let column = self
                    .metadata
                    .column_by_name(field.name())
                    .with_context(|| InvalidRecordBatchSnafu {
                        reason: format!("column {} not found in metadata", field.name()),
                    })?;

                Ok(BatchColumn {
                    column_id: column.column_id,
                    data: vector,
                })
            })
            .collect()
    }

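    /// Returns the min or max values of a tag column. Only the first tag in
    /// the primary key has usable statistics, since only the leftmost part of
    /// the encoded key can be decoded.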
    fn tag_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column: &ColumnMetadata,
        is_min: bool,
    ) -> StatValues {
        let is_first_tag = self
            .metadata
            .primary_key
            .first()
            .map(|id| *id == column.column_id)
            .unwrap_or(false);
        if !is_first_tag {
            return StatValues::NoStats;
        }

        StatValues::from_stats_opt(self.first_tag_values(row_groups, column, is_min))
    }

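    /// Decodes the min or max values of the first tag column from the
    /// statistics of the encoded primary key column.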
    fn first_tag_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column: &ColumnMetadata,
        is_min: bool,
    ) -> Option<ArrayRef> {
        debug_assert!(
            self.metadata
                .primary_key
                .first()
                .map(|id| *id == column.column_id)
                .unwrap_or(false)
        );

        let primary_key_encoding = self.metadata.primary_key_encoding;
        let converter = build_primary_key_codec_with_fields(
            primary_key_encoding,
            [(
                column.column_id,
                SortField::new(column.column_schema.data_type.clone()),
            )]
            .into_iter(),
        );

        let values = row_groups.iter().map(|meta| {
            let stats = meta
                .borrow()
                .column(self.primary_key_position())
                .statistics()?;
            match stats {
                Statistics::Boolean(_) => None,
                Statistics::Int32(_) => None,
                Statistics::Int64(_) => None,
                Statistics::Int96(_) => None,
                Statistics::Float(_) => None,
                Statistics::Double(_) => None,
                Statistics::ByteArray(s) => {
                    let bytes = if is_min {
                        s.min_bytes_opt()?
                    } else {
                        s.max_bytes_opt()?
                    };
                    converter.decode_leftmost(bytes).ok()?
                }
                Statistics::FixedLenByteArray(_) => None,
            }
        });
        let mut builder = column
            .column_schema
            .data_type
            .create_mutable_vector(row_groups.len());
        for value_opt in values {
            match value_opt {
                Some(v) => builder.push_value_ref(&v.as_value_ref()),
                None => builder.push_null(),
            }
        }
        let vector = builder.to_vector();

        Some(vector.to_arrow_array())
    }

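    /// Index of the encoded primary key column in the SST.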
    fn primary_key_position(&self) -> usize {
        self.arrow_schema.fields.len() - 3
    }

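    /// Index of the time index column in the SST.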
    fn time_index_position(&self) -> usize {
        self.arrow_schema.fields.len() - FIXED_POS_COLUMN_NUM
    }

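    /// Returns the index of a field column in the projected batch by its column id.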
    pub fn field_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
        self.field_id_to_projected_index.get(&column_id).copied()
    }
}

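/// Projection computed for a read format: the indices of the columns to read
/// from the SST and the mapping from column id to projected index.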
pub(crate) struct FormatProjection {
    pub(crate) projection_indices: Vec<usize>,
    pub(crate) column_id_to_projected_index: HashMap<ColumnId, usize>,
}

impl FormatProjection {
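    /// Computes the projection for the requested `column_ids`, always
    /// including the columns at fixed positions.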
    pub(crate) fn compute_format_projection(
        id_to_index: &HashMap<ColumnId, usize>,
        sst_column_num: usize,
        column_ids: impl Iterator<Item = ColumnId>,
    ) -> Self {
        let mut projected_schema: Vec<_> = column_ids
            .filter_map(|column_id| {
                id_to_index
                    .get(&column_id)
                    .copied()
                    .map(|index| (column_id, index))
            })
            .collect();
        projected_schema.sort_unstable_by_key(|x| x.1);
        projected_schema.dedup_by_key(|x| x.1);

        let mut projection_indices: Vec<_> = projected_schema
            .iter()
            .map(|(_column_id, index)| *index)
            .chain(sst_column_num - FIXED_POS_COLUMN_NUM..sst_column_num)
            .collect();
        projection_indices.sort_unstable();
        projection_indices.dedup();

        let column_id_to_projected_index = projected_schema
            .into_iter()
            .map(|(column_id, _)| column_id)
            .enumerate()
            .map(|(index, column_id)| (column_id, index))
            .collect();

        Self {
            projection_indices,
            column_id_to_projected_index,
        }
    }
}

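/// Statistics of a column for pruning: `Values` holds one entry per row
/// group, `NoColumn` means the SST has no such column, and `NoStats` means
/// the statistics are unavailable.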
pub enum StatValues {
    Values(ArrayRef),
    NoColumn,
    NoStats,
}

impl StatValues {
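    /// Wraps optional statistics into a [StatValues].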
    pub fn from_stats_opt(stats: Option<ArrayRef>) -> Self {
        match stats {
            Some(stats) => StatValues::Values(stats),
            None => StatValues::NoStats,
        }
    }
}

#[cfg(test)]
impl PrimaryKeyReadFormat {
    pub fn new_with_all_columns(metadata: RegionMetadataRef) -> PrimaryKeyReadFormat {
        Self::new(
            Arc::clone(&metadata),
            metadata.column_metadatas.iter().map(|c| c.column_id),
        )
    }
}

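/// Computes the offsets at which the primary key changes in the dictionary array.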
fn primary_key_offsets(pk_dict_array: &PrimaryKeyArray) -> Result<Vec<usize>> {
    if pk_dict_array.is_empty() {
        return Ok(Vec::new());
    }

    let mut offsets = vec![0];
    let keys = pk_dict_array.keys();
    let pk_indices = keys.values();
    for (i, key) in pk_indices.iter().take(keys.len() - 1).enumerate() {
        if *key != pk_indices[i + 1] {
            offsets.push(i + 1);
        }
    }
    offsets.push(keys.len());

    Ok(offsets)
}

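/// Creates a dictionary array in which all `num_rows` keys reference the same
/// primary key.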
fn new_primary_key_array(primary_key: &[u8], num_rows: usize) -> ArrayRef {
    let values = Arc::new(BinaryArray::from_iter_values([primary_key]));
    let keys = UInt32Array::from_value(0, num_rows);

    Arc::new(DictionaryArray::new(keys, values))
}

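/// Returns the time range of a row group from the statistics of its time
/// index column, or `None` if the statistics are unavailable.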
pub(crate) fn parquet_row_group_time_range(
    file_meta: &FileMeta,
    parquet_meta: &ParquetMetaData,
    row_group_idx: usize,
) -> Option<FileTimeRange> {
    let row_group_meta = parquet_meta.row_group(row_group_idx);
    let num_columns = parquet_meta.file_metadata().schema_descr().num_columns();
    assert!(
        num_columns >= FIXED_POS_COLUMN_NUM,
        "file only has {} columns",
        num_columns
    );
    let time_index_pos = num_columns - FIXED_POS_COLUMN_NUM;

    let stats = row_group_meta.column(time_index_pos).statistics()?;
    let (min, max) = match stats {
        Statistics::Int64(value_stats) => (*value_stats.min_opt()?, *value_stats.max_opt()?),
        Statistics::Int32(_)
        | Statistics::Boolean(_)
        | Statistics::Int96(_)
        | Statistics::Float(_)
        | Statistics::Double(_)
        | Statistics::ByteArray(_)
        | Statistics::FixedLenByteArray(_) => {
            common_telemetry::warn!(
                "Invalid statistics {:?} for time index in parquet in {}",
                stats,
                file_meta.file_id
            );
            return None;
        }
    };

    debug_assert!(min >= file_meta.time_range.0.value() && min <= file_meta.time_range.1.value());
    debug_assert!(max >= file_meta.time_range.0.value() && max <= file_meta.time_range.1.value());
    let unit = file_meta.time_range.0.unit();

    Some((Timestamp::new(min, unit), Timestamp::new(max, unit)))
}

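/// Returns true if the sequences in the SST should be overridden while
/// reading, which is the case when every row group reports both the min and
/// max sequence statistics as 0.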
pub(crate) fn need_override_sequence(parquet_meta: &ParquetMetaData) -> bool {
    let num_columns = parquet_meta.file_metadata().schema_descr().num_columns();
    if num_columns < FIXED_POS_COLUMN_NUM {
        return false;
    }

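    // The sequence column is the second to last column.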
    let sequence_pos = num_columns - 2;

    for row_group in parquet_meta.row_groups() {
        if let Some(Statistics::Int64(value_stats)) = row_group.column(sequence_pos).statistics() {
            if let (Some(min_val), Some(max_val)) = (value_stats.min_opt(), value_stats.max_opt()) {
                if *min_val != 0 || *max_val != 0 {
                    return false;
                }
            } else {
                return false;
            }
        } else {
            return false;
        }
    }

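    // Only override the sequence when the file actually has row groups.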
    !parquet_meta.row_groups().is_empty()
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::OpType;
    use datatypes::arrow::array::{
        Int64Array, StringArray, TimestampMillisecondArray, UInt8Array, UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema, TimeUnit};
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::ValueRef;
    use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, UInt8Vector, UInt64Vector};
    use mito_codec::row_converter::{
        DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec,
    };
    use store_api::codec::PrimaryKeyEncoding;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;
    use store_api::storage::consts::ReservedColumnId;

    use super::*;
    use crate::sst::parquet::flat_format::{FlatWriteFormat, sequence_column_index};
    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};

    const TEST_SEQUENCE: u64 = 1;
    const TEST_OP_TYPE: u8 = OpType::Put as u8;

    fn build_test_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("tag0", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field1",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("tag1", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field0",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 5,
            })
            .primary_key(vec![1, 3]);
        Arc::new(builder.build().unwrap())
    }

    fn build_test_arrow_schema() -> SchemaRef {
        let fields = vec![
            Field::new("field1", ArrowDataType::Int64, true),
            Field::new("field0", ArrowDataType::Int64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt32),
                    Box::new(ArrowDataType::Binary),
                ),
                false,
            ),
            Field::new("__sequence", ArrowDataType::UInt64, false),
            Field::new("__op_type", ArrowDataType::UInt8, false),
        ];
        Arc::new(Schema::new(fields))
    }

    fn new_batch(primary_key: &[u8], start_ts: i64, start_field: i64, num_rows: usize) -> Batch {
        new_batch_with_sequence(primary_key, start_ts, start_field, num_rows, TEST_SEQUENCE)
    }

    fn new_batch_with_sequence(
        primary_key: &[u8],
        start_ts: i64,
        start_field: i64,
        num_rows: usize,
        sequence: u64,
    ) -> Batch {
        let ts_values = (0..num_rows).map(|i| start_ts + i as i64);
        let timestamps = Arc::new(TimestampMillisecondVector::from_values(ts_values));
        let sequences = Arc::new(UInt64Vector::from_vec(vec![sequence; num_rows]));
        let op_types = Arc::new(UInt8Vector::from_vec(vec![TEST_OP_TYPE; num_rows]));
        let fields = vec![
            BatchColumn {
                column_id: 4,
                data: Arc::new(Int64Vector::from_vec(vec![start_field; num_rows])),
            },
            BatchColumn {
                column_id: 2,
                data: Arc::new(Int64Vector::from_vec(vec![start_field + 1; num_rows])),
            },
        ];

        BatchBuilder::with_required_columns(primary_key.to_vec(), timestamps, sequences, op_types)
            .with_fields(fields)
            .build()
            .unwrap()
    }

    #[test]
    fn test_to_sst_arrow_schema() {
        let metadata = build_test_region_metadata();
        let write_format = PrimaryKeyWriteFormat::new(metadata);
        assert_eq!(&build_test_arrow_schema(), write_format.arrow_schema());
    }

    #[test]
    fn test_new_primary_key_array() {
        let array = new_primary_key_array(b"test", 3);
        let expect = build_test_pk_array(&[(b"test".to_vec(), 3)]) as ArrayRef;
        assert_eq!(&expect, &array);
    }

    fn build_test_pk_array(pk_row_nums: &[(Vec<u8>, usize)]) -> Arc<PrimaryKeyArray> {
        let values = Arc::new(BinaryArray::from_iter_values(
            pk_row_nums.iter().map(|v| &v.0),
        ));
        let mut keys = vec![];
        for (index, num_rows) in pk_row_nums.iter().map(|v| v.1).enumerate() {
            keys.extend(std::iter::repeat_n(index as u32, num_rows));
        }
        let keys = UInt32Array::from(keys);
        Arc::new(DictionaryArray::new(keys, values))
    }

    #[test]
    fn test_convert_batch() {
        let metadata = build_test_region_metadata();
        let write_format = PrimaryKeyWriteFormat::new(metadata);

        let num_rows = 4;
        let batch = new_batch(b"test", 1, 2, num_rows);
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![2; num_rows])),
            Arc::new(Int64Array::from(vec![3; num_rows])),
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])),
            build_test_pk_array(&[(b"test".to_vec(), num_rows)]),
            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; num_rows])),
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])),
        ];
        let expect_record = RecordBatch::try_new(build_test_arrow_schema(), columns).unwrap();

        let actual = write_format.convert_batch(&batch).unwrap();
        assert_eq!(expect_record, actual);
    }

    #[test]
    fn test_convert_batch_with_override_sequence() {
        let metadata = build_test_region_metadata();
        let write_format =
            PrimaryKeyWriteFormat::new(metadata).with_override_sequence(Some(415411));

        let num_rows = 4;
        let batch = new_batch(b"test", 1, 2, num_rows);
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![2; num_rows])),
            Arc::new(Int64Array::from(vec![3; num_rows])),
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])),
            build_test_pk_array(&[(b"test".to_vec(), num_rows)]),
            Arc::new(UInt64Array::from(vec![415411; num_rows])),
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])),
        ];
        let expect_record = RecordBatch::try_new(build_test_arrow_schema(), columns).unwrap();

        let actual = write_format.convert_batch(&batch).unwrap();
        assert_eq!(expect_record, actual);
    }

    #[test]
    fn test_projection_indices() {
        let metadata = build_test_region_metadata();
        // Only read tag1 (column id 3).
        let read_format = ReadFormat::new_primary_key(metadata.clone(), [3].iter().copied());
        assert_eq!(&[2, 3, 4, 5], read_format.projection_indices());
        // Only read field1 (column id 4).
        let read_format = ReadFormat::new_primary_key(metadata.clone(), [4].iter().copied());
        assert_eq!(&[0, 2, 3, 4, 5], read_format.projection_indices());
        // Only read ts (column id 5).
        let read_format = ReadFormat::new_primary_key(metadata.clone(), [5].iter().copied());
        assert_eq!(&[2, 3, 4, 5], read_format.projection_indices());
        // Read field0 (column id 2), tag0 (column id 1) and ts.
        let read_format = ReadFormat::new_primary_key(metadata, [2, 1, 5].iter().copied());
        assert_eq!(&[1, 2, 3, 4, 5], read_format.projection_indices());
    }

    #[test]
    fn test_empty_primary_key_offsets() {
        let array = build_test_pk_array(&[]);
        assert!(primary_key_offsets(&array).unwrap().is_empty());
    }

    #[test]
    fn test_primary_key_offsets_one_series() {
        let array = build_test_pk_array(&[(b"one".to_vec(), 1)]);
        assert_eq!(vec![0, 1], primary_key_offsets(&array).unwrap());

        let array = build_test_pk_array(&[(b"one".to_vec(), 1), (b"two".to_vec(), 1)]);
        assert_eq!(vec![0, 1, 2], primary_key_offsets(&array).unwrap());

        let array = build_test_pk_array(&[
            (b"one".to_vec(), 1),
            (b"two".to_vec(), 1),
            (b"three".to_vec(), 1),
        ]);
        assert_eq!(vec![0, 1, 2, 3], primary_key_offsets(&array).unwrap());
    }

    #[test]
    fn test_primary_key_offsets_multi_series() {
        let array = build_test_pk_array(&[(b"one".to_vec(), 1), (b"two".to_vec(), 3)]);
        assert_eq!(vec![0, 1, 4], primary_key_offsets(&array).unwrap());

        let array = build_test_pk_array(&[(b"one".to_vec(), 3), (b"two".to_vec(), 1)]);
        assert_eq!(vec![0, 3, 4], primary_key_offsets(&array).unwrap());

        let array = build_test_pk_array(&[(b"one".to_vec(), 3), (b"two".to_vec(), 3)]);
        assert_eq!(vec![0, 3, 6], primary_key_offsets(&array).unwrap());
    }

    #[test]
    fn test_convert_empty_record_batch() {
        let metadata = build_test_region_metadata();
        let arrow_schema = build_test_arrow_schema();
        let column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|col| col.column_id)
            .collect();
        let read_format = PrimaryKeyReadFormat::new(metadata, column_ids.iter().copied());
        assert_eq!(arrow_schema, *read_format.arrow_schema());

        let record_batch = RecordBatch::new_empty(arrow_schema);
        let mut batches = VecDeque::new();
        read_format
            .convert_record_batch(&record_batch, None, &mut batches)
            .unwrap();
        assert!(batches.is_empty());
    }

    #[test]
    fn test_convert_record_batch() {
        let metadata = build_test_region_metadata();
        let column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|col| col.column_id)
            .collect();
        let read_format = PrimaryKeyReadFormat::new(metadata, column_ids.iter().copied());

        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![1, 1, 10, 10])),
            Arc::new(Int64Array::from(vec![2, 2, 11, 11])),
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 11, 12])),
            build_test_pk_array(&[(b"one".to_vec(), 2), (b"two".to_vec(), 2)]),
            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; 4])),
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; 4])),
        ];
        let arrow_schema = build_test_arrow_schema();
        let record_batch = RecordBatch::try_new(arrow_schema, columns).unwrap();
        let mut batches = VecDeque::new();
        read_format
            .convert_record_batch(&record_batch, None, &mut batches)
            .unwrap();

        assert_eq!(
            vec![new_batch(b"one", 1, 1, 2), new_batch(b"two", 11, 10, 2)],
            batches.into_iter().collect::<Vec<_>>(),
        );
    }

    #[test]
    fn test_convert_record_batch_with_override_sequence() {
        let metadata = build_test_region_metadata();
        let column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|col| col.column_id)
            .collect();
        let read_format =
            ReadFormat::new(metadata, Some(&column_ids), false, None, "test", false).unwrap();

        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![1, 1, 10, 10])),
            Arc::new(Int64Array::from(vec![2, 2, 11, 11])),
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 11, 12])),
            build_test_pk_array(&[(b"one".to_vec(), 2), (b"two".to_vec(), 2)]),
            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; 4])),
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; 4])),
        ];
        let arrow_schema = build_test_arrow_schema();
        let record_batch = RecordBatch::try_new(arrow_schema, columns).unwrap();

        let override_sequence: u64 = 12345;
        let override_sequence_array: ArrayRef =
            Arc::new(UInt64Array::from_value(override_sequence, 4));

        let mut batches = VecDeque::new();
        read_format
            .as_primary_key()
            .unwrap()
            .convert_record_batch(&record_batch, Some(&override_sequence_array), &mut batches)
            .unwrap();

        let expected_batch1 = new_batch_with_sequence(b"one", 1, 1, 2, override_sequence);
        let expected_batch2 = new_batch_with_sequence(b"two", 11, 10, 2, override_sequence);

        assert_eq!(
            vec![expected_batch1, expected_batch2],
            batches.into_iter().collect::<Vec<_>>(),
        );
    }

    fn build_test_flat_sst_schema() -> SchemaRef {
        let fields = vec![
            Field::new("tag0", ArrowDataType::Int64, true),
            Field::new("tag1", ArrowDataType::Int64, true),
            Field::new("field1", ArrowDataType::Int64, true),
            Field::new("field0", ArrowDataType::Int64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt32),
                    Box::new(ArrowDataType::Binary),
                ),
                false,
            ),
            Field::new("__sequence", ArrowDataType::UInt64, false),
            Field::new("__op_type", ArrowDataType::UInt8, false),
        ];
        Arc::new(Schema::new(fields))
    }

    #[test]
    fn test_flat_to_sst_arrow_schema() {
        let metadata = build_test_region_metadata();
        let format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default());
        assert_eq!(&build_test_flat_sst_schema(), format.arrow_schema());
    }

    fn input_columns_for_flat_batch(num_rows: usize) -> Vec<ArrayRef> {
        vec![
            Arc::new(Int64Array::from(vec![1; num_rows])),
            Arc::new(Int64Array::from(vec![1; num_rows])),
            Arc::new(Int64Array::from(vec![2; num_rows])),
            Arc::new(Int64Array::from(vec![3; num_rows])),
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])),
            build_test_pk_array(&[(b"test".to_vec(), num_rows)]),
            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; num_rows])),
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])),
        ]
    }

    #[test]
    fn test_flat_convert_batch() {
        let metadata = build_test_region_metadata();
        let format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default());

        let num_rows = 4;
        let columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
        let batch = RecordBatch::try_new(build_test_flat_sst_schema(), columns.clone()).unwrap();
        let expect_record = RecordBatch::try_new(build_test_flat_sst_schema(), columns).unwrap();

        let actual = format.convert_batch(&batch).unwrap();
        assert_eq!(expect_record, actual);
    }

    #[test]
    fn test_flat_convert_with_override_sequence() {
        let metadata = build_test_region_metadata();
        let format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default())
            .with_override_sequence(Some(415411));

        let num_rows = 4;
        let columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
        let batch = RecordBatch::try_new(build_test_flat_sst_schema(), columns).unwrap();

        let expected_columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![1; num_rows])),
            Arc::new(Int64Array::from(vec![1; num_rows])),
            Arc::new(Int64Array::from(vec![2; num_rows])),
            Arc::new(Int64Array::from(vec![3; num_rows])),
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])),
            build_test_pk_array(&[(b"test".to_vec(), num_rows)]),
            Arc::new(UInt64Array::from(vec![415411; num_rows])),
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])),
        ];
        let expected_record =
            RecordBatch::try_new(build_test_flat_sst_schema(), expected_columns).unwrap();

        let actual = format.convert_batch(&batch).unwrap();
        assert_eq!(expected_record, actual);
    }

    #[test]
    fn test_flat_projection_indices() {
        let metadata = build_test_region_metadata();
        // Only read tag1 (column id 3).
        let read_format =
            ReadFormat::new_flat(metadata.clone(), [3].iter().copied(), None, "test", false)
                .unwrap();
        assert_eq!(&[1, 4, 5, 6, 7], read_format.projection_indices());

        // Only read field1 (column id 4).
        let read_format =
            ReadFormat::new_flat(metadata.clone(), [4].iter().copied(), None, "test", false)
                .unwrap();
        assert_eq!(&[2, 4, 5, 6, 7], read_format.projection_indices());

        // Only read ts (column id 5).
        let read_format =
            ReadFormat::new_flat(metadata.clone(), [5].iter().copied(), None, "test", false)
                .unwrap();
        assert_eq!(&[4, 5, 6, 7], read_format.projection_indices());

        // Read field0 (column id 2), tag0 (column id 1) and ts.
        let read_format =
            ReadFormat::new_flat(metadata, [2, 1, 5].iter().copied(), None, "test", false).unwrap();
        assert_eq!(&[0, 3, 4, 5, 6, 7], read_format.projection_indices());
    }

    #[test]
    fn test_flat_read_format_convert_batch() {
        let metadata = build_test_region_metadata();
        let mut format = FlatReadFormat::new(
            metadata,
            std::iter::once(1),
            Some(8),
            "test",
            false,
        )
        .unwrap();

        let num_rows = 4;
        let original_sequence = 100u64;
        let override_sequence = 200u64;

        let columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
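        // Replace the sequence column with the original sequence values.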
        let mut test_columns = columns.clone();
        test_columns[6] = Arc::new(UInt64Array::from(vec![original_sequence; num_rows]));
        let record_batch =
            RecordBatch::try_new(format.arrow_schema().clone(), test_columns).unwrap();

        let result = format.convert_batch(record_batch.clone(), None).unwrap();
        let sequence_column = result.column(sequence_column_index(result.num_columns()));
        let sequence_array = sequence_column
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();

        let expected_original = UInt64Array::from(vec![original_sequence; num_rows]);
        assert_eq!(sequence_array, &expected_original);

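        // Convert again with an override sequence set.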
        format.set_override_sequence(Some(override_sequence));
        let override_sequence_array = format.new_override_sequence_array(num_rows).unwrap();
        let result = format
            .convert_batch(record_batch, Some(&override_sequence_array))
            .unwrap();
        let sequence_column = result.column(sequence_column_index(result.num_columns()));
        let sequence_array = sequence_column
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();

        let expected_override = UInt64Array::from(vec![override_sequence; num_rows]);
        assert_eq!(sequence_array, &expected_override);
    }

    #[test]
    fn test_need_convert_to_flat() {
        let metadata = build_test_region_metadata();

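        // All columns plus the three internal columns.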
        let expected_columns = metadata.column_metadatas.len() + 3;
        let result =
            FlatReadFormat::is_legacy_format(&metadata, expected_columns, "test.parquet").unwrap();
        assert!(
            !result,
            "Should not need conversion when column counts match"
        );

        let num_columns_without_pk = expected_columns - metadata.primary_key.len();
        let result =
            FlatReadFormat::is_legacy_format(&metadata, num_columns_without_pk, "test.parquet")
                .unwrap();
        assert!(
            result,
            "Should need conversion when primary key columns are missing"
        );

        let too_many_columns = expected_columns + 1;
        let err = FlatReadFormat::is_legacy_format(&metadata, too_many_columns, "test.parquet")
            .unwrap_err();
        assert!(err.to_string().contains("Expected columns"), "{err:?}");

        let wrong_diff_columns = expected_columns - 1;
        let err = FlatReadFormat::is_legacy_format(&metadata, wrong_diff_columns, "test.parquet")
            .unwrap_err();
        assert!(
            err.to_string().contains("Column number difference"),
            "{err:?}"
        );
    }

    fn build_test_dense_pk_array(
        codec: &DensePrimaryKeyCodec,
        pk_values_per_row: &[&[Option<i64>]],
    ) -> Arc<PrimaryKeyArray> {
        let mut builder = PrimaryKeyArrayBuilder::with_capacity(pk_values_per_row.len(), 1024, 0);

        for pk_values_row in pk_values_per_row {
            let values: Vec<ValueRef> = pk_values_row
                .iter()
                .map(|opt| match opt {
                    Some(val) => ValueRef::Int64(*val),
                    None => ValueRef::Null,
                })
                .collect();

            let encoded = codec.encode(values.into_iter()).unwrap();
            builder.append_value(&encoded);
        }

        Arc::new(builder.finish())
    }

    fn build_test_sparse_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "__table_id",
                    ConcreteDataType::uint32_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: ReservedColumnId::table_id(),
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "__tsid",
                    ConcreteDataType::uint64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: ReservedColumnId::tsid(),
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("tag0", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("tag1", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field1",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field0",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 5,
            })
            .primary_key(vec![
                ReservedColumnId::table_id(),
                ReservedColumnId::tsid(),
                1,
                3,
            ])
            .primary_key_encoding(PrimaryKeyEncoding::Sparse);
        Arc::new(builder.build().unwrap())
    }

    fn build_test_sparse_pk_array(
        codec: &SparsePrimaryKeyCodec,
        pk_values_per_row: &[SparseTestRow],
    ) -> Arc<PrimaryKeyArray> {
        let mut builder = PrimaryKeyArrayBuilder::with_capacity(pk_values_per_row.len(), 1024, 0);
        for row in pk_values_per_row {
            let values = vec![
                (ReservedColumnId::table_id(), ValueRef::UInt32(row.table_id)),
                (ReservedColumnId::tsid(), ValueRef::UInt64(row.tsid)),
                (1, ValueRef::String(&row.tag0)),
                (3, ValueRef::String(&row.tag1)),
            ];

            let mut buffer = Vec::new();
            codec.encode_value_refs(&values, &mut buffer).unwrap();
            builder.append_value(&buffer);
        }

        Arc::new(builder.finish())
    }

    #[derive(Clone)]
    struct SparseTestRow {
        table_id: u32,
        tsid: u64,
        tag0: String,
        tag1: String,
    }

    #[test]
    fn test_flat_read_format_convert_format_with_dense_encoding() {
        let metadata = build_test_region_metadata();

        let column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|c| c.column_id)
            .collect();
        let format = FlatReadFormat::new(
            metadata.clone(),
            column_ids.into_iter(),
            Some(6),
            "test",
            false,
        )
        .unwrap();

        let num_rows = 4;
        let original_sequence = 100u64;

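        // Encode the same dense primary key (tag0 = 1, tag1 = 1) for every row.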
        let pk_values_per_row = vec![&[Some(1i64), Some(1i64)][..]; num_rows];

        let codec = DensePrimaryKeyCodec::new(&metadata);
        let dense_pk_array = build_test_dense_pk_array(&codec, &pk_values_per_row);
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![2; num_rows])),
            Arc::new(Int64Array::from(vec![3; num_rows])),
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])),
            dense_pk_array.clone(),
            Arc::new(UInt64Array::from(vec![original_sequence; num_rows])),
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])),
        ];

        let old_format_fields = vec![
            Field::new("field1", ArrowDataType::Int64, true),
            Field::new("field0", ArrowDataType::Int64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt32),
                    Box::new(ArrowDataType::Binary),
                ),
                false,
            ),
            Field::new("__sequence", ArrowDataType::UInt64, false),
            Field::new("__op_type", ArrowDataType::UInt8, false),
        ];
        let old_schema = Arc::new(Schema::new(old_format_fields));
        let record_batch = RecordBatch::try_new(old_schema, columns).unwrap();

        let result = format.convert_batch(record_batch, None).unwrap();

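        // The converted batch contains the tag columns decoded from the primary key.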
        let expected_columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![1; num_rows])),
            Arc::new(Int64Array::from(vec![1; num_rows])),
            Arc::new(Int64Array::from(vec![2; num_rows])),
            Arc::new(Int64Array::from(vec![3; num_rows])),
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])),
            dense_pk_array,
            Arc::new(UInt64Array::from(vec![original_sequence; num_rows])),
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])),
        ];
        let expected_record_batch =
            RecordBatch::try_new(build_test_flat_sst_schema(), expected_columns).unwrap();

        assert_eq!(expected_record_batch, result);
    }

    #[test]
    fn test_flat_read_format_convert_format_with_sparse_encoding() {
        let metadata = build_test_sparse_region_metadata();

        let column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|c| c.column_id)
            .collect();
        let format = FlatReadFormat::new(
            metadata.clone(),
            column_ids.clone().into_iter(),
            None,
            "test",
            false,
        )
        .unwrap();

        let num_rows = 4;
        let original_sequence = 100u64;

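        // Every row shares the same sparse primary key.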
        let pk_test_rows = vec![
            SparseTestRow {
                table_id: 1,
                tsid: 123,
                tag0: "frontend".to_string(),
                tag1: "pod1".to_string(),
            };
            num_rows
        ];

        let codec = SparsePrimaryKeyCodec::new(&metadata);
        let sparse_pk_array = build_test_sparse_pk_array(&codec, &pk_test_rows);
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![2; num_rows])),
            Arc::new(Int64Array::from(vec![3; num_rows])),
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])),
            sparse_pk_array.clone(),
            Arc::new(UInt64Array::from(vec![original_sequence; num_rows])),
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])),
        ];

        let old_format_fields = vec![
            Field::new("field1", ArrowDataType::Int64, true),
            Field::new("field0", ArrowDataType::Int64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt32),
                    Box::new(ArrowDataType::Binary),
                ),
                false,
            ),
            Field::new("__sequence", ArrowDataType::UInt64, false),
            Field::new("__op_type", ArrowDataType::UInt8, false),
        ];
        let old_schema = Arc::new(Schema::new(old_format_fields));
        let record_batch = RecordBatch::try_new(old_schema, columns).unwrap();

        let result = format.convert_batch(record_batch.clone(), None).unwrap();

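        // The decoded tag columns are dictionary-encoded strings.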
        let tag0_array = Arc::new(DictionaryArray::new(
            UInt32Array::from(vec![0; num_rows]),
            Arc::new(StringArray::from(vec!["frontend"])),
        ));
        let tag1_array = Arc::new(DictionaryArray::new(
            UInt32Array::from(vec![0; num_rows]),
            Arc::new(StringArray::from(vec!["pod1"])),
        ));
        let expected_columns: Vec<ArrayRef> = vec![
            Arc::new(UInt32Array::from(vec![1; num_rows])),
            Arc::new(UInt64Array::from(vec![123; num_rows])),
            tag0_array,
            tag1_array,
            Arc::new(Int64Array::from(vec![2; num_rows])),
            Arc::new(Int64Array::from(vec![3; num_rows])),
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])),
            sparse_pk_array,
            Arc::new(UInt64Array::from(vec![original_sequence; num_rows])),
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])),
        ];
        let expected_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        let expected_record_batch =
            RecordBatch::try_new(expected_schema, expected_columns).unwrap();

        assert_eq!(expected_record_batch, result);

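        // With `skip_auto_convert`, the record batch is returned unchanged.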
        let format =
            FlatReadFormat::new(metadata.clone(), column_ids.into_iter(), None, "test", true)
                .unwrap();
        let result = format.convert_batch(record_batch.clone(), None).unwrap();
        assert_eq!(record_batch, result);
    }
}
1837}