1use std::borrow::Borrow;
30use std::collections::{HashMap, VecDeque};
31use std::sync::Arc;
32
33use api::v1::SemanticType;
34use common_time::Timestamp;
35use datafusion_common::ScalarValue;
36use datatypes::arrow::array::{
37 ArrayRef, BinaryArray, BinaryDictionaryBuilder, DictionaryArray, UInt64Array,
38};
39use datatypes::arrow::datatypes::{SchemaRef, UInt32Type};
40use datatypes::arrow::record_batch::RecordBatch;
41use datatypes::prelude::DataType;
42use datatypes::vectors::Helper;
43use mito_codec::row_converter::{
44 CompositeValues, PrimaryKeyCodec, SortField, build_primary_key_codec_with_fields,
45};
46use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
47use parquet::file::statistics::Statistics;
48use snafu::{OptionExt, ResultExt, ensure};
49use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
50use store_api::storage::{ColumnId, SequenceNumber};
51
52use crate::error::{
53 ConvertVectorSnafu, DecodeSnafu, InvalidRecordBatchSnafu, NewRecordBatchSnafu, Result,
54};
55use crate::read::read_columns::ReadColumns;
56use crate::read::{Batch, BatchBuilder, BatchColumn};
57use crate::sst::file::{FileMeta, FileTimeRange};
58use crate::sst::parquet::read_columns::{ParquetReadColumn, ParquetReadColumns};
59use crate::sst::to_sst_arrow_schema;
60
61pub(crate) type PrimaryKeyArray = DictionaryArray<UInt32Type>;
63pub(crate) type PrimaryKeyArrayBuilder = BinaryDictionaryBuilder<UInt32Type>;
65
/// Number of internal columns at the tail of the SST schema:
/// `__primary_key`, `__sequence` and `__op_type`.
pub(crate) const INTERNAL_COLUMN_NUM: usize = 3;
/// Number of columns with fixed positions at the tail of the SST schema:
/// the time index followed by the [`INTERNAL_COLUMN_NUM`] internal columns.
pub(crate) const FIXED_POS_COLUMN_NUM: usize = INTERNAL_COLUMN_NUM + 1;
72
73pub(crate) struct PrimaryKeyWriteFormat {
75 arrow_schema: SchemaRef,
77 override_sequence: Option<SequenceNumber>,
78}
79
80impl PrimaryKeyWriteFormat {
81 pub(crate) fn new(metadata: RegionMetadataRef) -> PrimaryKeyWriteFormat {
83 let arrow_schema = to_sst_arrow_schema(&metadata);
84 PrimaryKeyWriteFormat {
85 arrow_schema,
86 override_sequence: None,
87 }
88 }
89
90 pub(crate) fn with_override_sequence(
92 mut self,
93 override_sequence: Option<SequenceNumber>,
94 ) -> Self {
95 self.override_sequence = override_sequence;
96 self
97 }
98
99 #[cfg(test)]
101 pub(crate) fn arrow_schema(&self) -> &SchemaRef {
102 &self.arrow_schema
103 }
104
105 pub(crate) fn convert_flat_batch(
111 &self,
112 batch: &RecordBatch,
113 num_fields: usize,
114 ) -> Result<RecordBatch> {
115 let num_tag_columns = batch.num_columns() - num_fields - FIXED_POS_COLUMN_NUM;
116 let mut columns: Vec<ArrayRef> = batch.columns()[num_tag_columns..].to_vec();
117
118 if let Some(override_sequence) = self.override_sequence {
119 let num_cols = columns.len();
120 columns[num_cols - 2] =
122 Arc::new(UInt64Array::from(vec![override_sequence; batch.num_rows()]));
123 }
124
125 RecordBatch::try_new(self.arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
126 }
127}
128
129pub(crate) fn column_values(
133 row_groups: &[impl Borrow<RowGroupMetaData>],
134 column: &ColumnMetadata,
135 column_index: usize,
136 is_min: bool,
137) -> Option<ArrayRef> {
138 let null_scalar: ScalarValue = column
139 .column_schema
140 .data_type
141 .as_arrow_type()
142 .try_into()
143 .ok()?;
144 let scalar_values = row_groups
145 .iter()
146 .map(|meta| {
147 let stats = meta.borrow().column(column_index).statistics()?;
148 match stats {
149 Statistics::Boolean(s) => Some(ScalarValue::Boolean(Some(if is_min {
150 *s.min_opt()?
151 } else {
152 *s.max_opt()?
153 }))),
154 Statistics::Int32(s) => Some(ScalarValue::Int32(Some(if is_min {
155 *s.min_opt()?
156 } else {
157 *s.max_opt()?
158 }))),
159 Statistics::Int64(s) => Some(ScalarValue::Int64(Some(if is_min {
160 *s.min_opt()?
161 } else {
162 *s.max_opt()?
163 }))),
164 Statistics::Int96(_) => None,
165 Statistics::Float(s) => Some(ScalarValue::Float32(Some(if is_min {
166 *s.min_opt()?
167 } else {
168 *s.max_opt()?
169 }))),
170 Statistics::Double(s) => Some(ScalarValue::Float64(Some(if is_min {
171 *s.min_opt()?
172 } else {
173 *s.max_opt()?
174 }))),
175 Statistics::ByteArray(s) => {
176 let bytes = if is_min {
177 s.min_bytes_opt()?
178 } else {
179 s.max_bytes_opt()?
180 };
181 let s = String::from_utf8(bytes.to_vec()).ok();
182 Some(ScalarValue::Utf8(s))
183 }
184 Statistics::FixedLenByteArray(_) => None,
185 }
186 })
187 .map(|maybe_scalar| maybe_scalar.unwrap_or_else(|| null_scalar.clone()))
188 .collect::<Vec<ScalarValue>>();
189 debug_assert_eq!(scalar_values.len(), row_groups.len());
190 ScalarValue::iter_to_array(scalar_values).ok()
191}
192
193pub(crate) fn column_null_counts(
196 row_groups: &[impl Borrow<RowGroupMetaData>],
197 column_index: usize,
198) -> Option<ArrayRef> {
199 let values = row_groups.iter().map(|meta| {
200 let col = meta.borrow().column(column_index);
201 let stat = col.statistics()?;
202 stat.null_count_opt()
203 });
204 Some(Arc::new(UInt64Array::from_iter(values)))
205}
206
207pub struct PrimaryKeyReadFormat {
209 metadata: RegionMetadataRef,
211 arrow_schema: SchemaRef,
213 field_id_to_index: HashMap<ColumnId, usize>,
216 parquet_read_cols: ParquetReadColumns,
218 field_id_to_projected_index: HashMap<ColumnId, usize>,
221 primary_key_codec: Option<Arc<dyn PrimaryKeyCodec>>,
223}
224
225impl PrimaryKeyReadFormat {
226 pub fn new(metadata: RegionMetadataRef, read_cols: ReadColumns) -> PrimaryKeyReadFormat {
228 let field_id_to_index: HashMap<_, _> = metadata
229 .field_columns()
230 .enumerate()
231 .map(|(index, column)| (column.column_id, index))
232 .collect();
233 let arrow_schema = to_sst_arrow_schema(&metadata);
234
235 let format_projection = FormatProjection::compute_format_projection(
236 &field_id_to_index,
237 arrow_schema.fields.len(),
238 read_cols,
239 );
240
241 PrimaryKeyReadFormat {
242 metadata,
243 arrow_schema,
244 field_id_to_index,
245 parquet_read_cols: format_projection.parquet_read_cols,
246 field_id_to_projected_index: format_projection.column_id_to_projected_index,
247 primary_key_codec: None,
248 }
249 }
250
251 pub(crate) fn arrow_schema(&self) -> &SchemaRef {
256 &self.arrow_schema
257 }
258
259 pub(crate) fn metadata(&self) -> &RegionMetadataRef {
261 &self.metadata
262 }
263
264 pub(crate) fn parquet_read_columns(&self) -> &ParquetReadColumns {
265 &self.parquet_read_cols
266 }
267
268 pub(crate) fn field_id_to_projected_index(&self) -> &HashMap<ColumnId, usize> {
270 &self.field_id_to_projected_index
271 }
272
273 pub fn convert_record_batch(
278 &self,
279 record_batch: &RecordBatch,
280 override_sequence_array: Option<&ArrayRef>,
281 batches: &mut VecDeque<Batch>,
282 ) -> Result<()> {
283 debug_assert!(batches.is_empty());
284
285 ensure!(
287 record_batch.num_columns() >= FIXED_POS_COLUMN_NUM,
288 InvalidRecordBatchSnafu {
289 reason: format!(
290 "record batch only has {} columns",
291 record_batch.num_columns()
292 ),
293 }
294 );
295
296 let mut fixed_pos_columns = record_batch
297 .columns()
298 .iter()
299 .rev()
300 .take(FIXED_POS_COLUMN_NUM);
301 let op_type_array = fixed_pos_columns.next().unwrap();
303 let mut sequence_array = fixed_pos_columns.next().unwrap().clone();
304 let pk_array = fixed_pos_columns.next().unwrap();
305 let ts_array = fixed_pos_columns.next().unwrap();
306 let field_batch_columns = self.get_field_batch_columns(record_batch)?;
307
308 if let Some(override_array) = override_sequence_array {
310 assert!(override_array.len() >= sequence_array.len());
311 sequence_array = if override_array.len() > sequence_array.len() {
314 override_array.slice(0, sequence_array.len())
315 } else {
316 override_array.clone()
317 };
318 }
319
320 let pk_dict_array = pk_array
322 .as_any()
323 .downcast_ref::<PrimaryKeyArray>()
324 .with_context(|| InvalidRecordBatchSnafu {
325 reason: format!("primary key array should not be {:?}", pk_array.data_type()),
326 })?;
327 let offsets = primary_key_offsets(pk_dict_array)?;
328 if offsets.is_empty() {
329 return Ok(());
330 }
331
332 let keys = pk_dict_array.keys();
334 let pk_values = pk_dict_array
335 .values()
336 .as_any()
337 .downcast_ref::<BinaryArray>()
338 .with_context(|| InvalidRecordBatchSnafu {
339 reason: format!(
340 "values of primary key array should not be {:?}",
341 pk_dict_array.values().data_type()
342 ),
343 })?;
344 for (i, start) in offsets[..offsets.len() - 1].iter().enumerate() {
345 let end = offsets[i + 1];
346 let rows_in_batch = end - start;
347 let dict_key = keys.value(*start);
348 let primary_key = pk_values.value(dict_key as usize).to_vec();
349
350 let mut builder = BatchBuilder::new(primary_key);
351 builder
352 .timestamps_array(ts_array.slice(*start, rows_in_batch))?
353 .sequences_array(sequence_array.slice(*start, rows_in_batch))?
354 .op_types_array(op_type_array.slice(*start, rows_in_batch))?;
355 for batch_column in &field_batch_columns {
357 builder.push_field(BatchColumn {
358 column_id: batch_column.column_id,
359 data: batch_column.data.slice(*start, rows_in_batch),
360 });
361 }
362
363 let mut batch = builder.build()?;
364 if let Some(codec) = &self.primary_key_codec {
365 let pk_values: CompositeValues =
366 codec.decode(batch.primary_key()).context(DecodeSnafu)?;
367 batch.set_pk_values(pk_values);
368 }
369 batches.push_back(batch);
370 }
371
372 Ok(())
373 }
374
375 pub fn min_values(
377 &self,
378 row_groups: &[impl Borrow<RowGroupMetaData>],
379 column_id: ColumnId,
380 ) -> StatValues {
381 let Some(column) = self.metadata.column_by_id(column_id) else {
382 return StatValues::NoColumn;
384 };
385 match column.semantic_type {
386 SemanticType::Tag => self.tag_values(row_groups, column, true),
387 SemanticType::Field => {
388 let index = self.field_id_to_index.get(&column_id).unwrap();
390 let stats = column_values(row_groups, column, *index, true);
391 StatValues::from_stats_opt(stats)
392 }
393 SemanticType::Timestamp => {
394 let index = self.time_index_position();
395 let stats = column_values(row_groups, column, index, true);
396 StatValues::from_stats_opt(stats)
397 }
398 }
399 }
400
401 pub fn max_values(
403 &self,
404 row_groups: &[impl Borrow<RowGroupMetaData>],
405 column_id: ColumnId,
406 ) -> StatValues {
407 let Some(column) = self.metadata.column_by_id(column_id) else {
408 return StatValues::NoColumn;
410 };
411 match column.semantic_type {
412 SemanticType::Tag => self.tag_values(row_groups, column, false),
413 SemanticType::Field => {
414 let index = self.field_id_to_index.get(&column_id).unwrap();
416 let stats = column_values(row_groups, column, *index, false);
417 StatValues::from_stats_opt(stats)
418 }
419 SemanticType::Timestamp => {
420 let index = self.time_index_position();
421 let stats = column_values(row_groups, column, index, false);
422 StatValues::from_stats_opt(stats)
423 }
424 }
425 }
426
427 pub fn null_counts(
429 &self,
430 row_groups: &[impl Borrow<RowGroupMetaData>],
431 column_id: ColumnId,
432 ) -> StatValues {
433 let Some(column) = self.metadata.column_by_id(column_id) else {
434 return StatValues::NoColumn;
436 };
437 match column.semantic_type {
438 SemanticType::Tag => StatValues::NoStats,
439 SemanticType::Field => {
440 let index = self.field_id_to_index.get(&column_id).unwrap();
442 let stats = column_null_counts(row_groups, *index);
443 StatValues::from_stats_opt(stats)
444 }
445 SemanticType::Timestamp => {
446 let index = self.time_index_position();
447 let stats = column_null_counts(row_groups, index);
448 StatValues::from_stats_opt(stats)
449 }
450 }
451 }
452
453 fn get_field_batch_columns(&self, record_batch: &RecordBatch) -> Result<Vec<BatchColumn>> {
455 record_batch
456 .columns()
457 .iter()
458 .zip(record_batch.schema().fields())
459 .take(record_batch.num_columns() - FIXED_POS_COLUMN_NUM) .map(|(array, field)| {
461 let vector = Helper::try_into_vector(array.clone()).context(ConvertVectorSnafu)?;
462 let column = self
463 .metadata
464 .column_by_name(field.name())
465 .with_context(|| InvalidRecordBatchSnafu {
466 reason: format!("column {} not found in metadata", field.name()),
467 })?;
468
469 Ok(BatchColumn {
470 column_id: column.column_id,
471 data: vector,
472 })
473 })
474 .collect()
475 }
476
477 fn tag_values(
479 &self,
480 row_groups: &[impl Borrow<RowGroupMetaData>],
481 column: &ColumnMetadata,
482 is_min: bool,
483 ) -> StatValues {
484 let is_first_tag = self
485 .metadata
486 .primary_key
487 .first()
488 .map(|id| *id == column.column_id)
489 .unwrap_or(false);
490 if !is_first_tag {
491 return StatValues::NoStats;
493 }
494
495 StatValues::from_stats_opt(self.first_tag_values(row_groups, column, is_min))
496 }
497
498 fn first_tag_values(
501 &self,
502 row_groups: &[impl Borrow<RowGroupMetaData>],
503 column: &ColumnMetadata,
504 is_min: bool,
505 ) -> Option<ArrayRef> {
506 debug_assert!(
507 self.metadata
508 .primary_key
509 .first()
510 .map(|id| *id == column.column_id)
511 .unwrap_or(false)
512 );
513
514 let primary_key_encoding = self.metadata.primary_key_encoding;
515 let converter = build_primary_key_codec_with_fields(
516 primary_key_encoding,
517 [(
518 column.column_id,
519 SortField::new(column.column_schema.data_type.clone()),
520 )]
521 .into_iter(),
522 );
523
524 let values = row_groups.iter().map(|meta| {
525 let stats = meta
526 .borrow()
527 .column(self.primary_key_position())
528 .statistics()?;
529 match stats {
530 Statistics::Boolean(_) => None,
531 Statistics::Int32(_) => None,
532 Statistics::Int64(_) => None,
533 Statistics::Int96(_) => None,
534 Statistics::Float(_) => None,
535 Statistics::Double(_) => None,
536 Statistics::ByteArray(s) => {
537 let bytes = if is_min {
538 s.min_bytes_opt()?
539 } else {
540 s.max_bytes_opt()?
541 };
542 converter.decode_leftmost(bytes).ok()?
543 }
544 Statistics::FixedLenByteArray(_) => None,
545 }
546 });
547 let mut builder = column
548 .column_schema
549 .data_type
550 .create_mutable_vector(row_groups.len());
551 for value_opt in values {
552 match value_opt {
553 Some(v) => builder.push_value_ref(&v.as_value_ref()),
555 None => builder.push_null(),
556 }
557 }
558 let vector = builder.to_vector();
559
560 Some(vector.to_arrow_array())
561 }
562
563 fn primary_key_position(&self) -> usize {
565 self.arrow_schema.fields.len() - 3
566 }
567
568 fn time_index_position(&self) -> usize {
570 self.arrow_schema.fields.len() - FIXED_POS_COLUMN_NUM
571 }
572
573 pub fn field_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
575 self.field_id_to_projected_index.get(&column_id).copied()
576 }
577}
578
579pub(crate) struct FormatProjection {
581 pub(crate) parquet_read_cols: ParquetReadColumns,
583 pub(crate) column_id_to_projected_index: HashMap<ColumnId, usize>,
588}
589
590impl FormatProjection {
591 pub(crate) fn compute_format_projection(
595 id_to_index: &HashMap<ColumnId, usize>,
596 sst_column_num: usize,
597 cols: ReadColumns,
598 ) -> Self {
599 let mut projected_columns: Vec<_> = cols
600 .cols
601 .into_iter()
602 .filter_map(|col| {
603 id_to_index
604 .get(&col.column_id)
605 .copied()
606 .map(|index_of_sst| (col.column_id, index_of_sst, col.nested_paths))
607 })
608 .collect();
609 projected_columns.sort_unstable_by_key(|(_, index, _)| *index);
612
613 let mut parquet_read_cols: Vec<ParquetReadColumn> =
614 Vec::with_capacity(projected_columns.len() + FIXED_POS_COLUMN_NUM);
615 let mut column_id_to_projected_index = HashMap::with_capacity(projected_columns.len());
617
618 for (col_id, index_of_sst, nested_paths) in projected_columns {
619 Self::merge_or_push_parquet_column(&mut parquet_read_cols, index_of_sst, nested_paths);
620
621 column_id_to_projected_index
622 .entry(col_id)
623 .or_insert_with(|| parquet_read_cols.len() - 1);
624 }
625
626 Self::append_time_index_if_needed(&mut parquet_read_cols, sst_column_num);
629 Self::append_fixed_internal_columns(&mut parquet_read_cols, sst_column_num);
630
631 Self {
632 parquet_read_cols: ParquetReadColumns::from_deduped(parquet_read_cols),
633 column_id_to_projected_index,
634 }
635 }
636
637 fn merge_or_push_parquet_column(
638 parquet_read_cols: &mut Vec<ParquetReadColumn>,
639 index_of_sst: usize,
640 nested_paths: Vec<Vec<String>>,
641 ) {
642 if let Some(last_col) = parquet_read_cols.last_mut()
645 && last_col.root_index() == index_of_sst
646 {
647 last_col.merge_nested_paths(nested_paths);
648 return;
649 }
650
651 let parquet_col = ParquetReadColumn::new(index_of_sst).with_nested_paths(nested_paths);
652 parquet_read_cols.push(parquet_col);
653 }
654
655 fn append_time_index_if_needed(
656 parquet_read_cols: &mut Vec<ParquetReadColumn>,
657 sst_column_num: usize,
658 ) {
659 let time_index = sst_column_num - FIXED_POS_COLUMN_NUM;
660 let needs_time_index = parquet_read_cols
664 .last()
665 .map(|col| col.root_index() != time_index)
666 .unwrap_or(true);
667 if needs_time_index {
668 parquet_read_cols.push(ParquetReadColumn::new(time_index));
669 }
670 }
671
672 fn append_fixed_internal_columns(
675 parquet_read_cols: &mut Vec<ParquetReadColumn>,
676 sst_column_num: usize,
677 ) {
678 for index in sst_column_num - INTERNAL_COLUMN_NUM..sst_column_num {
679 parquet_read_cols.push(ParquetReadColumn::new(index));
680 }
681 }
682}
683
684pub enum StatValues {
689 Values(ArrayRef),
691 NoColumn,
693 NoStats,
695}
696
697impl StatValues {
698 pub fn from_stats_opt(stats: Option<ArrayRef>) -> Self {
700 match stats {
701 Some(stats) => StatValues::Values(stats),
702 None => StatValues::NoStats,
703 }
704 }
705}
706
#[cfg(test)]
impl PrimaryKeyReadFormat {
    /// Test helper: builds a read format that requests every column of `metadata`.
    pub fn new_with_all_columns(metadata: RegionMetadataRef) -> PrimaryKeyReadFormat {
        let all_column_ids = metadata.column_metadatas.iter().map(|c| c.column_id);
        let read_cols = ReadColumns::from_deduped_column_ids(all_column_ids);
        Self::new(Arc::clone(&metadata), read_cols)
    }
}
719
720pub(crate) fn primary_key_offsets(pk_dict_array: &PrimaryKeyArray) -> Result<Vec<usize>> {
722 if pk_dict_array.is_empty() {
723 return Ok(Vec::new());
724 }
725
726 let mut offsets = vec![0];
728 let keys = pk_dict_array.keys();
729 let pk_indices = keys.values();
731 for (i, key) in pk_indices.iter().take(keys.len() - 1).enumerate() {
732 if *key != pk_indices[i + 1] {
734 offsets.push(i + 1);
736 }
737 }
738 offsets.push(keys.len());
739
740 Ok(offsets)
741}
742
743pub(crate) fn parquet_row_group_time_range(
746 file_meta: &FileMeta,
747 parquet_meta: &ParquetMetaData,
748 row_group_idx: usize,
749) -> Option<FileTimeRange> {
750 let row_group_meta = parquet_meta.row_group(row_group_idx);
751 let num_columns = parquet_meta.file_metadata().schema_descr().num_columns();
752 assert!(
753 num_columns >= FIXED_POS_COLUMN_NUM,
754 "file only has {} columns",
755 num_columns
756 );
757 let time_index_pos = num_columns - FIXED_POS_COLUMN_NUM;
758
759 let stats = row_group_meta.column(time_index_pos).statistics()?;
760 let (min, max) = match stats {
762 Statistics::Int64(value_stats) => (*value_stats.min_opt()?, *value_stats.max_opt()?),
763 Statistics::Int32(_)
764 | Statistics::Boolean(_)
765 | Statistics::Int96(_)
766 | Statistics::Float(_)
767 | Statistics::Double(_)
768 | Statistics::ByteArray(_)
769 | Statistics::FixedLenByteArray(_) => {
770 common_telemetry::warn!(
771 "Invalid statistics {:?} for time index in parquet in {}",
772 stats,
773 file_meta.file_id
774 );
775 return None;
776 }
777 };
778
779 debug_assert!(min >= file_meta.time_range.0.value() && min <= file_meta.time_range.1.value());
780 debug_assert!(max >= file_meta.time_range.0.value() && max <= file_meta.time_range.1.value());
781 let unit = file_meta.time_range.0.unit();
782
783 Some((Timestamp::new(min, unit), Timestamp::new(max, unit)))
784}
785
786pub(crate) fn need_override_sequence(parquet_meta: &ParquetMetaData) -> bool {
789 let num_columns = parquet_meta.file_metadata().schema_descr().num_columns();
790 if num_columns < FIXED_POS_COLUMN_NUM {
791 return false;
792 }
793
794 let sequence_pos = num_columns - 2;
796
797 for row_group in parquet_meta.row_groups() {
799 if let Some(Statistics::Int64(value_stats)) = row_group.column(sequence_pos).statistics() {
800 if let (Some(min_val), Some(max_val)) = (value_stats.min_opt(), value_stats.max_opt()) {
801 if *min_val != 0 || *max_val != 0 {
803 return false;
804 }
805 } else {
806 return false;
808 }
809 } else {
810 return false;
812 }
813 }
814
815 !parquet_meta.row_groups().is_empty()
817}
818
819#[cfg(test)]
820mod tests {
821 use std::sync::Arc;
822
823 use api::v1::OpType;
824 use datatypes::arrow::array::{
825 Int64Array, StringArray, TimestampMillisecondArray, UInt8Array, UInt32Array, UInt64Array,
826 };
827 use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema, TimeUnit};
828 use datatypes::prelude::ConcreteDataType;
829 use datatypes::schema::ColumnSchema;
830 use datatypes::types::json_type::{JsonNativeType, JsonObjectType};
831 use datatypes::value::ValueRef;
832 use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, UInt8Vector, UInt64Vector};
833 use mito_codec::row_converter::{
834 DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec,
835 };
836 use store_api::codec::PrimaryKeyEncoding;
837 use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
838 use store_api::storage::RegionId;
839 use store_api::storage::consts::ReservedColumnId;
840
841 use super::*;
842 use crate::error::InvalidMetadataSnafu;
843 use crate::read::read_columns::ReadColumn;
844 use crate::sst::parquet::flat_format::{
845 FlatReadFormat, FlatWriteFormat, sequence_column_index, sst_column_id_indices,
846 };
847 use crate::sst::{
848 FlatSchemaOptions, OP_TYPE_PARQUET_FIELD_ID, PRIMARY_KEY_PARQUET_FIELD_ID,
849 SEQUENCE_PARQUET_FIELD_ID, to_flat_sst_arrow_schema, with_field_id,
850 };
851
852 const TEST_SEQUENCE: u64 = 1;
853 const TEST_OP_TYPE: u8 = OpType::Put as u8;
854
855 fn build_test_region_metadata() -> RegionMetadataRef {
856 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
857 builder
858 .push_column_metadata(ColumnMetadata {
859 column_schema: ColumnSchema::new("tag0", ConcreteDataType::int64_datatype(), true),
860 semantic_type: SemanticType::Tag,
861 column_id: 1,
862 })
863 .push_column_metadata(ColumnMetadata {
864 column_schema: ColumnSchema::new(
865 "field1",
866 ConcreteDataType::int64_datatype(),
867 true,
868 ),
869 semantic_type: SemanticType::Field,
870 column_id: 4, })
872 .push_column_metadata(ColumnMetadata {
873 column_schema: ColumnSchema::new("tag1", ConcreteDataType::int64_datatype(), true),
874 semantic_type: SemanticType::Tag,
875 column_id: 3,
876 })
877 .push_column_metadata(ColumnMetadata {
878 column_schema: ColumnSchema::new(
879 "field0",
880 ConcreteDataType::int64_datatype(),
881 true,
882 ),
883 semantic_type: SemanticType::Field,
884 column_id: 2,
885 })
886 .push_column_metadata(ColumnMetadata {
887 column_schema: ColumnSchema::new(
888 "ts",
889 ConcreteDataType::timestamp_millisecond_datatype(),
890 false,
891 ),
892 semantic_type: SemanticType::Timestamp,
893 column_id: 5,
894 })
895 .primary_key(vec![1, 3]);
896 Arc::new(builder.build().unwrap())
897 }
898
899 fn build_test_arrow_schema() -> SchemaRef {
900 let fields = vec![
901 make_field("field1", ArrowDataType::Int64, true, Some(4)),
902 make_field("field0", ArrowDataType::Int64, true, Some(2)),
903 make_field(
904 "ts",
905 ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
906 false,
907 Some(5),
908 ),
909 make_field(
910 "__primary_key",
911 ArrowDataType::Dictionary(
912 Box::new(ArrowDataType::UInt32),
913 Box::new(ArrowDataType::Binary),
914 ),
915 false,
916 Some(PRIMARY_KEY_PARQUET_FIELD_ID),
917 ),
918 make_field(
919 "__sequence",
920 ArrowDataType::UInt64,
921 false,
922 Some(SEQUENCE_PARQUET_FIELD_ID),
923 ),
924 make_field(
925 "__op_type",
926 ArrowDataType::UInt8,
927 false,
928 Some(OP_TYPE_PARQUET_FIELD_ID),
929 ),
930 ];
931 Arc::new(Schema::new(fields))
932 }
933
934 fn new_batch(primary_key: &[u8], start_ts: i64, start_field: i64, num_rows: usize) -> Batch {
935 new_batch_with_sequence(primary_key, start_ts, start_field, num_rows, TEST_SEQUENCE)
936 }
937
938 fn new_batch_with_sequence(
939 primary_key: &[u8],
940 start_ts: i64,
941 start_field: i64,
942 num_rows: usize,
943 sequence: u64,
944 ) -> Batch {
945 let ts_values = (0..num_rows).map(|i| start_ts + i as i64);
946 let timestamps = Arc::new(TimestampMillisecondVector::from_values(ts_values));
947 let sequences = Arc::new(UInt64Vector::from_vec(vec![sequence; num_rows]));
948 let op_types = Arc::new(UInt8Vector::from_vec(vec![TEST_OP_TYPE; num_rows]));
949 let fields = vec![
950 BatchColumn {
951 column_id: 4,
952 data: Arc::new(Int64Vector::from_vec(vec![start_field; num_rows])),
953 }, BatchColumn {
955 column_id: 2,
956 data: Arc::new(Int64Vector::from_vec(vec![start_field + 1; num_rows])),
957 }, ];
959
960 BatchBuilder::with_required_columns(primary_key.to_vec(), timestamps, sequences, op_types)
961 .with_fields(fields)
962 .build()
963 .unwrap()
964 }
965
966 #[test]
967 fn test_to_sst_arrow_schema() {
968 let metadata = build_test_region_metadata();
969 let write_format = PrimaryKeyWriteFormat::new(metadata);
970 assert_eq!(&build_test_arrow_schema(), write_format.arrow_schema());
971 }
972
973 fn build_test_pk_array(pk_row_nums: &[(Vec<u8>, usize)]) -> Arc<PrimaryKeyArray> {
974 let values = Arc::new(BinaryArray::from_iter_values(
975 pk_row_nums.iter().map(|v| &v.0),
976 ));
977 let mut keys = vec![];
978 for (index, num_rows) in pk_row_nums.iter().map(|v| v.1).enumerate() {
979 keys.extend(std::iter::repeat_n(index as u32, num_rows));
980 }
981 let keys = UInt32Array::from(keys);
982 Arc::new(DictionaryArray::new(keys, values))
983 }
984
985 #[test]
986 fn test_projection_indices() {
987 let metadata = build_test_region_metadata();
988 let read_format =
990 PrimaryKeyReadFormat::new(metadata.clone(), ReadColumns::from_deduped_column_ids([3]));
991 assert_eq!(
992 &[2, 3, 4, 5],
993 read_format.parquet_read_columns().root_indices()
994 );
995 let read_format =
997 PrimaryKeyReadFormat::new(metadata.clone(), ReadColumns::from_deduped_column_ids([4]));
998 assert_eq!(
999 &[0, 2, 3, 4, 5],
1000 read_format.parquet_read_columns().root_indices()
1001 );
1002 let read_format =
1004 PrimaryKeyReadFormat::new(metadata.clone(), ReadColumns::from_deduped_column_ids([5]));
1005 assert_eq!(
1006 &[2, 3, 4, 5],
1007 read_format.parquet_read_columns().root_indices()
1008 );
1009 let read_format =
1011 PrimaryKeyReadFormat::new(metadata, ReadColumns::from_deduped_column_ids([2, 1, 5]));
1012 assert_eq!(
1013 &[1, 2, 3, 4, 5],
1014 read_format.parquet_read_columns().root_indices()
1015 );
1016 }
1017
1018 #[test]
1019 fn test_format_projection_preserves_nested_paths() -> Result<()> {
1020 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1021 builder
1022 .push_column_metadata(ColumnMetadata {
1023 column_schema: ColumnSchema::new("tag0", ConcreteDataType::string_datatype(), true),
1024 semantic_type: SemanticType::Tag,
1025 column_id: 1,
1026 })
1027 .push_column_metadata(ColumnMetadata {
1028 column_schema: ColumnSchema::new(
1029 "j",
1030 ConcreteDataType::json2(JsonNativeType::Object(JsonObjectType::from([
1031 ("a".to_string(), JsonNativeType::i64()),
1032 ("b".to_string(), JsonNativeType::String),
1033 ]))),
1034 true,
1035 ),
1036 semantic_type: SemanticType::Field,
1037 column_id: 4,
1038 })
1039 .push_column_metadata(ColumnMetadata {
1040 column_schema: ColumnSchema::new(
1041 "ts",
1042 ConcreteDataType::timestamp_millisecond_datatype(),
1043 false,
1044 ),
1045 semantic_type: SemanticType::Timestamp,
1046 column_id: 5,
1047 })
1048 .primary_key(vec![1]);
1049 let metadata = Arc::new(builder.build().context(InvalidMetadataSnafu)?);
1050 let column_id_to_parquet_index = sst_column_id_indices(&metadata);
1051 let projection = FormatProjection::compute_format_projection(
1052 &column_id_to_parquet_index,
1053 metadata.column_metadatas.len() + FIXED_POS_COLUMN_NUM,
1054 ReadColumns {
1055 cols: vec![ReadColumn::new(
1056 4,
1057 vec![vec!["j".to_string(), "a".to_string()]],
1058 )],
1059 },
1060 );
1061
1062 let columns = projection.parquet_read_cols.columns();
1063 assert_eq!(1, columns[0].root_index());
1064 assert_eq!(
1065 &[vec!["j".to_string(), "a".to_string()]],
1066 columns[0].nested_paths()
1067 );
1068 Ok(())
1069 }
1070
1071 #[test]
1072 fn test_empty_primary_key_offsets() {
1073 let array = build_test_pk_array(&[]);
1074 assert!(primary_key_offsets(&array).unwrap().is_empty());
1075 }
1076
1077 #[test]
1078 fn test_primary_key_offsets_one_series() {
1079 let array = build_test_pk_array(&[(b"one".to_vec(), 1)]);
1080 assert_eq!(vec![0, 1], primary_key_offsets(&array).unwrap());
1081
1082 let array = build_test_pk_array(&[(b"one".to_vec(), 1), (b"two".to_vec(), 1)]);
1083 assert_eq!(vec![0, 1, 2], primary_key_offsets(&array).unwrap());
1084
1085 let array = build_test_pk_array(&[
1086 (b"one".to_vec(), 1),
1087 (b"two".to_vec(), 1),
1088 (b"three".to_vec(), 1),
1089 ]);
1090 assert_eq!(vec![0, 1, 2, 3], primary_key_offsets(&array).unwrap());
1091 }
1092
1093 #[test]
1094 fn test_primary_key_offsets_multi_series() {
1095 let array = build_test_pk_array(&[(b"one".to_vec(), 1), (b"two".to_vec(), 3)]);
1096 assert_eq!(vec![0, 1, 4], primary_key_offsets(&array).unwrap());
1097
1098 let array = build_test_pk_array(&[(b"one".to_vec(), 3), (b"two".to_vec(), 1)]);
1099 assert_eq!(vec![0, 3, 4], primary_key_offsets(&array).unwrap());
1100
1101 let array = build_test_pk_array(&[(b"one".to_vec(), 3), (b"two".to_vec(), 3)]);
1102 assert_eq!(vec![0, 3, 6], primary_key_offsets(&array).unwrap());
1103 }
1104
1105 #[test]
1106 fn test_convert_empty_record_batch() {
1107 let metadata = build_test_region_metadata();
1108 let arrow_schema = build_test_arrow_schema();
1109 let column_ids: Vec<_> = metadata
1110 .column_metadatas
1111 .iter()
1112 .map(|col| col.column_id)
1113 .collect();
1114 let read_format =
1115 PrimaryKeyReadFormat::new(metadata, ReadColumns::from_deduped_column_ids(column_ids));
1116 assert_eq!(arrow_schema, *read_format.arrow_schema());
1117
1118 let record_batch = RecordBatch::new_empty(arrow_schema);
1119 let mut batches = VecDeque::new();
1120 read_format
1121 .convert_record_batch(&record_batch, None, &mut batches)
1122 .unwrap();
1123 assert!(batches.is_empty());
1124 }
1125
1126 #[test]
1127 fn test_convert_record_batch() {
1128 let metadata = build_test_region_metadata();
1129 let column_ids: Vec<_> = metadata
1130 .column_metadatas
1131 .iter()
1132 .map(|col| col.column_id)
1133 .collect();
1134 let read_format =
1135 PrimaryKeyReadFormat::new(metadata, ReadColumns::from_deduped_column_ids(column_ids));
1136
1137 let columns: Vec<ArrayRef> = vec![
1138 Arc::new(Int64Array::from(vec![1, 1, 10, 10])), Arc::new(Int64Array::from(vec![2, 2, 11, 11])), Arc::new(TimestampMillisecondArray::from(vec![1, 2, 11, 12])), build_test_pk_array(&[(b"one".to_vec(), 2), (b"two".to_vec(), 2)]), Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; 4])), Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; 4])), ];
1145 let arrow_schema = build_test_arrow_schema();
1146 let record_batch = RecordBatch::try_new(arrow_schema, columns).unwrap();
1147 let mut batches = VecDeque::new();
1148 read_format
1149 .convert_record_batch(&record_batch, None, &mut batches)
1150 .unwrap();
1151
1152 assert_eq!(
1153 vec![new_batch(b"one", 1, 1, 2), new_batch(b"two", 11, 10, 2)],
1154 batches.into_iter().collect::<Vec<_>>(),
1155 );
1156 }
1157
1158 #[test]
1159 fn test_convert_record_batch_with_override_sequence() {
1160 let metadata = build_test_region_metadata();
1161 let read_format = PrimaryKeyReadFormat::new(
1162 metadata.clone(),
1163 ReadColumns::from_deduped_column_ids(
1164 metadata.column_metadatas.iter().map(|c| c.column_id),
1165 ),
1166 );
1167
1168 let columns: Vec<ArrayRef> = vec![
1169 Arc::new(Int64Array::from(vec![1, 1, 10, 10])), Arc::new(Int64Array::from(vec![2, 2, 11, 11])), Arc::new(TimestampMillisecondArray::from(vec![1, 2, 11, 12])), build_test_pk_array(&[(b"one".to_vec(), 2), (b"two".to_vec(), 2)]), Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; 4])), Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; 4])), ];
1176 let arrow_schema = build_test_arrow_schema();
1177 let record_batch = RecordBatch::try_new(arrow_schema, columns).unwrap();
1178
1179 let override_sequence: u64 = 12345;
1181 let override_sequence_array: ArrayRef =
1182 Arc::new(UInt64Array::from_value(override_sequence, 4));
1183
1184 let mut batches = VecDeque::new();
1185 read_format
1186 .convert_record_batch(&record_batch, Some(&override_sequence_array), &mut batches)
1187 .unwrap();
1188
1189 let expected_batch1 = new_batch_with_sequence(b"one", 1, 1, 2, override_sequence);
1191 let expected_batch2 = new_batch_with_sequence(b"two", 11, 10, 2, override_sequence);
1192
1193 assert_eq!(
1194 vec![expected_batch1, expected_batch2],
1195 batches.into_iter().collect::<Vec<_>>(),
1196 );
1197 }
1198
1199 fn make_field(name: &str, dt: ArrowDataType, nullable: bool, field_id: Option<u32>) -> Field {
1200 let mut field = Field::new(name, dt, nullable);
1201 if let Some(id) = field_id {
1202 field = with_field_id(field, id);
1203 }
1204 field
1205 }
1206
1207 fn build_test_flat_sst_schema() -> SchemaRef {
1208 let fields = vec![
1209 Field::new("tag0", ArrowDataType::Int64, true), Field::new("tag1", ArrowDataType::Int64, true),
1211 Field::new("field1", ArrowDataType::Int64, true), Field::new("field0", ArrowDataType::Int64, true),
1213 Field::new(
1214 "ts",
1215 ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
1216 false,
1217 ),
1218 Field::new(
1219 "__primary_key",
1220 ArrowDataType::Dictionary(
1221 Box::new(ArrowDataType::UInt32),
1222 Box::new(ArrowDataType::Binary),
1223 ),
1224 false,
1225 ),
1226 Field::new("__sequence", ArrowDataType::UInt64, false),
1227 Field::new("__op_type", ArrowDataType::UInt8, false),
1228 ];
1229 Arc::new(Schema::new(fields))
1230 }
1231
1232 fn build_test_flat_sst_schema_with_field_ids() -> SchemaRef {
1233 let ids = [
1234 Some(1u32),
1235 Some(3),
1236 Some(4),
1237 Some(2),
1238 Some(5),
1239 Some(PRIMARY_KEY_PARQUET_FIELD_ID),
1240 Some(SEQUENCE_PARQUET_FIELD_ID),
1241 Some(OP_TYPE_PARQUET_FIELD_ID),
1242 ];
1243 let fields: Vec<_> = build_test_flat_sst_schema()
1244 .fields()
1245 .iter()
1246 .zip(ids)
1247 .map(|(f, id)| match id {
1248 Some(id) => Arc::new(with_field_id((**f).clone(), id)) as _,
1249 None => f.clone(),
1250 })
1251 .collect();
1252 Arc::new(Schema::new(fields))
1253 }
1254
1255 #[test]
1256 fn test_flat_to_sst_arrow_schema() {
1257 let metadata = build_test_region_metadata();
1258 let format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default());
1259 assert_eq!(
1260 &build_test_flat_sst_schema_with_field_ids(),
1261 format.arrow_schema()
1262 );
1263 }
1264
1265 fn input_columns_for_flat_batch(num_rows: usize) -> Vec<ArrayRef> {
1266 vec![
1267 Arc::new(Int64Array::from(vec![1; num_rows])), Arc::new(Int64Array::from(vec![1; num_rows])), Arc::new(Int64Array::from(vec![2; num_rows])), Arc::new(Int64Array::from(vec![3; num_rows])), Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), build_test_pk_array(&[(b"test".to_vec(), num_rows)]), Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; num_rows])), Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), ]
1276 }
1277
1278 #[test]
1279 fn test_flat_convert_batch() {
1280 let metadata = build_test_region_metadata();
1281 let format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default());
1282
1283 let num_rows = 4;
1284 let columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
1285 let batch =
1286 RecordBatch::try_new(build_test_flat_sst_schema_with_field_ids(), columns.clone())
1287 .unwrap();
1288 let expect_record =
1289 RecordBatch::try_new(build_test_flat_sst_schema_with_field_ids(), columns).unwrap();
1290
1291 let actual = format.convert_batch(&batch).unwrap();
1292 assert_eq!(expect_record, actual);
1293 }
1294
1295 #[test]
1296 fn test_flat_convert_with_override_sequence() {
1297 let metadata = build_test_region_metadata();
1298 let format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default())
1299 .with_override_sequence(Some(415411));
1300
1301 let num_rows = 4;
1302 let columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
1303 let batch =
1304 RecordBatch::try_new(build_test_flat_sst_schema_with_field_ids(), columns).unwrap();
1305
1306 let expected_columns: Vec<ArrayRef> = vec![
1307 Arc::new(Int64Array::from(vec![1; num_rows])), Arc::new(Int64Array::from(vec![1; num_rows])), Arc::new(Int64Array::from(vec![2; num_rows])), Arc::new(Int64Array::from(vec![3; num_rows])), Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), build_test_pk_array(&[(b"test".to_vec(), num_rows)]), Arc::new(UInt64Array::from(vec![415411; num_rows])), Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), ];
1316 let expected_record = RecordBatch::try_new(
1317 build_test_flat_sst_schema_with_field_ids(),
1318 expected_columns,
1319 )
1320 .unwrap();
1321
1322 let actual = format.convert_batch(&batch).unwrap();
1323 assert_eq!(expected_record, actual);
1324 }
1325
1326 #[test]
1327 fn test_flat_projection_indices() {
1328 let metadata = build_test_region_metadata();
1329 let read_format = FlatReadFormat::new(
1334 metadata.clone(),
1335 ReadColumns::from_deduped_column_ids([3]),
1336 None,
1337 "test",
1338 false,
1339 )
1340 .unwrap();
1341 assert_eq!(
1342 &[1, 4, 5, 6, 7],
1343 read_format.parquet_read_columns().root_indices()
1344 );
1345
1346 let read_format = FlatReadFormat::new(
1348 metadata.clone(),
1349 ReadColumns::from_deduped_column_ids([4]),
1350 None,
1351 "test",
1352 false,
1353 )
1354 .unwrap();
1355 assert_eq!(
1356 &[2, 4, 5, 6, 7],
1357 read_format.parquet_read_columns().root_indices()
1358 );
1359
1360 let read_format = FlatReadFormat::new(
1362 metadata.clone(),
1363 ReadColumns::from_deduped_column_ids([5]),
1364 None,
1365 "test",
1366 false,
1367 )
1368 .unwrap();
1369 assert_eq!(
1370 &[4, 5, 6, 7],
1371 read_format.parquet_read_columns().root_indices()
1372 );
1373
1374 let read_format = FlatReadFormat::new(
1376 metadata,
1377 ReadColumns::from_deduped_column_ids([2, 1, 5]),
1378 None,
1379 "test",
1380 false,
1381 )
1382 .unwrap();
1383 assert_eq!(
1384 &[0, 3, 4, 5, 6, 7],
1385 read_format.parquet_read_columns().root_indices()
1386 );
1387 }
1388
1389 #[test]
1390 fn test_flat_read_format_convert_batch() {
1391 let metadata = build_test_region_metadata();
1392 let mut format = FlatReadFormat::new(
1393 metadata,
1394 ReadColumns::from_deduped_column_ids(std::iter::once(1)), Some(build_test_flat_sst_schema()),
1396 "test",
1397 false,
1398 )
1399 .unwrap();
1400
1401 let num_rows = 4;
1402 let original_sequence = 100u64;
1403 let override_sequence = 200u64;
1404
1405 let columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
1407 let mut test_columns = columns.clone();
1408 test_columns[6] = Arc::new(UInt64Array::from(vec![original_sequence; num_rows]));
1410 let record_batch =
1411 RecordBatch::try_new(format.arrow_schema().clone(), test_columns).unwrap();
1412
1413 let result = format.convert_batch(record_batch.clone(), None).unwrap();
1415 let sequence_column = result.column(sequence_column_index(result.num_columns()));
1416 let sequence_array = sequence_column
1417 .as_any()
1418 .downcast_ref::<UInt64Array>()
1419 .unwrap();
1420
1421 let expected_original = UInt64Array::from(vec![original_sequence; num_rows]);
1422 assert_eq!(sequence_array, &expected_original);
1423
1424 format.set_override_sequence(Some(override_sequence));
1426 let override_sequence_array = format.new_override_sequence_array(num_rows).unwrap();
1427 let result = format
1428 .convert_batch(record_batch, Some(&override_sequence_array))
1429 .unwrap();
1430 let sequence_column = result.column(sequence_column_index(result.num_columns()));
1431 let sequence_array = sequence_column
1432 .as_any()
1433 .downcast_ref::<UInt64Array>()
1434 .unwrap();
1435
1436 let expected_override = UInt64Array::from(vec![override_sequence; num_rows]);
1437 assert_eq!(sequence_array, &expected_override);
1438 }
1439
1440 #[test]
1441 fn test_need_convert_to_flat() {
1442 let metadata = build_test_region_metadata();
1443
1444 let expected_columns = metadata.column_metadatas.len() + 3;
1447 let result =
1448 FlatReadFormat::is_legacy_format(&metadata, expected_columns, "test.parquet").unwrap();
1449 assert!(
1450 !result,
1451 "Should not need conversion when column counts match"
1452 );
1453
1454 let num_columns_without_pk = expected_columns - metadata.primary_key.len();
1457 let result =
1458 FlatReadFormat::is_legacy_format(&metadata, num_columns_without_pk, "test.parquet")
1459 .unwrap();
1460 assert!(
1461 result,
1462 "Should need conversion when primary key columns are missing"
1463 );
1464
1465 let too_many_columns = expected_columns + 1;
1467 let err = FlatReadFormat::is_legacy_format(&metadata, too_many_columns, "test.parquet")
1468 .unwrap_err();
1469 assert!(err.to_string().contains("Expected columns"), "{err:?}");
1470
1471 let wrong_diff_columns = expected_columns - 1; let err = FlatReadFormat::is_legacy_format(&metadata, wrong_diff_columns, "test.parquet")
1474 .unwrap_err();
1475 assert!(
1476 err.to_string().contains("Column number difference"),
1477 "{err:?}"
1478 );
1479 }
1480
1481 fn build_test_dense_pk_array(
1482 codec: &DensePrimaryKeyCodec,
1483 pk_values_per_row: &[&[Option<i64>]],
1484 ) -> Arc<PrimaryKeyArray> {
1485 let mut builder = PrimaryKeyArrayBuilder::with_capacity(pk_values_per_row.len(), 1024, 0);
1486
1487 for pk_values_row in pk_values_per_row {
1488 let values: Vec<ValueRef> = pk_values_row
1489 .iter()
1490 .map(|opt| match opt {
1491 Some(val) => ValueRef::Int64(*val),
1492 None => ValueRef::Null,
1493 })
1494 .collect();
1495
1496 let encoded = codec.encode(values.into_iter()).unwrap();
1497 builder.append_value(&encoded);
1498 }
1499
1500 Arc::new(builder.finish())
1501 }
1502
1503 fn build_test_sparse_region_metadata() -> RegionMetadataRef {
1504 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1505 builder
1506 .push_column_metadata(ColumnMetadata {
1507 column_schema: ColumnSchema::new(
1508 "__table_id",
1509 ConcreteDataType::uint32_datatype(),
1510 false,
1511 ),
1512 semantic_type: SemanticType::Tag,
1513 column_id: ReservedColumnId::table_id(),
1514 })
1515 .push_column_metadata(ColumnMetadata {
1516 column_schema: ColumnSchema::new(
1517 "__tsid",
1518 ConcreteDataType::uint64_datatype(),
1519 false,
1520 ),
1521 semantic_type: SemanticType::Tag,
1522 column_id: ReservedColumnId::tsid(),
1523 })
1524 .push_column_metadata(ColumnMetadata {
1525 column_schema: ColumnSchema::new("tag0", ConcreteDataType::string_datatype(), true),
1526 semantic_type: SemanticType::Tag,
1527 column_id: 1,
1528 })
1529 .push_column_metadata(ColumnMetadata {
1530 column_schema: ColumnSchema::new("tag1", ConcreteDataType::string_datatype(), true),
1531 semantic_type: SemanticType::Tag,
1532 column_id: 3,
1533 })
1534 .push_column_metadata(ColumnMetadata {
1535 column_schema: ColumnSchema::new(
1536 "field1",
1537 ConcreteDataType::int64_datatype(),
1538 true,
1539 ),
1540 semantic_type: SemanticType::Field,
1541 column_id: 4,
1542 })
1543 .push_column_metadata(ColumnMetadata {
1544 column_schema: ColumnSchema::new(
1545 "field0",
1546 ConcreteDataType::int64_datatype(),
1547 true,
1548 ),
1549 semantic_type: SemanticType::Field,
1550 column_id: 2,
1551 })
1552 .push_column_metadata(ColumnMetadata {
1553 column_schema: ColumnSchema::new(
1554 "ts",
1555 ConcreteDataType::timestamp_millisecond_datatype(),
1556 false,
1557 ),
1558 semantic_type: SemanticType::Timestamp,
1559 column_id: 5,
1560 })
1561 .primary_key(vec![
1562 ReservedColumnId::table_id(),
1563 ReservedColumnId::tsid(),
1564 1,
1565 3,
1566 ])
1567 .primary_key_encoding(PrimaryKeyEncoding::Sparse);
1568 Arc::new(builder.build().unwrap())
1569 }
1570
1571 fn build_test_sparse_pk_array(
1572 codec: &SparsePrimaryKeyCodec,
1573 pk_values_per_row: &[SparseTestRow],
1574 ) -> Arc<PrimaryKeyArray> {
1575 let mut builder = PrimaryKeyArrayBuilder::with_capacity(pk_values_per_row.len(), 1024, 0);
1576 for row in pk_values_per_row {
1577 let values = vec![
1578 (ReservedColumnId::table_id(), ValueRef::UInt32(row.table_id)),
1579 (ReservedColumnId::tsid(), ValueRef::UInt64(row.tsid)),
1580 (1, ValueRef::String(&row.tag0)),
1581 (3, ValueRef::String(&row.tag1)),
1582 ];
1583
1584 let mut buffer = Vec::new();
1585 codec.encode_value_refs(&values, &mut buffer).unwrap();
1586 builder.append_value(&buffer);
1587 }
1588
1589 Arc::new(builder.finish())
1590 }
1591
1592 #[derive(Clone)]
1593 struct SparseTestRow {
1594 table_id: u32,
1595 tsid: u64,
1596 tag0: String,
1597 tag1: String,
1598 }
1599
1600 #[test]
1601 fn test_flat_read_format_convert_format_with_dense_encoding() {
1602 let metadata = build_test_region_metadata();
1603
1604 let column_ids: Vec<_> = metadata
1605 .column_metadatas
1606 .iter()
1607 .map(|c| c.column_id)
1608 .collect();
1609 let format = FlatReadFormat::new(
1610 metadata.clone(),
1611 ReadColumns::from_deduped_column_ids(column_ids),
1612 Some(build_test_arrow_schema()),
1613 "test",
1614 false,
1615 )
1616 .unwrap();
1617
1618 let num_rows = 4;
1619 let original_sequence = 100u64;
1620
1621 let pk_values_per_row = vec![
1623 &[Some(1i64), Some(1i64)][..]; num_rows ];
1625
1626 let codec = DensePrimaryKeyCodec::new(&metadata);
1628 let dense_pk_array = build_test_dense_pk_array(&codec, &pk_values_per_row);
1629 let columns: Vec<ArrayRef> = vec![
1630 Arc::new(Int64Array::from(vec![2; num_rows])), Arc::new(Int64Array::from(vec![3; num_rows])), Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), dense_pk_array.clone(), Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), ];
1637
1638 let old_schema = build_test_arrow_schema();
1640 let record_batch = RecordBatch::try_new(old_schema, columns).unwrap();
1641
1642 let result = format.convert_batch(record_batch, None).unwrap();
1644
1645 let expected_columns: Vec<ArrayRef> = vec![
1647 Arc::new(Int64Array::from(vec![1; num_rows])), Arc::new(Int64Array::from(vec![1; num_rows])), Arc::new(Int64Array::from(vec![2; num_rows])), Arc::new(Int64Array::from(vec![3; num_rows])), Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), dense_pk_array, Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), ];
1656 let expected_record_batch = RecordBatch::try_new(
1657 build_test_flat_sst_schema_with_field_ids(),
1658 expected_columns,
1659 )
1660 .unwrap();
1661
1662 assert_eq!(expected_record_batch, result);
1664 }
1665
1666 #[test]
1667 fn test_flat_read_format_convert_format_with_sparse_encoding() {
1668 let metadata = build_test_sparse_region_metadata();
1669
1670 let column_ids: Vec<_> = metadata
1671 .column_metadatas
1672 .iter()
1673 .map(|c| c.column_id)
1674 .collect();
1675 let format = FlatReadFormat::new(
1676 metadata.clone(),
1677 ReadColumns::from_deduped_column_ids(column_ids.clone()),
1678 None,
1679 "test",
1680 false,
1681 )
1682 .unwrap();
1683
1684 let num_rows = 4;
1685 let original_sequence = 100u64;
1686
1687 let pk_test_rows = vec![
1689 SparseTestRow {
1690 table_id: 1,
1691 tsid: 123,
1692 tag0: "frontend".to_string(),
1693 tag1: "pod1".to_string(),
1694 };
1695 num_rows
1696 ];
1697
1698 let codec = SparsePrimaryKeyCodec::new(&metadata);
1699 let sparse_pk_array = build_test_sparse_pk_array(&codec, &pk_test_rows);
1700 let columns: Vec<ArrayRef> = vec![
1702 Arc::new(Int64Array::from(vec![2; num_rows])), Arc::new(Int64Array::from(vec![3; num_rows])), Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), sparse_pk_array.clone(), Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), ];
1709
1710 let old_schema = build_test_arrow_schema();
1712 let record_batch = RecordBatch::try_new(old_schema, columns).unwrap();
1713
1714 let result = format.convert_batch(record_batch.clone(), None).unwrap();
1716
1717 let tag0_array = Arc::new(DictionaryArray::new(
1719 UInt32Array::from(vec![0; num_rows]),
1720 Arc::new(StringArray::from(vec!["frontend"])),
1721 ));
1722 let tag1_array = Arc::new(DictionaryArray::new(
1723 UInt32Array::from(vec![0; num_rows]),
1724 Arc::new(StringArray::from(vec!["pod1"])),
1725 ));
1726 let expected_columns: Vec<ArrayRef> = vec![
1727 Arc::new(UInt32Array::from(vec![1; num_rows])), Arc::new(UInt64Array::from(vec![123; num_rows])), tag0_array, tag1_array, Arc::new(Int64Array::from(vec![2; num_rows])), Arc::new(Int64Array::from(vec![3; num_rows])), Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), sparse_pk_array, Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), ];
1738 let expected_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1739 let expected_record_batch =
1740 RecordBatch::try_new(expected_schema, expected_columns).unwrap();
1741
1742 assert_eq!(expected_record_batch, result);
1744
1745 let format = FlatReadFormat::new(
1746 metadata.clone(),
1747 ReadColumns::from_deduped_column_ids(column_ids),
1748 None,
1749 "test",
1750 true,
1751 )
1752 .unwrap();
1753 let result = format.convert_batch(record_batch.clone(), None).unwrap();
1755 assert_eq!(record_batch, result);
1756 }
1757
1758 #[test]
1759 fn test_convert_flat_batch() {
1760 let metadata = build_test_region_metadata();
1761 let write_format = PrimaryKeyWriteFormat::new(metadata);
1762
1763 let num_rows = 4;
1764 let flat_columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
1766 let flat_batch = RecordBatch::try_new(build_test_flat_sst_schema(), flat_columns).unwrap();
1767
1768 let result = write_format.convert_flat_batch(&flat_batch, 2).unwrap();
1770
1771 let expected_columns: Vec<ArrayRef> = vec![
1773 Arc::new(Int64Array::from(vec![2; num_rows])), Arc::new(Int64Array::from(vec![3; num_rows])), Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), build_test_pk_array(&[(b"test".to_vec(), num_rows)]), Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; num_rows])), Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), ];
1780 let expected = RecordBatch::try_new(build_test_arrow_schema(), expected_columns).unwrap();
1781
1782 assert_eq!(expected, result);
1783 }
1784
1785 #[test]
1786 fn test_convert_flat_batch_with_override_sequence() {
1787 let metadata = build_test_region_metadata();
1788 let write_format = PrimaryKeyWriteFormat::new(metadata).with_override_sequence(Some(999));
1789
1790 let num_rows = 4;
1791 let flat_columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
1792 let flat_batch = RecordBatch::try_new(build_test_flat_sst_schema(), flat_columns).unwrap();
1793
1794 let result = write_format.convert_flat_batch(&flat_batch, 2).unwrap();
1795
1796 let expected_columns: Vec<ArrayRef> = vec![
1797 Arc::new(Int64Array::from(vec![2; num_rows])), Arc::new(Int64Array::from(vec![3; num_rows])), Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), build_test_pk_array(&[(b"test".to_vec(), num_rows)]), Arc::new(UInt64Array::from(vec![999; num_rows])), Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), ];
1804 let expected = RecordBatch::try_new(build_test_arrow_schema(), expected_columns).unwrap();
1805
1806 assert_eq!(expected, result);
1807 }
1808
1809 #[test]
1810 fn test_convert_flat_batch_no_tags() {
1811 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1813 builder
1814 .push_column_metadata(ColumnMetadata {
1815 column_schema: ColumnSchema::new(
1816 "field0",
1817 ConcreteDataType::int64_datatype(),
1818 true,
1819 ),
1820 semantic_type: SemanticType::Field,
1821 column_id: 1,
1822 })
1823 .push_column_metadata(ColumnMetadata {
1824 column_schema: ColumnSchema::new(
1825 "ts",
1826 ConcreteDataType::timestamp_millisecond_datatype(),
1827 false,
1828 ),
1829 semantic_type: SemanticType::Timestamp,
1830 column_id: 2,
1831 });
1832 let metadata = Arc::new(builder.build().unwrap());
1833 let write_format = PrimaryKeyWriteFormat::new(metadata);
1834
1835 let num_rows = 3;
1836 let sst_schema = write_format.arrow_schema().clone();
1838 let columns: Vec<ArrayRef> = vec![
1839 Arc::new(Int64Array::from(vec![10; num_rows])), Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3])), build_test_pk_array(&[(b"".to_vec(), num_rows)]), Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; num_rows])), Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), ];
1845 let flat_batch = RecordBatch::try_new(sst_schema.clone(), columns.clone()).unwrap();
1846
1847 let result = write_format.convert_flat_batch(&flat_batch, 1).unwrap();
1849 let expected = RecordBatch::try_new(sst_schema, columns).unwrap();
1850
1851 assert_eq!(expected, result);
1852 }
1853}