1use std::collections::HashMap;
18use std::sync::Arc;
19
20use api::v1::SemanticType;
21use datatypes::arrow::array::{
22 Array, ArrayRef, BinaryArray, BinaryBuilder, DictionaryArray, UInt32Array,
23};
24use datatypes::arrow::compute::{TakeOptions, take};
25use datatypes::arrow::datatypes::{FieldRef, Schema, SchemaRef};
26use datatypes::arrow::record_batch::RecordBatch;
27use datatypes::data_type::ConcreteDataType;
28use datatypes::prelude::DataType;
29use datatypes::value::Value;
30use datatypes::vectors::VectorRef;
31use mito_codec::row_converter::{
32 CompositeValues, PrimaryKeyCodec, SortField, build_primary_key_codec,
33 build_primary_key_codec_with_fields,
34};
35use snafu::{OptionExt, ResultExt, ensure};
36use store_api::codec::PrimaryKeyEncoding;
37use store_api::metadata::{RegionMetadata, RegionMetadataRef};
38use store_api::storage::ColumnId;
39
40use crate::error::{
41 CompatReaderSnafu, ComputeArrowSnafu, CreateDefaultSnafu, DecodeSnafu, EncodeSnafu,
42 NewRecordBatchSnafu, Result, UnexpectedSnafu, UnsupportedOperationSnafu,
43};
44use crate::read::flat_projection::{FlatProjectionMapper, flat_projected_columns};
45use crate::read::projection::{PrimaryKeyProjectionMapper, ProjectionMapper};
46use crate::read::{Batch, BatchColumn, BatchReader};
47use crate::sst::parquet::flat_format::primary_key_column_index;
48use crate::sst::parquet::format::{FormatProjection, INTERNAL_COLUMN_NUM, PrimaryKeyArray};
49use crate::sst::{internal_fields, tag_maybe_to_dictionary_field};
50
/// A reader that wraps another [BatchReader] and adapts each returned [Batch]
/// from the (possibly older) schema of the underlying data to the expected
/// region metadata.
pub struct CompatReader<R> {
    /// Underlying reader producing batches in the reader metadata's schema.
    reader: R,
    /// Adapter that rewrites primary keys and fields of each batch.
    compat: PrimaryKeyCompatBatch,
}
58
59impl<R> CompatReader<R> {
60 pub fn new(
65 mapper: &ProjectionMapper,
66 reader_meta: RegionMetadataRef,
67 reader: R,
68 ) -> Result<CompatReader<R>> {
69 Ok(CompatReader {
70 reader,
71 compat: PrimaryKeyCompatBatch::new(mapper, reader_meta)?,
72 })
73 }
74}
75
76#[async_trait::async_trait]
77impl<R: BatchReader> BatchReader for CompatReader<R> {
78 async fn next_batch(&mut self) -> Result<Option<Batch>> {
79 let Some(mut batch) = self.reader.next_batch().await? else {
80 return Ok(None);
81 };
82
83 batch = self.compat.compat_batch(batch)?;
84
85 Ok(Some(batch))
86 }
87}
88
/// Helper to adapt a batch to the expected schema, for either storage format.
pub(crate) enum CompatBatch {
    /// Adapter for the primary key format ([Batch]-based).
    PrimaryKey(PrimaryKeyCompatBatch),
    /// Adapter for the flat format ([RecordBatch]-based).
    Flat(FlatCompatBatch),
}
96
97impl CompatBatch {
98 pub(crate) fn as_primary_key(&self) -> Option<&PrimaryKeyCompatBatch> {
100 match self {
101 CompatBatch::PrimaryKey(batch) => Some(batch),
102 _ => None,
103 }
104 }
105
106 pub(crate) fn as_flat(&self) -> Option<&FlatCompatBatch> {
108 match self {
109 CompatBatch::Flat(batch) => Some(batch),
110 _ => None,
111 }
112 }
113}
114
/// Plan to adapt a [Batch] in the primary key format to the expected metadata.
pub(crate) struct PrimaryKeyCompatBatch {
    /// Re-encodes the primary key when the pk encoding differs, if needed.
    rewrite_pk: Option<RewritePrimaryKey>,
    /// Appends default values for new primary key columns, if needed.
    compat_pk: Option<CompatPrimaryKey>,
    /// Reorders/casts/fills field columns, if needed.
    compat_fields: Option<CompatFields>,
}
124
125impl PrimaryKeyCompatBatch {
126 pub(crate) fn new(mapper: &ProjectionMapper, reader_meta: RegionMetadataRef) -> Result<Self> {
130 let rewrite_pk = may_rewrite_primary_key(mapper.metadata(), &reader_meta);
131 let compat_pk = may_compat_primary_key(mapper.metadata(), &reader_meta)?;
132 let mapper = mapper.as_primary_key().context(UnexpectedSnafu {
133 reason: "Unexpected format",
134 })?;
135 let compat_fields = may_compat_fields(mapper, &reader_meta)?;
136
137 Ok(Self {
138 rewrite_pk,
139 compat_pk,
140 compat_fields,
141 })
142 }
143
144 pub(crate) fn compat_batch(&self, mut batch: Batch) -> Result<Batch> {
146 if let Some(rewrite_pk) = &self.rewrite_pk {
147 batch = rewrite_pk.compat(batch)?;
148 }
149 if let Some(compat_pk) = &self.compat_pk {
150 batch = compat_pk.compat(batch)?;
151 }
152 if let Some(compat_fields) = &self.compat_fields {
153 batch = compat_fields.compat(batch);
154 }
155
156 Ok(batch)
157 }
158}
159
160pub(crate) fn has_same_columns_and_pk_encoding(
162 left: &RegionMetadata,
163 right: &RegionMetadata,
164) -> bool {
165 if left.primary_key_encoding != right.primary_key_encoding {
166 return false;
167 }
168
169 if left.column_metadatas.len() != right.column_metadatas.len() {
170 return false;
171 }
172
173 for (left_col, right_col) in left.column_metadatas.iter().zip(&right.column_metadatas) {
174 if left_col.column_id != right_col.column_id || !left_col.is_same_datatype(right_col) {
175 return false;
176 }
177 debug_assert_eq!(
178 left_col.column_schema.data_type,
179 right_col.column_schema.data_type
180 );
181 debug_assert_eq!(left_col.semantic_type, right_col.semantic_type);
182 }
183
184 true
185}
186
/// Plan to adapt a [RecordBatch] in the flat format to the expected metadata.
pub(crate) struct FlatCompatBatch {
    /// For each expected column: position in the actual batch (with optional
    /// cast) or a default value to fill.
    index_or_defaults: Vec<IndexOrDefault>,
    /// Arrow schema of the adapted output batch.
    arrow_schema: SchemaRef,
    /// Adapter for the encoded primary key column.
    compat_pk: FlatCompatPrimaryKey,
}
196
impl FlatCompatBatch {
    /// Creates a [FlatCompatBatch] when the projected columns of `actual`
    /// differ from what `mapper` expects; returns `Ok(None)` when the schemas
    /// already match and no adaptation is needed.
    ///
    /// When compacting sparse-encoded data, a reduced plan is built that
    /// leaves the encoded primary key untouched.
    pub(crate) fn try_new(
        mapper: &FlatProjectionMapper,
        actual: &RegionMetadataRef,
        format_projection: &FormatProjection,
        compaction: bool,
    ) -> Result<Option<Self>> {
        let actual_schema = flat_projected_columns(actual, format_projection);
        let expect_schema = mapper.batch_schema();
        if expect_schema == actual_schema {
            // Nothing to adapt.
            return Ok(None);
        }

        if actual.primary_key_encoding == PrimaryKeyEncoding::Sparse && compaction {
            return FlatCompatBatch::try_new_compact_sparse(mapper, actual);
        }

        let (index_or_defaults, fields) =
            Self::compute_index_and_fields(&actual_schema, expect_schema, mapper.metadata())?;

        let compat_pk = FlatCompatPrimaryKey::new(mapper.metadata(), actual)?;

        Ok(Some(Self {
            index_or_defaults,
            arrow_schema: Arc::new(Schema::new(fields)),
            compat_pk,
        }))
    }

