1use std::borrow::Borrow;
30use std::collections::{HashMap, VecDeque};
31use std::sync::Arc;
32
33use api::v1::SemanticType;
34use common_time::Timestamp;
35use datafusion_common::ScalarValue;
36use datatypes::arrow::array::{
37 ArrayRef, BinaryArray, BinaryDictionaryBuilder, DictionaryArray, UInt64Array,
38};
39use datatypes::arrow::datatypes::{SchemaRef, UInt32Type};
40use datatypes::arrow::record_batch::RecordBatch;
41use datatypes::prelude::DataType;
42use datatypes::vectors::Helper;
43use mito_codec::row_converter::{
44 CompositeValues, PrimaryKeyCodec, SortField, build_primary_key_codec,
45 build_primary_key_codec_with_fields,
46};
47use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
48use parquet::file::statistics::Statistics;
49use snafu::{OptionExt, ResultExt, ensure};
50use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
51use store_api::storage::{ColumnId, SequenceNumber};
52
53use crate::error::{
54 ConvertVectorSnafu, DecodeSnafu, InvalidRecordBatchSnafu, NewRecordBatchSnafu, Result,
55};
56use crate::read::{Batch, BatchBuilder, BatchColumn};
57use crate::sst::file::{FileMeta, FileTimeRange};
58use crate::sst::parquet::flat_format::FlatReadFormat;
59use crate::sst::to_sst_arrow_schema;
60
/// Arrow array type that stores the encoded primary key of each row:
/// a dictionary keyed by `u32` whose values are binary-encoded keys.
pub(crate) type PrimaryKeyArray = DictionaryArray<UInt32Type>;
/// Builder for [`PrimaryKeyArray`].
pub(crate) type PrimaryKeyArrayBuilder = BinaryDictionaryBuilder<UInt32Type>;

