1use std::collections::HashMap;
18use std::sync::Arc;
19
20use api::v1::SemanticType;
21use datatypes::arrow::array::{
22 Array, ArrayRef, BinaryArray, BinaryBuilder, DictionaryArray, UInt32Array,
23};
24use datatypes::arrow::compute::{TakeOptions, take};
25use datatypes::arrow::datatypes::{FieldRef, Schema, SchemaRef};
26use datatypes::arrow::record_batch::RecordBatch;
27use datatypes::data_type::ConcreteDataType;
28use datatypes::prelude::DataType;
29use datatypes::value::Value;
30use datatypes::vectors::VectorRef;
31use mito_codec::row_converter::{
32 CompositeValues, PrimaryKeyCodec, SortField, build_primary_key_codec,
33 build_primary_key_codec_with_fields,
34};
35use snafu::{OptionExt, ResultExt, ensure};
36use store_api::codec::PrimaryKeyEncoding;
37use store_api::metadata::{RegionMetadata, RegionMetadataRef};
38use store_api::storage::ColumnId;
39
40use crate::error::{
41 CompatReaderSnafu, ComputeArrowSnafu, CreateDefaultSnafu, DecodeSnafu, EncodeSnafu,
42 NewRecordBatchSnafu, Result, UnexpectedSnafu, UnsupportedOperationSnafu,
43};
44use crate::read::flat_projection::{FlatProjectionMapper, flat_projected_columns};
45use crate::read::projection::{PrimaryKeyProjectionMapper, ProjectionMapper};
46use crate::read::{Batch, BatchColumn, BatchReader};
47use crate::sst::parquet::flat_format::primary_key_column_index;
48use crate::sst::parquet::format::{FormatProjection, INTERNAL_COLUMN_NUM, PrimaryKeyArray};
49use crate::sst::{internal_fields, tag_maybe_to_dictionary_field};
50
/// A reader that adapts batches from an inner reader whose region metadata may
/// be older than the metadata expected by the caller.
pub struct CompatReader<R> {
    /// Inner reader producing batches in the reader's (possibly old) schema.
    reader: R,
    /// Adapter that rewrites every batch to the expected schema.
    compat: PrimaryKeyCompatBatch,
}
58
impl<R> CompatReader<R> {
    /// Creates a new [CompatReader].
    ///
    /// - `mapper` carries the expected (projected) metadata.
    /// - `reader_meta` is the metadata of the region the `reader` reads from.
    /// - `reader` is the source to adapt.
    pub fn new(
        mapper: &ProjectionMapper,
        reader_meta: RegionMetadataRef,
        reader: R,
    ) -> Result<CompatReader<R>> {
        Ok(CompatReader {
            reader,
            compat: PrimaryKeyCompatBatch::new(mapper, reader_meta)?,
        })
    }
}
75
76#[async_trait::async_trait]
77impl<R: BatchReader> BatchReader for CompatReader<R> {
78 async fn next_batch(&mut self) -> Result<Option<Batch>> {
79 let Some(mut batch) = self.reader.next_batch().await? else {
80 return Ok(None);
81 };
82
83 batch = self.compat.compat_batch(batch)?;
84
85 Ok(Some(batch))
86 }
87}
88
/// Helper to adapt batches to the expected schema, for both storage formats.
pub(crate) enum CompatBatch {
    /// Adapter for the primary key format ([Batch]).
    PrimaryKey(PrimaryKeyCompatBatch),
    /// Adapter for the flat format ([RecordBatch]).
    Flat(FlatCompatBatch),
}
96
97impl CompatBatch {
98 pub(crate) fn as_primary_key(&self) -> Option<&PrimaryKeyCompatBatch> {
100 match self {
101 CompatBatch::PrimaryKey(batch) => Some(batch),
102 _ => None,
103 }
104 }
105
106 pub(crate) fn as_flat(&self) -> Option<&FlatCompatBatch> {
108 match self {
109 CompatBatch::Flat(batch) => Some(batch),
110 _ => None,
111 }
112 }
113}
114
/// Adapts a [Batch] in the primary key format to the expected schema.
pub(crate) struct PrimaryKeyCompatBatch {
    /// Optional primary key re-encoder, used when the key encoding changed.
    rewrite_pk: Option<RewritePrimaryKey>,
    /// Optional primary key padder, used when key columns were added.
    compat_pk: Option<CompatPrimaryKey>,
    /// Optional field adapter, used when fields were added/reordered/cast.
    compat_fields: Option<CompatFields>,
}
124
impl PrimaryKeyCompatBatch {
    /// Creates an adapter that rewrites batches read with `reader_meta` into
    /// the schema expected by `mapper`.
    pub(crate) fn new(mapper: &ProjectionMapper, reader_meta: RegionMetadataRef) -> Result<Self> {
        let rewrite_pk = may_rewrite_primary_key(mapper.metadata(), &reader_meta);
        let compat_pk = may_compat_primary_key(mapper.metadata(), &reader_meta)?;
        // Field adaptation only applies to the primary key format mapper.
        let mapper = mapper.as_primary_key().context(UnexpectedSnafu {
            reason: "Unexpected format",
        })?;
        let compat_fields = may_compat_fields(mapper, &reader_meta)?;

        Ok(Self {
            rewrite_pk,
            compat_pk,
            compat_fields,
        })
    }

    /// Adapts a [Batch]: re-encodes the key first, then appends default key
    /// values, then fills/casts field columns. Order matters.
    pub(crate) fn compat_batch(&self, mut batch: Batch) -> Result<Batch> {
        if let Some(rewrite_pk) = &self.rewrite_pk {
            batch = rewrite_pk.compat(batch)?;
        }
        if let Some(compat_pk) = &self.compat_pk {
            batch = compat_pk.compat(batch)?;
        }
        if let Some(compat_fields) = &self.compat_fields {
            batch = compat_fields.compat(batch);
        }

        Ok(batch)
    }
}
159
160pub(crate) fn has_same_columns_and_pk_encoding(
162 left: &RegionMetadata,
163 right: &RegionMetadata,
164) -> bool {
165 if left.primary_key_encoding != right.primary_key_encoding {
166 return false;
167 }
168
169 if left.column_metadatas.len() != right.column_metadatas.len() {
170 return false;
171 }
172
173 for (left_col, right_col) in left.column_metadatas.iter().zip(&right.column_metadatas) {
174 if left_col.column_id != right_col.column_id || !left_col.is_same_datatype(right_col) {
175 return false;
176 }
177 debug_assert_eq!(
178 left_col.column_schema.data_type,
179 right_col.column_schema.data_type
180 );
181 debug_assert_eq!(left_col.semantic_type, right_col.semantic_type);
182 }
183
184 true
185}
186
/// Adapts a flat format [RecordBatch] to the expected schema.
pub(crate) struct FlatCompatBatch {
    /// For each expected column: index in the source (with optional cast) or a
    /// default value to fill.
    index_or_defaults: Vec<IndexOrDefault>,
    /// Arrow schema of the adapted batch, including internal columns.
    arrow_schema: SchemaRef,
    /// Adapter for the primary key column.
    compat_pk: FlatCompatPrimaryKey,
}
196
impl FlatCompatBatch {
    /// Creates a [FlatCompatBatch] if the projected schema of `actual` differs
    /// from the schema expected by `mapper`.
    ///
    /// Returns `Ok(None)` when the schemas already match. When `actual` uses
    /// sparse key encoding and this is a compaction read, a dedicated path is
    /// taken that keeps the primary key untouched.
    pub(crate) fn try_new(
        mapper: &FlatProjectionMapper,
        actual: &RegionMetadataRef,
        format_projection: &FormatProjection,
        compaction: bool,
    ) -> Result<Option<Self>> {
        let actual_schema = flat_projected_columns(actual, format_projection);
        let expect_schema = mapper.batch_schema();
        if expect_schema == actual_schema {
            // Nothing to adapt.
            return Ok(None);
        }

        if actual.primary_key_encoding == PrimaryKeyEncoding::Sparse && compaction {
            return FlatCompatBatch::try_new_compact_sparse(mapper, actual);
        }

        let (index_or_defaults, fields) =
            Self::compute_index_and_fields(&actual_schema, expect_schema, mapper.metadata())?;

        let compat_pk = FlatCompatPrimaryKey::new(mapper.metadata(), actual)?;

        Ok(Some(Self {
            index_or_defaults,
            arrow_schema: Arc::new(Schema::new(fields)),
            compat_pk,
        }))
    }

