1use std::collections::HashMap;
18use std::sync::Arc;
19
20use api::v1::SemanticType;
21use datatypes::arrow::array::{
22 Array, ArrayRef, BinaryArray, BinaryBuilder, DictionaryArray, UInt32Array,
23};
24use datatypes::arrow::compute::{TakeOptions, take};
25use datatypes::arrow::datatypes::{FieldRef, Schema, SchemaRef};
26use datatypes::arrow::record_batch::RecordBatch;
27use datatypes::data_type::ConcreteDataType;
28use datatypes::prelude::DataType;
29use datatypes::value::Value;
30use datatypes::vectors::VectorRef;
31use datatypes::vectors::json::array::JsonArray;
32use mito_codec::row_converter::{
33 CompositeValues, PrimaryKeyCodec, SortField, build_primary_key_codec,
34 build_primary_key_codec_with_fields,
35};
36use snafu::{OptionExt, ResultExt, ensure};
37use store_api::codec::PrimaryKeyEncoding;
38use store_api::metadata::{RegionMetadata, RegionMetadataRef};
39use store_api::storage::ColumnId;
40
41use crate::error::{
42 CompatReaderSnafu, ComputeArrowSnafu, ConvertValueSnafu, CreateDefaultSnafu, DecodeSnafu,
43 EncodeSnafu, NewRecordBatchSnafu, Result, UnsupportedOperationSnafu,
44};
45use crate::read::flat_projection::{FlatProjectionMapper, flat_projected_columns};
46use crate::sst::parquet::flat_format::primary_key_column_index;
47use crate::sst::parquet::format::{FormatProjection, INTERNAL_COLUMN_NUM, PrimaryKeyArray};
48use crate::sst::{internal_fields, tag_maybe_to_dictionary_field, with_field_id};
49
50pub(crate) fn has_same_columns_and_pk_encoding(
52 left: &RegionMetadata,
53 right: &RegionMetadata,
54) -> bool {
55 if left.primary_key_encoding != right.primary_key_encoding {
56 return false;
57 }
58
59 if left.column_metadatas.len() != right.column_metadatas.len() {
60 return false;
61 }
62
63 for (left_col, right_col) in left.column_metadatas.iter().zip(&right.column_metadatas) {
64 if left_col.column_id != right_col.column_id || !left_col.is_same_datatype(right_col) {
65 return false;
66 }
67 debug_assert_eq!(
68 left_col.column_schema.data_type,
69 right_col.column_schema.data_type
70 );
71 debug_assert_eq!(left_col.semantic_type, right_col.semantic_type);
72 }
73
74 true
75}
76
/// Adapts [RecordBatch]es read with an older (actual) region schema to the
/// schema the reader expects, in the flat format.
pub(crate) struct FlatCompatBatch {
    /// For each expected output column: the position to take it from in the
    /// input batch (with an optional cast), or a default value to fill.
    index_or_defaults: Vec<IndexOrDefault>,
    /// Arrow schema of the adapted output batch, including the trailing
    /// internal columns.
    arrow_schema: SchemaRef,
    /// Helper that makes the encoded primary key column compatible.
    compat_pk: FlatCompatPrimaryKey,
}
86
impl FlatCompatBatch {
    /// Creates a [FlatCompatBatch] that adapts batches read with the `actual`
    /// metadata to the schema `mapper` expects.
    ///
    /// Returns `Ok(None)` when the projected schemas already match so no
    /// adaptation is needed.
    pub(crate) fn try_new(
        mapper: &FlatProjectionMapper,
        actual: &RegionMetadataRef,
        format_projection: &FormatProjection,
        compaction: bool,
    ) -> Result<Option<Self>> {
        let actual_schema = flat_projected_columns(actual, format_projection);
        let expect_schema = mapper.batch_schema();
        // Fast path: schemas already match.
        if expect_schema == actual_schema {
            return Ok(None);
        }

        // Sparse-encoded primary keys get a dedicated path during compaction.
        if actual.primary_key_encoding == PrimaryKeyEncoding::Sparse && compaction {
            return FlatCompatBatch::try_new_compact_sparse(mapper, actual);
        }

        let (index_or_defaults, fields) =
            Self::compute_index_and_fields(&actual_schema, expect_schema, mapper.metadata())?;

        let compat_pk = FlatCompatPrimaryKey::new(mapper.metadata(), actual)?;

        Ok(Some(Self {
            index_or_defaults,
            arrow_schema: Arc::new(Schema::new(fields)),
            compat_pk,
        }))
    }