/// Number of columns at a fixed position at the end of the SST schema:
/// time index, primary key, sequence, op type (in that order, see
/// `convert_record_batch`).
pub(crate) const FIXED_POS_COLUMN_NUM: usize = 4;
/// Number of internal columns appended after the time index:
/// primary key, sequence, op type.
pub(crate) const INTERNAL_COLUMN_NUM: usize = 3;
72
/// Helper to convert batches into the primary key SST layout before writing
/// them to parquet.
pub(crate) struct PrimaryKeyWriteFormat {
    /// SST arrow schema derived from the region metadata.
    arrow_schema: SchemaRef,
    /// If set, replaces the sequence column of every converted batch.
    override_sequence: Option<SequenceNumber>,
}
79
80impl PrimaryKeyWriteFormat {
81 pub(crate) fn new(metadata: RegionMetadataRef) -> PrimaryKeyWriteFormat {
83 let arrow_schema = to_sst_arrow_schema(&metadata);
84 PrimaryKeyWriteFormat {
85 arrow_schema,
86 override_sequence: None,
87 }
88 }
89
90 pub(crate) fn with_override_sequence(
92 mut self,
93 override_sequence: Option<SequenceNumber>,
94 ) -> Self {
95 self.override_sequence = override_sequence;
96 self
97 }
98
99 pub(crate) fn arrow_schema(&self) -> &SchemaRef {
101 &self.arrow_schema
102 }
103
104 pub(crate) fn convert_flat_batch(
110 &self,
111 batch: &RecordBatch,
112 num_fields: usize,
113 ) -> Result<RecordBatch> {
114 let num_tag_columns = batch.num_columns() - num_fields - FIXED_POS_COLUMN_NUM;
115 let mut columns: Vec<ArrayRef> = batch.columns()[num_tag_columns..].to_vec();
116
117 if let Some(override_sequence) = self.override_sequence {
118 let num_cols = columns.len();
119 columns[num_cols - 2] =
121 Arc::new(UInt64Array::from(vec![override_sequence; batch.num_rows()]));
122 }
123
124 RecordBatch::try_new(self.arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
125 }
126}
127
/// Format for reading an SST, selected by how the file was written.
pub enum ReadFormat {
    /// Primary key layout: rows carry an encoded `__primary_key` column.
    PrimaryKey(PrimaryKeyReadFormat),
    /// Flat layout (see [`FlatReadFormat`]).
    Flat(FlatReadFormat),
}
135
impl ReadFormat {
    /// Creates a primary key read format that projects `column_ids`.
    pub fn new_primary_key(
        metadata: RegionMetadataRef,
        column_ids: impl Iterator<Item = ColumnId>,
    ) -> Self {
        ReadFormat::PrimaryKey(PrimaryKeyReadFormat::new(metadata, column_ids))
    }

    /// Creates a flat read format that projects `column_ids`.
    ///
    /// `num_columns`, `file_path` and `skip_auto_convert` are forwarded to
    /// [`FlatReadFormat::new`].
    pub fn new_flat(
        metadata: RegionMetadataRef,
        column_ids: impl Iterator<Item = ColumnId>,
        num_columns: Option<usize>,
        file_path: &str,
        skip_auto_convert: bool,
    ) -> Result<Self> {
        Ok(ReadFormat::Flat(FlatReadFormat::new(
            metadata,
            column_ids,
            num_columns,
            file_path,
            skip_auto_convert,
        )?))
    }

    /// Creates a read format in the flat or primary key layout.
    ///
    /// A `projection` of `None` selects every column in the region metadata.
    pub fn new(
        region_metadata: RegionMetadataRef,
        projection: Option<&[ColumnId]>,
        flat_format: bool,
        num_columns: Option<usize>,
        file_path: &str,
        skip_auto_convert: bool,
    ) -> Result<ReadFormat> {
        if flat_format {
            if let Some(column_ids) = projection {
                ReadFormat::new_flat(
                    region_metadata,
                    column_ids.iter().copied(),
                    num_columns,
                    file_path,
                    skip_auto_convert,
                )
            } else {
                // No projection: read all columns from the metadata.
                ReadFormat::new_flat(
                    region_metadata.clone(),
                    region_metadata
                        .column_metadatas
                        .iter()
                        .map(|col| col.column_id),
                    num_columns,
                    file_path,
                    skip_auto_convert,
                )
            }
        } else if let Some(column_ids) = projection {
            Ok(ReadFormat::new_primary_key(
                region_metadata,
                column_ids.iter().copied(),
            ))
        } else {
            // No projection: read all columns from the metadata.
            Ok(ReadFormat::new_primary_key(
                region_metadata.clone(),
                region_metadata
                    .column_metadatas
                    .iter()
                    .map(|col| col.column_id),
            ))
        }
    }

    /// Returns the inner primary key format, if this is one.
    pub(crate) fn as_primary_key(&self) -> Option<&PrimaryKeyReadFormat> {
        match self {
            ReadFormat::PrimaryKey(format) => Some(format),
            _ => None,
        }
    }

    /// Returns the inner flat format, if this is one.
    pub(crate) fn as_flat(&self) -> Option<&FlatReadFormat> {
        match self {
            ReadFormat::Flat(format) => Some(format),
            _ => None,
        }
    }

    /// Returns the arrow schema of the SST file.
    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
        match self {
            ReadFormat::PrimaryKey(format) => format.arrow_schema(),
            ReadFormat::Flat(format) => format.arrow_schema(),
        }
    }

    /// Returns the metadata of the region this format was created from.
    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
        match self {
            ReadFormat::PrimaryKey(format) => format.metadata(),
            ReadFormat::Flat(format) => format.metadata(),
        }
    }

    /// Returns the sorted indices of the SST columns to read.
    pub(crate) fn projection_indices(&self) -> &[usize] {
        match self {
            ReadFormat::PrimaryKey(format) => format.projection_indices(),
            ReadFormat::Flat(format) => format.projection_indices(),
        }
    }

    /// Returns min values of `column_id`, one entry per row group.
    pub fn min_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        match self {
            ReadFormat::PrimaryKey(format) => format.min_values(row_groups, column_id),
            ReadFormat::Flat(format) => format.min_values(row_groups, column_id),
        }
    }

    /// Returns max values of `column_id`, one entry per row group.
    pub fn max_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        match self {
            ReadFormat::PrimaryKey(format) => format.max_values(row_groups, column_id),
            ReadFormat::Flat(format) => format.max_values(row_groups, column_id),
        }
    }

    /// Returns null counts of `column_id`, one entry per row group.
    pub fn null_counts(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        match self {
            ReadFormat::PrimaryKey(format) => format.null_counts(row_groups, column_id),
            ReadFormat::Flat(format) => format.null_counts(row_groups, column_id),
        }
    }

    /// Collects the min (`is_min == true`) or max value of the column at
    /// `column_index` from every row group into a single array.
    ///
    /// Row groups without usable statistics contribute a (typed) null entry;
    /// `Int96` and fixed-length byte array statistics are never used.
    /// Byte-array values that are not valid UTF-8 also become null.
    pub(crate) fn column_values(
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column: &ColumnMetadata,
        column_index: usize,
        is_min: bool,
    ) -> Option<ArrayRef> {
        // Typed null used as a placeholder when a row group has no stats.
        let null_scalar: ScalarValue = column
            .column_schema
            .data_type
            .as_arrow_type()
            .try_into()
            .ok()?;
        let scalar_values = row_groups
            .iter()
            .map(|meta| {
                let stats = meta.borrow().column(column_index).statistics()?;
                match stats {
                    Statistics::Boolean(s) => Some(ScalarValue::Boolean(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::Int32(s) => Some(ScalarValue::Int32(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::Int64(s) => Some(ScalarValue::Int64(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),

                    Statistics::Int96(_) => None,
                    Statistics::Float(s) => Some(ScalarValue::Float32(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::Double(s) => Some(ScalarValue::Float64(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::ByteArray(s) => {
                        let bytes = if is_min {
                            s.min_bytes_opt()?
                        } else {
                            s.max_bytes_opt()?
                        };
                        // Invalid UTF-8 degrades to a null Utf8 scalar.
                        let s = String::from_utf8(bytes.to_vec()).ok();
                        Some(ScalarValue::Utf8(s))
                    }

                    Statistics::FixedLenByteArray(_) => None,
                }
            })
            .map(|maybe_scalar| maybe_scalar.unwrap_or_else(|| null_scalar.clone()))
            .collect::<Vec<ScalarValue>>();
        debug_assert_eq!(scalar_values.len(), row_groups.len());
        ScalarValue::iter_to_array(scalar_values).ok()
    }

    /// Collects the null count of the column at `column_index` from every row
    /// group; row groups without statistics contribute a null entry.
    pub(crate) fn column_null_counts(
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_index: usize,
    ) -> Option<ArrayRef> {
        let values = row_groups.iter().map(|meta| {
            let col = meta.borrow().column(column_index);
            let stat = col.statistics()?;
            stat.null_count_opt()
        });
        Some(Arc::new(UInt64Array::from_iter(values)))
    }

    /// Sets the sequence number that overrides the sequences read from file.
    pub(crate) fn set_override_sequence(&mut self, sequence: Option<SequenceNumber>) {
        match self {
            ReadFormat::PrimaryKey(format) => format.set_override_sequence(sequence),
            ReadFormat::Flat(format) => format.set_override_sequence(sequence),
        }
    }

    /// Enables decoding of primary key values into each returned batch.
    /// No-op for the flat format.
    pub(crate) fn set_decode_primary_key_values(&mut self, decode: bool) {
        if let ReadFormat::PrimaryKey(format) = self {
            format.set_decode_primary_key_values(decode);
        }
    }

    /// Creates an array of `length` override sequence values, if configured.
    pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option<ArrayRef> {
        match self {
            ReadFormat::PrimaryKey(format) => format.new_override_sequence_array(length),
            ReadFormat::Flat(format) => format.new_override_sequence_array(length),
        }
    }
}
390
/// Read format for SSTs written in the primary key layout.
pub struct PrimaryKeyReadFormat {
    /// Metadata of the region the SST belongs to.
    metadata: RegionMetadataRef,
    /// Full SST arrow schema derived from `metadata`.
    arrow_schema: SchemaRef,
    /// Field column id -> index of the column in the SST schema.
    field_id_to_index: HashMap<ColumnId, usize>,
    /// Sorted indices of the SST columns to read.
    projection_indices: Vec<usize>,
    /// Field column id -> index of the column in the projected batch.
    field_id_to_projected_index: HashMap<ColumnId, usize>,
    /// If set, replaces the sequence values of every converted batch.
    override_sequence: Option<SequenceNumber>,
    /// If set, decodes primary key values into each converted batch.
    primary_key_codec: Option<Arc<dyn PrimaryKeyCodec>>,
}
410
impl PrimaryKeyReadFormat {
    /// Creates a helper that reads the SST of the region `metadata` describes
    /// and projects `column_ids`.
    pub fn new(
        metadata: RegionMetadataRef,
        column_ids: impl Iterator<Item = ColumnId>,
    ) -> PrimaryKeyReadFormat {
        // Field columns keep their relative order in the SST schema.
        let field_id_to_index: HashMap<_, _> = metadata
            .field_columns()
            .enumerate()
            .map(|(index, column)| (column.column_id, index))
            .collect();
        let arrow_schema = to_sst_arrow_schema(&metadata);

        let format_projection = FormatProjection::compute_format_projection(
            &field_id_to_index,
            arrow_schema.fields.len(),
            column_ids,
        );

        PrimaryKeyReadFormat {
            metadata,
            arrow_schema,
            field_id_to_index,
            projection_indices: format_projection.projection_indices,
            field_id_to_projected_index: format_projection.column_id_to_projected_index,
            override_sequence: None,
            primary_key_codec: None,
        }
    }

    /// Sets the sequence number that overrides the sequences read from file.
    pub(crate) fn set_override_sequence(&mut self, sequence: Option<SequenceNumber>) {
        self.override_sequence = sequence;
    }

    /// Enables (or disables) decoding of primary key values into batches.
    pub(crate) fn set_decode_primary_key_values(&mut self, decode: bool) {
        self.primary_key_codec = if decode {
            Some(build_primary_key_codec(&self.metadata))
        } else {
            None
        };
    }

    /// Returns the full arrow schema of the SST file.
    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
        &self.arrow_schema
    }

    /// Returns the metadata of the region.
    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
        &self.metadata
    }

    /// Returns the sorted indices of the SST columns to read.
    pub(crate) fn projection_indices(&self) -> &[usize] {
        &self.projection_indices
    }

    /// Returns the map from field column id to its projected batch index.
    pub(crate) fn field_id_to_projected_index(&self) -> &HashMap<ColumnId, usize> {
        &self.field_id_to_projected_index
    }

    /// Creates an array of `length` override sequence values, if configured.
    pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option<ArrayRef> {
        self.override_sequence
            .map(|seq| Arc::new(UInt64Array::from_value(seq, length)) as ArrayRef)
    }

    /// Converts a projected `record_batch` into [`Batch`]es, one per primary
    /// key run, and appends them to `batches` (which must be empty).
    ///
    /// The last four columns of `record_batch` must be, in order: time index,
    /// primary key (dictionary), sequence, op type. If
    /// `override_sequence_array` is given, it replaces the sequence column;
    /// it must be at least as long as the batch.
    ///
    /// # Errors
    /// Returns an error if the batch has too few columns, the primary key
    /// column has an unexpected type, or (when decoding is enabled) a key
    /// fails to decode.
    pub fn convert_record_batch(
        &self,
        record_batch: &RecordBatch,
        override_sequence_array: Option<&ArrayRef>,
        batches: &mut VecDeque<Batch>,
    ) -> Result<()> {
        debug_assert!(batches.is_empty());

        ensure!(
            record_batch.num_columns() >= FIXED_POS_COLUMN_NUM,
            InvalidRecordBatchSnafu {
                reason: format!(
                    "record batch only has {} columns",
                    record_batch.num_columns()
                ),
            }
        );

        // Pop the fixed-position columns from the back of the batch.
        let mut fixed_pos_columns = record_batch
            .columns()
            .iter()
            .rev()
            .take(FIXED_POS_COLUMN_NUM);
        let op_type_array = fixed_pos_columns.next().unwrap();
        let mut sequence_array = fixed_pos_columns.next().unwrap().clone();
        let pk_array = fixed_pos_columns.next().unwrap();
        let ts_array = fixed_pos_columns.next().unwrap();
        let field_batch_columns = self.get_field_batch_columns(record_batch)?;

        if let Some(override_array) = override_sequence_array {
            assert!(override_array.len() >= sequence_array.len());
            // Trim the override array to the batch length if necessary.
            sequence_array = if override_array.len() > sequence_array.len() {
                override_array.slice(0, sequence_array.len())
            } else {
                override_array.clone()
            };
        }

        let pk_dict_array = pk_array
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .with_context(|| InvalidRecordBatchSnafu {
                reason: format!("primary key array should not be {:?}", pk_array.data_type()),
            })?;
        // Offsets delimiting each run of rows with the same primary key.
        let offsets = primary_key_offsets(pk_dict_array)?;
        if offsets.is_empty() {
            return Ok(());
        }

        let keys = pk_dict_array.keys();
        let pk_values = pk_dict_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .with_context(|| InvalidRecordBatchSnafu {
                reason: format!(
                    "values of primary key array should not be {:?}",
                    pk_dict_array.values().data_type()
                ),
            })?;
        // Build one Batch per run; all rows in a run share one encoded key.
        for (i, start) in offsets[..offsets.len() - 1].iter().enumerate() {
            let end = offsets[i + 1];
            let rows_in_batch = end - start;
            let dict_key = keys.value(*start);
            let primary_key = pk_values.value(dict_key as usize).to_vec();

            let mut builder = BatchBuilder::new(primary_key);
            builder
                .timestamps_array(ts_array.slice(*start, rows_in_batch))?
                .sequences_array(sequence_array.slice(*start, rows_in_batch))?
                .op_types_array(op_type_array.slice(*start, rows_in_batch))?;
            for batch_column in &field_batch_columns {
                builder.push_field(BatchColumn {
                    column_id: batch_column.column_id,
                    data: batch_column.data.slice(*start, rows_in_batch),
                });
            }

            let mut batch = builder.build()?;
            if let Some(codec) = &self.primary_key_codec {
                // Optionally decode the encoded key into concrete values.
                let pk_values: CompositeValues =
                    codec.decode(batch.primary_key()).context(DecodeSnafu)?;
                batch.set_pk_values(pk_values);
            }
            batches.push_back(batch);
        }

        Ok(())
    }

    /// Returns min values of `column_id`, one entry per row group.
    ///
    /// Tag min values come from the primary key statistics (first tag only);
    /// field and timestamp values come from their column statistics.
    pub fn min_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        let Some(column) = self.metadata.column_by_id(column_id) else {
            return StatValues::NoColumn;
        };
        match column.semantic_type {
            SemanticType::Tag => self.tag_values(row_groups, column, true),
            SemanticType::Field => {
                // Safe: every field column of the metadata is in the map.
                let index = self.field_id_to_index.get(&column_id).unwrap();
                let stats = ReadFormat::column_values(row_groups, column, *index, true);
                StatValues::from_stats_opt(stats)
            }
            SemanticType::Timestamp => {
                let index = self.time_index_position();
                let stats = ReadFormat::column_values(row_groups, column, index, true);
                StatValues::from_stats_opt(stats)
            }
        }
    }

    /// Returns max values of `column_id`, one entry per row group.
    /// See [`Self::min_values`] for how each semantic type is handled.
    pub fn max_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        let Some(column) = self.metadata.column_by_id(column_id) else {
            return StatValues::NoColumn;
        };
        match column.semantic_type {
            SemanticType::Tag => self.tag_values(row_groups, column, false),
            SemanticType::Field => {
                // Safe: every field column of the metadata is in the map.
                let index = self.field_id_to_index.get(&column_id).unwrap();
                let stats = ReadFormat::column_values(row_groups, column, *index, false);
                StatValues::from_stats_opt(stats)
            }
            SemanticType::Timestamp => {
                let index = self.time_index_position();
                let stats = ReadFormat::column_values(row_groups, column, index, false);
                StatValues::from_stats_opt(stats)
            }
        }
    }

    /// Returns null counts of `column_id`, one entry per row group.
    /// Tags are stored inside the encoded primary key, so they have no
    /// per-column null counts.
    pub fn null_counts(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        let Some(column) = self.metadata.column_by_id(column_id) else {
            return StatValues::NoColumn;
        };
        match column.semantic_type {
            SemanticType::Tag => StatValues::NoStats,
            SemanticType::Field => {
                // Safe: every field column of the metadata is in the map.
                let index = self.field_id_to_index.get(&column_id).unwrap();
                let stats = ReadFormat::column_null_counts(row_groups, *index);
                StatValues::from_stats_opt(stats)
            }
            SemanticType::Timestamp => {
                let index = self.time_index_position();
                let stats = ReadFormat::column_null_counts(row_groups, index);
                StatValues::from_stats_opt(stats)
            }
        }
    }

    /// Converts the leading field columns of `record_batch` (everything
    /// before the fixed-position columns) into [`BatchColumn`]s.
    fn get_field_batch_columns(&self, record_batch: &RecordBatch) -> Result<Vec<BatchColumn>> {
        record_batch
            .columns()
            .iter()
            .zip(record_batch.schema().fields())
            .take(record_batch.num_columns() - FIXED_POS_COLUMN_NUM)
            .map(|(array, field)| {
                let vector = Helper::try_into_vector(array.clone()).context(ConvertVectorSnafu)?;
                let column = self
                    .metadata
                    .column_by_name(field.name())
                    .with_context(|| InvalidRecordBatchSnafu {
                        reason: format!("column {} not found in metadata", field.name()),
                    })?;

                Ok(BatchColumn {
                    column_id: column.column_id,
                    data: vector,
                })
            })
            .collect()
    }

    /// Returns min/max values for a tag column. Only the first tag of the
    /// primary key has extractable statistics (it is the leftmost part of
    /// the encoded key); other tags yield `NoStats`.
    fn tag_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column: &ColumnMetadata,
        is_min: bool,
    ) -> StatValues {
        let is_first_tag = self
            .metadata
            .primary_key
            .first()
            .map(|id| *id == column.column_id)
            .unwrap_or(false);
        if !is_first_tag {
            return StatValues::NoStats;
        }

        StatValues::from_stats_opt(self.first_tag_values(row_groups, column, is_min))
    }

    /// Decodes min/max values of the first tag from the byte-array statistics
    /// of the `__primary_key` column, one entry per row group. Row groups
    /// whose statistics are missing or fail to decode contribute nulls.
    fn first_tag_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column: &ColumnMetadata,
        is_min: bool,
    ) -> Option<ArrayRef> {
        debug_assert!(
            self.metadata
                .primary_key
                .first()
                .map(|id| *id == column.column_id)
                .unwrap_or(false)
        );

        // Codec over the single leftmost key field, so `decode_leftmost`
        // can recover the first tag from the encoded key prefix.
        let primary_key_encoding = self.metadata.primary_key_encoding;
        let converter = build_primary_key_codec_with_fields(
            primary_key_encoding,
            [(
                column.column_id,
                SortField::new(column.column_schema.data_type.clone()),
            )]
            .into_iter(),
        );

        let values = row_groups.iter().map(|meta| {
            let stats = meta
                .borrow()
                .column(self.primary_key_position())
                .statistics()?;
            match stats {
                Statistics::Boolean(_) => None,
                Statistics::Int32(_) => None,
                Statistics::Int64(_) => None,
                Statistics::Int96(_) => None,
                Statistics::Float(_) => None,
                Statistics::Double(_) => None,
                Statistics::ByteArray(s) => {
                    let bytes = if is_min {
                        s.min_bytes_opt()?
                    } else {
                        s.max_bytes_opt()?
                    };
                    converter.decode_leftmost(bytes).ok()?
                }
                Statistics::FixedLenByteArray(_) => None,
            }
        });
        let mut builder = column
            .column_schema
            .data_type
            .create_mutable_vector(row_groups.len());
        for value_opt in values {
            match value_opt {
                Some(v) => builder.push_value_ref(&v.as_value_ref()),
                None => builder.push_null(),
            }
        }
        let vector = builder.to_vector();

        Some(vector.to_arrow_array())
    }

    /// Index of the `__primary_key` column (third from the end of the schema).
    fn primary_key_position(&self) -> usize {
        self.arrow_schema.fields.len() - 3
    }

    /// Index of the time index column (fourth from the end of the schema).
    fn time_index_position(&self) -> usize {
        self.arrow_schema.fields.len() - FIXED_POS_COLUMN_NUM
    }

    /// Index of the field column `column_id` in the projected batch, if
    /// it is projected.
    pub fn field_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
        self.field_id_to_projected_index.get(&column_id).copied()
    }
}
789
/// Result of computing a projection over an SST schema.
pub(crate) struct FormatProjection {
    /// Sorted, deduplicated indices of the SST columns to read
    /// (always includes the fixed-position columns).
    pub(crate) projection_indices: Vec<usize>,
    /// Projected column id -> index of the column in the projected batch.
    pub(crate) column_id_to_projected_index: HashMap<ColumnId, usize>,
}
800
801impl FormatProjection {
802 pub(crate) fn compute_format_projection(
806 id_to_index: &HashMap<ColumnId, usize>,
807 sst_column_num: usize,
808 column_ids: impl Iterator<Item = ColumnId>,
809 ) -> Self {
810 let mut projected_schema: Vec<_> = column_ids
814 .filter_map(|column_id| {
815 id_to_index
816 .get(&column_id)
817 .copied()
818 .map(|index| (column_id, index))
819 })
820 .collect();
821 projected_schema.sort_unstable_by_key(|x| x.1);
824 projected_schema.dedup_by_key(|x| x.1);
826
827 let mut projection_indices: Vec<_> = projected_schema
830 .iter()
831 .map(|(_column_id, index)| *index)
832 .chain(sst_column_num - FIXED_POS_COLUMN_NUM..sst_column_num)
834 .collect();
835 projection_indices.sort_unstable();
836 projection_indices.dedup();
838
839 let column_id_to_projected_index = projected_schema
841 .into_iter()
842 .map(|(column_id, _)| column_id)
843 .enumerate()
844 .map(|(index, column_id)| (column_id, index))
845 .collect();
846
847 Self {
848 projection_indices,
849 column_id_to_projected_index,
850 }
851 }
852}
853
/// Statistics of a column extracted from row group metadata.
pub enum StatValues {
    /// Statistics exist; one value per row group.
    Values(ArrayRef),
    /// The column does not exist in the region.
    NoColumn,
    /// The column exists but statistics are unavailable.
    NoStats,
}
866
867impl StatValues {
868 pub fn from_stats_opt(stats: Option<ArrayRef>) -> Self {
870 match stats {
871 Some(stats) => StatValues::Values(stats),
872 None => StatValues::NoStats,
873 }
874 }
875}
876
#[cfg(test)]
impl PrimaryKeyReadFormat {
    /// Creates a read format that projects every column in `metadata`.
    /// Test-only convenience constructor.
    pub fn new_with_all_columns(metadata: RegionMetadataRef) -> PrimaryKeyReadFormat {
        Self::new(
            Arc::clone(&metadata),
            metadata.column_metadatas.iter().map(|c| c.column_id),
        )
    }
}
887
888pub(crate) fn primary_key_offsets(pk_dict_array: &PrimaryKeyArray) -> Result<Vec<usize>> {
890 if pk_dict_array.is_empty() {
891 return Ok(Vec::new());
892 }
893
894 let mut offsets = vec![0];
896 let keys = pk_dict_array.keys();
897 let pk_indices = keys.values();
899 for (i, key) in pk_indices.iter().take(keys.len() - 1).enumerate() {
900 if *key != pk_indices[i + 1] {
902 offsets.push(i + 1);
904 }
905 }
906 offsets.push(keys.len());
907
908 Ok(offsets)
909}
910
/// Computes the timestamp range of the row group at `row_group_idx` from the
/// statistics of the time index column.
///
/// Returns `None` when the row group has no statistics or the time index
/// statistics are not `Int64` (a warning is logged in that case). The
/// returned timestamps use the unit of `file_meta.time_range`.
pub(crate) fn parquet_row_group_time_range(
    file_meta: &FileMeta,
    parquet_meta: &ParquetMetaData,
    row_group_idx: usize,
) -> Option<FileTimeRange> {
    let row_group_meta = parquet_meta.row_group(row_group_idx);
    let num_columns = parquet_meta.file_metadata().schema_descr().num_columns();
    assert!(
        num_columns >= FIXED_POS_COLUMN_NUM,
        "file only has {} columns",
        num_columns
    );
    // The time index is the first of the fixed-position trailing columns.
    let time_index_pos = num_columns - FIXED_POS_COLUMN_NUM;

    let stats = row_group_meta.column(time_index_pos).statistics()?;
    let (min, max) = match stats {
        Statistics::Int64(value_stats) => (*value_stats.min_opt()?, *value_stats.max_opt()?),
        Statistics::Int32(_)
        | Statistics::Boolean(_)
        | Statistics::Int96(_)
        | Statistics::Float(_)
        | Statistics::Double(_)
        | Statistics::ByteArray(_)
        | Statistics::FixedLenByteArray(_) => {
            common_telemetry::warn!(
                "Invalid statistics {:?} for time index in parquet in {}",
                stats,
                file_meta.file_id
            );
            return None;
        }
    };

    // Row group range must lie within the file's overall time range.
    debug_assert!(min >= file_meta.time_range.0.value() && min <= file_meta.time_range.1.value());
    debug_assert!(max >= file_meta.time_range.0.value() && max <= file_meta.time_range.1.value());
    let unit = file_meta.time_range.0.unit();

    Some((Timestamp::new(min, unit), Timestamp::new(max, unit)))
}
953
954pub(crate) fn need_override_sequence(parquet_meta: &ParquetMetaData) -> bool {
957 let num_columns = parquet_meta.file_metadata().schema_descr().num_columns();
958 if num_columns < FIXED_POS_COLUMN_NUM {
959 return false;
960 }
961
962 let sequence_pos = num_columns - 2;
964
965 for row_group in parquet_meta.row_groups() {
967 if let Some(Statistics::Int64(value_stats)) = row_group.column(sequence_pos).statistics() {
968 if let (Some(min_val), Some(max_val)) = (value_stats.min_opt(), value_stats.max_opt()) {
969 if *min_val != 0 || *max_val != 0 {
971 return false;
972 }
973 } else {
974 return false;
976 }
977 } else {
978 return false;
980 }
981 }
982
983 !parquet_meta.row_groups().is_empty()
985}
986
987#[cfg(test)]
988mod tests {
989 use std::sync::Arc;
990
991 use api::v1::OpType;
992 use datatypes::arrow::array::{
993 Int64Array, StringArray, TimestampMillisecondArray, UInt8Array, UInt32Array, UInt64Array,
994 };
995 use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema, TimeUnit};
996 use datatypes::prelude::ConcreteDataType;
997 use datatypes::schema::ColumnSchema;
998 use datatypes::value::ValueRef;
999 use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, UInt8Vector, UInt64Vector};
1000 use mito_codec::row_converter::{
1001 DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec,
1002 };
1003 use store_api::codec::PrimaryKeyEncoding;
1004 use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
1005 use store_api::storage::RegionId;
1006 use store_api::storage::consts::ReservedColumnId;
1007
1008 use super::*;
1009 use crate::sst::parquet::flat_format::{FlatWriteFormat, sequence_column_index};
1010 use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
1011
1012 const TEST_SEQUENCE: u64 = 1;
1013 const TEST_OP_TYPE: u8 = OpType::Put as u8;
1014
    /// Builds a region with primary key `[tag0(1), tag1(3)]`, fields
    /// `field1(4)` and `field0(2)`, and millisecond time index `ts(5)`.
    fn build_test_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("tag0", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field1",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                // Deliberately out-of-order ids to exercise field ordering.
                column_id: 4,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("tag1", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field0",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 5,
            })
            .primary_key(vec![1, 3]);
        Arc::new(builder.build().unwrap())
    }