    /// For every expected column, computes either its index in `actual_schema`
    /// (plus an optional cast) or a default value, and collects the expected
    /// Arrow fields (tags possibly as dictionary fields). Internal columns are
    /// appended to the field list at the end.
    fn compute_index_and_fields(
        actual_schema: &[(ColumnId, ConcreteDataType)],
        expect_schema: &[(ColumnId, ConcreteDataType)],
        expect_metadata: &RegionMetadata,
    ) -> Result<(Vec<IndexOrDefault>, Vec<FieldRef>)> {
        // Maps column id to its position and data type in the actual schema.
        let actual_schema_index: HashMap<_, _> = actual_schema
            .iter()
            .enumerate()
            .map(|(idx, (column_id, data_type))| (*column_id, (idx, data_type)))
            .collect();

        let mut index_or_defaults = Vec::with_capacity(expect_schema.len());
        let mut fields = Vec::with_capacity(expect_schema.len());
        for (column_id, expect_data_type) in expect_schema {
            // Safe to unwrap: expected ids come from expect_metadata itself.
            let column_index = expect_metadata.column_index_by_id(*column_id).unwrap();
            let expect_column = &expect_metadata.column_metadatas[column_index];
            let column_field = &expect_metadata.schema.arrow_schema().fields()[column_index];
            if expect_column.semantic_type == SemanticType::Tag {
                // Tags may be dictionary-encoded in the flat format.
                fields.push(tag_maybe_to_dictionary_field(
                    &expect_column.column_schema.data_type,
                    column_field,
                ));
            } else {
                fields.push(column_field.clone());
            };

            if let Some((index, actual_data_type)) = actual_schema_index.get(column_id) {
                let mut cast_type = None;
                // Cast the source column when its type differs.
                if expect_data_type != *actual_data_type {
                    cast_type = Some(expect_data_type.clone())
                }
                index_or_defaults.push(IndexOrDefault::Index {
                    pos: *index,
                    cast_type,
                });
            } else {
                // Column is absent from the source; it must have a default.
                let default_vector = expect_column
                    .column_schema
                    .create_default_vector(1)
                    .context(CreateDefaultSnafu {
                        region_id: expect_metadata.region_id,
                        column: &expect_column.column_schema.name,
                    })?
                    .with_context(|| CompatReaderSnafu {
                        region_id: expect_metadata.region_id,
                        reason: format!(
                            "column {} does not have a default value to read",
                            expect_column.column_schema.name
                        ),
                    })?;
                index_or_defaults.push(IndexOrDefault::DefaultValue {
                    column_id: expect_column.column_id,
                    default_vector,
                    semantic_type: expect_column.semantic_type,
                });
            };
        }
        // The flat format also stores internal columns after the user columns.
        fields.extend_from_slice(&internal_fields());

        Ok((index_or_defaults, fields))
    }

    /// Builds the adapter for compacting a sparse-encoded region: only field
    /// and time index columns are adapted, the sparse primary key is reused.
    fn try_new_compact_sparse(
        mapper: &FlatProjectionMapper,
        actual: &RegionMetadataRef,
    ) -> Result<Option<Self>> {
        ensure!(
            mapper.metadata().primary_key_encoding == PrimaryKeyEncoding::Sparse,
            UnsupportedOperationSnafu {
                err_msg: "Flat format doesn't support converting sparse encoding back to dense encoding"
            }
        );

        // Tags live inside the sparse-encoded key, so only fields and the time
        // index participate in column adaptation.
        let actual_schema: Vec<_> = actual
            .field_columns()
            .chain([actual.time_index_column()])
            .map(|col| (col.column_id, col.column_schema.data_type.clone()))
            .collect();
        let expect_schema: Vec<_> = mapper
            .metadata()
            .field_columns()
            .chain([mapper.metadata().time_index_column()])
            .map(|col| (col.column_id, col.column_schema.data_type.clone()))
            .collect();

        let (index_or_defaults, fields) =
            Self::compute_index_and_fields(&actual_schema, &expect_schema, mapper.metadata())?;

        // No key adaptation on this path.
        let compat_pk = FlatCompatPrimaryKey::default();

        Ok(Some(Self {
            index_or_defaults,
            arrow_schema: Arc::new(Schema::new(fields)),
            compat_pk,
        }))
    }

