use std::borrow::Borrow;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;

use api::v1::SemanticType;
use common_time::Timestamp;
use datafusion_common::ScalarValue;
use datatypes::arrow::array::{
    ArrayRef, BinaryArray, BinaryDictionaryBuilder, DictionaryArray, UInt32Array, UInt64Array,
};
use datatypes::arrow::datatypes::{SchemaRef, UInt32Type};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::prelude::DataType;
use datatypes::vectors::{Helper, Vector};
use mito_codec::row_converter::{
    CompositeValues, PrimaryKeyCodec, SortField, build_primary_key_codec,
    build_primary_key_codec_with_fields,
};
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
use parquet::file::statistics::Statistics;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
use store_api::storage::{ColumnId, SequenceNumber};

use crate::error::{
    ConvertVectorSnafu, DecodeSnafu, InvalidBatchSnafu, InvalidRecordBatchSnafu,
    NewRecordBatchSnafu, Result,
};
use crate::read::{Batch, BatchBuilder, BatchColumn};
use crate::sst::file::{FileMeta, FileTimeRange};
use crate::sst::parquet::flat_format::FlatReadFormat;
use crate::sst::to_sst_arrow_schema;
pub(crate) type PrimaryKeyArray = DictionaryArray<UInt32Type>;
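/// Builder for [PrimaryKeyArray].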
pub(crate) type PrimaryKeyArrayBuilder = BinaryDictionaryBuilder<UInt32Type>;

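/// Number of columns at fixed positions at the end of the SST schema:
/// time index, `__primary_key`, `__sequence`, `__op_type`.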
pub(crate) const FIXED_POS_COLUMN_NUM: usize = 4;
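/// Number of internal columns: `__primary_key`, `__sequence`, `__op_type`.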
pub(crate) const INTERNAL_COLUMN_NUM: usize = 3;

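/// Helper to convert a [Batch] into a [RecordBatch] in the primary key SST format.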
pub(crate) struct PrimaryKeyWriteFormat {
    metadata: RegionMetadataRef,
    /// SST file schema.
    arrow_schema: SchemaRef,
    /// Sequence number to override sequences of all rows, if set.
    override_sequence: Option<SequenceNumber>,
}

impl PrimaryKeyWriteFormat {
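    /// Creates a helper for the given region metadata.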
    pub(crate) fn new(metadata: RegionMetadataRef) -> PrimaryKeyWriteFormat {
        let arrow_schema = to_sst_arrow_schema(&metadata);
        PrimaryKeyWriteFormat {
            metadata,
            arrow_schema,
            override_sequence: None,
        }
    }

    /// Sets a sequence number to override sequences of all rows.
    pub(crate) fn with_override_sequence(
        mut self,
        override_sequence: Option<SequenceNumber>,
    ) -> Self {
        self.override_sequence = override_sequence;
        self
    }

    /// Returns the schema of the SST file.
    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
        &self.arrow_schema
    }

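    /// Converts a [Batch] to a [RecordBatch] in the SST schema: field columns
    /// first, followed by the time index and the internal columns.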
    pub(crate) fn convert_batch(&self, batch: &Batch) -> Result<RecordBatch> {
        debug_assert_eq!(
            batch.fields().len() + FIXED_POS_COLUMN_NUM,
            self.arrow_schema.fields().len()
        );
        let mut columns = Vec::with_capacity(batch.fields().len() + FIXED_POS_COLUMN_NUM);
        for (column, column_metadata) in batch.fields().iter().zip(self.metadata.field_columns()) {
            ensure!(
                column.column_id == column_metadata.column_id,
                InvalidBatchSnafu {
                    reason: format!(
                        "Batch has column {} but metadata has column {}",
                        column.column_id, column_metadata.column_id
                    ),
                }
            );

            columns.push(column.data.to_arrow_array());
        }
        // Time index column.
        columns.push(batch.timestamps().to_arrow_array());
        // Internal columns: primary key, sequence, op type.
        columns.push(new_primary_key_array(batch.primary_key(), batch.num_rows()));

        if let Some(override_sequence) = self.override_sequence {
            let sequence_array =
                Arc::new(UInt64Array::from(vec![override_sequence; batch.num_rows()]));
            columns.push(sequence_array);
        } else {
            columns.push(batch.sequences().to_arrow_array());
        }
        columns.push(batch.op_types().to_arrow_array());

        RecordBatch::try_new(self.arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
    }
}

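/// Helper to read the SST format, either the primary key format or the flat format.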
pub enum ReadFormat {
    /// Format with an encoded primary key column.
    PrimaryKey(PrimaryKeyReadFormat),
    /// Flat format that stores primary key columns individually.
    Flat(FlatReadFormat),
}

impl ReadFormat {
    /// Creates a helper to read the primary key format with the given `column_ids` to read.
    pub fn new_primary_key(
        metadata: RegionMetadataRef,
        column_ids: impl Iterator<Item = ColumnId>,
    ) -> Self {
        ReadFormat::PrimaryKey(PrimaryKeyReadFormat::new(metadata, column_ids))
    }

    /// Creates a helper to read the flat format with the given `column_ids` to read.
    pub fn new_flat(
        metadata: RegionMetadataRef,
        column_ids: impl Iterator<Item = ColumnId>,
        num_columns: Option<usize>,
        file_path: &str,
        skip_auto_convert: bool,
    ) -> Result<Self> {
        Ok(ReadFormat::Flat(FlatReadFormat::new(
            metadata,
            column_ids,
            num_columns,
            file_path,
            skip_auto_convert,
        )?))
    }

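    /// Creates a helper for the requested format. Reads all columns when
    /// `projection` is `None`.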
    pub fn new(
        region_metadata: RegionMetadataRef,
        projection: Option<&[ColumnId]>,
        flat_format: bool,
        num_columns: Option<usize>,
        file_path: &str,
        skip_auto_convert: bool,
    ) -> Result<ReadFormat> {
        if flat_format {
            if let Some(column_ids) = projection {
                ReadFormat::new_flat(
                    region_metadata,
                    column_ids.iter().copied(),
                    num_columns,
                    file_path,
                    skip_auto_convert,
                )
            } else {
                // Reads all columns.
                ReadFormat::new_flat(
                    region_metadata.clone(),
                    region_metadata
                        .column_metadatas
                        .iter()
                        .map(|col| col.column_id),
                    num_columns,
                    file_path,
                    skip_auto_convert,
                )
            }
        } else if let Some(column_ids) = projection {
            Ok(ReadFormat::new_primary_key(
                region_metadata,
                column_ids.iter().copied(),
            ))
        } else {
            // Reads all columns.
            Ok(ReadFormat::new_primary_key(
                region_metadata.clone(),
                region_metadata
                    .column_metadatas
                    .iter()
                    .map(|col| col.column_id),
            ))
        }
    }

    /// Returns the primary key format helper, if this is the primary key format.
    pub(crate) fn as_primary_key(&self) -> Option<&PrimaryKeyReadFormat> {
        match self {
            ReadFormat::PrimaryKey(format) => Some(format),
            _ => None,
        }
    }

    /// Returns the flat format helper, if this is the flat format.
    pub(crate) fn as_flat(&self) -> Option<&FlatReadFormat> {
        match self {
            ReadFormat::Flat(format) => Some(format),
            _ => None,
        }
    }

    /// Returns the schema of the SST file.
    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
        match self {
            ReadFormat::PrimaryKey(format) => format.arrow_schema(),
            ReadFormat::Flat(format) => format.arrow_schema(),
        }
    }

    /// Returns the metadata of the region that created the SST.
    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
        match self {
            ReadFormat::PrimaryKey(format) => format.metadata(),
            ReadFormat::Flat(format) => format.metadata(),
        }
    }

    /// Returns the indices of columns to read from the SST, in ascending order.
    pub(crate) fn projection_indices(&self) -> &[usize] {
        match self {
            ReadFormat::PrimaryKey(format) => format.projection_indices(),
            ReadFormat::Flat(format) => format.projection_indices(),
        }
    }

    /// Returns min values of specific column in row groups.
    pub fn min_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        match self {
            ReadFormat::PrimaryKey(format) => format.min_values(row_groups, column_id),
            ReadFormat::Flat(format) => format.min_values(row_groups, column_id),
        }
    }

    /// Returns max values of specific column in row groups.
    pub fn max_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        match self {
            ReadFormat::PrimaryKey(format) => format.max_values(row_groups, column_id),
            ReadFormat::Flat(format) => format.max_values(row_groups, column_id),
        }
    }

    /// Returns null counts of specific column in row groups.
    pub fn null_counts(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        match self {
            ReadFormat::PrimaryKey(format) => format.null_counts(row_groups, column_id),
            ReadFormat::Flat(format) => format.null_counts(row_groups, column_id),
        }
    }

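    /// Returns min/max values of a column in the row groups as an arrow array,
    /// or `None` if the statistics can't be converted. Row groups without
    /// statistics are filled with nulls.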
    pub(crate) fn column_values(
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column: &ColumnMetadata,
        column_index: usize,
        is_min: bool,
    ) -> Option<ArrayRef> {
        let null_scalar: ScalarValue = column
            .column_schema
            .data_type
            .as_arrow_type()
            .try_into()
            .ok()?;
        let scalar_values = row_groups
            .iter()
            .map(|meta| {
                let stats = meta.borrow().column(column_index).statistics()?;
                match stats {
                    Statistics::Boolean(s) => Some(ScalarValue::Boolean(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::Int32(s) => Some(ScalarValue::Int32(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::Int64(s) => Some(ScalarValue::Int64(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    // Unsupported physical type, no statistics.
                    Statistics::Int96(_) => None,
                    Statistics::Float(s) => Some(ScalarValue::Float32(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::Double(s) => Some(ScalarValue::Float64(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::ByteArray(s) => {
                        let bytes = if is_min {
                            s.min_bytes_opt()?
                        } else {
                            s.max_bytes_opt()?
                        };
                        let s = String::from_utf8(bytes.to_vec()).ok();
                        Some(ScalarValue::Utf8(s))
                    }
                    // Unsupported physical type, no statistics.
                    Statistics::FixedLenByteArray(_) => None,
                }
            })
            .map(|maybe_scalar| maybe_scalar.unwrap_or_else(|| null_scalar.clone()))
            .collect::<Vec<ScalarValue>>();
        debug_assert_eq!(scalar_values.len(), row_groups.len());
        ScalarValue::iter_to_array(scalar_values).ok()
    }

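    /// Returns null counts of a column in the row groups as an arrow array.
    /// Row groups without statistics become null entries.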
    pub(crate) fn column_null_counts(
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_index: usize,
    ) -> Option<ArrayRef> {
        let values = row_groups.iter().map(|meta| {
            let col = meta.borrow().column(column_index);
            let stat = col.statistics()?;
            stat.null_count_opt()
        });
        Some(Arc::new(UInt64Array::from_iter(values)))
    }

    /// Sets the sequence number to override sequences of all rows.
    pub(crate) fn set_override_sequence(&mut self, sequence: Option<SequenceNumber>) {
        match self {
            ReadFormat::PrimaryKey(format) => format.set_override_sequence(sequence),
            ReadFormat::Flat(format) => format.set_override_sequence(sequence),
        }
    }

    /// Enables or disables decoding primary key values while reading.
    /// Only takes effect for the primary key format.
    pub(crate) fn set_decode_primary_key_values(&mut self, decode: bool) {
        if let ReadFormat::PrimaryKey(format) = self {
            format.set_decode_primary_key_values(decode);
        }
    }

    /// Creates an array of `length` override sequences, if an override sequence is set.
    pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option<ArrayRef> {
        match self {
            ReadFormat::PrimaryKey(format) => format.new_override_sequence_array(length),
            ReadFormat::Flat(format) => format.new_override_sequence_array(length),
        }
    }
}

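/// Helper to read the primary key SST format.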
pub struct PrimaryKeyReadFormat {
    /// The metadata of the region that created the SST.
    metadata: RegionMetadataRef,
    /// SST file schema.
    arrow_schema: SchemaRef,
    /// Field column id to its index in the SST schema.
    field_id_to_index: HashMap<ColumnId, usize>,
    /// Indices of columns to read from the SST, in ascending order.
    projection_indices: Vec<usize>,
    /// Field column id to its index in the projected schema.
    field_id_to_projected_index: HashMap<ColumnId, usize>,
    /// Sequence number to override sequences of all rows, if set.
    override_sequence: Option<SequenceNumber>,
    /// Codec to decode primary key values, if decoding is enabled.
    primary_key_codec: Option<Arc<dyn PrimaryKeyCodec>>,
}

impl PrimaryKeyReadFormat {
    /// Creates a helper with existing `metadata` and `column_ids` to read.
    pub fn new(
        metadata: RegionMetadataRef,
        column_ids: impl Iterator<Item = ColumnId>,
    ) -> PrimaryKeyReadFormat {
        let field_id_to_index: HashMap<_, _> = metadata
            .field_columns()
            .enumerate()
            .map(|(index, column)| (column.column_id, index))
            .collect();
        let arrow_schema = to_sst_arrow_schema(&metadata);

        let format_projection = FormatProjection::compute_format_projection(
            &field_id_to_index,
            arrow_schema.fields.len(),
            column_ids,
        );

        PrimaryKeyReadFormat {
            metadata,
            arrow_schema,
            field_id_to_index,
            projection_indices: format_projection.projection_indices,
            field_id_to_projected_index: format_projection.column_id_to_projected_index,
            override_sequence: None,
            primary_key_codec: None,
        }
    }

    /// Sets the sequence number to override sequences of all rows.
    pub(crate) fn set_override_sequence(&mut self, sequence: Option<SequenceNumber>) {
        self.override_sequence = sequence;
    }

    /// Enables or disables decoding primary key values into each [Batch].
    pub(crate) fn set_decode_primary_key_values(&mut self, decode: bool) {
        self.primary_key_codec = if decode {
            Some(build_primary_key_codec(&self.metadata))
        } else {
            None
        };
    }

    /// Returns the schema of the SST file.
    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
        &self.arrow_schema
    }

    /// Returns the metadata of the region that created the SST.
    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
        &self.metadata
    }

    /// Returns the indices of columns to read, in ascending order.
    pub(crate) fn projection_indices(&self) -> &[usize] {
        &self.projection_indices
    }

    /// Returns the mapping from field column id to its projected index.
    pub(crate) fn field_id_to_projected_index(&self) -> &HashMap<ColumnId, usize> {
        &self.field_id_to_projected_index
    }

    /// Creates an array of `length` override sequences, if an override sequence is set.
    pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option<ArrayRef> {
        self.override_sequence
            .map(|seq| Arc::new(UInt64Array::from_value(seq, length)) as ArrayRef)
    }

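    /// Converts a `record_batch` read from the SST into one [Batch] per
    /// primary key and appends them to `batches`. Sequences are replaced by
    /// `override_sequence_array` if it is provided.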
    pub fn convert_record_batch(
        &self,
        record_batch: &RecordBatch,
        override_sequence_array: Option<&ArrayRef>,
        batches: &mut VecDeque<Batch>,
    ) -> Result<()> {
        debug_assert!(batches.is_empty());

        // The record batch must contain the fixed position columns.
        ensure!(
            record_batch.num_columns() >= FIXED_POS_COLUMN_NUM,
            InvalidRecordBatchSnafu {
                reason: format!(
                    "record batch only has {} columns",
                    record_batch.num_columns()
                ),
            }
        );

        let mut fixed_pos_columns = record_batch
            .columns()
            .iter()
            .rev()
            .take(FIXED_POS_COLUMN_NUM);
        // Safety: the record batch has at least FIXED_POS_COLUMN_NUM columns.
        let op_type_array = fixed_pos_columns.next().unwrap();
        let mut sequence_array = fixed_pos_columns.next().unwrap().clone();
        let pk_array = fixed_pos_columns.next().unwrap();
        let ts_array = fixed_pos_columns.next().unwrap();
        let field_batch_columns = self.get_field_batch_columns(record_batch)?;

        // Replaces the sequence array if an override array is provided.
        if let Some(override_array) = override_sequence_array {
            assert!(override_array.len() >= sequence_array.len());
            sequence_array = if override_array.len() > sequence_array.len() {
                override_array.slice(0, sequence_array.len())
            } else {
                override_array.clone()
            };
        }

        // Computes the offsets of different primary keys in the array.
        let pk_dict_array = pk_array
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .with_context(|| InvalidRecordBatchSnafu {
                reason: format!("primary key array should not be {:?}", pk_array.data_type()),
            })?;
        let offsets = primary_key_offsets(pk_dict_array)?;
        if offsets.is_empty() {
            return Ok(());
        }

        // Splits the record batch according to the primary key offsets.
        let keys = pk_dict_array.keys();
        let pk_values = pk_dict_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .with_context(|| InvalidRecordBatchSnafu {
                reason: format!(
                    "values of primary key array should not be {:?}",
                    pk_dict_array.values().data_type()
                ),
            })?;
        for (i, start) in offsets[..offsets.len() - 1].iter().enumerate() {
            let end = offsets[i + 1];
            let rows_in_batch = end - start;
            let dict_key = keys.value(*start);
            let primary_key = pk_values.value(dict_key as usize).to_vec();

            let mut builder = BatchBuilder::new(primary_key);
            builder
                .timestamps_array(ts_array.slice(*start, rows_in_batch))?
                .sequences_array(sequence_array.slice(*start, rows_in_batch))?
                .op_types_array(op_type_array.slice(*start, rows_in_batch))?;
            for batch_column in &field_batch_columns {
                builder.push_field(BatchColumn {
                    column_id: batch_column.column_id,
                    data: batch_column.data.slice(*start, rows_in_batch),
                });
            }

            let mut batch = builder.build()?;
            if let Some(codec) = &self.primary_key_codec {
                let pk_values: CompositeValues =
                    codec.decode(batch.primary_key()).context(DecodeSnafu)?;
                batch.set_pk_values(pk_values);
            }
            batches.push_back(batch);
        }

        Ok(())
    }

    /// Returns min values of specific column in row groups.
    pub fn min_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        let Some(column) = self.metadata.column_by_id(column_id) else {
            return StatValues::NoColumn;
        };
        match column.semantic_type {
            SemanticType::Tag => self.tag_values(row_groups, column, true),
            SemanticType::Field => {
                // Safety: `field_id_to_index` is built from the same metadata.
                let index = self.field_id_to_index.get(&column_id).unwrap();
                let stats = ReadFormat::column_values(row_groups, column, *index, true);
                StatValues::from_stats_opt(stats)
            }
            SemanticType::Timestamp => {
                let index = self.time_index_position();
                let stats = ReadFormat::column_values(row_groups, column, index, true);
                StatValues::from_stats_opt(stats)
            }
        }
    }

    /// Returns max values of specific column in row groups.
    pub fn max_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        let Some(column) = self.metadata.column_by_id(column_id) else {
            return StatValues::NoColumn;
        };
        match column.semantic_type {
            SemanticType::Tag => self.tag_values(row_groups, column, false),
            SemanticType::Field => {
                // Safety: `field_id_to_index` is built from the same metadata.
                let index = self.field_id_to_index.get(&column_id).unwrap();
                let stats = ReadFormat::column_values(row_groups, column, *index, false);
                StatValues::from_stats_opt(stats)
            }
            SemanticType::Timestamp => {
                let index = self.time_index_position();
                let stats = ReadFormat::column_values(row_groups, column, index, false);
                StatValues::from_stats_opt(stats)
            }
        }
    }

    /// Returns null counts of specific column in row groups.
    pub fn null_counts(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        let Some(column) = self.metadata.column_by_id(column_id) else {
            return StatValues::NoColumn;
        };
        match column.semantic_type {
            SemanticType::Tag => StatValues::NoStats,
            SemanticType::Field => {
                // Safety: `field_id_to_index` is built from the same metadata.
                let index = self.field_id_to_index.get(&column_id).unwrap();
                let stats = ReadFormat::column_null_counts(row_groups, *index);
                StatValues::from_stats_opt(stats)
            }
            SemanticType::Timestamp => {
                let index = self.time_index_position();
                let stats = ReadFormat::column_null_counts(row_groups, index);
                StatValues::from_stats_opt(stats)
            }
        }
    }

    /// Gets field columns from the record batch as [BatchColumn]s.
    fn get_field_batch_columns(&self, record_batch: &RecordBatch) -> Result<Vec<BatchColumn>> {
        record_batch
            .columns()
            .iter()
            .zip(record_batch.schema().fields())
            // Skips the fixed position columns at the end.
            .take(record_batch.num_columns() - FIXED_POS_COLUMN_NUM)
            .map(|(array, field)| {
                let vector = Helper::try_into_vector(array.clone()).context(ConvertVectorSnafu)?;
                let column = self
                    .metadata
                    .column_by_name(field.name())
                    .with_context(|| InvalidRecordBatchSnafu {
                        reason: format!("column {} not found in metadata", field.name()),
                    })?;

                Ok(BatchColumn {
                    column_id: column.column_id,
                    data: vector,
                })
            })
            .collect()
    }

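    /// Returns min/max values of a tag column in the row groups. Only the
    /// first tag in the primary key can be decoded from the statistics.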
    fn tag_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column: &ColumnMetadata,
        is_min: bool,
    ) -> StatValues {
        let is_first_tag = self
            .metadata
            .primary_key
            .first()
            .map(|id| *id == column.column_id)
            .unwrap_or(false);
        if !is_first_tag {
            // Only the first tag can be decoded from the primary key statistics.
            return StatValues::NoStats;
        }

        StatValues::from_stats_opt(self.first_tag_values(row_groups, column, is_min))
    }

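    /// Decodes min/max values of the first tag from the primary key column
    /// statistics of each row group.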
    fn first_tag_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column: &ColumnMetadata,
        is_min: bool,
    ) -> Option<ArrayRef> {
        debug_assert!(
            self.metadata
                .primary_key
                .first()
                .map(|id| *id == column.column_id)
                .unwrap_or(false)
        );

        let primary_key_encoding = self.metadata.primary_key_encoding;
        let converter = build_primary_key_codec_with_fields(
            primary_key_encoding,
            [(
                column.column_id,
                SortField::new(column.column_schema.data_type.clone()),
            )]
            .into_iter(),
        );

        let values = row_groups.iter().map(|meta| {
            let stats = meta
                .borrow()
                .column(self.primary_key_position())
                .statistics()?;
            match stats {
                Statistics::Boolean(_) => None,
                Statistics::Int32(_) => None,
                Statistics::Int64(_) => None,
                Statistics::Int96(_) => None,
                Statistics::Float(_) => None,
                Statistics::Double(_) => None,
                // The primary key is stored as a byte array.
                Statistics::ByteArray(s) => {
                    let bytes = if is_min {
                        s.min_bytes_opt()?
                    } else {
                        s.max_bytes_opt()?
                    };
                    converter.decode_leftmost(bytes).ok()?
                }
                Statistics::FixedLenByteArray(_) => None,
            }
        });
        let mut builder = column
            .column_schema
            .data_type
            .create_mutable_vector(row_groups.len());
        for value_opt in values {
            match value_opt {
                Some(v) => builder.push_value_ref(&v.as_value_ref()),
                None => builder.push_null(),
            }
        }
        let vector = builder.to_vector();

        Some(vector.to_arrow_array())
    }

    /// Index of the primary key column in the SST schema.
    fn primary_key_position(&self) -> usize {
        self.arrow_schema.fields.len() - INTERNAL_COLUMN_NUM
    }

    /// Index of the time index column in the SST schema.
    fn time_index_position(&self) -> usize {
        self.arrow_schema.fields.len() - FIXED_POS_COLUMN_NUM
    }

    /// Returns the projected index of a field column by its id, or `None` if
    /// the column is not in the projection.
    pub fn field_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
        self.field_id_to_projected_index.get(&column_id).copied()
    }
}

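/// Projection computed for a SST read format.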
pub(crate) struct FormatProjection {
    /// Indices of columns to read from the SST, in ascending order.
    pub(crate) projection_indices: Vec<usize>,
    /// Maps column id to the index of the column in the projected schema.
    pub(crate) column_id_to_projected_index: HashMap<ColumnId, usize>,
}

impl FormatProjection {
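    /// Computes the projection from the `column_ids` to read, given the map
    /// from column id to column index in the SST and the total number of
    /// columns in the SST.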
    pub(crate) fn compute_format_projection(
        id_to_index: &HashMap<ColumnId, usize>,
        sst_column_num: usize,
        column_ids: impl Iterator<Item = ColumnId>,
    ) -> Self {
        // Collects (column id, SST index) pairs for requested columns that
        // exist in the SST.
        let mut projected_schema: Vec<_> = column_ids
            .filter_map(|column_id| {
                id_to_index
                    .get(&column_id)
                    .copied()
                    .map(|index| (column_id, index))
            })
            .collect();
        projected_schema.sort_unstable_by_key(|x| x.1);
        projected_schema.dedup_by_key(|x| x.1);

        // Also reads the fixed position columns at the end of the SST.
        let mut projection_indices: Vec<_> = projected_schema
            .iter()
            .map(|(_column_id, index)| *index)
            .chain(sst_column_num - FIXED_POS_COLUMN_NUM..sst_column_num)
            .collect();
        projection_indices.sort_unstable();
        projection_indices.dedup();

        let column_id_to_projected_index = projected_schema
            .into_iter()
            .map(|(column_id, _)| column_id)
            .enumerate()
            .map(|(index, column_id)| (column_id, index))
            .collect();

        Self {
            projection_indices,
            column_id_to_projected_index,
        }
    }
}

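/// Statistics of a column in the SST, per row group.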
pub enum StatValues {
    /// Values of the column in each row group.
    Values(ArrayRef),
    /// No such column in the SST.
    NoColumn,
    /// The column exists but has no statistics.
    NoStats,
}

impl StatValues {
    /// Creates [StatValues] from optional statistics.
    pub fn from_stats_opt(stats: Option<ArrayRef>) -> Self {
        match stats {
            Some(stats) => StatValues::Values(stats),
            None => StatValues::NoStats,
        }
    }
}

#[cfg(test)]
impl PrimaryKeyReadFormat {
    /// Creates a helper that reads all columns.
    pub fn new_with_all_columns(metadata: RegionMetadataRef) -> PrimaryKeyReadFormat {
        Self::new(
            Arc::clone(&metadata),
            metadata.column_metadatas.iter().map(|c| c.column_id),
        )
    }
}

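/// Computes offsets of different primary keys in the dictionary array.
/// The result always starts with 0 and ends with the array length.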
fn primary_key_offsets(pk_dict_array: &PrimaryKeyArray) -> Result<Vec<usize>> {
    if pk_dict_array.is_empty() {
        return Ok(Vec::new());
    }

    // The array is not empty, so the first offset is 0.
    let mut offsets = vec![0];
    let keys = pk_dict_array.keys();
    let pk_indices = keys.values();
    for (i, key) in pk_indices.iter().take(keys.len() - 1).enumerate() {
        // Compares each key to the next one; a new primary key starts where
        // they differ.
        if *key != pk_indices[i + 1] {
            offsets.push(i + 1);
        }
    }
    offsets.push(keys.len());

    Ok(offsets)
}

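/// Creates a dictionary array in which all `num_rows` keys reference the same
/// `primary_key`.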
fn new_primary_key_array(primary_key: &[u8], num_rows: usize) -> ArrayRef {
    let values = Arc::new(BinaryArray::from_iter_values([primary_key]));
    let keys = UInt32Array::from_value(0, num_rows);

    Arc::new(DictionaryArray::new(keys, values))
}

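/// Gets the time range of a row group from the parquet metadata, or `None`
/// if the time index statistics are unavailable or have an invalid type.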
pub(crate) fn parquet_row_group_time_range(
    file_meta: &FileMeta,
    parquet_meta: &ParquetMetaData,
    row_group_idx: usize,
) -> Option<FileTimeRange> {
    let row_group_meta = parquet_meta.row_group(row_group_idx);
    let num_columns = parquet_meta.file_metadata().schema_descr().num_columns();
    assert!(
        num_columns >= FIXED_POS_COLUMN_NUM,
        "file only has {} columns",
        num_columns
    );
    let time_index_pos = num_columns - FIXED_POS_COLUMN_NUM;

    let stats = row_group_meta.column(time_index_pos).statistics()?;
    // The physical type of the time index column must be int64.
    let (min, max) = match stats {
        Statistics::Int64(value_stats) => (*value_stats.min_opt()?, *value_stats.max_opt()?),
        Statistics::Int32(_)
        | Statistics::Boolean(_)
        | Statistics::Int96(_)
        | Statistics::Float(_)
        | Statistics::Double(_)
        | Statistics::ByteArray(_)
        | Statistics::FixedLenByteArray(_) => {
            common_telemetry::warn!(
                "Invalid statistics {:?} for time index in parquet in {}",
                stats,
                file_meta.file_id
            );
            return None;
        }
    };

    debug_assert!(min >= file_meta.time_range.0.value() && min <= file_meta.time_range.1.value());
    debug_assert!(max >= file_meta.time_range.0.value() && max <= file_meta.time_range.1.value());
    let unit = file_meta.time_range.0.unit();

    Some((Timestamp::new(min, unit), Timestamp::new(max, unit)))
}

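/// Returns whether sequences in the file should be overridden: true only when
/// every row group has sequence statistics whose min and max are both 0.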
pub(crate) fn need_override_sequence(parquet_meta: &ParquetMetaData) -> bool {
    let num_columns = parquet_meta.file_metadata().schema_descr().num_columns();
    if num_columns < FIXED_POS_COLUMN_NUM {
        return false;
    }

    // The sequence column is the second to last column.
    let sequence_pos = num_columns - 2;

    // Checks the sequence statistics of every row group.
    for row_group in parquet_meta.row_groups() {
        if let Some(Statistics::Int64(value_stats)) = row_group.column(sequence_pos).statistics() {
            if let (Some(min_val), Some(max_val)) = (value_stats.min_opt(), value_stats.max_opt()) {
                if *min_val != 0 || *max_val != 0 {
                    // The file has non-zero sequences, no need to override them.
                    return false;
                }
            } else {
                // No min/max statistics, we can't tell.
                return false;
            }
        } else {
            // No statistics, we can't tell.
            return false;
        }
    }

    !parquet_meta.row_groups().is_empty()
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::OpType;
    use datatypes::arrow::array::{
        Int64Array, StringArray, TimestampMillisecondArray, UInt8Array, UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema, TimeUnit};
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::ValueRef;
    use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, UInt8Vector, UInt64Vector};
    use mito_codec::row_converter::{
        DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec,
    };
    use store_api::codec::PrimaryKeyEncoding;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;
    use store_api::storage::consts::ReservedColumnId;

    use super::*;
    use crate::sst::parquet::flat_format::{FlatWriteFormat, sequence_column_index};
    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};

    const TEST_SEQUENCE: u64 = 1;
    const TEST_OP_TYPE: u8 = OpType::Put as u8;

    fn build_test_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("tag0", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field1",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("tag1", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field0",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 5,
            })
            .primary_key(vec![1, 3]);
        Arc::new(builder.build().unwrap())
    }

    fn build_test_arrow_schema() -> SchemaRef {
        let fields = vec![
            Field::new("field1", ArrowDataType::Int64, true),
            Field::new("field0", ArrowDataType::Int64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt32),
                    Box::new(ArrowDataType::Binary),
                ),
                false,
            ),
            Field::new("__sequence", ArrowDataType::UInt64, false),
            Field::new("__op_type", ArrowDataType::UInt8, false),
        ];
        Arc::new(Schema::new(fields))
    }

    fn new_batch(primary_key: &[u8], start_ts: i64, start_field: i64, num_rows: usize) -> Batch {
        new_batch_with_sequence(primary_key, start_ts, start_field, num_rows, TEST_SEQUENCE)
    }

    fn new_batch_with_sequence(
        primary_key: &[u8],
        start_ts: i64,
        start_field: i64,
        num_rows: usize,
        sequence: u64,
    ) -> Batch {
        let ts_values = (0..num_rows).map(|i| start_ts + i as i64);
        let timestamps = Arc::new(TimestampMillisecondVector::from_values(ts_values));
        let sequences = Arc::new(UInt64Vector::from_vec(vec![sequence; num_rows]));
        let op_types = Arc::new(UInt8Vector::from_vec(vec![TEST_OP_TYPE; num_rows]));
        let fields = vec![
            BatchColumn {
                column_id: 4,
                data: Arc::new(Int64Vector::from_vec(vec![start_field; num_rows])),
            },
            BatchColumn {
                column_id: 2,
                data: Arc::new(Int64Vector::from_vec(vec![start_field + 1; num_rows])),
            },
        ];

        BatchBuilder::with_required_columns(primary_key.to_vec(), timestamps, sequences, op_types)
            .with_fields(fields)
            .build()
            .unwrap()
    }

    #[test]
    fn test_to_sst_arrow_schema() {
        let metadata = build_test_region_metadata();
        let write_format = PrimaryKeyWriteFormat::new(metadata);
        assert_eq!(&build_test_arrow_schema(), write_format.arrow_schema());
    }

    #[test]
    fn test_new_primary_key_array() {
        let array = new_primary_key_array(b"test", 3);
        let expect = build_test_pk_array(&[(b"test".to_vec(), 3)]) as ArrayRef;
        assert_eq!(&expect, &array);
    }

    fn build_test_pk_array(pk_row_nums: &[(Vec<u8>, usize)]) -> Arc<PrimaryKeyArray> {
        let values = Arc::new(BinaryArray::from_iter_values(
            pk_row_nums.iter().map(|v| &v.0),
        ));
        let mut keys = vec![];
        for (index, num_rows) in pk_row_nums.iter().map(|v| v.1).enumerate() {
            keys.extend(std::iter::repeat_n(index as u32, num_rows));
        }
        let keys = UInt32Array::from(keys);
        Arc::new(DictionaryArray::new(keys, values))
    }

    #[test]
    fn test_convert_batch() {
        let metadata = build_test_region_metadata();
        let write_format = PrimaryKeyWriteFormat::new(metadata);

        let num_rows = 4;
        let batch = new_batch(b"test", 1, 2, num_rows);
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // primary key
            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; num_rows])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ];
        let expect_record = RecordBatch::try_new(build_test_arrow_schema(), columns).unwrap();

        let actual = write_format.convert_batch(&batch).unwrap();
        assert_eq!(expect_record, actual);
    }

    #[test]
    fn test_convert_batch_with_override_sequence() {
        let metadata = build_test_region_metadata();
        let write_format =
            PrimaryKeyWriteFormat::new(metadata).with_override_sequence(Some(415411));

        let num_rows = 4;
        let batch = new_batch(b"test", 1, 2, num_rows);
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // primary key
            Arc::new(UInt64Array::from(vec![415411; num_rows])), // overridden sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ];
        let expect_record = RecordBatch::try_new(build_test_arrow_schema(), columns).unwrap();

        let actual = write_format.convert_batch(&batch).unwrap();
        assert_eq!(expect_record, actual);
    }

    #[test]
    fn test_projection_indices() {
        let metadata = build_test_region_metadata();
        // Only read tag1 (column id 3).
        let read_format = ReadFormat::new_primary_key(metadata.clone(), [3].iter().copied());
        assert_eq!(&[2, 3, 4, 5], read_format.projection_indices());
        // Only read field1 (column id 4).
        let read_format = ReadFormat::new_primary_key(metadata.clone(), [4].iter().copied());
        assert_eq!(&[0, 2, 3, 4, 5], read_format.projection_indices());
        // Only read ts (column id 5).
        let read_format = ReadFormat::new_primary_key(metadata.clone(), [5].iter().copied());
        assert_eq!(&[2, 3, 4, 5], read_format.projection_indices());
        // Read field0, tag0, ts.
        let read_format = ReadFormat::new_primary_key(metadata, [2, 1, 5].iter().copied());
        assert_eq!(&[1, 2, 3, 4, 5], read_format.projection_indices());
    }

    #[test]
    fn test_empty_primary_key_offsets() {
        let array = build_test_pk_array(&[]);
        assert!(primary_key_offsets(&array).unwrap().is_empty());
    }

    #[test]
    fn test_primary_key_offsets_one_series() {
        let array = build_test_pk_array(&[(b"one".to_vec(), 1)]);
        assert_eq!(vec![0, 1], primary_key_offsets(&array).unwrap());

        let array = build_test_pk_array(&[(b"one".to_vec(), 1), (b"two".to_vec(), 1)]);
        assert_eq!(vec![0, 1, 2], primary_key_offsets(&array).unwrap());

        let array = build_test_pk_array(&[
            (b"one".to_vec(), 1),
            (b"two".to_vec(), 1),
            (b"three".to_vec(), 1),
        ]);
        assert_eq!(vec![0, 1, 2, 3], primary_key_offsets(&array).unwrap());
    }

    #[test]
    fn test_primary_key_offsets_multi_series() {
        let array = build_test_pk_array(&[(b"one".to_vec(), 1), (b"two".to_vec(), 3)]);
        assert_eq!(vec![0, 1, 4], primary_key_offsets(&array).unwrap());

        let array = build_test_pk_array(&[(b"one".to_vec(), 3), (b"two".to_vec(), 1)]);
        assert_eq!(vec![0, 3, 4], primary_key_offsets(&array).unwrap());

        let array = build_test_pk_array(&[(b"one".to_vec(), 3), (b"two".to_vec(), 3)]);
        assert_eq!(vec![0, 3, 6], primary_key_offsets(&array).unwrap());
    }

    #[test]
    fn test_convert_empty_record_batch() {
        let metadata = build_test_region_metadata();
        let arrow_schema = build_test_arrow_schema();
        let column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|col| col.column_id)
            .collect();
        let read_format = PrimaryKeyReadFormat::new(metadata, column_ids.iter().copied());
        assert_eq!(arrow_schema, *read_format.arrow_schema());

        let record_batch = RecordBatch::new_empty(arrow_schema);
        let mut batches = VecDeque::new();
        read_format
            .convert_record_batch(&record_batch, None, &mut batches)
            .unwrap();
        assert!(batches.is_empty());
    }

    #[test]
    fn test_convert_record_batch() {
        let metadata = build_test_region_metadata();
        let column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|col| col.column_id)
            .collect();
        let read_format = PrimaryKeyReadFormat::new(metadata, column_ids.iter().copied());

        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![1, 1, 10, 10])), // field1
            Arc::new(Int64Array::from(vec![2, 2, 11, 11])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 11, 12])), // ts
            build_test_pk_array(&[(b"one".to_vec(), 2), (b"two".to_vec(), 2)]), // primary key
            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; 4])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; 4])), // op type
        ];
        let arrow_schema = build_test_arrow_schema();
        let record_batch = RecordBatch::try_new(arrow_schema, columns).unwrap();
        let mut batches = VecDeque::new();
        read_format
            .convert_record_batch(&record_batch, None, &mut batches)
            .unwrap();

        assert_eq!(
            vec![new_batch(b"one", 1, 1, 2), new_batch(b"two", 11, 10, 2)],
            batches.into_iter().collect::<Vec<_>>(),
        );
    }

    #[test]
    fn test_convert_record_batch_with_override_sequence() {
        let metadata = build_test_region_metadata();
        let column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|col| col.column_id)
            .collect();
        let read_format =
            ReadFormat::new(metadata, Some(&column_ids), false, None, "test", false).unwrap();

        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![1, 1, 10, 10])), // field1
            Arc::new(Int64Array::from(vec![2, 2, 11, 11])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 11, 12])), // ts
            build_test_pk_array(&[(b"one".to_vec(), 2), (b"two".to_vec(), 2)]), // primary key
            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; 4])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; 4])), // op type
        ];
        let arrow_schema = build_test_arrow_schema();
        let record_batch = RecordBatch::try_new(arrow_schema, columns).unwrap();

        // Creates an override sequence array.
        let override_sequence: u64 = 12345;
        let override_sequence_array: ArrayRef =
            Arc::new(UInt64Array::from_value(override_sequence, 4));

        let mut batches = VecDeque::new();
        read_format
            .as_primary_key()
            .unwrap()
            .convert_record_batch(&record_batch, Some(&override_sequence_array), &mut batches)
            .unwrap();

        // Batches should use the override sequence instead of the original one.
        let expected_batch1 = new_batch_with_sequence(b"one", 1, 1, 2, override_sequence);
        let expected_batch2 = new_batch_with_sequence(b"two", 11, 10, 2, override_sequence);

        assert_eq!(
            vec![expected_batch1, expected_batch2],
            batches.into_iter().collect::<Vec<_>>(),
        );
    }

    fn build_test_flat_sst_schema() -> SchemaRef {
        let fields = vec![
            Field::new("tag0", ArrowDataType::Int64, true),
            Field::new("tag1", ArrowDataType::Int64, true),
            Field::new("field1", ArrowDataType::Int64, true),
            Field::new("field0", ArrowDataType::Int64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt32),
                    Box::new(ArrowDataType::Binary),
                ),
                false,
            ),
            Field::new("__sequence", ArrowDataType::UInt64, false),
            Field::new("__op_type", ArrowDataType::UInt8, false),
        ];
        Arc::new(Schema::new(fields))
    }

    #[test]
    fn test_flat_to_sst_arrow_schema() {
        let metadata = build_test_region_metadata();
        let format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default());
        assert_eq!(&build_test_flat_sst_schema(), format.arrow_schema());
    }

    fn input_columns_for_flat_batch(num_rows: usize) -> Vec<ArrayRef> {
        vec![
            Arc::new(Int64Array::from(vec![1; num_rows])), // tag0
            Arc::new(Int64Array::from(vec![1; num_rows])), // tag1
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // primary key
            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; num_rows])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ]
    }

    #[test]
    fn test_flat_convert_batch() {
        let metadata = build_test_region_metadata();
        let format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default());

        let num_rows = 4;
        let columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
        let batch = RecordBatch::try_new(build_test_flat_sst_schema(), columns.clone()).unwrap();
        let expect_record = RecordBatch::try_new(build_test_flat_sst_schema(), columns).unwrap();

        let actual = format.convert_batch(&batch).unwrap();
        assert_eq!(expect_record, actual);
    }

    #[test]
    fn test_flat_convert_with_override_sequence() {
        let metadata = build_test_region_metadata();
        let format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default())
            .with_override_sequence(Some(415411));

        let num_rows = 4;
        let columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
        let batch = RecordBatch::try_new(build_test_flat_sst_schema(), columns).unwrap();

        let expected_columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![1; num_rows])), // tag0
            Arc::new(Int64Array::from(vec![1; num_rows])), // tag1
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // primary key
            Arc::new(UInt64Array::from(vec![415411; num_rows])), // overridden sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ];
        let expected_record =
            RecordBatch::try_new(build_test_flat_sst_schema(), expected_columns).unwrap();

        let actual = format.convert_batch(&batch).unwrap();
        assert_eq!(expected_record, actual);
    }

    #[test]
    fn test_flat_projection_indices() {
        let metadata = build_test_region_metadata();
        // Only read tag1 (column id 3).
        let read_format =
            ReadFormat::new_flat(metadata.clone(), [3].iter().copied(), None, "test", false)
                .unwrap();
        assert_eq!(&[1, 4, 5, 6, 7], read_format.projection_indices());
        // Only read field1 (column id 4).
        let read_format =
            ReadFormat::new_flat(metadata.clone(), [4].iter().copied(), None, "test", false)
                .unwrap();
        assert_eq!(&[2, 4, 5, 6, 7], read_format.projection_indices());
        // Only read ts (column id 5).
        let read_format =
            ReadFormat::new_flat(metadata.clone(), [5].iter().copied(), None, "test", false)
                .unwrap();
        assert_eq!(&[4, 5, 6, 7], read_format.projection_indices());
        // Read field0, tag0, ts.
        let read_format =
            ReadFormat::new_flat(metadata, [2, 1, 5].iter().copied(), None, "test", false).unwrap();
        assert_eq!(&[0, 3, 4, 5, 6, 7], read_format.projection_indices());
    }

    #[test]
    fn test_flat_read_format_convert_batch() {
        let metadata = build_test_region_metadata();
        let mut format = FlatReadFormat::new(
            metadata,
            std::iter::once(1), // column id of tag0
            Some(8),            // total number of columns in the SST
            "test",
            false,
        )
        .unwrap();

        let num_rows = 4;
        let original_sequence = 100u64;
        let override_sequence = 200u64;

        let columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
        // Replaces the sequence column (index 6) with the original sequence.
        let mut test_columns = columns.clone();
        test_columns[6] = Arc::new(UInt64Array::from(vec![original_sequence; num_rows]));
        let record_batch =
            RecordBatch::try_new(format.arrow_schema().clone(), test_columns).unwrap();

        // Without an override array, the original sequence is kept.
        let result = format.convert_batch(record_batch.clone(), None).unwrap();
        let sequence_column = result.column(sequence_column_index(result.num_columns()));
        let sequence_array = sequence_column
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();

        let expected_original = UInt64Array::from(vec![original_sequence; num_rows]);
        assert_eq!(sequence_array, &expected_original);

        // With an override array, the sequence column is replaced.
        format.set_override_sequence(Some(override_sequence));
        let override_sequence_array = format.new_override_sequence_array(num_rows).unwrap();
        let result = format
            .convert_batch(record_batch, Some(&override_sequence_array))
            .unwrap();
        let sequence_column = result.column(sequence_column_index(result.num_columns()));
        let sequence_array = sequence_column
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();

        let expected_override = UInt64Array::from(vec![override_sequence; num_rows]);
        assert_eq!(sequence_array, &expected_override);
    }

    #[test]
    fn test_need_convert_to_flat() {
        let metadata = build_test_region_metadata();

        // Matching column counts: no conversion needed.
        let expected_columns = metadata.column_metadatas.len() + 3;
        let result =
            FlatReadFormat::is_legacy_format(&metadata, expected_columns, "test.parquet").unwrap();
        assert!(
            !result,
            "Should not need conversion when column counts match"
        );

        // Missing primary key columns: the file is in the legacy format.
        let num_columns_without_pk = expected_columns - metadata.primary_key.len();
        let result =
            FlatReadFormat::is_legacy_format(&metadata, num_columns_without_pk, "test.parquet")
                .unwrap();
        assert!(
            result,
            "Should need conversion when primary key columns are missing"
        );

        // More columns than expected: an error.
        let too_many_columns = expected_columns + 1;
        let err = FlatReadFormat::is_legacy_format(&metadata, too_many_columns, "test.parquet")
            .unwrap_err();
        assert!(err.to_string().contains("Expected columns"), "{err:?}");

        // A column count that matches neither format: an error.
        let wrong_diff_columns = expected_columns - 1;
        let err = FlatReadFormat::is_legacy_format(&metadata, wrong_diff_columns, "test.parquet")
            .unwrap_err();
        assert!(
            err.to_string().contains("Column number difference"),
            "{err:?}"
        );
    }

    fn build_test_dense_pk_array(
        codec: &DensePrimaryKeyCodec,
        pk_values_per_row: &[&[Option<i64>]],
    ) -> Arc<PrimaryKeyArray> {
        let mut builder = PrimaryKeyArrayBuilder::with_capacity(pk_values_per_row.len(), 1024, 0);

        for pk_values_row in pk_values_per_row {
            let values: Vec<ValueRef> = pk_values_row
                .iter()
                .map(|opt| match opt {
                    Some(val) => ValueRef::Int64(*val),
                    None => ValueRef::Null,
                })
                .collect();

            let encoded = codec.encode(values.into_iter()).unwrap();
            builder.append_value(&encoded);
        }

        Arc::new(builder.finish())
    }

    fn build_test_sparse_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "__table_id",
                    ConcreteDataType::uint32_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: ReservedColumnId::table_id(),
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "__tsid",
                    ConcreteDataType::uint64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: ReservedColumnId::tsid(),
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("tag0", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("tag1", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field1",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field0",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 5,
            })
            .primary_key(vec![
                ReservedColumnId::table_id(),
                ReservedColumnId::tsid(),
                1,
                3,
            ])
            .primary_key_encoding(PrimaryKeyEncoding::Sparse);
        Arc::new(builder.build().unwrap())
    }

    fn build_test_sparse_pk_array(
        codec: &SparsePrimaryKeyCodec,
        pk_values_per_row: &[SparseTestRow],
    ) -> Arc<PrimaryKeyArray> {
        let mut builder = PrimaryKeyArrayBuilder::with_capacity(pk_values_per_row.len(), 1024, 0);
        for row in pk_values_per_row {
            let values = vec![
                (ReservedColumnId::table_id(), ValueRef::UInt32(row.table_id)),
                (ReservedColumnId::tsid(), ValueRef::UInt64(row.tsid)),
                (1, ValueRef::String(&row.tag0)),
                (3, ValueRef::String(&row.tag1)),
            ];

            let mut buffer = Vec::new();
            codec.encode_value_refs(&values, &mut buffer).unwrap();
            builder.append_value(&buffer);
        }

        Arc::new(builder.finish())
    }

    #[derive(Clone)]
    struct SparseTestRow {
        table_id: u32,
        tsid: u64,
        tag0: String,
        tag1: String,
    }

    #[test]
    fn test_flat_read_format_convert_format_with_dense_encoding() {
        let metadata = build_test_region_metadata();

        let column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|c| c.column_id)
            .collect();
        let format = FlatReadFormat::new(
            metadata.clone(),
            column_ids.into_iter(),
            Some(6),
            "test",
            false,
        )
        .unwrap();

        let num_rows = 4;
        let original_sequence = 100u64;

        // Tag values (tag0, tag1) repeated for every row.
        let pk_values_per_row = vec![&[Some(1i64), Some(1i64)][..]; num_rows];

        let codec = DensePrimaryKeyCodec::new(&metadata);
        let dense_pk_array = build_test_dense_pk_array(&codec, &pk_values_per_row);
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            dense_pk_array.clone(),                        // primary key
            Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ];

        // Schema of the old format without individual primary key columns.
        let old_format_fields = vec![
            Field::new("field1", ArrowDataType::Int64, true),
            Field::new("field0", ArrowDataType::Int64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt32),
                    Box::new(ArrowDataType::Binary),
                ),
                false,
            ),
            Field::new("__sequence", ArrowDataType::UInt64, false),
            Field::new("__op_type", ArrowDataType::UInt8, false),
        ];
        let old_schema = Arc::new(Schema::new(old_format_fields));
        let record_batch = RecordBatch::try_new(old_schema, columns).unwrap();

        let result = format.convert_batch(record_batch, None).unwrap();

        // The converted batch contains the decoded tag columns.
        let expected_columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![1; num_rows])), // tag0
            Arc::new(Int64Array::from(vec![1; num_rows])), // tag1
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            dense_pk_array,                                // primary key
            Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ];
        let expected_record_batch =
            RecordBatch::try_new(build_test_flat_sst_schema(), expected_columns).unwrap();

        assert_eq!(expected_record_batch, result);
    }

    #[test]
    fn test_flat_read_format_convert_format_with_sparse_encoding() {
        let metadata = build_test_sparse_region_metadata();

        let column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|c| c.column_id)
            .collect();
        let format = FlatReadFormat::new(
            metadata.clone(),
            column_ids.clone().into_iter(),
            None,
            "test",
            false,
        )
        .unwrap();

        let num_rows = 4;
        let original_sequence = 100u64;

        // The same primary key for all rows.
        let pk_test_rows = vec![
            SparseTestRow {
                table_id: 1,
                tsid: 123,
                tag0: "frontend".to_string(),
                tag1: "pod1".to_string(),
            };
            num_rows
        ];

        let codec = SparsePrimaryKeyCodec::new(&metadata);
        let sparse_pk_array = build_test_sparse_pk_array(&codec, &pk_test_rows);
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            sparse_pk_array.clone(),                       // primary key
            Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ];

        // Schema of the old format without individual primary key columns.
        let old_format_fields = vec![
            Field::new("field1", ArrowDataType::Int64, true),
            Field::new("field0", ArrowDataType::Int64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt32),
                    Box::new(ArrowDataType::Binary),
                ),
                false,
            ),
            Field::new("__sequence", ArrowDataType::UInt64, false),
            Field::new("__op_type", ArrowDataType::UInt8, false),
        ];
        let old_schema = Arc::new(Schema::new(old_format_fields));
        let record_batch = RecordBatch::try_new(old_schema, columns).unwrap();

        let result = format.convert_batch(record_batch.clone(), None).unwrap();

        // The decoded tag columns are dictionary arrays.
        let tag0_array = Arc::new(DictionaryArray::new(
            UInt32Array::from(vec![0; num_rows]),
            Arc::new(StringArray::from(vec!["frontend"])),
        ));
        let tag1_array = Arc::new(DictionaryArray::new(
            UInt32Array::from(vec![0; num_rows]),
            Arc::new(StringArray::from(vec!["pod1"])),
        ));
        let expected_columns: Vec<ArrayRef> = vec![
            Arc::new(UInt32Array::from(vec![1; num_rows])), // __table_id
            Arc::new(UInt64Array::from(vec![123; num_rows])), // __tsid
            tag0_array,                                     // tag0
            tag1_array,                                     // tag1
            Arc::new(Int64Array::from(vec![2; num_rows])),  // field1
            Arc::new(Int64Array::from(vec![3; num_rows])),  // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            sparse_pk_array,                                // primary key
            Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ];
        let expected_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        let expected_record_batch =
            RecordBatch::try_new(expected_schema, expected_columns).unwrap();

        assert_eq!(expected_record_batch, result);

        // With `skip_auto_convert` enabled, the batch is returned unchanged.
        let format =
            FlatReadFormat::new(metadata.clone(), column_ids.into_iter(), None, "test", true)
                .unwrap();
        let result = format.convert_batch(record_batch.clone(), None).unwrap();
        assert_eq!(record_batch, result);
    }
}