1058
    /// Expected SST arrow schema for the test region: field columns first,
    /// then the fixed-position columns (ts, __primary_key, __sequence,
    /// __op_type).
    fn build_test_arrow_schema() -> SchemaRef {
        let fields = vec![
            Field::new("field1", ArrowDataType::Int64, true),
            Field::new("field0", ArrowDataType::Int64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt32),
                    Box::new(ArrowDataType::Binary),
                ),
                false,
            ),
            Field::new("__sequence", ArrowDataType::UInt64, false),
            Field::new("__op_type", ArrowDataType::UInt8, false),
        ];
        Arc::new(Schema::new(fields))
    }
1081
    /// Builds a test batch with the default sequence [`TEST_SEQUENCE`].
    fn new_batch(primary_key: &[u8], start_ts: i64, start_field: i64, num_rows: usize) -> Batch {
        new_batch_with_sequence(primary_key, start_ts, start_field, num_rows, TEST_SEQUENCE)
    }

    /// Builds a batch of `num_rows` rows: timestamps starting at `start_ts`,
    /// field1 == `start_field`, field0 == `start_field + 1`, the given
    /// `sequence` and op type [`TEST_OP_TYPE`] on every row.
    fn new_batch_with_sequence(
        primary_key: &[u8],
        start_ts: i64,
        start_field: i64,
        num_rows: usize,
        sequence: u64,
    ) -> Batch {
        let ts_values = (0..num_rows).map(|i| start_ts + i as i64);
        let timestamps = Arc::new(TimestampMillisecondVector::from_values(ts_values));
        let sequences = Arc::new(UInt64Vector::from_vec(vec![sequence; num_rows]));
        let op_types = Arc::new(UInt8Vector::from_vec(vec![TEST_OP_TYPE; num_rows]));
        let fields = vec![
            BatchColumn {
                column_id: 4,
                data: Arc::new(Int64Vector::from_vec(vec![start_field; num_rows])),
            },
            BatchColumn {
                column_id: 2,
                data: Arc::new(Int64Vector::from_vec(vec![start_field + 1; num_rows])),
            },
        ];

        BatchBuilder::with_required_columns(primary_key.to_vec(), timestamps, sequences, op_types)
            .with_fields(fields)
            .build()
            .unwrap()
    }