    /// Adapts a flat [RecordBatch]: reorders/casts existing columns, fills
    /// defaults, carries over the internal columns, then adapts the key.
    pub(crate) fn compat(&self, batch: RecordBatch) -> Result<RecordBatch> {
        let len = batch.num_rows();
        let columns = self
            .index_or_defaults
            .iter()
            .map(|index_or_default| match index_or_default {
                IndexOrDefault::Index { pos, cast_type } => {
                    let old_column = batch.column(*pos);

                    if let Some(ty) = cast_type {
                        let casted =
                            datatypes::arrow::compute::cast(old_column, &ty.as_arrow_type())
                                .context(ComputeArrowSnafu)?;
                        Ok(casted)
                    } else {
                        Ok(old_column.clone())
                    }
                }
                IndexOrDefault::DefaultValue {
                    column_id: _,
                    default_vector,
                    semantic_type,
                } => repeat_vector(default_vector, len, *semantic_type == SemanticType::Tag),
            })
            // Internal columns are the trailing columns of the source batch.
            .chain(
                batch.columns()[batch.num_columns() - INTERNAL_COLUMN_NUM..]
                    .iter()
                    .map(|col| Ok(col.clone())),
            )
            .collect::<Result<Vec<_>>>()?;

        let compat_batch = RecordBatch::try_new(self.arrow_schema.clone(), columns)
            .context(NewRecordBatchSnafu)?;

        self.compat_pk.compat(compat_batch)
    }
}
385
/// Repeats the single value in `vector` `to_len` times as an Arrow array.
///
/// String tag columns become a dictionary array (one value, repeated keys) to
/// match the flat format's tag representation; other columns are replicated
/// with the `take` kernel.
///
/// # Panics
/// Panics if `vector` does not contain exactly one element.
fn repeat_vector(vector: &VectorRef, to_len: usize, is_tag: bool) -> Result<ArrayRef> {
    assert_eq!(1, vector.len());
    let data_type = vector.data_type();
    if is_tag && data_type.is_string() {
        let values = vector.to_arrow_array();
        if values.is_null(0) {
            // All-null keys over an empty values array encode nulls.
            let keys = UInt32Array::new_null(to_len);
            Ok(Arc::new(DictionaryArray::new(keys, values.slice(0, 0))))
        } else {
            let keys = UInt32Array::from_value(0, to_len);
            Ok(Arc::new(DictionaryArray::new(keys, values)))
        }
    } else {
        let keys = UInt32Array::from_value(0, to_len);
        take(
            &vector.to_arrow_array(),
            &keys,
            Some(TakeOptions {
                // Every key is 0 and the input has one element, so bounds
                // checking is unnecessary.
                check_bounds: false,
            }),
        )
        .context(ComputeArrowSnafu)
    }
}
412
/// Appends default values of added key columns to encoded primary keys.
#[derive(Debug)]
struct CompatPrimaryKey {
    /// Codec covering only the appended key columns.
    converter: Arc<dyn PrimaryKeyCodec>,
    /// Default values of the appended key columns, in key order.
    values: Vec<(ColumnId, Value)>,
}
421
impl CompatPrimaryKey {
    /// Appends encoded default values to the primary key of `batch`.
    fn compat(&self, mut batch: Batch) -> Result<Batch> {
        let mut buffer = Vec::with_capacity(
            batch.primary_key().len() + self.converter.estimated_size().unwrap_or_default(),
        );
        // Existing key bytes stay as prefix; new columns are encoded after.
        buffer.extend_from_slice(batch.primary_key());
        self.converter
            .encode_values(&self.values, &mut buffer)
            .context(EncodeSnafu)?;

        batch.set_primary_key(buffer);

        // Keep any cached decoded key values consistent with the new key.
        if let Some(pk_values) = &mut batch.pk_values {
            pk_values.extend(&self.values);
        }

        Ok(batch)
    }
}
443
/// Adapts field columns of batches to the expected fields.
#[derive(Debug)]
struct CompatFields {
    /// Projected fields (id and type) the reader actually returns.
    actual_fields: Vec<(ColumnId, ConcreteDataType)>,
    /// For each expected field: source index (with optional cast) or default.
    index_or_defaults: Vec<IndexOrDefault>,
}
452
impl CompatFields {
    /// Makes the field columns of `batch` compatible with the expected schema.
    #[must_use]
    fn compat(&self, batch: Batch) -> Batch {
        debug_assert_eq!(self.actual_fields.len(), batch.fields().len());
        debug_assert!(
            self.actual_fields
                .iter()
                .zip(batch.fields())
                .all(|((id, _), batch_column)| *id == batch_column.column_id)
        );

        let len = batch.num_rows();
        let fields = self
            .index_or_defaults
            .iter()
            .map(|index_or_default| match index_or_default {
                IndexOrDefault::Index { pos, cast_type } => {
                    let old_column = &batch.fields()[*pos];

                    // NOTE(review): cast() is expected to succeed because the
                    // cast type was derived from compatible metadata; failure
                    // here would indicate a metadata bug.
                    let data = if let Some(ty) = cast_type {
                        old_column.data.cast(ty).unwrap()
                    } else {
                        old_column.data.clone()
                    };
                    BatchColumn {
                        column_id: old_column.column_id,
                        data,
                    }
                }
                IndexOrDefault::DefaultValue {
                    column_id,
                    default_vector,
                    semantic_type: _,
                } => {
                    // Repeat the single default value to fill the column.
                    let data = default_vector.replicate(&[len]);
                    BatchColumn {
                        column_id: *column_id,
                        data,
                    }
                }
            })
            .collect();

        // Column count and row lengths match by construction.
        batch.with_fields(fields).unwrap()
    }
}
503
504fn may_rewrite_primary_key(
505 expect: &RegionMetadata,
506 actual: &RegionMetadata,
507) -> Option<RewritePrimaryKey> {
508 if expect.primary_key_encoding == actual.primary_key_encoding {
509 return None;
510 }
511
512 let fields = expect.primary_key.clone();
513 let original = build_primary_key_codec(actual);
514 let new = build_primary_key_codec(expect);
515
516 Some(RewritePrimaryKey {
517 original,
518 new,
519 fields,
520 })
521}
522
/// Returns true if the primary key of `actual` equals the expected one.
///
/// Returns an error when the actual key has more columns than the expected key
/// or is not a prefix of it: compat reading only supports appending key
/// columns at the end.
fn is_primary_key_same(expect: &RegionMetadata, actual: &RegionMetadata) -> Result<bool> {
    ensure!(
        actual.primary_key.len() <= expect.primary_key.len(),
        CompatReaderSnafu {
            region_id: expect.region_id,
            reason: format!(
                "primary key has more columns {} than expect {}",
                actual.primary_key.len(),
                expect.primary_key.len()
            ),
        }
    );
    ensure!(
        actual.primary_key == expect.primary_key[..actual.primary_key.len()],
        CompatReaderSnafu {
            region_id: expect.region_id,
            reason: format!(
                "primary key has different prefix, expect: {:?}, actual: {:?}",
                expect.primary_key, actual.primary_key
            ),
        }
    );

    Ok(actual.primary_key.len() == expect.primary_key.len())
}
549
/// Creates a [CompatPrimaryKey] that appends default values for the expected
/// key columns that `actual` lacks.
///
/// Returns `None` when both keys are identical. Returns an error when the
/// actual key is not a prefix of the expected key or a missing key column has
/// no default value.
fn may_compat_primary_key(
    expect: &RegionMetadata,
    actual: &RegionMetadata,
) -> Result<Option<CompatPrimaryKey>> {
    if is_primary_key_same(expect, actual)? {
        return Ok(None);
    }

    // Expected key columns the reader's key lacks.
    let to_add = &expect.primary_key[actual.primary_key.len()..];
    let mut fields = Vec::with_capacity(to_add.len());
    let mut values = Vec::with_capacity(to_add.len());
    for column_id in to_add {
        // Safe to unwrap: the id comes from expect's own primary key.
        let column = expect.column_by_id(*column_id).unwrap();
        fields.push((
            *column_id,
            SortField::new(column.column_schema.data_type.clone()),
        ));
        let default_value = column
            .column_schema
            .create_default()
            .context(CreateDefaultSnafu {
                region_id: expect.region_id,
                column: &column.column_schema.name,
            })?
            .with_context(|| CompatReaderSnafu {
                region_id: expect.region_id,
                reason: format!(
                    "key column {} does not have a default value to read",
                    column.column_schema.name
                ),
            })?;
        values.push((*column_id, default_value));
    }
    let converter =
        build_primary_key_codec_with_fields(expect.primary_key_encoding, fields.into_iter());

    Ok(Some(CompatPrimaryKey { converter, values }))
}
592
/// Creates a [CompatFields] that fills or casts field columns when the actual
/// projected fields differ from the expected ones.
///
/// Returns `None` when the fields already match. Returns an error when a
/// missing field column has no default value.
fn may_compat_fields(
    mapper: &PrimaryKeyProjectionMapper,
    actual: &RegionMetadata,
) -> Result<Option<CompatFields>> {
    let expect_fields = mapper.batch_fields();
    let actual_fields = Batch::projected_fields(actual, mapper.column_ids());
    if expect_fields == actual_fields {
        return Ok(None);
    }

    // Maps column id to its position and data type in the actual fields.
    let source_field_index: HashMap<_, _> = actual_fields
        .iter()
        .enumerate()
        .map(|(idx, (column_id, data_type))| (*column_id, (idx, data_type)))
        .collect();

    let index_or_defaults = expect_fields
        .iter()
        .map(|(column_id, expect_data_type)| {
            if let Some((index, actual_data_type)) = source_field_index.get(column_id) {
                let mut cast_type = None;
                // Cast the source column when its type differs.
                if expect_data_type != *actual_data_type {
                    cast_type = Some(expect_data_type.clone())
                }
                Ok(IndexOrDefault::Index {
                    pos: *index,
                    cast_type,
                })
            } else {
                // Safe to unwrap: expected ids come from the mapper's metadata.
                let column = mapper.metadata().column_by_id(*column_id).unwrap();
                let default_vector = column
                    .column_schema
                    .create_default_vector(1)
                    .context(CreateDefaultSnafu {
                        region_id: mapper.metadata().region_id,
                        column: &column.column_schema.name,
                    })?
                    .with_context(|| CompatReaderSnafu {
                        region_id: mapper.metadata().region_id,
                        reason: format!(
                            "column {} does not have a default value to read",
                            column.column_schema.name
                        ),
                    })?;
                Ok(IndexOrDefault::DefaultValue {
                    column_id: column.column_id,
                    default_vector,
                    semantic_type: SemanticType::Field,
                })
            }
        })
        .collect::<Result<Vec<_>>>()?;

    Ok(Some(CompatFields {
        actual_fields,
        index_or_defaults,
    }))
}
656
/// Index of a column in the source schema, or a default value to fill it with.
#[derive(Debug)]
enum IndexOrDefault {
    /// The column exists in the source at position `pos`.
    Index {
        pos: usize,
        /// Cast the source column to this type when it differs from the
        /// expected type.
        cast_type: Option<ConcreteDataType>,
    },
    /// The column is absent from the source; fill with a default value.
    DefaultValue {
        /// Id of the missing column.
        column_id: ColumnId,
        /// Single-element vector holding the default value to repeat.
        default_vector: VectorRef,
        /// Semantic type of the column; tags need dictionary encoding in the
        /// flat format.
        semantic_type: SemanticType,
    },
}
675
/// Re-encodes primary keys of batches from one encoding to another.
struct RewritePrimaryKey {
    /// Codec that decodes keys written by the reader's region.
    original: Arc<dyn PrimaryKeyCodec>,
    /// Codec that encodes keys in the expected encoding.
    new: Arc<dyn PrimaryKeyCodec>,
    /// Expected primary key column ids, in key order.
    fields: Vec<ColumnId>,
}
685
impl RewritePrimaryKey {
    /// Re-encodes the primary key of `batch` with the new codec.
    fn compat(&self, mut batch: Batch) -> Result<Batch> {
        // Decode the key with the old codec unless the batch already caches
        // decoded key values.
        if batch.pk_values().is_none() {
            let new_pk_values = self
                .original
                .decode(batch.primary_key())
                .context(DecodeSnafu)?;
            batch.set_pk_values(new_pk_values);
        }
        let values = batch.pk_values().unwrap();

        let mut buffer = Vec::with_capacity(
            batch.primary_key().len() + self.new.estimated_size().unwrap_or_default(),
        );
        match values {
            CompositeValues::Dense(values) => {
                self.new
                    .encode_values(values.as_slice(), &mut buffer)
                    .context(EncodeSnafu)?;
            }
            CompositeValues::Sparse(values) => {
                // Project sparse values onto the expected key columns,
                // substituting null for missing ones.
                let values = self
                    .fields
                    .iter()
                    .map(|id| {
                        let value = values.get_or_null(*id);
                        (*id, value.as_value_ref())
                    })
                    .collect::<Vec<_>>();
                self.new
                    .encode_value_refs(&values, &mut buffer)
                    .context(EncodeSnafu)?;
            }
        }
        batch.set_primary_key(buffer);

        Ok(batch)
    }
}
727
/// Re-encodes primary keys of flat format batches to another encoding.
struct FlatRewritePrimaryKey {
    /// Codec for the expected encoding.
    codec: Arc<dyn PrimaryKeyCodec>,
    /// Expected region metadata.
    metadata: RegionMetadataRef,
    /// Codec for the encoding the batches were written with.
    old_codec: Arc<dyn PrimaryKeyCodec>,
}
738
impl FlatRewritePrimaryKey {
    /// Creates a rewriter when the key encodings of `expect` and `actual`
    /// differ, `None` otherwise.
    fn new(
        expect: &RegionMetadataRef,
        actual: &RegionMetadataRef,
    ) -> Option<FlatRewritePrimaryKey> {
        if expect.primary_key_encoding == actual.primary_key_encoding {
            return None;
        }
        let codec = build_primary_key_codec(expect);
        let old_codec = build_primary_key_codec(actual);

        Some(FlatRewritePrimaryKey {
            codec,
            metadata: expect.clone(),
            old_codec,
        })
    }