    /// For every column in `expect_schema`, computes either its position in
    /// `actual_schema` (plus an optional cast) or a default value to fill,
    /// and builds the corresponding output arrow fields.
    fn compute_index_and_fields(
        actual_schema: &[(ColumnId, ConcreteDataType)],
        expect_schema: &[(ColumnId, ConcreteDataType)],
        expect_metadata: &RegionMetadata,
    ) -> Result<(Vec<IndexOrDefault>, Vec<FieldRef>)> {
        // Maps column id to its position and data type in the actual schema.
        let actual_schema_index: HashMap<_, _> = actual_schema
            .iter()
            .enumerate()
            .map(|(idx, (column_id, data_type))| (*column_id, (idx, data_type)))
            .collect();

        let mut index_or_defaults = Vec::with_capacity(expect_schema.len());
        let mut fields = Vec::with_capacity(expect_schema.len());
        for (column_id, expect_data_type) in expect_schema {
            // NOTE(review): unwrap assumes the expected schema only contains
            // ids present in `expect_metadata` — appears guaranteed by the
            // mapper; confirm at call sites.
            let column_index = expect_metadata.column_index_by_id(*column_id).unwrap();
            let expect_column = &expect_metadata.column_metadatas[column_index];
            let column_field = &expect_metadata.schema.arrow_schema().fields()[column_index];
            if expect_column.semantic_type == SemanticType::Tag {
                // Tags may be stored as dictionary-encoded fields.
                let field = tag_maybe_to_dictionary_field(
                    &expect_column.column_schema.data_type,
                    column_field,
                );
                fields.push(Arc::new(with_field_id(
                    (*field).clone(),
                    expect_column.column_id,
                )));
            } else {
                fields.push(Arc::new(with_field_id(
                    (**column_field).clone(),
                    expect_column.column_id,
                )));
            };

            if let Some((index, actual_data_type)) = actual_schema_index.get(column_id) {
                let mut cast_type = None;
                // Casts only when the stored type differs from the expected one.
                if expect_data_type != *actual_data_type {
                    cast_type = Some(expect_data_type.clone())
                }
                index_or_defaults.push(IndexOrDefault::Index {
                    pos: *index,
                    cast_type,
                });
            } else {
                // Column is missing from the actual schema: fill it with the
                // column's default value, which must exist to be readable.
                let default_vector = expect_column
                    .column_schema
                    .create_default_vector(1)
                    .context(CreateDefaultSnafu {
                        region_id: expect_metadata.region_id,
                        column: &expect_column.column_schema.name,
                    })?
                    .with_context(|| CompatReaderSnafu {
                        region_id: expect_metadata.region_id,
                        reason: format!(
                            "column {} does not have a default value to read",
                            expect_column.column_schema.name
                        ),
                    })?;
                index_or_defaults.push(IndexOrDefault::DefaultValue {
                    default_vector,
                    semantic_type: expect_column.semantic_type,
                });
            };
        }
        // Internal columns (primary key, sequence, op type) are appended
        // after the user-visible columns.
        fields.extend_from_slice(&internal_fields());

        Ok((index_or_defaults, fields))
    }

    /// Builds the compat adapter for compacting sparse-encoded regions.
    ///
    /// Only field columns and the time index are adapted; the primary key
    /// column is passed through unchanged (default [FlatCompatPrimaryKey]).
    fn try_new_compact_sparse(
        mapper: &FlatProjectionMapper,
        actual: &RegionMetadataRef,
    ) -> Result<Option<Self>> {
        ensure!(
            mapper.metadata().primary_key_encoding == PrimaryKeyEncoding::Sparse,
            UnsupportedOperationSnafu {
                err_msg: "Flat format doesn't support converting sparse encoding back to dense encoding"
            }
        );

        // Schemas cover only fields and the time index: the sparse primary
        // key stays encoded inside the pk column.
        let actual_schema: Vec<_> = actual
            .field_columns()
            .chain([actual.time_index_column()])
            .map(|col| (col.column_id, col.column_schema.data_type.clone()))
            .collect();
        let expect_schema: Vec<_> = mapper
            .metadata()
            .field_columns()
            .chain([mapper.metadata().time_index_column()])
            .map(|col| (col.column_id, col.column_schema.data_type.clone()))
            .collect();

        let (index_or_defaults, fields) =
            Self::compute_index_and_fields(&actual_schema, &expect_schema, mapper.metadata())?;

        let compat_pk = FlatCompatPrimaryKey::default();

        Ok(Some(Self {
            index_or_defaults,
            arrow_schema: Arc::new(Schema::new(fields)),
            compat_pk,
        }))
    }