1113
    #[test]
    fn test_to_sst_arrow_schema() {
        let metadata = build_test_region_metadata();
        let write_format = PrimaryKeyWriteFormat::new(metadata);
        assert_eq!(&build_test_arrow_schema(), write_format.arrow_schema());
    }

    /// Builds a dictionary array where each `(key, n)` pair contributes `n`
    /// consecutive rows referencing that key.
    fn build_test_pk_array(pk_row_nums: &[(Vec<u8>, usize)]) -> Arc<PrimaryKeyArray> {
        let values = Arc::new(BinaryArray::from_iter_values(
            pk_row_nums.iter().map(|v| &v.0),
        ));
        let mut keys = vec![];
        for (index, num_rows) in pk_row_nums.iter().map(|v| v.1).enumerate() {
            keys.extend(std::iter::repeat_n(index as u32, num_rows));
        }
        let keys = UInt32Array::from(keys);
        Arc::new(DictionaryArray::new(keys, values))
    }
1132
    #[test]
    fn test_projection_indices() {
        let metadata = build_test_region_metadata();
        // Tag projections don't add field columns; the fixed-position
        // columns (2..=5 here) are always included.
        let read_format = ReadFormat::new_primary_key(metadata.clone(), [3].iter().copied());
        assert_eq!(&[2, 3, 4, 5], read_format.projection_indices());
        // Field column 4 sits at schema index 0.
        let read_format = ReadFormat::new_primary_key(metadata.clone(), [4].iter().copied());
        assert_eq!(&[0, 2, 3, 4, 5], read_format.projection_indices());
        // The timestamp (id 5) is already among the fixed columns.
        let read_format = ReadFormat::new_primary_key(metadata.clone(), [5].iter().copied());
        assert_eq!(&[2, 3, 4, 5], read_format.projection_indices());
        // Mixed projection: field 2 (index 1) plus tag and timestamp.
        let read_format = ReadFormat::new_primary_key(metadata, [2, 1, 5].iter().copied());
        assert_eq!(&[1, 2, 3, 4, 5], read_format.projection_indices());
    }