    /// Decodes every distinct key in the batch's primary key dictionary with
    /// the old codec, appends `append_values`, and re-encodes with the new
    /// codec. Dictionary keys are reused, so only distinct keys are rewritten.
    fn rewrite_key(
        &self,
        append_values: &[(ColumnId, Value)],
        batch: RecordBatch,
    ) -> Result<RecordBatch> {
        let old_pk_dict_array = batch
            .column(primary_key_column_index(batch.num_columns()))
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .unwrap();
        let old_pk_values_array = old_pk_dict_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        let mut builder = BinaryBuilder::with_capacity(
            old_pk_values_array.len(),
            old_pk_values_array.value_data().len(),
        );

        // Reusable buffer sized at the average encoded key length.
        let mut buffer = Vec::with_capacity(
            old_pk_values_array.value_data().len() / old_pk_values_array.len().max(1),
        );
        let mut column_id_values = Vec::new();
        for value in old_pk_values_array.iter() {
            let Some(old_pk) = value else {
                builder.append_null();
                continue;
            };
            let mut pk_values = self.old_codec.decode(old_pk).context(DecodeSnafu)?;
            pk_values.extend(append_values);

            buffer.clear();
            column_id_values.clear();
            match pk_values {
                CompositeValues::Dense(dense_values) => {
                    self.codec
                        .encode_values(dense_values.as_slice(), &mut buffer)
                        .context(EncodeSnafu)?;
                }
                CompositeValues::Sparse(sparse_values) => {
                    // Project sparse values onto the expected key columns,
                    // substituting null for missing ones.
                    for id in &self.metadata.primary_key {
                        let value = sparse_values.get_or_null(*id);
                        column_id_values.push((*id, value.clone()));
                    }
                    self.codec
                        .encode_values(&column_id_values, &mut buffer)
                        .context(EncodeSnafu)?;
                }
            }
            builder.append_value(&buffer);
        }
        let new_pk_values_array = Arc::new(builder.finish());
        // Keep the old dictionary keys; only the values were rewritten.
        let new_pk_dict_array =
            PrimaryKeyArray::new(old_pk_dict_array.keys().clone(), new_pk_values_array);

        let mut columns = batch.columns().to_vec();
        columns[primary_key_column_index(batch.num_columns())] = Arc::new(new_pk_dict_array);

        RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
    }
}
825
/// Adapter for the primary key column of flat format batches.
#[derive(Default)]
struct FlatCompatPrimaryKey {
    /// Optional rewriter used when the key encoding changed.
    rewriter: Option<FlatRewritePrimaryKey>,
    /// Codec to encode default values for appended key columns.
    converter: Option<Arc<dyn PrimaryKeyCodec>>,
    /// Default values of the appended key columns, in key order.
    values: Vec<(ColumnId, Value)>,
}
836
impl FlatCompatPrimaryKey {
    /// Creates a key adapter: an optional rewriter for a changed encoding plus
    /// a converter and default values for key columns `actual` lacks.
    fn new(expect: &RegionMetadataRef, actual: &RegionMetadataRef) -> Result<Self> {
        let rewriter = FlatRewritePrimaryKey::new(expect, actual);

        if is_primary_key_same(expect, actual)? {
            // Nothing to append; only the (optional) rewriter applies.
            return Ok(Self {
                rewriter,
                converter: None,
                values: Vec::new(),
            });
        }

        // Expected key columns the reader's key lacks.
        let to_add = &expect.primary_key[actual.primary_key.len()..];
        let mut values = Vec::with_capacity(to_add.len());
        let mut fields = Vec::with_capacity(to_add.len());
        for column_id in to_add {
            // Safe to unwrap: the id comes from expect's own primary key.
            let column = expect.column_by_id(*column_id).unwrap();
            fields.push((
                *column_id,
                SortField::new(column.column_schema.data_type.clone()),
            ));
            let default_value = column
                .column_schema
                .create_default()
                .context(CreateDefaultSnafu {
                    region_id: expect.region_id,
                    column: &column.column_schema.name,
                })?
                .with_context(|| CompatReaderSnafu {
                    region_id: expect.region_id,
                    reason: format!(
                        "key column {} does not have a default value to read",
                        column.column_schema.name
                    ),
                })?;
            values.push((*column_id, default_value));
        }
        // The keys differ here, so there must be columns to append.
        debug_assert!(!fields.is_empty());

        let converter = Some(build_primary_key_codec_with_fields(
            expect.primary_key_encoding,
            fields.into_iter(),
        ));

        Ok(Self {
            rewriter,
            converter,
            values,
        })
    }