    /// Adapts a [RecordBatch] to the expected schema: reorders/casts existing
    /// columns, fills missing ones with defaults, carries the internal
    /// columns over, then fixes up the primary key column.
    pub(crate) fn compat(&self, batch: RecordBatch) -> Result<RecordBatch> {
        let len = batch.num_rows();
        let columns = self
            .index_or_defaults
            .iter()
            .map(|index_or_default| match index_or_default {
                IndexOrDefault::Index { pos, cast_type } => {
                    let old_column = batch.column(*pos);

                    if let Some(ty) = cast_type {
                        // JSON (v2) columns need alignment rather than a
                        // plain arrow cast.
                        let casted = if let Some(json_type) = ty.as_json()
                            && json_type.is_json2()
                        {
                            JsonArray::from(old_column)
                                .try_align(&json_type.as_arrow_type())
                                .context(ConvertValueSnafu)?
                        } else {
                            datatypes::arrow::compute::cast(old_column, &ty.as_arrow_type())
                                .context(ComputeArrowSnafu)?
                        };
                        Ok(casted)
                    } else {
                        Ok(old_column.clone())
                    }
                }
                IndexOrDefault::DefaultValue {
                    default_vector,
                    semantic_type,
                } => repeat_vector(default_vector, len, *semantic_type == SemanticType::Tag),
            })
            .chain(
                // The trailing internal columns are copied unchanged.
                batch.columns()[batch.num_columns() - INTERNAL_COLUMN_NUM..]
                    .iter()
                    .map(|col| Ok(col.clone())),
            )
            .collect::<Result<Vec<_>>>()?;

        let compat_batch = RecordBatch::try_new(self.arrow_schema.clone(), columns)
            .context(NewRecordBatchSnafu)?;

        // The primary key column may also need rewriting or appending.
        self.compat_pk.compat(compat_batch)
    }
}
285
286fn repeat_vector(vector: &VectorRef, to_len: usize, is_tag: bool) -> Result<ArrayRef> {
288 assert_eq!(1, vector.len());
289 let data_type = vector.data_type();
290 if is_tag && data_type.is_string() {
291 let values = vector.to_arrow_array();
292 if values.is_null(0) {
293 let keys = UInt32Array::new_null(to_len);
295 Ok(Arc::new(DictionaryArray::new(keys, values.slice(0, 0))))
296 } else {
297 let keys = UInt32Array::from_value(0, to_len);
298 Ok(Arc::new(DictionaryArray::new(keys, values)))
299 }
300 } else {
301 let keys = UInt32Array::from_value(0, to_len);
302 take(
303 &vector.to_arrow_array(),
304 &keys,
305 Some(TakeOptions {
306 check_bounds: false,
307 }),
308 )
309 .context(ComputeArrowSnafu)
310 }
311}
312
313fn is_primary_key_same(expect: &RegionMetadata, actual: &RegionMetadata) -> Result<bool> {
315 ensure!(
316 actual.primary_key.len() <= expect.primary_key.len(),
317 CompatReaderSnafu {
318 region_id: expect.region_id,
319 reason: format!(
320 "primary key has more columns {} than expect {}",
321 actual.primary_key.len(),
322 expect.primary_key.len()
323 ),
324 }
325 );
326 ensure!(
327 actual.primary_key == expect.primary_key[..actual.primary_key.len()],
328 CompatReaderSnafu {
329 region_id: expect.region_id,
330 reason: format!(
331 "primary key has different prefix, expect: {:?}, actual: {:?}",
332 expect.primary_key, actual.primary_key
333 ),
334 }
335 );
336
337 Ok(actual.primary_key.len() == expect.primary_key.len())
338}
339
/// Where an expected output column comes from: an existing column in the
/// input batch, or a default value for a column the input does not have.
#[derive(Debug)]
enum IndexOrDefault {
    /// Takes the column at `pos` from the input batch.
    Index {
        pos: usize,
        /// Casts to this type when it differs from the stored type.
        cast_type: Option<ConcreteDataType>,
    },
    /// Fills the column by repeating a default value.
    DefaultValue {
        // Single-element vector holding the default value.
        default_vector: VectorRef,
        // Tag columns are materialized as dictionary arrays.
        semantic_type: SemanticType,
    },
}
356
/// Re-encodes primary keys when the expected and actual regions use
/// different primary key encodings.
struct FlatRewritePrimaryKey {
    /// Codec for the expected (target) encoding.
    codec: Arc<dyn PrimaryKeyCodec>,
    /// Expected region metadata; provides the primary key column order.
    metadata: RegionMetadataRef,
    /// Codec that decodes keys written with the actual (source) encoding.
    old_codec: Arc<dyn PrimaryKeyCodec>,
}
367
impl FlatRewritePrimaryKey {
    /// Returns a rewriter when the two encodings differ, `None` otherwise.
    fn new(
        expect: &RegionMetadataRef,
        actual: &RegionMetadataRef,
    ) -> Option<FlatRewritePrimaryKey> {
        if expect.primary_key_encoding == actual.primary_key_encoding {
            return None;
        }
        let codec = build_primary_key_codec(expect);
        let old_codec = build_primary_key_codec(actual);

        Some(FlatRewritePrimaryKey {
            codec,
            metadata: expect.clone(),
            old_codec,
        })
    }