1149
    #[test]
    fn test_empty_primary_key_offsets() {
        let array = build_test_pk_array(&[]);
        assert!(primary_key_offsets(&array).unwrap().is_empty());
    }

    #[test]
    fn test_primary_key_offsets_one_series() {
        // Single-row runs: one offset boundary per row.
        let array = build_test_pk_array(&[(b"one".to_vec(), 1)]);
        assert_eq!(vec![0, 1], primary_key_offsets(&array).unwrap());

        let array = build_test_pk_array(&[(b"one".to_vec(), 1), (b"two".to_vec(), 1)]);
        assert_eq!(vec![0, 1, 2], primary_key_offsets(&array).unwrap());

        let array = build_test_pk_array(&[
            (b"one".to_vec(), 1),
            (b"two".to_vec(), 1),
            (b"three".to_vec(), 1),
        ]);
        assert_eq!(vec![0, 1, 2, 3], primary_key_offsets(&array).unwrap());
    }

    #[test]
    fn test_primary_key_offsets_multi_series() {
        // Multi-row runs: boundaries only where the key changes.
        let array = build_test_pk_array(&[(b"one".to_vec(), 1), (b"two".to_vec(), 3)]);
        assert_eq!(vec![0, 1, 4], primary_key_offsets(&array).unwrap());

        let array = build_test_pk_array(&[(b"one".to_vec(), 3), (b"two".to_vec(), 1)]);
        assert_eq!(vec![0, 3, 4], primary_key_offsets(&array).unwrap());

        let array = build_test_pk_array(&[(b"one".to_vec(), 3), (b"two".to_vec(), 3)]);
        assert_eq!(vec![0, 3, 6], primary_key_offsets(&array).unwrap());
    }
1183
    #[test]
    fn test_convert_empty_record_batch() {
        let metadata = build_test_region_metadata();
        let arrow_schema = build_test_arrow_schema();
        let column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|col| col.column_id)
            .collect();
        let read_format = PrimaryKeyReadFormat::new(metadata, column_ids.iter().copied());
        assert_eq!(arrow_schema, *read_format.arrow_schema());

        // An empty record batch must produce no batches.
        let record_batch = RecordBatch::new_empty(arrow_schema);
        let mut batches = VecDeque::new();
        read_format
            .convert_record_batch(&record_batch, None, &mut batches)
            .unwrap();
        assert!(batches.is_empty());
    }

    #[test]
    fn test_convert_record_batch() {
        let metadata = build_test_region_metadata();
        let column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|col| col.column_id)
            .collect();
        let read_format = PrimaryKeyReadFormat::new(metadata, column_ids.iter().copied());

        // Two primary key runs of two rows each.
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![1, 1, 10, 10])), // field1
            Arc::new(Int64Array::from(vec![2, 2, 11, 11])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 11, 12])), // ts
            build_test_pk_array(&[(b"one".to_vec(), 2), (b"two".to_vec(), 2)]), // __primary_key
            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; 4])), // __sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; 4])), // __op_type
        ];
        let arrow_schema = build_test_arrow_schema();
        let record_batch = RecordBatch::try_new(arrow_schema, columns).unwrap();
        let mut batches = VecDeque::new();
        read_format
            .convert_record_batch(&record_batch, None, &mut batches)
            .unwrap();

        // One Batch per primary key run.
        assert_eq!(
            vec![new_batch(b"one", 1, 1, 2), new_batch(b"two", 11, 10, 2)],
            batches.into_iter().collect::<Vec<_>>(),
        );
    }
