use std::borrow::Borrow;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;

use api::v1::SemanticType;
use common_time::Timestamp;
use datafusion_common::ScalarValue;
use datatypes::arrow::array::{
    ArrayRef, BinaryArray, BinaryDictionaryBuilder, DictionaryArray, UInt32Array, UInt64Array,
};
use datatypes::arrow::datatypes::{SchemaRef, UInt32Type};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::prelude::DataType;
use datatypes::vectors::{Helper, Vector};
use mito_codec::row_converter::{
    CompositeValues, PrimaryKeyCodec, SortField, build_primary_key_codec,
    build_primary_key_codec_with_fields,
};
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
use parquet::file::statistics::Statistics;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
use store_api::storage::{ColumnId, SequenceNumber};

use crate::error::{
    ConvertVectorSnafu, DecodeSnafu, InvalidBatchSnafu, InvalidRecordBatchSnafu,
    NewRecordBatchSnafu, Result,
};
use crate::read::{Batch, BatchBuilder, BatchColumn};
use crate::sst::file::{FileMeta, FileTimeRange};
use crate::sst::parquet::flat_format::FlatReadFormat;
use crate::sst::to_sst_arrow_schema;

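/// Arrow array type of the dictionary-encoded primary key column.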
pub(crate) type PrimaryKeyArray = DictionaryArray<UInt32Type>;
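/// Builder for [PrimaryKeyArray].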
pub(crate) type PrimaryKeyArrayBuilder = BinaryDictionaryBuilder<UInt32Type>;

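/// Number of columns at fixed positions at the end of an SST record batch:
/// the time index column plus the internal columns.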
pub(crate) const FIXED_POS_COLUMN_NUM: usize = 4;
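/// Number of internal columns: primary key, sequence and op type.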
pub(crate) const INTERNAL_COLUMN_NUM: usize = 3;

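/// Helper to convert [Batch]es into [RecordBatch]es in the primary key format
/// so they can be written to a parquet SST.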
pub(crate) struct PrimaryKeyWriteFormat {
    /// Metadata of the region that owns the SST.
    metadata: RegionMetadataRef,
    /// SST file schema.
    arrow_schema: SchemaRef,
    /// Sequence number to override the sequences of written rows, if set.
    override_sequence: Option<SequenceNumber>,
}

impl PrimaryKeyWriteFormat {
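    /// Creates a new helper from the region metadata.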
    pub(crate) fn new(metadata: RegionMetadataRef) -> PrimaryKeyWriteFormat {
        let arrow_schema = to_sst_arrow_schema(&metadata);
        PrimaryKeyWriteFormat {
            metadata,
            arrow_schema,
            override_sequence: None,
        }
    }

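    /// Sets a sequence number that overrides the sequences of all written rows.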
    pub(crate) fn with_override_sequence(
        mut self,
        override_sequence: Option<SequenceNumber>,
    ) -> Self {
        self.override_sequence = override_sequence;
        self
    }

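    /// Gets the arrow schema of the SST.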
    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
        &self.arrow_schema
    }

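    /// Converts a [Batch] into a [RecordBatch] that matches the SST schema:
    /// field columns first, then the time index and the internal columns.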
    pub(crate) fn convert_batch(&self, batch: &Batch) -> Result<RecordBatch> {
        debug_assert_eq!(
            batch.fields().len() + FIXED_POS_COLUMN_NUM,
            self.arrow_schema.fields().len()
        );
        let mut columns = Vec::with_capacity(batch.fields().len() + FIXED_POS_COLUMN_NUM);
        // Stores all field columns first.
        for (column, column_metadata) in batch.fields().iter().zip(self.metadata.field_columns()) {
            ensure!(
                column.column_id == column_metadata.column_id,
                InvalidBatchSnafu {
                    reason: format!(
                        "Batch has column {} but metadata has column {}",
                        column.column_id, column_metadata.column_id
                    ),
                }
            );

            columns.push(column.data.to_arrow_array());
        }
        // Adds the time index column.
        columns.push(batch.timestamps().to_arrow_array());
        // Adds internal columns: primary key, sequence, op type.
        columns.push(new_primary_key_array(batch.primary_key(), batch.num_rows()));

        if let Some(override_sequence) = self.override_sequence {
            let sequence_array =
                Arc::new(UInt64Array::from(vec![override_sequence; batch.num_rows()]));
            columns.push(sequence_array);
        } else {
            columns.push(batch.sequences().to_arrow_array());
        }
        columns.push(batch.op_types().to_arrow_array());

        RecordBatch::try_new(self.arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
    }
}

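/// Helper to read a parquet SST, dispatching to the format the SST is written
/// in: the primary key format or the flat format.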
pub enum ReadFormat {
    PrimaryKey(PrimaryKeyReadFormat),
    Flat(FlatReadFormat),
}

impl ReadFormat {
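    /// Creates a helper for the primary key format with the given projection.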
    pub fn new_primary_key(
        metadata: RegionMetadataRef,
        column_ids: impl Iterator<Item = ColumnId>,
    ) -> Self {
        ReadFormat::PrimaryKey(PrimaryKeyReadFormat::new(metadata, column_ids))
    }

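    /// Creates a helper for the flat format with the given projection.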
    pub fn new_flat(
        metadata: RegionMetadataRef,
        column_ids: impl Iterator<Item = ColumnId>,
        num_columns: Option<usize>,
        file_path: &str,
        skip_auto_convert: bool,
    ) -> Result<Self> {
        Ok(ReadFormat::Flat(FlatReadFormat::new(
            metadata,
            column_ids,
            num_columns,
            file_path,
            skip_auto_convert,
        )?))
    }

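    /// Creates a helper, reading all columns when `projection` is `None`.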
    pub fn new(
        region_metadata: RegionMetadataRef,
        projection: Option<&[ColumnId]>,
        flat_format: bool,
        num_columns: Option<usize>,
        file_path: &str,
        skip_auto_convert: bool,
    ) -> Result<ReadFormat> {
        if flat_format {
            if let Some(column_ids) = projection {
                ReadFormat::new_flat(
                    region_metadata,
                    column_ids.iter().copied(),
                    num_columns,
                    file_path,
                    skip_auto_convert,
                )
            } else {
                // No projection, reads all columns.
                ReadFormat::new_flat(
                    region_metadata.clone(),
                    region_metadata
                        .column_metadatas
                        .iter()
                        .map(|col| col.column_id),
                    num_columns,
                    file_path,
                    skip_auto_convert,
                )
            }
        } else if let Some(column_ids) = projection {
            Ok(ReadFormat::new_primary_key(
                region_metadata,
                column_ids.iter().copied(),
            ))
        } else {
            // No projection, reads all columns.
            Ok(ReadFormat::new_primary_key(
                region_metadata.clone(),
                region_metadata
                    .column_metadatas
                    .iter()
                    .map(|col| col.column_id),
            ))
        }
    }

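    /// Returns the inner [PrimaryKeyReadFormat] if this is the primary key format.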
    pub(crate) fn as_primary_key(&self) -> Option<&PrimaryKeyReadFormat> {
        match self {
            ReadFormat::PrimaryKey(format) => Some(format),
            _ => None,
        }
    }

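    /// Returns the inner [FlatReadFormat] if this is the flat format.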
    pub(crate) fn as_flat(&self) -> Option<&FlatReadFormat> {
        match self {
            ReadFormat::Flat(format) => Some(format),
            _ => None,
        }
    }

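    /// Gets the arrow schema of the SST.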
    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
        match self {
            ReadFormat::PrimaryKey(format) => format.arrow_schema(),
            ReadFormat::Flat(format) => format.arrow_schema(),
        }
    }

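    /// Gets the metadata of the region that owns the SST.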
    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
        match self {
            ReadFormat::PrimaryKey(format) => format.metadata(),
            ReadFormat::Flat(format) => format.metadata(),
        }
    }

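    /// Gets the indices of the columns to read from the SST.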
    pub(crate) fn projection_indices(&self) -> &[usize] {
        match self {
            ReadFormat::PrimaryKey(format) => format.projection_indices(),
            ReadFormat::Flat(format) => format.projection_indices(),
        }
    }

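    /// Returns min values of the column in the row groups.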
    pub fn min_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        match self {
            ReadFormat::PrimaryKey(format) => format.min_values(row_groups, column_id),
            ReadFormat::Flat(format) => format.min_values(row_groups, column_id),
        }
    }

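    /// Returns max values of the column in the row groups.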
    pub fn max_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        match self {
            ReadFormat::PrimaryKey(format) => format.max_values(row_groups, column_id),
            ReadFormat::Flat(format) => format.max_values(row_groups, column_id),
        }
    }

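    /// Returns null counts of the column in the row groups.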
    pub fn null_counts(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        match self {
            ReadFormat::PrimaryKey(format) => format.null_counts(row_groups, column_id),
            ReadFormat::Flat(format) => format.null_counts(row_groups, column_id),
        }
    }

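    /// Returns the min or max values of the column at `column_index` in the
    /// row groups, substituting a typed null for missing statistics.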
    pub(crate) fn column_values(
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column: &ColumnMetadata,
        column_index: usize,
        is_min: bool,
    ) -> Option<ArrayRef> {
        let null_scalar: ScalarValue = column
            .column_schema
            .data_type
            .as_arrow_type()
            .try_into()
            .ok()?;
        let scalar_values = row_groups
            .iter()
            .map(|meta| {
                let stats = meta.borrow().column(column_index).statistics()?;
                match stats {
                    Statistics::Boolean(s) => Some(ScalarValue::Boolean(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::Int32(s) => Some(ScalarValue::Int32(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::Int64(s) => Some(ScalarValue::Int64(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::Int96(_) => None,
                    Statistics::Float(s) => Some(ScalarValue::Float32(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::Double(s) => Some(ScalarValue::Float64(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::ByteArray(s) => {
                        let bytes = if is_min {
                            s.min_bytes_opt()?
                        } else {
                            s.max_bytes_opt()?
                        };
                        let s = String::from_utf8(bytes.to_vec()).ok();
                        Some(ScalarValue::Utf8(s))
                    }
                    Statistics::FixedLenByteArray(_) => None,
                }
            })
            .map(|maybe_scalar| maybe_scalar.unwrap_or_else(|| null_scalar.clone()))
            .collect::<Vec<ScalarValue>>();
        debug_assert_eq!(scalar_values.len(), row_groups.len());
        ScalarValue::iter_to_array(scalar_values).ok()
    }

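    /// Returns null counts of the column at `column_index` in the row groups.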
    pub(crate) fn column_null_counts(
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_index: usize,
    ) -> Option<ArrayRef> {
        let values = row_groups.iter().map(|meta| {
            let col = meta.borrow().column(column_index);
            let stat = col.statistics()?;
            stat.null_count_opt()
        });
        Some(Arc::new(UInt64Array::from_iter(values)))
    }

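    /// Sets the sequence number to override the sequences of read rows.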
    pub(crate) fn set_override_sequence(&mut self, sequence: Option<SequenceNumber>) {
        match self {
            ReadFormat::PrimaryKey(format) => format.set_override_sequence(sequence),
            ReadFormat::Flat(format) => format.set_override_sequence(sequence),
        }
    }

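    /// Enables eager decoding of primary key values. Only takes effect for the
    /// primary key format.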
    pub(crate) fn set_decode_primary_key_values(&mut self, decode: bool) {
        if let ReadFormat::PrimaryKey(format) = self {
            format.set_decode_primary_key_values(decode);
        }
    }

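    /// Creates an array of `length` override sequences, or `None` if no
    /// override is set.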
    pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option<ArrayRef> {
        match self {
            ReadFormat::PrimaryKey(format) => format.new_override_sequence_array(length),
            ReadFormat::Flat(format) => format.new_override_sequence_array(length),
        }
    }
}

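/// Helper to read a parquet SST in the primary key format.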
pub struct PrimaryKeyReadFormat {
    /// Metadata of the region that owns the SST.
    metadata: RegionMetadataRef,
    /// SST file schema.
    arrow_schema: SchemaRef,
    /// Maps field column id to its index in the SST.
    field_id_to_index: HashMap<ColumnId, usize>,
    /// Indices of columns to read from the SST. Sorted and deduplicated.
    projection_indices: Vec<usize>,
    /// Maps field column id to its index in the projected record batch.
    field_id_to_projected_index: HashMap<ColumnId, usize>,
    /// Sequence number to override the sequences of read rows, if set.
    override_sequence: Option<SequenceNumber>,
    /// Codec to decode primary key values eagerly, if enabled.
    primary_key_codec: Option<Arc<dyn PrimaryKeyCodec>>,
}

impl PrimaryKeyReadFormat {
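    /// Creates a helper with existing `metadata` and projected `column_ids`.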
    pub fn new(
        metadata: RegionMetadataRef,
        column_ids: impl Iterator<Item = ColumnId>,
    ) -> PrimaryKeyReadFormat {
        let field_id_to_index: HashMap<_, _> = metadata
            .field_columns()
            .enumerate()
            .map(|(index, column)| (column.column_id, index))
            .collect();
        let arrow_schema = to_sst_arrow_schema(&metadata);

        let format_projection = FormatProjection::compute_format_projection(
            &field_id_to_index,
            arrow_schema.fields.len(),
            column_ids,
        );

        PrimaryKeyReadFormat {
            metadata,
            arrow_schema,
            field_id_to_index,
            projection_indices: format_projection.projection_indices,
            field_id_to_projected_index: format_projection.column_id_to_projected_index,
            override_sequence: None,
            primary_key_codec: None,
        }
    }

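    /// Sets the sequence number to override the sequences of read rows.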
    pub(crate) fn set_override_sequence(&mut self, sequence: Option<SequenceNumber>) {
        self.override_sequence = sequence;
    }

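    /// Enables or disables eager decoding of primary key values.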
    pub(crate) fn set_decode_primary_key_values(&mut self, decode: bool) {
        self.primary_key_codec = if decode {
            Some(build_primary_key_codec(&self.metadata))
        } else {
            None
        };
    }

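    /// Gets the arrow schema of the SST.
    ///
    /// The schema is computed from the region metadata: field columns come
    /// first, followed by the time index and the internal columns.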
    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
        &self.arrow_schema
    }

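    /// Gets the metadata of the region that owns the SST.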
    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
        &self.metadata
    }

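    /// Gets the indices of the columns to read from the SST.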
    pub(crate) fn projection_indices(&self) -> &[usize] {
        &self.projection_indices
    }

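    /// Creates an array of `length` override sequences, or `None` if no
    /// override is set.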
    pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option<ArrayRef> {
        self.override_sequence
            .map(|seq| Arc::new(UInt64Array::from_value(seq, length)) as ArrayRef)
    }

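    /// Converts a [RecordBatch] read from the SST into [Batch]es and appends
    /// them to `batches`, splitting the rows by primary key.
    ///
    /// If `override_sequence_array` is provided, its length must be at least
    /// the number of rows in the `record_batch`.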
    pub fn convert_record_batch(
        &self,
        record_batch: &RecordBatch,
        override_sequence_array: Option<&ArrayRef>,
        batches: &mut VecDeque<Batch>,
    ) -> Result<()> {
        debug_assert!(batches.is_empty());

        // The record batch must have the fixed position columns at its tail.
        ensure!(
            record_batch.num_columns() >= FIXED_POS_COLUMN_NUM,
            InvalidRecordBatchSnafu {
                reason: format!(
                    "record batch only has {} columns",
                    record_batch.num_columns()
                ),
            }
        );

        let mut fixed_pos_columns = record_batch
            .columns()
            .iter()
            .rev()
            .take(FIXED_POS_COLUMN_NUM);
        // Safety: We have checked the column number.
        let op_type_array = fixed_pos_columns.next().unwrap();
        let mut sequence_array = fixed_pos_columns.next().unwrap().clone();
        let pk_array = fixed_pos_columns.next().unwrap();
        let ts_array = fixed_pos_columns.next().unwrap();
        let field_batch_columns = self.get_field_batch_columns(record_batch)?;

        // Replaces the sequence array if an override is provided.
        if let Some(override_array) = override_sequence_array {
            assert!(override_array.len() >= sequence_array.len());
            // Truncates the override array to the length of the sequence array.
            sequence_array = if override_array.len() > sequence_array.len() {
                override_array.slice(0, sequence_array.len())
            } else {
                override_array.clone()
            };
        }

        // Computes the offsets of different primary keys in the array.
        let pk_dict_array = pk_array
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .with_context(|| InvalidRecordBatchSnafu {
                reason: format!("primary key array should not be {:?}", pk_array.data_type()),
            })?;
        let offsets = primary_key_offsets(pk_dict_array)?;
        if offsets.is_empty() {
            return Ok(());
        }

        // Builds a new batch for rows of each primary key.
        let keys = pk_dict_array.keys();
        let pk_values = pk_dict_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .with_context(|| InvalidRecordBatchSnafu {
                reason: format!(
                    "values of primary key array should not be {:?}",
                    pk_dict_array.values().data_type()
                ),
            })?;
        for (i, start) in offsets[..offsets.len() - 1].iter().enumerate() {
            let end = offsets[i + 1];
            let rows_in_batch = end - start;
            let dict_key = keys.value(*start);
            let primary_key = pk_values.value(dict_key as usize).to_vec();

            let mut builder = BatchBuilder::new(primary_key);
            builder
                .timestamps_array(ts_array.slice(*start, rows_in_batch))?
                .sequences_array(sequence_array.slice(*start, rows_in_batch))?
                .op_types_array(op_type_array.slice(*start, rows_in_batch))?;
            // Pushes all fields.
            for batch_column in &field_batch_columns {
                builder.push_field(BatchColumn {
                    column_id: batch_column.column_id,
                    data: batch_column.data.slice(*start, rows_in_batch),
                });
            }

            let mut batch = builder.build()?;
            // Decodes primary key values eagerly if a codec is set.
            if let Some(codec) = &self.primary_key_codec {
                let pk_values: CompositeValues =
                    codec.decode(batch.primary_key()).context(DecodeSnafu)?;
                batch.set_pk_values(pk_values);
            }
            batches.push_back(batch);
        }

        Ok(())
    }

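    /// Returns min values of the column in the row groups.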
    pub fn min_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        let Some(column) = self.metadata.column_by_id(column_id) else {
            return StatValues::NoColumn;
        };
        match column.semantic_type {
            SemanticType::Tag => self.tag_values(row_groups, column, true),
            SemanticType::Field => {
                // Safety: `field_id_to_index` is built from the same metadata.
                let index = self.field_id_to_index.get(&column_id).unwrap();
                let stats = ReadFormat::column_values(row_groups, column, *index, true);
                StatValues::from_stats_opt(stats)
            }
            SemanticType::Timestamp => {
                let index = self.time_index_position();
                let stats = ReadFormat::column_values(row_groups, column, index, true);
                StatValues::from_stats_opt(stats)
            }
        }
    }

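    /// Returns max values of the column in the row groups.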
    pub fn max_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        let Some(column) = self.metadata.column_by_id(column_id) else {
            return StatValues::NoColumn;
        };
        match column.semantic_type {
            SemanticType::Tag => self.tag_values(row_groups, column, false),
            SemanticType::Field => {
                // Safety: `field_id_to_index` is built from the same metadata.
                let index = self.field_id_to_index.get(&column_id).unwrap();
                let stats = ReadFormat::column_values(row_groups, column, *index, false);
                StatValues::from_stats_opt(stats)
            }
            SemanticType::Timestamp => {
                let index = self.time_index_position();
                let stats = ReadFormat::column_values(row_groups, column, index, false);
                StatValues::from_stats_opt(stats)
            }
        }
    }

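    /// Returns null counts of the column in the row groups.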
    pub fn null_counts(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        let Some(column) = self.metadata.column_by_id(column_id) else {
            return StatValues::NoColumn;
        };
        match column.semantic_type {
            SemanticType::Tag => StatValues::NoStats,
            SemanticType::Field => {
                // Safety: `field_id_to_index` is built from the same metadata.
                let index = self.field_id_to_index.get(&column_id).unwrap();
                let stats = ReadFormat::column_null_counts(row_groups, *index);
                StatValues::from_stats_opt(stats)
            }
            SemanticType::Timestamp => {
                let index = self.time_index_position();
                let stats = ReadFormat::column_null_counts(row_groups, index);
                StatValues::from_stats_opt(stats)
            }
        }
    }

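    /// Returns the field columns of the `record_batch` as [BatchColumn]s.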
    fn get_field_batch_columns(&self, record_batch: &RecordBatch) -> Result<Vec<BatchColumn>> {
        record_batch
            .columns()
            .iter()
            .zip(record_batch.schema().fields())
            .take(record_batch.num_columns() - FIXED_POS_COLUMN_NUM) // Takes all field columns.
            .map(|(array, field)| {
                let vector = Helper::try_into_vector(array.clone()).context(ConvertVectorSnafu)?;
                let column = self
                    .metadata
                    .column_by_name(field.name())
                    .with_context(|| InvalidRecordBatchSnafu {
                        reason: format!("column {} not found in metadata", field.name()),
                    })?;

                Ok(BatchColumn {
                    column_id: column.column_id,
                    data: vector,
                })
            })
            .collect()
    }

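    /// Returns the min or max tag values of the column in the row groups.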
    fn tag_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column: &ColumnMetadata,
        is_min: bool,
    ) -> StatValues {
        let is_first_tag = self
            .metadata
            .primary_key
            .first()
            .map(|id| *id == column.column_id)
            .unwrap_or(false);
        if !is_first_tag {
            // Only the first tag in the primary key has statistics.
            return StatValues::NoStats;
        }

        StatValues::from_stats_opt(self.first_tag_values(row_groups, column, is_min))
    }

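    /// Decodes the min or max values of the first tag in the primary key from
    /// the statistics of the encoded primary key column.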
    fn first_tag_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column: &ColumnMetadata,
        is_min: bool,
    ) -> Option<ArrayRef> {
        debug_assert!(
            self.metadata
                .primary_key
                .first()
                .map(|id| *id == column.column_id)
                .unwrap_or(false)
        );

        let primary_key_encoding = self.metadata.primary_key_encoding;
        let converter = build_primary_key_codec_with_fields(
            primary_key_encoding,
            [(
                column.column_id,
                SortField::new(column.column_schema.data_type.clone()),
            )]
            .into_iter(),
        );

        let values = row_groups.iter().map(|meta| {
            let stats = meta
                .borrow()
                .column(self.primary_key_position())
                .statistics()?;
            match stats {
                Statistics::Boolean(_) => None,
                Statistics::Int32(_) => None,
                Statistics::Int64(_) => None,
                Statistics::Int96(_) => None,
                Statistics::Float(_) => None,
                Statistics::Double(_) => None,
                Statistics::ByteArray(s) => {
                    let bytes = if is_min {
                        s.min_bytes_opt()?
                    } else {
                        s.max_bytes_opt()?
                    };
                    converter.decode_leftmost(bytes).ok()?
                }
                Statistics::FixedLenByteArray(_) => None,
            }
        });
        let mut builder = column
            .column_schema
            .data_type
            .create_mutable_vector(row_groups.len());
        for value_opt in values {
            match value_opt {
                Some(v) => builder.push_value_ref(&v.as_value_ref()),
                None => builder.push_null(),
            }
        }
        let vector = builder.to_vector();

        Some(vector.to_arrow_array())
    }

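    /// Index of the primary key column in the SST.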
    fn primary_key_position(&self) -> usize {
        self.arrow_schema.fields.len() - 3
    }

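    /// Index of the time index column in the SST.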
    fn time_index_position(&self) -> usize {
        self.arrow_schema.fields.len() - FIXED_POS_COLUMN_NUM
    }

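    /// Returns the index of the field column `column_id` in the projected
    /// record batch.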
    pub fn field_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
        self.field_id_to_projected_index.get(&column_id).copied()
    }
}

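/// Projection of a read format, computed from the projected column ids and
/// the SST schema.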
pub(crate) struct FormatProjection {
    /// Indices of columns to read from the SST. Sorted and deduplicated.
    pub(crate) projection_indices: Vec<usize>,
    /// Maps column id to the index of the column in the projected record batch.
    pub(crate) column_id_to_projected_index: HashMap<ColumnId, usize>,
}

impl FormatProjection {
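    /// Computes the projection from the column id to SST index map, the total
    /// number of SST columns and the projected column ids.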
    pub(crate) fn compute_format_projection(
        id_to_index: &HashMap<ColumnId, usize>,
        sst_column_num: usize,
        column_ids: impl Iterator<Item = ColumnId>,
    ) -> Self {
        // Maps each projected column id to its index in the SST.
        let mut projected_schema: Vec<_> = column_ids
            .filter_map(|column_id| {
                id_to_index
                    .get(&column_id)
                    .copied()
                    .map(|index| (column_id, index))
            })
            .collect();
        // Sorts and deduplicates the projected columns by their indices in the SST.
        projected_schema.sort_unstable_by_key(|x| x.1);
        projected_schema.dedup_by_key(|x| x.1);

        let mut projection_indices: Vec<_> = projected_schema
            .iter()
            .map(|(_column_id, index)| *index)
            // Always reads the fixed position columns at the tail of the SST.
            .chain(sst_column_num - FIXED_POS_COLUMN_NUM..sst_column_num)
            .collect();
        projection_indices.sort_unstable();
        projection_indices.dedup();

        // Maps each column id to its index in the projected record batch.
        let column_id_to_projected_index = projected_schema
            .into_iter()
            .map(|(column_id, _)| column_id)
            .enumerate()
            .map(|(index, column_id)| (column_id, index))
            .collect();

        Self {
            projection_indices,
            column_id_to_projected_index,
        }
    }
}

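/// Values of statistics for a column.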
pub enum StatValues {
    /// Values of the statistics.
    Values(ArrayRef),
    /// The column doesn't exist in the region.
    NoColumn,
    /// Statistics are unavailable for the column.
    NoStats,
}

impl StatValues {
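    /// Creates a new [StatValues] from optional statistics.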
    pub fn from_stats_opt(stats: Option<ArrayRef>) -> Self {
        match stats {
            Some(stats) => StatValues::Values(stats),
            None => StatValues::NoStats,
        }
    }
}

#[cfg(test)]
impl PrimaryKeyReadFormat {
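    /// Creates a helper that reads all columns. Only for tests.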
    pub fn new_with_all_columns(metadata: RegionMetadataRef) -> PrimaryKeyReadFormat {
        Self::new(
            Arc::clone(&metadata),
            metadata.column_metadatas.iter().map(|c| c.column_id),
        )
    }
}

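/// Computes the offsets of different primary keys in the dictionary-encoded
/// array. Each pair of adjacent offsets bounds the rows of one primary key.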
fn primary_key_offsets(pk_dict_array: &PrimaryKeyArray) -> Result<Vec<usize>> {
    if pk_dict_array.is_empty() {
        return Ok(Vec::new());
    }

    // Init offsets.
    let mut offsets = vec![0];
    let keys = pk_dict_array.keys();
    // The primary key column is non-nullable, so we iterate `keys.values()` directly.
    let pk_indices = keys.values();
    for (i, key) in pk_indices.iter().take(keys.len() - 1).enumerate() {
        // Compare each key with the next key.
        if *key != pk_indices[i + 1] {
            // A new key starts at the next index, push it as an offset.
            offsets.push(i + 1);
        }
    }
    offsets.push(keys.len());

    Ok(offsets)
}

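/// Creates a dictionary array with the `primary_key` repeated for `num_rows`
/// rows.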
fn new_primary_key_array(primary_key: &[u8], num_rows: usize) -> ArrayRef {
    let values = Arc::new(BinaryArray::from_iter_values([primary_key]));
    let keys = UInt32Array::from_value(0, num_rows);

    // All keys point to the only value so the dictionary is valid.
    Arc::new(DictionaryArray::new(keys, values))
}

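/// Gets the min/max time index of the row group from the parquet metadata.
///
/// Returns `None` if the time index statistics are unavailable.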
pub(crate) fn parquet_row_group_time_range(
    file_meta: &FileMeta,
    parquet_meta: &ParquetMetaData,
    row_group_idx: usize,
) -> Option<FileTimeRange> {
    let row_group_meta = parquet_meta.row_group(row_group_idx);
    let num_columns = parquet_meta.file_metadata().schema_descr().num_columns();
    assert!(
        num_columns >= FIXED_POS_COLUMN_NUM,
        "file only has {} columns",
        num_columns
    );
    let time_index_pos = num_columns - FIXED_POS_COLUMN_NUM;

    let stats = row_group_meta.column(time_index_pos).statistics()?;
    // The physical type of the time index column must be int64.
    let (min, max) = match stats {
        Statistics::Int64(value_stats) => (*value_stats.min_opt()?, *value_stats.max_opt()?),
        Statistics::Int32(_)
        | Statistics::Boolean(_)
        | Statistics::Int96(_)
        | Statistics::Float(_)
        | Statistics::Double(_)
        | Statistics::ByteArray(_)
        | Statistics::FixedLenByteArray(_) => {
            common_telemetry::warn!(
                "Invalid statistics {:?} for time index in parquet in {}",
                stats,
                file_meta.file_id
            );
            return None;
        }
    };

    debug_assert!(min >= file_meta.time_range.0.value() && min <= file_meta.time_range.1.value());
    debug_assert!(max >= file_meta.time_range.0.value() && max <= file_meta.time_range.1.value());
    // The timestamps in the SST are within the time range of the file meta,
    // so they share its time unit.
    let unit = file_meta.time_range.0.unit();

    Some((Timestamp::new(min, unit), Timestamp::new(max, unit)))
}

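/// Returns true if the sequences in the SST should be overridden while
/// reading: every row group reports both min and max sequence as 0, which
/// indicates the file was written without real sequence numbers.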
pub(crate) fn need_override_sequence(parquet_meta: &ParquetMetaData) -> bool {
    let num_columns = parquet_meta.file_metadata().schema_descr().num_columns();
    if num_columns < FIXED_POS_COLUMN_NUM {
        return false;
    }

    // The sequence column is the second to last column.
    let sequence_pos = num_columns - 2;

    // Checks that every row group has sequence statistics with min == max == 0.
    for row_group in parquet_meta.row_groups() {
        if let Some(Statistics::Int64(value_stats)) = row_group.column(sequence_pos).statistics() {
            if let (Some(min_val), Some(max_val)) = (value_stats.min_opt(), value_stats.max_opt()) {
                if *min_val != 0 || *max_val != 0 {
                    return false;
                }
            } else {
                return false;
            }
        } else {
            return false;
        }
    }

    !parquet_meta.row_groups().is_empty()
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::OpType;
    use datatypes::arrow::array::{
        Int64Array, StringArray, TimestampMillisecondArray, UInt8Array, UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema, TimeUnit};
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::ValueRef;
    use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, UInt8Vector, UInt64Vector};
    use mito_codec::row_converter::{
        DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec,
    };
    use store_api::codec::PrimaryKeyEncoding;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;
    use store_api::storage::consts::ReservedColumnId;

    use super::*;
    use crate::sst::parquet::flat_format::{FlatWriteFormat, sequence_column_index};
    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};

    const TEST_SEQUENCE: u64 = 1;
    const TEST_OP_TYPE: u8 = OpType::Put as u8;

    fn build_test_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("tag0", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field1",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("tag1", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field0",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 5,
            })
            .primary_key(vec![1, 3]);
        Arc::new(builder.build().unwrap())
    }

1080
1081 fn build_test_arrow_schema() -> SchemaRef {
1082 let fields = vec![
1083 Field::new("field1", ArrowDataType::Int64, true),
1084 Field::new("field0", ArrowDataType::Int64, true),
1085 Field::new(
1086 "ts",
1087 ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
1088 false,
1089 ),
1090 Field::new(
1091 "__primary_key",
1092 ArrowDataType::Dictionary(
1093 Box::new(ArrowDataType::UInt32),
1094 Box::new(ArrowDataType::Binary),
1095 ),
1096 false,
1097 ),
1098 Field::new("__sequence", ArrowDataType::UInt64, false),
1099 Field::new("__op_type", ArrowDataType::UInt8, false),
1100 ];
1101 Arc::new(Schema::new(fields))
1102 }
1103
1104 fn new_batch(primary_key: &[u8], start_ts: i64, start_field: i64, num_rows: usize) -> Batch {
1105 new_batch_with_sequence(primary_key, start_ts, start_field, num_rows, TEST_SEQUENCE)
1106 }
1107
1108 fn new_batch_with_sequence(
1109 primary_key: &[u8],
1110 start_ts: i64,
1111 start_field: i64,
1112 num_rows: usize,
1113 sequence: u64,
1114 ) -> Batch {
1115 let ts_values = (0..num_rows).map(|i| start_ts + i as i64);
1116 let timestamps = Arc::new(TimestampMillisecondVector::from_values(ts_values));
1117 let sequences = Arc::new(UInt64Vector::from_vec(vec![sequence; num_rows]));
1118 let op_types = Arc::new(UInt8Vector::from_vec(vec![TEST_OP_TYPE; num_rows]));
1119 let fields = vec![
1120 BatchColumn {
1121 column_id: 4,
1122 data: Arc::new(Int64Vector::from_vec(vec![start_field; num_rows])),
1123 }, BatchColumn {
1125 column_id: 2,
1126 data: Arc::new(Int64Vector::from_vec(vec![start_field + 1; num_rows])),
1127 }, ];
1129
1130 BatchBuilder::with_required_columns(primary_key.to_vec(), timestamps, sequences, op_types)
1131 .with_fields(fields)
1132 .build()
1133 .unwrap()
1134 }
1135
1136 #[test]
1137 fn test_to_sst_arrow_schema() {
1138 let metadata = build_test_region_metadata();
1139 let write_format = PrimaryKeyWriteFormat::new(metadata);
1140 assert_eq!(&build_test_arrow_schema(), write_format.arrow_schema());
1141 }
1142
1143 #[test]
1144 fn test_new_primary_key_array() {
1145 let array = new_primary_key_array(b"test", 3);
1146 let expect = build_test_pk_array(&[(b"test".to_vec(), 3)]) as ArrayRef;
1147 assert_eq!(&expect, &array);
1148 }
1149
1150 fn build_test_pk_array(pk_row_nums: &[(Vec<u8>, usize)]) -> Arc<PrimaryKeyArray> {
1151 let values = Arc::new(BinaryArray::from_iter_values(
1152 pk_row_nums.iter().map(|v| &v.0),
1153 ));
1154 let mut keys = vec![];
1155 for (index, num_rows) in pk_row_nums.iter().map(|v| v.1).enumerate() {
1156 keys.extend(std::iter::repeat_n(index as u32, num_rows));
1157 }
1158 let keys = UInt32Array::from(keys);
1159 Arc::new(DictionaryArray::new(keys, values))
1160 }
1161
    #[test]
    fn test_convert_batch() {
        let metadata = build_test_region_metadata();
        let write_format = PrimaryKeyWriteFormat::new(metadata);

        let num_rows = 4;
        let batch = new_batch(b"test", 1, 2, num_rows);
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // primary key
            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; num_rows])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ];
        let expect_record = RecordBatch::try_new(build_test_arrow_schema(), columns).unwrap();

        let actual = write_format.convert_batch(&batch).unwrap();
        assert_eq!(expect_record, actual);
    }

    #[test]
    fn test_convert_batch_with_override_sequence() {
        let metadata = build_test_region_metadata();
        let write_format =
            PrimaryKeyWriteFormat::new(metadata).with_override_sequence(Some(415411));

        let num_rows = 4;
        let batch = new_batch(b"test", 1, 2, num_rows);
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // primary key
            Arc::new(UInt64Array::from(vec![415411; num_rows])), // overridden sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ];
        let expect_record = RecordBatch::try_new(build_test_arrow_schema(), columns).unwrap();

        let actual = write_format.convert_batch(&batch).unwrap();
        assert_eq!(expect_record, actual);
    }

    #[test]
    fn test_projection_indices() {
        let metadata = build_test_region_metadata();
        // Only read tag1.
        let read_format = ReadFormat::new_primary_key(metadata.clone(), [3].iter().copied());
        assert_eq!(&[2, 3, 4, 5], read_format.projection_indices());
        // Only read field1.
        let read_format = ReadFormat::new_primary_key(metadata.clone(), [4].iter().copied());
        assert_eq!(&[0, 2, 3, 4, 5], read_format.projection_indices());
        // Only read ts.
        let read_format = ReadFormat::new_primary_key(metadata.clone(), [5].iter().copied());
        assert_eq!(&[2, 3, 4, 5], read_format.projection_indices());
        // Read field0, tag0, ts.
        let read_format = ReadFormat::new_primary_key(metadata, [2, 1, 5].iter().copied());
        assert_eq!(&[1, 2, 3, 4, 5], read_format.projection_indices());
    }

    #[test]
    fn test_empty_primary_key_offsets() {
        let array = build_test_pk_array(&[]);
        assert!(primary_key_offsets(&array).unwrap().is_empty());
    }

    #[test]
    fn test_primary_key_offsets_one_series() {
        let array = build_test_pk_array(&[(b"one".to_vec(), 1)]);
        assert_eq!(vec![0, 1], primary_key_offsets(&array).unwrap());

        let array = build_test_pk_array(&[(b"one".to_vec(), 1), (b"two".to_vec(), 1)]);
        assert_eq!(vec![0, 1, 2], primary_key_offsets(&array).unwrap());

        let array = build_test_pk_array(&[
            (b"one".to_vec(), 1),
            (b"two".to_vec(), 1),
            (b"three".to_vec(), 1),
        ]);
        assert_eq!(vec![0, 1, 2, 3], primary_key_offsets(&array).unwrap());
    }

    #[test]
    fn test_primary_key_offsets_multi_series() {
        let array = build_test_pk_array(&[(b"one".to_vec(), 1), (b"two".to_vec(), 3)]);
        assert_eq!(vec![0, 1, 4], primary_key_offsets(&array).unwrap());

        let array = build_test_pk_array(&[(b"one".to_vec(), 3), (b"two".to_vec(), 1)]);
        assert_eq!(vec![0, 3, 4], primary_key_offsets(&array).unwrap());

        let array = build_test_pk_array(&[(b"one".to_vec(), 3), (b"two".to_vec(), 3)]);
        assert_eq!(vec![0, 3, 6], primary_key_offsets(&array).unwrap());
    }

    #[test]
    fn test_convert_empty_record_batch() {
        let metadata = build_test_region_metadata();
        let arrow_schema = build_test_arrow_schema();
        let column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|col| col.column_id)
            .collect();
        let read_format = PrimaryKeyReadFormat::new(metadata, column_ids.iter().copied());
        assert_eq!(arrow_schema, *read_format.arrow_schema());

        let record_batch = RecordBatch::new_empty(arrow_schema);
        let mut batches = VecDeque::new();
        read_format
            .convert_record_batch(&record_batch, None, &mut batches)
            .unwrap();
        assert!(batches.is_empty());
    }

    #[test]
    fn test_convert_record_batch() {
        let metadata = build_test_region_metadata();
        let column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|col| col.column_id)
            .collect();
        let read_format = PrimaryKeyReadFormat::new(metadata, column_ids.iter().copied());

        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![1, 1, 10, 10])), // field1
            Arc::new(Int64Array::from(vec![2, 2, 11, 11])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 11, 12])), // ts
            build_test_pk_array(&[(b"one".to_vec(), 2), (b"two".to_vec(), 2)]), // primary key
            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; 4])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; 4])), // op type
        ];
        let arrow_schema = build_test_arrow_schema();
        let record_batch = RecordBatch::try_new(arrow_schema, columns).unwrap();
        let mut batches = VecDeque::new();
        read_format
            .convert_record_batch(&record_batch, None, &mut batches)
            .unwrap();

        assert_eq!(
            vec![new_batch(b"one", 1, 1, 2), new_batch(b"two", 11, 10, 2)],
            batches.into_iter().collect::<Vec<_>>(),
        );
    }

    #[test]
    fn test_convert_record_batch_with_override_sequence() {
        let metadata = build_test_region_metadata();
        let column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|col| col.column_id)
            .collect();
        let read_format =
            ReadFormat::new(metadata, Some(&column_ids), false, None, "test", false).unwrap();

        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![1, 1, 10, 10])), // field1
            Arc::new(Int64Array::from(vec![2, 2, 11, 11])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 11, 12])), // ts
            build_test_pk_array(&[(b"one".to_vec(), 2), (b"two".to_vec(), 2)]), // primary key
            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; 4])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; 4])), // op type
        ];
        let arrow_schema = build_test_arrow_schema();
        let record_batch = RecordBatch::try_new(arrow_schema, columns).unwrap();

        // Sequences in the converted batches should be overridden.
        let override_sequence: u64 = 12345;
        let override_sequence_array: ArrayRef =
            Arc::new(UInt64Array::from_value(override_sequence, 4));

        let mut batches = VecDeque::new();
        read_format
            .as_primary_key()
            .unwrap()
            .convert_record_batch(&record_batch, Some(&override_sequence_array), &mut batches)
            .unwrap();

        let expected_batch1 = new_batch_with_sequence(b"one", 1, 1, 2, override_sequence);
        let expected_batch2 = new_batch_with_sequence(b"two", 11, 10, 2, override_sequence);

        assert_eq!(
            vec![expected_batch1, expected_batch2],
            batches.into_iter().collect::<Vec<_>>(),
        );
    }

    fn build_test_flat_sst_schema() -> SchemaRef {
        let fields = vec![
            Field::new("tag0", ArrowDataType::Int64, true),
            Field::new("tag1", ArrowDataType::Int64, true),
            Field::new("field1", ArrowDataType::Int64, true),
            Field::new("field0", ArrowDataType::Int64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt32),
                    Box::new(ArrowDataType::Binary),
                ),
                false,
            ),
            Field::new("__sequence", ArrowDataType::UInt64, false),
            Field::new("__op_type", ArrowDataType::UInt8, false),
        ];
        Arc::new(Schema::new(fields))
    }

    #[test]
    fn test_flat_to_sst_arrow_schema() {
        let metadata = build_test_region_metadata();
        let format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default());
        assert_eq!(&build_test_flat_sst_schema(), format.arrow_schema());
    }

    fn input_columns_for_flat_batch(num_rows: usize) -> Vec<ArrayRef> {
        vec![
            Arc::new(Int64Array::from(vec![1; num_rows])), // tag0
            Arc::new(Int64Array::from(vec![1; num_rows])), // tag1
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // primary key
            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; num_rows])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ]
    }

    #[test]
    fn test_flat_convert_batch() {
        let metadata = build_test_region_metadata();
        let format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default());

        let num_rows = 4;
        let columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
        let batch = RecordBatch::try_new(build_test_flat_sst_schema(), columns.clone()).unwrap();
        let expect_record = RecordBatch::try_new(build_test_flat_sst_schema(), columns).unwrap();

        let actual = format.convert_batch(&batch).unwrap();
        assert_eq!(expect_record, actual);
    }

    #[test]
    fn test_flat_convert_with_override_sequence() {
        let metadata = build_test_region_metadata();
        let format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default())
            .with_override_sequence(Some(415411));

        let num_rows = 4;
        let columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
        let batch = RecordBatch::try_new(build_test_flat_sst_schema(), columns).unwrap();

        let expected_columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![1; num_rows])), // tag0
            Arc::new(Int64Array::from(vec![1; num_rows])), // tag1
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // primary key
            Arc::new(UInt64Array::from(vec![415411; num_rows])), // overridden sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ];
        let expected_record =
            RecordBatch::try_new(build_test_flat_sst_schema(), expected_columns).unwrap();

        let actual = format.convert_batch(&batch).unwrap();
        assert_eq!(expected_record, actual);
    }

    #[test]
    fn test_flat_projection_indices() {
        let metadata = build_test_region_metadata();
        // Only read tag1.
        let read_format =
            ReadFormat::new_flat(metadata.clone(), [3].iter().copied(), None, "test", false)
                .unwrap();
        assert_eq!(&[1, 4, 5, 6, 7], read_format.projection_indices());

        // Only read field1.
        let read_format =
            ReadFormat::new_flat(metadata.clone(), [4].iter().copied(), None, "test", false)
                .unwrap();
        assert_eq!(&[2, 4, 5, 6, 7], read_format.projection_indices());

        // Only read ts.
        let read_format =
            ReadFormat::new_flat(metadata.clone(), [5].iter().copied(), None, "test", false)
                .unwrap();
        assert_eq!(&[4, 5, 6, 7], read_format.projection_indices());

        // Read field0, tag0, ts.
        let read_format =
            ReadFormat::new_flat(metadata, [2, 1, 5].iter().copied(), None, "test", false).unwrap();
        assert_eq!(&[0, 3, 4, 5, 6, 7], read_format.projection_indices());
    }

    #[test]
    fn test_flat_read_format_convert_batch() {
        let metadata = build_test_region_metadata();
        let mut format = FlatReadFormat::new(
            metadata,
            std::iter::once(1),
            Some(8),
            "test",
            false,
        )
        .unwrap();

        let num_rows = 4;
        let original_sequence = 100u64;
        let override_sequence = 200u64;

        let columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
        // Replaces the sequence column with the original sequence.
        let mut test_columns = columns.clone();
        test_columns[6] = Arc::new(UInt64Array::from(vec![original_sequence; num_rows]));
        let record_batch =
            RecordBatch::try_new(format.arrow_schema().clone(), test_columns).unwrap();

        // Converts the batch without an override sequence.
        let result = format.convert_batch(record_batch.clone(), None).unwrap();
        let sequence_column = result.column(sequence_column_index(result.num_columns()));
        let sequence_array = sequence_column
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();

        let expected_original = UInt64Array::from(vec![original_sequence; num_rows]);
        assert_eq!(sequence_array, &expected_original);

        // Converts the batch again with an override sequence.
        format.set_override_sequence(Some(override_sequence));
        let override_sequence_array = format.new_override_sequence_array(num_rows).unwrap();
        let result = format
            .convert_batch(record_batch, Some(&override_sequence_array))
            .unwrap();
        let sequence_column = result.column(sequence_column_index(result.num_columns()));
        let sequence_array = sequence_column
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();

        let expected_override = UInt64Array::from(vec![override_sequence; num_rows]);
        assert_eq!(sequence_array, &expected_override);
    }

    #[test]
    fn test_need_convert_to_flat() {
        let metadata = build_test_region_metadata();

        // The region has 5 columns plus 3 internal columns in the flat format.
        let expected_columns = metadata.column_metadatas.len() + 3;
        let result =
            FlatReadFormat::is_legacy_format(&metadata, expected_columns, "test.parquet").unwrap();
        assert!(
            !result,
            "Should not need conversion when column counts match"
        );

        // A legacy file doesn't store the primary key columns individually.
        let num_columns_without_pk = expected_columns - metadata.primary_key.len();
        let result =
            FlatReadFormat::is_legacy_format(&metadata, num_columns_without_pk, "test.parquet")
                .unwrap();
        assert!(
            result,
            "Should need conversion when primary key columns are missing"
        );

        // More columns than expected is an error.
        let too_many_columns = expected_columns + 1;
        let err = FlatReadFormat::is_legacy_format(&metadata, too_many_columns, "test.parquet")
            .unwrap_err();
        assert!(err.to_string().contains("Expected columns"), "{err:?}");

        // A column number difference that doesn't match the primary key length is an error.
        let wrong_diff_columns = expected_columns - 1;
        let err = FlatReadFormat::is_legacy_format(&metadata, wrong_diff_columns, "test.parquet")
            .unwrap_err();
        assert!(
            err.to_string().contains("Column number difference"),
            "{err:?}"
        );
    }

    fn build_test_dense_pk_array(
        codec: &DensePrimaryKeyCodec,
        pk_values_per_row: &[&[Option<i64>]],
    ) -> Arc<PrimaryKeyArray> {
        let mut builder = PrimaryKeyArrayBuilder::with_capacity(pk_values_per_row.len(), 1024, 0);

        for pk_values_row in pk_values_per_row {
            let values: Vec<ValueRef> = pk_values_row
                .iter()
                .map(|opt| match opt {
                    Some(val) => ValueRef::Int64(*val),
                    None => ValueRef::Null,
                })
                .collect();

            let encoded = codec.encode(values.into_iter()).unwrap();
            builder.append_value(&encoded);
        }

        Arc::new(builder.finish())
    }

    fn build_test_sparse_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "__table_id",
                    ConcreteDataType::uint32_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: ReservedColumnId::table_id(),
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "__tsid",
                    ConcreteDataType::uint64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: ReservedColumnId::tsid(),
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("tag0", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("tag1", ConcreteDataType::string_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field1",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field0",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 5,
            })
            .primary_key(vec![
                ReservedColumnId::table_id(),
                ReservedColumnId::tsid(),
                1,
                3,
            ])
            .primary_key_encoding(PrimaryKeyEncoding::Sparse);
        Arc::new(builder.build().unwrap())
    }

    fn build_test_sparse_pk_array(
        codec: &SparsePrimaryKeyCodec,
        pk_values_per_row: &[SparseTestRow],
    ) -> Arc<PrimaryKeyArray> {
        let mut builder = PrimaryKeyArrayBuilder::with_capacity(pk_values_per_row.len(), 1024, 0);
        for row in pk_values_per_row {
            let values = vec![
                (ReservedColumnId::table_id(), ValueRef::UInt32(row.table_id)),
                (ReservedColumnId::tsid(), ValueRef::UInt64(row.tsid)),
                (1, ValueRef::String(&row.tag0)),
                (3, ValueRef::String(&row.tag1)),
            ];

            let mut buffer = Vec::new();
            codec.encode_value_refs(&values, &mut buffer).unwrap();
            builder.append_value(&buffer);
        }

        Arc::new(builder.finish())
    }

    #[derive(Clone)]
    struct SparseTestRow {
        table_id: u32,
        tsid: u64,
        tag0: String,
        tag1: String,
    }

    #[test]
    fn test_flat_read_format_convert_format_with_dense_encoding() {
        let metadata = build_test_region_metadata();

        let column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|c| c.column_id)
            .collect();
        let format = FlatReadFormat::new(
            metadata.clone(),
            column_ids.into_iter(),
            Some(6),
            "test",
            false,
        )
        .unwrap();

        let num_rows = 4;
        let original_sequence = 100u64;

        // Primary key values (tag0, tag1) for each row.
        let pk_values_per_row = vec![
            &[Some(1i64), Some(1i64)][..];
            num_rows
        ];

        // Builds the record batch in the legacy format without tag columns.
        let codec = DensePrimaryKeyCodec::new(&metadata);
        let dense_pk_array = build_test_dense_pk_array(&codec, &pk_values_per_row);
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            dense_pk_array.clone(),                        // primary key
            Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ];

        let old_format_fields = vec![
            Field::new("field1", ArrowDataType::Int64, true),
            Field::new("field0", ArrowDataType::Int64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt32),
                    Box::new(ArrowDataType::Binary),
                ),
                false,
            ),
            Field::new("__sequence", ArrowDataType::UInt64, false),
            Field::new("__op_type", ArrowDataType::UInt8, false),
        ];
        let old_schema = Arc::new(Schema::new(old_format_fields));
        let record_batch = RecordBatch::try_new(old_schema, columns).unwrap();

        // The converted batch should contain the decoded tag columns.
        let result = format.convert_batch(record_batch, None).unwrap();

        let expected_columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![1; num_rows])), // tag0
            Arc::new(Int64Array::from(vec![1; num_rows])), // tag1
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            dense_pk_array,                                // primary key
            Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ];
        let expected_record_batch =
            RecordBatch::try_new(build_test_flat_sst_schema(), expected_columns).unwrap();

        assert_eq!(expected_record_batch, result);
    }

    #[test]
    fn test_flat_read_format_convert_format_with_sparse_encoding() {
        let metadata = build_test_sparse_region_metadata();

        let column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|c| c.column_id)
            .collect();
        let format = FlatReadFormat::new(
            metadata.clone(),
            column_ids.clone().into_iter(),
            None,
            "test",
            false,
        )
        .unwrap();

        let num_rows = 4;
        let original_sequence = 100u64;

        // Primary key values for each row.
        let pk_test_rows = vec![
            SparseTestRow {
                table_id: 1,
                tsid: 123,
                tag0: "frontend".to_string(),
                tag1: "pod1".to_string(),
            };
            num_rows
        ];

        // Builds the record batch in the legacy format without tag columns.
        let codec = SparsePrimaryKeyCodec::new(&metadata);
        let sparse_pk_array = build_test_sparse_pk_array(&codec, &pk_test_rows);
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            sparse_pk_array.clone(),                       // primary key
            Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ];

        let old_format_fields = vec![
            Field::new("field1", ArrowDataType::Int64, true),
            Field::new("field0", ArrowDataType::Int64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt32),
                    Box::new(ArrowDataType::Binary),
                ),
                false,
            ),
            Field::new("__sequence", ArrowDataType::UInt64, false),
            Field::new("__op_type", ArrowDataType::UInt8, false),
        ];
        let old_schema = Arc::new(Schema::new(old_format_fields));
        let record_batch = RecordBatch::try_new(old_schema, columns).unwrap();

        // The converted batch should contain the decoded tag columns.
        let result = format.convert_batch(record_batch.clone(), None).unwrap();

        let tag0_array = Arc::new(DictionaryArray::new(
            UInt32Array::from(vec![0; num_rows]),
            Arc::new(StringArray::from(vec!["frontend"])),
        ));
        let tag1_array = Arc::new(DictionaryArray::new(
            UInt32Array::from(vec![0; num_rows]),
            Arc::new(StringArray::from(vec!["pod1"])),
        ));
        let expected_columns: Vec<ArrayRef> = vec![
            Arc::new(UInt32Array::from(vec![1; num_rows])), // __table_id
            Arc::new(UInt64Array::from(vec![123; num_rows])), // __tsid
            tag0_array,                                     // tag0
            tag1_array,                                     // tag1
            Arc::new(Int64Array::from(vec![2; num_rows])),  // field1
            Arc::new(Int64Array::from(vec![3; num_rows])),  // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            sparse_pk_array,                                // primary key
            Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ];
        let expected_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        let expected_record_batch =
            RecordBatch::try_new(expected_schema, expected_columns).unwrap();

        assert_eq!(expected_record_batch, result);

        // With `skip_auto_convert` set, the batch is returned unchanged.
        let format =
            FlatReadFormat::new(metadata.clone(), column_ids.into_iter(), None, "test", true)
                .unwrap();
        let result = format.convert_batch(record_batch.clone(), None).unwrap();
        assert_eq!(record_batch, result);
    }
}