1use std::borrow::Borrow;
30use std::collections::{HashMap, VecDeque};
31use std::sync::Arc;
32
33use api::v1::SemanticType;
34use common_time::Timestamp;
35use datafusion_common::ScalarValue;
36use datatypes::arrow::array::{
37 ArrayRef, BinaryArray, BinaryDictionaryBuilder, DictionaryArray, UInt64Array,
38};
39use datatypes::arrow::datatypes::{SchemaRef, UInt32Type};
40use datatypes::arrow::record_batch::RecordBatch;
41use datatypes::prelude::DataType;
42use datatypes::vectors::Helper;
43use mito_codec::row_converter::{
44 CompositeValues, PrimaryKeyCodec, SortField, build_primary_key_codec_with_fields,
45};
46use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
47use parquet::file::statistics::Statistics;
48use snafu::{OptionExt, ResultExt, ensure};
49use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
50use store_api::storage::{ColumnId, SequenceNumber};
51
52use crate::error::{
53 ConvertVectorSnafu, DecodeSnafu, InvalidRecordBatchSnafu, NewRecordBatchSnafu, Result,
54};
55use crate::read::read_columns::ReadColumns;
56use crate::read::{Batch, BatchBuilder, BatchColumn};
57use crate::sst::file::{FileMeta, FileTimeRange};
58use crate::sst::parquet::read_columns::{ParquetReadColumn, ParquetReadColumns};
59use crate::sst::to_sst_arrow_schema;
60
/// Arrow array type storing primary keys in SST files: a dictionary array
/// whose keys are `u32` indices and whose values are encoded primary-key bytes.
pub(crate) type PrimaryKeyArray = DictionaryArray<UInt32Type>;
/// Builder for [`PrimaryKeyArray`].
pub(crate) type PrimaryKeyArrayBuilder = BinaryDictionaryBuilder<UInt32Type>;

