use std::borrow::Borrow;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;

use api::v1::SemanticType;
use common_time::Timestamp;
use datafusion_common::ScalarValue;
use datatypes::arrow::array::{ArrayRef, BinaryArray, DictionaryArray, UInt32Array, UInt64Array};
use datatypes::arrow::datatypes::{SchemaRef, UInt32Type};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::prelude::DataType;
use datatypes::vectors::{Helper, Vector};
use parquet::file::metadata::{ParquetMetaData, RowGroupMetaData};
use parquet::file::statistics::Statistics;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::{ColumnMetadata, RegionMetadataRef};
use store_api::storage::{ColumnId, SequenceNumber};

use crate::error::{
    ConvertVectorSnafu, InvalidBatchSnafu, InvalidRecordBatchSnafu, NewRecordBatchSnafu, Result,
};
use crate::read::{Batch, BatchBuilder, BatchColumn};
use crate::row_converter::{build_primary_key_codec_with_fields, SortField};
use crate::sst::file::{FileMeta, FileTimeRange};
use crate::sst::to_sst_arrow_schema;

/// Arrow array type of the primary key column: a dictionary of binary keys.
pub(crate) type PrimaryKeyArray = DictionaryArray<UInt32Type>;

/// Number of columns stored at fixed positions at the end of an SST record
/// batch: time index, primary key, sequence and op type.
const FIXED_POS_COLUMN_NUM: usize = 4;

/// Helper to convert a [Batch] into a [RecordBatch] in the SST layout.
pub(crate) struct WriteFormat {
    metadata: RegionMetadataRef,
    /// SST file schema.
    arrow_schema: SchemaRef,
    /// Sequence number that overrides the sequences of all rows, if set.
    override_sequence: Option<SequenceNumber>,
}
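
// Layout example (illustrative, not from the original source): an SST record
// batch stores the field columns in region metadata order, followed by the
// fixed position columns:
//
//     field 0, field 1, ..., time index, __primary_key, __sequence, __op_type
//
// so the last `FIXED_POS_COLUMN_NUM` columns always sit at known positions.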

impl WriteFormat {
    /// Creates a new [WriteFormat] from the region metadata.
    pub(crate) fn new(metadata: RegionMetadataRef) -> WriteFormat {
        let arrow_schema = to_sst_arrow_schema(&metadata);
        WriteFormat {
            metadata,
            arrow_schema,
            override_sequence: None,
        }
    }

    /// Sets the sequence number that overrides the sequences of all rows.
    pub(crate) fn with_override_sequence(
        mut self,
        override_sequence: Option<SequenceNumber>,
    ) -> Self {
        self.override_sequence = override_sequence;
        self
    }

    /// Returns the schema of the SST file.
    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
        &self.arrow_schema
    }

    /// Converts a [Batch] into a [RecordBatch] in the SST layout.
    pub(crate) fn convert_batch(&self, batch: &Batch) -> Result<RecordBatch> {
        debug_assert_eq!(
            batch.fields().len() + FIXED_POS_COLUMN_NUM,
            self.arrow_schema.fields().len()
        );
        let mut columns = Vec::with_capacity(batch.fields().len() + FIXED_POS_COLUMN_NUM);
        // Store all field columns first and check that the batch matches the
        // region metadata.
        for (column, column_metadata) in batch.fields().iter().zip(self.metadata.field_columns()) {
            ensure!(
                column.column_id == column_metadata.column_id,
                InvalidBatchSnafu {
                    reason: format!(
                        "Batch has column {} but metadata has column {}",
                        column.column_id, column_metadata.column_id
                    ),
                }
            );

            columns.push(column.data.to_arrow_array());
        }
        // Add the time index column.
        columns.push(batch.timestamps().to_arrow_array());
        // Add internal columns: primary key, sequences, op types.
        columns.push(new_primary_key_array(batch.primary_key(), batch.num_rows()));

        if let Some(override_sequence) = self.override_sequence {
            let sequence_array =
                Arc::new(UInt64Array::from(vec![override_sequence; batch.num_rows()]));
            columns.push(sequence_array);
        } else {
            columns.push(batch.sequences().to_arrow_array());
        }
        columns.push(batch.op_types().to_arrow_array());

        RecordBatch::try_new(self.arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
    }
}
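
// A minimal write-side sketch (illustrative; `metadata`, `batch` and the
// parquet writer are assumptions supplied by the caller):
//
//     let format = WriteFormat::new(metadata).with_override_sequence(None);
//     let record_batch = format.convert_batch(&batch)?;
//     arrow_writer.write(&record_batch)?;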

/// Helper to read the SST format and convert a [RecordBatch] back into
/// [Batch]es.
pub struct ReadFormat {
    metadata: RegionMetadataRef,
    /// SST file schema.
    arrow_schema: SchemaRef,
    /// Field column id to its index in the SST schema (before projection).
    field_id_to_index: HashMap<ColumnId, usize>,
    /// Indices of columns to read from the SST, in ascending order.
    projection_indices: Vec<usize>,
    /// Field column id to its index in the projected schema.
    field_id_to_projected_index: HashMap<ColumnId, usize>,
}

impl ReadFormat {
    /// Creates a new [ReadFormat] that reads the columns with the given ids.
    pub fn new(
        metadata: RegionMetadataRef,
        column_ids: impl Iterator<Item = ColumnId>,
    ) -> ReadFormat {
        let field_id_to_index: HashMap<_, _> = metadata
            .field_columns()
            .enumerate()
            .map(|(index, column)| (column.column_id, index))
            .collect();
        let arrow_schema = to_sst_arrow_schema(&metadata);

        // Maps each projected field column to its index in the SST.
        let mut projected_field_id_index: Vec<_> = column_ids
            .filter_map(|column_id| {
                // Only field columns have separate indices in the SST; tags
                // live in the primary key and other columns sit at fixed
                // positions.
                field_id_to_index
                    .get(&column_id)
                    .copied()
                    .map(|index| (column_id, index))
            })
            .collect();
        let mut projection_indices: Vec<_> = projected_field_id_index
            .iter()
            .map(|(_column_id, index)| *index)
            // Always read the fixed position columns.
            .chain(arrow_schema.fields.len() - FIXED_POS_COLUMN_NUM..arrow_schema.fields.len())
            .collect();
        projection_indices.sort_unstable();

        // Sorts fields by their indices in the SST so that the projected
        // index of a field is simply its rank in this list.
        projected_field_id_index.sort_unstable_by_key(|x| x.1);
        let field_id_to_projected_index = projected_field_id_index
            .into_iter()
            .map(|(column_id, _)| column_id)
            .enumerate()
            .map(|(index, column_id)| (column_id, index))
            .collect();

        ReadFormat {
            metadata,
            arrow_schema,
            field_id_to_index,
            projection_indices,
            field_id_to_projected_index,
        }
    }
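
    // For example (illustrative): with SST columns `[f0, f1, ts,
    // __primary_key, __sequence, __op_type]`, projecting only `f1` yields
    // `projection_indices == [1, 2, 3, 4, 5]`: index 1 for the field plus
    // the four fixed position columns.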

    /// Returns the schema of the SST file.
    pub(crate) fn arrow_schema(&self) -> &SchemaRef {
        &self.arrow_schema
    }

    /// Returns the metadata of the region.
    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
        &self.metadata
    }

    /// Returns the sorted indices of the columns to read.
    pub(crate) fn projection_indices(&self) -> &[usize] {
        &self.projection_indices
    }

    /// Converts a [RecordBatch] in the SST layout into [Batch]es, one per
    /// primary key, and pushes them into `batches`.
    ///
    /// The `batches` queue must be empty.
    pub fn convert_record_batch(
        &self,
        record_batch: &RecordBatch,
        batches: &mut VecDeque<Batch>,
    ) -> Result<()> {
        debug_assert!(batches.is_empty());

        // The record batch must contain at least the fixed position columns.
        ensure!(
            record_batch.num_columns() >= FIXED_POS_COLUMN_NUM,
            InvalidRecordBatchSnafu {
                reason: format!(
                    "record batch only has {} columns",
                    record_batch.num_columns()
                ),
            }
        );

        let mut fixed_pos_columns = record_batch
            .columns()
            .iter()
            .rev()
            .take(FIXED_POS_COLUMN_NUM);
        // Safety: the record batch has at least FIXED_POS_COLUMN_NUM columns.
        let op_type_array = fixed_pos_columns.next().unwrap();
        let sequence_array = fixed_pos_columns.next().unwrap();
        let pk_array = fixed_pos_columns.next().unwrap();
        let ts_array = fixed_pos_columns.next().unwrap();
        let field_batch_columns = self.get_field_batch_columns(record_batch)?;

        // Compute the row ranges of each primary key.
        let pk_dict_array = pk_array
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .with_context(|| InvalidRecordBatchSnafu {
                reason: format!("primary key array should not be {:?}", pk_array.data_type()),
            })?;
        let offsets = primary_key_offsets(pk_dict_array)?;
        if offsets.is_empty() {
            return Ok(());
        }

        let keys = pk_dict_array.keys();
        let pk_values = pk_dict_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .with_context(|| InvalidRecordBatchSnafu {
                reason: format!(
                    "values of primary key array should not be {:?}",
                    pk_dict_array.values().data_type()
                ),
            })?;
        // Each offset range [start, end) covers the rows of one primary key.
        for (i, start) in offsets[..offsets.len() - 1].iter().enumerate() {
            let end = offsets[i + 1];
            let rows_in_batch = end - start;
            let dict_key = keys.value(*start);
            let primary_key = pk_values.value(dict_key as usize).to_vec();

            let mut builder = BatchBuilder::new(primary_key);
            builder
                .timestamps_array(ts_array.slice(*start, rows_in_batch))?
                .sequences_array(sequence_array.slice(*start, rows_in_batch))?
                .op_types_array(op_type_array.slice(*start, rows_in_batch))?;
            // Push the sliced field columns for this key range.
            for batch_column in &field_batch_columns {
                builder.push_field(BatchColumn {
                    column_id: batch_column.column_id,
                    data: batch_column.data.slice(*start, rows_in_batch),
                });
            }

            let batch = builder.build()?;
            batches.push_back(batch);
        }

        Ok(())
    }
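
    // A matching read-side sketch (illustrative; `record_batch` is assumed
    // to be in the layout produced by `WriteFormat::convert_batch`):
    //
    //     let mut batches = VecDeque::new();
    //     read_format.convert_record_batch(&record_batch, &mut batches)?;
    //     while let Some(batch) = batches.pop_front() {
    //         // Each batch holds the rows of a single primary key.
    //     }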

    /// Returns the min values of the column in each row group.
    pub fn min_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        let Some(column) = self.metadata.column_by_id(column_id) else {
            // No such column in the region.
            return StatValues::NoColumn;
        };
        match column.semantic_type {
            SemanticType::Tag => self.tag_values(row_groups, column, true),
            SemanticType::Field => {
                // Safety: `field_id_to_index` is built from the same metadata.
                let index = self.field_id_to_index.get(&column_id).unwrap();
                let stats = Self::column_values(row_groups, column, *index, true);
                StatValues::from_stats_opt(stats)
            }
            SemanticType::Timestamp => {
                let index = self.time_index_position();
                let stats = Self::column_values(row_groups, column, index, true);
                StatValues::from_stats_opt(stats)
            }
        }
    }

    /// Returns the max values of the column in each row group.
    pub fn max_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        let Some(column) = self.metadata.column_by_id(column_id) else {
            // No such column in the region.
            return StatValues::NoColumn;
        };
        match column.semantic_type {
            SemanticType::Tag => self.tag_values(row_groups, column, false),
            SemanticType::Field => {
                // Safety: `field_id_to_index` is built from the same metadata.
                let index = self.field_id_to_index.get(&column_id).unwrap();
                let stats = Self::column_values(row_groups, column, *index, false);
                StatValues::from_stats_opt(stats)
            }
            SemanticType::Timestamp => {
                let index = self.time_index_position();
                let stats = Self::column_values(row_groups, column, index, false);
                StatValues::from_stats_opt(stats)
            }
        }
    }

    /// Returns the null counts of the column in each row group.
    pub fn null_counts(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_id: ColumnId,
    ) -> StatValues {
        let Some(column) = self.metadata.column_by_id(column_id) else {
            // No such column in the region.
            return StatValues::NoColumn;
        };
        match column.semantic_type {
            // Tags are encoded into the primary key, so they have no
            // per-column null counts.
            SemanticType::Tag => StatValues::NoStats,
            SemanticType::Field => {
                // Safety: `field_id_to_index` is built from the same metadata.
                let index = self.field_id_to_index.get(&column_id).unwrap();
                let stats = Self::column_null_counts(row_groups, *index);
                StatValues::from_stats_opt(stats)
            }
            SemanticType::Timestamp => {
                let index = self.time_index_position();
                let stats = Self::column_null_counts(row_groups, index);
                StatValues::from_stats_opt(stats)
            }
        }
    }
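
    // Pruning sketch (illustrative): the per-row-group arrays returned here
    // can back row group pruning, e.g.
    //
    //     let mins = read_format.min_values(parquet_meta.row_groups(), column_id);
    //     let maxs = read_format.max_values(parquet_meta.row_groups(), column_id);
    //     // Skip row group `i` when its [min, max] range cannot satisfy the filter.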

    /// Returns the field columns of the `record_batch` as [BatchColumn]s.
    fn get_field_batch_columns(&self, record_batch: &RecordBatch) -> Result<Vec<BatchColumn>> {
        record_batch
            .columns()
            .iter()
            .zip(record_batch.schema().fields())
            // The field columns are all columns before the fixed positions.
            .take(record_batch.num_columns() - FIXED_POS_COLUMN_NUM)
            .map(|(array, field)| {
                let vector = Helper::try_into_vector(array.clone()).context(ConvertVectorSnafu)?;
                let column = self
                    .metadata
                    .column_by_name(field.name())
                    .with_context(|| InvalidRecordBatchSnafu {
                        reason: format!("column {} not found in metadata", field.name()),
                    })?;

                Ok(BatchColumn {
                    column_id: column.column_id,
                    data: vector,
                })
            })
            .collect()
    }

    /// Returns the min/max values of a tag column in each row group. Only the
    /// first tag in the primary key has usable statistics.
    fn tag_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column: &ColumnMetadata,
        is_min: bool,
    ) -> StatValues {
        let is_first_tag = self
            .metadata
            .primary_key
            .first()
            .map(|id| *id == column.column_id)
            .unwrap_or(false);
        if !is_first_tag {
            // Only the first tag can be decoded from the primary key stats.
            return StatValues::NoStats;
        }

        StatValues::from_stats_opt(self.first_tag_values(row_groups, column, is_min))
    }

    /// Returns the min/max values of the first tag in each row group, decoded
    /// from the statistics of the encoded primary key column.
    ///
    /// The `column` must be the first tag of the primary key.
    fn first_tag_values(
        &self,
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column: &ColumnMetadata,
        is_min: bool,
    ) -> Option<ArrayRef> {
        debug_assert!(self
            .metadata
            .primary_key
            .first()
            .map(|id| *id == column.column_id)
            .unwrap_or(false));

        let primary_key_encoding = self.metadata.primary_key_encoding;
        let converter = build_primary_key_codec_with_fields(
            primary_key_encoding,
            [(
                column.column_id,
                SortField::new(column.column_schema.data_type.clone()),
            )]
            .into_iter(),
        );

        let values = row_groups.iter().map(|meta| {
            let stats = meta
                .borrow()
                .column(self.primary_key_position())
                .statistics()?;
            match stats {
                Statistics::Boolean(_) => None,
                Statistics::Int32(_) => None,
                Statistics::Int64(_) => None,
                Statistics::Int96(_) => None,
                Statistics::Float(_) => None,
                Statistics::Double(_) => None,
                // The primary key is stored as a binary column.
                Statistics::ByteArray(s) => {
                    let bytes = if is_min {
                        s.min_bytes_opt()?
                    } else {
                        s.max_bytes_opt()?
                    };
                    converter.decode_leftmost(bytes).ok()?
                }
                Statistics::FixedLenByteArray(_) => None,
            }
        });
        let mut builder = column
            .column_schema
            .data_type
            .create_mutable_vector(row_groups.len());
        for value_opt in values {
            match value_opt {
                // Safety: the converter is created with the same data type.
                Some(v) => builder.push_value_ref(v.as_value_ref()),
                None => builder.push_null(),
            }
        }
        let vector = builder.to_vector();

        Some(vector.to_arrow_array())
    }
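
    // Why only the first tag works (illustrative): with primary key
    // `(host, idc)` and an order-preserving key encoding, the byte-wise
    // min/max of the encoded `__primary_key` column reflects the leftmost
    // field, so `decode_leftmost` can recover min/max for `host`; nothing can
    // be inferred for `idc`, which therefore gets `StatValues::NoStats`.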

    /// Returns the min/max values of a field or time index column in each row
    /// group, or `None` if the statistics cannot be converted.
    fn column_values(
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column: &ColumnMetadata,
        column_index: usize,
        is_min: bool,
    ) -> Option<ArrayRef> {
        let null_scalar: ScalarValue = column
            .column_schema
            .data_type
            .as_arrow_type()
            .try_into()
            .ok()?;
        let scalar_values = row_groups
            .iter()
            .map(|meta| {
                let stats = meta.borrow().column(column_index).statistics()?;
                match stats {
                    Statistics::Boolean(s) => Some(ScalarValue::Boolean(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::Int32(s) => Some(ScalarValue::Int32(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::Int64(s) => Some(ScalarValue::Int64(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::Int96(_) => None,
                    Statistics::Float(s) => Some(ScalarValue::Float32(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::Double(s) => Some(ScalarValue::Float64(Some(if is_min {
                        *s.min_opt()?
                    } else {
                        *s.max_opt()?
                    }))),
                    Statistics::ByteArray(s) => {
                        let bytes = if is_min {
                            s.min_bytes_opt()?
                        } else {
                            s.max_bytes_opt()?
                        };
                        let s = String::from_utf8(bytes.to_vec()).ok();
                        Some(ScalarValue::Utf8(s))
                    }
                    Statistics::FixedLenByteArray(_) => None,
                }
            })
            // Row groups without statistics become typed nulls.
            .map(|maybe_scalar| maybe_scalar.unwrap_or_else(|| null_scalar.clone()))
            .collect::<Vec<ScalarValue>>();
        debug_assert_eq!(scalar_values.len(), row_groups.len());
        ScalarValue::iter_to_array(scalar_values).ok()
    }

    /// Returns the null counts of a column in each row group. Row groups
    /// without statistics become nulls in the returned array.
    fn column_null_counts(
        row_groups: &[impl Borrow<RowGroupMetaData>],
        column_index: usize,
    ) -> Option<ArrayRef> {
        let values = row_groups.iter().map(|meta| {
            let col = meta.borrow().column(column_index);
            let stat = col.statistics()?;
            stat.null_count_opt()
        });
        Some(Arc::new(UInt64Array::from_iter(values)))
    }

    /// Index of the primary key column in the SST (third from the end).
    fn primary_key_position(&self) -> usize {
        self.arrow_schema.fields.len() - 3
    }

    /// Index of the time index column in the SST.
    fn time_index_position(&self) -> usize {
        self.arrow_schema.fields.len() - FIXED_POS_COLUMN_NUM
    }

    /// Returns the index of a field column in the projected batch by its
    /// column id, or `None` if the column is not projected.
    pub fn field_index_by_id(&self, column_id: ColumnId) -> Option<usize> {
        self.field_id_to_projected_index.get(&column_id).copied()
    }
}

/// Statistics of a column for each row group.
pub enum StatValues {
    /// Statistic values of the column in each row group.
    Values(ArrayRef),
    /// No such column in the region.
    NoColumn,
    /// The column exists but has no statistics.
    NoStats,
}

impl StatValues {
    /// Creates [StatValues::Values] from the stats, or [StatValues::NoStats]
    /// if they are absent.
    pub fn from_stats_opt(stats: Option<ArrayRef>) -> Self {
        match stats {
            Some(stats) => StatValues::Values(stats),
            None => StatValues::NoStats,
        }
    }
}
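
// Callers typically match on the result (illustrative sketch):
//
//     match read_format.min_values(row_groups, column_id) {
//         StatValues::Values(array) => { /* per-row-group minimums */ }
//         StatValues::NoColumn => { /* the column is absent */ }
//         StatValues::NoStats => { /* fall back to reading the data */ }
//     }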

#[cfg(test)]
impl ReadFormat {
    /// Creates a [ReadFormat] that reads all columns, for testing.
    pub fn new_with_all_columns(metadata: RegionMetadataRef) -> ReadFormat {
        Self::new(
            Arc::clone(&metadata),
            metadata.column_metadatas.iter().map(|c| c.column_id),
        )
    }
}

/// Computes the row offsets of each primary key in the dictionary array.
///
/// The returned vector has one offset per run of identical keys plus a final
/// offset equal to the number of rows, so key `i` spans rows
/// `offsets[i]..offsets[i + 1]`.
fn primary_key_offsets(pk_dict_array: &PrimaryKeyArray) -> Result<Vec<usize>> {
    if pk_dict_array.is_empty() {
        return Ok(Vec::new());
    }

    // The first key starts at row 0.
    let mut offsets = vec![0];
    let keys = pk_dict_array.keys();
    // We know that the primary key array is not empty.
    let pk_indices = keys.values();
    for (i, key) in pk_indices.iter().take(keys.len() - 1).enumerate() {
        // Compare each key with the next one.
        if *key != pk_indices[i + 1] {
            // A new key starts at the next row.
            offsets.push(i + 1);
        }
    }
    offsets.push(keys.len());

    Ok(offsets)
}
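
// For example (illustrative): dictionary keys `[0, 0, 1, 1, 1, 2]` produce
// offsets `[0, 2, 5, 6]`, so key 0 spans rows 0..2, key 1 spans 2..5 and
// key 2 spans 5..6.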

/// Creates a dictionary array in which all `num_rows` keys reference the
/// single `primary_key` value.
fn new_primary_key_array(primary_key: &[u8], num_rows: usize) -> ArrayRef {
    let values = Arc::new(BinaryArray::from_iter_values([primary_key]));
    let keys = UInt32Array::from_value(0, num_rows);

    // Safety: key index 0 is always valid for the single value.
    Arc::new(DictionaryArray::new(keys, values))
}
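
// For example (illustrative): `new_primary_key_array(b"host-a", 3)` builds a
// dictionary array with values `["host-a"]` and keys `[0, 0, 0]`, matching
// the invariant that a `Batch` holds rows of a single primary key.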

/// Gets the min/max time index of the row group from the parquet metadata.
///
/// Returns `None` if the time index statistics are absent or have an
/// unexpected physical type.
pub(crate) fn parquet_row_group_time_range(
    file_meta: &FileMeta,
    parquet_meta: &ParquetMetaData,
    row_group_idx: usize,
) -> Option<FileTimeRange> {
    let row_group_meta = parquet_meta.row_group(row_group_idx);
    let num_columns = parquet_meta.file_metadata().schema_descr().num_columns();
    assert!(
        num_columns >= FIXED_POS_COLUMN_NUM,
        "file only has {} columns",
        num_columns
    );
    let time_index_pos = num_columns - FIXED_POS_COLUMN_NUM;

    let stats = row_group_meta.column(time_index_pos).statistics()?;
    // The physical type of the time index column must be Int64.
    let (min, max) = match stats {
        Statistics::Int64(value_stats) => (*value_stats.min_opt()?, *value_stats.max_opt()?),
        Statistics::Int32(_)
        | Statistics::Boolean(_)
        | Statistics::Int96(_)
        | Statistics::Float(_)
        | Statistics::Double(_)
        | Statistics::ByteArray(_)
        | Statistics::FixedLenByteArray(_) => {
            common_telemetry::warn!(
                "Invalid statistics {:?} for time index in parquet in {}",
                stats,
                file_meta.file_id
            );
            return None;
        }
    };

    debug_assert!(min >= file_meta.time_range.0.value() && min <= file_meta.time_range.1.value());
    debug_assert!(max >= file_meta.time_range.0.value() && max <= file_meta.time_range.1.value());
    // The row group time range uses the same unit as the file level range.
    let unit = file_meta.time_range.0.unit();

    Some((Timestamp::new(min, unit), Timestamp::new(max, unit)))
}
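
// Illustrative call site: compute per-row-group time ranges, falling back to
// the file-level range when statistics are missing:
//
//     let range = parquet_row_group_time_range(&file_meta, &parquet_meta, idx)
//         .unwrap_or(file_meta.time_range);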

#[cfg(test)]
mod tests {
    use api::v1::OpType;
    use datatypes::arrow::array::{Int64Array, TimestampMillisecondArray, UInt64Array, UInt8Array};
    use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema, TimeUnit};
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, UInt64Vector, UInt8Vector};
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;

    const TEST_SEQUENCE: u64 = 1;
    const TEST_OP_TYPE: u8 = OpType::Put as u8;

    fn build_test_region_metadata() -> RegionMetadataRef {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("tag0", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field1",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                // field1 is pushed before field0 to check the field order.
                column_id: 4,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("tag1", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Tag,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field0",
                    ConcreteDataType::int64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 5,
            })
            .primary_key(vec![1, 3]);
        Arc::new(builder.build().unwrap())
    }

    fn build_test_arrow_schema() -> SchemaRef {
        let fields = vec![
            Field::new("field1", ArrowDataType::Int64, true),
            Field::new("field0", ArrowDataType::Int64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt32),
                    Box::new(ArrowDataType::Binary),
                ),
                false,
            ),
            Field::new("__sequence", ArrowDataType::UInt64, false),
            Field::new("__op_type", ArrowDataType::UInt8, false),
        ];
        Arc::new(Schema::new(fields))
    }

    fn new_batch(primary_key: &[u8], start_ts: i64, start_field: i64, num_rows: usize) -> Batch {
        let ts_values = (0..num_rows).map(|i| start_ts + i as i64);
        let timestamps = Arc::new(TimestampMillisecondVector::from_values(ts_values));
        let sequences = Arc::new(UInt64Vector::from_vec(vec![TEST_SEQUENCE; num_rows]));
        let op_types = Arc::new(UInt8Vector::from_vec(vec![TEST_OP_TYPE; num_rows]));
        let fields = vec![
            BatchColumn {
                // field1
                column_id: 4,
                data: Arc::new(Int64Vector::from_vec(vec![start_field; num_rows])),
            },
            BatchColumn {
                // field0
                column_id: 2,
                data: Arc::new(Int64Vector::from_vec(vec![start_field + 1; num_rows])),
            },
        ];

        BatchBuilder::with_required_columns(primary_key.to_vec(), timestamps, sequences, op_types)
            .with_fields(fields)
            .build()
            .unwrap()
    }

    #[test]
    fn test_to_sst_arrow_schema() {
        let metadata = build_test_region_metadata();
        let write_format = WriteFormat::new(metadata);
        assert_eq!(&build_test_arrow_schema(), write_format.arrow_schema());
    }

    #[test]
    fn test_new_primary_key_array() {
        let array = new_primary_key_array(b"test", 3);
        let expect = build_test_pk_array(&[(b"test".to_vec(), 3)]) as ArrayRef;
        assert_eq!(&expect, &array);
    }

    fn build_test_pk_array(pk_row_nums: &[(Vec<u8>, usize)]) -> Arc<PrimaryKeyArray> {
        let values = Arc::new(BinaryArray::from_iter_values(
            pk_row_nums.iter().map(|v| &v.0),
        ));
        let mut keys = vec![];
        for (index, num_rows) in pk_row_nums.iter().map(|v| v.1).enumerate() {
            keys.extend(std::iter::repeat_n(index as u32, num_rows));
        }
        let keys = UInt32Array::from(keys);
        Arc::new(DictionaryArray::new(keys, values))
    }

    #[test]
    fn test_convert_batch() {
        let metadata = build_test_region_metadata();
        let write_format = WriteFormat::new(metadata);

        let num_rows = 4;
        let batch = new_batch(b"test", 1, 2, num_rows);
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // primary key
            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; num_rows])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ];
        let expect_record = RecordBatch::try_new(build_test_arrow_schema(), columns).unwrap();

        let actual = write_format.convert_batch(&batch).unwrap();
        assert_eq!(expect_record, actual);
    }

    #[test]
    fn test_convert_batch_with_override_sequence() {
        let metadata = build_test_region_metadata();
        let write_format = WriteFormat::new(metadata).with_override_sequence(Some(415411));

        let num_rows = 4;
        let batch = new_batch(b"test", 1, 2, num_rows);
        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![2; num_rows])), // field1
            Arc::new(Int64Array::from(vec![3; num_rows])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3, 4])), // ts
            build_test_pk_array(&[(b"test".to_vec(), num_rows)]), // primary key
            Arc::new(UInt64Array::from(vec![415411; num_rows])), // overridden sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; num_rows])), // op type
        ];
        let expect_record = RecordBatch::try_new(build_test_arrow_schema(), columns).unwrap();

        let actual = write_format.convert_batch(&batch).unwrap();
        assert_eq!(expect_record, actual);
    }

    #[test]
    fn test_projection_indices() {
        let metadata = build_test_region_metadata();
        // Only read tag1 (id 3): tags live in the primary key, so only the
        // fixed position columns are read.
        let read_format = ReadFormat::new(metadata.clone(), [3].iter().copied());
        assert_eq!(&[2, 3, 4, 5], read_format.projection_indices());
        // Only read field1 (id 4), at SST index 0.
        let read_format = ReadFormat::new(metadata.clone(), [4].iter().copied());
        assert_eq!(&[0, 2, 3, 4, 5], read_format.projection_indices());
        // Only read ts (id 5), which is already a fixed position column.
        let read_format = ReadFormat::new(metadata.clone(), [5].iter().copied());
        assert_eq!(&[2, 3, 4, 5], read_format.projection_indices());
        // Read field0 (id 2), tag0 (id 1) and ts (id 5).
        let read_format = ReadFormat::new(metadata, [2, 1, 5].iter().copied());
        assert_eq!(&[1, 2, 3, 4, 5], read_format.projection_indices());
    }

    #[test]
    fn test_empty_primary_key_offsets() {
        let array = build_test_pk_array(&[]);
        assert!(primary_key_offsets(&array).unwrap().is_empty());
    }

    #[test]
    fn test_primary_key_offsets_one_series() {
        let array = build_test_pk_array(&[(b"one".to_vec(), 1)]);
        assert_eq!(vec![0, 1], primary_key_offsets(&array).unwrap());

        let array = build_test_pk_array(&[(b"one".to_vec(), 1), (b"two".to_vec(), 1)]);
        assert_eq!(vec![0, 1, 2], primary_key_offsets(&array).unwrap());

        let array = build_test_pk_array(&[
            (b"one".to_vec(), 1),
            (b"two".to_vec(), 1),
            (b"three".to_vec(), 1),
        ]);
        assert_eq!(vec![0, 1, 2, 3], primary_key_offsets(&array).unwrap());
    }

    #[test]
    fn test_primary_key_offsets_multi_series() {
        let array = build_test_pk_array(&[(b"one".to_vec(), 1), (b"two".to_vec(), 3)]);
        assert_eq!(vec![0, 1, 4], primary_key_offsets(&array).unwrap());

        let array = build_test_pk_array(&[(b"one".to_vec(), 3), (b"two".to_vec(), 1)]);
        assert_eq!(vec![0, 3, 4], primary_key_offsets(&array).unwrap());

        let array = build_test_pk_array(&[(b"one".to_vec(), 3), (b"two".to_vec(), 3)]);
        assert_eq!(vec![0, 3, 6], primary_key_offsets(&array).unwrap());
    }

    #[test]
    fn test_convert_empty_record_batch() {
        let metadata = build_test_region_metadata();
        let arrow_schema = build_test_arrow_schema();
        let column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|col| col.column_id)
            .collect();
        let read_format = ReadFormat::new(metadata, column_ids.iter().copied());
        assert_eq!(arrow_schema, *read_format.arrow_schema());

        let record_batch = RecordBatch::new_empty(arrow_schema);
        let mut batches = VecDeque::new();
        read_format
            .convert_record_batch(&record_batch, &mut batches)
            .unwrap();
        assert!(batches.is_empty());
    }

    #[test]
    fn test_convert_record_batch() {
        let metadata = build_test_region_metadata();
        let column_ids: Vec<_> = metadata
            .column_metadatas
            .iter()
            .map(|col| col.column_id)
            .collect();
        let read_format = ReadFormat::new(metadata, column_ids.iter().copied());

        let columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![1, 1, 10, 10])), // field1
            Arc::new(Int64Array::from(vec![2, 2, 11, 11])), // field0
            Arc::new(TimestampMillisecondArray::from(vec![1, 2, 11, 12])), // ts
            build_test_pk_array(&[(b"one".to_vec(), 2), (b"two".to_vec(), 2)]), // primary key
            Arc::new(UInt64Array::from(vec![TEST_SEQUENCE; 4])), // sequence
            Arc::new(UInt8Array::from(vec![TEST_OP_TYPE; 4])), // op type
        ];
        let arrow_schema = build_test_arrow_schema();
        let record_batch = RecordBatch::try_new(arrow_schema, columns).unwrap();
        let mut batches = VecDeque::new();
        read_format
            .convert_record_batch(&record_batch, &mut batches)
            .unwrap();

        assert_eq!(
            vec![new_batch(b"one", 1, 1, 2), new_batch(b"two", 11, 10, 2)],
            batches.into_iter().collect::<Vec<_>>(),
        );
    }
}