    /// Re-encodes every primary key in `batch` with the expected codec,
    /// extending each decoded key with `append_values` (defaults for key
    /// columns the old key did not contain) before encoding.
    fn rewrite_key(
        &self,
        append_values: &[(ColumnId, Value)],
        batch: RecordBatch,
    ) -> Result<RecordBatch> {
        // The primary key column is a dictionary of binary-encoded keys.
        let old_pk_dict_array = batch
            .column(primary_key_column_index(batch.num_columns()))
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .unwrap();
        let old_pk_values_array = old_pk_dict_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        let mut builder = BinaryBuilder::with_capacity(
            old_pk_values_array.len(),
            old_pk_values_array.value_data().len(),
        );

        // Reusable encode buffer, sized by the average old key length.
        let mut buffer = Vec::with_capacity(
            old_pk_values_array.value_data().len() / old_pk_values_array.len().max(1),
        );
        let mut column_id_values = Vec::new();
        for value in old_pk_values_array.iter() {
            let Some(old_pk) = value else {
                builder.append_null();
                continue;
            };
            // Decodes with the old codec, then appends values for the new
            // key columns.
            let mut pk_values = self.old_codec.decode(old_pk).context(DecodeSnafu)?;
            pk_values.extend(append_values);

            buffer.clear();
            column_id_values.clear();
            match pk_values {
                CompositeValues::Dense(dense_values) => {
                    self.codec
                        .encode_values(dense_values.as_slice(), &mut buffer)
                        .context(EncodeSnafu)?;
                }
                CompositeValues::Sparse(sparse_values) => {
                    // Encodes values in the expected primary key order.
                    for id in &self.metadata.primary_key {
                        let value = sparse_values.get_or_null(*id);
                        column_id_values.push((*id, value.clone()));
                    }
                    self.codec
                        .encode_values(&column_id_values, &mut buffer)
                        .context(EncodeSnafu)?;
                }
            }
            builder.append_value(&buffer);
        }
        // Keeps the old dictionary keys; only the dictionary values change.
        let new_pk_values_array = Arc::new(builder.finish());
        let new_pk_dict_array =
            PrimaryKeyArray::new(old_pk_dict_array.keys().clone(), new_pk_values_array);

        let mut columns = batch.columns().to_vec();
        columns[primary_key_column_index(batch.num_columns())] = Arc::new(new_pk_dict_array);

        RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
    }
}
454
/// Makes the encoded primary key column compatible with the expected schema.
/// The default instance is a no-op passthrough.
#[derive(Default)]
struct FlatCompatPrimaryKey {
    /// Rewrites keys when encodings differ; `None` when they match.
    rewriter: Option<FlatRewritePrimaryKey>,
    /// Codec for appending new key columns; `None` when keys are identical.
    converter: Option<Arc<dyn PrimaryKeyCodec>>,
    /// Default values (column id, value) appended to each key.
    values: Vec<(ColumnId, Value)>,
}
465
impl FlatCompatPrimaryKey {
    /// Creates a compat helper. When the primary keys are identical only the
    /// (optional) encoding rewriter is kept; otherwise default values for the
    /// additional key columns are prepared for appending.
    fn new(expect: &RegionMetadataRef, actual: &RegionMetadataRef) -> Result<Self> {
        let rewriter = FlatRewritePrimaryKey::new(expect, actual);

        if is_primary_key_same(expect, actual)? {
            return Ok(Self {
                rewriter,
                converter: None,
                values: Vec::new(),
            });
        }

        // `is_primary_key_same` already ensured `actual.primary_key` is a
        // strict prefix of `expect.primary_key` here, so this slice is the
        // non-empty suffix of columns to add.
        let to_add = &expect.primary_key[actual.primary_key.len()..];
        let mut values = Vec::with_capacity(to_add.len());
        let mut fields = Vec::with_capacity(to_add.len());
        for column_id in to_add {
            // The ids come from `expect.primary_key`, so the lookup should
            // succeed for well-formed metadata.
            let column = expect.column_by_id(*column_id).unwrap();
            fields.push((
                *column_id,
                SortField::new(column.column_schema.data_type.clone()),
            ));
            let default_value = column
                .column_schema
                .create_default()
                .context(CreateDefaultSnafu {
                    region_id: expect.region_id,
                    column: &column.column_schema.name,
                })?
                .with_context(|| CompatReaderSnafu {
                    region_id: expect.region_id,
                    reason: format!(
                        "key column {} does not have a default value to read",
                        column.column_schema.name
                    ),
                })?;
            values.push((*column_id, default_value));
        }
        debug_assert!(!fields.is_empty());

        let converter = Some(build_primary_key_codec_with_fields(
            expect.primary_key_encoding,
            fields.into_iter(),
        ));

        Ok(Self {
            rewriter,
            converter,
            values,
        })
    }