    /// Makes the primary key column of `batch` compatible: a full rewrite when
    /// the encoding changed (which also appends the defaults), otherwise just
    /// appends encoded default values to each key.
    fn compat(&self, batch: RecordBatch) -> Result<RecordBatch> {
        if let Some(rewriter) = &self.rewriter {
            return rewriter.rewrite_key(&self.values, batch);
        }

        self.append_key(batch)
    }

    /// Appends encoded default key values to every distinct key in the batch's
    /// primary key dictionary. No-op when there is nothing to append.
    fn append_key(&self, batch: RecordBatch) -> Result<RecordBatch> {
        let Some(converter) = &self.converter else {
            return Ok(batch);
        };

        let old_pk_dict_array = batch
            .column(primary_key_column_index(batch.num_columns()))
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .unwrap();
        let old_pk_values_array = old_pk_dict_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        let mut builder = BinaryBuilder::with_capacity(
            old_pk_values_array.len(),
            old_pk_values_array.value_data().len()
                + converter.estimated_size().unwrap_or_default() * old_pk_values_array.len(),
        );

        // Reusable buffer sized at the average old key length plus the
        // estimated size of the appended part.
        let mut buffer = Vec::with_capacity(
            old_pk_values_array.value_data().len() / old_pk_values_array.len().max(1)
                + converter.estimated_size().unwrap_or_default(),
        );

        for value in old_pk_values_array.iter() {
            let Some(old_pk) = value else {
                builder.append_null();
                continue;
            };

            buffer.clear();
            // Old key bytes stay as prefix; defaults are encoded after them.
            buffer.extend_from_slice(old_pk);
            converter
                .encode_values(&self.values, &mut buffer)
                .context(EncodeSnafu)?;

            builder.append_value(&buffer);
        }

        let new_pk_values_array = Arc::new(builder.finish());
        // Keep the old dictionary keys; only the values were extended.
        let new_pk_dict_array =
            PrimaryKeyArray::new(old_pk_dict_array.keys().clone(), new_pk_values_array);

        let mut columns = batch.columns().to_vec();
        columns[primary_key_column_index(batch.num_columns())] = Arc::new(new_pk_dict_array);

        RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
    }
}
959
960#[cfg(test)]
961mod tests {
962 use std::sync::Arc;
963
964 use api::v1::{OpType, SemanticType};
965 use datatypes::arrow::array::{
966 ArrayRef, BinaryDictionaryBuilder, Int64Array, StringDictionaryBuilder,
967 TimestampMillisecondArray, UInt8Array, UInt64Array,
968 };
969 use datatypes::arrow::datatypes::UInt32Type;
970 use datatypes::arrow::record_batch::RecordBatch;
971 use datatypes::prelude::ConcreteDataType;
972 use datatypes::schema::ColumnSchema;
973 use datatypes::value::ValueRef;
974 use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, UInt8Vector, UInt64Vector};
975 use mito_codec::row_converter::{
976 DensePrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec,
977 };
978 use store_api::codec::PrimaryKeyEncoding;
979 use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
980 use store_api::storage::RegionId;
981
982 use super::*;
983 use crate::read::flat_projection::FlatProjectionMapper;
984 use crate::sst::parquet::flat_format::FlatReadFormat;
985 use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
986 use crate::test_util::{VecBatchReader, check_reader_result};
987
    /// Builds a test [RegionMetadata] from `(id, semantic type, data type)`
    /// triples and the given primary key column ids.
    fn new_metadata(
        semantic_types: &[(ColumnId, SemanticType, ConcreteDataType)],
        primary_key: &[ColumnId],
    ) -> RegionMetadata {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        for (id, semantic_type, data_type) in semantic_types {
            // Column names are derived from the id; the time index is "ts".
            let column_schema = match semantic_type {
                SemanticType::Tag => {
                    ColumnSchema::new(format!("tag_{id}"), data_type.clone(), true)
                }
                SemanticType::Field => {
                    ColumnSchema::new(format!("field_{id}"), data_type.clone(), true)
                }
                SemanticType::Timestamp => ColumnSchema::new("ts", data_type.clone(), false),
            };

            builder.push_column_metadata(ColumnMetadata {
                column_schema,
                semantic_type: *semantic_type,
                column_id: *id,
            });
        }
        builder.primary_key(primary_key.to_vec());
        builder.build().unwrap()
    }