    /// For each expected column, determines either its position in the actual
    /// schema (with an optional cast when types differ) or a default value to
    /// fill, and builds the output arrow fields. Internal columns are appended
    /// to the field list at the end.
    fn compute_index_and_fields(
        actual_schema: &[(ColumnId, ConcreteDataType)],
        expect_schema: &[(ColumnId, ConcreteDataType)],
        expect_metadata: &RegionMetadata,
    ) -> Result<(Vec<IndexOrDefault>, Vec<FieldRef>)> {
        // Maps column id -> (index in actual schema, actual data type).
        let actual_schema_index: HashMap<_, _> = actual_schema
            .iter()
            .enumerate()
            .map(|(idx, (column_id, data_type))| (*column_id, (idx, data_type)))
            .collect();

        let mut index_or_defaults = Vec::with_capacity(expect_schema.len());
        let mut fields = Vec::with_capacity(expect_schema.len());
        for (column_id, expect_data_type) in expect_schema {
            // Expected columns come from `expect_metadata`, so the lookup
            // should succeed — presumably an invariant of the caller.
            let column_index = expect_metadata.column_index_by_id(*column_id).unwrap();
            let expect_column = &expect_metadata.column_metadatas[column_index];
            let column_field = &expect_metadata.schema.arrow_schema().fields()[column_index];
            if expect_column.semantic_type == SemanticType::Tag {
                // Tags may be dictionary-encoded in the flat format.
                fields.push(tag_maybe_to_dictionary_field(
                    &expect_column.column_schema.data_type,
                    column_field,
                ));
            } else {
                fields.push(column_field.clone());
            };

            if let Some((index, actual_data_type)) = actual_schema_index.get(column_id) {
                let mut cast_type = None;

                // Remember the target type when the actual type differs.
                if expect_data_type != *actual_data_type {
                    cast_type = Some(expect_data_type.clone())
                }
                index_or_defaults.push(IndexOrDefault::Index {
                    pos: *index,
                    cast_type,
                });
            } else {
                // Column is absent from the actual data: fill its default.
                let default_vector = expect_column
                    .column_schema
                    .create_default_vector(1)
                    .context(CreateDefaultSnafu {
                        region_id: expect_metadata.region_id,
                        column: &expect_column.column_schema.name,
                    })?
                    .with_context(|| CompatReaderSnafu {
                        region_id: expect_metadata.region_id,
                        reason: format!(
                            "column {} does not have a default value to read",
                            expect_column.column_schema.name
                        ),
                    })?;
                index_or_defaults.push(IndexOrDefault::DefaultValue {
                    column_id: expect_column.column_id,
                    default_vector,
                    semantic_type: expect_column.semantic_type,
                });
            };
        }
        // Output batches also carry the internal columns.
        fields.extend_from_slice(&internal_fields());

        Ok((index_or_defaults, fields))
    }

    /// Builds a plan for compacting sparse-encoded data: only field columns
    /// and the time index are adapted; the encoded primary key is kept as-is.
    fn try_new_compact_sparse(
        mapper: &FlatProjectionMapper,
        actual: &RegionMetadataRef,
    ) -> Result<Option<Self>> {
        ensure!(
            mapper.metadata().primary_key_encoding == PrimaryKeyEncoding::Sparse,
            UnsupportedOperationSnafu {
                err_msg: "Flat format doesn't support converting sparse encoding back to dense encoding"
            }
        );

        // Compare only the field columns and the time index.
        let actual_schema: Vec<_> = actual
            .field_columns()
            .chain([actual.time_index_column()])
            .map(|col| (col.column_id, col.column_schema.data_type.clone()))
            .collect();
        let expect_schema: Vec<_> = mapper
            .metadata()
            .field_columns()
            .chain([mapper.metadata().time_index_column()])
            .map(|col| (col.column_id, col.column_schema.data_type.clone()))
            .collect();

        let (index_or_defaults, fields) =
            Self::compute_index_and_fields(&actual_schema, &expect_schema, mapper.metadata())?;

        // Default adapter: primary key column passes through unchanged.
        let compat_pk = FlatCompatPrimaryKey::default();

        Ok(Some(Self {
            index_or_defaults,
            arrow_schema: Arc::new(Schema::new(fields)),
            compat_pk,
        }))
    }