/// Number of columns at fixed positions at the end of the SST schema:
/// time index, `__primary_key`, `__sequence`, `__op_type`.
pub(crate) const FIXED_POS_COLUMN_NUM: usize = 4;
/// Number of trailing internal columns
/// (`__primary_key`, `__sequence`, `__op_type`).
pub(crate) const INTERNAL_COLUMN_NUM: usize = 3;
72
/// Helper that converts record batches into the primary-key SST layout for writing.
pub(crate) struct PrimaryKeyWriteFormat {
    /// SST arrow schema derived from the region metadata.
    arrow_schema: SchemaRef,
    /// If set, replaces the values of the sequence column in converted batches.
    override_sequence: Option<SequenceNumber>,
}
79
80impl PrimaryKeyWriteFormat {
81 pub(crate) fn new(metadata: RegionMetadataRef) -> PrimaryKeyWriteFormat {
83 let arrow_schema = to_sst_arrow_schema(&metadata);
84 PrimaryKeyWriteFormat {
85 arrow_schema,
86 override_sequence: None,
87 }
88 }
89
90 pub(crate) fn with_override_sequence(
92 mut self,
93 override_sequence: Option<SequenceNumber>,
94 ) -> Self {
95 self.override_sequence = override_sequence;
96 self
97 }
98
99 #[cfg(test)]
101 pub(crate) fn arrow_schema(&self) -> &SchemaRef {
102 &self.arrow_schema
103 }
104
105 pub(crate) fn convert_flat_batch(
111 &self,
112 batch: &RecordBatch,
113 num_fields: usize,
114 ) -> Result<RecordBatch> {
115 let num_tag_columns = batch.num_columns() - num_fields - FIXED_POS_COLUMN_NUM;
116 let mut columns: Vec<ArrayRef> = batch.columns()[num_tag_columns..].to_vec();
117
118 if let Some(override_sequence) = self.override_sequence {
119 let num_cols = columns.len();
120 columns[num_cols - 2] =
122 Arc::new(UInt64Array::from(vec![override_sequence; batch.num_rows()]));
123 }
124
125 RecordBatch::try_new(self.arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
126 }
127}
128
129pub(crate) fn column_values(
133 row_groups: &[impl Borrow<RowGroupMetaData>],
134 column: &ColumnMetadata,
135 column_index: usize,
136 is_min: bool,
137) -> Option<ArrayRef> {
138 let null_scalar: ScalarValue = column
139 .column_schema
140 .data_type
141 .as_arrow_type()
142 .try_into()
143 .ok()?;
144 let scalar_values = row_groups
145 .iter()
146 .map(|meta| {
147 let stats = meta.borrow().column(column_index).statistics()?;
148 match stats {
149 Statistics::Boolean(s) => Some(ScalarValue::Boolean(Some(if is_min {
150 *s.min_opt()?
151 } else {
152 *s.max_opt()?
153 }))),
154 Statistics::Int32(s) => Some(ScalarValue::Int32(Some(if is_min {
155 *s.min_opt()?
156 } else {
157 *s.max_opt()?
158 }))),
159 Statistics::Int64(s) => Some(ScalarValue::Int64(Some(if is_min {
160 *s.min_opt()?
161 } else {
162 *s.max_opt()?
163 }))),
164 Statistics::Int96(_) => None,
165 Statistics::Float(s) => Some(ScalarValue::Float32(Some(if is_min {
166 *s.min_opt()?
167 } else {
168 *s.max_opt()?
169 }))),
170 Statistics::Double(s) => Some(ScalarValue::Float64(Some(if is_min {
171 *s.min_opt()?
172 } else {
173 *s.max_opt()?
174 }))),
175 Statistics::ByteArray(s) => {
176 let bytes = if is_min {
177 s.min_bytes_opt()?
178 } else {
179 s.max_bytes_opt()?
180 };
181 let s = String::from_utf8(bytes.to_vec()).ok();
182 Some(ScalarValue::Utf8(s))
183 }
184 Statistics::FixedLenByteArray(_) => None,
185 }
186 })
187 .map(|maybe_scalar| maybe_scalar.unwrap_or_else(|| null_scalar.clone()))
188 .collect::<Vec<ScalarValue>>();
189 debug_assert_eq!(scalar_values.len(), row_groups.len());
190 ScalarValue::iter_to_array(scalar_values).ok()
191}
192
193pub(crate) fn column_null_counts(
196 row_groups: &[impl Borrow<RowGroupMetaData>],
197 column_index: usize,
198) -> Option<ArrayRef> {
199 let values = row_groups.iter().map(|meta| {
200 let col = meta.borrow().column(column_index);
201 let stat = col.statistics()?;
202 stat.null_count_opt()
203 });
204 Some(Arc::new(UInt64Array::from_iter(values)))
205}
206
/// Helper that converts record batches read from primary-key layout SSTs
/// back into [`Batch`]es.
pub struct PrimaryKeyReadFormat {
    /// Metadata of the region that owns the SST.
    metadata: RegionMetadataRef,
    /// Full SST arrow schema derived from the metadata.
    arrow_schema: SchemaRef,
    /// Maps a field column id to its index among the field columns.
    field_id_to_index: HashMap<ColumnId, usize>,
    /// Columns (and nested paths) to read from the parquet file.
    parquet_read_cols: ParquetReadColumns,
    /// Maps a field column id to its index in the projected output.
    field_id_to_projected_index: HashMap<ColumnId, usize>,
    /// Optional codec; when set, primary keys of output batches are decoded
    /// eagerly into values.
    primary_key_codec: Option<Arc<dyn PrimaryKeyCodec>>,
}
224
225impl PrimaryKeyReadFormat {
226 pub fn new(metadata: RegionMetadataRef, read_cols: ReadColumns) -> PrimaryKeyReadFormat {
228 let field_id_to_index: HashMap<_, _> = metadata
229 .field_columns()
230 .enumerate()
231 .map(|(index, column)| (column.column_id, index))
232 .collect();
233 let arrow_schema = to_sst_arrow_schema(&metadata);
234
235 let format_projection = FormatProjection::compute_format_projection(
236 &field_id_to_index,
237 arrow_schema.fields.len(),
238 read_cols,
239 );
240
241 PrimaryKeyReadFormat {
242 metadata,
243 arrow_schema,
244 field_id_to_index,
245 parquet_read_cols: format_projection.parquet_read_cols,
246 field_id_to_projected_index: format_projection.column_id_to_projected_index,
247 primary_key_codec: None,
248 }
249 }
250
251 pub(crate) fn arrow_schema(&self) -> &SchemaRef {
256 &self.arrow_schema
257 }
258
259 pub(crate) fn metadata(&self) -> &RegionMetadataRef {
261 &self.metadata
262 }
263
264 pub(crate) fn parquet_read_columns(&self) -> &ParquetReadColumns {
265 &self.parquet_read_cols
266 }
267
268 pub(crate) fn field_id_to_projected_index(&self) -> &HashMap<ColumnId, usize> {
270 &self.field_id_to_projected_index
271 }
272
273 pub fn convert_record_batch(
278 &self,
279 record_batch: &RecordBatch,
280 override_sequence_array: Option<&ArrayRef>,
281 batches: &mut VecDeque<Batch>,
282 ) -> Result<()> {
283 debug_assert!(batches.is_empty());
284
285 ensure!(
287 record_batch.num_columns() >= FIXED_POS_COLUMN_NUM,
288 InvalidRecordBatchSnafu {
289 reason: format!(
290 "record batch only has {} columns",
291 record_batch.num_columns()
292 ),
293 }
294 );
295
296 let mut fixed_pos_columns = record_batch
297 .columns()
298 .iter()
299 .rev()
300 .take(FIXED_POS_COLUMN_NUM);
301 let op_type_array = fixed_pos_columns.next().unwrap();
303 let mut sequence_array = fixed_pos_columns.next().unwrap().clone();
304 let pk_array = fixed_pos_columns.next().unwrap();
305 let ts_array = fixed_pos_columns.next().unwrap();
306 let field_batch_columns = self.get_field_batch_columns(record_batch)?;
307
308 if let Some(override_array) = override_sequence_array {
310 assert!(override_array.len() >= sequence_array.len());
311 sequence_array = if override_array.len() > sequence_array.len() {
314 override_array.slice(0, sequence_array.len())
315 } else {
316 override_array.clone()
317 };
318 }
319
320 let pk_dict_array = pk_array
322 .as_any()
323 .downcast_ref::<PrimaryKeyArray>()
324 .with_context(|| InvalidRecordBatchSnafu {
325 reason: format!("primary key array should not be {:?}", pk_array.data_type()),
326 })?;
327 let offsets = primary_key_offsets(pk_dict_array)?;
328 if offsets.is_empty() {
329 return Ok(());
330 }
331
332 let keys = pk_dict_array.keys();
334 let pk_values = pk_dict_array
335 .values()
336 .as_any()
337 .downcast_ref::<BinaryArray>()
338 .with_context(|| InvalidRecordBatchSnafu {
339 reason: format!(
340 "values of primary key array should not be {:?}",
341 pk_dict_array.values().data_type()
342 ),
343 })?;
344 for (i, start) in offsets[..offsets.len() - 1].iter().enumerate() {
345 let end = offsets[i + 1];
346 let rows_in_batch = end - start;
347 let dict_key = keys.value(*start);
348 let primary_key = pk_values.value(dict_key as usize).to_vec();
349
350 let mut builder = BatchBuilder::new(primary_key);
351 builder
352 .timestamps_array(ts_array.slice(*start, rows_in_batch))?
353 .sequences_array(sequence_array.slice(*start, rows_in_batch))?
354 .op_types_array(op_type_array.slice(*start, rows_in_batch))?;
355 for batch_column in &field_batch_columns {
357 builder.push_field(BatchColumn {
358 column_id: batch_column.column_id,
359 data: batch_column.data.slice(*start, rows_in_batch),
360 });
361 }
362
363 let mut batch = builder.build()?;
364 if let Some(codec) = &self.primary_key_codec {
365 let pk_values: CompositeValues =
366 codec.decode(batch.primary_key()).context(DecodeSnafu)?;
367 batch.set_pk_values(pk_values);
368 }
369 batches.push_back(batch);
370 }
371
372 Ok(())
373 }
374
375 pub fn min_values(
377 &self,
378 row_groups: &[impl Borrow<RowGroupMetaData>],
379 column_id: ColumnId,
380 ) -> StatValues {
381 let Some(column) = self.metadata.column_by_id(column_id) else {
382 return StatValues::NoColumn;
384 };
385 match column.semantic_type {
386 SemanticType::Tag => self.tag_values(row_groups, column, true),
387 SemanticType::Field => {
388 let index = self.field_id_to_index.get(&column_id).unwrap();
390 let stats = column_values(row_groups, column, *index, true);
391 StatValues::from_stats_opt(stats)
392 }
393 SemanticType::Timestamp => {
394 let index = self.time_index_position();
395 let stats = column_values(row_groups, column, index, true);
396 StatValues::from_stats_opt(stats)
397 }
398 }
399 }
400
401 pub fn max_values(
403 &self,
404 row_groups: &[impl Borrow<RowGroupMetaData>],
405 column_id: ColumnId,
406 ) -> StatValues {
407 let Some(column) = self.metadata.column_by_id(column_id) else {
408 return StatValues::NoColumn;
410 };
411 match column.semantic_type {
412 SemanticType::Tag => self.tag_values(row_groups, column, false),
413 SemanticType::Field => {
414 let index = self.field_id_to_index.get(&column_id).unwrap();
416 let stats = column_values(row_groups, column, *index, false);
417 StatValues::from_stats_opt(stats)
418 }
419 SemanticType::Timestamp => {
420 let index = self.time_index_position();
421 let stats = column_values(row_groups, column, index, false);
422 StatValues::from_stats_opt(stats)
423 }
424 }
425 }
426
427 pub fn null_counts(
429 &self,
430 row_groups: &[impl Borrow<RowGroupMetaData>],
431 column_id: ColumnId,
432 ) -> StatValues {
433 let Some(column) = self.metadata.column_by_id(column_id) else {
434 return StatValues::NoColumn;
436 };
437 match column.semantic_type {
438 SemanticType::Tag => StatValues::NoStats,
439 SemanticType::Field => {
440 let index = self.field_id_to_index.get(&column_id).unwrap();
442 let stats = column_null_counts(row_groups, *index);
443 StatValues::from_stats_opt(stats)
444 }
445 SemanticType::Timestamp => {
446 let index = self.time_index_position();
447 let stats = column_null_counts(row_groups, index);
448 StatValues::from_stats_opt(stats)
449 }
450 }
451 }
452
453 fn get_field_batch_columns(&self, record_batch: &RecordBatch) -> Result<Vec<BatchColumn>> {
455 record_batch
456 .columns()
457 .iter()
458 .zip(record_batch.schema().fields())
459 .take(record_batch.num_columns() - FIXED_POS_COLUMN_NUM) .map(|(array, field)| {
461 let vector = Helper::try_into_vector(array.clone()).context(ConvertVectorSnafu)?;
462 let column = self
463 .metadata
464 .column_by_name(field.name())
465 .with_context(|| InvalidRecordBatchSnafu {
466 reason: format!("column {} not found in metadata", field.name()),
467 })?;
468
469 Ok(BatchColumn {
470 column_id: column.column_id,
471 data: vector,
472 })
473 })
474 .collect()
475 }
476
477 fn tag_values(
479 &self,
480 row_groups: &[impl Borrow<RowGroupMetaData>],
481 column: &ColumnMetadata,
482 is_min: bool,
483 ) -> StatValues {
484 let is_first_tag = self
485 .metadata
486 .primary_key
487 .first()
488 .map(|id| *id == column.column_id)
489 .unwrap_or(false);
490 if !is_first_tag {
491 return StatValues::NoStats;
493 }
494
495 StatValues::from_stats_opt(self.first_tag_values(row_groups, column, is_min))
496 }
497
498 fn first_tag_values(
501 &self,
502 row_groups: &[impl Borrow<RowGroupMetaData>],
503 column: &ColumnMetadata,
504 is_min: bool,
505 ) -> Option<ArrayRef> {
506 debug_assert!(
507 self.metadata
508 .primary_key
509 .first()
510 .map(|id| *id == column.column_id)
511 .unwrap_or(false)
512 );
513
514 let primary_key_encoding = self.metadata.primary_key_encoding;
515 let converter = build_primary_key_codec_with_fields(
516 primary_key_encoding,
517 [(
518 column.column_id,
519 SortField::new(column.column_schema.data_type.clone()),
520 )]
521 .into_iter(),
522 );
523
524 let values = row_groups.iter().map(|meta| {
525 let stats = meta
526 .borrow()
527 .column(self.primary_key_position())
528 .statistics()?;
529 match stats {
530 Statistics::Boolean(_) => None,
531 Statistics::Int32(_) => None,
532 Statistics::Int64(_) => None,
533 Statistics::Int96(_) => None,
534 Statistics::Float(_) => None,
535 Statistics::Double(_) => None,
536 Statistics::ByteArray(s) => {
537 let bytes = if is_min {
538 s.min_bytes_opt()?
539 } else {
540 s.max_bytes_opt()?
541 };
542 converter.decode_leftmost(bytes).ok()?
543 }
544 Statistics::FixedLenByteArray(_) => None,
545 }
546 });
547 let mut builder = column
548 .column_schema
549 .data_type
550 .create_mutable_vector(row_groups.len());
551 for value_opt in values {
552 match value_opt {
553 Some(v) => builder.push_value_ref(&v.as_value_ref()),
555 None => builder.push_null(),
556 }
557 }
558 let vector = builder.to_vector();
559
560 Some(vector.to_arrow_array())
561 }
562
563 fn primary_key_position(&self) -> usize {
565 self.arrow_schema.fields.len() - 3
566 }
567
568 fn time_index_position(&self) -> usize {
570 self.arrow_schema.fields.len() - FIXED_POS_COLUMN_NUM
571 }
572
573 pub fn field_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
575 self.field_id_to_projected_index.get(&column_id).copied()
576 }
577}
578
/// Result of computing which parquet columns a projection needs to read.
pub(crate) struct FormatProjection {
    /// Columns (with nested paths) to read from the parquet file.
    pub(crate) parquet_read_cols: ParquetReadColumns,
    /// Maps a projected column id to its index in the read output.
    pub(crate) column_id_to_projected_index: HashMap<ColumnId, usize>,
}
589
impl FormatProjection {
    /// Computes which parquet columns to read and how projected column ids map
    /// to output indices.
    ///
    /// `id_to_index` maps column ids to their SST column indices,
    /// `sst_column_num` is the total number of columns in the SST schema and
    /// `cols` lists the columns the caller wants to read.
    pub(crate) fn compute_format_projection(
        id_to_index: &HashMap<ColumnId, usize>,
        sst_column_num: usize,
        cols: ReadColumns,
    ) -> Self {
        // Keep only columns present in the SST, remembering their SST index
        // and any nested paths requested for them.
        let mut projected_columns: Vec<_> = cols
            .cols
            .into_iter()
            .filter_map(|col| {
                id_to_index
                    .get(&col.column_id)
                    .copied()
                    .map(|index_of_sst| (col.column_id, index_of_sst, col.nested_paths))
            })
            .collect();
        // Sort by SST index so read columns are emitted in file order.
        projected_columns.sort_unstable_by_key(|(_, index, _)| *index);

        let mut parquet_read_cols: Vec<ParquetReadColumn> =
            Vec::with_capacity(projected_columns.len() + FIXED_POS_COLUMN_NUM);
        let mut column_id_to_projected_index = HashMap::with_capacity(projected_columns.len());

        for (col_id, index_of_sst, nested_paths) in projected_columns {
            Self::merge_or_push_parquet_column(&mut parquet_read_cols, index_of_sst, nested_paths);

            // The column maps to the last read column, whether it was merged
            // into it or newly pushed.
            column_id_to_projected_index
                .entry(col_id)
                .or_insert_with(|| parquet_read_cols.len() - 1);
        }

        // Always read the time index plus the trailing internal columns.
        Self::append_time_index_if_needed(&mut parquet_read_cols, sst_column_num);
        Self::append_fixed_internal_columns(&mut parquet_read_cols, sst_column_num);

        Self {
            parquet_read_cols: ParquetReadColumns::from_deduped(parquet_read_cols),
            column_id_to_projected_index,
        }
    }