1234
1235 #[test]
1236 fn test_convert_record_batch_with_override_sequence() {
1237 let metadata = build_test_region_metadata();
1238 let column_ids: Vec<_> = metadata
1239 .column_metadatas
1240 .iter()
1241 .map(|col| col.column_id)
1242 .collect();
1243 let read_format =
1244 ReadFormat::new(metadata, Some(&column_ids), false, None, "test", false).unwrap();
1245
1246 let columns: Vec<ArrayRef> = vec![
1247 Arc::new(Int64Array::from(vec![1, 1, 10, 10])), Arc::new(Int64Array::from(vec![2, 2, 11, 11])), Arc::new(TimestampMillisecondArray::from(vec![1, 2, 11, 12])), build_test_pk_array(&[(b"one".to_vec(), 2), (b"two".to_vec(), 2)]), Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; 4])), Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; 4])), ];
1254 let arrow_schema = build_test_arrow_schema();
1255 let record_batch = RecordBatch::try_new(arrow_schema, columns).unwrap();
1256
1257 let override_sequence: u64 = 12345;
1259 let override_sequence_array: ArrayRef =
1260 Arc::new(UInt64Array::from_value(override_sequence, 4));
1261
1262 let mut batches = VecDeque::new();
1263 read_format
1264 .as_primary_key()
1265 .unwrap()
1266 .convert_record_batch(&record_batch, Some(&override_sequence_array), &mut batches)
1267 .unwrap();
1268
1269 let expected_batch1 = new_batch_with_sequence(b"one", 1, 1, 2, override_sequence);
1271 let expected_batch2 = new_batch_with_sequence(b"two", 11, 10, 2, override_sequence);
1272
1273 assert_eq!(
1274 vec![expected_batch1, expected_batch2],
1275 batches.into_iter().collect::<Vec<_>>(),
1276 );
1277 }
1278
1279 fn build_test_flat_sst_schema() -> SchemaRef {
1280 let fields = vec![
1281 Field::new("tag0", ArrowDataType::Int64, true), Field::new("tag1", ArrowDataType::Int64, true),
1283 Field::new("field1", ArrowDataType::Int64, true), Field::new("field0", ArrowDataType::Int64, true),
1285 Field::new(
1286 "ts",
1287 ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
1288 false,
1289 ),
1290 Field::new(
1291 "__primary_key",
1292 ArrowDataType::Dictionary(
1293 Box::new(ArrowDataType::UInt32),
1294 Box::new(ArrowDataType::Binary),
1295 ),
1296 false,
1297 ),
1298 Field::new("__sequence", ArrowDataType::UInt64, false),
1299 Field::new("__op_type", ArrowDataType::UInt8, false),
1300 ];
1301 Arc::new(Schema::new(fields))
1302 }
1303
1304 #[test]
1305 fn test_flat_to_sst_arrow_schema() {
1306 let metadata = build_test_region_metadata();
1307 let format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default());
1308 assert_eq!(&build_test_flat_sst_schema(), format.arrow_schema());
1309 }
1310
1311 fn input_columns_for_flat_batch(num_rows: usize) -> Vec<ArrayRef> {
1312 vec![
1313 Arc::new(Int64Array::from(vec![1; num_rows])), Arc::new(Int64Array::from(vec![1; num_rows])), Arc::new(Int64Array::from(vec![2; num_rows])), Arc::new(Int64Array::from(vec![3; num_rows])), Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), build_test_pk_array(&[(b"test".to_vec(), num_rows)]), Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; num_rows])), Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), ]
1322 }
1323
1324 #[test]
1325 fn test_flat_convert_batch() {
1326 let metadata = build_test_region_metadata();
1327 let format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default());
1328
1329 let num_rows = 4;
1330 let columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
1331 let batch = RecordBatch::try_new(build_test_flat_sst_schema(), columns.clone()).unwrap();
1332 let expect_record = RecordBatch::try_new(build_test_flat_sst_schema(), columns).unwrap();
1333
1334 let actual = format.convert_batch(&batch).unwrap();
1335 assert_eq!(expect_record, actual);
1336 }
1337
1338 #[test]
1339 fn test_flat_convert_with_override_sequence() {
1340 let metadata = build_test_region_metadata();
1341 let format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default())
1342 .with_override_sequence(Some(415411));
1343
1344 let num_rows = 4;
1345 let columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
1346 let batch = RecordBatch::try_new(build_test_flat_sst_schema(), columns).unwrap();
1347
1348 let expected_columns: Vec<ArrayRef> = vec![
1349 Arc::new(Int64Array::from(vec![1; num_rows])), Arc::new(Int64Array::from(vec![1; num_rows])), Arc::new(Int64Array::from(vec![2; num_rows])), Arc::new(Int64Array::from(vec![3; num_rows])), Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), build_test_pk_array(&[(b"test".to_vec(), num_rows)]), Arc::new(UInt64Array::from(vec![415411; num_rows])), Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), ];
1358 let expected_record =
1359 RecordBatch::try_new(build_test_flat_sst_schema(), expected_columns).unwrap();
1360
1361 let actual = format.convert_batch(&batch).unwrap();
1362 assert_eq!(expected_record, actual);
1363 }
1364
1365 #[test]
1366 fn test_flat_projection_indices() {
1367 let metadata = build_test_region_metadata();
1368 let read_format =
1373 ReadFormat::new_flat(metadata.clone(), [3].iter().copied(), None, "test", false)
1374 .unwrap();
1375 assert_eq!(&[1, 4, 5, 6, 7], read_format.projection_indices());
1376
1377 let read_format =
1379 ReadFormat::new_flat(metadata.clone(), [4].iter().copied(), None, "test", false)
1380 .unwrap();
1381 assert_eq!(&[2, 4, 5, 6, 7], read_format.projection_indices());
1382
1383 let read_format =
1385 ReadFormat::new_flat(metadata.clone(), [5].iter().copied(), None, "test", false)
1386 .unwrap();
1387 assert_eq!(&[4, 5, 6, 7], read_format.projection_indices());
1388
1389 let read_format =
1391 ReadFormat::new_flat(metadata, [2, 1, 5].iter().copied(), None, "test", false).unwrap();
1392 assert_eq!(&[0, 3, 4, 5, 6, 7], read_format.projection_indices());
1393 }
1394
1395 #[test]
1396 fn test_flat_read_format_convert_batch() {
1397 let metadata = build_test_region_metadata();
1398 let mut format = FlatReadFormat::new(
1399 metadata,
1400 std::iter::once(1), Some(8),
1402 "test",
1403 false,
1404 )
1405 .unwrap();
1406
1407 let num_rows = 4;
1408 let original_sequence = 100u64;
1409 let override_sequence = 200u64;
1410
1411 let columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
1413 let mut test_columns = columns.clone();
1414 test_columns[6] = Arc::new(UInt64Array::from(vec![original_sequence; num_rows]));
1416 let record_batch =
1417 RecordBatch::try_new(format.arrow_schema().clone(), test_columns).unwrap();
1418
1419 let result = format.convert_batch(record_batch.clone(), None).unwrap();
1421 let sequence_column = result.column(sequence_column_index(result.num_columns()));
1422 let sequence_array = sequence_column
1423 .as_any()
1424 .downcast_ref::<UInt64Array>()
1425 .unwrap();
1426
1427 let expected_original = UInt64Array::from(vec![original_sequence; num_rows]);
1428 assert_eq!(sequence_array, &expected_original);
1429
1430 format.set_override_sequence(Some(override_sequence));
1432 let override_sequence_array = format.new_override_sequence_array(num_rows).unwrap();
1433 let result = format
1434 .convert_batch(record_batch, Some(&override_sequence_array))
1435 .unwrap();
1436 let sequence_column = result.column(sequence_column_index(result.num_columns()));
1437 let sequence_array = sequence_column
1438 .as_any()
1439 .downcast_ref::<UInt64Array>()
1440 .unwrap();
1441
1442 let expected_override = UInt64Array::from(vec![override_sequence; num_rows]);
1443 assert_eq!(sequence_array, &expected_override);
1444 }
1445
1446 #[test]
1447 fn test_need_convert_to_flat() {
1448 let metadata = build_test_region_metadata();
1449
1450 let expected_columns = metadata.column_metadatas.len() + 3;
1453 let result =
1454 FlatReadFormat::is_legacy_format(&metadata, expected_columns, "test.parquet").unwrap();
1455 assert!(
1456 !result,
1457 "Should not need conversion when column counts match"
1458 );
1459
1460 let num_columns_without_pk = expected_columns - metadata.primary_key.len();
1463 let result =
1464 FlatReadFormat::is_legacy_format(&metadata, num_columns_without_pk, "test.parquet")
1465 .unwrap();
1466 assert!(
1467 result,
1468 "Should need conversion when primary key columns are missing"
1469 );
1470
1471 let too_many_columns = expected_columns + 1;
1473 let err = FlatReadFormat::is_legacy_format(&metadata, too_many_columns, "test.parquet")
1474 .unwrap_err();
1475 assert!(err.to_string().contains("Expected columns"), "{err:?}");
1476
1477 let wrong_diff_columns = expected_columns - 1; let err = FlatReadFormat::is_legacy_format(&metadata, wrong_diff_columns, "test.parquet")
1480 .unwrap_err();
1481 assert!(
1482 err.to_string().contains("Column number difference"),
1483 "{err:?}"
1484 );
1485 }
1486
1487 fn build_test_dense_pk_array(
1488 codec: &DensePrimaryKeyCodec,
1489 pk_values_per_row: &[&[Option<i64>]],
1490 ) -> Arc<PrimaryKeyArray> {
1491 let mut builder = PrimaryKeyArrayBuilder::with_capacity(pk_values_per_row.len(), 1024, 0);
1492
1493 for pk_values_row in pk_values_per_row {
1494 let values: Vec<ValueRef> = pk_values_row
1495 .iter()
1496 .map(|opt| match opt {
1497 Some(val) => ValueRef::Int64(*val),
1498 None => ValueRef::Null,
1499 })
1500 .collect();
1501
1502 let encoded = codec.encode(values.into_iter()).unwrap();
1503 builder.append_value(&encoded);
1504 }
1505
1506 Arc::new(builder.finish())
1507 }
1508
1509 fn build_test_sparse_region_metadata() -> RegionMetadataRef {
1510 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1511 builder
1512 .push_column_metadata(ColumnMetadata {
1513 column_schema: ColumnSchema::new(
1514 "__table_id",
1515 ConcreteDataType::uint32_datatype(),
1516 false,
1517 ),
1518 semantic_type: SemanticType::Tag,
1519 column_id: ReservedColumnId::table_id(),
1520 })
1521 .push_column_metadata(ColumnMetadata {
1522 column_schema: ColumnSchema::new(
1523 "__tsid",
1524 ConcreteDataType::uint64_datatype(),
1525 false,
1526 ),
1527 semantic_type: SemanticType::Tag,
1528 column_id: ReservedColumnId::tsid(),
1529 })
1530 .push_column_metadata(ColumnMetadata {
1531 column_schema: ColumnSchema::new("tag0", ConcreteDataType::string_datatype(), true),
1532 semantic_type: SemanticType::Tag,
1533 column_id: 1,
1534 })
1535 .push_column_metadata(ColumnMetadata {
1536 column_schema: ColumnSchema::new("tag1", ConcreteDataType::string_datatype(), true),
1537 semantic_type: SemanticType::Tag,
1538 column_id: 3,
1539 })
1540 .push_column_metadata(ColumnMetadata {
1541 column_schema: ColumnSchema::new(
1542 "field1",
1543 ConcreteDataType::int64_datatype(),
1544 true,
1545 ),
1546 semantic_type: SemanticType::Field,
1547 column_id: 4,
1548 })
1549 .push_column_metadata(ColumnMetadata {
1550 column_schema: ColumnSchema::new(
1551 "field0",
1552 ConcreteDataType::int64_datatype(),
1553 true,
1554 ),
1555 semantic_type: SemanticType::Field,
1556 column_id: 2,
1557 })
1558 .push_column_metadata(ColumnMetadata {
1559 column_schema: ColumnSchema::new(
1560 "ts",
1561 ConcreteDataType::timestamp_millisecond_datatype(),
1562 false,
1563 ),
1564 semantic_type: SemanticType::Timestamp,
1565 column_id: 5,
1566 })
1567 .primary_key(vec![
1568 ReservedColumnId::table_id(),
1569 ReservedColumnId::tsid(),
1570 1,
1571 3,
1572 ])
1573 .primary_key_encoding(PrimaryKeyEncoding::Sparse);
1574 Arc::new(builder.build().unwrap())
1575 }
1576
1577 fn build_test_sparse_pk_array(
1578 codec: &SparsePrimaryKeyCodec,
1579 pk_values_per_row: &[SparseTestRow],
1580 ) -> Arc<PrimaryKeyArray> {
1581 let mut builder = PrimaryKeyArrayBuilder::with_capacity(pk_values_per_row.len(), 1024, 0);
1582 for row in pk_values_per_row {
1583 let values = vec![
1584 (ReservedColumnId::table_id(), ValueRef::UInt32(row.table_id)),
1585 (ReservedColumnId::tsid(), ValueRef::UInt64(row.tsid)),
1586 (1, ValueRef::String(&row.tag0)),
1587 (3, ValueRef::String(&row.tag1)),
1588 ];
1589
1590 let mut buffer = Vec::new();
1591 codec.encode_value_refs(&values, &mut buffer).unwrap();
1592 builder.append_value(&buffer);
1593 }
1594
1595 Arc::new(builder.finish())
1596 }
1597
    /// One logical row of sparse primary key values used by the sparse
    /// encoding tests (see `build_test_sparse_pk_array` for the id mapping).
    #[derive(Clone)]
    struct SparseTestRow {
        // Encoded under `ReservedColumnId::table_id()`.
        table_id: u32,
        // Encoded under `ReservedColumnId::tsid()`.
        tsid: u64,
        // Value for tag column id 1.
        tag0: String,
        // Value for tag column id 3.
        tag1: String,
    }