    /// Adapts `batch`: reorders/casts existing columns, fills defaults,
    /// copies the trailing internal columns, then fixes up the primary key.
    pub(crate) fn compat(&self, batch: RecordBatch) -> Result<RecordBatch> {
        let len = batch.num_rows();
        let columns = self
            .index_or_defaults
            .iter()
            .map(|index_or_default| match index_or_default {
                IndexOrDefault::Index { pos, cast_type } => {
                    let old_column = batch.column(*pos);

                    if let Some(ty) = cast_type {
                        // Cast the column to the expected arrow type.
                        let casted =
                            datatypes::arrow::compute::cast(old_column, &ty.as_arrow_type())
                                .context(ComputeArrowSnafu)?;
                        Ok(casted)
                    } else {
                        Ok(old_column.clone())
                    }
                }
                IndexOrDefault::DefaultValue {
                    column_id: _,
                    default_vector,
                    semantic_type,
                } => repeat_vector(default_vector, len, *semantic_type == SemanticType::Tag),
            })
            // The internal columns sit at the tail of the batch and are copied as-is.
            .chain(
                batch.columns()[batch.num_columns() - INTERNAL_COLUMN_NUM..]
                    .iter()
                    .map(|col| Ok(col.clone())),
            )
            .collect::<Result<Vec<_>>>()?;

        let compat_batch = RecordBatch::try_new(self.arrow_schema.clone(), columns)
            .context(NewRecordBatchSnafu)?;

        self.compat_pk.compat(compat_batch)
    }
}
385
/// Repeats the single value in `vector` `to_len` times and returns it as an
/// arrow array. Tags are returned dictionary-encoded.
///
/// # Panics
/// Panics if `vector` does not contain exactly one element.
fn repeat_vector(vector: &VectorRef, to_len: usize, is_tag: bool) -> Result<ArrayRef> {
    assert_eq!(1, vector.len());
    if is_tag {
        let values = vector.to_arrow_array();
        if values.is_null(0) {
            // A null tag becomes null keys over an empty values array.
            let keys = UInt32Array::new_null(to_len);
            Ok(Arc::new(DictionaryArray::new(keys, values.slice(0, 0))))
        } else {
            // All keys point at the single value.
            let keys = UInt32Array::from_value(0, to_len);
            Ok(Arc::new(DictionaryArray::new(keys, values)))
        }
    } else {
        // `take` with index 0 repeated `to_len` times duplicates the value;
        // bounds check is skipped since index 0 is always valid here.
        let keys = UInt32Array::from_value(0, to_len);
        take(
            &vector.to_arrow_array(),
            &keys,
            Some(TakeOptions {
                check_bounds: false,
            }),
        )
        .context(ComputeArrowSnafu)
    }
}
411
/// Appends default values for primary key columns that exist in the expected
/// metadata but not in the data being read.
#[derive(Debug)]
struct CompatPrimaryKey {
    /// Codec used to encode the appended key values.
    converter: Arc<dyn PrimaryKeyCodec>,
    /// Default values of the missing key columns, in primary key order.
    values: Vec<(ColumnId, Value)>,
}
420
impl CompatPrimaryKey {
    /// Appends the encoded default key values to the batch's primary key and
    /// keeps the decoded pk value cache (if any) in sync.
    fn compat(&self, mut batch: Batch) -> Result<Batch> {
        // Preallocate for the old key plus the estimated encoded size of the
        // appended values.
        let mut buffer = Vec::with_capacity(
            batch.primary_key().len() + self.converter.estimated_size().unwrap_or_default(),
        );
        buffer.extend_from_slice(batch.primary_key());
        self.converter
            .encode_values(&self.values, &mut buffer)
            .context(EncodeSnafu)?;

        batch.set_primary_key(buffer);

        // Also update the decoded values so they stay consistent with the key.
        if let Some(pk_values) = &mut batch.pk_values {
            pk_values.extend(&self.values);
        }

        Ok(batch)
    }
}
442
/// Reorders, casts, or fills field columns so a batch matches the expected
/// field schema.
#[derive(Debug)]
struct CompatFields {
    /// Field (id, type) pairs the incoming batches are expected to have.
    actual_fields: Vec<(ColumnId, ConcreteDataType)>,
    /// For each expected output field: source position (with optional cast)
    /// or a default value.
    index_or_defaults: Vec<IndexOrDefault>,
}
451
impl CompatFields {
    /// Adapts the batch's field columns: reorders and casts existing columns
    /// and replicates default values for missing ones.
    #[must_use]
    fn compat(&self, batch: Batch) -> Batch {
        // The batch must match the actual fields this plan was built for.
        debug_assert_eq!(self.actual_fields.len(), batch.fields().len());
        debug_assert!(
            self.actual_fields
                .iter()
                .zip(batch.fields())
                .all(|((id, _), batch_column)| *id == batch_column.column_id)
        );

        let len = batch.num_rows();
        let fields = self
            .index_or_defaults
            .iter()
            .map(|index_or_default| match index_or_default {
                IndexOrDefault::Index { pos, cast_type } => {
                    let old_column = &batch.fields()[*pos];

                    let data = if let Some(ty) = cast_type {
                        // NOTE(review): cast failure is treated as a bug
                        // (panics) — presumably the plan only records castable
                        // type pairs; confirm against cast support.
                        old_column.data.cast(ty).unwrap()
                    } else {
                        old_column.data.clone()
                    };
                    BatchColumn {
                        column_id: old_column.column_id,
                        data,
                    }
                }
                IndexOrDefault::DefaultValue {
                    column_id,
                    default_vector,
                    semantic_type: _,
                } => {
                    // Replicate the single default value to the batch length.
                    let data = default_vector.replicate(&[len]);
                    BatchColumn {
                        column_id: *column_id,
                        data,
                    }
                }
            })
            .collect();

        // NOTE(review): unwrap assumes the rebuilt fields are always valid for
        // this batch — confirm `with_fields` failure conditions.
        batch.with_fields(fields).unwrap()
    }
}
502
503fn may_rewrite_primary_key(
504 expect: &RegionMetadata,
505 actual: &RegionMetadata,
506) -> Option<RewritePrimaryKey> {
507 if expect.primary_key_encoding == actual.primary_key_encoding {
508 return None;
509 }
510
511 let fields = expect.primary_key.clone();
512 let original = build_primary_key_codec(actual);
513 let new = build_primary_key_codec(expect);
514
515 Some(RewritePrimaryKey {
516 original,
517 new,
518 fields,
519 })
520}
521
/// Returns whether `actual` has the same primary key as `expect`.
///
/// # Errors
/// Returns an error if `actual`'s primary key is not a prefix of `expect`'s
/// (schema changes may only append key columns).
fn is_primary_key_same(expect: &RegionMetadata, actual: &RegionMetadata) -> Result<bool> {
    ensure!(
        actual.primary_key.len() <= expect.primary_key.len(),
        CompatReaderSnafu {
            region_id: expect.region_id,
            reason: format!(
                "primary key has more columns {} than expect {}",
                actual.primary_key.len(),
                expect.primary_key.len()
            ),
        }
    );
    ensure!(
        actual.primary_key == expect.primary_key[..actual.primary_key.len()],
        CompatReaderSnafu {
            region_id: expect.region_id,
            reason: format!(
                "primary key has different prefix, expect: {:?}, actual: {:?}",
                expect.primary_key, actual.primary_key
            ),
        }
    );

    Ok(actual.primary_key.len() == expect.primary_key.len())
}
548
/// Returns a [CompatPrimaryKey] when `expect` has more primary key columns
/// than `actual`, `None` when the keys are identical.
///
/// # Errors
/// Returns an error if the keys are incompatible or a missing key column has
/// no default value.
fn may_compat_primary_key(
    expect: &RegionMetadata,
    actual: &RegionMetadata,
) -> Result<Option<CompatPrimaryKey>> {
    if is_primary_key_same(expect, actual)? {
        return Ok(None);
    }

    // Key columns that must be appended to each primary key.
    let to_add = &expect.primary_key[actual.primary_key.len()..];
    let mut fields = Vec::with_capacity(to_add.len());
    let mut values = Vec::with_capacity(to_add.len());
    for column_id in to_add {
        // The id comes from `expect.primary_key`, so the column should exist
        // in `expect` — presumably a metadata invariant.
        let column = expect.column_by_id(*column_id).unwrap();
        fields.push((
            *column_id,
            SortField::new(column.column_schema.data_type.clone()),
        ));
        let default_value = column
            .column_schema
            .create_default()
            .context(CreateDefaultSnafu {
                region_id: expect.region_id,
                column: &column.column_schema.name,
            })?
            .with_context(|| CompatReaderSnafu {
                region_id: expect.region_id,
                reason: format!(
                    "key column {} does not have a default value to read",
                    column.column_schema.name
                ),
            })?;
        values.push((*column_id, default_value));
    }
    // Codec for only the appended columns; their encoding is concatenated to
    // the existing key bytes.
    let converter =
        build_primary_key_codec_with_fields(expect.primary_key_encoding, fields.into_iter());

    Ok(Some(CompatPrimaryKey { converter, values }))
}
591
/// Returns a [CompatFields] when the projected field columns of `actual`
/// differ from what `mapper` expects, `None` when they already match.
///
/// # Errors
/// Returns an error if a missing field column has no default value.
fn may_compat_fields(
    mapper: &PrimaryKeyProjectionMapper,
    actual: &RegionMetadata,
) -> Result<Option<CompatFields>> {
    let expect_fields = mapper.batch_fields();
    let actual_fields = Batch::projected_fields(actual, mapper.column_ids());
    if expect_fields == actual_fields {
        return Ok(None);
    }

    // Maps column id -> (index in actual fields, actual data type).
    let source_field_index: HashMap<_, _> = actual_fields
        .iter()
        .enumerate()
        .map(|(idx, (column_id, data_type))| (*column_id, (idx, data_type)))
        .collect();

    let index_or_defaults = expect_fields
        .iter()
        .map(|(column_id, expect_data_type)| {
            if let Some((index, actual_data_type)) = source_field_index.get(column_id) {
                let mut cast_type = None;

                // Record a cast when the actual type differs.
                if expect_data_type != *actual_data_type {
                    cast_type = Some(expect_data_type.clone())
                }
                Ok(IndexOrDefault::Index {
                    pos: *index,
                    cast_type,
                })
            } else {
                // Expected ids come from the mapper's metadata, so the lookup
                // should succeed — presumably an invariant of the mapper.
                let column = mapper.metadata().column_by_id(*column_id).unwrap();
                let default_vector = column
                    .column_schema
                    .create_default_vector(1)
                    .context(CreateDefaultSnafu {
                        region_id: mapper.metadata().region_id,
                        column: &column.column_schema.name,
                    })?
                    .with_context(|| CompatReaderSnafu {
                        region_id: mapper.metadata().region_id,
                        reason: format!(
                            "column {} does not have a default value to read",
                            column.column_schema.name
                        ),
                    })?;
                Ok(IndexOrDefault::DefaultValue {
                    column_id: column.column_id,
                    default_vector,
                    semantic_type: SemanticType::Field,
                })
            }
        })
        .collect::<Result<Vec<_>>>()?;

    Ok(Some(CompatFields {
        actual_fields,
        index_or_defaults,
    }))
}
655
/// Source of a column in an adapted batch: a position in the actual data or a
/// default value for a column the data does not contain.
#[derive(Debug)]
enum IndexOrDefault {
    /// Take the column at `pos` from the actual batch.
    Index {
        pos: usize,
        /// Cast the column to this type when the stored type differs.
        cast_type: Option<ConcreteDataType>,
    },
    /// Fill the column by repeating a default value.
    DefaultValue {
        /// Id of the column to fill.
        column_id: ColumnId,
        /// Single-element vector holding the default value.
        default_vector: VectorRef,
        /// Semantic type; tags are dictionary-encoded when replicated.
        semantic_type: SemanticType,
    },
}
674
/// Re-encodes batch primary keys from one encoding to another.
struct RewritePrimaryKey {
    /// Codec matching the encoding the data was written with.
    original: Arc<dyn PrimaryKeyCodec>,
    /// Codec for the expected encoding.
    new: Arc<dyn PrimaryKeyCodec>,
    /// Expected primary key column ids, in order.
    fields: Vec<ColumnId>,
}
684
impl RewritePrimaryKey {
    /// Decodes the batch's primary key with the original codec (caching the
    /// values on the batch) and re-encodes it with the new codec.
    fn compat(&self, mut batch: Batch) -> Result<Batch> {
        // Decode lazily: reuse cached pk values when the batch already has them.
        if batch.pk_values().is_none() {
            let new_pk_values = self
                .original
                .decode(batch.primary_key())
                .context(DecodeSnafu)?;
            batch.set_pk_values(new_pk_values);
        }
        // Set above if absent, so the unwrap cannot fail here.
        let values = batch.pk_values().unwrap();

        let mut buffer = Vec::with_capacity(
            batch.primary_key().len() + self.new.estimated_size().unwrap_or_default(),
        );
        match values {
            CompositeValues::Dense(values) => {
                self.new
                    .encode_values(values.as_slice(), &mut buffer)
                    .context(EncodeSnafu)?;
            }
            CompositeValues::Sparse(values) => {
                // Sparse values are keyed by column id; project them onto the
                // expected key columns, substituting null for absent ids.
                let values = self
                    .fields
                    .iter()
                    .map(|id| {
                        let value = values.get_or_null(*id);
                        (*id, value.as_value_ref())
                    })
                    .collect::<Vec<_>>();
                self.new
                    .encode_value_refs(&values, &mut buffer)
                    .context(EncodeSnafu)?;
            }
        }
        batch.set_primary_key(buffer);

        Ok(batch)
    }
}
726
/// Re-encodes the primary key column of flat-format record batches from one
/// encoding to another.
struct FlatRewritePrimaryKey {
    /// Codec for the expected encoding.
    codec: Arc<dyn PrimaryKeyCodec>,
    /// Expected region metadata (provides the key column order).
    metadata: RegionMetadataRef,
    /// Codec matching the encoding the data was written with.
    old_codec: Arc<dyn PrimaryKeyCodec>,
}
737
impl FlatRewritePrimaryKey {
    /// Returns a rewriter when the two metadata use different primary key
    /// encodings, `None` when no rewrite is needed.
    fn new(
        expect: &RegionMetadataRef,
        actual: &RegionMetadataRef,
    ) -> Option<FlatRewritePrimaryKey> {
        if expect.primary_key_encoding == actual.primary_key_encoding {
            return None;
        }
        let codec = build_primary_key_codec(expect);
        let old_codec = build_primary_key_codec(actual);

        Some(FlatRewritePrimaryKey {
            codec,
            metadata: expect.clone(),
            old_codec,
        })
    }