    /// Merges `nested_paths` into the last read column when it targets the
    /// same SST column; otherwise pushes a new read column.
    fn merge_or_push_parquet_column(
        parquet_read_cols: &mut Vec<ParquetReadColumn>,
        index_of_sst: usize,
        nested_paths: Vec<Vec<String>>,
    ) {
        if let Some(last_col) = parquet_read_cols.last_mut()
            && last_col.root_index() == index_of_sst
        {
            last_col.merge_nested_paths(nested_paths);
            return;
        }

        let parquet_col = ParquetReadColumn::new(index_of_sst).with_nested_paths(nested_paths);
        parquet_read_cols.push(parquet_col);
    }

    /// Appends the time index column unless the last read column already is
    /// the time index (input columns are sorted by SST index).
    fn append_time_index_if_needed(
        parquet_read_cols: &mut Vec<ParquetReadColumn>,
        sst_column_num: usize,
    ) {
        let time_index = sst_column_num - FIXED_POS_COLUMN_NUM;
        let needs_time_index = parquet_read_cols
            .last()
            .map(|col| col.root_index() != time_index)
            .unwrap_or(true);
        if needs_time_index {
            parquet_read_cols.push(ParquetReadColumn::new(time_index));
        }
    }

    /// Appends the trailing internal columns
    /// (`__primary_key`, `__sequence`, `__op_type`).
    fn append_fixed_internal_columns(
        parquet_read_cols: &mut Vec<ParquetReadColumn>,
        sst_column_num: usize,
    ) {
        for index in sst_column_num - INTERNAL_COLUMN_NUM..sst_column_num {
            parquet_read_cols.push(ParquetReadColumn::new(index));
        }
    }
}
683
/// Statistics of a column across row groups, or the reason they are missing.
pub enum StatValues {
    /// One statistic value per row group.
    Values(ArrayRef),
    /// The SST has no such column.
    NoColumn,
    /// The column exists but has no statistics.
    NoStats,
}
696
697impl StatValues {
698 pub fn from_stats_opt(stats: Option<ArrayRef>) -> Self {
700 match stats {
701 Some(stats) => StatValues::Values(stats),
702 None => StatValues::NoStats,
703 }
704 }
705}
706
#[cfg(test)]
impl PrimaryKeyReadFormat {
    /// Creates a read format that reads every column in `metadata`.
    pub fn new_with_all_columns(metadata: RegionMetadataRef) -> PrimaryKeyReadFormat {
        let read_cols = ReadColumns::from_deduped_column_ids(
            metadata.column_metadatas.iter().map(|c| c.column_id),
        );
        Self::new(metadata, read_cols)
    }
}
719
720pub(crate) fn primary_key_offsets(pk_dict_array: &PrimaryKeyArray) -> Result<Vec<usize>> {
722 if pk_dict_array.is_empty() {
723 return Ok(Vec::new());
724 }
725
726 let mut offsets = vec![0];
728 let keys = pk_dict_array.keys();
729 let pk_indices = keys.values();
731 for (i, key) in pk_indices.iter().take(keys.len() - 1).enumerate() {
732 if *key != pk_indices[i + 1] {
734 offsets.push(i + 1);
736 }
737 }
738 offsets.push(keys.len());
739
740 Ok(offsets)
741}
742
743pub(crate) fn parquet_row_group_time_range(
746 file_meta: &FileMeta,
747 parquet_meta: &ParquetMetaData,
748 row_group_idx: usize,
749) -> Option<FileTimeRange> {
750 let row_group_meta = parquet_meta.row_group(row_group_idx);
751 let num_columns = parquet_meta.file_metadata().schema_descr().num_columns();
752 assert!(
753 num_columns >= FIXED_POS_COLUMN_NUM,
754 "file only has {} columns",
755 num_columns
756 );
757 let time_index_pos = num_columns - FIXED_POS_COLUMN_NUM;
758
759 let stats = row_group_meta.column(time_index_pos).statistics()?;
760 let (min, max) = match stats {
762 Statistics::Int64(value_stats) => (*value_stats.min_opt()?, *value_stats.max_opt()?),
763 Statistics::Int32(_)
764 | Statistics::Boolean(_)
765 | Statistics::Int96(_)
766 | Statistics::Float(_)
767 | Statistics::Double(_)
768 | Statistics::ByteArray(_)
769 | Statistics::FixedLenByteArray(_) => {
770 common_telemetry::warn!(
771 "Invalid statistics {:?} for time index in parquet in {}",
772 stats,
773 file_meta.file_id
774 );
775 return None;
776 }
777 };
778
779 debug_assert!(min >= file_meta.time_range.0.value() && min <= file_meta.time_range.1.value());
780 debug_assert!(max >= file_meta.time_range.0.value() && max <= file_meta.time_range.1.value());
781 let unit = file_meta.time_range.0.unit();
782
783 Some((Timestamp::new(min, unit), Timestamp::new(max, unit)))
784}
785
786pub(crate) fn need_override_sequence(parquet_meta: &ParquetMetaData) -> bool {
789 let num_columns = parquet_meta.file_metadata().schema_descr().num_columns();
790 if num_columns < FIXED_POS_COLUMN_NUM {
791 return false;
792 }
793
794 let sequence_pos = num_columns - 2;
796
797 for row_group in parquet_meta.row_groups() {
799 if let Some(Statistics::Int64(value_stats)) = row_group.column(sequence_pos).statistics() {
800 if let (Some(min_val), Some(max_val)) = (value_stats.min_opt(), value_stats.max_opt()) {
801 if *min_val != 0 || *max_val != 0 {
803 return false;
804 }
805 } else {
806 return false;
808 }
809 } else {
810 return false;
812 }
813 }
814
815 !parquet_meta.row_groups().is_empty()
817}
818
819#[cfg(test)]
820mod tests {
821 use std::sync::Arc;
822
823 use api::v1::OpType;
824 use datatypes::arrow::array::{
825 Int64Array, StringArray, TimestampMillisecondArray, UInt8Array, UInt32Array, UInt64Array,
826 };
827 use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema, TimeUnit};
828 use datatypes::prelude::ConcreteDataType;
829 use datatypes::schema::ColumnSchema;
830 use datatypes::value::ValueRef;
831 use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, UInt8Vector, UInt64Vector};
832 use mito_codec::row_converter::{
833 DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec,
834 };
835 use store_api::codec::PrimaryKeyEncoding;
836 use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
837 use store_api::storage::RegionId;
838 use store_api::storage::consts::ReservedColumnId;
839
840 use super::*;
841 use crate::sst::parquet::flat_format::{
842 FlatReadFormat, FlatWriteFormat, sequence_column_index,
843 };
844 use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema, with_field_id};
845
846 const TEST_SEQUENCE: u64 = 1;
847 const TEST_OP_TYPE: u8 = OpType::Put as u8;
848
849 fn build_test_region_metadata() -> RegionMetadataRef {
850 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
851 builder
852 .push_column_metadata(ColumnMetadata {
853 column_schema: ColumnSchema::new("tag0", ConcreteDataType::int64_datatype(), true),
854 semantic_type: SemanticType::Tag,
855 column_id: 1,
856 })
857 .push_column_metadata(ColumnMetadata {
858 column_schema: ColumnSchema::new(
859 "field1",
860 ConcreteDataType::int64_datatype(),
861 true,
862 ),
863 semantic_type: SemanticType::Field,
864 column_id: 4, })
866 .push_column_metadata(ColumnMetadata {
867 column_schema: ColumnSchema::new("tag1", ConcreteDataType::int64_datatype(), true),
868 semantic_type: SemanticType::Tag,
869 column_id: 3,
870 })
871 .push_column_metadata(ColumnMetadata {
872 column_schema: ColumnSchema::new(
873 "field0",
874 ConcreteDataType::int64_datatype(),
875 true,
876 ),
877 semantic_type: SemanticType::Field,
878 column_id: 2,
879 })
880 .push_column_metadata(ColumnMetadata {
881 column_schema: ColumnSchema::new(
882 "ts",
883 ConcreteDataType::timestamp_millisecond_datatype(),
884 false,
885 ),
886 semantic_type: SemanticType::Timestamp,
887 column_id: 5,
888 })
889 .primary_key(vec![1, 3]);
890 Arc::new(builder.build().unwrap())
891 }
892
893 fn build_test_arrow_schema() -> SchemaRef {
894 let fields = vec![
895 make_field("field1", ArrowDataType::Int64, true, Some(4)),
896 make_field("field0", ArrowDataType::Int64, true, Some(2)),
897 make_field(
898 "ts",
899 ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
900 false,
901 Some(5),
902 ),
903 make_field(
904 "__primary_key",
905 ArrowDataType::Dictionary(
906 Box::new(ArrowDataType::UInt32),
907 Box::new(ArrowDataType::Binary),
908 ),
909 false,
910 None,
911 ),
912 make_field("__sequence", ArrowDataType::UInt64, false, None),
913 make_field("__op_type", ArrowDataType::UInt8, false, None),
914 ];
915 Arc::new(Schema::new(fields))
916 }
917
918 fn new_batch(primary_key: &[u8], start_ts: i64, start_field: i64, num_rows: usize) -> Batch {
919 new_batch_with_sequence(primary_key, start_ts, start_field, num_rows, TEST_SEQUENCE)
920 }
921
922 fn new_batch_with_sequence(
923 primary_key: &[u8],
924 start_ts: i64,
925 start_field: i64,
926 num_rows: usize,
927 sequence: u64,
928 ) -> Batch {
929 let ts_values = (0..num_rows).map(|i| start_ts + i as i64);
930 let timestamps = Arc::new(TimestampMillisecondVector::from_values(ts_values));
931 let sequences = Arc::new(UInt64Vector::from_vec(vec![sequence; num_rows]));
932 let op_types = Arc::new(UInt8Vector::from_vec(vec![TEST_OP_TYPE; num_rows]));
933 let fields = vec![
934 BatchColumn {
935 column_id: 4,
936 data: Arc::new(Int64Vector::from_vec(vec![start_field; num_rows])),
937 }, BatchColumn {
939 column_id: 2,
940 data: Arc::new(Int64Vector::from_vec(vec![start_field + 1; num_rows])),
941 }, ];
943
944 BatchBuilder::with_required_columns(primary_key.to_vec(), timestamps, sequences, op_types)
945 .with_fields(fields)
946 .build()
947 .unwrap()
948 }
949
950 #[test]
951 fn test_to_sst_arrow_schema() {
952 let metadata = build_test_region_metadata();
953 let write_format = PrimaryKeyWriteFormat::new(metadata);
954 assert_eq!(&build_test_arrow_schema(), write_format.arrow_schema());
955 }
956
957 fn build_test_pk_array(pk_row_nums: &[(Vec<u8>, usize)]) -> Arc<PrimaryKeyArray> {
958 let values = Arc::new(BinaryArray::from_iter_values(
959 pk_row_nums.iter().map(|v| &v.0),
960 ));
961 let mut keys = vec![];
962 for (index, num_rows) in pk_row_nums.iter().map(|v| v.1).enumerate() {
963 keys.extend(std::iter::repeat_n(index as u32, num_rows));
964 }
965 let keys = UInt32Array::from(keys);
966 Arc::new(DictionaryArray::new(keys, values))
967 }
968
969 #[test]
970 fn test_projection_indices() {
971 let metadata = build_test_region_metadata();
972 let read_format =
974 PrimaryKeyReadFormat::new(metadata.clone(), ReadColumns::from_deduped_column_ids([3]));
975 assert_eq!(
976 &[2, 3, 4, 5],
977 read_format.parquet_read_columns().root_indices()
978 );
979 let read_format =
981 PrimaryKeyReadFormat::new(metadata.clone(), ReadColumns::from_deduped_column_ids([4]));
982 assert_eq!(
983 &[0, 2, 3, 4, 5],
984 read_format.parquet_read_columns().root_indices()
985 );
986 let read_format =
988 PrimaryKeyReadFormat::new(metadata.clone(), ReadColumns::from_deduped_column_ids([5]));
989 assert_eq!(
990 &[2, 3, 4, 5],
991 read_format.parquet_read_columns().root_indices()
992 );
993 let read_format =
995 PrimaryKeyReadFormat::new(metadata, ReadColumns::from_deduped_column_ids([2, 1, 5]));
996 assert_eq!(
997 &[1, 2, 3, 4, 5],
998 read_format.parquet_read_columns().root_indices()
999 );
1000 }
1001
1002 #[test]
1003 fn test_empty_primary_key_offsets() {
1004 let array = build_test_pk_array(&[]);
1005 assert!(primary_key_offsets(&array).unwrap().is_empty());
1006 }
1007
1008 #[test]
1009 fn test_primary_key_offsets_one_series() {
1010 let array = build_test_pk_array(&[(b"one".to_vec(), 1)]);
1011 assert_eq!(vec![0, 1], primary_key_offsets(&array).unwrap());
1012
1013 let array = build_test_pk_array(&[(b"one".to_vec(), 1), (b"two".to_vec(), 1)]);
1014 assert_eq!(vec![0, 1, 2], primary_key_offsets(&array).unwrap());
1015
1016 let array = build_test_pk_array(&[
1017 (b"one".to_vec(), 1),
1018 (b"two".to_vec(), 1),
1019 (b"three".to_vec(), 1),
1020 ]);
1021 assert_eq!(vec![0, 1, 2, 3], primary_key_offsets(&array).unwrap());
1022 }
1023
1024 #[test]
1025 fn test_primary_key_offsets_multi_series() {
1026 let array = build_test_pk_array(&[(b"one".to_vec(), 1), (b"two".to_vec(), 3)]);
1027 assert_eq!(vec![0, 1, 4], primary_key_offsets(&array).unwrap());
1028
1029 let array = build_test_pk_array(&[(b"one".to_vec(), 3), (b"two".to_vec(), 1)]);
1030 assert_eq!(vec![0, 3, 4], primary_key_offsets(&array).unwrap());
1031
1032 let array = build_test_pk_array(&[(b"one".to_vec(), 3), (b"two".to_vec(), 3)]);
1033 assert_eq!(vec![0, 3, 6], primary_key_offsets(&array).unwrap());
1034 }
1035
1036 #[test]
1037 fn test_convert_empty_record_batch() {
1038 let metadata = build_test_region_metadata();
1039 let arrow_schema = build_test_arrow_schema();
1040 let column_ids: Vec<_> = metadata
1041 .column_metadatas
1042 .iter()
1043 .map(|col| col.column_id)
1044 .collect();
1045 let read_format =
1046 PrimaryKeyReadFormat::new(metadata, ReadColumns::from_deduped_column_ids(column_ids));
1047 assert_eq!(arrow_schema, *read_format.arrow_schema());
1048
1049 let record_batch = RecordBatch::new_empty(arrow_schema);
1050 let mut batches = VecDeque::new();
1051 read_format
1052 .convert_record_batch(&record_batch, None, &mut batches)
1053 .unwrap();
1054 assert!(batches.is_empty());
1055 }
1056
1057 #[test]
1058 fn test_convert_record_batch() {
1059 let metadata = build_test_region_metadata();
1060 let column_ids: Vec<_> = metadata
1061 .column_metadatas
1062 .iter()
1063 .map(|col| col.column_id)
1064 .collect();
1065 let read_format =
1066 PrimaryKeyReadFormat::new(metadata, ReadColumns::from_deduped_column_ids(column_ids));
1067
1068 let columns: Vec<ArrayRef> = vec![
1069 Arc::new(Int64Array::from(vec![1, 1, 10, 10])), Arc::new(Int64Array::from(vec![2, 2, 11, 11])), Arc::new(TimestampMillisecondArray::from(vec![1, 2, 11, 12])), build_test_pk_array(&[(b"one".to_vec(), 2), (b"two".to_vec(), 2)]), Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; 4])), Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; 4])), ];
1076 let arrow_schema = build_test_arrow_schema();
1077 let record_batch = RecordBatch::try_new(arrow_schema, columns).unwrap();
1078 let mut batches = VecDeque::new();
1079 read_format
1080 .convert_record_batch(&record_batch, None, &mut batches)
1081 .unwrap();
1082
1083 assert_eq!(
1084 vec![new_batch(b"one", 1, 1, 2), new_batch(b"two", 11, 10, 2)],
1085 batches.into_iter().collect::<Vec<_>>(),
1086 );
1087 }
1088
1089 #[test]
1090 fn test_convert_record_batch_with_override_sequence() {
1091 let metadata = build_test_region_metadata();
1092 let read_format = PrimaryKeyReadFormat::new(
1093 metadata.clone(),
1094 ReadColumns::from_deduped_column_ids(
1095 metadata.column_metadatas.iter().map(|c| c.column_id),
1096 ),
1097 );
1098
1099 let columns: Vec<ArrayRef> = vec![
1100 Arc::new(Int64Array::from(vec![1, 1, 10, 10])), Arc::new(Int64Array::from(vec![2, 2, 11, 11])), Arc::new(TimestampMillisecondArray::from(vec![1, 2, 11, 12])), build_test_pk_array(&[(b"one".to_vec(), 2), (b"two".to_vec(), 2)]), Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; 4])), Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; 4])), ];
1107 let arrow_schema = build_test_arrow_schema();
1108 let record_batch = RecordBatch::try_new(arrow_schema, columns).unwrap();
1109
1110 let override_sequence: u64 = 12345;
1112 let override_sequence_array: ArrayRef =
1113 Arc::new(UInt64Array::from_value(override_sequence, 4));
1114
1115 let mut batches = VecDeque::new();
1116 read_format
1117 .convert_record_batch(&record_batch, Some(&override_sequence_array), &mut batches)
1118 .unwrap();
1119
1120 let expected_batch1 = new_batch_with_sequence(b"one", 1, 1, 2, override_sequence);
1122 let expected_batch2 = new_batch_with_sequence(b"two", 11, 10, 2, override_sequence);
1123
1124 assert_eq!(
1125 vec![expected_batch1, expected_batch2],
1126 batches.into_iter().collect::<Vec<_>>(),
1127 );
1128 }
1129
1130 fn make_field(name: &str, dt: ArrowDataType, nullable: bool, field_id: Option<u32>) -> Field {
1131 let mut field = Field::new(name, dt, nullable);
1132 if let Some(id) = field_id {
1133 field = with_field_id(field, id);
1134 }
1135 field
1136 }
1137
1138 fn build_test_flat_sst_schema() -> SchemaRef {
1139 let fields = vec![
1140 Field::new("tag0", ArrowDataType::Int64, true), Field::new("tag1", ArrowDataType::Int64, true),
1142 Field::new("field1", ArrowDataType::Int64, true), Field::new("field0", ArrowDataType::Int64, true),
1144 Field::new(
1145 "ts",
1146 ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
1147 false,
1148 ),
1149 Field::new(
1150 "__primary_key",
1151 ArrowDataType::Dictionary(
1152 Box::new(ArrowDataType::UInt32),
1153 Box::new(ArrowDataType::Binary),
1154 ),
1155 false,
1156 ),
1157 Field::new("__sequence", ArrowDataType::UInt64, false),
1158 Field::new("__op_type", ArrowDataType::UInt8, false),
1159 ];
1160 Arc::new(Schema::new(fields))
1161 }
1162
1163 fn build_test_flat_sst_schema_with_field_ids() -> SchemaRef {
1164 let ids = [
1165 Some(1u32),
1166 Some(3),
1167 Some(4),
1168 Some(2),
1169 Some(5),
1170 None,
1171 None,
1172 None,
1173 ];
1174 let fields: Vec<_> = build_test_flat_sst_schema()
1175 .fields()
1176 .iter()
1177 .zip(ids)
1178 .map(|(f, id)| match id {
1179 Some(id) => Arc::new(with_field_id((**f).clone(), id)) as _,
1180 None => f.clone(),
1181 })
1182 .collect();
1183 Arc::new(Schema::new(fields))
1184 }
1185
1186 #[test]
1187 fn test_flat_to_sst_arrow_schema() {
1188 let metadata = build_test_region_metadata();
1189 let format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default());
1190 assert_eq!(
1191 &build_test_flat_sst_schema_with_field_ids(),
1192 format.arrow_schema()
1193 );
1194 }
1195
1196 fn input_columns_for_flat_batch(num_rows: usize) -> Vec<ArrayRef> {
1197 vec![
1198 Arc::new(Int64Array::from(vec![1; num_rows])), Arc::new(Int64Array::from(vec![1; num_rows])), Arc::new(Int64Array::from(vec![2; num_rows])), Arc::new(Int64Array::from(vec![3; num_rows])), Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), build_test_pk_array(&[(b"test".to_vec(), num_rows)]), Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; num_rows])), Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), ]
1207 }
1208
1209 #[test]
1210 fn test_flat_convert_batch() {
1211 let metadata = build_test_region_metadata();
1212 let format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default());
1213
1214 let num_rows = 4;
1215 let columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
1216 let batch =
1217 RecordBatch::try_new(build_test_flat_sst_schema_with_field_ids(), columns.clone())
1218 .unwrap();
1219 let expect_record =
1220 RecordBatch::try_new(build_test_flat_sst_schema_with_field_ids(), columns).unwrap();
1221
1222 let actual = format.convert_batch(&batch).unwrap();
1223 assert_eq!(expect_record, actual);
1224 }
1225
1226 #[test]
1227 fn test_flat_convert_with_override_sequence() {
1228 let metadata = build_test_region_metadata();
1229 let format = FlatWriteFormat::new(metadata, &FlatSchemaOptions::default())
1230 .with_override_sequence(Some(415411));
1231
1232 let num_rows = 4;
1233 let columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
1234 let batch =
1235 RecordBatch::try_new(build_test_flat_sst_schema_with_field_ids(), columns).unwrap();
1236
1237 let expected_columns: Vec<ArrayRef> = vec![
1238 Arc::new(Int64Array::from(vec![1; num_rows])), Arc::new(Int64Array::from(vec![1; num_rows])), Arc::new(Int64Array::from(vec![2; num_rows])), Arc::new(Int64Array::from(vec![3; num_rows])), Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), build_test_pk_array(&[(b"test".to_vec(), num_rows)]), Arc::new(UInt64Array::from(vec![415411; num_rows])), Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), ];
1247 let expected_record = RecordBatch::try_new(
1248 build_test_flat_sst_schema_with_field_ids(),
1249 expected_columns,
1250 )
1251 .unwrap();
1252
1253 let actual = format.convert_batch(&batch).unwrap();
1254 assert_eq!(expected_record, actual);
1255 }
1256
1257 #[test]
1258 fn test_flat_projection_indices() {
1259 let metadata = build_test_region_metadata();
1260 let read_format = FlatReadFormat::new(
1265 metadata.clone(),
1266 ReadColumns::from_deduped_column_ids([3]),
1267 None,
1268 "test",
1269 false,
1270 )
1271 .unwrap();
1272 assert_eq!(
1273 &[1, 4, 5, 6, 7],
1274 read_format.parquet_read_columns().root_indices()
1275 );
1276
1277 let read_format = FlatReadFormat::new(
1279 metadata.clone(),
1280 ReadColumns::from_deduped_column_ids([4]),
1281 None,
1282 "test",
1283 false,
1284 )
1285 .unwrap();
1286 assert_eq!(
1287 &[2, 4, 5, 6, 7],
1288 read_format.parquet_read_columns().root_indices()
1289 );
1290
1291 let read_format = FlatReadFormat::new(
1293 metadata.clone(),
1294 ReadColumns::from_deduped_column_ids([5]),
1295 None,
1296 "test",
1297 false,
1298 )
1299 .unwrap();
1300 assert_eq!(
1301 &[4, 5, 6, 7],
1302 read_format.parquet_read_columns().root_indices()
1303 );
1304
1305 let read_format = FlatReadFormat::new(
1307 metadata,
1308 ReadColumns::from_deduped_column_ids([2, 1, 5]),
1309 None,
1310 "test",
1311 false,
1312 )
1313 .unwrap();
1314 assert_eq!(
1315 &[0, 3, 4, 5, 6, 7],
1316 read_format.parquet_read_columns().root_indices()
1317 );
1318 }
1319
1320 #[test]
1321 fn test_flat_read_format_convert_batch() {
1322 let metadata = build_test_region_metadata();
1323 let mut format = FlatReadFormat::new(
1324 metadata,
1325 ReadColumns::from_deduped_column_ids(std::iter::once(1)), Some(8),
1327 "test",
1328 false,
1329 )
1330 .unwrap();
1331
1332 let num_rows = 4;
1333 let original_sequence = 100u64;
1334 let override_sequence = 200u64;
1335
1336 let columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
1338 let mut test_columns = columns.clone();
1339 test_columns[6] = Arc::new(UInt64Array::from(vec![original_sequence; num_rows]));
1341 let record_batch =
1342 RecordBatch::try_new(format.arrow_schema().clone(), test_columns).unwrap();
1343
1344 let result = format.convert_batch(record_batch.clone(), None).unwrap();
1346 let sequence_column = result.column(sequence_column_index(result.num_columns()));
1347 let sequence_array = sequence_column
1348 .as_any()
1349 .downcast_ref::<UInt64Array>()
1350 .unwrap();
1351
1352 let expected_original = UInt64Array::from(vec![original_sequence; num_rows]);
1353 assert_eq!(sequence_array, &expected_original);
1354
1355 format.set_override_sequence(Some(override_sequence));
1357 let override_sequence_array = format.new_override_sequence_array(num_rows).unwrap();
1358 let result = format
1359 .convert_batch(record_batch, Some(&override_sequence_array))
1360 .unwrap();
1361 let sequence_column = result.column(sequence_column_index(result.num_columns()));
1362 let sequence_array = sequence_column
1363 .as_any()
1364 .downcast_ref::<UInt64Array>()
1365 .unwrap();
1366
1367 let expected_override = UInt64Array::from(vec![override_sequence; num_rows]);
1368 assert_eq!(sequence_array, &expected_override);
1369 }
1370
1371 #[test]
1372 fn test_need_convert_to_flat() {
1373 let metadata = build_test_region_metadata();
1374
1375 let expected_columns = metadata.column_metadatas.len() + 3;
1378 let result =
1379 FlatReadFormat::is_legacy_format(&metadata, expected_columns, "test.parquet").unwrap();
1380 assert!(
1381 !result,
1382 "Should not need conversion when column counts match"
1383 );
1384
1385 let num_columns_without_pk = expected_columns - metadata.primary_key.len();
1388 let result =
1389 FlatReadFormat::is_legacy_format(&metadata, num_columns_without_pk, "test.parquet")
1390 .unwrap();
1391 assert!(
1392 result,
1393 "Should need conversion when primary key columns are missing"
1394 );
1395
1396 let too_many_columns = expected_columns + 1;
1398 let err = FlatReadFormat::is_legacy_format(&metadata, too_many_columns, "test.parquet")
1399 .unwrap_err();
1400 assert!(err.to_string().contains("Expected columns"), "{err:?}");
1401
1402 let wrong_diff_columns = expected_columns - 1; let err = FlatReadFormat::is_legacy_format(&metadata, wrong_diff_columns, "test.parquet")
1405 .unwrap_err();
1406 assert!(
1407 err.to_string().contains("Column number difference"),
1408 "{err:?}"
1409 );
1410 }
1411
1412 fn build_test_dense_pk_array(
1413 codec: &DensePrimaryKeyCodec,
1414 pk_values_per_row: &[&[Option<i64>]],
1415 ) -> Arc<PrimaryKeyArray> {
1416 let mut builder = PrimaryKeyArrayBuilder::with_capacity(pk_values_per_row.len(), 1024, 0);
1417
1418 for pk_values_row in pk_values_per_row {
1419 let values: Vec<ValueRef> = pk_values_row
1420 .iter()
1421 .map(|opt| match opt {
1422 Some(val) => ValueRef::Int64(*val),
1423 None => ValueRef::Null,
1424 })
1425 .collect();
1426
1427 let encoded = codec.encode(values.into_iter()).unwrap();
1428 builder.append_value(&encoded);
1429 }
1430
1431 Arc::new(builder.finish())
1432 }
1433
1434 fn build_test_sparse_region_metadata() -> RegionMetadataRef {
1435 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1436 builder
1437 .push_column_metadata(ColumnMetadata {
1438 column_schema: ColumnSchema::new(
1439 "__table_id",
1440 ConcreteDataType::uint32_datatype(),
1441 false,
1442 ),
1443 semantic_type: SemanticType::Tag,
1444 column_id: ReservedColumnId::table_id(),
1445 })
1446 .push_column_metadata(ColumnMetadata {
1447 column_schema: ColumnSchema::new(
1448 "__tsid",
1449 ConcreteDataType::uint64_datatype(),
1450 false,
1451 ),
1452 semantic_type: SemanticType::Tag,
1453 column_id: ReservedColumnId::tsid(),
1454 })
1455 .push_column_metadata(ColumnMetadata {
1456 column_schema: ColumnSchema::new("tag0", ConcreteDataType::string_datatype(), true),
1457 semantic_type: SemanticType::Tag,
1458 column_id: 1,
1459 })
1460 .push_column_metadata(ColumnMetadata {
1461 column_schema: ColumnSchema::new("tag1", ConcreteDataType::string_datatype(), true),
1462 semantic_type: SemanticType::Tag,
1463 column_id: 3,
1464 })
1465 .push_column_metadata(ColumnMetadata {
1466 column_schema: ColumnSchema::new(
1467 "field1",
1468 ConcreteDataType::int64_datatype(),
1469 true,
1470 ),
1471 semantic_type: SemanticType::Field,
1472 column_id: 4,
1473 })
1474 .push_column_metadata(ColumnMetadata {
1475 column_schema: ColumnSchema::new(
1476 "field0",
1477 ConcreteDataType::int64_datatype(),
1478 true,
1479 ),
1480 semantic_type: SemanticType::Field,
1481 column_id: 2,
1482 })
1483 .push_column_metadata(ColumnMetadata {
1484 column_schema: ColumnSchema::new(
1485 "ts",
1486 ConcreteDataType::timestamp_millisecond_datatype(),
1487 false,
1488 ),
1489 semantic_type: SemanticType::Timestamp,
1490 column_id: 5,
1491 })
1492 .primary_key(vec![
1493 ReservedColumnId::table_id(),
1494 ReservedColumnId::tsid(),
1495 1,
1496 3,
1497 ])
1498 .primary_key_encoding(PrimaryKeyEncoding::Sparse);
1499 Arc::new(builder.build().unwrap())
1500 }
1501
1502 fn build_test_sparse_pk_array(
1503 codec: &SparsePrimaryKeyCodec,
1504 pk_values_per_row: &[SparseTestRow],
1505 ) -> Arc<PrimaryKeyArray> {
1506 let mut builder = PrimaryKeyArrayBuilder::with_capacity(pk_values_per_row.len(), 1024, 0);
1507 for row in pk_values_per_row {
1508 let values = vec![
1509 (ReservedColumnId::table_id(), ValueRef::UInt32(row.table_id)),
1510 (ReservedColumnId::tsid(), ValueRef::UInt64(row.tsid)),
1511 (1, ValueRef::String(&row.tag0)),
1512 (3, ValueRef::String(&row.tag1)),
1513 ];
1514
1515 let mut buffer = Vec::new();
1516 codec.encode_value_refs(&values, &mut buffer).unwrap();
1517 builder.append_value(&buffer);
1518 }
1519
1520 Arc::new(builder.finish())
1521 }
1522
    /// One row of primary key values used by the sparse-encoding tests.
    #[derive(Clone)]
    struct SparseTestRow {
        /// Value stored in the reserved `__table_id` column.
        table_id: u32,
        /// Value stored in the reserved `__tsid` column.
        tsid: u64,
        /// Value of the `tag0` label column.
        tag0: String,
        /// Value of the `tag1` label column.
        tag1: String,
    }