1014
    /// Encodes optional string keys with the dense primary key codec.
    fn encode_key(keys: &[Option<&str>]) -> Vec<u8> {
        let fields = (0..keys.len())
            .map(|_| (0, SortField::new(ConcreteDataType::string_datatype())))
            .collect();
        let converter = DensePrimaryKeyCodec::with_fields(fields);
        let row = keys.iter().map(|str_opt| match str_opt {
            Some(v) => ValueRef::String(v),
            None => ValueRef::Null,
        });

        converter.encode(row).unwrap()
    }
1028
    /// Encodes `(column id, optional string)` pairs with the sparse primary
    /// key codec.
    fn encode_sparse_key(keys: &[(ColumnId, Option<&str>)]) -> Vec<u8> {
        let fields = (0..keys.len())
            .map(|_| (1, SortField::new(ConcreteDataType::string_datatype())))
            .collect();
        let converter = SparsePrimaryKeyCodec::with_fields(fields);
        let row = keys
            .iter()
            .map(|(id, str_opt)| match str_opt {
                Some(v) => (*id, ValueRef::String(v)),
                None => (*id, ValueRef::Null),
            })
            .collect::<Vec<_>>();
        let mut buffer = vec![];
        converter.encode_value_refs(&row, &mut buffer).unwrap();
        buffer
    }
1046
    /// Builds a [Batch] with the given encoded key and int64 field columns.
    ///
    /// `fields` lists `(column id, is_null)`; non-null columns are filled with
    /// the column id as the value. Timestamps are milliseconds starting at
    /// `start_ts`.
    fn new_batch(
        primary_key: &[u8],
        fields: &[(ColumnId, bool)],
        start_ts: i64,
        num_rows: usize,
    ) -> Batch {
        let timestamps = Arc::new(TimestampMillisecondVector::from_values(
            start_ts..start_ts + num_rows as i64,
        ));
        let sequences = Arc::new(UInt64Vector::from_values(0..num_rows as u64));
        let op_types = Arc::new(UInt8Vector::from_vec(vec![OpType::Put as u8; num_rows]));
        let field_columns = fields
            .iter()
            .map(|(id, is_null)| {
                let data = if *is_null {
                    Arc::new(Int64Vector::from(vec![None; num_rows]))
                } else {
                    Arc::new(Int64Vector::from_vec(vec![*id as i64; num_rows]))
                };
                BatchColumn {
                    column_id: *id,
                    data,
                }
            })
            .collect();
        Batch::new(
            primary_key.to_vec(),
            timestamps,
            sequences,
            op_types,
            field_columns,
        )
        .unwrap()
    }
1084
    /// The reader's key must not have more columns than the expected key.
    #[test]
    fn test_invalid_pk_len() {
        let reader_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1, 2],
        );
        let expect_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        );
        may_compat_primary_key(&expect_meta, &reader_meta).unwrap_err();
    }
1114
    /// The reader's key must be a prefix of the expected key.
    #[test]
    fn test_different_pk() {
        let reader_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[2, 1],
        );
        let expect_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (4, SemanticType::Tag, ConcreteDataType::string_datatype()),
            ],
            &[1, 2, 4],
        );
        may_compat_primary_key(&expect_meta, &reader_meta).unwrap_err();
    }
1146
    /// Identical primary keys need no compat adapter.
    #[test]
    fn test_same_pk() {
        let reader_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        );
        assert!(
            may_compat_primary_key(&reader_meta, &reader_meta)
                .unwrap()
                .is_none()
        );
    }
1167
    /// Identical metadata (and thus the same key encoding) needs no adapter.
    #[test]
    fn test_same_pk_encoding() {
        let reader_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
            ],
            &[1],
        ));

        assert!(
            may_compat_primary_key(&reader_meta, &reader_meta)
                .unwrap()
                .is_none()
        );
    }
1188
    /// Identical field lists need no field adapter.
    #[test]
    fn test_same_fields() {
        let reader_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        let mapper = PrimaryKeyProjectionMapper::all(&reader_meta).unwrap();
        assert!(may_compat_fields(&mapper, &reader_meta).unwrap().is_none())
    }
1206
    /// Reads through a [CompatReader] that appends a key column (with a null
    /// default) and fills a missing field column with nulls.
    #[tokio::test]
    async fn test_compat_reader() {
        let reader_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        let expect_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (3, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (4, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1, 3],
        ));
        let mapper = ProjectionMapper::all(&expect_meta, false).unwrap();
        let k1 = encode_key(&[Some("a")]);
        let k2 = encode_key(&[Some("b")]);
        let source_reader = VecBatchReader::new(&[
            new_batch(&k1, &[(2, false)], 1000, 3),
            new_batch(&k2, &[(2, false)], 1000, 3),
        ]);

        let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
        // Expected keys now carry the appended null tag column.
        let k1 = encode_key(&[Some("a"), None]);
        let k2 = encode_key(&[Some("b"), None]);
        check_reader_result(
            &mut compat_reader,
            &[
                new_batch(&k1, &[(2, false), (4, true)], 1000, 3),
                new_batch(&k2, &[(2, false), (4, true)], 1000, 3),
            ],
        )
        .await;
    }