    /// Rewrites the dictionary-encoded primary key column of `batch`: decodes
    /// each distinct key with the old codec, appends `append_values`, and
    /// re-encodes with the new codec. Dictionary keys are reused unchanged.
    fn rewrite_key(
        &self,
        append_values: &[(ColumnId, Value)],
        batch: RecordBatch,
    ) -> Result<RecordBatch> {
        // The pk column is a dictionary of binary-encoded keys at a fixed
        // position relative to the batch width.
        let old_pk_dict_array = batch
            .column(primary_key_column_index(batch.num_columns()))
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .unwrap();
        let old_pk_values_array = old_pk_dict_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        let mut builder = BinaryBuilder::with_capacity(
            old_pk_values_array.len(),
            old_pk_values_array.value_data().len(),
        );

        // Scratch buffers reused across keys; sized by the average key length.
        let mut buffer = Vec::with_capacity(
            old_pk_values_array.value_data().len() / old_pk_values_array.len().max(1),
        );
        let mut column_id_values = Vec::new();
        for value in old_pk_values_array.iter() {
            let Some(old_pk) = value else {
                builder.append_null();
                continue;
            };
            let mut pk_values = self.old_codec.decode(old_pk).context(DecodeSnafu)?;
            pk_values.extend(append_values);

            buffer.clear();
            column_id_values.clear();
            match pk_values {
                CompositeValues::Dense(dense_values) => {
                    self.codec
                        .encode_values(dense_values.as_slice(), &mut buffer)
                        .context(EncodeSnafu)?;
                }
                CompositeValues::Sparse(sparse_values) => {
                    // Project sparse values onto the expected key column order,
                    // substituting null for absent ids.
                    for id in &self.metadata.primary_key {
                        let value = sparse_values.get_or_null(*id);
                        column_id_values.push((*id, value.clone()));
                    }
                    self.codec
                        .encode_values(&column_id_values, &mut buffer)
                        .context(EncodeSnafu)?;
                }
            }
            builder.append_value(&buffer);
        }
        let new_pk_values_array = Arc::new(builder.finish());
        // Keep the original dictionary keys; only the values were rewritten.
        let new_pk_dict_array =
            PrimaryKeyArray::new(old_pk_dict_array.keys().clone(), new_pk_values_array);

        let mut columns = batch.columns().to_vec();
        columns[primary_key_column_index(batch.num_columns())] = Arc::new(new_pk_dict_array);

        RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
    }
}
824
/// Adapts the primary key column of flat-format record batches: optionally
/// rewrites the encoding and/or appends default values for new key columns.
/// The default value performs no adaptation.
#[derive(Default)]
struct FlatCompatPrimaryKey {
    /// Rewrites the key encoding (also appends the values itself), if needed.
    rewriter: Option<FlatRewritePrimaryKey>,
    /// Codec for appending the default key values when no rewrite is needed.
    converter: Option<Arc<dyn PrimaryKeyCodec>>,
    /// Default values of the key columns missing from the actual data.
    values: Vec<(ColumnId, Value)>,
}
835
impl FlatCompatPrimaryKey {
    /// Builds the key adaptation plan between the expected and actual metadata.
    ///
    /// # Errors
    /// Returns an error if the keys are incompatible or a missing key column
    /// has no default value.
    fn new(expect: &RegionMetadataRef, actual: &RegionMetadataRef) -> Result<Self> {
        let rewriter = FlatRewritePrimaryKey::new(expect, actual);

        if is_primary_key_same(expect, actual)? {
            // Same key columns: only an encoding rewrite may be needed.
            return Ok(Self {
                rewriter,
                converter: None,
                values: Vec::new(),
            });
        }

        // Key columns that must be appended to each primary key.
        let to_add = &expect.primary_key[actual.primary_key.len()..];
        let mut values = Vec::with_capacity(to_add.len());
        let mut fields = Vec::with_capacity(to_add.len());
        for column_id in to_add {
            // The id comes from `expect.primary_key`, so the column should
            // exist in `expect` — presumably a metadata invariant.
            let column = expect.column_by_id(*column_id).unwrap();
            fields.push((
                *column_id,
                SortField::new(column.column_schema.data_type.clone()),
            ));
            let default_value = column
                .column_schema
                .create_default()
                .context(CreateDefaultSnafu {
                    region_id: expect.region_id,
                    column: &column.column_schema.name,
                })?
                .with_context(|| CompatReaderSnafu {
                    region_id: expect.region_id,
                    reason: format!(
                        "key column {} does not have a default value to read",
                        column.column_schema.name
                    ),
                })?;
            values.push((*column_id, default_value));
        }
        // `is_primary_key_same` returned false, so at least one column is added.
        debug_assert!(!fields.is_empty());

        // Codec for only the appended columns; their encoding is concatenated
        // to the existing key bytes.
        let converter = Some(build_primary_key_codec_with_fields(
            expect.primary_key_encoding,
            fields.into_iter(),
        ));

        Ok(Self {
            rewriter,
            converter,
            values,
        })
    }