1530
1531 #[test]
1532 fn test_flat_read_format_convert_format_with_dense_encoding() {
1533 let metadata = build_test_region_metadata();
1534
1535 let column_ids: Vec<_> = metadata
1536 .column_metadatas
1537 .iter()
1538 .map(|c| c.column_id)
1539 .collect();
1540 let format = FlatReadFormat::new(
1541 metadata.clone(),
1542 ReadColumns::from_deduped_column_ids(column_ids),
1543 Some(6),
1544 "test",
1545 false,
1546 )
1547 .unwrap();
1548
1549 let num_rows = 4;
1550 let original_sequence = 100u64;
1551
1552 let pk_values_per_row = vec![
1554 &[Some(1i64), Some(1i64)][..]; num_rows ];
1556
1557 let codec = DensePrimaryKeyCodec::new(&metadata);
1559 let dense_pk_array = build_test_dense_pk_array(&codec, &pk_values_per_row);
1560 let columns: Vec<ArrayRef> = vec![
1561 Arc::new(Int64Array::from(vec![2; num_rows])), Arc::new(Int64Array::from(vec![3; num_rows])), Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), dense_pk_array.clone(), Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), ];
1568
1569 let old_schema = build_test_arrow_schema();
1571 let record_batch = RecordBatch::try_new(old_schema, columns).unwrap();
1572
1573 let result = format.convert_batch(record_batch, None).unwrap();
1575
1576 let expected_columns: Vec<ArrayRef> = vec![
1578 Arc::new(Int64Array::from(vec![1; num_rows])), Arc::new(Int64Array::from(vec![1; num_rows])), Arc::new(Int64Array::from(vec![2; num_rows])), Arc::new(Int64Array::from(vec![3; num_rows])), Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), dense_pk_array, Arc::new(UInt64Array::from(vec![original_sequence; num_rows])), Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), ];
1587 let expected_record_batch = RecordBatch::try_new(
1588 build_test_flat_sst_schema_with_field_ids(),
1589 expected_columns,
1590 )
1591 .unwrap();
1592
1593 assert_eq!(expected_record_batch, result);
1595 }
1596
    #[test]
    fn test_flat_read_format_convert_format_with_sparse_encoding() {
        let metadata = build_test_sparse_region_metadata();

        // Project every column of the region.
        let column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|c| c.column_id)
            .collect();
        let format = FlatReadFormat::new(
            metadata.clone(),
            ReadColumns::from_deduped_column_ids(column_ids.clone()),
            None,
            "test",
            false,
        )
        .unwrap();

        let num_rows = 4;
        let original_sequence = 100u64;

        // All rows share the same sparse primary key.
        let pk_test_rows = vec![
            SparseTestRow {
                table_id: 1,
                tsid: 123,
                tag0: "frontend".to_string(),
                tag1: "pod1".to_string(),
            };
            num_rows
        ];

        let codec = SparsePrimaryKeyCodec::new(&metadata);
        let sparse_pk_array = build_test_sparse_pk_array(&codec, &pk_test_rows);
        // Input batch in the legacy primary-key layout:
        // fields, ts, __primary_key, __sequence, __op_type.
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![2; num_rows])),
            Arc::new(Int64Array::from(vec![3; num_rows])),
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])),
            sparse_pk_array.clone(),
            Arc::new(UInt64Array::from(vec![original_sequence; num_rows])),
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])),
        ];

        let old_schema = build_test_arrow_schema();
        let record_batch = RecordBatch::try_new(old_schema, columns).unwrap();

        let result = format.convert_batch(record_batch.clone(), None).unwrap();

        // Expected decoded tag columns come back dictionary-encoded: one
        // dictionary entry referenced by every row.
        let tag0_array = Arc::new(DictionaryArray::new(
            UInt32Array::from(vec![0; num_rows]),
            Arc::new(StringArray::from(vec!["frontend"])),
        ));
        let tag1_array = Arc::new(DictionaryArray::new(
            UInt32Array::from(vec![0; num_rows]),
            Arc::new(StringArray::from(vec!["pod1"])),
        ));
        // Flat layout: reserved __table_id/__tsid, tags, fields, ts, then
        // the internal columns.
        let expected_columns: Vec<ArrayRef> = vec![
            Arc::new(UInt32Array::from(vec![1; num_rows])),
            Arc::new(UInt64Array::from(vec![123; num_rows])),
            tag0_array,
            tag1_array,
            Arc::new(Int64Array::from(vec![2; num_rows])),
            Arc::new(Int64Array::from(vec![3; num_rows])),
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])),
            sparse_pk_array,
            Arc::new(UInt64Array::from(vec![original_sequence; num_rows])),
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])),
        ];
        let expected_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        let expected_record_batch =
            RecordBatch::try_new(expected_schema, expected_columns).unwrap();

        assert_eq!(expected_record_batch, result);

        // With the last flag set to true, the legacy batch is returned
        // unchanged — presumably a "skip conversion" switch; confirm against
        // FlatReadFormat::new.
        let format = FlatReadFormat::new(
            metadata.clone(),
            ReadColumns::from_deduped_column_ids(column_ids),
            None,
            "test",
            true,
        )
        .unwrap();
        let result = format.convert_batch(record_batch.clone(), None).unwrap();
        assert_eq!(record_batch, result);
    }