1255
1256 #[tokio::test]
1257 async fn test_compat_reader_different_order() {
1258 let reader_meta = Arc::new(new_metadata(
1259 &[
1260 (
1261 0,
1262 SemanticType::Timestamp,
1263 ConcreteDataType::timestamp_millisecond_datatype(),
1264 ),
1265 (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1266 (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1267 ],
1268 &[1],
1269 ));
1270 let expect_meta = Arc::new(new_metadata(
1271 &[
1272 (
1273 0,
1274 SemanticType::Timestamp,
1275 ConcreteDataType::timestamp_millisecond_datatype(),
1276 ),
1277 (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1278 (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
1279 (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1280 (4, SemanticType::Field, ConcreteDataType::int64_datatype()),
1281 ],
1282 &[1],
1283 ));
1284 let mapper = ProjectionMapper::all(&expect_meta, false).unwrap();
1285 let k1 = encode_key(&[Some("a")]);
1286 let k2 = encode_key(&[Some("b")]);
1287 let source_reader = VecBatchReader::new(&[
1288 new_batch(&k1, &[(2, false)], 1000, 3),
1289 new_batch(&k2, &[(2, false)], 1000, 3),
1290 ]);
1291
1292 let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
1293 check_reader_result(
1294 &mut compat_reader,
1295 &[
1296 new_batch(&k1, &[(3, true), (2, false), (4, true)], 1000, 3),
1297 new_batch(&k2, &[(3, true), (2, false), (4, true)], 1000, 3),
1298 ],
1299 )
1300 .await;
1301 }
1302
1303 #[tokio::test]
1304 async fn test_compat_reader_different_types() {
1305 let actual_meta = Arc::new(new_metadata(
1306 &[
1307 (
1308 0,
1309 SemanticType::Timestamp,
1310 ConcreteDataType::timestamp_millisecond_datatype(),
1311 ),
1312 (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1313 (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1314 ],
1315 &[1],
1316 ));
1317 let expect_meta = Arc::new(new_metadata(
1318 &[
1319 (
1320 0,
1321 SemanticType::Timestamp,
1322 ConcreteDataType::timestamp_millisecond_datatype(),
1323 ),
1324 (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1325 (2, SemanticType::Field, ConcreteDataType::string_datatype()),
1326 ],
1327 &[1],
1328 ));
1329 let mapper = ProjectionMapper::all(&expect_meta, false).unwrap();
1330 let k1 = encode_key(&[Some("a")]);
1331 let k2 = encode_key(&[Some("b")]);
1332 let source_reader = VecBatchReader::new(&[
1333 new_batch(&k1, &[(2, false)], 1000, 3),
1334 new_batch(&k2, &[(2, false)], 1000, 3),
1335 ]);
1336
1337 let fn_batch_cast = |batch: Batch| {
1338 let mut new_fields = batch.fields.clone();
1339 new_fields[0].data = new_fields[0]
1340 .data
1341 .cast(&ConcreteDataType::string_datatype())
1342 .unwrap();
1343
1344 batch.with_fields(new_fields).unwrap()
1345 };
1346 let mut compat_reader = CompatReader::new(&mapper, actual_meta, source_reader).unwrap();
1347 check_reader_result(
1348 &mut compat_reader,
1349 &[
1350 fn_batch_cast(new_batch(&k1, &[(2, false)], 1000, 3)),
1351 fn_batch_cast(new_batch(&k2, &[(2, false)], 1000, 3)),
1352 ],
1353 )
1354 .await;
1355 }
1356
1357 #[tokio::test]
1358 async fn test_compat_reader_projection() {
1359 let reader_meta = Arc::new(new_metadata(
1360 &[
1361 (
1362 0,
1363 SemanticType::Timestamp,
1364 ConcreteDataType::timestamp_millisecond_datatype(),
1365 ),
1366 (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1367 (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1368 ],
1369 &[1],
1370 ));
1371 let expect_meta = Arc::new(new_metadata(
1372 &[
1373 (
1374 0,
1375 SemanticType::Timestamp,
1376 ConcreteDataType::timestamp_millisecond_datatype(),
1377 ),
1378 (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1379 (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
1380 (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1381 (4, SemanticType::Field, ConcreteDataType::int64_datatype()),
1382 ],
1383 &[1],
1384 ));
1385 let mapper = ProjectionMapper::new(&expect_meta, [1, 3, 2].into_iter(), false).unwrap();
1387 let k1 = encode_key(&[Some("a")]);
1388 let source_reader = VecBatchReader::new(&[new_batch(&k1, &[(2, false)], 1000, 3)]);
1389
1390 let mut compat_reader =
1391 CompatReader::new(&mapper, reader_meta.clone(), source_reader).unwrap();
1392 check_reader_result(
1393 &mut compat_reader,
1394 &[new_batch(&k1, &[(3, true), (2, false)], 1000, 3)],
1395 )
1396 .await;
1397
1398 let mapper = ProjectionMapper::new(&expect_meta, [1, 4, 2].into_iter(), false).unwrap();
1400 let k1 = encode_key(&[Some("a")]);
1401 let source_reader = VecBatchReader::new(&[new_batch(&k1, &[], 1000, 3)]);
1402
1403 let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
1404 check_reader_result(
1405 &mut compat_reader,
1406 &[new_batch(&k1, &[(3, true), (4, true)], 1000, 3)],
1407 )
1408 .await;
1409 }
1410
1411 #[tokio::test]
1412 async fn test_compat_reader_different_pk_encoding() {
1413 let mut reader_meta = new_metadata(
1414 &[
1415 (
1416 0,
1417 SemanticType::Timestamp,
1418 ConcreteDataType::timestamp_millisecond_datatype(),
1419 ),
1420 (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1421 (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1422 ],
1423 &[1],
1424 );
1425 reader_meta.primary_key_encoding = PrimaryKeyEncoding::Dense;
1426 let reader_meta = Arc::new(reader_meta);
1427 let mut expect_meta = new_metadata(
1428 &[
1429 (
1430 0,
1431 SemanticType::Timestamp,
1432 ConcreteDataType::timestamp_millisecond_datatype(),
1433 ),
1434 (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
1435 (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
1436 (3, SemanticType::Tag, ConcreteDataType::string_datatype()),
1437 (4, SemanticType::Field, ConcreteDataType::int64_datatype()),
1438 ],
1439 &[1, 3],
1440 );
1441 expect_meta.primary_key_encoding = PrimaryKeyEncoding::Sparse;
1442 let expect_meta = Arc::new(expect_meta);
1443
1444 let mapper = ProjectionMapper::all(&expect_meta, false).unwrap();
1445 let k1 = encode_key(&[Some("a")]);
1446 let k2 = encode_key(&[Some("b")]);
1447 let source_reader = VecBatchReader::new(&[
1448 new_batch(&k1, &[(2, false)], 1000, 3),
1449 new_batch(&k2, &[(2, false)], 1000, 3),
1450 ]);
1451
1452 let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
1453 let k1 = encode_sparse_key(&[(1, Some("a")), (3, None)]);
1454 let k2 = encode_sparse_key(&[(1, Some("b")), (3, None)]);
1455 check_reader_result(
1456 &mut compat_reader,
1457 &[
1458 new_batch(&k1, &[(2, false), (4, true)], 1000, 3),
1459 new_batch(&k2, &[(2, false), (4, true)], 1000, 3),
1460 ],
1461 )
1462 .await;
1463 }
1464
1465 fn build_flat_test_pk_array(primary_keys: &[&[u8]]) -> ArrayRef {
1467 let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
1468 for &pk in primary_keys {
1469 builder.append(pk).unwrap();
1470 }
1471 Arc::new(builder.finish())
1472 }
1473
    /// `FlatCompatBatch` must splice in a null-filled column for a field (id 3)
    /// that exists in the expected metadata but is missing from the SST's
    /// actual metadata, leaving every other column untouched.
    #[test]
    fn test_flat_compat_batch_with_missing_columns() {
        // Metadata the SST was written with: ts 0, tag 1, field 2.
        let actual_metadata = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));

        // Expected metadata additionally contains field 3.
        let expected_metadata = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));

        let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
        // Read all four expected columns from the actual (narrower) SST.
        let read_format = FlatReadFormat::new(
            actual_metadata.clone(),
            [0, 1, 2, 3].into_iter(),
            None,
            "test",
            false,
        )
        .unwrap();
        let format_projection = read_format.format_projection();

        // Schemas differ, so try_new must return Some(compat).
        let compat_batch =
            FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection, false)
                .unwrap()
                .unwrap();

        // Tag column is dictionary-encoded in the flat format.
        let mut tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
        tag_builder.append_value("tag1");
        tag_builder.append_value("tag1");
        let tag_dict_array = Arc::new(tag_builder.finish());

        let k1 = encode_key(&[Some("tag1")]);
        // Column order must match `to_flat_sst_arrow_schema(&actual_metadata)`:
        // tag 1, field 2, timestamp, then the internal columns (encoded pk,
        // a u64 column — presumably sequence numbers, TODO confirm — and OpType).
        let input_columns: Vec<ArrayRef> = vec![
            tag_dict_array.clone(),
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&k1, &k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let input_schema =
            to_flat_sst_arrow_schema(&actual_metadata, &FlatSchemaOptions::default());
        let input_batch = RecordBatch::try_new(input_schema, input_columns).unwrap();

        let result = compat_batch.compat(input_batch).unwrap();

        let expected_schema =
            to_flat_sst_arrow_schema(&expected_metadata, &FlatSchemaOptions::default());

        // Same as the input, with an all-null int64 column inserted for the
        // missing field 3 (right after field 2).
        let expected_columns: Vec<ArrayRef> = vec![
            tag_dict_array.clone(),
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(Int64Array::from(vec![None::<i64>, None::<i64>])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&k1, &k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let expected_batch = RecordBatch::try_new(expected_schema, expected_columns).unwrap();

        assert_eq!(expected_batch, result);
    }
1562
    /// Flat batch written with dense pk encoding, read into a sparse-encoded
    /// expected schema: `FlatCompatBatch` must add the new tag column as an
    /// all-null dictionary array and re-encode the pk column dense -> sparse.
    #[test]
    fn test_flat_compat_batch_with_different_pk_encoding() {
        // SST metadata: dense pk encoding with ts 0, tag 1, field 2.
        let mut actual_metadata = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        );
        actual_metadata.primary_key_encoding = PrimaryKeyEncoding::Dense;
        let actual_metadata = Arc::new(actual_metadata);

        // Expected metadata: sparse pk encoding with an extra tag 3 in the key.
        let mut expected_metadata = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (3, SemanticType::Tag, ConcreteDataType::string_datatype()),
            ],
            &[1, 3],
        );
        expected_metadata.primary_key_encoding = PrimaryKeyEncoding::Sparse;
        let expected_metadata = Arc::new(expected_metadata);

        let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
        let read_format = FlatReadFormat::new(
            actual_metadata.clone(),
            [0, 1, 2, 3].into_iter(),
            None,
            "test",
            false,
        )
        .unwrap();
        let format_projection = read_format.format_projection();

        // Encodings differ, so try_new must return Some(compat).
        let compat_batch =
            FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection, false)
                .unwrap()
                .unwrap();

        let mut tag1_builder = StringDictionaryBuilder::<UInt32Type>::new();
        tag1_builder.append_value("tag1");
        tag1_builder.append_value("tag1");
        let tag1_dict_array = Arc::new(tag1_builder.finish());

        // Dense-encoded key for the input batch.
        let k1 = encode_key(&[Some("tag1")]);
        // Input layout follows `to_flat_sst_arrow_schema(&actual_metadata)`:
        // tag 1, field 2, timestamp, then internal columns (encoded pk, a u64
        // column — presumably sequence numbers, TODO confirm — and OpType).
        let input_columns: Vec<ArrayRef> = vec![
            tag1_dict_array.clone(),
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&k1, &k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let input_schema =
            to_flat_sst_arrow_schema(&actual_metadata, &FlatSchemaOptions::default());
        let input_batch = RecordBatch::try_new(input_schema, input_columns).unwrap();

        let result = compat_batch.compat(input_batch).unwrap();

        // Output pk is sparse-encoded with the new tag 3 as null; the tag 3
        // value column itself is an all-null dictionary array.
        let sparse_k1 = encode_sparse_key(&[(1, Some("tag1")), (3, None)]);
        let mut null_tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
        null_tag_builder.append_nulls(2);
        let null_tag_dict_array = Arc::new(null_tag_builder.finish());
        let expected_columns: Vec<ArrayRef> = vec![
            tag1_dict_array.clone(),
            null_tag_dict_array,
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&sparse_k1, &sparse_k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let output_schema =
            to_flat_sst_arrow_schema(&expected_metadata, &FlatSchemaOptions::default());
        let expected_batch = RecordBatch::try_new(output_schema, expected_columns).unwrap();

        assert_eq!(expected_batch, result);
    }
1658
    /// Sparse-to-sparse compat with an empty primary key and the compaction
    /// flag set (`true` passed to both `FlatReadFormat::new` and
    /// `FlatCompatBatch::try_new`): the missing field 3 is null-filled while
    /// the empty sparse pk column passes through unchanged.
    #[test]
    fn test_flat_compat_batch_compact_sparse() {
        // SST metadata: sparse encoding, no tags (empty key), ts 0 and field 2.
        let mut actual_metadata = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[],
        );
        actual_metadata.primary_key_encoding = PrimaryKeyEncoding::Sparse;
        let actual_metadata = Arc::new(actual_metadata);

        // Expected metadata adds field 3.
        let mut expected_metadata = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[],
        );
        expected_metadata.primary_key_encoding = PrimaryKeyEncoding::Sparse;
        let expected_metadata = Arc::new(expected_metadata);

        let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
        let read_format = FlatReadFormat::new(
            actual_metadata.clone(),
            [0, 2, 3].into_iter(),
            None,
            "test",
            true,
        )
        .unwrap();
        let format_projection = read_format.format_projection();

        let compat_batch =
            FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection, true)
                .unwrap()
                .unwrap();

        // Sparse encoding of "no tags".
        let sparse_k1 = encode_sparse_key(&[]);
        // Input layout follows `to_flat_sst_arrow_schema(&actual_metadata)`:
        // field 2, timestamp, then internal columns (encoded pk, a u64 column —
        // presumably sequence numbers, TODO confirm — and OpType).
        let input_columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&sparse_k1, &sparse_k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let input_schema =
            to_flat_sst_arrow_schema(&actual_metadata, &FlatSchemaOptions::default());
        let input_batch = RecordBatch::try_new(input_schema, input_columns).unwrap();

        let result = compat_batch.compat(input_batch).unwrap();

        // Field 3 appears as an all-null column; everything else is unchanged.
        let expected_columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(Int64Array::from(vec![None::<i64>, None::<i64>])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&sparse_k1, &sparse_k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let output_schema =
            to_flat_sst_arrow_schema(&expected_metadata, &FlatSchemaOptions::default());
        let expected_batch = RecordBatch::try_new(output_schema, expected_columns).unwrap();

        assert_eq!(expected_batch, result);
    }
1740}