    /// Adapts the primary key column of `batch`. A rewrite (if needed) also
    /// appends the default values, so the two paths are mutually exclusive.
    fn compat(&self, batch: RecordBatch) -> Result<RecordBatch> {
        if let Some(rewriter) = &self.rewriter {
            return rewriter.rewrite_key(&self.values, batch);
        }

        self.append_key(batch)
    }

    /// Appends the encoded default key values to every distinct key in the
    /// batch's dictionary-encoded primary key column.
    fn append_key(&self, batch: RecordBatch) -> Result<RecordBatch> {
        let Some(converter) = &self.converter else {
            // Nothing to append.
            return Ok(batch);
        };

        let old_pk_dict_array = batch
            .column(primary_key_column_index(batch.num_columns()))
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .unwrap();
        let old_pk_values_array = old_pk_dict_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        // Capacity accounts for the old bytes plus the estimated size of the
        // appended values per key.
        let mut builder = BinaryBuilder::with_capacity(
            old_pk_values_array.len(),
            old_pk_values_array.value_data().len()
                + converter.estimated_size().unwrap_or_default() * old_pk_values_array.len(),
        );

        // Scratch buffer reused across keys; sized by the average key length.
        let mut buffer = Vec::with_capacity(
            old_pk_values_array.value_data().len() / old_pk_values_array.len().max(1)
                + converter.estimated_size().unwrap_or_default(),
        );

        for value in old_pk_values_array.iter() {
            let Some(old_pk) = value else {
                builder.append_null();
                continue;
            };

            buffer.clear();
            buffer.extend_from_slice(old_pk);
            converter
                .encode_values(&self.values, &mut buffer)
                .context(EncodeSnafu)?;

            builder.append_value(&buffer);
        }

        let new_pk_values_array = Arc::new(builder.finish());
        // Keep the original dictionary keys; only the values were extended.
        let new_pk_dict_array =
            PrimaryKeyArray::new(old_pk_dict_array.keys().clone(), new_pk_values_array);

        let mut columns = batch.columns().to_vec();
        columns[primary_key_column_index(batch.num_columns())] = Arc::new(new_pk_dict_array);

        RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
    }
}
958
959#[cfg(test)]
960mod tests {
961 use std::sync::Arc;
962
963 use api::v1::{OpType, SemanticType};
964 use datatypes::arrow::array::{
965 ArrayRef, BinaryDictionaryBuilder, Int64Array, StringDictionaryBuilder,
966 TimestampMillisecondArray, UInt8Array, UInt64Array,
967 };
968 use datatypes::arrow::datatypes::UInt32Type;
969 use datatypes::arrow::record_batch::RecordBatch;
970 use datatypes::prelude::ConcreteDataType;
971 use datatypes::schema::ColumnSchema;
972 use datatypes::value::ValueRef;
973 use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, UInt8Vector, UInt64Vector};
974 use mito_codec::row_converter::{
975 DensePrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec,
976 };
977 use store_api::codec::PrimaryKeyEncoding;
978 use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
979 use store_api::storage::RegionId;
980
981 use super::*;
982 use crate::read::flat_projection::FlatProjectionMapper;
983 use crate::sst::parquet::flat_format::FlatReadFormat;
984 use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
985 use crate::test_util::{VecBatchReader, check_reader_result};
986
    /// Builds a test [RegionMetadata] from `(id, semantic_type, data_type)`
    /// triples; tags are named `tag_{id}`, fields `field_{id}`, timestamp `ts`.
    fn new_metadata(
        semantic_types: &[(ColumnId, SemanticType, ConcreteDataType)],
        primary_key: &[ColumnId],
    ) -> RegionMetadata {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        for (id, semantic_type, data_type) in semantic_types {
            let column_schema = match semantic_type {
                SemanticType::Tag => {
                    ColumnSchema::new(format!("tag_{id}"), data_type.clone(), true)
                }
                SemanticType::Field => {
                    ColumnSchema::new(format!("field_{id}"), data_type.clone(), true)
                }
                SemanticType::Timestamp => ColumnSchema::new("ts", data_type.clone(), false),
            };

            builder.push_column_metadata(ColumnMetadata {
                column_schema,
                semantic_type: *semantic_type,
                column_id: *id,
            });
        }
        builder.primary_key(primary_key.to_vec());
        builder.build().unwrap()
    }
1013
    /// Encodes optional string keys with the dense primary key codec.
    fn encode_key(keys: &[Option<&str>]) -> Vec<u8> {
        let fields = (0..keys.len())
            .map(|_| (0, SortField::new(ConcreteDataType::string_datatype())))
            .collect();
        let converter = DensePrimaryKeyCodec::with_fields(fields);
        let row = keys.iter().map(|str_opt| match str_opt {
            Some(v) => ValueRef::String(v),
            None => ValueRef::Null,
        });

        converter.encode(row).unwrap()
    }
