1use std::collections::HashMap;
18use std::sync::Arc;
19
20use api::v1::SemanticType;
21use datatypes::arrow::array::{
22 Array, ArrayRef, BinaryArray, BinaryBuilder, DictionaryArray, UInt32Array,
23};
24use datatypes::arrow::compute::{TakeOptions, take};
25use datatypes::arrow::datatypes::{FieldRef, Schema, SchemaRef};
26use datatypes::arrow::record_batch::RecordBatch;
27use datatypes::data_type::ConcreteDataType;
28use datatypes::extension::json::is_structured_json_field;
29use datatypes::prelude::DataType;
30use datatypes::value::Value;
31use datatypes::vectors::VectorRef;
32use datatypes::vectors::json::array::JsonArray;
33use mito_codec::row_converter::{
34 CompositeValues, PrimaryKeyCodec, SortField, build_primary_key_codec,
35 build_primary_key_codec_with_fields,
36};
37use snafu::{OptionExt, ResultExt, ensure};
38use store_api::codec::PrimaryKeyEncoding;
39use store_api::metadata::{RegionMetadata, RegionMetadataRef};
40use store_api::storage::ColumnId;
41
42use crate::error::{
43 CompatReaderSnafu, ComputeArrowSnafu, ConvertValueSnafu, CreateDefaultSnafu, DecodeSnafu,
44 EncodeSnafu, NewRecordBatchSnafu, Result, UnsupportedOperationSnafu,
45};
46use crate::read::flat_projection::{FlatProjectionMapper, flat_projected_columns};
47use crate::sst::parquet::flat_format::{FlatReadFormat, primary_key_column_index};
48use crate::sst::parquet::format::{INTERNAL_COLUMN_NUM, PrimaryKeyArray};
49use crate::sst::{internal_fields, tag_maybe_to_dictionary_field, with_field_id};
50
51pub(crate) fn has_same_columns_and_pk_encoding(
54 projection_mapper: &FlatProjectionMapper,
55 read_format: &FlatReadFormat,
56 compaction: bool,
57) -> bool {
58 let left = projection_mapper.metadata();
59 let right = read_format.metadata();
60 if left.primary_key_encoding != right.primary_key_encoding {
61 return false;
62 }
63
64 if left.column_metadatas.len() != right.column_metadatas.len() {
65 return false;
66 }
67
68 for (left_col, right_col) in left.column_metadatas.iter().zip(&right.column_metadatas) {
69 if left_col.column_id != right_col.column_id {
70 return false;
71 }
72 debug_assert_eq!(left_col.semantic_type, right_col.semantic_type);
73 }
74
75 &projection_mapper.input_arrow_schema(compaction) == read_format.arrow_schema()
76}
77
78pub(crate) struct FlatCompatBatch {
80 index_or_defaults: Vec<IndexOrDefault>,
82 arrow_schema: SchemaRef,
84 compat_pk: FlatCompatPrimaryKey,
86}
87
88impl FlatCompatBatch {
89 pub(crate) fn try_new(
95 mapper: &FlatProjectionMapper,
96 read_format: &FlatReadFormat,
97 compaction: bool,
98 ) -> Result<Option<Self>> {
99 let actual = read_format.metadata();
100 let format_projection = read_format.format_projection();
101 let mut actual_schema = flat_projected_columns(actual, format_projection);
102 if read_format
103 .arrow_schema()
104 .fields()
105 .iter()
106 .any(is_structured_json_field)
107 {
108 for field in read_format.arrow_schema().fields() {
109 if is_structured_json_field(field)
110 && let Some(column_id) =
111 actual.column_by_name(field.name()).map(|x| x.column_id)
112 && let Some(i) = actual_schema.iter().position(|x| x.0 == column_id)
113 {
114 actual_schema[i].1 = ConcreteDataType::from_arrow_type(field.data_type());
115 }
116 }
117 }
118
119 let expect_schema = mapper.batch_schema();
120 if expect_schema == actual_schema {
121 return Ok(None);
124 }
125
126 if actual.primary_key_encoding == PrimaryKeyEncoding::Sparse && compaction {
127 return FlatCompatBatch::try_new_compact_sparse(mapper, actual);
129 }
130
131 let (index_or_defaults, fields) =
132 Self::compute_index_and_fields(&actual_schema, expect_schema, mapper.metadata())?;
133
134 let compat_pk = FlatCompatPrimaryKey::new(mapper.metadata(), actual)?;
135
136 Ok(Some(Self {
137 index_or_defaults,
138 arrow_schema: Arc::new(Schema::new(fields)),
139 compat_pk,
140 }))
141 }
142
143 fn compute_index_and_fields(
144 actual_schema: &[(ColumnId, ConcreteDataType)],
145 expect_schema: &[(ColumnId, ConcreteDataType)],
146 expect_metadata: &RegionMetadata,
147 ) -> Result<(Vec<IndexOrDefault>, Vec<FieldRef>)> {
148 let actual_schema_index: HashMap<_, _> = actual_schema
150 .iter()
151 .enumerate()
152 .map(|(idx, (column_id, data_type))| (*column_id, (idx, data_type)))
153 .collect();
154
155 let mut index_or_defaults = Vec::with_capacity(expect_schema.len());
156 let mut fields = Vec::with_capacity(expect_schema.len());
157 for (column_id, expect_data_type) in expect_schema {
158 let column_index = expect_metadata.column_index_by_id(*column_id).unwrap();
160 let expect_column = &expect_metadata.column_metadatas[column_index];
161 let column_field = &expect_metadata.schema.arrow_schema().fields()[column_index];
162 if expect_column.semantic_type == SemanticType::Tag {
164 let field = tag_maybe_to_dictionary_field(
165 &expect_column.column_schema.data_type,
166 column_field,
167 );
168 fields.push(Arc::new(with_field_id(
169 (*field).clone(),
170 expect_column.column_id,
171 )));
172 } else {
173 let field = with_field_id(
174 Arc::unwrap_or_clone(column_field.clone()),
175 expect_column.column_id,
176 )
177 .with_data_type(expect_data_type.as_arrow_type());
178 fields.push(Arc::new(field))
179 };
180
181 if let Some((index, actual_data_type)) = actual_schema_index.get(column_id) {
182 let mut cast_type = None;
183
184 if expect_data_type != *actual_data_type {
186 cast_type = Some(expect_data_type.clone())
187 }
188 index_or_defaults.push(IndexOrDefault::Index {
190 pos: *index,
191 cast_type,
192 });
193 } else {
194 let default_vector = expect_column
196 .column_schema
197 .create_default_vector(1)
198 .context(CreateDefaultSnafu {
199 region_id: expect_metadata.region_id,
200 column: &expect_column.column_schema.name,
201 })?
202 .with_context(|| CompatReaderSnafu {
203 region_id: expect_metadata.region_id,
204 reason: format!(
205 "column {} does not have a default value to read",
206 expect_column.column_schema.name
207 ),
208 })?;
209 index_or_defaults.push(IndexOrDefault::DefaultValue {
210 default_vector,
211 semantic_type: expect_column.semantic_type,
212 });
213 };
214 }
215 fields.extend_from_slice(&internal_fields());
216
217 Ok((index_or_defaults, fields))
218 }
219
220 fn try_new_compact_sparse(
221 mapper: &FlatProjectionMapper,
222 actual: &RegionMetadataRef,
223 ) -> Result<Option<Self>> {
224 ensure!(
227 mapper.metadata().primary_key_encoding == PrimaryKeyEncoding::Sparse,
228 UnsupportedOperationSnafu {
229 err_msg: "Flat format doesn't support converting sparse encoding back to dense encoding"
230 }
231 );
232
233 let actual_schema: Vec<_> = actual
236 .field_columns()
237 .chain([actual.time_index_column()])
238 .map(|col| (col.column_id, col.column_schema.data_type.clone()))
239 .collect();
240 let expect_schema: Vec<_> = mapper
241 .metadata()
242 .field_columns()
243 .chain([mapper.metadata().time_index_column()])
244 .map(|col| (col.column_id, col.column_schema.data_type.clone()))
245 .collect();
246
247 let (index_or_defaults, fields) =
248 Self::compute_index_and_fields(&actual_schema, &expect_schema, mapper.metadata())?;
249
250 let compat_pk = FlatCompatPrimaryKey::default();
251
252 Ok(Some(Self {
253 index_or_defaults,
254 arrow_schema: Arc::new(Schema::new(fields)),
255 compat_pk,
256 }))
257 }
258
259 pub(crate) fn compat(&self, batch: RecordBatch) -> Result<RecordBatch> {
261 let len = batch.num_rows();
262 let columns = self
263 .index_or_defaults
264 .iter()
265 .map(|index_or_default| match index_or_default {
266 IndexOrDefault::Index { pos, cast_type } => {
267 let old_column = batch.column(*pos);
268
269 if let Some(ty) = cast_type {
270 let casted = if let Some(json_type) = ty.as_json()
271 && json_type.is_json2()
272 {
273 JsonArray::from(old_column)
274 .try_align(&json_type.as_arrow_type())
275 .context(ConvertValueSnafu)?
276 } else {
277 datatypes::arrow::compute::cast(old_column, &ty.as_arrow_type())
278 .context(ComputeArrowSnafu)?
279 };
280 Ok(casted)
281 } else {
282 Ok(old_column.clone())
283 }
284 }
285 IndexOrDefault::DefaultValue {
286 default_vector,
287 semantic_type,
288 } => repeat_vector(default_vector, len, *semantic_type == SemanticType::Tag),
289 })
290 .chain(
291 batch.columns()[batch.num_columns() - INTERNAL_COLUMN_NUM..]
293 .iter()
294 .map(|col| Ok(col.clone())),
295 )
296 .collect::<Result<Vec<_>>>()?;
297
298 let compat_batch = RecordBatch::try_new(self.arrow_schema.clone(), columns)
299 .context(NewRecordBatchSnafu)?;
300
301 self.compat_pk.compat(compat_batch)
303 }
304}
305
306fn repeat_vector(vector: &VectorRef, to_len: usize, is_tag: bool) -> Result<ArrayRef> {
308 assert_eq!(1, vector.len());
309 let data_type = vector.data_type();
310 if is_tag && data_type.is_string() {
311 let values = vector.to_arrow_array();
312 if values.is_null(0) {
313 let keys = UInt32Array::new_null(to_len);
315 Ok(Arc::new(DictionaryArray::new(keys, values.slice(0, 0))))
316 } else {
317 let keys = UInt32Array::from_value(0, to_len);
318 Ok(Arc::new(DictionaryArray::new(keys, values)))
319 }
320 } else {
321 let keys = UInt32Array::from_value(0, to_len);
322 take(
323 &vector.to_arrow_array(),
324 &keys,
325 Some(TakeOptions {
326 check_bounds: false,
327 }),
328 )
329 .context(ComputeArrowSnafu)
330 }
331}
332
333fn is_primary_key_same(expect: &RegionMetadata, actual: &RegionMetadata) -> Result<bool> {
335 ensure!(
336 actual.primary_key.len() <= expect.primary_key.len(),
337 CompatReaderSnafu {
338 region_id: expect.region_id,
339 reason: format!(
340 "primary key has more columns {} than expect {}",
341 actual.primary_key.len(),
342 expect.primary_key.len()
343 ),
344 }
345 );
346 ensure!(
347 actual.primary_key == expect.primary_key[..actual.primary_key.len()],
348 CompatReaderSnafu {
349 region_id: expect.region_id,
350 reason: format!(
351 "primary key has different prefix, expect: {:?}, actual: {:?}",
352 expect.primary_key, actual.primary_key
353 ),
354 }
355 );
356
357 Ok(actual.primary_key.len() == expect.primary_key.len())
358}
359
/// Describes how one expected (non-internal) output column of a compat batch is produced.
#[derive(Debug)]
enum IndexOrDefault {
    /// The column exists in the input batch.
    Index {
        /// Position of the column in the input batch.
        pos: usize,
        /// Type to convert the column to; `None` when the input type already matches.
        cast_type: Option<ConcreteDataType>,
    },
    /// The column is missing from the input batch and is filled with a default value.
    DefaultValue {
        /// One-row vector holding the default value, repeated for every output row.
        default_vector: VectorRef,
        /// Semantic type of the column; used to decide whether a string default is emitted
        /// as a dictionary (tag) array.
        semantic_type: SemanticType,
    },
}
376
377struct FlatRewritePrimaryKey {
379 codec: Arc<dyn PrimaryKeyCodec>,
381 metadata: RegionMetadataRef,
383 old_codec: Arc<dyn PrimaryKeyCodec>,
386}
387
388impl FlatRewritePrimaryKey {
389 fn new(
390 expect: &RegionMetadataRef,
391 actual: &RegionMetadataRef,
392 ) -> Option<FlatRewritePrimaryKey> {
393 if expect.primary_key_encoding == actual.primary_key_encoding {
394 return None;
395 }
396 let codec = build_primary_key_codec(expect);
397 let old_codec = build_primary_key_codec(actual);
398
399 Some(FlatRewritePrimaryKey {
400 codec,
401 metadata: expect.clone(),
402 old_codec,
403 })
404 }
405
406 fn rewrite_key(
409 &self,
410 append_values: &[(ColumnId, Value)],
411 batch: RecordBatch,
412 ) -> Result<RecordBatch> {
413 let old_pk_dict_array = batch
414 .column(primary_key_column_index(batch.num_columns()))
415 .as_any()
416 .downcast_ref::<PrimaryKeyArray>()
417 .unwrap();
418 let old_pk_values_array = old_pk_dict_array
419 .values()
420 .as_any()
421 .downcast_ref::<BinaryArray>()
422 .unwrap();
423 let mut builder = BinaryBuilder::with_capacity(
424 old_pk_values_array.len(),
425 old_pk_values_array.value_data().len(),
426 );
427
428 let mut buffer = Vec::with_capacity(
430 old_pk_values_array.value_data().len() / old_pk_values_array.len().max(1),
431 );
432 let mut column_id_values = Vec::new();
433 for value in old_pk_values_array.iter() {
435 let Some(old_pk) = value else {
436 builder.append_null();
437 continue;
438 };
439 let mut pk_values = self.old_codec.decode(old_pk).context(DecodeSnafu)?;
441 pk_values.extend(append_values);
442
443 buffer.clear();
444 column_id_values.clear();
445 match pk_values {
447 CompositeValues::Dense(dense_values) => {
448 self.codec
449 .encode_values(dense_values.as_slice(), &mut buffer)
450 .context(EncodeSnafu)?;
451 }
452 CompositeValues::Sparse(sparse_values) => {
453 for id in &self.metadata.primary_key {
454 let value = sparse_values.get_or_null(*id);
455 column_id_values.push((*id, value.clone()));
456 }
457 self.codec
458 .encode_values(&column_id_values, &mut buffer)
459 .context(EncodeSnafu)?;
460 }
461 }
462 builder.append_value(&buffer);
463 }
464 let new_pk_values_array = Arc::new(builder.finish());
465 let new_pk_dict_array =
466 PrimaryKeyArray::new(old_pk_dict_array.keys().clone(), new_pk_values_array);
467
468 let mut columns = batch.columns().to_vec();
469 columns[primary_key_column_index(batch.num_columns())] = Arc::new(new_pk_dict_array);
470
471 RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
472 }
473}
474
475#[derive(Default)]
477struct FlatCompatPrimaryKey {
478 rewriter: Option<FlatRewritePrimaryKey>,
480 converter: Option<Arc<dyn PrimaryKeyCodec>>,
482 values: Vec<(ColumnId, Value)>,
484}
485
486impl FlatCompatPrimaryKey {
487 fn new(expect: &RegionMetadataRef, actual: &RegionMetadataRef) -> Result<Self> {
488 let rewriter = FlatRewritePrimaryKey::new(expect, actual);
489
490 if is_primary_key_same(expect, actual)? {
491 return Ok(Self {
492 rewriter,
493 converter: None,
494 values: Vec::new(),
495 });
496 }
497
498 let to_add = &expect.primary_key[actual.primary_key.len()..];
500 let mut values = Vec::with_capacity(to_add.len());
501 let mut fields = Vec::with_capacity(to_add.len());
502 for column_id in to_add {
503 let column = expect.column_by_id(*column_id).unwrap();
505 fields.push((
506 *column_id,
507 SortField::new(column.column_schema.data_type.clone()),
508 ));
509 let default_value = column
510 .column_schema
511 .create_default()
512 .context(CreateDefaultSnafu {
513 region_id: expect.region_id,
514 column: &column.column_schema.name,
515 })?
516 .with_context(|| CompatReaderSnafu {
517 region_id: expect.region_id,
518 reason: format!(
519 "key column {} does not have a default value to read",
520 column.column_schema.name
521 ),
522 })?;
523 values.push((*column_id, default_value));
524 }
525 debug_assert!(!fields.is_empty());
527
528 let converter = Some(build_primary_key_codec_with_fields(
530 expect.primary_key_encoding,
531 fields.into_iter(),
532 ));
533
534 Ok(Self {
535 rewriter,
536 converter,
537 values,
538 })
539 }
540
541 fn compat(&self, batch: RecordBatch) -> Result<RecordBatch> {
545 if let Some(rewriter) = &self.rewriter {
546 return rewriter.rewrite_key(&self.values, batch);
548 }
549
550 self.append_key(batch)
551 }
552
553 fn append_key(&self, batch: RecordBatch) -> Result<RecordBatch> {
555 let Some(converter) = &self.converter else {
556 return Ok(batch);
557 };
558
559 let old_pk_dict_array = batch
560 .column(primary_key_column_index(batch.num_columns()))
561 .as_any()
562 .downcast_ref::<PrimaryKeyArray>()
563 .unwrap();
564 let old_pk_values_array = old_pk_dict_array
565 .values()
566 .as_any()
567 .downcast_ref::<BinaryArray>()
568 .unwrap();
569 let mut builder = BinaryBuilder::with_capacity(
570 old_pk_values_array.len(),
571 old_pk_values_array.value_data().len()
572 + converter.estimated_size().unwrap_or_default() * old_pk_values_array.len(),
573 );
574
575 let mut buffer = Vec::with_capacity(
577 old_pk_values_array.value_data().len() / old_pk_values_array.len().max(1)
578 + converter.estimated_size().unwrap_or_default(),
579 );
580
581 for value in old_pk_values_array.iter() {
583 let Some(old_pk) = value else {
584 builder.append_null();
585 continue;
586 };
587
588 buffer.clear();
589 buffer.extend_from_slice(old_pk);
590 converter
591 .encode_values(&self.values, &mut buffer)
592 .context(EncodeSnafu)?;
593
594 builder.append_value(&buffer);
595 }
596
597 let new_pk_values_array = Arc::new(builder.finish());
598 let new_pk_dict_array =
599 PrimaryKeyArray::new(old_pk_dict_array.keys().clone(), new_pk_values_array);
600
601 let mut columns = batch.columns().to_vec();
603 columns[primary_key_column_index(batch.num_columns())] = Arc::new(new_pk_dict_array);
604
605 RecordBatch::try_new(batch.schema(), columns).context(NewRecordBatchSnafu)
606 }
607}
608
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::{OpType, SemanticType};
    use datatypes::arrow::array::{
        ArrayRef, BinaryDictionaryBuilder, Int64Array, StringDictionaryBuilder,
        TimestampMillisecondArray, UInt8Array, UInt64Array,
    };
    use datatypes::arrow::datatypes::UInt32Type;
    use datatypes::arrow::record_batch::RecordBatch;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::ValueRef;
    use mito_codec::row_converter::{
        DensePrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec,
    };
    use store_api::codec::PrimaryKeyEncoding;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;
    use crate::read::flat_projection::FlatProjectionMapper;
    use crate::read::read_columns::ReadColumns;
    use crate::sst::parquet::flat_format::FlatReadFormat;
    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};

    /// `(column id, semantic type, data type)` of a test column.
    type ColumnDef = (ColumnId, SemanticType, ConcreteDataType);

    /// Millisecond time index column with id 0.
    fn ts_column() -> ColumnDef {
        (
            0,
            SemanticType::Timestamp,
            ConcreteDataType::timestamp_millisecond_datatype(),
        )
    }

    /// String tag column with the given id.
    fn string_tag(column_id: ColumnId) -> ColumnDef {
        (
            column_id,
            SemanticType::Tag,
            ConcreteDataType::string_datatype(),
        )
    }

    /// Int64 field column with the given id.
    fn int64_field(column_id: ColumnId) -> ColumnDef {
        (
            column_id,
            SemanticType::Field,
            ConcreteDataType::int64_datatype(),
        )
    }

    /// Builds metadata for region (1, 1).
    ///
    /// Tags are named `tag_{id}`, fields `field_{id}` and the time index `ts`. Only the
    /// time index is non-nullable.
    fn new_metadata(semantic_types: &[ColumnDef], primary_key: &[ColumnId]) -> RegionMetadata {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        for (column_id, semantic_type, data_type) in semantic_types.iter().cloned() {
            let name = match semantic_type {
                SemanticType::Tag => format!("tag_{column_id}"),
                SemanticType::Field => format!("field_{column_id}"),
                SemanticType::Timestamp => "ts".to_string(),
            };
            let nullable = semantic_type != SemanticType::Timestamp;
            builder.push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(name, data_type, nullable),
                semantic_type,
                column_id,
            });
        }
        builder.primary_key(primary_key.to_vec());
        builder.build().unwrap()
    }

    /// Overrides the primary key encoding of `metadata` and wraps it in an `Arc`.
    fn with_encoding(
        mut metadata: RegionMetadata,
        encoding: PrimaryKeyEncoding,
    ) -> RegionMetadataRef {
        metadata.primary_key_encoding = encoding;
        Arc::new(metadata)
    }

    /// Dense-encodes string key values (`None` encodes a null).
    fn encode_key(keys: &[Option<&str>]) -> Vec<u8> {
        let converter = DensePrimaryKeyCodec::with_fields(
            keys.iter()
                .map(|_| (0, SortField::new(ConcreteDataType::string_datatype())))
                .collect(),
        );
        converter
            .encode(keys.iter().map(|key| key.map_or(ValueRef::Null, ValueRef::String)))
            .unwrap()
    }

    /// Sparse-encodes `(column id, string value)` pairs (`None` encodes a null).
    fn encode_sparse_key(keys: &[(ColumnId, Option<&str>)]) -> Vec<u8> {
        let converter = SparsePrimaryKeyCodec::with_fields(
            keys.iter()
                .map(|_| (1, SortField::new(ConcreteDataType::string_datatype())))
                .collect(),
        );
        let row: Vec<_> = keys
            .iter()
            .map(|&(id, key)| (id, key.map_or(ValueRef::Null, ValueRef::String)))
            .collect();
        let mut buffer = Vec::new();
        converter.encode_value_refs(&row, &mut buffer).unwrap();
        buffer
    }

    /// Builds a `u32`-keyed binary dictionary array holding the given primary keys.
    fn build_flat_test_pk_array(primary_keys: &[&[u8]]) -> ArrayRef {
        let mut builder = BinaryDictionaryBuilder::<UInt32Type>::new();
        for pk in primary_keys.iter().copied() {
            builder.append(pk).unwrap();
        }
        Arc::new(builder.finish())
    }

    /// Two-row dictionary-encoded tag column repeating `value`.
    fn tag_column(value: &str) -> ArrayRef {
        let mut builder = StringDictionaryBuilder::<UInt32Type>::new();
        builder.append_value(value);
        builder.append_value(value);
        Arc::new(builder.finish())
    }

    /// Two-row dictionary-encoded tag column of nulls.
    fn null_tag_column() -> ArrayRef {
        let mut builder = StringDictionaryBuilder::<UInt32Type>::new();
        builder.append_nulls(2);
        Arc::new(builder.finish())
    }

    /// Two-row int64 field column `[100, 200]`.
    fn field_column() -> ArrayRef {
        Arc::new(Int64Array::from(vec![100, 200]))
    }

    /// Two-row int64 field column of nulls.
    fn null_field_column() -> ArrayRef {
        Arc::new(Int64Array::from(vec![None::<i64>, None::<i64>]))
    }

    /// Appends the time index, primary key (`pk` on both rows), sequence and op type columns
    /// to `columns` and builds a batch with the flat SST schema of `metadata`.
    fn flat_batch(
        metadata: &RegionMetadataRef,
        mut columns: Vec<ArrayRef>,
        pk: &[u8],
    ) -> RecordBatch {
        columns.push(Arc::new(TimestampMillisecondArray::from_iter_values([
            1000, 2000,
        ])));
        columns.push(build_flat_test_pk_array(&[pk, pk]));
        columns.push(Arc::new(UInt64Array::from_iter_values([1, 2])));
        columns.push(Arc::new(UInt8Array::from_iter_values([
            OpType::Put as u8,
            OpType::Put as u8,
        ])));
        let schema = to_flat_sst_arrow_schema(metadata, &FlatSchemaOptions::default());
        RecordBatch::try_new(schema, columns).unwrap()
    }

    /// Creates the read format of `metadata` for the given columns.
    fn new_read_format(
        metadata: &RegionMetadataRef,
        columns: ReadColumns,
        compaction: bool,
    ) -> FlatReadFormat {
        FlatReadFormat::new(metadata.clone(), columns, None, "test", compaction).unwrap()
    }

    /// Returns (SST metadata with ts, tag 1 and field 2; region metadata that also has field 3).
    fn metadata_with_added_field() -> (RegionMetadataRef, RegionMetadataRef) {
        let actual = new_metadata(&[ts_column(), string_tag(1), int64_field(2)], &[1]);
        let expected = new_metadata(
            &[ts_column(), string_tag(1), int64_field(2), int64_field(3)],
            &[1],
        );
        (Arc::new(actual), Arc::new(expected))
    }

    /// Checks that `compat_batch` fills the missing field 3 with nulls.
    fn assert_fills_missing_field(
        compat_batch: &FlatCompatBatch,
        actual_metadata: &RegionMetadataRef,
        expected_metadata: &RegionMetadataRef,
    ) {
        let k1 = encode_key(&[Some("tag1")]);
        let input = flat_batch(
            actual_metadata,
            vec![tag_column("tag1"), field_column()],
            &k1,
        );

        let result = compat_batch.compat(input).unwrap();

        let expected = flat_batch(
            expected_metadata,
            vec![tag_column("tag1"), field_column(), null_field_column()],
            &k1,
        );
        assert_eq!(expected, result);
    }

    #[test]
    fn test_flat_compat_batch_with_missing_columns() {
        let (actual_metadata, expected_metadata) = metadata_with_added_field();
        let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
        let read_format = new_read_format(
            &actual_metadata,
            ReadColumns::from_deduped_column_ids([0, 1, 2, 3]),
            false,
        );

        let compat_batch = FlatCompatBatch::try_new(&mapper, &read_format, false)
            .unwrap()
            .unwrap();

        assert_fills_missing_field(&compat_batch, &actual_metadata, &expected_metadata);
    }

    #[test]
    fn test_flat_compat_batch_with_read_projection_superset() {
        let (actual_metadata, expected_metadata) = metadata_with_added_field();
        let mapper = FlatProjectionMapper::new_with_read_columns(
            &expected_metadata,
            vec![1, 2],
            ReadColumns::from_deduped_column_ids([1, 2, 3]),
            None,
        )
        .unwrap();
        let read_format = new_read_format(
            &actual_metadata,
            ReadColumns::from_deduped_column_ids([1, 2, 3]),
            false,
        );

        let compat_batch = FlatCompatBatch::try_new(&mapper, &read_format, false)
            .unwrap()
            .unwrap();

        assert_fills_missing_field(&compat_batch, &actual_metadata, &expected_metadata);
    }

    #[test]
    fn test_flat_compat_batch_with_different_pk_encoding() {
        let actual_metadata = with_encoding(
            new_metadata(&[ts_column(), string_tag(1), int64_field(2)], &[1]),
            PrimaryKeyEncoding::Dense,
        );
        let expected_metadata = with_encoding(
            new_metadata(
                &[ts_column(), string_tag(1), int64_field(2), string_tag(3)],
                &[1, 3],
            ),
            PrimaryKeyEncoding::Sparse,
        );

        let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
        let read_format = new_read_format(
            &actual_metadata,
            ReadColumns::from_deduped_column_ids([0, 1, 2, 3]),
            false,
        );
        let compat_batch = FlatCompatBatch::try_new(&mapper, &read_format, false)
            .unwrap()
            .unwrap();

        let dense_k1 = encode_key(&[Some("tag1")]);
        let input = flat_batch(
            &actual_metadata,
            vec![tag_column("tag1"), field_column()],
            &dense_k1,
        );

        let result = compat_batch.compat(input).unwrap();

        // The new tag 3 is null, and the key is re-encoded with sparse encoding.
        let sparse_k1 = encode_sparse_key(&[(1, Some("tag1")), (3, None)]);
        let expected = flat_batch(
            &expected_metadata,
            vec![tag_column("tag1"), null_tag_column(), field_column()],
            &sparse_k1,
        );
        assert_eq!(expected, result);
    }

    #[test]
    fn test_flat_compat_batch_compact_sparse() {
        let actual_metadata = with_encoding(
            new_metadata(&[ts_column(), int64_field(2)], &[]),
            PrimaryKeyEncoding::Sparse,
        );
        let expected_metadata = with_encoding(
            new_metadata(&[ts_column(), int64_field(2), int64_field(3)], &[]),
            PrimaryKeyEncoding::Sparse,
        );

        let mapper = FlatProjectionMapper::all(&expected_metadata).unwrap();
        let read_format = new_read_format(
            &actual_metadata,
            ReadColumns::from_deduped_column_ids([0, 2, 3]),
            true,
        );
        let compat_batch = FlatCompatBatch::try_new(&mapper, &read_format, true)
            .unwrap()
            .unwrap();

        let sparse_k1 = encode_sparse_key(&[]);
        let input = flat_batch(&actual_metadata, vec![field_column()], &sparse_k1);

        let result = compat_batch.compat(input).unwrap();

        let expected = flat_batch(
            &expected_metadata,
            vec![field_column(), null_field_column()],
            &sparse_k1,
        );
        assert_eq!(expected, result);
    }
}