use std::collections::HashMap;
use std::sync::Arc;

use api::v1::SemanticType;
use common_recordbatch::recordbatch::align_json_array;
use datatypes::arrow::array::{
    Array, ArrayRef, BinaryArray, BinaryBuilder, DictionaryArray, UInt32Array,
};
use datatypes::arrow::compute::{TakeOptions, take};
use datatypes::arrow::datatypes::{FieldRef, Schema, SchemaRef};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::data_type::ConcreteDataType;
use datatypes::prelude::DataType;
use datatypes::value::Value;
use datatypes::vectors::{Helper, VectorRef};
use mito_codec::row_converter::{
    CompositeValues, PrimaryKeyCodec, SortField, build_primary_key_codec,
    build_primary_key_codec_with_fields,
};
use snafu::{OptionExt, ResultExt, ensure};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::ColumnId;

use crate::error::{
    CastVectorSnafu, CompatReaderSnafu, ComputeArrowSnafu, ConvertVectorSnafu, CreateDefaultSnafu,
    DecodeSnafu, EncodeSnafu, NewRecordBatchSnafu, RecordBatchSnafu, Result, UnexpectedSnafu,
    UnsupportedOperationSnafu,
};
use crate::read::flat_projection::{FlatProjectionMapper, flat_projected_columns};
use crate::read::projection::{PrimaryKeyProjectionMapper, ProjectionMapper};
use crate::read::{Batch, BatchColumn, BatchReader};
use crate::sst::parquet::flat_format::primary_key_column_index;
use crate::sst::parquet::format::{FormatProjection, INTERNAL_COLUMN_NUM, PrimaryKeyArray};
use crate::sst::{internal_fields, tag_maybe_to_dictionary_field};

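/// Reader that adapts batches from an underlying reader (which may use an
/// older version of the region schema) to the schema expected by the given
/// [`ProjectionMapper`].
///
/// A minimal usage sketch, assuming an existing `mapper`, `reader_meta` and
/// inner `reader` (names are illustrative):
/// ```ignore
/// let mut compat = CompatReader::new(&mapper, reader_meta, reader)?;
/// while let Some(batch) = compat.next_batch().await? {
///     // `batch` now matches the primary key and fields the mapper expects.
/// }
/// ```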
pub struct CompatReader<R> {
    /// Inner reader to read batches from.
    reader: R,
    /// Helper that adapts each batch to the expected schema.
    compat: PrimaryKeyCompatBatch,
}

impl<R> CompatReader<R> {
    /// Creates a new [`CompatReader`] that adapts batches from `reader`
    /// (described by `reader_meta`) to the schema expected by `mapper`.
    pub fn new(
        mapper: &ProjectionMapper,
        reader_meta: RegionMetadataRef,
        reader: R,
    ) -> Result<CompatReader<R>> {
        Ok(CompatReader {
            reader,
            compat: PrimaryKeyCompatBatch::new(mapper, reader_meta)?,
        })
    }
}

#[async_trait::async_trait]
impl<R: BatchReader> BatchReader for CompatReader<R> {
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        let Some(mut batch) = self.reader.next_batch().await? else {
            return Ok(None);
        };

        batch = self.compat.compat_batch(batch)?;

        Ok(Some(batch))
    }
}

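/// Helper to make batches compatible with the expected schema, one variant per
/// read format.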
pub(crate) enum CompatBatch {
    /// Adapter for the primary key format.
    PrimaryKey(PrimaryKeyCompatBatch),
    /// Adapter for the flat format.
    Flat(FlatCompatBatch),
}

impl CompatBatch {
    /// Returns the inner [`PrimaryKeyCompatBatch`] if this is the primary key variant.
    pub(crate) fn as_primary_key(&self) -> Option<&PrimaryKeyCompatBatch> {
        match self {
            CompatBatch::PrimaryKey(batch) => Some(batch),
            _ => None,
        }
    }

    /// Returns the inner [`FlatCompatBatch`] if this is the flat variant.
    pub(crate) fn as_flat(&self) -> Option<&FlatCompatBatch> {
        match self {
            CompatBatch::Flat(batch) => Some(batch),
            _ => None,
        }
    }
}

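/// Helper to make primary-key-format batches compatible with the expected
/// schema: it may rewrite the key encoding, append missing key columns, and
/// fix up field columns.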
pub(crate) struct PrimaryKeyCompatBatch {
    /// Rewrites the primary key to a different encoding if needed.
    rewrite_pk: Option<RewritePrimaryKey>,
    /// Appends default values for missing key columns.
    compat_pk: Option<CompatPrimaryKey>,
    /// Adjusts field columns (reorder, cast, fill defaults).
    compat_fields: Option<CompatFields>,
}

impl PrimaryKeyCompatBatch {
    /// Creates a helper to adapt batches from a reader with `reader_meta` to
    /// the schema expected by `mapper`.
    pub(crate) fn new(mapper: &ProjectionMapper, reader_meta: RegionMetadataRef) -> Result<Self> {
        let rewrite_pk = may_rewrite_primary_key(mapper.metadata(), &reader_meta);
        let compat_pk = may_compat_primary_key(mapper.metadata(), &reader_meta)?;
        let mapper = mapper.as_primary_key().context(UnexpectedSnafu {
            reason: "Unexpected format",
        })?;
        let compat_fields = may_compat_fields(mapper, &reader_meta)?;

        Ok(Self {
            rewrite_pk,
            compat_pk,
            compat_fields,
        })
    }

    /// Adapts a [`Batch`] to the expected schema: rewrites the key encoding
    /// first, then pads missing key columns, then fixes up field columns.
    pub(crate) fn compat_batch(&self, mut batch: Batch) -> Result<Batch> {
        if let Some(rewrite_pk) = &self.rewrite_pk {
            batch = rewrite_pk.compat(batch)?;
        }
        if let Some(compat_pk) = &self.compat_pk {
            batch = compat_pk.compat(batch)?;
        }
        if let Some(compat_fields) = &self.compat_fields {
            batch = compat_fields.compat(batch)?;
        }

        Ok(batch)
    }
}

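/// Returns true if `left` and `right` have the same columns (ids and types)
/// and the same primary key encoding. Columns are compared pairwise, so their
/// order must also match.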
pub(crate) fn has_same_columns_and_pk_encoding(
    left: &RegionMetadata,
    right: &RegionMetadata,
) -> bool {
    if left.primary_key_encoding != right.primary_key_encoding {
        return false;
    }

    if left.column_metadatas.len() != right.column_metadatas.len() {
        return false;
    }

    for (left_col, right_col) in left.column_metadatas.iter().zip(&right.column_metadatas) {
        if left_col.column_id != right_col.column_id || !left_col.is_same_datatype(right_col) {
            return false;
        }
        debug_assert_eq!(
            left_col.column_schema.data_type,
            right_col.column_schema.data_type
        );
        debug_assert_eq!(left_col.semantic_type, right_col.semantic_type);
    }

    true
}

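/// Helper to make flat-format record batches compatible with the expected
/// schema.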
pub(crate) struct FlatCompatBatch {
    /// Maps each output column to an input column index or a default value.
    index_or_defaults: Vec<IndexOrDefault>,
    /// Arrow schema of the adapted batch.
    arrow_schema: SchemaRef,
    /// Adapter for the encoded primary key column.
    compat_pk: FlatCompatPrimaryKey,
}

impl FlatCompatBatch {
    /// Creates a [`FlatCompatBatch`] if the projected columns of `actual`
    /// differ from the schema expected by `mapper`. Returns `None` when no
    /// adaptation is needed.
    pub(crate) fn try_new(
        mapper: &FlatProjectionMapper,
        actual: &RegionMetadataRef,
        format_projection: &FormatProjection,
        compaction: bool,
    ) -> Result<Option<Self>> {
        let actual_schema = flat_projected_columns(actual, format_projection);
        let expect_schema = mapper.batch_schema();
        if expect_schema == actual_schema {
            return Ok(None);
        }

        if actual.primary_key_encoding == PrimaryKeyEncoding::Sparse && compaction {
            return FlatCompatBatch::try_new_compact_sparse(mapper, actual);
        }

        let (index_or_defaults, fields) =
            Self::compute_index_and_fields(&actual_schema, expect_schema, mapper.metadata())?;

        let compat_pk = FlatCompatPrimaryKey::new(mapper.metadata(), actual)?;

        Ok(Some(Self {
            index_or_defaults,
            arrow_schema: Arc::new(Schema::new(fields)),
            compat_pk,
        }))
    }

    fn compute_index_and_fields(
        actual_schema: &[(ColumnId, ConcreteDataType)],
        expect_schema: &[(ColumnId, ConcreteDataType)],
        expect_metadata: &RegionMetadata,
    ) -> Result<(Vec<IndexOrDefault>, Vec<FieldRef>)> {
        let actual_schema_index: HashMap<_, _> = actual_schema
            .iter()
            .enumerate()
            .map(|(idx, (column_id, data_type))| (*column_id, (idx, data_type)))
            .collect();

        let mut index_or_defaults = Vec::with_capacity(expect_schema.len());
        let mut fields = Vec::with_capacity(expect_schema.len());
        for (column_id, expect_data_type) in expect_schema {
            // The expected schema is derived from `expect_metadata`, so the lookup must succeed.
            let column_index = expect_metadata.column_index_by_id(*column_id).unwrap();
            let expect_column = &expect_metadata.column_metadatas[column_index];
            let column_field = &expect_metadata.schema.arrow_schema().fields()[column_index];
            if expect_column.semantic_type == SemanticType::Tag {
                fields.push(tag_maybe_to_dictionary_field(
                    &expect_column.column_schema.data_type,
                    column_field,
                ));
            } else {
                fields.push(column_field.clone());
            };

            if let Some((index, actual_data_type)) = actual_schema_index.get(column_id) {
                let mut cast_type = None;

                if expect_data_type != *actual_data_type {
                    cast_type = Some(expect_data_type.clone())
                }
                // The column exists in the actual schema; take it by position.
                index_or_defaults.push(IndexOrDefault::Index {
                    pos: *index,
                    cast_type,
                });
            } else {
                // The column is missing; fill it with its default value.
                let default_vector = expect_column
                    .column_schema
                    .create_default_vector(1)
                    .context(CreateDefaultSnafu {
                        region_id: expect_metadata.region_id,
                        column: &expect_column.column_schema.name,
                    })?
                    .with_context(|| CompatReaderSnafu {
                        region_id: expect_metadata.region_id,
                        reason: format!(
                            "column {} does not have a default value to read",
                            expect_column.column_schema.name
                        ),
                    })?;
                index_or_defaults.push(IndexOrDefault::DefaultValue {
                    column_id: expect_column.column_id,
                    default_vector,
                    semantic_type: expect_column.semantic_type,
                });
            };
        }
        fields.extend_from_slice(&internal_fields());

        Ok((index_or_defaults, fields))
    }

    fn try_new_compact_sparse(
        mapper: &FlatProjectionMapper,
        actual: &RegionMetadataRef,
    ) -> Result<Option<Self>> {
        ensure!(
            mapper.metadata().primary_key_encoding == PrimaryKeyEncoding::Sparse,
            UnsupportedOperationSnafu {
                err_msg: "Flat format doesn't support converting sparse encoding back to dense encoding"
            }
        );

        // Under sparse encoding the primary key stays encoded, so only field
        // and time index columns need adaptation.
        let actual_schema: Vec<_> = actual
            .field_columns()
            .chain([actual.time_index_column()])
            .map(|col| (col.column_id, col.column_schema.data_type.clone()))
            .collect();
        let expect_schema: Vec<_> = mapper
            .metadata()
            .field_columns()
            .chain([mapper.metadata().time_index_column()])
            .map(|col| (col.column_id, col.column_schema.data_type.clone()))
            .collect();

        let (index_or_defaults, fields) =
            Self::compute_index_and_fields(&actual_schema, &expect_schema, mapper.metadata())?;

        let compat_pk = FlatCompatPrimaryKey::default();

        Ok(Some(Self {
            index_or_defaults,
            arrow_schema: Arc::new(Schema::new(fields)),
            compat_pk,
        }))
    }

    /// Adapts a [`RecordBatch`] to the expected flat schema.
    pub(crate) fn compat(&self, batch: RecordBatch) -> Result<RecordBatch> {
        let len = batch.num_rows();
        let columns = self
            .index_or_defaults
            .iter()
            .map(|index_or_default| match index_or_default {
                IndexOrDefault::Index { pos, cast_type } => {
                    let old_column = batch.column(*pos);

                    if let Some(ty) = cast_type {
                        let casted = if let Some(json_type) = ty.as_json() {
                            align_json_array(old_column, &json_type.as_arrow_type())
                                .context(RecordBatchSnafu)?
                        } else {
                            datatypes::arrow::compute::cast(old_column, &ty.as_arrow_type())
                                .context(ComputeArrowSnafu)?
                        };
                        Ok(casted)
                    } else {
                        Ok(old_column.clone())
                    }
                }
                IndexOrDefault::DefaultValue {
                    column_id: _,
                    default_vector,
                    semantic_type,
                } => repeat_vector(default_vector, len, *semantic_type == SemanticType::Tag),
            })
            // The trailing internal columns (primary key, sequence, op type)
            // are copied from the input batch unchanged.
            .chain(
                batch.columns()[batch.num_columns() - INTERNAL_COLUMN_NUM..]
                    .iter()
                    .map(|col| Ok(col.clone())),
            )
            .collect::<Result<Vec<_>>>()?;

        let compat_batch = RecordBatch::try_new(self.arrow_schema.clone(), columns)
            .context(NewRecordBatchSnafu)?;

        self.compat_pk.compat(compat_batch)
    }
}

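/// Returns an array that repeats the single-element `vector` `to_len` times.
/// String tags become dictionary arrays to match the flat-format tag layout.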
fn repeat_vector(vector: &VectorRef, to_len: usize, is_tag: bool) -> Result<ArrayRef> {
    assert_eq!(1, vector.len());
    let data_type = vector.data_type();
    if is_tag && data_type.is_string() {
        // Build a dictionary array directly instead of materializing the
        // repeated value.
        let values = vector.to_arrow_array();
        if values.is_null(0) {
            let keys = UInt32Array::new_null(to_len);
            Ok(Arc::new(DictionaryArray::new(keys, values.slice(0, 0))))
        } else {
            let keys = UInt32Array::from_value(0, to_len);
            Ok(Arc::new(DictionaryArray::new(keys, values)))
        }
    } else {
        let keys = UInt32Array::from_value(0, to_len);
        take(
            &vector.to_arrow_array(),
            &keys,
            Some(TakeOptions {
                check_bounds: false,
            }),
        )
        .context(ComputeArrowSnafu)
    }
}

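/// Helper to append default values to the primary key of a [`Batch`].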
#[derive(Debug)]
struct CompatPrimaryKey {
    /// Codec for encoding the default values of the added key columns.
    converter: Arc<dyn PrimaryKeyCodec>,
    /// Default values of the key columns to append.
    values: Vec<(ColumnId, Value)>,
}

impl CompatPrimaryKey {
    /// Appends the encoded default values to the primary key of `batch`.
    fn compat(&self, mut batch: Batch) -> Result<Batch> {
        let mut buffer = Vec::with_capacity(
            batch.primary_key().len() + self.converter.estimated_size().unwrap_or_default(),
        );
        buffer.extend_from_slice(batch.primary_key());
        self.converter
            .encode_values(&self.values, &mut buffer)
            .context(EncodeSnafu)?;

        batch.set_primary_key(buffer);

        // Keep the decoded primary key values in sync if they are present.
        if let Some(pk_values) = &mut batch.pk_values {
            pk_values.extend(&self.values);
        }

        Ok(batch)
    }
}

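/// Helper to adapt the field columns of a [`Batch`] to the expected schema.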
#[derive(Debug)]
struct CompatFields {
    /// Fields that the source batches actually contain.
    actual_fields: Vec<(ColumnId, ConcreteDataType)>,
    /// Maps each expected field to an actual field index or a default value.
    index_or_defaults: Vec<IndexOrDefault>,
}

impl CompatFields {
    /// Makes the fields of `batch` compatible with the expected schema.
    fn compat(&self, batch: Batch) -> Result<Batch> {
        debug_assert_eq!(self.actual_fields.len(), batch.fields().len());
        debug_assert!(
            self.actual_fields
                .iter()
                .zip(batch.fields())
                .all(|((id, _), batch_column)| *id == batch_column.column_id)
        );

        let len = batch.num_rows();
        self.index_or_defaults
            .iter()
            .map(|index_or_default| match index_or_default {
                IndexOrDefault::Index { pos, cast_type } => {
                    let old_column = &batch.fields()[*pos];

                    let data = if let Some(ty) = cast_type {
                        if let Some(json_type) = ty.as_json() {
                            let json_array = old_column.data.to_arrow_array();
                            let json_array =
                                align_json_array(&json_array, &json_type.as_arrow_type())
                                    .context(RecordBatchSnafu)?;
                            Helper::try_into_vector(&json_array).context(ConvertVectorSnafu)?
                        } else {
                            old_column.data.cast(ty).with_context(|_| CastVectorSnafu {
                                from: old_column.data.data_type(),
                                to: ty.clone(),
                            })?
                        }
                    } else {
                        old_column.data.clone()
                    };
                    Ok(BatchColumn {
                        column_id: old_column.column_id,
                        data,
                    })
                }
                IndexOrDefault::DefaultValue {
                    column_id,
                    default_vector,
                    semantic_type: _,
                } => {
                    // Repeat the single-row default vector to the batch length.
                    let data = default_vector.replicate(&[len]);
                    Ok(BatchColumn {
                        column_id: *column_id,
                        data,
                    })
                }
            })
            .collect::<Result<Vec<_>>>()
            .and_then(|fields| batch.with_fields(fields))
    }
}

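/// Returns a [`RewritePrimaryKey`] if the primary key encodings of `expect`
/// and `actual` differ.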
fn may_rewrite_primary_key(
    expect: &RegionMetadata,
    actual: &RegionMetadata,
) -> Option<RewritePrimaryKey> {
    if expect.primary_key_encoding == actual.primary_key_encoding {
        return None;
    }

    let fields = expect.primary_key.clone();
    let original = build_primary_key_codec(actual);
    let new = build_primary_key_codec(expect);

    Some(RewritePrimaryKey {
        original,
        new,
        fields,
    })
}

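/// Returns true if the primary key of `actual` equals that of `expect`.
/// Returns an error unless `actual`'s primary key is a prefix of `expect`'s,
/// i.e. key columns may only be appended. For example, `actual = [1]` is a
/// valid prefix of `expect = [1, 3]`, while `actual = [2, 1]` against
/// `expect = [1, 2, 4]` is rejected.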
fn is_primary_key_same(expect: &RegionMetadata, actual: &RegionMetadata) -> Result<bool> {
    ensure!(
        actual.primary_key.len() <= expect.primary_key.len(),
        CompatReaderSnafu {
            region_id: expect.region_id,
            reason: format!(
                "primary key has more columns {} than expected {}",
                actual.primary_key.len(),
                expect.primary_key.len()
            ),
        }
    );
    ensure!(
        actual.primary_key == expect.primary_key[..actual.primary_key.len()],
        CompatReaderSnafu {
            region_id: expect.region_id,
            reason: format!(
                "primary key has different prefix, expect: {:?}, actual: {:?}",
                expect.primary_key, actual.primary_key
            ),
        }
    );

    Ok(actual.primary_key.len() == expect.primary_key.len())
}

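/// Creates a [`CompatPrimaryKey`] if the actual primary key is a proper prefix
/// of the expected one; the missing key columns are filled with their default
/// values.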
fn may_compat_primary_key(
    expect: &RegionMetadata,
    actual: &RegionMetadata,
) -> Result<Option<CompatPrimaryKey>> {
    if is_primary_key_same(expect, actual)? {
        return Ok(None);
    }

    // Columns in the expected primary key that the actual key lacks.
    let to_add = &expect.primary_key[actual.primary_key.len()..];
    let mut fields = Vec::with_capacity(to_add.len());
    let mut values = Vec::with_capacity(to_add.len());
    for column_id in to_add {
        // The metadata must contain all columns in its primary key.
        let column = expect.column_by_id(*column_id).unwrap();
        fields.push((
            *column_id,
            SortField::new(column.column_schema.data_type.clone()),
        ));
        let default_value = column
            .column_schema
            .create_default()
            .context(CreateDefaultSnafu {
                region_id: expect.region_id,
                column: &column.column_schema.name,
            })?
            .with_context(|| CompatReaderSnafu {
                region_id: expect.region_id,
                reason: format!(
                    "key column {} does not have a default value to read",
                    column.column_schema.name
                ),
            })?;
        values.push((*column_id, default_value));
    }
    let converter =
        build_primary_key_codec_with_fields(expect.primary_key_encoding, fields.into_iter());

    Ok(Some(CompatPrimaryKey { converter, values }))
}

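/// Creates a [`CompatFields`] if the projected fields of `actual` differ from
/// the fields expected by `mapper`.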
fn may_compat_fields(
    mapper: &PrimaryKeyProjectionMapper,
    actual: &RegionMetadata,
) -> Result<Option<CompatFields>> {
    let expect_fields = mapper.batch_fields();
    let actual_fields = Batch::projected_fields(actual, mapper.column_ids());
    if expect_fields == actual_fields {
        return Ok(None);
    }

    let source_field_index: HashMap<_, _> = actual_fields
        .iter()
        .enumerate()
        .map(|(idx, (column_id, data_type))| (*column_id, (idx, data_type)))
        .collect();

    let index_or_defaults = expect_fields
        .iter()
        .map(|(column_id, expect_data_type)| {
            if let Some((index, actual_data_type)) = source_field_index.get(column_id) {
                let mut cast_type = None;

                if expect_data_type != *actual_data_type {
                    cast_type = Some(expect_data_type.clone())
                }
                // The source batch has the field; take it by index.
                Ok(IndexOrDefault::Index {
                    pos: *index,
                    cast_type,
                })
            } else {
                // The mapper's metadata must contain the projected column.
                let column = mapper.metadata().column_by_id(*column_id).unwrap();
                // The source batch lacks the field; fill its default value.
                let default_vector = column
                    .column_schema
                    .create_default_vector(1)
                    .context(CreateDefaultSnafu {
                        region_id: mapper.metadata().region_id,
                        column: &column.column_schema.name,
                    })?
                    .with_context(|| CompatReaderSnafu {
                        region_id: mapper.metadata().region_id,
                        reason: format!(
                            "column {} does not have a default value to read",
                            column.column_schema.name
                        ),
                    })?;
                Ok(IndexOrDefault::DefaultValue {
                    column_id: column.column_id,
                    default_vector,
                    semantic_type: SemanticType::Field,
                })
            }
        })
        .collect::<Result<Vec<_>>>()?;

    Ok(Some(CompatFields {
        actual_fields,
        index_or_defaults,
    }))
}

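/// Index of a column in the source batch, or a default value to fill the
/// column with.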
#[derive(Debug)]
enum IndexOrDefault {
    /// Take the column at `pos` from the source, optionally casting it.
    Index {
        pos: usize,
        cast_type: Option<ConcreteDataType>,
    },
    /// Use a default value for the column.
    DefaultValue {
        /// Id of the column.
        column_id: ColumnId,
        /// Default value; the vector has exactly one element.
        default_vector: VectorRef,
        /// Semantic type of the column.
        semantic_type: SemanticType,
    },
}

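/// Rewrites the primary key of a [`Batch`] from one encoding to another.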
struct RewritePrimaryKey {
    /// Codec of the original primary key encoding.
    original: Arc<dyn PrimaryKeyCodec>,
    /// Codec of the target primary key encoding.
    new: Arc<dyn PrimaryKeyCodec>,
    /// Column ids of the expected primary key.
    fields: Vec<ColumnId>,
}

impl RewritePrimaryKey {
    /// Re-encodes the primary key of `batch` with the target codec.
    fn compat(&self, mut batch: Batch) -> Result<Batch> {
        if batch.pk_values().is_none() {
            let new_pk_values = self
                .original
                .decode(batch.primary_key())
                .context(DecodeSnafu)?;
            batch.set_pk_values(new_pk_values);
        }
        // The values are set above if they were absent.
        let values = batch.pk_values().unwrap();

        let mut buffer = Vec::with_capacity(
            batch.primary_key().len() + self.new.estimated_size().unwrap_or_default(),
        );
        match values {
            CompositeValues::Dense(values) => {
                self.new
                    .encode_values(values.as_slice(), &mut buffer)
                    .context(EncodeSnafu)?;
            }
            CompositeValues::Sparse(values) => {
                let values = self
                    .fields
                    .iter()
                    .map(|id| {
                        let value = values.get_or_null(*id);
                        (*id, value.as_value_ref())
                    })
                    .collect::<Vec<_>>();
                self.new
                    .encode_value_refs(&values, &mut buffer)
                    .context(EncodeSnafu)?;
            }
        }
        batch.set_primary_key(buffer);

        Ok(batch)
    }
}

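/// Rewrites the encoded primary key column of a flat-format record batch to a
/// different encoding, appending default values for missing key columns along
/// the way.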
struct FlatRewritePrimaryKey {
    /// Codec of the target primary key encoding.
    codec: Arc<dyn PrimaryKeyCodec>,
    /// Metadata of the expected region.
    metadata: RegionMetadataRef,
    /// Codec of the original primary key encoding.
    old_codec: Arc<dyn PrimaryKeyCodec>,
}

impl FlatRewritePrimaryKey {
    fn new(
        expect: &RegionMetadataRef,
        actual: &RegionMetadataRef,
    ) -> Option<FlatRewritePrimaryKey> {
        if expect.primary_key_encoding == actual.primary_key_encoding {
            return None;
        }
        let codec = build_primary_key_codec(expect);
        let old_codec = build_primary_key_codec(actual);

        Some(FlatRewritePrimaryKey {
            codec,
            metadata: expect.clone(),
            old_codec,
        })
    }

    /// Decodes each distinct key with the old codec, appends `append_values`,
    /// re-encodes with the target codec, and replaces the primary key column.
    fn rewrite_key(
        &self,
        append_values: &[(ColumnId, Value)],
        batch: RecordBatch,
    ) -> Result<RecordBatch> {
        let old_pk_dict_array = batch
            .column(primary_key_column_index(batch.num_columns()))
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .unwrap();
        let old_pk_values_array = old_pk_dict_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        let mut builder = BinaryBuilder::with_capacity(
            old_pk_values_array.len(),
            old_pk_values_array.value_data().len(),
        );

        // Reuse one buffer sized by the average key length.
        let mut buffer = Vec::with_capacity(
            old_pk_values_array.value_data().len() / old_pk_values_array.len().max(1),
        );
        let mut column_id_values = Vec::new();
        for value in old_pk_values_array.iter() {
            let Some(old_pk) = value else {
                builder.append_null();
                continue;
            };
            let mut pk_values = self.old_codec.decode(old_pk).context(DecodeSnafu)?;
            pk_values.extend(append_values);

            buffer.clear();
            column_id_values.clear();
            match pk_values {
                CompositeValues::Dense(dense_values) => {
                    self.codec
                        .encode_values(dense_values.as_slice(), &mut buffer)
                        .context(EncodeSnafu)?;
                }
                CompositeValues::Sparse(sparse_values) => {
                    for id in &self.metadata.primary_key {
                        let value = sparse_values.get_or_null(*id);
                        column_id_values.push((*id, value.clone()));
                    }
                    self.codec
                        .encode_values(&column_id_values, &mut buffer)
                        .context(EncodeSnafu)?;
                }
            }
            builder.append_value(&buffer);
        }
        let new_pk_values_array = Arc::new(builder.finish());
        let new_pk_dict_array =
            PrimaryKeyArray::new(old_pk_dict_array.keys().clone(), new_pk_values_array);

        let mut columns = batch.columns().to_vec();
        columns[primary_key_column_index(batch.num_columns())] = Arc::new(new_pk_dict_array);

        RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
    }
}

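/// Helper to make the primary key column of flat-format record batches
/// compatible with the expected schema.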
#[derive(Default)]
struct FlatCompatPrimaryKey {
    /// Rewrites the key to a different encoding if needed.
    rewriter: Option<FlatRewritePrimaryKey>,
    /// Codec for encoding the default values to append.
    converter: Option<Arc<dyn PrimaryKeyCodec>>,
    /// Default values of the missing key columns.
    values: Vec<(ColumnId, Value)>,
}

impl FlatCompatPrimaryKey {
    /// Creates a helper to adapt primary keys from `actual` to `expect`.
    fn new(expect: &RegionMetadataRef, actual: &RegionMetadataRef) -> Result<Self> {
        let rewriter = FlatRewritePrimaryKey::new(expect, actual);

        if is_primary_key_same(expect, actual)? {
            return Ok(Self {
                rewriter,
                converter: None,
                values: Vec::new(),
            });
        }

        // Columns in the expected primary key that the actual key lacks.
        let to_add = &expect.primary_key[actual.primary_key.len()..];
        let mut values = Vec::with_capacity(to_add.len());
        let mut fields = Vec::with_capacity(to_add.len());
        for column_id in to_add {
            // The metadata must contain all columns in its primary key.
            let column = expect.column_by_id(*column_id).unwrap();
            fields.push((
                *column_id,
                SortField::new(column.column_schema.data_type.clone()),
            ));
            let default_value = column
                .column_schema
                .create_default()
                .context(CreateDefaultSnafu {
                    region_id: expect.region_id,
                    column: &column.column_schema.name,
                })?
                .with_context(|| CompatReaderSnafu {
                    region_id: expect.region_id,
                    reason: format!(
                        "key column {} does not have a default value to read",
                        column.column_schema.name
                    ),
                })?;
            values.push((*column_id, default_value));
        }
        debug_assert!(!fields.is_empty());

        let converter = Some(build_primary_key_codec_with_fields(
            expect.primary_key_encoding,
            fields.into_iter(),
        ));

        Ok(Self {
            rewriter,
            converter,
            values,
        })
    }

    /// Makes the primary key column of `batch` compatible with the expected
    /// schema. Rewriting the encoding already appends the default values, so
    /// the two paths are exclusive.
    fn compat(&self, batch: RecordBatch) -> Result<RecordBatch> {
        if let Some(rewriter) = &self.rewriter {
            return rewriter.rewrite_key(&self.values, batch);
        }

        self.append_key(batch)
    }

    /// Appends encoded default values to each distinct primary key.
    fn append_key(&self, batch: RecordBatch) -> Result<RecordBatch> {
        let Some(converter) = &self.converter else {
            return Ok(batch);
        };

        let old_pk_dict_array = batch
            .column(primary_key_column_index(batch.num_columns()))
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .unwrap();
        let old_pk_values_array = old_pk_dict_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        let mut builder = BinaryBuilder::with_capacity(
            old_pk_values_array.len(),
            old_pk_values_array.value_data().len()
                + converter.estimated_size().unwrap_or_default() * old_pk_values_array.len(),
        );

        // Reuse one buffer sized by the average key length plus the appended part.
        let mut buffer = Vec::with_capacity(
            old_pk_values_array.value_data().len() / old_pk_values_array.len().max(1)
                + converter.estimated_size().unwrap_or_default(),
        );

        for value in old_pk_values_array.iter() {
            let Some(old_pk) = value else {
                builder.append_null();
                continue;
            };

            buffer.clear();
            buffer.extend_from_slice(old_pk);
            converter
                .encode_values(&self.values, &mut buffer)
                .context(EncodeSnafu)?;

            builder.append_value(&buffer);
        }

        let new_pk_values_array = Arc::new(builder.finish());
        let new_pk_dict_array =
            PrimaryKeyArray::new(old_pk_dict_array.keys().clone(), new_pk_values_array);

        let mut columns = batch.columns().to_vec();
        columns[primary_key_column_index(batch.num_columns())] = Arc::new(new_pk_dict_array);

        RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::{OpType, SemanticType};
    use datatypes::arrow::array::{
        ArrayRef, BinaryDictionaryBuilder, Int64Array, StringDictionaryBuilder,
        TimestampMillisecondArray, UInt8Array, UInt64Array,
    };
    use datatypes::arrow::datatypes::UInt32Type;
    use datatypes::arrow::record_batch::RecordBatch;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::ValueRef;
    use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, UInt8Vector, UInt64Vector};
    use mito_codec::row_converter::{
        DensePrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec,
    };
    use store_api::codec::PrimaryKeyEncoding;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;
    use crate::read::flat_projection::FlatProjectionMapper;
    use crate::sst::parquet::flat_format::FlatReadFormat;
    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
    use crate::test_util::{VecBatchReader, check_reader_result};

    /// Creates a region metadata from `(id, semantic type, data type)` triples
    /// and the primary key column ids.
    fn new_metadata(
        semantic_types: &[(ColumnId, SemanticType, ConcreteDataType)],
        primary_key: &[ColumnId],
    ) -> RegionMetadata {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        for (id, semantic_type, data_type) in semantic_types {
            let column_schema = match semantic_type {
                SemanticType::Tag => {
                    ColumnSchema::new(format!("tag_{id}"), data_type.clone(), true)
                }
                SemanticType::Field => {
                    ColumnSchema::new(format!("field_{id}"), data_type.clone(), true)
                }
                SemanticType::Timestamp => ColumnSchema::new("ts", data_type.clone(), false),
            };

            builder.push_column_metadata(ColumnMetadata {
                column_schema,
                semantic_type: *semantic_type,
                column_id: *id,
            });
        }
        builder.primary_key(primary_key.to_vec());
        builder.build().unwrap()
    }

    /// Encodes string tag values into a dense primary key.
    fn encode_key(keys: &[Option<&str>]) -> Vec<u8> {
        let fields = (0..keys.len())
            .map(|_| (0, SortField::new(ConcreteDataType::string_datatype())))
            .collect();
        let converter = DensePrimaryKeyCodec::with_fields(fields);
        let row = keys.iter().map(|str_opt| match str_opt {
            Some(v) => ValueRef::String(v),
            None => ValueRef::Null,
        });

        converter.encode(row).unwrap()
    }

    /// Encodes `(column id, string value)` pairs into a sparse primary key.
    fn encode_sparse_key(keys: &[(ColumnId, Option<&str>)]) -> Vec<u8> {
        let fields = (0..keys.len())
            .map(|_| (1, SortField::new(ConcreteDataType::string_datatype())))
            .collect();
        let converter = SparsePrimaryKeyCodec::with_fields(fields);
        let row = keys
            .iter()
            .map(|(id, str_opt)| match str_opt {
                Some(v) => (*id, ValueRef::String(v)),
                None => (*id, ValueRef::Null),
            })
            .collect::<Vec<_>>();
        let mut buffer = vec![];
        converter.encode_value_refs(&row, &mut buffer).unwrap();
        buffer
    }

    /// Creates a [`Batch`] with the given primary key, `(column id, is_null)`
    /// field specs, start timestamp and number of rows.
    fn new_batch(
        primary_key: &[u8],
        fields: &[(ColumnId, bool)],
        start_ts: i64,
        num_rows: usize,
    ) -> Batch {
        let timestamps = Arc::new(TimestampMillisecondVector::from_values(
            start_ts..start_ts + num_rows as i64,
        ));
        let sequences = Arc::new(UInt64Vector::from_values(0..num_rows as u64));
        let op_types = Arc::new(UInt8Vector::from_vec(vec![OpType::Put as u8; num_rows]));
        let field_columns = fields
            .iter()
            .map(|(id, is_null)| {
                let data = if *is_null {
                    Arc::new(Int64Vector::from(vec![None; num_rows]))
                } else {
                    Arc::new(Int64Vector::from_vec(vec![*id as i64; num_rows]))
                };
                BatchColumn {
                    column_id: *id,
                    data,
                }
            })
            .collect();
        Batch::new(
            primary_key.to_vec(),
            timestamps,
            sequences,
            op_types,
            field_columns,
        )
        .unwrap()
    }

    #[test]
    fn test_invalid_pk_len() {
        let reader_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1, 2],
        );
        let expect_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        );
        may_compat_primary_key(&expect_meta, &reader_meta).unwrap_err();
    }

    #[test]
    fn test_different_pk() {
        let reader_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[2, 1],
        );
        let expect_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (4, SemanticType::Tag, ConcreteDataType::string_datatype()),
            ],
            &[1, 2, 4],
        );
        may_compat_primary_key(&expect_meta, &reader_meta).unwrap_err();
    }

    #[test]
    fn test_same_pk() {
        let reader_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        );
        assert!(
            may_compat_primary_key(&reader_meta, &reader_meta)
                .unwrap()
                .is_none()
        );
    }

    #[test]
    fn test_same_pk_encoding() {
        let reader_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
            ],
            &[1],
        ));

        assert!(
            may_compat_primary_key(&reader_meta, &reader_meta)
                .unwrap()
                .is_none()
        );
    }

    #[test]
    fn test_same_fields() {
        let reader_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        let mapper = PrimaryKeyProjectionMapper::all(&reader_meta).unwrap();
        assert!(may_compat_fields(&mapper, &reader_meta).unwrap().is_none())
    }

    #[tokio::test]
    async fn test_compat_reader() {
        let reader_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        let expect_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (3, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (4, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1, 3],
        ));
        let mapper = ProjectionMapper::all(&expect_meta, false).unwrap();
        let k1 = encode_key(&[Some("a")]);
        let k2 = encode_key(&[Some("b")]);
        let source_reader = VecBatchReader::new(&[
            new_batch(&k1, &[(2, false)], 1000, 3),
            new_batch(&k2, &[(2, false)], 1000, 3),
        ]);

        let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
        let k1 = encode_key(&[Some("a"), None]);
        let k2 = encode_key(&[Some("b"), None]);
        check_reader_result(
            &mut compat_reader,
            &[
                new_batch(&k1, &[(2, false), (4, true)], 1000, 3),
                new_batch(&k2, &[(2, false), (4, true)], 1000, 3),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_compat_reader_different_order() {
        let reader_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        let expect_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (4, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        let mapper = ProjectionMapper::all(&expect_meta, false).unwrap();
        let k1 = encode_key(&[Some("a")]);
        let k2 = encode_key(&[Some("b")]);
        let source_reader = VecBatchReader::new(&[
            new_batch(&k1, &[(2, false)], 1000, 3),
            new_batch(&k2, &[(2, false)], 1000, 3),
        ]);

        let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
        check_reader_result(
            &mut compat_reader,
            &[
                new_batch(&k1, &[(3, true), (2, false), (4, true)], 1000, 3),
                new_batch(&k2, &[(3, true), (2, false), (4, true)], 1000, 3),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_compat_reader_different_types() {
        let actual_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        let expect_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::string_datatype()),
            ],
            &[1],
        ));
        let mapper = ProjectionMapper::all(&expect_meta, false).unwrap();
        let k1 = encode_key(&[Some("a")]);
        let k2 = encode_key(&[Some("b")]);
        let source_reader = VecBatchReader::new(&[
            new_batch(&k1, &[(2, false)], 1000, 3),
            new_batch(&k2, &[(2, false)], 1000, 3),
        ]);

        let fn_batch_cast = |batch: Batch| {
            let mut new_fields = batch.fields.clone();
            new_fields[0].data = new_fields[0]
                .data
                .cast(&ConcreteDataType::string_datatype())
                .unwrap();

            batch.with_fields(new_fields).unwrap()
        };
        let mut compat_reader = CompatReader::new(&mapper, actual_meta, source_reader).unwrap();
        check_reader_result(
            &mut compat_reader,
            &[
                fn_batch_cast(new_batch(&k1, &[(2, false)], 1000, 3)),
                fn_batch_cast(new_batch(&k2, &[(2, false)], 1000, 3)),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_compat_reader_projection() {
        let reader_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        let expect_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (4, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        let mapper = ProjectionMapper::new(&expect_meta, [1, 3, 2].into_iter(), false).unwrap();
        let k1 = encode_key(&[Some("a")]);
        let source_reader = VecBatchReader::new(&[new_batch(&k1, &[(2, false)], 1000, 3)]);

        let mut compat_reader =
            CompatReader::new(&mapper, reader_meta.clone(), source_reader).unwrap();
        check_reader_result(
            &mut compat_reader,
            &[new_batch(&k1, &[(3, true), (2, false)], 1000, 3)],
        )
        .await;

        let mapper = ProjectionMapper::new(&expect_meta, [1, 4, 2].into_iter(), false).unwrap();
        let k1 = encode_key(&[Some("a")]);
        let source_reader = VecBatchReader::new(&[new_batch(&k1, &[], 1000, 3)]);

        let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
        check_reader_result(
            &mut compat_reader,
            &[new_batch(&k1, &[(3, true), (4, true)], 1000, 3)],
        )
        .await;
    }

    #[tokio::test]
    async fn test_compat_reader_different_pk_encoding() {
        let mut reader_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        );
        reader_meta.primary_key_encoding = PrimaryKeyEncoding::Dense;
        let reader_meta = Arc::new(reader_meta);
        let mut expect_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (3, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (4, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1, 3],
        );
        expect_meta.primary_key_encoding = PrimaryKeyEncoding::Sparse;
        let expect_meta = Arc::new(expect_meta);

        let mapper = ProjectionMapper::all(&expect_meta, false).unwrap();
        let k1 = encode_key(&[Some("a")]);
        let k2 = encode_key(&[Some("b")]);
        let source_reader = VecBatchReader::new(&[
            new_batch(&k1, &[(2, false)], 1000, 3),
            new_batch(&k2, &[(2, false)], 1000, 3),
        ]);

        let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
        let k1 = encode_sparse_key(&[(1, Some("a")), (3, None)]);
        let k2 = encode_sparse_key(&[(1, Some("b")), (3, None)]);
        check_reader_result(
            &mut compat_reader,
            &[
                new_batch(&k1, &[(2, false), (4, true)], 1000, 3),
                new_batch(&k2, &[(2, false), (4, true)], 1000, 3),
            ],
        )
        .await;
    }

    fn build_flat_test_pk_array(primary_keys: &[&[u8]]) -> ArrayRef {
        let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        for &pk in primary_keys {
            builder.append(pk).unwrap();
        }
        Arc::new(builder.finish())
    }

    #[test]
    fn test_flat_compat_batch_with_missing_columns() {
        let actual_metadata = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));

        let expected_metadata = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));

        let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
        let read_format = FlatReadFormat::new(
            actual_metadata.clone(),
            [0, 1, 2, 3].into_iter(),
            None,
            "test",
            false,
        )
        .unwrap();
        let format_projection = read_format.format_projection();

        let compat_batch =
            FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection, false)
                .unwrap()
                .unwrap();

        let mut tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
        tag_builder.append_value("tag1");
        tag_builder.append_value("tag1");
        let tag_dict_array = Arc::new(tag_builder.finish());

        let k1 = encode_key(&[Some("tag1")]);
        let input_columns: Vec<ArrayRef> = vec![
            tag_dict_array.clone(),
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&k1, &k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let input_schema =
            to_flat_sst_arrow_schema(&actual_metadata, &FlatSchemaOptions::default());
        let input_batch = RecordBatch::try_new(input_schema, input_columns).unwrap();

        let result = compat_batch.compat(input_batch).unwrap();

        let expected_schema =
            to_flat_sst_arrow_schema(&expected_metadata, &FlatSchemaOptions::default());

        let expected_columns: Vec<ArrayRef> = vec![
            tag_dict_array.clone(),
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(Int64Array::from(vec![None::<i64>, None::<i64>])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&k1, &k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let expected_batch = RecordBatch::try_new(expected_schema, expected_columns).unwrap();

        assert_eq!(expected_batch, result);
    }

    #[test]
    fn test_flat_compat_batch_with_different_pk_encoding() {
        let mut actual_metadata = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        );
        actual_metadata.primary_key_encoding = PrimaryKeyEncoding::Dense;
        let actual_metadata = Arc::new(actual_metadata);

        let mut expected_metadata = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (3, SemanticType::Tag, ConcreteDataType::string_datatype()),
            ],
            &[1, 3],
        );
        expected_metadata.primary_key_encoding = PrimaryKeyEncoding::Sparse;
        let expected_metadata = Arc::new(expected_metadata);

        let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
        let read_format = FlatReadFormat::new(
            actual_metadata.clone(),
            [0, 1, 2, 3].into_iter(),
            None,
            "test",
            false,
        )
        .unwrap();
        let format_projection = read_format.format_projection();

        let compat_batch =
            FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection, false)
                .unwrap()
                .unwrap();

        let mut tag1_builder = StringDictionaryBuilder::<UInt32Type>::new();
        tag1_builder.append_value("tag1");
        tag1_builder.append_value("tag1");
        let tag1_dict_array = Arc::new(tag1_builder.finish());

        let k1 = encode_key(&[Some("tag1")]);
        let input_columns: Vec<ArrayRef> = vec![
            tag1_dict_array.clone(),
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&k1, &k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let input_schema =
            to_flat_sst_arrow_schema(&actual_metadata, &FlatSchemaOptions::default());
        let input_batch = RecordBatch::try_new(input_schema, input_columns).unwrap();

        let result = compat_batch.compat(input_batch).unwrap();

        let sparse_k1 = encode_sparse_key(&[(1, Some("tag1")), (3, None)]);
        let mut null_tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
        null_tag_builder.append_nulls(2);
        let null_tag_dict_array = Arc::new(null_tag_builder.finish());
        let expected_columns: Vec<ArrayRef> = vec![
            tag1_dict_array.clone(),
            null_tag_dict_array,
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&sparse_k1, &sparse_k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let output_schema =
            to_flat_sst_arrow_schema(&expected_metadata, &FlatSchemaOptions::default());
        let expected_batch = RecordBatch::try_new(output_schema, expected_columns).unwrap();

        assert_eq!(expected_batch, result);
    }

    #[test]
    fn test_flat_compat_batch_compact_sparse() {
        let mut actual_metadata = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[],
        );
        actual_metadata.primary_key_encoding = PrimaryKeyEncoding::Sparse;
        let actual_metadata = Arc::new(actual_metadata);

        let mut expected_metadata = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[],
        );
        expected_metadata.primary_key_encoding = PrimaryKeyEncoding::Sparse;
        let expected_metadata = Arc::new(expected_metadata);

        let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
        let read_format = FlatReadFormat::new(
            actual_metadata.clone(),
            [0, 2, 3].into_iter(),
            None,
            "test",
            true,
        )
        .unwrap();
        let format_projection = read_format.format_projection();

        let compat_batch =
            FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection, true)
                .unwrap()
                .unwrap();

        let sparse_k1 = encode_sparse_key(&[]);
        let input_columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&sparse_k1, &sparse_k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let input_schema =
            to_flat_sst_arrow_schema(&actual_metadata, &FlatSchemaOptions::default());
        let input_batch = RecordBatch::try_new(input_schema, input_columns).unwrap();

        let result = compat_batch.compat(input_batch).unwrap();

        let expected_columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(Int64Array::from(vec![None::<i64>, None::<i64>])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&sparse_k1, &sparse_k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let output_schema =
            to_flat_sst_arrow_schema(&expected_metadata, &FlatSchemaOptions::default());
        let expected_batch = RecordBatch::try_new(output_schema, expected_columns).unwrap();

        assert_eq!(expected_batch, result);
    }
}