1027
    /// Encodes `(column_id, optional string)` keys with the sparse primary
    /// key codec.
    fn encode_sparse_key(keys: &[(ColumnId, Option<&str>)]) -> Vec<u8> {
        let fields = (0..keys.len())
            .map(|_| (1, SortField::new(ConcreteDataType::string_datatype())))
            .collect();
        let converter = SparsePrimaryKeyCodec::with_fields(fields);
        let row = keys
            .iter()
            .map(|(id, str_opt)| match str_opt {
                Some(v) => (*id, ValueRef::String(v)),
                None => (*id, ValueRef::Null),
            })
            .collect::<Vec<_>>();
        let mut buffer = vec![];
        converter.encode_value_refs(&row, &mut buffer).unwrap();
        buffer
    }
1045
    /// Builds a test [Batch] with `num_rows` rows starting at `start_ts`.
    /// Each `(id, is_null)` pair creates an int64 field column that is either
    /// all null or filled with the column id.
    fn new_batch(
        primary_key: &[u8],
        fields: &[(ColumnId, bool)],
        start_ts: i64,
        num_rows: usize,
    ) -> Batch {
        let timestamps = Arc::new(TimestampMillisecondVector::from_values(
            start_ts..start_ts + num_rows as i64,
        ));
        let sequences = Arc::new(UInt64Vector::from_values(0..num_rows as u64));
        let op_types = Arc::new(UInt8Vector::from_vec(vec![OpType::Put as u8; num_rows]));
        let field_columns = fields
            .iter()
            .map(|(id, is_null)| {
                let data = if *is_null {
                    Arc::new(Int64Vector::from(vec![None; num_rows]))
                } else {
                    Arc::new(Int64Vector::from_vec(vec![*id as i64; num_rows]))
                };
                BatchColumn {
                    column_id: *id,
                    data,
                }
            })
            .collect();
        Batch::new(
            primary_key.to_vec(),
            timestamps,
            sequences,
            op_types,
            field_columns,
        )
        .unwrap()
    }
1083
    /// The reader metadata must not have more primary key columns than the
    /// expected metadata.
    #[test]
    fn test_invalid_pk_len() {
        let reader_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1, 2],
        );
        let expect_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        );
        may_compat_primary_key(&expect_meta, &reader_meta).unwrap_err();
    }
1113
    /// The reader's primary key must be a prefix of the expected primary key;
    /// a reordered key ([2, 1] vs [1, 2, 4]) is rejected.
    #[test]
    fn test_different_pk() {
        let reader_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[2, 1],
        );
        let expect_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (4, SemanticType::Tag, ConcreteDataType::string_datatype()),
            ],
            &[1, 2, 4],
        );
        may_compat_primary_key(&expect_meta, &reader_meta).unwrap_err();
    }
1145
    /// Identical primary keys need no [CompatPrimaryKey] adapter.
    #[test]
    fn test_same_pk() {
        let reader_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        );
        assert!(
            may_compat_primary_key(&reader_meta, &reader_meta)
                .unwrap()
                .is_none()
        );
    }
1166
    /// Identical metadata (and hence identical pk encoding) needs no adapter.
    #[test]
    fn test_same_pk_encoding() {
        let reader_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
            ],
            &[1],
        ));

        assert!(
            may_compat_primary_key(&reader_meta, &reader_meta)
                .unwrap()
                .is_none()
        );
    }
1187
    /// Identical field schemas need no [CompatFields] adapter.
    #[test]
    fn test_same_fields() {
        let reader_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        let mapper = PrimaryKeyProjectionMapper::all(&reader_meta).unwrap();
        assert!(may_compat_fields(&mapper, &reader_meta).unwrap().is_none())
    }
1205
    /// Reading data written without tag 3 and field 4: the reader should
    /// append a null key value and a null field column to each batch.
    #[tokio::test]
    async fn test_compat_reader() {
        let reader_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        let expect_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (3, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (4, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1, 3],
        ));
        let mapper = ProjectionMapper::all(&expect_meta, false).unwrap();
        let k1 = encode_key(&[Some("a")]);
        let k2 = encode_key(&[Some("b")]);
        let source_reader = VecBatchReader::new(&[
            new_batch(&k1, &[(2, false)], 1000, 3),
            new_batch(&k2, &[(2, false)], 1000, 3),
        ]);

        let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
        // Output keys gain a trailing null for the new tag 3.
        let k1 = encode_key(&[Some("a"), None]);
        let k2 = encode_key(&[Some("b"), None]);
        check_reader_result(
            &mut compat_reader,
            &[
                new_batch(&k1, &[(2, false), (4, true)], 1000, 3),
                new_batch(&k2, &[(2, false), (4, true)], 1000, 3),
            ],
        )
        .await;
    }
