use std::collections::HashMap;
use std::sync::Arc;

use api::v1::SemanticType;
use common_recordbatch::recordbatch::align_json_array;
use datatypes::arrow::array::{
    Array, ArrayRef, BinaryArray, BinaryBuilder, DictionaryArray, UInt32Array,
};
use datatypes::arrow::compute::{TakeOptions, take};
use datatypes::arrow::datatypes::{FieldRef, Schema, SchemaRef};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::DataType;
use datatypes::value::Value;
use datatypes::vectors::{Helper, VectorRef};
use mito_codec::row_converter::{
    CompositeValues, PrimaryKeyCodec, SortField, build_primary_key_codec,
    build_primary_key_codec_with_fields,
};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::ColumnId;

use crate::error::{
    CastVectorSnafu, CompatReaderSnafu, ComputeArrowSnafu, ConvertVectorSnafu, CreateDefaultSnafu,
    DecodeSnafu, EncodeSnafu, NewRecordBatchSnafu, RecordBatchSnafu, Result, UnexpectedSnafu,
    UnsupportedOperationSnafu,
};
use crate::read::flat_projection::{FlatProjectionMapper, flat_projected_columns};
use crate::read::projection::{PrimaryKeyProjectionMapper, ProjectionMapper};
use crate::read::{Batch, BatchColumn, BatchReader};
use crate::sst::parquet::flat_format::primary_key_column_index;
use crate::sst::parquet::format::{FormatProjection, INTERNAL_COLUMN_NUM, PrimaryKeyArray};
use crate::sst::{internal_fields, tag_maybe_to_dictionary_field};

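/// Reader to adapt batches from an underlying reader to the expected schema.
///
/// A minimal usage sketch; `mapper`, `reader_meta`, and `reader` stand in for
/// values the caller already has:
///
/// ```ignore
/// let mut compat_reader = CompatReader::new(&mapper, reader_meta, reader)?;
/// while let Some(batch) = compat_reader.next_batch().await? {
///     // `batch` now matches the schema that `mapper` expects.
/// }
/// ```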
pub struct CompatReader<R> {
    /// Underlying reader.
    reader: R,
    /// Helper to make batches compatible.
    compat: PrimaryKeyCompatBatch,
}

impl<R> CompatReader<R> {
    /// Creates a new compat reader.
    ///
    /// - `mapper` is built from the metadata users expect to see,
    /// - `reader_meta` is the metadata of the input reader,
    /// - `reader` is the input reader.
    pub fn new(
        mapper: &ProjectionMapper,
        reader_meta: RegionMetadataRef,
        reader: R,
    ) -> Result<CompatReader<R>> {
        Ok(CompatReader {
            reader,
            compat: PrimaryKeyCompatBatch::new(mapper, reader_meta)?,
        })
    }
}

#[async_trait::async_trait]
impl<R: BatchReader> BatchReader for CompatReader<R> {
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        let Some(mut batch) = self.reader.next_batch().await? else {
            return Ok(None);
        };

        batch = self.compat.compat_batch(batch)?;

        Ok(Some(batch))
    }
}

/// Helper to adapt batches to the expected schema.
pub(crate) enum CompatBatch {
    /// Adapter for the primary key format.
    PrimaryKey(PrimaryKeyCompatBatch),
    /// Adapter for the flat format.
    Flat(FlatCompatBatch),
}

impl CompatBatch {
    /// Returns the inner [PrimaryKeyCompatBatch] if any.
    #[allow(dead_code)]
    pub(crate) fn as_primary_key(&self) -> Option<&PrimaryKeyCompatBatch> {
        match self {
            CompatBatch::PrimaryKey(batch) => Some(batch),
            _ => None,
        }
    }

    /// Returns the inner [FlatCompatBatch] if any.
    pub(crate) fn as_flat(&self) -> Option<&FlatCompatBatch> {
        match self {
            CompatBatch::Flat(batch) => Some(batch),
            _ => None,
        }
    }
}

/// Helper to adapt a [Batch] in the primary key format.
pub(crate) struct PrimaryKeyCompatBatch {
    /// Optional rewriter to change the primary key encoding.
    rewrite_pk: Option<RewritePrimaryKey>,
    /// Optional helper to fill default values for missing key columns.
    compat_pk: Option<CompatPrimaryKey>,
    /// Optional helper to adapt field columns.
    compat_fields: Option<CompatFields>,
}

impl PrimaryKeyCompatBatch {
    /// Creates a [PrimaryKeyCompatBatch].
    ///
    /// - `mapper` is built from the metadata users expect to see,
    /// - `reader_meta` is the metadata of the input reader.
    pub(crate) fn new(mapper: &ProjectionMapper, reader_meta: RegionMetadataRef) -> Result<Self> {
        let rewrite_pk = may_rewrite_primary_key(mapper.metadata(), &reader_meta);
        let compat_pk = may_compat_primary_key(mapper.metadata(), &reader_meta)?;
        let mapper = mapper.as_primary_key().context(UnexpectedSnafu {
            reason: "Unexpected format",
        })?;
        let compat_fields = may_compat_fields(mapper, &reader_meta)?;

        Ok(Self {
            rewrite_pk,
            compat_pk,
            compat_fields,
        })
    }

    /// Adapts the `batch`: rewrites the key encoding first, then appends
    /// default values for missing key columns, and finally adapts fields.
    pub(crate) fn compat_batch(&self, mut batch: Batch) -> Result<Batch> {
        if let Some(rewrite_pk) = &self.rewrite_pk {
            batch = rewrite_pk.compat(batch)?;
        }
        if let Some(compat_pk) = &self.compat_pk {
            batch = compat_pk.compat(batch)?;
        }
        if let Some(compat_fields) = &self.compat_fields {
            batch = compat_fields.compat(batch)?;
        }

        Ok(batch)
    }
}

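/// Returns true if `left` and `right` have the same primary key encoding and
/// identical column layouts (same column ids and data types, in the same
/// order).
///
/// A hedged sketch using a metadata constructor like the `new_metadata`
/// helper in the tests below:
///
/// ```ignore
/// let ts = ConcreteDataType::timestamp_millisecond_datatype();
/// let left = new_metadata(&[(0, SemanticType::Timestamp, ts.clone())], &[]);
/// let right = new_metadata(&[(0, SemanticType::Timestamp, ts)], &[]);
/// assert!(has_same_columns_and_pk_encoding(&left, &right));
/// ```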
pub(crate) fn has_same_columns_and_pk_encoding(
    left: &RegionMetadata,
    right: &RegionMetadata,
) -> bool {
    if left.primary_key_encoding != right.primary_key_encoding {
        return false;
    }

    if left.column_metadatas.len() != right.column_metadatas.len() {
        return false;
    }

    for (left_col, right_col) in left.column_metadatas.iter().zip(&right.column_metadatas) {
        if left_col.column_id != right_col.column_id || !left_col.is_same_datatype(right_col) {
            return false;
        }
        debug_assert_eq!(
            left_col.column_schema.data_type,
            right_col.column_schema.data_type
        );
        debug_assert_eq!(left_col.semantic_type, right_col.semantic_type);
    }

    true
}

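/// Helper to adapt record batches in the flat format. It reorders columns,
/// casts columns whose type changed, fills missing columns with default
/// values, and makes primary keys compatible.
///
/// A sketch of the intended flow; `mapper`, `actual_meta`, and
/// `format_projection` are placeholders for values the read path computed:
///
/// ```ignore
/// if let Some(compat) =
///     FlatCompatBatch::try_new(&mapper, &actual_meta, format_projection, false)?
/// {
///     let batch = compat.compat(record_batch)?;
/// }
/// ```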
pub(crate) struct FlatCompatBatch {
    /// For each expected column, its index in the actual batch or a default value.
    index_or_defaults: Vec<IndexOrDefault>,
    /// Arrow schema of the output batch.
    arrow_schema: SchemaRef,
    /// Helper to make primary keys compatible.
    compat_pk: FlatCompatPrimaryKey,
}

impl FlatCompatBatch {
    /// Creates a [FlatCompatBatch] if the actual schema differs from the
    /// expected one.
    ///
    /// - `mapper` is built from the metadata users expect to see,
    /// - `actual` is the metadata of the input batches,
    /// - `format_projection` is the projection applied to the input batches,
    /// - `compaction` indicates whether the reader is used for compaction.
    pub(crate) fn try_new(
        mapper: &FlatProjectionMapper,
        actual: &RegionMetadataRef,
        format_projection: &FormatProjection,
        compaction: bool,
    ) -> Result<Option<Self>> {
        let actual_schema = flat_projected_columns(actual, format_projection);
        let expect_schema = mapper.batch_schema();
        if expect_schema == actual_schema {
            return Ok(None);
        }

        if actual.primary_key_encoding == PrimaryKeyEncoding::Sparse && compaction {
            return FlatCompatBatch::try_new_compact_sparse(mapper, actual);
        }

        let (index_or_defaults, fields) =
            Self::compute_index_and_fields(&actual_schema, expect_schema, mapper.metadata())?;

        let compat_pk = FlatCompatPrimaryKey::new(mapper.metadata(), actual)?;

        Ok(Some(Self {
            index_or_defaults,
            arrow_schema: Arc::new(Schema::new(fields)),
            compat_pk,
        }))
    }

    /// Computes the index (or default value) of each expected column and the
    /// arrow fields of the output batch.
    fn compute_index_and_fields(
        actual_schema: &[(ColumnId, ConcreteDataType)],
        expect_schema: &[(ColumnId, ConcreteDataType)],
        expect_metadata: &RegionMetadata,
    ) -> Result<(Vec<IndexOrDefault>, Vec<FieldRef>)> {
        // Maps column id to its index and data type in the actual batch.
        let actual_schema_index: HashMap<_, _> = actual_schema
            .iter()
            .enumerate()
            .map(|(idx, (column_id, data_type))| (*column_id, (idx, data_type)))
            .collect();

        let mut index_or_defaults = Vec::with_capacity(expect_schema.len());
        let mut fields = Vec::with_capacity(expect_schema.len());
        for (column_id, expect_data_type) in expect_schema {
            // Safety: the expected metadata must contain the column.
            let column_index = expect_metadata.column_index_by_id(*column_id).unwrap();
            let expect_column = &expect_metadata.column_metadatas[column_index];
            let column_field = &expect_metadata.schema.arrow_schema().fields()[column_index];
            if expect_column.semantic_type == SemanticType::Tag {
                // Tags may be dictionary encoded.
                fields.push(tag_maybe_to_dictionary_field(
                    &expect_column.column_schema.data_type,
                    column_field,
                ));
            } else {
                fields.push(column_field.clone());
            };

            if let Some((index, actual_data_type)) = actual_schema_index.get(column_id) {
                let mut cast_type = None;
                // Casts the column if the data type is different.
                if expect_data_type != *actual_data_type {
                    cast_type = Some(expect_data_type.clone())
                }
                // The column exists in the actual batch.
                index_or_defaults.push(IndexOrDefault::Index {
                    pos: *index,
                    cast_type,
                });
            } else {
                // The column is missing, fills it with the default value.
                let default_vector = expect_column
                    .column_schema
                    .create_default_vector(1)
                    .context(CreateDefaultSnafu {
                        region_id: expect_metadata.region_id,
                        column: &expect_column.column_schema.name,
                    })?
                    .with_context(|| CompatReaderSnafu {
                        region_id: expect_metadata.region_id,
                        reason: format!(
                            "column {} does not have a default value to read",
                            expect_column.column_schema.name
                        ),
                    })?;
                index_or_defaults.push(IndexOrDefault::DefaultValue {
                    column_id: expect_column.column_id,
                    default_vector,
                    semantic_type: expect_column.semantic_type,
                });
            };
        }
        fields.extend_from_slice(&internal_fields());

        Ok((index_or_defaults, fields))
    }

    /// Creates a [FlatCompatBatch] for compacting sparse-encoded regions.
    fn try_new_compact_sparse(
        mapper: &FlatProjectionMapper,
        actual: &RegionMetadataRef,
    ) -> Result<Option<Self>> {
        // The expected metadata must also use sparse encoding.
        ensure!(
            mapper.metadata().primary_key_encoding == PrimaryKeyEncoding::Sparse,
            UnsupportedOperationSnafu {
                err_msg: "Flat format doesn't support converting sparse encoding back to dense encoding"
            }
        );

        // Under sparse encoding, only field columns and the time index are
        // stored outside the primary key.
        let actual_schema: Vec<_> = actual
            .field_columns()
            .chain([actual.time_index_column()])
            .map(|col| (col.column_id, col.column_schema.data_type.clone()))
            .collect();
        let expect_schema: Vec<_> = mapper
            .metadata()
            .field_columns()
            .chain([mapper.metadata().time_index_column()])
            .map(|col| (col.column_id, col.column_schema.data_type.clone()))
            .collect();

        let (index_or_defaults, fields) =
            Self::compute_index_and_fields(&actual_schema, &expect_schema, mapper.metadata())?;

        // Both sides use sparse encoding, so the key doesn't need conversion.
        let compat_pk = FlatCompatPrimaryKey::default();

        Ok(Some(Self {
            index_or_defaults,
            arrow_schema: Arc::new(Schema::new(fields)),
            compat_pk,
        }))
    }

    /// Adapts a [RecordBatch] to the expected schema.
    pub(crate) fn compat(&self, batch: RecordBatch) -> Result<RecordBatch> {
        let len = batch.num_rows();
        // Builds the output columns in the expected order.
        let columns = self
            .index_or_defaults
            .iter()
            .map(|index_or_default| match index_or_default {
                IndexOrDefault::Index { pos, cast_type } => {
                    let old_column = batch.column(*pos);

                    if let Some(ty) = cast_type {
                        let casted = if let Some(json_type) = ty.as_json() {
                            align_json_array(old_column, &json_type.as_arrow_type())
                                .context(RecordBatchSnafu)?
                        } else {
                            datatypes::arrow::compute::cast(old_column, &ty.as_arrow_type())
                                .context(ComputeArrowSnafu)?
                        };
                        Ok(casted)
                    } else {
                        Ok(old_column.clone())
                    }
                }
                IndexOrDefault::DefaultValue {
                    column_id: _,
                    default_vector,
                    semantic_type,
                } => repeat_vector(default_vector, len, *semantic_type == SemanticType::Tag),
            })
            .chain(
                // Internal columns are always at the end of the batch.
                batch.columns()[batch.num_columns() - INTERNAL_COLUMN_NUM..]
                    .iter()
                    .map(|col| Ok(col.clone())),
            )
            .collect::<Result<Vec<_>>>()?;

        let compat_batch = RecordBatch::try_new(self.arrow_schema.clone(), columns)
            .context(NewRecordBatchSnafu)?;

        // Then makes the primary key column compatible.
        self.compat_pk.compat(compat_batch)
    }
}

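/// Repeats the single-element `vector` `to_len` times and returns the result
/// as an arrow array. String tags are returned as dictionary arrays.
///
/// A hedged sketch of the non-tag path (`Int64Vector` from the `datatypes`
/// crate is assumed):
///
/// ```ignore
/// let v: VectorRef = Arc::new(Int64Vector::from_vec(vec![42]));
/// let array = repeat_vector(&v, 3, false)?;
/// assert_eq!(3, array.len()); // [42, 42, 42]
/// ```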
fn repeat_vector(vector: &VectorRef, to_len: usize, is_tag: bool) -> Result<ArrayRef> {
    assert_eq!(1, vector.len());
    let data_type = vector.data_type();
    if is_tag && data_type.is_string() {
        // String tags are dictionary encoded.
        let values = vector.to_arrow_array();
        if values.is_null(0) {
            // The only value is null, uses null keys with empty values.
            let keys = UInt32Array::new_null(to_len);
            Ok(Arc::new(DictionaryArray::new(keys, values.slice(0, 0))))
        } else {
            let keys = UInt32Array::from_value(0, to_len);
            Ok(Arc::new(DictionaryArray::new(keys, values)))
        }
    } else {
        let keys = UInt32Array::from_value(0, to_len);
        // Safety: all keys are 0, which is always in bounds.
        take(
            &vector.to_arrow_array(),
            &keys,
            Some(TakeOptions {
                check_bounds: false,
            }),
        )
        .context(ComputeArrowSnafu)
    }
}

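/// Helper to append default values of missing key columns to the primary key
/// of a [Batch]. The new key is the old encoded key followed by the encoded
/// default values.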
#[derive(Debug)]
struct CompatPrimaryKey {
    /// Codec to encode the default values of the appended columns.
    converter: Arc<dyn PrimaryKeyCodec>,
    /// Default values (column id, value) to append.
    values: Vec<(ColumnId, Value)>,
}

impl CompatPrimaryKey {
    /// Makes the primary key of the `batch` compatible.
    fn compat(&self, mut batch: Batch) -> Result<Batch> {
        let mut buffer = Vec::with_capacity(
            batch.primary_key().len() + self.converter.estimated_size().unwrap_or_default(),
        );
        buffer.extend_from_slice(batch.primary_key());
        self.converter
            .encode_values(&self.values, &mut buffer)
            .context(EncodeSnafu)?;

        batch.set_primary_key(buffer);

        // Also appends the new values to the cached decoded key.
        if let Some(pk_values) = &mut batch.pk_values {
            pk_values.extend(&self.values);
        }

        Ok(batch)
    }
}

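/// Helper to adapt the field columns of a [Batch]: it reorders existing
/// columns, casts columns whose data type changed, and fills missing columns
/// with default values.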
#[derive(Debug)]
struct CompatFields {
    /// Fields (column id, data type) in the actual batch.
    actual_fields: Vec<(ColumnId, ConcreteDataType)>,
    /// For each expected field, its index in the actual batch or a default value.
    index_or_defaults: Vec<IndexOrDefault>,
}

impl CompatFields {
    /// Makes the fields of the `batch` compatible.
    fn compat(&self, batch: Batch) -> Result<Batch> {
        debug_assert_eq!(self.actual_fields.len(), batch.fields().len());
        debug_assert!(
            self.actual_fields
                .iter()
                .zip(batch.fields())
                .all(|((id, _), batch_column)| *id == batch_column.column_id)
        );

        let len = batch.num_rows();
        self.index_or_defaults
            .iter()
            .map(|index_or_default| match index_or_default {
                IndexOrDefault::Index { pos, cast_type } => {
                    let old_column = &batch.fields()[*pos];

                    let data = if let Some(ty) = cast_type {
                        if let Some(json_type) = ty.as_json() {
                            // JSON columns need alignment instead of a plain cast.
                            let json_array = old_column.data.to_arrow_array();
                            let json_array =
                                align_json_array(&json_array, &json_type.as_arrow_type())
                                    .context(RecordBatchSnafu)?;
                            Helper::try_into_vector(&json_array).context(ConvertVectorSnafu)?
                        } else {
                            old_column.data.cast(ty).with_context(|_| CastVectorSnafu {
                                from: old_column.data.data_type(),
                                to: ty.clone(),
                            })?
                        }
                    } else {
                        old_column.data.clone()
                    };
                    Ok(BatchColumn {
                        column_id: old_column.column_id,
                        data,
                    })
                }
                IndexOrDefault::DefaultValue {
                    column_id,
                    default_vector,
                    semantic_type: _,
                } => {
                    // Replicates the single default value to the batch length.
                    let data = default_vector.replicate(&[len]);
                    Ok(BatchColumn {
                        column_id: *column_id,
                        data,
                    })
                }
            })
            .collect::<Result<Vec<_>>>()
            .and_then(|fields| batch.with_fields(fields))
    }
}

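/// Returns a [RewritePrimaryKey] to rewrite the primary key if the key
/// encodings of `expect` and `actual` differ.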
fn may_rewrite_primary_key(
    expect: &RegionMetadata,
    actual: &RegionMetadata,
) -> Option<RewritePrimaryKey> {
    if expect.primary_key_encoding == actual.primary_key_encoding {
        return None;
    }

    let fields = expect.primary_key.clone();
    let original = build_primary_key_codec(actual);
    let new = build_primary_key_codec(expect);

    Some(RewritePrimaryKey {
        original,
        new,
        fields,
    })
}

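/// Returns true if the primary key of `actual` equals the primary key of
/// `expect`. The actual key must be a prefix of the expected key; otherwise
/// this returns an error because the reader can't adapt such batches.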
fn is_primary_key_same(expect: &RegionMetadata, actual: &RegionMetadata) -> Result<bool> {
    ensure!(
        actual.primary_key.len() <= expect.primary_key.len(),
        CompatReaderSnafu {
            region_id: expect.region_id,
            reason: format!(
                "primary key has more columns ({}) than expected ({})",
                actual.primary_key.len(),
                expect.primary_key.len()
            ),
        }
    );
    ensure!(
        actual.primary_key == expect.primary_key[..actual.primary_key.len()],
        CompatReaderSnafu {
            region_id: expect.region_id,
            reason: format!(
                "primary key has a different prefix, expect: {:?}, actual: {:?}",
                expect.primary_key, actual.primary_key
            ),
        }
    );

    Ok(actual.primary_key.len() == expect.primary_key.len())
}

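/// Creates a [CompatPrimaryKey] to append default values if the expected
/// primary key has more columns than the actual one.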
fn may_compat_primary_key(
    expect: &RegionMetadata,
    actual: &RegionMetadata,
) -> Result<Option<CompatPrimaryKey>> {
    if is_primary_key_same(expect, actual)? {
        return Ok(None);
    }

    // Key columns the reader has to append to the primary key.
    let to_add = &expect.primary_key[actual.primary_key.len()..];
    let mut fields = Vec::with_capacity(to_add.len());
    let mut values = Vec::with_capacity(to_add.len());
    for column_id in to_add {
        // Safety: the expected metadata must contain the key column.
        let column = expect.column_by_id(*column_id).unwrap();
        fields.push((
            *column_id,
            SortField::new(column.column_schema.data_type.clone()),
        ));
        let default_value = column
            .column_schema
            .create_default()
            .context(CreateDefaultSnafu {
                region_id: expect.region_id,
                column: &column.column_schema.name,
            })?
            .with_context(|| CompatReaderSnafu {
                region_id: expect.region_id,
                reason: format!(
                    "key column {} does not have a default value to read",
                    column.column_schema.name
                ),
            })?;
        values.push((*column_id, default_value));
    }
    let converter =
        build_primary_key_codec_with_fields(expect.primary_key_encoding, fields.into_iter());

    Ok(Some(CompatPrimaryKey { converter, values }))
}

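/// Creates a [CompatFields] if the projected field columns of `actual` differ
/// from the fields the `mapper` expects.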
fn may_compat_fields(
    mapper: &PrimaryKeyProjectionMapper,
    actual: &RegionMetadata,
) -> Result<Option<CompatFields>> {
    let expect_fields = mapper.batch_fields();
    let actual_fields = Batch::projected_fields(actual, mapper.column_ids());
    if expect_fields == actual_fields {
        return Ok(None);
    }

    // Maps column id to its index and data type in the actual batch.
    let source_field_index: HashMap<_, _> = actual_fields
        .iter()
        .enumerate()
        .map(|(idx, (column_id, data_type))| (*column_id, (idx, data_type)))
        .collect();

    let index_or_defaults = expect_fields
        .iter()
        .map(|(column_id, expect_data_type)| {
            if let Some((index, actual_data_type)) = source_field_index.get(column_id) {
                let mut cast_type = None;
                // Casts the column if the data type is different.
                if expect_data_type != *actual_data_type {
                    cast_type = Some(expect_data_type.clone())
                }
                // The field exists in the actual batch.
                Ok(IndexOrDefault::Index {
                    pos: *index,
                    cast_type,
                })
            } else {
                // Safety: the expected metadata must contain the column.
                let column = mapper.metadata().column_by_id(*column_id).unwrap();
                // The field is missing, fills it with the default value.
                let default_vector = column
                    .column_schema
                    .create_default_vector(1)
                    .context(CreateDefaultSnafu {
                        region_id: mapper.metadata().region_id,
                        column: &column.column_schema.name,
                    })?
                    .with_context(|| CompatReaderSnafu {
                        region_id: mapper.metadata().region_id,
                        reason: format!(
                            "column {} does not have a default value to read",
                            column.column_schema.name
                        ),
                    })?;
                Ok(IndexOrDefault::DefaultValue {
                    column_id: column.column_id,
                    default_vector,
                    semantic_type: SemanticType::Field,
                })
            }
        })
        .collect::<Result<Vec<_>>>()?;

    Ok(Some(CompatFields {
        actual_fields,
        index_or_defaults,
    }))
}

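/// Index of a column in the actual batch, or a default value for a column
/// missing from it.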
#[derive(Debug)]
enum IndexOrDefault {
    /// Index of the column in the actual batch.
    Index {
        pos: usize,
        cast_type: Option<ConcreteDataType>,
    },
    /// Default value for a column missing from the actual batch.
    DefaultValue {
        /// Id of the column.
        column_id: ColumnId,
        /// Default value; the vector has a single element.
        default_vector: VectorRef,
        /// Semantic type of the column.
        semantic_type: SemanticType,
    },
}

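/// Helper to rewrite a primary key from one encoding to another: it decodes
/// the key with the original codec and re-encodes it with the new one.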
struct RewritePrimaryKey {
    /// Original codec of the primary key.
    original: Arc<dyn PrimaryKeyCodec>,
    /// New codec to encode the primary key.
    new: Arc<dyn PrimaryKeyCodec>,
    /// Column ids of the expected primary key.
    fields: Vec<ColumnId>,
}

impl RewritePrimaryKey {
    /// Rewrites the primary key of the `batch` with the new codec.
    fn compat(&self, mut batch: Batch) -> Result<Batch> {
        // Decodes the key first if the batch doesn't cache the decoded values.
        if batch.pk_values().is_none() {
            let new_pk_values = self
                .original
                .decode(batch.primary_key())
                .context(DecodeSnafu)?;
            batch.set_pk_values(new_pk_values);
        }
        // Safety: the values are set above.
        let values = batch.pk_values().unwrap();

        let mut buffer = Vec::with_capacity(
            batch.primary_key().len() + self.new.estimated_size().unwrap_or_default(),
        );
        match values {
            CompositeValues::Dense(values) => {
                self.new
                    .encode_values(values.as_slice(), &mut buffer)
                    .context(EncodeSnafu)?;
            }
            CompositeValues::Sparse(values) => {
                let values = self
                    .fields
                    .iter()
                    .map(|id| {
                        let value = values.get_or_null(*id);
                        (*id, value.as_value_ref())
                    })
                    .collect::<Vec<_>>();
                self.new
                    .encode_value_refs(&values, &mut buffer)
                    .context(EncodeSnafu)?;
            }
        }
        batch.set_primary_key(buffer);

        Ok(batch)
    }
}

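/// Helper to rewrite the encoded primary keys of a flat [RecordBatch] into a
/// different encoding, appending default values for new key columns on the
/// way.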
struct FlatRewritePrimaryKey {
    /// Codec for the expected primary key encoding.
    codec: Arc<dyn PrimaryKeyCodec>,
    /// Metadata users expect to see.
    metadata: RegionMetadataRef,
    /// Codec the input batches are encoded with.
    old_codec: Arc<dyn PrimaryKeyCodec>,
}

impl FlatRewritePrimaryKey {
    /// Returns a [FlatRewritePrimaryKey] if the key encodings of `expect` and
    /// `actual` differ.
    fn new(
        expect: &RegionMetadataRef,
        actual: &RegionMetadataRef,
    ) -> Option<FlatRewritePrimaryKey> {
        if expect.primary_key_encoding == actual.primary_key_encoding {
            return None;
        }
        let codec = build_primary_key_codec(expect);
        let old_codec = build_primary_key_codec(actual);

        Some(FlatRewritePrimaryKey {
            codec,
            metadata: expect.clone(),
            old_codec,
        })
    }

    /// Rewrites the primary key column of the `batch`, appending
    /// `append_values` to each decoded key before re-encoding it.
    fn rewrite_key(
        &self,
        append_values: &[(ColumnId, Value)],
        batch: RecordBatch,
    ) -> Result<RecordBatch> {
        // Safety: the primary key column is always a dictionary of binaries.
        let old_pk_dict_array = batch
            .column(primary_key_column_index(batch.num_columns()))
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .unwrap();
        let old_pk_values_array = old_pk_dict_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        let mut builder = BinaryBuilder::with_capacity(
            old_pk_values_array.len(),
            old_pk_values_array.value_data().len(),
        );

        // Reusable buffer, sized by the average length of the old keys.
        let mut buffer = Vec::with_capacity(
            old_pk_values_array.value_data().len() / old_pk_values_array.len().max(1),
        );
        let mut column_id_values = Vec::new();
        // Only rewrites the dictionary values.
        for value in old_pk_values_array.iter() {
            let Some(old_pk) = value else {
                builder.append_null();
                continue;
            };
            // Decodes the old key and appends the new values.
            let mut pk_values = self.old_codec.decode(old_pk).context(DecodeSnafu)?;
            pk_values.extend(append_values);

            buffer.clear();
            column_id_values.clear();
            // Re-encodes the key with the new codec.
            match pk_values {
                CompositeValues::Dense(dense_values) => {
                    self.codec
                        .encode_values(dense_values.as_slice(), &mut buffer)
                        .context(EncodeSnafu)?;
                }
                CompositeValues::Sparse(sparse_values) => {
                    for id in &self.metadata.primary_key {
                        let value = sparse_values.get_or_null(*id);
                        column_id_values.push((*id, value.clone()));
                    }
                    self.codec
                        .encode_values(&column_id_values, &mut buffer)
                        .context(EncodeSnafu)?;
                }
            }
            builder.append_value(&buffer);
        }
        let new_pk_values_array = Arc::new(builder.finish());
        let new_pk_dict_array =
            PrimaryKeyArray::new(old_pk_dict_array.keys().clone(), new_pk_values_array);

        let mut columns = batch.columns().to_vec();
        columns[primary_key_column_index(batch.num_columns())] = Arc::new(new_pk_dict_array);

        RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
    }
}

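/// Helper to make the primary key column of a flat [RecordBatch] compatible:
/// it either rewrites keys into the expected encoding or appends encoded
/// default values for new key columns.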
#[derive(Default)]
struct FlatCompatPrimaryKey {
    /// Optional rewriter to change the key encoding.
    rewriter: Option<FlatRewritePrimaryKey>,
    /// Codec to encode the default values of the appended columns.
    converter: Option<Arc<dyn PrimaryKeyCodec>>,
    /// Default values (column id, value) to append.
    values: Vec<(ColumnId, Value)>,
}

impl FlatCompatPrimaryKey {
    fn new(expect: &RegionMetadataRef, actual: &RegionMetadataRef) -> Result<Self> {
        let rewriter = FlatRewritePrimaryKey::new(expect, actual);

        if is_primary_key_same(expect, actual)? {
            return Ok(Self {
                rewriter,
                converter: None,
                values: Vec::new(),
            });
        }

        // Key columns to append to the primary key.
        let to_add = &expect.primary_key[actual.primary_key.len()..];
        let mut values = Vec::with_capacity(to_add.len());
        let mut fields = Vec::with_capacity(to_add.len());
        for column_id in to_add {
            // Safety: the expected metadata must contain the key column.
            let column = expect.column_by_id(*column_id).unwrap();
            fields.push((
                *column_id,
                SortField::new(column.column_schema.data_type.clone()),
            ));
            let default_value = column
                .column_schema
                .create_default()
                .context(CreateDefaultSnafu {
                    region_id: expect.region_id,
                    column: &column.column_schema.name,
                })?
                .with_context(|| CompatReaderSnafu {
                    region_id: expect.region_id,
                    reason: format!(
                        "key column {} does not have a default value to read",
                        column.column_schema.name
                    ),
                })?;
            values.push((*column_id, default_value));
        }
        // The primary keys are different so there must be columns to add.
        debug_assert!(!fields.is_empty());

        let converter = Some(build_primary_key_codec_with_fields(
            expect.primary_key_encoding,
            fields.into_iter(),
        ));

        Ok(Self {
            rewriter,
            converter,
            values,
        })
    }

    /// Makes the primary key column of the `batch` compatible.
    fn compat(&self, batch: RecordBatch) -> Result<RecordBatch> {
        // The rewriter re-encodes the key and appends the values in one pass.
        if let Some(rewriter) = &self.rewriter {
            return rewriter.rewrite_key(&self.values, batch);
        }

        self.append_key(batch)
    }

    /// Appends the encoded default values to each primary key.
    fn append_key(&self, batch: RecordBatch) -> Result<RecordBatch> {
        let Some(converter) = &self.converter else {
            return Ok(batch);
        };

        // Safety: the primary key column is always a dictionary of binaries.
        let old_pk_dict_array = batch
            .column(primary_key_column_index(batch.num_columns()))
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .unwrap();
        let old_pk_values_array = old_pk_dict_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        let mut builder = BinaryBuilder::with_capacity(
            old_pk_values_array.len(),
            old_pk_values_array.value_data().len()
                + converter.estimated_size().unwrap_or_default() * old_pk_values_array.len(),
        );

        // Reusable buffer, sized by the average old key length plus the
        // estimated size of the appended values.
        let mut buffer = Vec::with_capacity(
            old_pk_values_array.value_data().len() / old_pk_values_array.len().max(1)
                + converter.estimated_size().unwrap_or_default(),
        );
        // Only rewrites the dictionary values.
        for value in old_pk_values_array.iter() {
            let Some(old_pk) = value else {
                builder.append_null();
                continue;
            };

            buffer.clear();
            buffer.extend_from_slice(old_pk);
            converter
                .encode_values(&self.values, &mut buffer)
                .context(EncodeSnafu)?;

            builder.append_value(&buffer);
        }

        let new_pk_values_array = Arc::new(builder.finish());
        let new_pk_dict_array =
            PrimaryKeyArray::new(old_pk_dict_array.keys().clone(), new_pk_values_array);

        let mut columns = batch.columns().to_vec();
        columns[primary_key_column_index(batch.num_columns())] = Arc::new(new_pk_dict_array);

        RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::{OpType, SemanticType};
    use datatypes::arrow::array::{
        ArrayRef, BinaryDictionaryBuilder, Int64Array, StringDictionaryBuilder,
        TimestampMillisecondArray, UInt8Array, UInt64Array,
    };
    use datatypes::arrow::datatypes::UInt32Type;
    use datatypes::arrow::record_batch::RecordBatch;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::ValueRef;
    use mito_codec::row_converter::{
        DensePrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec,
    };
    use store_api::codec::PrimaryKeyEncoding;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;
    use crate::read::flat_projection::FlatProjectionMapper;
    use crate::sst::parquet::flat_format::FlatReadFormat;
    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};

    /// Creates a region metadata from `(id, semantic type, data type)` triples
    /// and the column ids of the primary key.
    fn new_metadata(
        semantic_types: &[(ColumnId, SemanticType, ConcreteDataType)],
        primary_key: &[ColumnId],
    ) -> RegionMetadata {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        for (id, semantic_type, data_type) in semantic_types {
            let column_schema = match semantic_type {
                SemanticType::Tag => {
                    ColumnSchema::new(format!("tag_{id}"), data_type.clone(), true)
                }
                SemanticType::Field => {
                    ColumnSchema::new(format!("field_{id}"), data_type.clone(), true)
                }
                SemanticType::Timestamp => ColumnSchema::new("ts", data_type.clone(), false),
            };

            builder.push_column_metadata(ColumnMetadata {
                column_schema,
                semantic_type: *semantic_type,
                column_id: *id,
            });
        }
        builder.primary_key(primary_key.to_vec());
        builder.build().unwrap()
    }

    /// Encodes string keys with the dense primary key codec.
    fn encode_key(keys: &[Option<&str>]) -> Vec<u8> {
        let fields = (0..keys.len())
            .map(|_| (0, SortField::new(ConcreteDataType::string_datatype())))
            .collect();
        let converter = DensePrimaryKeyCodec::with_fields(fields);
        let row = keys.iter().map(|str_opt| match str_opt {
            Some(v) => ValueRef::String(v),
            None => ValueRef::Null,
        });

        converter.encode(row).unwrap()
    }

    /// Encodes `(column id, key)` pairs with the sparse primary key codec.
    fn encode_sparse_key(keys: &[(ColumnId, Option<&str>)]) -> Vec<u8> {
        let fields = (0..keys.len())
            .map(|_| (1, SortField::new(ConcreteDataType::string_datatype())))
            .collect();
        let converter = SparsePrimaryKeyCodec::with_fields(fields);
        let row = keys
            .iter()
            .map(|(id, str_opt)| match str_opt {
                Some(v) => (*id, ValueRef::String(v)),
                None => (*id, ValueRef::Null),
            })
            .collect::<Vec<_>>();
        let mut buffer = vec![];
        converter.encode_value_refs(&row, &mut buffer).unwrap();
        buffer
    }

    #[test]
    fn test_invalid_pk_len() {
        let reader_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1, 2],
        );
        let expect_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        );
        may_compat_primary_key(&expect_meta, &reader_meta).unwrap_err();
    }

    #[test]
    fn test_different_pk() {
        let reader_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[2, 1],
        );
        let expect_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (4, SemanticType::Tag, ConcreteDataType::string_datatype()),
            ],
            &[1, 2, 4],
        );
        may_compat_primary_key(&expect_meta, &reader_meta).unwrap_err();
    }

    #[test]
    fn test_same_pk() {
        let reader_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        );
        assert!(
            may_compat_primary_key(&reader_meta, &reader_meta)
                .unwrap()
                .is_none()
        );
    }

    #[test]
    fn test_same_pk_encoding() {
        let reader_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
            ],
            &[1],
        ));

        assert!(
            may_compat_primary_key(&reader_meta, &reader_meta)
                .unwrap()
                .is_none()
        );
    }

    #[test]
    fn test_same_fields() {
        let reader_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        let mapper = PrimaryKeyProjectionMapper::all(&reader_meta).unwrap();
        assert!(may_compat_fields(&mapper, &reader_meta).unwrap().is_none())
    }

    /// Builds a dictionary-encoded primary key array from raw keys.
    fn build_flat_test_pk_array(primary_keys: &[&[u8]]) -> ArrayRef {
        let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        for &pk in primary_keys {
            builder.append(pk).unwrap();
        }
        Arc::new(builder.finish())
    }

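    /// Extra sanity check (an added sketch, not part of the original suite)
    /// for [repeat_vector]: a one-element field vector should be repeated
    /// `to_len` times, and a string tag should come back dictionary encoded.
    /// `Int64Vector` and `StringVector` are assumed to come from the
    /// `datatypes` crate.
    #[test]
    fn test_repeat_vector() {
        use datatypes::vectors::{Int64Vector, StringVector};

        // Non-tag path: the value is repeated via `take`.
        let field: VectorRef = Arc::new(Int64Vector::from_vec(vec![42]));
        let array = repeat_vector(&field, 3, false).unwrap();
        let expected = Int64Array::from(vec![42, 42, 42]);
        assert_eq!(
            &expected,
            array.as_any().downcast_ref::<Int64Array>().unwrap()
        );

        // Tag path: the value is repeated as a dictionary array.
        let tag: VectorRef = Arc::new(StringVector::from(vec!["a"]));
        let array = repeat_vector(&tag, 2, true).unwrap();
        assert_eq!(2, array.len());
    }
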
    #[test]
    fn test_flat_compat_batch_with_missing_columns() {
        let actual_metadata = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));

        let expected_metadata = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));

        let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
        let read_format = FlatReadFormat::new(
            actual_metadata.clone(),
            [0, 1, 2, 3].into_iter(),
            None,
            "test",
            false,
        )
        .unwrap();
        let format_projection = read_format.format_projection();

        let compat_batch =
            FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection, false)
                .unwrap()
                .unwrap();

        let mut tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
        tag_builder.append_value("tag1");
        tag_builder.append_value("tag1");
        let tag_dict_array = Arc::new(tag_builder.finish());

        let k1 = encode_key(&[Some("tag1")]);
        let input_columns: Vec<ArrayRef> = vec![
            tag_dict_array.clone(),
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&k1, &k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let input_schema =
            to_flat_sst_arrow_schema(&actual_metadata, &FlatSchemaOptions::default());
        let input_batch = RecordBatch::try_new(input_schema, input_columns).unwrap();

        let result = compat_batch.compat(input_batch).unwrap();

        let expected_schema =
            to_flat_sst_arrow_schema(&expected_metadata, &FlatSchemaOptions::default());

        let expected_columns: Vec<ArrayRef> = vec![
            tag_dict_array.clone(),
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(Int64Array::from(vec![None::<i64>, None::<i64>])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&k1, &k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let expected_batch = RecordBatch::try_new(expected_schema, expected_columns).unwrap();

        assert_eq!(expected_batch, result);
    }

    #[test]
    fn test_flat_compat_batch_with_read_projection_superset() {
        let actual_metadata = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));

        let expected_metadata = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));

        let mapper = FlatProjectionMapper::new_with_read_columns(
            &expected_metadata,
            vec![1, 2],
            vec![1, 2, 3],
        )
        .unwrap();
        let read_format = FlatReadFormat::new(
            actual_metadata.clone(),
            [1, 2, 3].into_iter(),
            None,
            "test",
            false,
        )
        .unwrap();
        let format_projection = read_format.format_projection();

        let compat_batch =
            FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection, false)
                .unwrap()
                .unwrap();

        let mut tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
        tag_builder.append_value("tag1");
        tag_builder.append_value("tag1");
        let tag_dict_array = Arc::new(tag_builder.finish());

        let k1 = encode_key(&[Some("tag1")]);
        let input_columns: Vec<ArrayRef> = vec![
            tag_dict_array.clone(),
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&k1, &k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let input_schema =
            to_flat_sst_arrow_schema(&actual_metadata, &FlatSchemaOptions::default());
        let input_batch = RecordBatch::try_new(input_schema, input_columns).unwrap();

        let result = compat_batch.compat(input_batch).unwrap();

        let expected_schema =
            to_flat_sst_arrow_schema(&expected_metadata, &FlatSchemaOptions::default());
        let expected_columns: Vec<ArrayRef> = vec![
            tag_dict_array.clone(),
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(Int64Array::from(vec![None::<i64>, None::<i64>])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&k1, &k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let expected_batch = RecordBatch::try_new(expected_schema, expected_columns).unwrap();

        assert_eq!(expected_batch, result);
    }

    #[test]
    fn test_flat_compat_batch_with_different_pk_encoding() {
        let mut actual_metadata = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        );
        actual_metadata.primary_key_encoding = PrimaryKeyEncoding::Dense;
        let actual_metadata = Arc::new(actual_metadata);

        let mut expected_metadata = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (3, SemanticType::Tag, ConcreteDataType::string_datatype()),
            ],
            &[1, 3],
        );
        expected_metadata.primary_key_encoding = PrimaryKeyEncoding::Sparse;
        let expected_metadata = Arc::new(expected_metadata);

        let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
        let read_format = FlatReadFormat::new(
            actual_metadata.clone(),
            [0, 1, 2, 3].into_iter(),
            None,
            "test",
            false,
        )
        .unwrap();
        let format_projection = read_format.format_projection();

        let compat_batch =
            FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection, false)
                .unwrap()
                .unwrap();

        let mut tag1_builder = StringDictionaryBuilder::<UInt32Type>::new();
        tag1_builder.append_value("tag1");
        tag1_builder.append_value("tag1");
        let tag1_dict_array = Arc::new(tag1_builder.finish());

        let k1 = encode_key(&[Some("tag1")]);
        let input_columns: Vec<ArrayRef> = vec![
            tag1_dict_array.clone(),
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&k1, &k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let input_schema =
            to_flat_sst_arrow_schema(&actual_metadata, &FlatSchemaOptions::default());
        let input_batch = RecordBatch::try_new(input_schema, input_columns).unwrap();

        let result = compat_batch.compat(input_batch).unwrap();

        let sparse_k1 = encode_sparse_key(&[(1, Some("tag1")), (3, None)]);
        let mut null_tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
        null_tag_builder.append_nulls(2);
        let null_tag_dict_array = Arc::new(null_tag_builder.finish());
        let expected_columns: Vec<ArrayRef> = vec![
            tag1_dict_array.clone(),
            null_tag_dict_array,
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&sparse_k1, &sparse_k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let output_schema =
            to_flat_sst_arrow_schema(&expected_metadata, &FlatSchemaOptions::default());
        let expected_batch = RecordBatch::try_new(output_schema, expected_columns).unwrap();

        assert_eq!(expected_batch, result);
    }

    #[test]
    fn test_flat_compat_batch_compact_sparse() {
        let mut actual_metadata = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[],
        );
        actual_metadata.primary_key_encoding = PrimaryKeyEncoding::Sparse;
        let actual_metadata = Arc::new(actual_metadata);

        let mut expected_metadata = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[],
        );
        expected_metadata.primary_key_encoding = PrimaryKeyEncoding::Sparse;
        let expected_metadata = Arc::new(expected_metadata);

        let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
        let read_format = FlatReadFormat::new(
            actual_metadata.clone(),
            [0, 2, 3].into_iter(),
            None,
            "test",
            true,
        )
        .unwrap();
        let format_projection = read_format.format_projection();

        let compat_batch =
            FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection, true)
                .unwrap()
                .unwrap();

        let sparse_k1 = encode_sparse_key(&[]);
        let input_columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&sparse_k1, &sparse_k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let input_schema =
            to_flat_sst_arrow_schema(&actual_metadata, &FlatSchemaOptions::default());
        let input_batch = RecordBatch::try_new(input_schema, input_columns).unwrap();

        let result = compat_batch.compat(input_batch).unwrap();

        let expected_columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(Int64Array::from(vec![None::<i64>, None::<i64>])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&sparse_k1, &sparse_k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let output_schema =
            to_flat_sst_arrow_schema(&expected_metadata, &FlatSchemaOptions::default());
        let expected_batch = RecordBatch::try_new(output_schema, expected_columns).unwrap();

        assert_eq!(expected_batch, result);
    }
}