use std::borrow::Borrow;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;

use api::v1::SemanticType;
use common_time::Timestamp;
use datafusion_common::ScalarValue;
use datatypes::arrow::array::{ArrayRef, BinaryArray, DictionaryArray, UInt32Array, UInt64Array};
use datatypes::arrow::datatypes::{SchemaRef, UInt32Type};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::prelude::DataType;
use datatypes::vectors::{Helper, Vector};
use mito_codec::row_converter::{build_primary_key_codec_with_fields, SortField};
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
use parquet::file::statistics::Statistics;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
use store_api::storage::{ColumnId, SequenceNumber};

use crate::error::{
    ConvertVectorSnafu, InvalidBatchSnafu, InvalidRecordBatchSnafu, NewRecordBatchSnafu, Result,
};
use crate::read::{Batch, BatchBuilder, BatchColumn};
use crate::sst::file::{FileMeta, FileTimeRange};
use crate::sst::to_sst_arrow_schema;

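/// Arrow array type of the primary key dictionary column.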
pub(crate) type PrimaryKeyArray = DictionaryArray<UInt32Type>;

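/// Number of columns at fixed positions at the end of the SST schema:
/// time index, primary key, sequence and op type.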
const FIXED_POS_COLUMN_NUM: usize = 4;

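/// Helper for writing the SST format: converts [Batch]es into [RecordBatch]es
/// that match the SST file schema.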
pub(crate) struct WriteFormat {
    metadata: RegionMetadataRef,
    /// SST file schema.
    arrow_schema: SchemaRef,
    /// Sequence number to override sequences of all rows, if set.
    override_sequence: Option<SequenceNumber>,
}

impl WriteFormat {
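    /// Creates a new helper from the region metadata.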
    pub(crate) fn new(metadata: RegionMetadataRef) -> WriteFormat {
        let arrow_schema = to_sst_arrow_schema(&metadata);
        WriteFormat {
            metadata,
            arrow_schema,
            override_sequence: None,
        }
    }

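    /// Sets the sequence number to override sequences of all rows.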
    pub(crate) fn with_override_sequence(
        mut self,
        override_sequence: Option<SequenceNumber>,
    ) -> Self {
        self.override_sequence = override_sequence;
        self
    }

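    /// Returns the schema of the SST file.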
    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
        &self.arrow_schema
    }

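    /// Converts a [Batch] into a [RecordBatch] matching the SST file schema.
    ///
    /// The batch must match the region metadata of this format.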
    pub(crate) fn convert_batch(&self, batch: &Batch) -> Result<RecordBatch> {
        debug_assert_eq!(
            batch.fields().len() + FIXED_POS_COLUMN_NUM,
            self.arrow_schema.fields().len()
        );
        let mut columns = Vec::with_capacity(batch.fields().len() + FIXED_POS_COLUMN_NUM);
        // Convert field columns in the order defined by the metadata.
        for (column, column_metadata) in batch.fields().iter().zip(self.metadata.field_columns()) {
            ensure!(
                column.column_id == column_metadata.column_id,
                InvalidBatchSnafu {
                    reason: format!(
                        "Batch has column {} but metadata has column {}",
                        column.column_id, column_metadata.column_id
                    ),
                }
            );

            columns.push(column.data.to_arrow_array());
        }
        // Add the time index column.
        columns.push(batch.timestamps().to_arrow_array());
        // Add internal columns: primary key, sequence, op type.
        columns.push(new_primary_key_array(batch.primary_key(), batch.num_rows()));

        if let Some(override_sequence) = self.override_sequence {
            let sequence_array =
                Arc::new(UInt64Array::from(vec![override_sequence; batch.num_rows()]));
            columns.push(sequence_array);
        } else {
            columns.push(batch.sequences().to_arrow_array());
        }
        columns.push(batch.op_types().to_arrow_array());

        RecordBatch::try_new(self.arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
    }
}

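/// Helper for reading the SST format: converts [RecordBatch]es read from an SST
/// file back into [Batch]es and reads column statistics from row groups.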
pub struct ReadFormat {
    metadata: RegionMetadataRef,
    /// SST file schema.
    arrow_schema: SchemaRef,
    /// Field column id to its index in the SST file schema.
    field_id_to_index: HashMap<ColumnId, usize>,
    /// Indices of columns to read from the SST file, in ascending order.
    projection_indices: Vec<usize>,
    /// Field column id to its index in the projected schema (the schema of [Batch]).
    field_id_to_projected_index: HashMap<ColumnId, usize>,
    /// Sequence number to override sequences read from the file, if set.
    override_sequence: Option<SequenceNumber>,
}

impl ReadFormat {
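    /// Creates a helper with existing `metadata` and the `column_ids` to read.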
    pub fn new(
        metadata: RegionMetadataRef,
        column_ids: impl Iterator<Item = ColumnId>,
    ) -> ReadFormat {
        let field_id_to_index: HashMap<_, _> = metadata
            .field_columns()
            .enumerate()
            .map(|(index, column)| (column.column_id, index))
            .collect();
        let arrow_schema = to_sst_arrow_schema(&metadata);

        // Maps projected field column ids to their indices in the SST schema.
        // Tag and internal columns have no entry in `field_id_to_index`.
        let mut projected_field_id_index: Vec<_> = column_ids
            .filter_map(|column_id| {
                field_id_to_index
                    .get(&column_id)
                    .copied()
                    .map(|index| (column_id, index))
            })
            .collect();
        let mut projection_indices: Vec<_> = projected_field_id_index
            .iter()
            .map(|(_column_id, index)| *index)
            // Also read the fixed position columns at the end of the schema.
            .chain(arrow_schema.fields.len() - FIXED_POS_COLUMN_NUM..arrow_schema.fields.len())
            .collect();
        projection_indices.sort_unstable();

        // Sort by the index in the SST schema so the projected index matches the
        // order of field columns in the output [Batch].
        projected_field_id_index.sort_unstable_by_key(|x| x.1);
        let field_id_to_projected_index = projected_field_id_index
            .into_iter()
            .map(|(column_id, _)| column_id)
            .enumerate()
            .map(|(index, column_id)| (column_id, index))
            .collect();

        ReadFormat {
            metadata,
            arrow_schema,
            field_id_to_index,
            projection_indices,
            field_id_to_projected_index,
            override_sequence: None,
        }
    }

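    /// Sets the sequence number that overrides sequences read from the file.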
    pub(crate) fn set_override_sequence(&mut self, sequence: Option<SequenceNumber>) {
        self.override_sequence = sequence;
    }

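    /// Returns the schema of the SST file.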
    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
        &self.arrow_schema
    }

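    /// Returns the region metadata.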
    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
        &self.metadata
    }

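    /// Returns the indices of columns to read from the SST, in ascending order.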
    pub(crate) fn projection_indices(&self) -> &[usize] {
        &self.projection_indices
    }

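    /// Creates an override sequence array of the given `length`, or `None` if no
    /// override sequence is set.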
    pub(crate) fn new_override_sequence_array(&self, length: usize) -> Option<ArrayRef> {
        self.override_sequence
            .map(|seq| Arc::new(UInt64Array::from_value(seq, length)) as ArrayRef)
    }

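    /// Converts a [RecordBatch] read from an SST into [Batch]es, one per primary
    /// key, and appends them to `batches`.
    ///
    /// If `override_sequence_array` is provided, it must be at least as long as
    /// the record batch and replaces the sequence column.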
    pub fn convert_record_batch(
        &self,
        record_batch: &RecordBatch,
        override_sequence_array: Option<&ArrayRef>,
        batches: &mut VecDeque<Batch>,
    ) -> Result<()> {
        debug_assert!(batches.is_empty());

        // The record batch must contain the fixed position columns.
        ensure!(
            record_batch.num_columns() >= FIXED_POS_COLUMN_NUM,
            InvalidRecordBatchSnafu {
                reason: format!(
                    "record batch only has {} columns",
                    record_batch.num_columns()
                ),
            }
        );

        let mut fixed_pos_columns = record_batch
            .columns()
            .iter()
            .rev()
            .take(FIXED_POS_COLUMN_NUM);
        // Safety: the record batch has at least FIXED_POS_COLUMN_NUM columns.
        let op_type_array = fixed_pos_columns.next().unwrap();
        let mut sequence_array = fixed_pos_columns.next().unwrap().clone();
        let pk_array = fixed_pos_columns.next().unwrap();
        let ts_array = fixed_pos_columns.next().unwrap();
        let field_batch_columns = self.get_field_batch_columns(record_batch)?;

        // Replaces the sequence array with the override array if provided,
        // truncating it to the number of rows in the record batch.
        if let Some(override_array) = override_sequence_array {
            assert!(override_array.len() >= sequence_array.len());
            sequence_array = if override_array.len() > sequence_array.len() {
                override_array.slice(0, sequence_array.len())
            } else {
                override_array.clone()
            };
        }

        // Computes the row ranges of each primary key.
        let pk_dict_array = pk_array
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .with_context(|| InvalidRecordBatchSnafu {
                reason: format!("primary key array should not be {:?}", pk_array.data_type()),
            })?;
        let offsets = primary_key_offsets(pk_dict_array)?;
        if offsets.is_empty() {
            return Ok(());
        }

        // Splits the record batch into one batch per primary key.
        let keys = pk_dict_array.keys();
        let pk_values = pk_dict_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .with_context(|| InvalidRecordBatchSnafu {
                reason: format!(
                    "values of primary key array should not be {:?}",
                    pk_dict_array.values().data_type()
                ),
            })?;
        for (i, start) in offsets[..offsets.len() - 1].iter().enumerate() {
            let end = offsets[i + 1];
            let rows_in_batch = end - start;
            let dict_key = keys.value(*start);
            let primary_key = pk_values.value(dict_key as usize).to_vec();

            let mut builder = BatchBuilder::new(primary_key);
            builder
                .timestamps_array(ts_array.slice(*start, rows_in_batch))?
                .sequences_array(sequence_array.slice(*start, rows_in_batch))?
                .op_types_array(op_type_array.slice(*start, rows_in_batch))?;
            // Push field columns for this key range.
            for batch_column in &field_batch_columns {
                builder.push_field(BatchColumn {
                    column_id: batch_column.column_id,
                    data: batch_column.data.slice(*start, rows_in_batch),
                });
            }

            let batch = builder.build()?;
            batches.push_back(batch);
        }

        Ok(())
    }

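    /// Returns the min values of the column in each row group.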
    pub fn min_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        let Some(column) = self.metadata.column_by_id(column_id) else {
            return StatValues::NoColumn;
        };
        match column.semantic_type {
            SemanticType::Tag => self.tag_values(row_groups, column, true),
            SemanticType::Field => {
                // Safety: `field_id_to_index` is built from the same metadata.
                let index = self.field_id_to_index.get(&column_id).unwrap();
                let stats = Self::column_values(row_groups, column, *index, true);
                StatValues::from_stats_opt(stats)
            }
            SemanticType::Timestamp => {
                let index = self.time_index_position();
                let stats = Self::column_values(row_groups, column, index, true);
                StatValues::from_stats_opt(stats)
            }
        }
    }

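    /// Returns the max values of the column in each row group.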
    pub fn max_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        let Some(column) = self.metadata.column_by_id(column_id) else {
            return StatValues::NoColumn;
        };
        match column.semantic_type {
            SemanticType::Tag => self.tag_values(row_groups, column, false),
            SemanticType::Field => {
                // Safety: `field_id_to_index` is built from the same metadata.
                let index = self.field_id_to_index.get(&column_id).unwrap();
                let stats = Self::column_values(row_groups, column, *index, false);
                StatValues::from_stats_opt(stats)
            }
            SemanticType::Timestamp => {
                let index = self.time_index_position();
                let stats = Self::column_values(row_groups, column, index, false);
                StatValues::from_stats_opt(stats)
            }
        }
    }

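    /// Returns the null counts of the column in each row group.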
    pub fn null_counts(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        let Some(column) = self.metadata.column_by_id(column_id) else {
            return StatValues::NoColumn;
        };
        match column.semantic_type {
            SemanticType::Tag => StatValues::NoStats,
            SemanticType::Field => {
                // Safety: `field_id_to_index` is built from the same metadata.
                let index = self.field_id_to_index.get(&column_id).unwrap();
                let stats = Self::column_null_counts(row_groups, *index);
                StatValues::from_stats_opt(stats)
            }
            SemanticType::Timestamp => {
                let index = self.time_index_position();
                let stats = Self::column_null_counts(row_groups, index);
                StatValues::from_stats_opt(stats)
            }
        }
    }

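    /// Returns the field columns of the record batch as [BatchColumn]s.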
    fn get_field_batch_columns(&self, record_batch: &RecordBatch) -> Result<Vec<BatchColumn>> {
        record_batch
            .columns()
            .iter()
            .zip(record_batch.schema().fields())
            // Skip the fixed position columns at the end.
            .take(record_batch.num_columns() - FIXED_POS_COLUMN_NUM)
            .map(|(array, field)| {
                let vector = Helper::try_into_vector(array.clone()).context(ConvertVectorSnafu)?;
                let column = self
                    .metadata
                    .column_by_name(field.name())
                    .with_context(|| InvalidRecordBatchSnafu {
                        reason: format!("column {} not found in metadata", field.name()),
                    })?;

                Ok(BatchColumn {
                    column_id: column.column_id,
                    data: vector,
                })
            })
            .collect()
    }

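    /// Returns the min/max values of a tag column in each row group.
    ///
    /// Statistics are only available for the first tag column because all tags
    /// are encoded into the single primary key column.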
    fn tag_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column: &ColumnMetadata,
        is_min: bool,
    ) -> StatValues {
        let is_first_tag = self
            .metadata
            .primary_key
            .first()
            .map(|id| *id == column.column_id)
            .unwrap_or(false);
        if !is_first_tag {
            // Only the first tag column has statistics.
            return StatValues::NoStats;
        }

        StatValues::from_stats_opt(self.first_tag_values(row_groups, column, is_min))
    }

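    /// Decodes the min/max values of the first tag column in each row group from
    /// the statistics of the primary key column.
    ///
    /// The caller must ensure `column` is the first tag column.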
    fn first_tag_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column: &ColumnMetadata,
        is_min: bool,
    ) -> Option<ArrayRef> {
        debug_assert!(self
            .metadata
            .primary_key
            .first()
            .map(|id| *id == column.column_id)
            .unwrap_or(false));

        let primary_key_encoding = self.metadata.primary_key_encoding;
        let converter = build_primary_key_codec_with_fields(
            primary_key_encoding,
            [(
                column.column_id,
                SortField::new(column.column_schema.data_type.clone()),
            )]
            .into_iter(),
        );

        let values = row_groups.iter().map(|meta| {
            let stats = meta
                .borrow()
                .column(self.primary_key_position())
                .statistics()?;
            match stats {
                Statistics::Boolean(_) => None,
                Statistics::Int32(_) => None,
                Statistics::Int64(_) => None,
                Statistics::Int96(_) => None,
                Statistics::Float(_) => None,
                Statistics::Double(_) => None,
                Statistics::ByteArray(s) => {
                    let bytes = if is_min {
                        s.min_bytes_opt()?
                    } else {
                        s.max_bytes_opt()?
                    };
                    converter.decode_leftmost(bytes).ok()?
                }
                Statistics::FixedLenByteArray(_) => None,
            }
        });
        let mut builder = column
            .column_schema
            .data_type
            .create_mutable_vector(row_groups.len());
        for value_opt in values {
            match value_opt {
                Some(v) => builder.push_value_ref(v.as_value_ref()),
                None => builder.push_null(),
            }
        }
        let vector = builder.to_vector();

        Some(vector.to_arrow_array())
    }

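    /// Returns the min/max values of a field or time index column in each row
    /// group, or `None` if the statistics are unavailable.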
    fn column_values(
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column: &ColumnMetadata,
        column_index: usize,
        is_min: bool,
    ) -> Option<ArrayRef> {
        let null_scalar: ScalarValue = column
            .column_schema
            .data_type
            .as_arrow_type()
            .try_into()
            .ok()?;
        let scalar_values = row_groups
            .iter()
            .map(|meta| {
                let stats = meta.borrow().column(column_index).statistics()?;
                match stats {
                    Statistics::Boolean(s) => Some(ScalarValue::Boolean(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::Int32(s) => Some(ScalarValue::Int32(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::Int64(s) => Some(ScalarValue::Int64(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::Int96(_) => None,
                    Statistics::Float(s) => Some(ScalarValue::Float32(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::Double(s) => Some(ScalarValue::Float64(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::ByteArray(s) => {
                        let bytes = if is_min {
                            s.min_bytes_opt()?
                        } else {
                            s.max_bytes_opt()?
                        };
                        let s = String::from_utf8(bytes.to_vec()).ok();
                        Some(ScalarValue::Utf8(s))
                    }
                    Statistics::FixedLenByteArray(_) => None,
                }
            })
            .map(|maybe_scalar| maybe_scalar.unwrap_or_else(|| null_scalar.clone()))
            .collect::<Vec<ScalarValue>>();
        debug_assert_eq!(scalar_values.len(), row_groups.len());
        ScalarValue::iter_to_array(scalar_values).ok()
    }

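    /// Returns the null counts of the column at `column_index` in each row group.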
    fn column_null_counts(
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_index: usize,
    ) -> Option<ArrayRef> {
        let values = row_groups.iter().map(|meta| {
            let col = meta.borrow().column(column_index);
            let stat = col.statistics()?;
            stat.null_count_opt()
        });
        Some(Arc::new(UInt64Array::from_iter(values)))
    }

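    /// Index of the primary key column in the SST schema.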
    fn primary_key_position(&self) -> usize {
        self.arrow_schema.fields.len() - 3
    }

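    /// Index of the time index column in the SST schema.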
    fn time_index_position(&self) -> usize {
        self.arrow_schema.fields.len() - FIXED_POS_COLUMN_NUM
    }

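    /// Returns the index of a field column in the projected [Batch] by its column id.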
    pub fn field_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
        self.field_id_to_projected_index.get(&column_id).copied()
    }
}

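/// Column statistics of row groups.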
pub enum StatValues {
    /// Statistics values, one entry per row group.
    Values(ArrayRef),
    /// The column doesn't exist in the SST.
    NoColumn,
    /// Statistics are unavailable for the column.
    NoStats,
}

impl StatValues {
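    /// Creates [StatValues] from optional statistics, treating `None` as no statistics.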
    pub fn from_stats_opt(stats: Option<ArrayRef>) -> Self {
        match stats {
            Some(stats) => StatValues::Values(stats),
            None => StatValues::NoStats,
        }
    }
}

#[cfg(test)]
impl ReadFormat {
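    /// Creates a [ReadFormat] that reads all columns, for tests.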
    pub fn new_with_all_columns(metadata: RegionMetadataRef) -> ReadFormat {
        Self::new(
            Arc::clone(&metadata),
            metadata.column_metadatas.iter().map(|c| c.column_id),
        )
    }
}

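/// Computes the offsets at which the primary key changes in the dictionary array.
///
/// The result includes both endpoints, so `offsets[i]..offsets[i + 1]` is the row
/// range of the i-th key. For example, keys `[0, 0, 1, 1, 2]` yield `[0, 2, 4, 5]`.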
fn primary_key_offsets(pk_dict_array: &PrimaryKeyArray) -> Result<Vec<usize>> {
    if pk_dict_array.is_empty() {
        return Ok(Vec::new());
    }

    // Init offsets with the start of the first key.
    let mut offsets = vec![0];
    let keys = pk_dict_array.keys();
    // Primary keys are never null, so we iterate `keys.values()` directly.
    let pk_indices = keys.values();
    for (i, key) in pk_indices.iter().take(keys.len() - 1).enumerate() {
        // Compare each key with the next one; record a boundary where they differ.
        if *key != pk_indices[i + 1] {
            offsets.push(i + 1);
        }
    }
    offsets.push(keys.len());

    Ok(offsets)
}

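/// Creates a primary key dictionary array with `num_rows` keys that all point to
/// the same `primary_key` value.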
fn new_primary_key_array(primary_key: &[u8], num_rows: usize) -> ArrayRef {
    let values = Arc::new(BinaryArray::from_iter_values([primary_key]));
    let keys = UInt32Array::from_value(0, num_rows);

    // Safety: the only key index is 0, which is valid for the single-value dictionary.
    Arc::new(DictionaryArray::new(keys, values))
}

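/// Gets the min/max time index of the row group from the parquet metadata, or
/// `None` if the statistics are unavailable or have an unexpected type.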
pub(crate) fn parquet_row_group_time_range(
    file_meta: &FileMeta,
    parquet_meta: &ParquetMetaData,
    row_group_idx: usize,
) -> Option<FileTimeRange> {
    let row_group_meta = parquet_meta.row_group(row_group_idx);
    let num_columns = parquet_meta.file_metadata().schema_descr().num_columns();
    assert!(
        num_columns >= FIXED_POS_COLUMN_NUM,
        "file only has {} columns",
        num_columns
    );
    let time_index_pos = num_columns - FIXED_POS_COLUMN_NUM;

    let stats = row_group_meta.column(time_index_pos).statistics()?;
    // The physical type of the time index column is Int64.
    let (min, max) = match stats {
        Statistics::Int64(value_stats) => (*value_stats.min_opt()?, *value_stats.max_opt()?),
        Statistics::Int32(_)
        | Statistics::Boolean(_)
        | Statistics::Int96(_)
        | Statistics::Float(_)
        | Statistics::Double(_)
        | Statistics::ByteArray(_)
        | Statistics::FixedLenByteArray(_) => {
            common_telemetry::warn!(
                "Invalid statistics {:?} for time index in parquet in {}",
                stats,
                file_meta.file_id
            );
            return None;
        }
    };

    debug_assert!(min >= file_meta.time_range.0.value() && min <= file_meta.time_range.1.value());
    debug_assert!(max >= file_meta.time_range.0.value() && max <= file_meta.time_range.1.value());
    // Both timestamps use the time unit recorded in the file meta.
    let unit = file_meta.time_range.0.unit();

    Some((Timestamp::new(min, unit), Timestamp::new(max, unit)))
}

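/// Returns `true` if the sequences in the file should be overridden, i.e. every
/// row group has Int64 sequence statistics whose min and max are both 0.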
pub(crate) fn need_override_sequence(parquet_meta: &ParquetMetaData) -> bool {
    let num_columns = parquet_meta.file_metadata().schema_descr().num_columns();
    if num_columns < FIXED_POS_COLUMN_NUM {
        return false;
    }

    // The sequence column is the second to last column.
    let sequence_pos = num_columns - 2;

    // Checks the statistics of all row groups: every sequence must be 0.
    for row_group in parquet_meta.row_groups() {
        if let Some(Statistics::Int64(value_stats)) = row_group.column(sequence_pos).statistics() {
            if let (Some(min_val), Some(max_val)) = (value_stats.min_opt(), value_stats.max_opt()) {
                if *min_val != 0 || *max_val != 0 {
                    // Found a non-zero sequence, no need to override.
                    return false;
                }
            } else {
                // Can't get the statistics, assume we don't need to override.
                return false;
            }
        } else {
            // Can't get the statistics, assume we don't need to override.
            return false;
        }
    }

    // All sequences are 0 and there is at least one row group.
    !parquet_meta.row_groups().is_empty()
}

#[cfg(test)]
mod tests {
    use api::v1::OpType;
    use datatypes::arrow::array::{Int64Array, TimestampMillisecondArray, UInt64Array, UInt8Array};
    use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema, TimeUnit};
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, UInt64Vector, UInt8Vector};
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;

    const TEST_SEQUENCE: u64 = 1;
    const TEST_OP_TYPE: u8 = OpType::Put as u8;

    fn build_test_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("tag0", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field1",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                // Field columns are deliberately not ordered by column id.
                column_id: 4,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("tag1", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field0",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 5,
            })
            .primary_key(vec![1, 3]);
        Arc::new(builder.build().unwrap())
    }

    fn build_test_arrow_schema() -> SchemaRef {
        let fields = vec![
            Field::new("field1", ArrowDataType::Int64, true),
            Field::new("field0", ArrowDataType::Int64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt32),
                    Box::new(ArrowDataType::Binary),
                ),
                false,
            ),
            Field::new("__sequence", ArrowDataType::UInt64, false),
            Field::new("__op_type", ArrowDataType::UInt8, false),
        ];
        Arc::new(Schema::new(fields))
    }

    fn new_batch(primary_key: &[u8], start_ts: i64, start_field: i64, num_rows: usize) -> Batch {
        new_batch_with_sequence(primary_key, start_ts, start_field, num_rows, TEST_SEQUENCE)
    }

    fn new_batch_with_sequence(
        primary_key: &[u8],
        start_ts: i64,
        start_field: i64,
        num_rows: usize,
        sequence: u64,
    ) -> Batch {
        let ts_values = (0..num_rows).map(|i| start_ts + i as i64);
        let timestamps = Arc::new(TimestampMillisecondVector::from_values(ts_values));
        let sequences = Arc::new(UInt64Vector::from_vec(vec![sequence; num_rows]));
        let op_types = Arc::new(UInt8Vector::from_vec(vec![TEST_OP_TYPE; num_rows]));
        let fields = vec![
            // field1
            BatchColumn {
                column_id: 4,
                data: Arc::new(Int64Vector::from_vec(vec![start_field; num_rows])),
            },
            // field0
            BatchColumn {
                column_id: 2,
                data: Arc::new(Int64Vector::from_vec(vec![start_field + 1; num_rows])),
            },
        ];

        BatchBuilder::with_required_columns(primary_key.to_vec(), timestamps, sequences, op_types)
            .with_fields(fields)
            .build()
            .unwrap()
    }

    #[test]
    fn test_to_sst_arrow_schema() {
        let metadata = build_test_region_metadata();
        let write_format = WriteFormat::new(metadata);
        assert_eq!(&build_test_arrow_schema(), write_format.arrow_schema());
    }

    #[test]
    fn test_new_primary_key_array() {
        let array = new_primary_key_array(b"test", 3);
        let expect = build_test_pk_array(&[(b"test".to_vec(), 3)]) as ArrayRef;
        assert_eq!(&expect, &array);
    }

    fn build_test_pk_array(pk_row_nums: &[(Vec<u8>, usize)]) -> Arc<PrimaryKeyArray> {
        let values = Arc::new(BinaryArray::from_iter_values(
            pk_row_nums.iter().map(|v| &v.0),
        ));
        let mut keys = vec![];
        for (index, num_rows) in pk_row_nums.iter().map(|v| v.1).enumerate() {
            keys.extend(std::iter::repeat_n(index as u32, num_rows));
        }
        let keys = UInt32Array::from(keys);
        Arc::new(DictionaryArray::new(keys, values))
    }

    #[test]
    fn test_convert_batch() {
        let metadata = build_test_region_metadata();
        let write_format = WriteFormat::new(metadata);

        let num_rows = 4;
        let batch = new_batch(b"test", 1, 2, num_rows);
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // primary key
            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; num_rows])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ];
        let expect_record = RecordBatch::try_new(build_test_arrow_schema(), columns).unwrap();

        let actual = write_format.convert_batch(&batch).unwrap();
        assert_eq!(expect_record, actual);
    }

    #[test]
    fn test_convert_batch_with_override_sequence() {
        let metadata = build_test_region_metadata();
        let write_format = WriteFormat::new(metadata).with_override_sequence(Some(415411));

        let num_rows = 4;
        let batch = new_batch(b"test", 1, 2, num_rows);
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // primary key
            Arc::new(UInt64Array::from(vec![415411; num_rows])), // overridden sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ];
        let expect_record = RecordBatch::try_new(build_test_arrow_schema(), columns).unwrap();

        let actual = write_format.convert_batch(&batch).unwrap();
        assert_eq!(expect_record, actual);
    }

    #[test]
    fn test_projection_indices() {
        let metadata = build_test_region_metadata();
        // Only read tag1 (tags are stored in the primary key column).
        let read_format = ReadFormat::new(metadata.clone(), [3].iter().copied());
        assert_eq!(&[2, 3, 4, 5], read_format.projection_indices());
        // Only read field1.
        let read_format = ReadFormat::new(metadata.clone(), [4].iter().copied());
        assert_eq!(&[0, 2, 3, 4, 5], read_format.projection_indices());
        // Only read ts.
        let read_format = ReadFormat::new(metadata.clone(), [5].iter().copied());
        assert_eq!(&[2, 3, 4, 5], read_format.projection_indices());
        // Read field0, tag0, ts.
        let read_format = ReadFormat::new(metadata, [2, 1, 5].iter().copied());
        assert_eq!(&[1, 2, 3, 4, 5], read_format.projection_indices());
    }

    #[test]
    fn test_empty_primary_key_offsets() {
        let array = build_test_pk_array(&[]);
        assert!(primary_key_offsets(&array).unwrap().is_empty());
    }

    #[test]
    fn test_primary_key_offsets_one_series() {
        let array = build_test_pk_array(&[(b"one".to_vec(), 1)]);
        assert_eq!(vec![0, 1], primary_key_offsets(&array).unwrap());

        let array = build_test_pk_array(&[(b"one".to_vec(), 1), (b"two".to_vec(), 1)]);
        assert_eq!(vec![0, 1, 2], primary_key_offsets(&array).unwrap());

        let array = build_test_pk_array(&[
            (b"one".to_vec(), 1),
            (b"two".to_vec(), 1),
            (b"three".to_vec(), 1),
        ]);
        assert_eq!(vec![0, 1, 2, 3], primary_key_offsets(&array).unwrap());
    }

    #[test]
    fn test_primary_key_offsets_multi_series() {
        let array = build_test_pk_array(&[(b"one".to_vec(), 1), (b"two".to_vec(), 3)]);
        assert_eq!(vec![0, 1, 4], primary_key_offsets(&array).unwrap());

        let array = build_test_pk_array(&[(b"one".to_vec(), 3), (b"two".to_vec(), 1)]);
        assert_eq!(vec![0, 3, 4], primary_key_offsets(&array).unwrap());

        let array = build_test_pk_array(&[(b"one".to_vec(), 3), (b"two".to_vec(), 3)]);
        assert_eq!(vec![0, 3, 6], primary_key_offsets(&array).unwrap());
    }

    #[test]
    fn test_convert_empty_record_batch() {
        let metadata = build_test_region_metadata();
        let arrow_schema = build_test_arrow_schema();
        let column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|col| col.column_id)
            .collect();
        let read_format = ReadFormat::new(metadata, column_ids.iter().copied());
        assert_eq!(arrow_schema, *read_format.arrow_schema());

        let record_batch = RecordBatch::new_empty(arrow_schema);
        let mut batches = VecDeque::new();
        read_format
            .convert_record_batch(&record_batch, None, &mut batches)
            .unwrap();
        assert!(batches.is_empty());
    }

    #[test]
    fn test_convert_record_batch() {
        let metadata = build_test_region_metadata();
        let column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|col| col.column_id)
            .collect();
        let read_format = ReadFormat::new(metadata, column_ids.iter().copied());

        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![1, 1, 10, 10])), // field1
            Arc::new(Int64Array::from(vec![2, 2, 11, 11])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 11, 12])), // ts
            build_test_pk_array(&[(b"one".to_vec(), 2), (b"two".to_vec(), 2)]), // primary key
            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; 4])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; 4])), // op type
        ];
        let arrow_schema = build_test_arrow_schema();
        let record_batch = RecordBatch::try_new(arrow_schema, columns).unwrap();
        let mut batches = VecDeque::new();
        read_format
            .convert_record_batch(&record_batch, None, &mut batches)
            .unwrap();

        assert_eq!(
            vec![new_batch(b"one", 1, 1, 2), new_batch(b"two", 11, 10, 2)],
            batches.into_iter().collect::<Vec<_>>(),
        );
    }

    #[test]
    fn test_convert_record_batch_with_override_sequence() {
        let metadata = build_test_region_metadata();
        let column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|col| col.column_id)
            .collect();
        let read_format = ReadFormat::new(metadata, column_ids.iter().copied());

        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![1, 1, 10, 10])), // field1
            Arc::new(Int64Array::from(vec![2, 2, 11, 11])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 11, 12])), // ts
            build_test_pk_array(&[(b"one".to_vec(), 2), (b"two".to_vec(), 2)]), // primary key
            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; 4])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; 4])), // op type
        ];
        let arrow_schema = build_test_arrow_schema();
        let record_batch = RecordBatch::try_new(arrow_schema, columns).unwrap();

        // Override all sequences with a fixed value.
        let override_sequence: u64 = 12345;
        let override_sequence_array: ArrayRef =
            Arc::new(UInt64Array::from_value(override_sequence, 4));

        let mut batches = VecDeque::new();
        read_format
            .convert_record_batch(&record_batch, Some(&override_sequence_array), &mut batches)
            .unwrap();

        let expected_batch1 = new_batch_with_sequence(b"one", 1, 1, 2, override_sequence);
        let expected_batch2 = new_batch_with_sequence(b"two", 11, 10, 2, override_sequence);

        assert_eq!(
            vec![expected_batch1, expected_batch2],
            batches.into_iter().collect::<Vec<_>>(),
        );
    }
}