1254
1255 #[tokio::test]
1256 async fn test_compat_reader_different_order() {
1257 let reader_meta = Arc::new(new_metadata(
1258 &[
1259 (
1260 0,
1261 SemanticType::Timestamp,
1262 ConcreteDataType::timestamp_millisecond_datatype(),
1263 ),
1264 (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1265 (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1266 ],
1267 &[1],
1268 ));
1269 let expect_meta = Arc::new(new_metadata(
1270 &[
1271 (
1272 0,
1273 SemanticType::Timestamp,
1274 ConcreteDataType::timestamp_millisecond_datatype(),
1275 ),
1276 (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1277 (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
1278 (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1279 (4, SemanticType::Field, ConcreteDataType::int64_datatype()),
1280 ],
1281 &[1],
1282 ));
1283 let mapper = ProjectionMapper::all(&expect_meta, false).unwrap();
1284 let k1 = encode_key(&[Some("a")]);
1285 let k2 = encode_key(&[Some("b")]);
1286 let source_reader = VecBatchReader::new(&[
1287 new_batch(&k1, &[(2, false)], 1000, 3),
1288 new_batch(&k2, &[(2, false)], 1000, 3),
1289 ]);
1290
1291 let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
1292 check_reader_result(
1293 &mut compat_reader,
1294 &[
1295 new_batch(&k1, &[(3, true), (2, false), (4, true)], 1000, 3),
1296 new_batch(&k2, &[(3, true), (2, false), (4, true)], 1000, 3),
1297 ],
1298 )
1299 .await;
1300 }
1301
1302 #[tokio::test]
1303 async fn test_compat_reader_different_types() {
1304 let actual_meta = Arc::new(new_metadata(
1305 &[
1306 (
1307 0,
1308 SemanticType::Timestamp,
1309 ConcreteDataType::timestamp_millisecond_datatype(),
1310 ),
1311 (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1312 (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1313 ],
1314 &[1],
1315 ));
1316 let expect_meta = Arc::new(new_metadata(
1317 &[
1318 (
1319 0,
1320 SemanticType::Timestamp,
1321 ConcreteDataType::timestamp_millisecond_datatype(),
1322 ),
1323 (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1324 (2, SemanticType::Field, ConcreteDataType::string_datatype()),
1325 ],
1326 &[1],
1327 ));
1328 let mapper = ProjectionMapper::all(&expect_meta, false).unwrap();
1329 let k1 = encode_key(&[Some("a")]);
1330 let k2 = encode_key(&[Some("b")]);
1331 let source_reader = VecBatchReader::new(&[
1332 new_batch(&k1, &[(2, false)], 1000, 3),
1333 new_batch(&k2, &[(2, false)], 1000, 3),
1334 ]);
1335
1336 let fn_batch_cast = |batch: Batch| {
1337 let mut new_fields = batch.fields.clone();
1338 new_fields[0].data = new_fields[0]
1339 .data
1340 .cast(&ConcreteDataType::string_datatype())
1341 .unwrap();
1342
1343 batch.with_fields(new_fields).unwrap()
1344 };
1345 let mut compat_reader = CompatReader::new(&mapper, actual_meta, source_reader).unwrap();
1346 check_reader_result(
1347 &mut compat_reader,
1348 &[
1349 fn_batch_cast(new_batch(&k1, &[(2, false)], 1000, 3)),
1350 fn_batch_cast(new_batch(&k2, &[(2, false)], 1000, 3)),
1351 ],
1352 )
1353 .await;
1354 }
1355
1356 #[tokio::test]
1357 async fn test_compat_reader_projection() {
1358 let reader_meta = Arc::new(new_metadata(
1359 &[
1360 (
1361 0,
1362 SemanticType::Timestamp,
1363 ConcreteDataType::timestamp_millisecond_datatype(),
1364 ),
1365 (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1366 (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1367 ],
1368 &[1],
1369 ));
1370 let expect_meta = Arc::new(new_metadata(
1371 &[
1372 (
1373 0,
1374 SemanticType::Timestamp,
1375 ConcreteDataType::timestamp_millisecond_datatype(),
1376 ),
1377 (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1378 (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
1379 (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1380 (4, SemanticType::Field, ConcreteDataType::int64_datatype()),
1381 ],
1382 &[1],
1383 ));
1384 let mapper = ProjectionMapper::new(&expect_meta, [1, 3, 2].into_iter(), false).unwrap();
1386 let k1 = encode_key(&[Some("a")]);
1387 let source_reader = VecBatchReader::new(&[new_batch(&k1, &[(2, false)], 1000, 3)]);
1388
1389 let mut compat_reader =
1390 CompatReader::new(&mapper, reader_meta.clone(), source_reader).unwrap();
1391 check_reader_result(
1392 &mut compat_reader,
1393 &[new_batch(&k1, &[(3, true), (2, false)], 1000, 3)],
1394 )
1395 .await;
1396
1397 let mapper = ProjectionMapper::new(&expect_meta, [1, 4, 2].into_iter(), false).unwrap();
1399 let k1 = encode_key(&[Some("a")]);
1400 let source_reader = VecBatchReader::new(&[new_batch(&k1, &[], 1000, 3)]);
1401
1402 let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
1403 check_reader_result(
1404 &mut compat_reader,
1405 &[new_batch(&k1, &[(3, true), (4, true)], 1000, 3)],
1406 )
1407 .await;
1408 }
1409
1410 #[tokio::test]
1411 async fn test_compat_reader_different_pk_encoding() {
1412 let mut reader_meta = new_metadata(
1413 &[
1414 (
1415 0,
1416 SemanticType::Timestamp,
1417 ConcreteDataType::timestamp_millisecond_datatype(),
1418 ),
1419 (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1420 (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1421 ],
1422 &[1],
1423 );
1424 reader_meta.primary_key_encoding = PrimaryKeyEncoding::Dense;
1425 let reader_meta = Arc::new(reader_meta);
1426 let mut expect_meta = new_metadata(
1427 &[
1428 (
1429 0,
1430 SemanticType::Timestamp,
1431 ConcreteDataType::timestamp_millisecond_datatype(),
1432 ),
1433 (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1434 (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1435 (3, SemanticType::Tag, ConcreteDataType::string_datatype()),
1436 (4, SemanticType::Field, ConcreteDataType::int64_datatype()),
1437 ],
1438 &[1, 3],
1439 );
1440 expect_meta.primary_key_encoding = PrimaryKeyEncoding::Sparse;
1441 let expect_meta = Arc::new(expect_meta);
1442
1443 let mapper = ProjectionMapper::all(&expect_meta, false).unwrap();
1444 let k1 = encode_key(&[Some("a")]);
1445 let k2 = encode_key(&[Some("b")]);
1446 let source_reader = VecBatchReader::new(&[
1447 new_batch(&k1, &[(2, false)], 1000, 3),
1448 new_batch(&k2, &[(2, false)], 1000, 3),
1449 ]);
1450
1451 let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
1452 let k1 = encode_sparse_key(&[(1, Some("a")), (3, None)]);
1453 let k2 = encode_sparse_key(&[(1, Some("b")), (3, None)]);
1454 check_reader_result(
1455 &mut compat_reader,
1456 &[
1457 new_batch(&k1, &[(2, false), (4, true)], 1000, 3),
1458 new_batch(&k2, &[(2, false), (4, true)], 1000, 3),
1459 ],
1460 )
1461 .await;
1462 }
1463
1464 fn build_flat_test_pk_array(primary_keys: &[&[u8]]) -> ArrayRef {
1466 let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
1467 for &pk in primary_keys {
1468 builder.append(pk).unwrap();
1469 }
1470 Arc::new(builder.finish())
1471 }
1472
1473 #[test]
1474 fn test_flat_compat_batch_with_missing_columns() {
1475 let actual_metadata = Arc::new(new_metadata(
1476 &[
1477 (
1478 0,
1479 SemanticType::Timestamp,
1480 ConcreteDataType::timestamp_millisecond_datatype(),
1481 ),
1482 (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1483 (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1484 ],
1485 &[1],
1486 ));
1487
1488 let expected_metadata = Arc::new(new_metadata(
1489 &[
1490 (
1491 0,
1492 SemanticType::Timestamp,
1493 ConcreteDataType::timestamp_millisecond_datatype(),
1494 ),
1495 (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1496 (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1497 (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
1499 ],
1500 &[1],
1501 ));
1502
1503 let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
1504 let read_format = FlatReadFormat::new(
1505 actual_metadata.clone(),
1506 [0, 1, 2, 3].into_iter(),
1507 None,
1508 "test",
1509 false,
1510 )
1511 .unwrap();
1512 let format_projection = read_format.format_projection();
1513
1514 let compat_batch =
1515 FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection, false)
1516 .unwrap()
1517 .unwrap();
1518
1519 let mut tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
1520 tag_builder.append_value("tag1");
1521 tag_builder.append_value("tag1");
1522 let tag_dict_array = Arc::new(tag_builder.finish());
1523
1524 let k1 = encode_key(&[Some("tag1")]);
1525 let input_columns: Vec<ArrayRef> = vec![
1526 tag_dict_array.clone(),
1527 Arc::new(Int64Array::from(vec![100, 200])),
1528 Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
1529 build_flat_test_pk_array(&[&k1, &k1]),
1530 Arc::new(UInt64Array::from_iter_values([1, 2])),
1531 Arc::new(UInt8Array::from_iter_values([
1532 OpType::Put as u8,
1533 OpType::Put as u8,
1534 ])),
1535 ];
1536 let input_schema =
1537 to_flat_sst_arrow_schema(&actual_metadata, &FlatSchemaOptions::default());
1538 let input_batch = RecordBatch::try_new(input_schema, input_columns).unwrap();
1539
1540 let result = compat_batch.compat(input_batch).unwrap();
1541
1542 let expected_schema =
1543 to_flat_sst_arrow_schema(&expected_metadata, &FlatSchemaOptions::default());
1544
1545 let expected_columns: Vec<ArrayRef> = vec![
1546 tag_dict_array.clone(),
1547 Arc::new(Int64Array::from(vec![100, 200])),
1548 Arc::new(Int64Array::from(vec![None::<i64>, None::<i64>])),
1549 Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
1550 build_flat_test_pk_array(&[&k1, &k1]),
1551 Arc::new(UInt64Array::from_iter_values([1, 2])),
1552 Arc::new(UInt8Array::from_iter_values([
1553 OpType::Put as u8,
1554 OpType::Put as u8,
1555 ])),
1556 ];
1557 let expected_batch = RecordBatch::try_new(expected_schema, expected_columns).unwrap();
1558
1559 assert_eq!(expected_batch, result);
1560 }
1561
1562 #[test]
1563 fn test_flat_compat_batch_with_different_pk_encoding() {
1564 let mut actual_metadata = new_metadata(
1565 &[
1566 (
1567 0,
1568 SemanticType::Timestamp,
1569 ConcreteDataType::timestamp_millisecond_datatype(),
1570 ),
1571 (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1572 (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1573 ],
1574 &[1],
1575 );
1576 actual_metadata.primary_key_encoding = PrimaryKeyEncoding::Dense;
1577 let actual_metadata = Arc::new(actual_metadata);
1578
1579 let mut expected_metadata = new_metadata(
1580 &[
1581 (
1582 0,
1583 SemanticType::Timestamp,
1584 ConcreteDataType::timestamp_millisecond_datatype(),
1585 ),
1586 (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1587 (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1588 (3, SemanticType::Tag, ConcreteDataType::string_datatype()),
1589 ],
1590 &[1, 3],
1591 );
1592 expected_metadata.primary_key_encoding = PrimaryKeyEncoding::Sparse;
1593 let expected_metadata = Arc::new(expected_metadata);
1594
1595 let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
1596 let read_format = FlatReadFormat::new(
1597 actual_metadata.clone(),
1598 [0, 1, 2, 3].into_iter(),
1599 None,
1600 "test",
1601 false,
1602 )
1603 .unwrap();
1604 let format_projection = read_format.format_projection();
1605
1606 let compat_batch =
1607 FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection, false)
1608 .unwrap()
1609 .unwrap();
1610
1611 let mut tag1_builder = StringDictionaryBuilder::<UInt32Type>::new();
1613 tag1_builder.append_value("tag1");
1614 tag1_builder.append_value("tag1");
1615 let tag1_dict_array = Arc::new(tag1_builder.finish());
1616
1617 let k1 = encode_key(&[Some("tag1")]);
1618 let input_columns: Vec<ArrayRef> = vec![
1619 tag1_dict_array.clone(),
1620 Arc::new(Int64Array::from(vec![100, 200])),
1621 Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
1622 build_flat_test_pk_array(&[&k1, &k1]),
1623 Arc::new(UInt64Array::from_iter_values([1, 2])),
1624 Arc::new(UInt8Array::from_iter_values([
1625 OpType::Put as u8,
1626 OpType::Put as u8,
1627 ])),
1628 ];
1629 let input_schema =
1630 to_flat_sst_arrow_schema(&actual_metadata, &FlatSchemaOptions::default());
1631 let input_batch = RecordBatch::try_new(input_schema, input_columns).unwrap();
1632
1633 let result = compat_batch.compat(input_batch).unwrap();
1634
1635 let sparse_k1 = encode_sparse_key(&[(1, Some("tag1")), (3, None)]);
1636 let mut null_tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
1637 null_tag_builder.append_nulls(2);
1638 let null_tag_dict_array = Arc::new(null_tag_builder.finish());
1639 let expected_columns: Vec<ArrayRef> = vec![
1640 tag1_dict_array.clone(),
1641 null_tag_dict_array,
1642 Arc::new(Int64Array::from(vec![100, 200])),
1643 Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
1644 build_flat_test_pk_array(&[&sparse_k1, &sparse_k1]),
1645 Arc::new(UInt64Array::from_iter_values([1, 2])),
1646 Arc::new(UInt8Array::from_iter_values([
1647 OpType::Put as u8,
1648 OpType::Put as u8,
1649 ])),
1650 ];
1651 let output_schema =
1652 to_flat_sst_arrow_schema(&expected_metadata, &FlatSchemaOptions::default());
1653 let expected_batch = RecordBatch::try_new(output_schema, expected_columns).unwrap();
1654
1655 assert_eq!(expected_batch, result);
1656 }
1657
1658 #[test]
1659 fn test_flat_compat_batch_compact_sparse() {
1660 let mut actual_metadata = new_metadata(
1661 &[
1662 (
1663 0,
1664 SemanticType::Timestamp,
1665 ConcreteDataType::timestamp_millisecond_datatype(),
1666 ),
1667 (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1668 ],
1669 &[],
1670 );
1671 actual_metadata.primary_key_encoding = PrimaryKeyEncoding::Sparse;
1672 let actual_metadata = Arc::new(actual_metadata);
1673
1674 let mut expected_metadata = new_metadata(
1675 &[
1676 (
1677 0,
1678 SemanticType::Timestamp,
1679 ConcreteDataType::timestamp_millisecond_datatype(),
1680 ),
1681 (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1682 (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
1683 ],
1684 &[],
1685 );
1686 expected_metadata.primary_key_encoding = PrimaryKeyEncoding::Sparse;
1687 let expected_metadata = Arc::new(expected_metadata);
1688
1689 let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
1690 let read_format = FlatReadFormat::new(
1691 actual_metadata.clone(),
1692 [0, 2, 3].into_iter(),
1693 None,
1694 "test",
1695 true,
1696 )
1697 .unwrap();
1698 let format_projection = read_format.format_projection();
1699
1700 let compat_batch =
1701 FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection, true)
1702 .unwrap()
1703 .unwrap();
1704
1705 let sparse_k1 = encode_sparse_key(&[]);
1706 let input_columns: Vec<ArrayRef> = vec![
1707 Arc::new(Int64Array::from(vec![100, 200])),
1708 Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
1709 build_flat_test_pk_array(&[&sparse_k1, &sparse_k1]),
1710 Arc::new(UInt64Array::from_iter_values([1, 2])),
1711 Arc::new(UInt8Array::from_iter_values([
1712 OpType::Put as u8,
1713 OpType::Put as u8,
1714 ])),
1715 ];
1716 let input_schema =
1717 to_flat_sst_arrow_schema(&actual_metadata, &FlatSchemaOptions::default());
1718 let input_batch = RecordBatch::try_new(input_schema, input_columns).unwrap();
1719
1720 let result = compat_batch.compat(input_batch).unwrap();
1721
1722 let expected_columns: Vec<ArrayRef> = vec![
1723 Arc::new(Int64Array::from(vec![100, 200])),
1724 Arc::new(Int64Array::from(vec![None::<i64>, None::<i64>])),
1725 Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
1726 build_flat_test_pk_array(&[&sparse_k1, &sparse_k1]),
1727 Arc::new(UInt64Array::from_iter_values([1, 2])),
1728 Arc::new(UInt8Array::from_iter_values([
1729 OpType::Put as u8,
1730 OpType::Put as u8,
1731 ])),
1732 ];
1733 let output_schema =
1734 to_flat_sst_arrow_schema(&expected_metadata, &FlatSchemaOptions::default());
1735 let expected_batch = RecordBatch::try_new(output_schema, expected_columns).unwrap();
1736
1737 assert_eq!(expected_batch, result);
1738 }
1739}