use std::collections::HashMap;
use std::sync::Arc;

use api::v1::SemanticType;
use datatypes::arrow::array::{
    Array, ArrayRef, BinaryArray, BinaryBuilder, DictionaryArray, UInt32Array,
};
use datatypes::arrow::compute::{take, TakeOptions};
use datatypes::arrow::datatypes::{Schema, SchemaRef};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::DataType;
use datatypes::value::Value;
use datatypes::vectors::VectorRef;
use mito_codec::row_converter::{
    build_primary_key_codec, build_primary_key_codec_with_fields, CompositeValues, PrimaryKeyCodec,
    SortField,
};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::ColumnId;

use crate::error::{
    CompatReaderSnafu, ComputeArrowSnafu, CreateDefaultSnafu, DecodeSnafu, EncodeSnafu,
    NewRecordBatchSnafu, Result, UnexpectedSnafu,
};
use crate::read::flat_projection::{flat_projected_columns, FlatProjectionMapper};
use crate::read::projection::{PrimaryKeyProjectionMapper, ProjectionMapper};
use crate::read::{Batch, BatchColumn, BatchReader};
use crate::sst::parquet::flat_format::primary_key_column_index;
use crate::sst::parquet::format::{FormatProjection, PrimaryKeyArray, INTERNAL_COLUMN_NUM};
use crate::sst::{internal_fields, tag_maybe_to_dictionary_field};

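/// Reader that adapts batches produced with an older region schema to the
/// schema expected by the `mapper`.
///
/// A minimal usage sketch (`mapper`, `reader_meta` and `source` are
/// illustrative placeholders for a projection mapper, the metadata of the
/// source, and a [BatchReader] implementation):
///
/// ```ignore
/// let mut reader = CompatReader::new(&mapper, reader_meta, source)?;
/// while let Some(batch) = reader.next_batch().await? {
///     // `batch` now matches the schema the mapper expects.
/// }
/// ```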
pub struct CompatReader<R> {
    /// Underlying reader.
    reader: R,
    /// Helper to adapt batches from the reader.
    compat: PrimaryKeyCompatBatch,
}

impl<R> CompatReader<R> {
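    /// Creates a new compat reader.
    ///
    /// `reader_meta` is the metadata of the input `reader`; its primary key
    /// must be a prefix of the primary key the `mapper` expects.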
    pub fn new(
        mapper: &ProjectionMapper,
        reader_meta: RegionMetadataRef,
        reader: R,
    ) -> Result<CompatReader<R>> {
        Ok(CompatReader {
            reader,
            compat: PrimaryKeyCompatBatch::new(mapper, reader_meta)?,
        })
    }
}

#[async_trait::async_trait]
impl<R: BatchReader> BatchReader for CompatReader<R> {
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        let Some(mut batch) = self.reader.next_batch().await? else {
            return Ok(None);
        };

        batch = self.compat.compat_batch(batch)?;

        Ok(Some(batch))
    }
}

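/// Helper to adapt batches to the expected schema, in either the primary key
/// format or the flat format.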
pub(crate) enum CompatBatch {
    /// Adapter for batches in the primary key format.
    PrimaryKey(PrimaryKeyCompatBatch),
    /// Adapter for record batches in the flat format.
    #[allow(dead_code)]
    Flat(FlatCompatBatch),
}

impl CompatBatch {
    /// Returns the inner [PrimaryKeyCompatBatch] if this is the primary key variant.
    pub(crate) fn as_primary_key(&self) -> Option<&PrimaryKeyCompatBatch> {
        match self {
            CompatBatch::PrimaryKey(batch) => Some(batch),
            _ => None,
        }
    }

    /// Returns the inner [FlatCompatBatch] if this is the flat variant.
    #[allow(dead_code)]
    pub(crate) fn as_flat(&self) -> Option<&FlatCompatBatch> {
        match self {
            CompatBatch::Flat(batch) => Some(batch),
            _ => None,
        }
    }
}

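/// Helper to adapt a [Batch] in the primary key format to the expected schema.
/// It may rewrite the primary key encoding, append default values for new key
/// columns, and adjust field columns.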
pub(crate) struct PrimaryKeyCompatBatch {
    /// Rewrites the primary key if the encoding changed.
    rewrite_pk: Option<RewritePrimaryKey>,
    /// Appends default values to the primary key.
    compat_pk: Option<CompatPrimaryKey>,
    /// Adjusts field columns to the expected schema.
    compat_fields: Option<CompatFields>,
}

impl PrimaryKeyCompatBatch {
    /// Creates a helper to adapt batches from a reader with metadata
    /// `reader_meta` to the schema the `mapper` expects.
    pub(crate) fn new(mapper: &ProjectionMapper, reader_meta: RegionMetadataRef) -> Result<Self> {
        let rewrite_pk = may_rewrite_primary_key(mapper.metadata(), &reader_meta);
        let compat_pk = may_compat_primary_key(mapper.metadata(), &reader_meta)?;
        let mapper = mapper.as_primary_key().context(UnexpectedSnafu {
            reason: "Unexpected format",
        })?;
        let compat_fields = may_compat_fields(mapper, &reader_meta)?;

        Ok(Self {
            rewrite_pk,
            compat_pk,
            compat_fields,
        })
    }

    /// Adapts a [Batch] to the expected schema.
    pub(crate) fn compat_batch(&self, mut batch: Batch) -> Result<Batch> {
        if let Some(rewrite_pk) = &self.rewrite_pk {
            batch = rewrite_pk.compat(batch)?;
        }
        if let Some(compat_pk) = &self.compat_pk {
            batch = compat_pk.compat(batch)?;
        }
        if let Some(compat_fields) = &self.compat_fields {
            batch = compat_fields.compat(batch);
        }

        Ok(batch)
    }
}

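/// Returns true if `left` and `right` have the same primary key encoding and
/// the same columns (ids and data types) in the same order.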
pub(crate) fn has_same_columns_and_pk_encoding(
    left: &RegionMetadata,
    right: &RegionMetadata,
) -> bool {
    if left.primary_key_encoding != right.primary_key_encoding {
        return false;
    }

    if left.column_metadatas.len() != right.column_metadatas.len() {
        return false;
    }

    for (left_col, right_col) in left.column_metadatas.iter().zip(&right.column_metadatas) {
        if left_col.column_id != right_col.column_id || !left_col.is_same_datatype(right_col) {
            return false;
        }
        debug_assert_eq!(
            left_col.column_schema.data_type,
            right_col.column_schema.data_type
        );
        debug_assert_eq!(left_col.semantic_type, right_col.semantic_type);
    }

    true
}

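/// Helper to adapt a [RecordBatch] in the flat format to the expected schema.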
#[allow(dead_code)]
pub(crate) struct FlatCompatBatch {
    /// For each expected column, its index in the actual batch or a default value.
    index_or_defaults: Vec<IndexOrDefault>,
    /// Arrow schema of the adapted batch.
    arrow_schema: SchemaRef,
    /// Helper to adapt the primary key column.
    compat_pk: FlatCompatPrimaryKey,
}

impl FlatCompatBatch {
    /// Creates a helper to adapt record batches read with the `actual` metadata
    /// and `format_projection` to the schema the `mapper` expects.
    pub(crate) fn try_new(
        mapper: &FlatProjectionMapper,
        actual: &RegionMetadataRef,
        format_projection: &FormatProjection,
    ) -> Result<Self> {
        let actual_schema = flat_projected_columns(actual, format_projection);
        let expect_schema = mapper.batch_schema();
        debug_assert_ne!(expect_schema, actual_schema);

        let actual_schema_index: HashMap<_, _> = actual_schema
            .iter()
            .enumerate()
            .map(|(idx, (column_id, data_type))| (*column_id, (idx, data_type)))
            .collect();

        let mut index_or_defaults = Vec::with_capacity(expect_schema.len());
        let mut fields = Vec::with_capacity(expect_schema.len());
        for (column_id, expect_data_type) in expect_schema {
            // Safety: the mapper only projects columns in its metadata.
            let column_index = mapper.metadata().column_index_by_id(*column_id).unwrap();
            let expect_column = &mapper.metadata().column_metadatas[column_index];
            let column_field = &mapper.metadata().schema.arrow_schema().fields()[column_index];
            // Tags may be read as dictionary arrays in the flat format.
            if expect_column.semantic_type == SemanticType::Tag {
                fields.push(tag_maybe_to_dictionary_field(
                    &expect_column.column_schema.data_type,
                    column_field,
                ));
            } else {
                fields.push(column_field.clone());
            };

            if let Some((index, actual_data_type)) = actual_schema_index.get(column_id) {
                let mut cast_type = None;
                if expect_data_type != *actual_data_type {
                    cast_type = Some(expect_data_type.clone())
                }
                index_or_defaults.push(IndexOrDefault::Index {
                    pos: *index,
                    cast_type,
                });
            } else {
                // The column is not in the actual batch, fill it with its default value.
                let default_vector = expect_column
                    .column_schema
                    .create_default_vector(1)
                    .context(CreateDefaultSnafu {
                        region_id: mapper.metadata().region_id,
                        column: &expect_column.column_schema.name,
                    })?
                    .with_context(|| CompatReaderSnafu {
                        region_id: mapper.metadata().region_id,
                        reason: format!(
                            "column {} does not have a default value to read",
                            expect_column.column_schema.name
                        ),
                    })?;
                index_or_defaults.push(IndexOrDefault::DefaultValue {
                    column_id: expect_column.column_id,
                    default_vector,
                    semantic_type: expect_column.semantic_type,
                });
            };
        }
        fields.extend_from_slice(&internal_fields());

        let compat_pk = FlatCompatPrimaryKey::new(mapper.metadata(), actual)?;

        Ok(Self {
            index_or_defaults,
            arrow_schema: Arc::new(Schema::new(fields)),
            compat_pk,
        })
    }

    /// Adapts a [RecordBatch] in the flat format to the expected schema.
    #[allow(dead_code)]
    pub(crate) fn compat(&self, batch: RecordBatch) -> Result<RecordBatch> {
        let len = batch.num_rows();
        let columns = self
            .index_or_defaults
            .iter()
            .map(|index_or_default| match index_or_default {
                IndexOrDefault::Index { pos, cast_type } => {
                    let old_column = batch.column(*pos);
                    if let Some(ty) = cast_type {
                        let casted =
                            datatypes::arrow::compute::cast(old_column, &ty.as_arrow_type())
                                .context(ComputeArrowSnafu)?;
                        Ok(casted)
                    } else {
                        Ok(old_column.clone())
                    }
                }
                IndexOrDefault::DefaultValue {
                    column_id: _,
                    default_vector,
                    semantic_type,
                } => repeat_vector(default_vector, len, *semantic_type == SemanticType::Tag),
            })
            // Internal columns at the end of the batch are copied as-is.
            .chain(
                batch.columns()[batch.num_columns() - INTERNAL_COLUMN_NUM..]
                    .iter()
                    .map(|col| Ok(col.clone())),
            )
            .collect::<Result<Vec<_>>>()?;

        let compat_batch = RecordBatch::try_new(self.arrow_schema.clone(), columns)
            .context(NewRecordBatchSnafu)?;

        self.compat_pk.compat(compat_batch)
    }
}

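/// Repeats the single-element `vector` `to_len` times as an arrow array.
///
/// For tag columns the result is a dictionary array, which appears to be the
/// tag representation the flat format expects.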
fn repeat_vector(vector: &VectorRef, to_len: usize, is_tag: bool) -> Result<ArrayRef> {
    assert_eq!(1, vector.len());
    if is_tag {
        let values = vector.to_arrow_array();
        if values.is_null(0) {
            // A null tag becomes a dictionary array with null keys and empty values.
            let keys = UInt32Array::new_null(to_len);
            Ok(Arc::new(DictionaryArray::new(keys, values.slice(0, 0))))
        } else {
            let keys = UInt32Array::from_value(0, to_len);
            Ok(Arc::new(DictionaryArray::new(keys, values)))
        }
    } else {
        let keys = UInt32Array::from_value(0, to_len);
        take(
            &vector.to_arrow_array(),
            &keys,
            Some(TakeOptions {
                check_bounds: false,
            }),
        )
        .context(ComputeArrowSnafu)
    }
}

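/// Helper to append default values to the primary key of a batch.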
#[derive(Debug)]
struct CompatPrimaryKey {
    /// Codec for the columns to append.
    converter: Arc<dyn PrimaryKeyCodec>,
    /// Default values to append, in key order.
    values: Vec<(ColumnId, Value)>,
}

impl CompatPrimaryKey {
    fn compat(&self, mut batch: Batch) -> Result<Batch> {
        let mut buffer = Vec::with_capacity(
            batch.primary_key().len() + self.converter.estimated_size().unwrap_or_default(),
        );
        buffer.extend_from_slice(batch.primary_key());
        self.converter
            .encode_values(&self.values, &mut buffer)
            .context(EncodeSnafu)?;

        batch.set_primary_key(buffer);

        // Also appends the values to the decoded primary key values, if any.
        if let Some(pk_values) = &mut batch.pk_values {
            pk_values.extend(&self.values);
        }

        Ok(batch)
    }
}

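/// Helper to adapt the field columns of a batch to the expected schema.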
#[derive(Debug)]
struct CompatFields {
    /// Fields in the batches produced by the reader.
    actual_fields: Vec<(ColumnId, ConcreteDataType)>,
    /// For each expected field, its index in the batch or a default value.
    index_or_defaults: Vec<IndexOrDefault>,
}

impl CompatFields {
    #[must_use]
    fn compat(&self, batch: Batch) -> Batch {
        debug_assert_eq!(self.actual_fields.len(), batch.fields().len());
        debug_assert!(self
            .actual_fields
            .iter()
            .zip(batch.fields())
            .all(|((id, _), batch_column)| *id == batch_column.column_id));

        let len = batch.num_rows();
        let fields = self
            .index_or_defaults
            .iter()
            .map(|index_or_default| match index_or_default {
                IndexOrDefault::Index { pos, cast_type } => {
                    let old_column = &batch.fields()[*pos];
                    let data = if let Some(ty) = cast_type {
                        // Casts the column to the expected type.
                        old_column.data.cast(ty).unwrap()
                    } else {
                        old_column.data.clone()
                    };
                    BatchColumn {
                        column_id: old_column.column_id,
                        data,
                    }
                }
                IndexOrDefault::DefaultValue {
                    column_id,
                    default_vector,
                    semantic_type: _,
                } => {
                    // Repeats the single-element default vector to the batch length.
                    let data = default_vector.replicate(&[len]);
                    BatchColumn {
                        column_id: *column_id,
                        data,
                    }
                }
            })
            .collect();

        // Safety: the fields are built from the expected schema.
        batch.with_fields(fields).unwrap()
    }
}

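/// Returns a [RewritePrimaryKey] if the primary key encodings of `expect` and
/// `actual` differ.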
fn may_rewrite_primary_key(
    expect: &RegionMetadata,
    actual: &RegionMetadata,
) -> Option<RewritePrimaryKey> {
    if expect.primary_key_encoding == actual.primary_key_encoding {
        return None;
    }

    let fields = expect.primary_key.clone();
    let original = build_primary_key_codec(actual);
    let new = build_primary_key_codec(expect);

    Some(RewritePrimaryKey {
        original,
        new,
        fields,
    })
}

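/// Returns true if `expect` and `actual` have the same primary key.
///
/// Returns an error if `actual.primary_key` is not a prefix of
/// `expect.primary_key`.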
fn is_primary_key_same(expect: &RegionMetadata, actual: &RegionMetadata) -> Result<bool> {
    ensure!(
        actual.primary_key.len() <= expect.primary_key.len(),
        CompatReaderSnafu {
            region_id: expect.region_id,
            reason: format!(
                "primary key has more columns {} than expect {}",
                actual.primary_key.len(),
                expect.primary_key.len()
            ),
        }
    );
    ensure!(
        actual.primary_key == expect.primary_key[..actual.primary_key.len()],
        CompatReaderSnafu {
            region_id: expect.region_id,
            reason: format!(
                "primary key has different prefix, expect: {:?}, actual: {:?}",
                expect.primary_key, actual.primary_key
            ),
        }
    );

    Ok(actual.primary_key.len() == expect.primary_key.len())
}

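/// Returns a [CompatPrimaryKey] that appends default values for the key
/// columns `expect` has but `actual` lacks, or `None` if the keys are the same.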
fn may_compat_primary_key(
    expect: &RegionMetadata,
    actual: &RegionMetadata,
) -> Result<Option<CompatPrimaryKey>> {
    if is_primary_key_same(expect, actual)? {
        return Ok(None);
    }

    // The expect primary key has more columns, fill them with default values.
    let to_add = &expect.primary_key[actual.primary_key.len()..];
    let mut fields = Vec::with_capacity(to_add.len());
    let mut values = Vec::with_capacity(to_add.len());
    for column_id in to_add {
        // Safety: the metadata contains all of its primary key columns.
        let column = expect.column_by_id(*column_id).unwrap();
        fields.push((
            *column_id,
            SortField::new(column.column_schema.data_type.clone()),
        ));
        let default_value = column
            .column_schema
            .create_default()
            .context(CreateDefaultSnafu {
                region_id: expect.region_id,
                column: &column.column_schema.name,
            })?
            .with_context(|| CompatReaderSnafu {
                region_id: expect.region_id,
                reason: format!(
                    "key column {} does not have a default value to read",
                    column.column_schema.name
                ),
            })?;
        values.push((*column_id, default_value));
    }
    let converter =
        build_primary_key_codec_with_fields(expect.primary_key_encoding, fields.into_iter());

    Ok(Some(CompatPrimaryKey { converter, values }))
}

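/// Returns a [CompatFields] if the field columns the mapper expects differ
/// from the projected fields of `actual`.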
fn may_compat_fields(
    mapper: &PrimaryKeyProjectionMapper,
    actual: &RegionMetadata,
) -> Result<Option<CompatFields>> {
    let expect_fields = mapper.batch_fields();
    let actual_fields = Batch::projected_fields(actual, mapper.column_ids());
    if expect_fields == actual_fields {
        return Ok(None);
    }

    let source_field_index: HashMap<_, _> = actual_fields
        .iter()
        .enumerate()
        .map(|(idx, (column_id, data_type))| (*column_id, (idx, data_type)))
        .collect();

    let index_or_defaults = expect_fields
        .iter()
        .map(|(column_id, expect_data_type)| {
            if let Some((index, actual_data_type)) = source_field_index.get(column_id) {
                // The source batches have this field, cast it if the type changed.
                let mut cast_type = None;
                if expect_data_type != *actual_data_type {
                    cast_type = Some(expect_data_type.clone())
                }
                Ok(IndexOrDefault::Index {
                    pos: *index,
                    cast_type,
                })
            } else {
                // Safety: the mapper only projects columns in its metadata.
                let column = mapper.metadata().column_by_id(*column_id).unwrap();
                // Creates a default vector with one element for the column.
                let default_vector = column
                    .column_schema
                    .create_default_vector(1)
                    .context(CreateDefaultSnafu {
                        region_id: mapper.metadata().region_id,
                        column: &column.column_schema.name,
                    })?
                    .with_context(|| CompatReaderSnafu {
                        region_id: mapper.metadata().region_id,
                        reason: format!(
                            "column {} does not have a default value to read",
                            column.column_schema.name
                        ),
                    })?;
                Ok(IndexOrDefault::DefaultValue {
                    column_id: column.column_id,
                    default_vector,
                    semantic_type: SemanticType::Field,
                })
            }
        })
        .collect::<Result<Vec<_>>>()?;

    Ok(Some(CompatFields {
        actual_fields,
        index_or_defaults,
    }))
}

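/// Index of a column in the source batch, or a default value to fill it.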
#[derive(Debug)]
enum IndexOrDefault {
    /// Take the column at `pos` from the input, optionally casting it.
    Index {
        pos: usize,
        cast_type: Option<ConcreteDataType>,
    },
    /// Use a default value for the column.
    DefaultValue {
        /// Id of the column.
        column_id: ColumnId,
        /// Default value. The vector has only one element.
        default_vector: VectorRef,
        /// Semantic type of the column.
        semantic_type: SemanticType,
    },
}

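/// Helper to rewrite the primary key of a batch from one encoding to another.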
struct RewritePrimaryKey {
    /// Codec for the original primary key.
    original: Arc<dyn PrimaryKeyCodec>,
    /// Codec for the new primary key.
    new: Arc<dyn PrimaryKeyCodec>,
    /// Ids of the primary key columns.
    fields: Vec<ColumnId>,
}

impl RewritePrimaryKey {
    fn compat(&self, mut batch: Batch) -> Result<Batch> {
        let values = if let Some(pk_values) = batch.pk_values() {
            pk_values
        } else {
            let new_pk_values = self
                .original
                .decode(batch.primary_key())
                .context(DecodeSnafu)?;
            batch.set_pk_values(new_pk_values);
            // Safety: we just set the primary key values.
            batch.pk_values().as_ref().unwrap()
        };

        let mut buffer = Vec::with_capacity(
            batch.primary_key().len() + self.new.estimated_size().unwrap_or_default(),
        );
        match values {
            CompositeValues::Dense(values) => {
                self.new
                    .encode_values(values.as_slice(), &mut buffer)
                    .context(EncodeSnafu)?;
            }
            CompositeValues::Sparse(values) => {
                let values = self
                    .fields
                    .iter()
                    .map(|id| {
                        let value = values.get_or_null(*id);
                        (*id, value.as_value_ref())
                    })
                    .collect::<Vec<_>>();
                self.new
                    .encode_value_refs(&values, &mut buffer)
                    .context(EncodeSnafu)?;
            }
        }
        batch.set_primary_key(buffer);

        Ok(batch)
    }
}

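/// Helper to rewrite primary keys in the flat format when the key encoding
/// changed.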
struct FlatRewritePrimaryKey {
    /// Codec for the expected primary key.
    codec: Arc<dyn PrimaryKeyCodec>,
    /// Metadata of the expected region.
    metadata: RegionMetadataRef,
    /// Codec for the primary key in the actual batches.
    old_codec: Arc<dyn PrimaryKeyCodec>,
}

impl FlatRewritePrimaryKey {
    fn new(
        expect: &RegionMetadataRef,
        actual: &RegionMetadataRef,
    ) -> Option<FlatRewritePrimaryKey> {
        if expect.primary_key_encoding == actual.primary_key_encoding {
            return None;
        }
        let codec = build_primary_key_codec(expect);
        let old_codec = build_primary_key_codec(actual);

        Some(FlatRewritePrimaryKey {
            codec,
            metadata: expect.clone(),
            old_codec,
        })
    }

    /// Rewrites the primary key column of `batch` with the new codec, appending
    /// `append_values` to each decoded key.
    fn rewrite_key(
        &self,
        append_values: &[(ColumnId, Value)],
        batch: RecordBatch,
    ) -> Result<RecordBatch> {
        let old_pk_dict_array = batch
            .column(primary_key_column_index(batch.num_columns()))
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .unwrap();
        let old_pk_values_array = old_pk_dict_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        let mut builder = BinaryBuilder::with_capacity(
            old_pk_values_array.len(),
            old_pk_values_array.value_data().len(),
        );

        let mut buffer = Vec::with_capacity(
            old_pk_values_array.value_data().len() / old_pk_values_array.len().max(1),
        );
        let mut column_id_values = Vec::new();
        // Only rewrites the dictionary values, the keys stay unchanged.
        for value in old_pk_values_array.iter() {
            let Some(old_pk) = value else {
                builder.append_null();
                continue;
            };
            // Decodes the old key and appends the new values.
            let mut pk_values = self.old_codec.decode(old_pk).context(DecodeSnafu)?;
            pk_values.extend(append_values);

            buffer.clear();
            column_id_values.clear();
            // Encodes the key with the new codec.
            match pk_values {
                CompositeValues::Dense(dense_values) => {
                    self.codec
                        .encode_values(dense_values.as_slice(), &mut buffer)
                        .context(EncodeSnafu)?;
                }
                CompositeValues::Sparse(sparse_values) => {
                    for id in &self.metadata.primary_key {
                        let value = sparse_values.get_or_null(*id);
                        column_id_values.push((*id, value.clone()));
                    }
                    self.codec
                        .encode_values(&column_id_values, &mut buffer)
                        .context(EncodeSnafu)?;
                }
            }
            builder.append_value(&buffer);
        }
        let new_pk_values_array = Arc::new(builder.finish());
        let new_pk_dict_array =
            PrimaryKeyArray::new(old_pk_dict_array.keys().clone(), new_pk_values_array);

        let mut columns = batch.columns().to_vec();
        columns[primary_key_column_index(batch.num_columns())] = Arc::new(new_pk_dict_array);

        RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
    }
}

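/// Helper to make the primary key column of a flat [RecordBatch] compatible
/// with the expected metadata.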
struct FlatCompatPrimaryKey {
    /// Rewrites the key when the encoding changed.
    rewriter: Option<FlatRewritePrimaryKey>,
    /// Codec for the key columns to append.
    converter: Option<Arc<dyn PrimaryKeyCodec>>,
    /// Default values to append to each key.
    values: Vec<(ColumnId, Value)>,
}

impl FlatCompatPrimaryKey {
    fn new(expect: &RegionMetadataRef, actual: &RegionMetadataRef) -> Result<Self> {
        let rewriter = FlatRewritePrimaryKey::new(expect, actual);

        if is_primary_key_same(expect, actual)? {
            return Ok(Self {
                rewriter,
                converter: None,
                values: Vec::new(),
            });
        }

        // The expect primary key has more columns, fill them with default values.
        let to_add = &expect.primary_key[actual.primary_key.len()..];
        let mut values = Vec::with_capacity(to_add.len());
        let mut fields = Vec::with_capacity(to_add.len());
        for column_id in to_add {
            // Safety: the metadata contains all of its primary key columns.
            let column = expect.column_by_id(*column_id).unwrap();
            fields.push((
                *column_id,
                SortField::new(column.column_schema.data_type.clone()),
            ));
            let default_value = column
                .column_schema
                .create_default()
                .context(CreateDefaultSnafu {
                    region_id: expect.region_id,
                    column: &column.column_schema.name,
                })?
                .with_context(|| CompatReaderSnafu {
                    region_id: expect.region_id,
                    reason: format!(
                        "key column {} does not have a default value to read",
                        column.column_schema.name
                    ),
                })?;
            values.push((*column_id, default_value));
        }
        debug_assert!(!fields.is_empty());

        let converter = Some(build_primary_key_codec_with_fields(
            expect.primary_key_encoding,
            fields.into_iter(),
        ));

        Ok(Self {
            rewriter,
            converter,
            values,
        })
    }

    /// Adapts the primary key column of `batch` to the expected schema.
    fn compat(&self, batch: RecordBatch) -> Result<RecordBatch> {
        if let Some(rewriter) = &self.rewriter {
            // The rewriter also appends `self.values` while re-encoding the keys.
            return rewriter.rewrite_key(&self.values, batch);
        }

        self.append_key(batch)
    }

    /// Appends the encoded default values to each key in the batch.
    fn append_key(&self, batch: RecordBatch) -> Result<RecordBatch> {
        let Some(converter) = &self.converter else {
            return Ok(batch);
        };

        let old_pk_dict_array = batch
            .column(primary_key_column_index(batch.num_columns()))
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .unwrap();
        let old_pk_values_array = old_pk_dict_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        let mut builder = BinaryBuilder::with_capacity(
            old_pk_values_array.len(),
            old_pk_values_array.value_data().len()
                + converter.estimated_size().unwrap_or_default() * old_pk_values_array.len(),
        );

        let mut buffer = Vec::with_capacity(
            old_pk_values_array.value_data().len() / old_pk_values_array.len().max(1)
                + converter.estimated_size().unwrap_or_default(),
        );

        // Only rewrites the dictionary values, the keys stay unchanged.
        for value in old_pk_values_array.iter() {
            let Some(old_pk) = value else {
                builder.append_null();
                continue;
            };

            buffer.clear();
            buffer.extend_from_slice(old_pk);
            converter
                .encode_values(&self.values, &mut buffer)
                .context(EncodeSnafu)?;

            builder.append_value(&buffer);
        }

        let new_pk_values_array = Arc::new(builder.finish());
        let new_pk_dict_array =
            PrimaryKeyArray::new(old_pk_dict_array.keys().clone(), new_pk_values_array);

        let mut columns = batch.columns().to_vec();
        columns[primary_key_column_index(batch.num_columns())] = Arc::new(new_pk_dict_array);

        RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::{OpType, SemanticType};
    use datatypes::arrow::array::{
        ArrayRef, BinaryDictionaryBuilder, Int64Array, StringDictionaryBuilder,
        TimestampMillisecondArray, UInt64Array, UInt8Array,
    };
    use datatypes::arrow::datatypes::UInt32Type;
    use datatypes::arrow::record_batch::RecordBatch;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::ValueRef;
    use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, UInt64Vector, UInt8Vector};
    use mito_codec::row_converter::{
        DensePrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec,
    };
    use store_api::codec::PrimaryKeyEncoding;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;
    use crate::read::flat_projection::FlatProjectionMapper;
    use crate::sst::parquet::flat_format::FlatReadFormat;
    use crate::sst::{to_flat_sst_arrow_schema, FlatSchemaOptions};
    use crate::test_util::{check_reader_result, VecBatchReader};

    fn new_metadata(
        semantic_types: &[(ColumnId, SemanticType, ConcreteDataType)],
        primary_key: &[ColumnId],
    ) -> RegionMetadata {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        for (id, semantic_type, data_type) in semantic_types {
            let column_schema = match semantic_type {
                SemanticType::Tag => {
                    ColumnSchema::new(format!("tag_{id}"), data_type.clone(), true)
                }
                SemanticType::Field => {
                    ColumnSchema::new(format!("field_{id}"), data_type.clone(), true)
                }
                SemanticType::Timestamp => ColumnSchema::new("ts", data_type.clone(), false),
            };

            builder.push_column_metadata(ColumnMetadata {
                column_schema,
                semantic_type: *semantic_type,
                column_id: *id,
            });
        }
        builder.primary_key(primary_key.to_vec());
        builder.build().unwrap()
    }

    fn encode_key(keys: &[Option<&str>]) -> Vec<u8> {
        let fields = (0..keys.len())
            .map(|_| (0, SortField::new(ConcreteDataType::string_datatype())))
            .collect();
        let converter = DensePrimaryKeyCodec::with_fields(fields);
        let row = keys.iter().map(|str_opt| match str_opt {
            Some(v) => ValueRef::String(v),
            None => ValueRef::Null,
        });

        converter.encode(row).unwrap()
    }

    fn encode_sparse_key(keys: &[(ColumnId, Option<&str>)]) -> Vec<u8> {
        let fields = (0..keys.len())
            .map(|_| (1, SortField::new(ConcreteDataType::string_datatype())))
            .collect();
        let converter = SparsePrimaryKeyCodec::with_fields(fields);
        let row = keys
            .iter()
            .map(|(id, str_opt)| match str_opt {
                Some(v) => (*id, ValueRef::String(v)),
                None => (*id, ValueRef::Null),
            })
            .collect::<Vec<_>>();
        let mut buffer = vec![];
        converter.encode_value_refs(&row, &mut buffer).unwrap();
        buffer
    }

    /// Creates a batch whose field columns are either filled with nulls or
    /// with the column id, depending on the `is_null` flag.
    fn new_batch(
        primary_key: &[u8],
        fields: &[(ColumnId, bool)],
        start_ts: i64,
        num_rows: usize,
    ) -> Batch {
        let timestamps = Arc::new(TimestampMillisecondVector::from_values(
            start_ts..start_ts + num_rows as i64,
        ));
        let sequences = Arc::new(UInt64Vector::from_values(0..num_rows as u64));
        let op_types = Arc::new(UInt8Vector::from_vec(vec![OpType::Put as u8; num_rows]));
        let field_columns = fields
            .iter()
            .map(|(id, is_null)| {
                let data = if *is_null {
                    Arc::new(Int64Vector::from(vec![None; num_rows]))
                } else {
                    Arc::new(Int64Vector::from_vec(vec![*id as i64; num_rows]))
                };
                BatchColumn {
                    column_id: *id,
                    data,
                }
            })
            .collect();
        Batch::new(
            primary_key.to_vec(),
            timestamps,
            sequences,
            op_types,
            field_columns,
        )
        .unwrap()
    }

    #[test]
    fn test_invalid_pk_len() {
        let reader_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1, 2],
        );
        let expect_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        );
        may_compat_primary_key(&expect_meta, &reader_meta).unwrap_err();
    }

    #[test]
    fn test_different_pk() {
        let reader_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[2, 1],
        );
        let expect_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (4, SemanticType::Tag, ConcreteDataType::string_datatype()),
            ],
            &[1, 2, 4],
        );
        may_compat_primary_key(&expect_meta, &reader_meta).unwrap_err();
    }

    #[test]
    fn test_same_pk() {
        let reader_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        );
        assert!(may_compat_primary_key(&reader_meta, &reader_meta)
            .unwrap()
            .is_none());
    }

    #[test]
    fn test_same_pk_encoding() {
        let reader_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
            ],
            &[1],
        ));

        assert!(may_compat_primary_key(&reader_meta, &reader_meta)
            .unwrap()
            .is_none());
    }

    #[test]
    fn test_same_fields() {
        let reader_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        let mapper = PrimaryKeyProjectionMapper::all(&reader_meta).unwrap();
        assert!(may_compat_fields(&mapper, &reader_meta).unwrap().is_none())
    }

    #[tokio::test]
    async fn test_compat_reader() {
        let reader_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        let expect_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (3, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (4, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1, 3],
        ));
        let mapper = ProjectionMapper::all(&expect_meta, false).unwrap();
        let k1 = encode_key(&[Some("a")]);
        let k2 = encode_key(&[Some("b")]);
        let source_reader = VecBatchReader::new(&[
            new_batch(&k1, &[(2, false)], 1000, 3),
            new_batch(&k2, &[(2, false)], 1000, 3),
        ]);

        let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
        let k1 = encode_key(&[Some("a"), None]);
        let k2 = encode_key(&[Some("b"), None]);
        check_reader_result(
            &mut compat_reader,
            &[
                new_batch(&k1, &[(2, false), (4, true)], 1000, 3),
                new_batch(&k2, &[(2, false), (4, true)], 1000, 3),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_compat_reader_different_order() {
        let reader_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        let expect_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (4, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        let mapper = ProjectionMapper::all(&expect_meta, false).unwrap();
        let k1 = encode_key(&[Some("a")]);
        let k2 = encode_key(&[Some("b")]);
        let source_reader = VecBatchReader::new(&[
            new_batch(&k1, &[(2, false)], 1000, 3),
            new_batch(&k2, &[(2, false)], 1000, 3),
        ]);

        let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
        check_reader_result(
            &mut compat_reader,
            &[
                new_batch(&k1, &[(3, true), (2, false), (4, true)], 1000, 3),
                new_batch(&k2, &[(3, true), (2, false), (4, true)], 1000, 3),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_compat_reader_different_types() {
        let actual_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        let expect_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::string_datatype()),
            ],
            &[1],
        ));
        let mapper = ProjectionMapper::all(&expect_meta, false).unwrap();
        let k1 = encode_key(&[Some("a")]);
        let k2 = encode_key(&[Some("b")]);
        let source_reader = VecBatchReader::new(&[
            new_batch(&k1, &[(2, false)], 1000, 3),
            new_batch(&k2, &[(2, false)], 1000, 3),
        ]);

        let fn_batch_cast = |batch: Batch| {
            let mut new_fields = batch.fields.clone();
            new_fields[0].data = new_fields[0]
                .data
                .cast(&ConcreteDataType::string_datatype())
                .unwrap();

            batch.with_fields(new_fields).unwrap()
        };
        let mut compat_reader = CompatReader::new(&mapper, actual_meta, source_reader).unwrap();
        check_reader_result(
            &mut compat_reader,
            &[
                fn_batch_cast(new_batch(&k1, &[(2, false)], 1000, 3)),
                fn_batch_cast(new_batch(&k2, &[(2, false)], 1000, 3)),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_compat_reader_projection() {
        let reader_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        let expect_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (4, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        let mapper = ProjectionMapper::new(&expect_meta, [1, 3, 2].into_iter(), false).unwrap();
        let k1 = encode_key(&[Some("a")]);
        let source_reader = VecBatchReader::new(&[new_batch(&k1, &[(2, false)], 1000, 3)]);

        let mut compat_reader =
            CompatReader::new(&mapper, reader_meta.clone(), source_reader).unwrap();
        check_reader_result(
            &mut compat_reader,
            &[new_batch(&k1, &[(3, true), (2, false)], 1000, 3)],
        )
        .await;

        let mapper = ProjectionMapper::new(&expect_meta, [1, 4, 2].into_iter(), false).unwrap();
        let k1 = encode_key(&[Some("a")]);
        let source_reader = VecBatchReader::new(&[new_batch(&k1, &[], 1000, 3)]);

        let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
        check_reader_result(
            &mut compat_reader,
            &[new_batch(&k1, &[(3, true), (4, true)], 1000, 3)],
        )
        .await;
    }

    #[tokio::test]
    async fn test_compat_reader_different_pk_encoding() {
        let mut reader_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        );
        reader_meta.primary_key_encoding = PrimaryKeyEncoding::Dense;
        let reader_meta = Arc::new(reader_meta);
        let mut expect_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (3, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (4, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1, 3],
        );
        expect_meta.primary_key_encoding = PrimaryKeyEncoding::Sparse;
        let expect_meta = Arc::new(expect_meta);

        let mapper = ProjectionMapper::all(&expect_meta, false).unwrap();
        let k1 = encode_key(&[Some("a")]);
        let k2 = encode_key(&[Some("b")]);
        let source_reader = VecBatchReader::new(&[
            new_batch(&k1, &[(2, false)], 1000, 3),
            new_batch(&k2, &[(2, false)], 1000, 3),
        ]);

        let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
        let k1 = encode_sparse_key(&[(1, Some("a")), (3, None)]);
        let k2 = encode_sparse_key(&[(1, Some("b")), (3, None)]);
        check_reader_result(
            &mut compat_reader,
            &[
                new_batch(&k1, &[(2, false), (4, true)], 1000, 3),
                new_batch(&k2, &[(2, false), (4, true)], 1000, 3),
            ],
        )
        .await;
    }

    fn build_flat_test_pk_array(primary_keys: &[&[u8]]) -> ArrayRef {
        let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        for &pk in primary_keys {
            builder.append(pk).unwrap();
        }
        Arc::new(builder.finish())
    }

    #[test]
    fn test_flat_compat_batch_with_missing_columns() {
        let actual_metadata = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));

        let expected_metadata = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));

        let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
        let read_format =
            FlatReadFormat::new(actual_metadata.clone(), [0, 1, 2, 3].into_iter(), false);
        let format_projection = read_format.format_projection();

        let compat_batch =
            FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection).unwrap();

        let mut tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
        tag_builder.append_value("tag1");
        tag_builder.append_value("tag1");
        let tag_dict_array = Arc::new(tag_builder.finish());

        let k1 = encode_key(&[Some("tag1")]);
        let input_columns: Vec<ArrayRef> = vec![
            tag_dict_array.clone(),
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&k1, &k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let input_schema =
            to_flat_sst_arrow_schema(&actual_metadata, &FlatSchemaOptions::default());
        let input_batch = RecordBatch::try_new(input_schema, input_columns).unwrap();

        let result = compat_batch.compat(input_batch).unwrap();

        let expected_schema =
            to_flat_sst_arrow_schema(&expected_metadata, &FlatSchemaOptions::default());

        let expected_columns: Vec<ArrayRef> = vec![
            tag_dict_array.clone(),
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(Int64Array::from(vec![None::<i64>, None::<i64>])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&k1, &k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let expected_batch = RecordBatch::try_new(expected_schema, expected_columns).unwrap();

        assert_eq!(expected_batch, result);
    }

    #[test]
    fn test_flat_compat_batch_with_different_pk_encoding() {
        let mut actual_metadata = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        );
        actual_metadata.primary_key_encoding = PrimaryKeyEncoding::Dense;
        let actual_metadata = Arc::new(actual_metadata);

        let mut expected_metadata = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (3, SemanticType::Tag, ConcreteDataType::string_datatype()),
            ],
            &[1, 3],
        );
        expected_metadata.primary_key_encoding = PrimaryKeyEncoding::Sparse;
        let expected_metadata = Arc::new(expected_metadata);

        let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
        let read_format =
            FlatReadFormat::new(actual_metadata.clone(), [0, 1, 2, 3].into_iter(), false);
        let format_projection = read_format.format_projection();

        let compat_batch =
            FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection).unwrap();

        let mut tag1_builder = StringDictionaryBuilder::<UInt32Type>::new();
        tag1_builder.append_value("tag1");
        tag1_builder.append_value("tag1");
        let tag1_dict_array = Arc::new(tag1_builder.finish());

        let k1 = encode_key(&[Some("tag1")]);
        let input_columns: Vec<ArrayRef> = vec![
            tag1_dict_array.clone(),
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&k1, &k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let input_schema =
            to_flat_sst_arrow_schema(&actual_metadata, &FlatSchemaOptions::default());
        let input_batch = RecordBatch::try_new(input_schema, input_columns).unwrap();

        let result = compat_batch.compat(input_batch).unwrap();

        let sparse_k1 = encode_sparse_key(&[(1, Some("tag1")), (3, None)]);
        let mut null_tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
        null_tag_builder.append_nulls(2);
        let null_tag_dict_array = Arc::new(null_tag_builder.finish());
        let expected_columns: Vec<ArrayRef> = vec![
            tag1_dict_array.clone(),
            null_tag_dict_array,
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&sparse_k1, &sparse_k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let output_schema =
            to_flat_sst_arrow_schema(&expected_metadata, &FlatSchemaOptions::default());
        let expected_batch = RecordBatch::try_new(output_schema, expected_columns).unwrap();

        assert_eq!(expected_batch, result);
    }
}