    /// Makes the primary key column of `batch` compatible.
    fn compat(&self, batch: RecordBatch) -> Result<RecordBatch> {
        // Rewriting re-encodes the whole key and already includes
        // `self.values`, so it supersedes `append_key`.
        if let Some(rewriter) = &self.rewriter {
            return rewriter.rewrite_key(&self.values, batch);
        }

        self.append_key(batch)
    }

    /// Appends the encoded default values to every primary key in `batch`.
    fn append_key(&self, batch: RecordBatch) -> Result<RecordBatch> {
        let Some(converter) = &self.converter else {
            // Primary keys are identical; nothing to append.
            return Ok(batch);
        };

        let old_pk_dict_array = batch
            .column(primary_key_column_index(batch.num_columns()))
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()
            .unwrap();
        let old_pk_values_array = old_pk_dict_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        // Capacity accounts for the old keys plus the estimated size of the
        // appended suffix per key.
        let mut builder = BinaryBuilder::with_capacity(
            old_pk_values_array.len(),
            old_pk_values_array.value_data().len()
                + converter.estimated_size().unwrap_or_default() * old_pk_values_array.len(),
        );

        // Reusable buffer: average old key length plus the suffix estimate.
        let mut buffer = Vec::with_capacity(
            old_pk_values_array.value_data().len() / old_pk_values_array.len().max(1)
                + converter.estimated_size().unwrap_or_default(),
        );

        for value in old_pk_values_array.iter() {
            let Some(old_pk) = value else {
                builder.append_null();
                continue;
            };

            // Keeps the old encoded bytes and appends the new columns.
            buffer.clear();
            buffer.extend_from_slice(old_pk);
            converter
                .encode_values(&self.values, &mut buffer)
                .context(EncodeSnafu)?;

            builder.append_value(&buffer);
        }

        // Keeps the old dictionary keys; only the dictionary values change.
        let new_pk_values_array = Arc::new(builder.finish());
        let new_pk_dict_array =
            PrimaryKeyArray::new(old_pk_dict_array.keys().clone(), new_pk_values_array);

        let mut columns = batch.columns().to_vec();
        columns[primary_key_column_index(batch.num_columns())] = Arc::new(new_pk_dict_array);

        RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
    }
}
588
589#[cfg(test)]
590mod tests {
591 use std::sync::Arc;
592
593 use api::v1::{OpType, SemanticType};
594 use datatypes::arrow::array::{
595 ArrayRef, BinaryDictionaryBuilder, Int64Array, StringDictionaryBuilder,
596 TimestampMillisecondArray, UInt8Array, UInt64Array,
597 };
598 use datatypes::arrow::datatypes::UInt32Type;
599 use datatypes::arrow::record_batch::RecordBatch;
600 use datatypes::prelude::ConcreteDataType;
601 use datatypes::schema::ColumnSchema;
602 use datatypes::value::ValueRef;
603 use mito_codec::row_converter::{
604 DensePrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec,
605 };
606 use store_api::codec::PrimaryKeyEncoding;
607 use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
608 use store_api::storage::RegionId;
609
610 use super::*;
611 use crate::read::flat_projection::FlatProjectionMapper;
612 use crate::read::read_columns::ReadColumns;
613 use crate::sst::parquet::flat_format::FlatReadFormat;
614 use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
615
616 fn new_metadata(
618 semantic_types: &[(ColumnId, SemanticType, ConcreteDataType)],
619 primary_key: &[ColumnId],
620 ) -> RegionMetadata {
621 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
622 for (id, semantic_type, data_type) in semantic_types {
623 let column_schema = match semantic_type {
624 SemanticType::Tag => {
625 ColumnSchema::new(format!("tag_{id}"), data_type.clone(), true)
626 }
627 SemanticType::Field => {
628 ColumnSchema::new(format!("field_{id}"), data_type.clone(), true)
629 }
630 SemanticType::Timestamp => ColumnSchema::new("ts", data_type.clone(), false),
631 };
632
633 builder.push_column_metadata(ColumnMetadata {
634 column_schema,
635 semantic_type: *semantic_type,
636 column_id: *id,
637 });
638 }
639 builder.primary_key(primary_key.to_vec());
640 builder.build().unwrap()
641 }
642
643 fn encode_key(keys: &[Option<&str>]) -> Vec<u8> {
645 let fields = (0..keys.len())
646 .map(|_| (0, SortField::new(ConcreteDataType::string_datatype())))
647 .collect();
648 let converter = DensePrimaryKeyCodec::with_fields(fields);
649 let row = keys.iter().map(|str_opt| match str_opt {
650 Some(v) => ValueRef::String(v),
651 None => ValueRef::Null,
652 });
653
654 converter.encode(row).unwrap()
655 }
656
657 fn encode_sparse_key(keys: &[(ColumnId, Option<&str>)]) -> Vec<u8> {
659 let fields = (0..keys.len())
660 .map(|_| (1, SortField::new(ConcreteDataType::string_datatype())))
661 .collect();
662 let converter = SparsePrimaryKeyCodec::with_fields(fields);
663 let row = keys
664 .iter()
665 .map(|(id, str_opt)| match str_opt {
666 Some(v) => (*id, ValueRef::String(v)),
667 None => (*id, ValueRef::Null),
668 })
669 .collect::<Vec<_>>();
670 let mut buffer = vec![];
671 converter.encode_value_refs(&row, &mut buffer).unwrap();
672 buffer
673 }
674
675 fn build_flat_test_pk_array(primary_keys: &[&[u8]]) -> ArrayRef {
677 let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
678 for &pk in primary_keys {
679 builder.append(pk).unwrap();
680 }
681 Arc::new(builder.finish())
682 }
683
    #[test]
    fn test_flat_compat_batch_with_missing_columns() {
        // Actual file has (ts, tag_1, field_2); the reader expects an extra
        // field_3. The compat batch must insert field_3 filled with nulls
        // (its default) and keep all other columns unchanged.
        let actual_metadata = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));

        let expected_metadata = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));

        let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
        let read_format = FlatReadFormat::new(
            actual_metadata.clone(),
            ReadColumns::from_deduped_column_ids([0, 1, 2, 3]),
            None,
            "test",
            false,
        )
        .unwrap();
        let format_projection = read_format.format_projection();

        let compat_batch =
            FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection, false)
                .unwrap()
                .unwrap();

        let mut tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
        tag_builder.append_value("tag1");
        tag_builder.append_value("tag1");
        let tag_dict_array = Arc::new(tag_builder.finish());

        let k1 = encode_key(&[Some("tag1")]);
        // Input batch in the actual flat SST layout (2 rows).
        let input_columns: Vec<ArrayRef> = vec![
            tag_dict_array.clone(),
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&k1, &k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let input_schema =
            to_flat_sst_arrow_schema(&actual_metadata, &FlatSchemaOptions::default());
        let input_batch = RecordBatch::try_new(input_schema, input_columns).unwrap();

        let result = compat_batch.compat(input_batch).unwrap();

        let expected_schema =
            to_flat_sst_arrow_schema(&expected_metadata, &FlatSchemaOptions::default());

        // field_3 appears as an all-null column; everything else is intact.
        let expected_columns: Vec<ArrayRef> = vec![
            tag_dict_array.clone(),
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(Int64Array::from(vec![None::<i64>, None::<i64>])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&k1, &k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let expected_batch = RecordBatch::try_new(expected_schema, expected_columns).unwrap();

        assert_eq!(expected_batch, result);
    }
772
    #[test]
    fn test_flat_compat_batch_with_read_projection_superset() {
        // Same missing-column scenario as above, but the mapper is built with
        // an explicit projection whose read columns ([1, 2, 3]) are a
        // superset of the output columns ([1, 2]).
        let actual_metadata = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));

        let expected_metadata = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));

        let mapper = FlatProjectionMapper::new_with_read_columns(
            &expected_metadata,
            vec![1, 2],
            ReadColumns::from_deduped_column_ids([1, 2, 3]),
        )
        .unwrap();
        let read_format = FlatReadFormat::new(
            actual_metadata.clone(),
            ReadColumns::from_deduped_column_ids([1, 2, 3]),
            None,
            "test",
            false,
        )
        .unwrap();
        let format_projection = read_format.format_projection();

        let compat_batch =
            FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection, false)
                .unwrap()
                .unwrap();

        let mut tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
        tag_builder.append_value("tag1");
        tag_builder.append_value("tag1");
        let tag_dict_array = Arc::new(tag_builder.finish());

        let k1 = encode_key(&[Some("tag1")]);
        let input_columns: Vec<ArrayRef> = vec![
            tag_dict_array.clone(),
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&k1, &k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let input_schema =
            to_flat_sst_arrow_schema(&actual_metadata, &FlatSchemaOptions::default());
        let input_batch = RecordBatch::try_new(input_schema, input_columns).unwrap();

        let result = compat_batch.compat(input_batch).unwrap();

        // Missing field_3 is still filled with nulls in the output.
        let expected_schema =
            to_flat_sst_arrow_schema(&expected_metadata, &FlatSchemaOptions::default());
        let expected_columns: Vec<ArrayRef> = vec![
            tag_dict_array.clone(),
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(Int64Array::from(vec![None::<i64>, None::<i64>])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&k1, &k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let expected_batch = RecordBatch::try_new(expected_schema, expected_columns).unwrap();

        assert_eq!(expected_batch, result);
    }
865
    #[test]
    fn test_flat_compat_batch_with_different_pk_encoding() {
        // Actual region uses dense pk encoding with key [1]; the expected
        // region uses sparse encoding with key [1, 3]. The compat batch must
        // add the new null tag_3 column and re-encode every primary key with
        // the sparse codec, including the appended null for column 3.
        let mut actual_metadata = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        );
        actual_metadata.primary_key_encoding = PrimaryKeyEncoding::Dense;
        let actual_metadata = Arc::new(actual_metadata);

        let mut expected_metadata = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (3, SemanticType::Tag, ConcreteDataType::string_datatype()),
            ],
            &[1, 3],
        );
        expected_metadata.primary_key_encoding = PrimaryKeyEncoding::Sparse;
        let expected_metadata = Arc::new(expected_metadata);

        let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
        let read_format = FlatReadFormat::new(
            actual_metadata.clone(),
            ReadColumns::from_deduped_column_ids([0, 1, 2, 3]),
            None,
            "test",
            false,
        )
        .unwrap();
        let format_projection = read_format.format_projection();

        let compat_batch =
            FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection, false)
                .unwrap()
                .unwrap();

        let mut tag1_builder = StringDictionaryBuilder::<UInt32Type>::new();
        tag1_builder.append_value("tag1");
        tag1_builder.append_value("tag1");
        let tag1_dict_array = Arc::new(tag1_builder.finish());

        // Input primary keys are dense-encoded over tag_1 only.
        let k1 = encode_key(&[Some("tag1")]);
        let input_columns: Vec<ArrayRef> = vec![
            tag1_dict_array.clone(),
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&k1, &k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let input_schema =
            to_flat_sst_arrow_schema(&actual_metadata, &FlatSchemaOptions::default());
        let input_batch = RecordBatch::try_new(input_schema, input_columns).unwrap();

        let result = compat_batch.compat(input_batch).unwrap();

        // Output keys are sparse-encoded over [tag_1, tag_3] with tag_3 null.
        let sparse_k1 = encode_sparse_key(&[(1, Some("tag1")), (3, None)]);
        let mut null_tag_builder = StringDictionaryBuilder::<UInt32Type>::new();
        null_tag_builder.append_nulls(2);
        let null_tag_dict_array = Arc::new(null_tag_builder.finish());
        let expected_columns: Vec<ArrayRef> = vec![
            tag1_dict_array.clone(),
            null_tag_dict_array,
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&sparse_k1, &sparse_k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let output_schema =
            to_flat_sst_arrow_schema(&expected_metadata, &FlatSchemaOptions::default());
        let expected_batch = RecordBatch::try_new(output_schema, expected_columns).unwrap();

        assert_eq!(expected_batch, result);
    }
961
    #[test]
    fn test_flat_compat_batch_compact_sparse() {
        // Compaction over sparse-encoded regions: only field columns and the
        // time index are adapted (field_3 filled with nulls); the encoded
        // primary key column passes through unchanged.
        let mut actual_metadata = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[],
        );
        actual_metadata.primary_key_encoding = PrimaryKeyEncoding::Sparse;
        let actual_metadata = Arc::new(actual_metadata);

        let mut expected_metadata = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[],
        );
        expected_metadata.primary_key_encoding = PrimaryKeyEncoding::Sparse;
        let expected_metadata = Arc::new(expected_metadata);

        let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
        let read_format = FlatReadFormat::new(
            actual_metadata.clone(),
            ReadColumns::from_deduped_column_ids([0, 2, 3]),
            None,
            "test",
            true,
        )
        .unwrap();
        let format_projection = read_format.format_projection();

        // `compaction = true` selects the sparse compaction path.
        let compat_batch =
            FlatCompatBatch::try_new(&mapper, &actual_metadata, format_projection, true)
                .unwrap()
                .unwrap();

        let sparse_k1 = encode_sparse_key(&[]);
        let input_columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&sparse_k1, &sparse_k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let input_schema =
            to_flat_sst_arrow_schema(&actual_metadata, &FlatSchemaOptions::default());
        let input_batch = RecordBatch::try_new(input_schema, input_columns).unwrap();

        let result = compat_batch.compat(input_batch).unwrap();

        // field_3 is filled with nulls; the pk column is untouched.
        let expected_columns: Vec<ArrayRef> = vec![
            Arc::new(Int64Array::from(vec![100, 200])),
            Arc::new(Int64Array::from(vec![None::<i64>, None::<i64>])),
            Arc::new(TimestampMillisecondArray::from_iter_values([1000, 2000])),
            build_flat_test_pk_array(&[&sparse_k1, &sparse_k1]),
            Arc::new(UInt64Array::from_iter_values([1, 2])),
            Arc::new(UInt8Array::from_iter_values([
                OpType::Put as u8,
                OpType::Put as u8,
            ])),
        ];
        let output_schema =
            to_flat_sst_arrow_schema(&expected_metadata, &FlatSchemaOptions::default());
        let expected_batch = RecordBatch::try_new(output_schema, expected_columns).unwrap();

        assert_eq!(expected_batch, result);
    }
1043}