1688
1689 #[test]
1690 fn test_convert_flat_batch() {
1691 let metadata = build_test_region_metadata();
1692 let write_format = PrimaryKeyWriteFormat::new(metadata);
1693
1694 let num_rows = 4;
1695 let flat_columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
1697 let flat_batch = RecordBatch::try_new(build_test_flat_sst_schema(), flat_columns).unwrap();
1698
1699 let result = write_format.convert_flat_batch(&flat_batch, 2).unwrap();
1701
1702 let expected_columns: Vec<ArrayRef> = vec![
1704 Arc::new(Int64Array::from(vec![2; num_rows])), Arc::new(Int64Array::from(vec![3; num_rows])), Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), build_test_pk_array(&[(b"test".to_vec(), num_rows)]), Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; num_rows])), Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), ];
1711 let expected = RecordBatch::try_new(build_test_arrow_schema(), expected_columns).unwrap();
1712
1713 assert_eq!(expected, result);
1714 }
1715
1716 #[test]
1717 fn test_convert_flat_batch_with_override_sequence() {
1718 let metadata = build_test_region_metadata();
1719 let write_format = PrimaryKeyWriteFormat::new(metadata).with_override_sequence(Some(999));
1720
1721 let num_rows = 4;
1722 let flat_columns: Vec<ArrayRef> = input_columns_for_flat_batch(num_rows);
1723 let flat_batch = RecordBatch::try_new(build_test_flat_sst_schema(), flat_columns).unwrap();
1724
1725 let result = write_format.convert_flat_batch(&flat_batch, 2).unwrap();
1726
1727 let expected_columns: Vec<ArrayRef> = vec![
1728 Arc::new(Int64Array::from(vec![2; num_rows])), Arc::new(Int64Array::from(vec![3; num_rows])), Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), build_test_pk_array(&[(b"test".to_vec(), num_rows)]), Arc::new(UInt64Array::from(vec![999; num_rows])), Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), ];
1735 let expected = RecordBatch::try_new(build_test_arrow_schema(), expected_columns).unwrap();
1736
1737 assert_eq!(expected, result);
1738 }
1739
1740 #[test]
1741 fn test_convert_flat_batch_no_tags() {
1742 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1744 builder
1745 .push_column_metadata(ColumnMetadata {
1746 column_schema: ColumnSchema::new(
1747 "field0",
1748 ConcreteDataType::int64_datatype(),
1749 true,
1750 ),
1751 semantic_type: SemanticType::Field,
1752 column_id: 1,
1753 })
1754 .push_column_metadata(ColumnMetadata {
1755 column_schema: ColumnSchema::new(
1756 "ts",
1757 ConcreteDataType::timestamp_millisecond_datatype(),
1758 false,
1759 ),
1760 semantic_type: SemanticType::Timestamp,
1761 column_id: 2,
1762 });
1763 let metadata = Arc::new(builder.build().unwrap());
1764 let write_format = PrimaryKeyWriteFormat::new(metadata);
1765
1766 let num_rows = 3;
1767 let sst_schema = write_format.arrow_schema().clone();
1769 let columns: Vec<ArrayRef> = vec![
1770 Arc::new(Int64Array::from(vec![10; num_rows])), Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3])), build_test_pk_array(&[(b"".to_vec(), num_rows)]), Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; num_rows])), Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), ];
1776 let flat_batch = RecordBatch::try_new(sst_schema.clone(), columns.clone()).unwrap();
1777
1778 let result = write_format.convert_flat_batch(&flat_batch, 1).unwrap();
1780 let expected = RecordBatch::try_new(sst_schema, columns).unwrap();
1781
1782 assert_eq!(expected, result);
1783 }
1784}