1605
1606 #[test]
1607 fn test_flat_read_format_convert_format_with_dense_encoding() {
1608 let metadata = build_test_region_metadata();
1609
1610 let column_ids: Vec<_> = metadata
1611 .column_metadatas
1612 .iter()
1613 .map(|c| c.column_id)
1614 .collect();
1615 let format = FlatReadFormat::new(
1616 metadata.clone(),
1617 column_ids.into_iter(),
1618 Some(6),
1619 "test",
1620 false,
1621 )
1622 .unwrap();
1623
1624 let num_rows = 4;
1625 let original_sequence = 100u64;
1626
1627 let pk_values_per_row = vec![
1629 &[Some(1i64), Some(1i64)][..]; num_rows ];
1631
1632 let codec = DensePrimaryKeyCodec::new(&metadata);
1634 let dense_pk_array = build_test_dense_pk_array(&codec, &pk_values_per_row);
1635 let columns: Vec<ArrayRef> = vec![
1636 Arc::new(Int64Array::from(vec![2; num_rows])), Arc::new(Int64Array::from(vec![3; num_rows])), Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), dense_pk_array.clone(), Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), ];
1643
1644 let old_format_fields = vec![
1646 Field::new("field1", ArrowDataType::Int64, true),
1647 Field::new("field0", ArrowDataType::Int64, true),
1648 Field::new(
1649 "ts",
1650 ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
1651 false,
1652 ),
1653 Field::new(
1654 "__primary_key",
1655 ArrowDataType::Dictionary(
1656 Box::new(ArrowDataType::UInt32),
1657 Box::new(ArrowDataType::Binary),
1658 ),
1659 false,
1660 ),
1661 Field::new("__sequence", ArrowDataType::UInt64, false),
1662 Field::new("__op_type", ArrowDataType::UInt8, false),
1663 ];
1664 let old_schema = Arc::new(Schema::new(old_format_fields));
1665 let record_batch = RecordBatch::try_new(old_schema, columns).unwrap();
1666
1667 let result = format.convert_batch(record_batch, None).unwrap();
1669
1670 let expected_columns: Vec<ArrayRef> = vec![
1672 Arc::new(Int64Array::from(vec![1; num_rows])), Arc::new(Int64Array::from(vec![1; num_rows])), Arc::new(Int64Array::from(vec![2; num_rows])), Arc::new(Int64Array::from(vec![3; num_rows])), Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), dense_pk_array, Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), ];
1681 let expected_record_batch =
1682 RecordBatch::try_new(build_test_flat_sst_schema(), expected_columns).unwrap();
1683
1684 assert_eq!(expected_record_batch, result);
1686 }
1687
1688 #[test]
1689 fn test_flat_read_format_convert_format_with_sparse_encoding() {
1690 let metadata = build_test_sparse_region_metadata();
1691
1692 let column_ids: Vec<_> = metadata
1693 .column_metadatas
1694 .iter()
1695 .map(|c| c.column_id)
1696 .collect();
1697 let format = FlatReadFormat::new(
1698 metadata.clone(),
1699 column_ids.clone().into_iter(),
1700 None,
1701 "test",
1702 false,
1703 )
1704 .unwrap();
1705
1706 let num_rows = 4;
1707 let original_sequence = 100u64;
1708
1709 let pk_test_rows = vec![
1711 SparseTestRow {
1712 table_id: 1,
1713 tsid: 123,
1714 tag0: "frontend".to_string(),
1715 tag1: "pod1".to_string(),
1716 };
1717 num_rows
1718 ];
1719
1720 let codec = SparsePrimaryKeyCodec::new(&metadata);
1721 let sparse_pk_array = build_test_sparse_pk_array(&codec, &pk_test_rows);
1722 let columns: Vec<ArrayRef> = vec![
1724 Arc::new(Int64Array::from(vec![2; num_rows])), Arc::new(Int64Array::from(vec![3; num_rows])), Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), sparse_pk_array.clone(), Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), ];
1731
1732 let old_format_fields = vec![
1734 Field::new("field1", ArrowDataType::Int64, true),
1735 Field::new("field0", ArrowDataType::Int64, true),
1736 Field::new(
1737 "ts",
1738 ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
1739 false,
1740 ),
1741 Field::new(
1742 "__primary_key",
1743 ArrowDataType::Dictionary(
1744 Box::new(ArrowDataType::UInt32),
1745 Box::new(ArrowDataType::Binary),
1746 ),
1747 false,
1748 ),
1749 Field::new("__sequence", ArrowDataType::UInt64, false),
1750 Field::new("__op_type", ArrowDataType::UInt8, false),
1751 ];
1752 let old_schema = Arc::new(Schema::new(old_format_fields));
1753 let record_batch = RecordBatch::try_new(old_schema, columns).unwrap();
1754
1755 let result = format.convert_batch(record_batch.clone(), None).unwrap();
1757
1758 let tag0_array = Arc::new(DictionaryArray::new(
1760 UInt32Array::from(vec![0; num_rows]),
1761 Arc::new(StringArray::from(vec!["frontend"])),
1762 ));
1763 let tag1_array = Arc::new(DictionaryArray::new(
1764 UInt32Array::from(vec![0; num_rows]),
1765 Arc::new(StringArray::from(vec!["pod1"])),
1766 ));
1767 let expected_columns: Vec<ArrayRef> = vec![
1768 Arc::new(UInt32Array::from(vec![1; num_rows])), Arc::new(UInt64Array::from(vec![123; num_rows])), tag0_array, tag1_array, Arc::new(Int64Array::from(vec![2; num_rows])), Arc::new(Int64Array::from(vec![3; num_rows])), Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), sparse_pk_array, Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), ];
1779 let expected_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1780 let expected_record_batch =
1781 RecordBatch::try_new(expected_schema, expected_columns).unwrap();
1782
1783 assert_eq!(expected_record_batch, result);
1785
1786 let format =
1787 FlatReadFormat::new(metadata.clone(), column_ids.into_iter(), None, "test", true)
1788 .unwrap();
1789 let result = format.convert_batch(record_batch.clone(), None).unwrap();
1791 assert_eq!(record_batch, result);
1792 }
1793
1794 #[test]
1795 fn test_convert_flat_batch() {
1796 let metadata = build_test_region_metadata();
1797 let write_format = PrimaryKeyWriteFormat::new(metadata);
1798
1799 let num_rows = 4;
1800 let flat_columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
1802 let flat_batch = RecordBatch::try_new(build_test_flat_sst_schema(), flat_columns).unwrap();
1803
1804 let result = write_format.convert_flat_batch(&flat_batch, 2).unwrap();
1806
1807 let expected_columns: Vec<ArrayRef> = vec![
1809 Arc::new(Int64Array::from(vec![2; num_rows])), Arc::new(Int64Array::from(vec![3; num_rows])), Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), build_test_pk_array(&[(b"test".to_vec(), num_rows)]), Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; num_rows])), Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), ];
1816 let expected = RecordBatch::try_new(build_test_arrow_schema(), expected_columns).unwrap();
1817
1818 assert_eq!(expected, result);
1819 }
1820
1821 #[test]
1822 fn test_convert_flat_batch_with_override_sequence() {
1823 let metadata = build_test_region_metadata();
1824 let write_format = PrimaryKeyWriteFormat::new(metadata).with_override_sequence(Some(999));
1825
1826 let num_rows = 4;
1827 let flat_columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
1828 let flat_batch = RecordBatch::try_new(build_test_flat_sst_schema(), flat_columns).unwrap();
1829
1830 let result = write_format.convert_flat_batch(&flat_batch, 2).unwrap();
1831
1832 let expected_columns: Vec<ArrayRef> = vec![
1833 Arc::new(Int64Array::from(vec![2; num_rows])), Arc::new(Int64Array::from(vec![3; num_rows])), Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), build_test_pk_array(&[(b"test".to_vec(), num_rows)]), Arc::new(UInt64Array::from(vec![999; num_rows])), Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), ];
1840 let expected = RecordBatch::try_new(build_test_arrow_schema(), expected_columns).unwrap();
1841
1842 assert_eq!(expected, result);
1843 }
1844
1845 #[test]
1846 fn test_convert_flat_batch_no_tags() {
1847 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1849 builder
1850 .push_column_metadata(ColumnMetadata {
1851 column_schema: ColumnSchema::new(
1852 "field0",
1853 ConcreteDataType::int64_datatype(),
1854 true,
1855 ),
1856 semantic_type: SemanticType::Field,
1857 column_id: 1,
1858 })
1859 .push_column_metadata(ColumnMetadata {
1860 column_schema: ColumnSchema::new(
1861 "ts",
1862 ConcreteDataType::timestamp_millisecond_datatype(),
1863 false,
1864 ),
1865 semantic_type: SemanticType::Timestamp,
1866 column_id: 2,
1867 });
1868 let metadata = Arc::new(builder.build().unwrap());
1869 let write_format = PrimaryKeyWriteFormat::new(metadata);
1870
1871 let num_rows = 3;
1872 let sst_schema = write_format.arrow_schema().clone();
1874 let columns: Vec<ArrayRef> = vec![
1875 Arc::new(Int64Array::from(vec![10; num_rows])), Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3])), build_test_pk_array(&[(b"".to_vec(), num_rows)]), Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; num_rows])), Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), ];
1881 let flat_batch = RecordBatch::try_new(sst_schema.clone(), columns.clone()).unwrap();
1882
1883 let result = write_format.convert_flat_batch(&flat_batch, 1).unwrap();
1885 let expected = RecordBatch::try_new(sst_schema, columns).unwrap();
1886
1887 assert_eq!(expected, result);
1888 }
1889}