1use std::collections::HashMap;
18use std::sync::Arc;
19
20use datatypes::data_type::ConcreteDataType;
21use datatypes::value::Value;
22use datatypes::vectors::VectorRef;
23use mito_codec::row_converter::{
24 build_primary_key_codec, build_primary_key_codec_with_fields, CompositeValues, PrimaryKeyCodec,
25 SortField,
26};
27use snafu::{ensure, OptionExt, ResultExt};
28use store_api::metadata::{RegionMetadata, RegionMetadataRef};
29use store_api::storage::ColumnId;
30
31use crate::error::{CompatReaderSnafu, CreateDefaultSnafu, DecodeSnafu, EncodeSnafu, Result};
32use crate::read::projection::ProjectionMapper;
33use crate::read::{Batch, BatchColumn, BatchReader};
34
/// Reader that adapts batches produced under an older region metadata to the
/// schema expected by the caller's projection.
pub struct CompatReader<R> {
    // Underlying reader yielding batches in the reader's (possibly outdated) schema.
    reader: R,
    // Adapter applied to every batch before returning it.
    compat: CompatBatch,
}
42
43impl<R> CompatReader<R> {
44 pub fn new(
49 mapper: &ProjectionMapper,
50 reader_meta: RegionMetadataRef,
51 reader: R,
52 ) -> Result<CompatReader<R>> {
53 Ok(CompatReader {
54 reader,
55 compat: CompatBatch::new(mapper, reader_meta)?,
56 })
57 }
58}
59
60#[async_trait::async_trait]
61impl<R: BatchReader> BatchReader for CompatReader<R> {
62 async fn next_batch(&mut self) -> Result<Option<Batch>> {
63 let Some(mut batch) = self.reader.next_batch().await? else {
64 return Ok(None);
65 };
66
67 batch = self.compat.compat_batch(batch)?;
68
69 Ok(Some(batch))
70 }
71}
72
/// Helper that makes a [Batch] compatible with an expected metadata.
///
/// Each step is optional and only present when the reader's metadata differs
/// from the expected metadata in that respect.
pub(crate) struct CompatBatch {
    // Rewrites the encoded primary key when the pk encoding differs.
    rewrite_pk: Option<RewritePrimaryKey>,
    // Appends default values for primary key columns missing from the reader's schema.
    compat_pk: Option<CompatPrimaryKey>,
    // Reorders/casts/fills field columns to match the expected projection.
    compat_fields: Option<CompatFields>,
}
82
83impl CompatBatch {
84 pub(crate) fn new(mapper: &ProjectionMapper, reader_meta: RegionMetadataRef) -> Result<Self> {
88 let rewrite_pk = may_rewrite_primary_key(mapper.metadata(), &reader_meta);
89 let compat_pk = may_compat_primary_key(mapper.metadata(), &reader_meta)?;
90 let compat_fields = may_compat_fields(mapper, &reader_meta)?;
91
92 Ok(Self {
93 rewrite_pk,
94 compat_pk,
95 compat_fields,
96 })
97 }
98
99 pub(crate) fn compat_batch(&self, mut batch: Batch) -> Result<Batch> {
101 if let Some(rewrite_pk) = &self.rewrite_pk {
102 batch = rewrite_pk.compat(batch)?;
103 }
104 if let Some(compat_pk) = &self.compat_pk {
105 batch = compat_pk.compat(batch)?;
106 }
107 if let Some(compat_fields) = &self.compat_fields {
108 batch = compat_fields.compat(batch);
109 }
110
111 Ok(batch)
112 }
113}
114
115pub(crate) fn has_same_columns_and_pk_encoding(
117 left: &RegionMetadata,
118 right: &RegionMetadata,
119) -> bool {
120 if left.primary_key_encoding != right.primary_key_encoding {
121 return false;
122 }
123
124 if left.column_metadatas.len() != right.column_metadatas.len() {
125 return false;
126 }
127
128 for (left_col, right_col) in left.column_metadatas.iter().zip(&right.column_metadatas) {
129 if left_col.column_id != right_col.column_id || !left_col.is_same_datatype(right_col) {
130 return false;
131 }
132 debug_assert_eq!(
133 left_col.column_schema.data_type,
134 right_col.column_schema.data_type
135 );
136 debug_assert_eq!(left_col.semantic_type, right_col.semantic_type);
137 }
138
139 true
140}
141
/// Helper to append default values of missing key columns to the encoded
/// primary key of a batch.
#[derive(Debug)]
struct CompatPrimaryKey {
    // Codec used to encode the appended default values.
    converter: Arc<dyn PrimaryKeyCodec>,
    // Default values (by column id) for key columns the reader's schema lacks.
    values: Vec<(ColumnId, Value)>,
}
150
151impl CompatPrimaryKey {
152 fn compat(&self, mut batch: Batch) -> Result<Batch> {
154 let mut buffer = Vec::with_capacity(
155 batch.primary_key().len() + self.converter.estimated_size().unwrap_or_default(),
156 );
157 buffer.extend_from_slice(batch.primary_key());
158 self.converter
159 .encode_values(&self.values, &mut buffer)
160 .context(EncodeSnafu)?;
161
162 batch.set_primary_key(buffer);
163
164 if let Some(pk_values) = &mut batch.pk_values {
166 pk_values.extend(&self.values);
167 }
168
169 Ok(batch)
170 }
171}
172
/// Helper to adapt the field columns of a batch to the expected projection.
#[derive(Debug)]
struct CompatFields {
    // Field columns (id and type) present in batches from the reader, in order.
    actual_fields: Vec<(ColumnId, ConcreteDataType)>,
    // For each expected field: where to take it from in the source batch,
    // or a default value to fill when the source batch lacks the column.
    index_or_defaults: Vec<IndexOrDefault>,
}
181
182impl CompatFields {
183 #[must_use]
185 fn compat(&self, batch: Batch) -> Batch {
186 debug_assert_eq!(self.actual_fields.len(), batch.fields().len());
187 debug_assert!(self
188 .actual_fields
189 .iter()
190 .zip(batch.fields())
191 .all(|((id, _), batch_column)| *id == batch_column.column_id));
192
193 let len = batch.num_rows();
194 let fields = self
195 .index_or_defaults
196 .iter()
197 .map(|index_or_default| match index_or_default {
198 IndexOrDefault::Index { pos, cast_type } => {
199 let old_column = &batch.fields()[*pos];
200
201 let data = if let Some(ty) = cast_type {
202 old_column.data.cast(ty).unwrap()
205 } else {
206 old_column.data.clone()
207 };
208 BatchColumn {
209 column_id: old_column.column_id,
210 data,
211 }
212 }
213 IndexOrDefault::DefaultValue {
214 column_id,
215 default_vector,
216 } => {
217 let data = default_vector.replicate(&[len]);
218 BatchColumn {
219 column_id: *column_id,
220 data,
221 }
222 }
223 })
224 .collect();
225
226 batch.with_fields(fields).unwrap()
228 }
229}
230
231fn may_rewrite_primary_key(
232 expect: &RegionMetadata,
233 actual: &RegionMetadata,
234) -> Option<RewritePrimaryKey> {
235 if expect.primary_key_encoding == actual.primary_key_encoding {
236 return None;
237 }
238
239 let fields = expect.primary_key.clone();
240 let original = build_primary_key_codec(actual);
241 let new = build_primary_key_codec(expect);
242
243 Some(RewritePrimaryKey {
244 original,
245 new,
246 fields,
247 })
248}
249
250fn may_compat_primary_key(
252 expect: &RegionMetadata,
253 actual: &RegionMetadata,
254) -> Result<Option<CompatPrimaryKey>> {
255 ensure!(
256 actual.primary_key.len() <= expect.primary_key.len(),
257 CompatReaderSnafu {
258 region_id: expect.region_id,
259 reason: format!(
260 "primary key has more columns {} than expect {}",
261 actual.primary_key.len(),
262 expect.primary_key.len()
263 ),
264 }
265 );
266 ensure!(
267 actual.primary_key == expect.primary_key[..actual.primary_key.len()],
268 CompatReaderSnafu {
269 region_id: expect.region_id,
270 reason: format!(
271 "primary key has different prefix, expect: {:?}, actual: {:?}",
272 expect.primary_key, actual.primary_key
273 ),
274 }
275 );
276 if actual.primary_key.len() == expect.primary_key.len() {
277 return Ok(None);
278 }
279
280 let to_add = &expect.primary_key[actual.primary_key.len()..];
282 let mut fields = Vec::with_capacity(to_add.len());
283 let mut values = Vec::with_capacity(to_add.len());
284 for column_id in to_add {
285 let column = expect.column_by_id(*column_id).unwrap();
287 fields.push((
288 *column_id,
289 SortField::new(column.column_schema.data_type.clone()),
290 ));
291 let default_value = column
292 .column_schema
293 .create_default()
294 .context(CreateDefaultSnafu {
295 region_id: expect.region_id,
296 column: &column.column_schema.name,
297 })?
298 .with_context(|| CompatReaderSnafu {
299 region_id: expect.region_id,
300 reason: format!(
301 "key column {} does not have a default value to read",
302 column.column_schema.name
303 ),
304 })?;
305 values.push((*column_id, default_value));
306 }
307 let converter =
309 build_primary_key_codec_with_fields(expect.primary_key_encoding, fields.into_iter());
310
311 Ok(Some(CompatPrimaryKey { converter, values }))
312}
313
314fn may_compat_fields(
316 mapper: &ProjectionMapper,
317 actual: &RegionMetadata,
318) -> Result<Option<CompatFields>> {
319 let expect_fields = mapper.batch_fields();
320 let actual_fields = Batch::projected_fields(actual, mapper.column_ids());
321 if expect_fields == actual_fields {
322 return Ok(None);
323 }
324
325 let source_field_index: HashMap<_, _> = actual_fields
326 .iter()
327 .enumerate()
328 .map(|(idx, (column_id, data_type))| (*column_id, (idx, data_type)))
329 .collect();
330
331 let index_or_defaults = expect_fields
332 .iter()
333 .map(|(column_id, expect_data_type)| {
334 if let Some((index, actual_data_type)) = source_field_index.get(column_id) {
335 let mut cast_type = None;
336
337 if expect_data_type != *actual_data_type {
338 cast_type = Some(expect_data_type.clone())
339 }
340 Ok(IndexOrDefault::Index {
342 pos: *index,
343 cast_type,
344 })
345 } else {
346 let column = mapper.metadata().column_by_id(*column_id).unwrap();
348 let default_vector = column
350 .column_schema
351 .create_default_vector(1)
352 .context(CreateDefaultSnafu {
353 region_id: mapper.metadata().region_id,
354 column: &column.column_schema.name,
355 })?
356 .with_context(|| CompatReaderSnafu {
357 region_id: mapper.metadata().region_id,
358 reason: format!(
359 "column {} does not have a default value to read",
360 column.column_schema.name
361 ),
362 })?;
363 Ok(IndexOrDefault::DefaultValue {
364 column_id: column.column_id,
365 default_vector,
366 })
367 }
368 })
369 .collect::<Result<Vec<_>>>()?;
370
371 Ok(Some(CompatFields {
372 actual_fields,
373 index_or_defaults,
374 }))
375}
376
/// Source of an expected field column: an index into the batch's fields, or
/// a default value when the batch lacks the column.
#[derive(Debug)]
enum IndexOrDefault {
    /// Take the column at `pos` from the source batch.
    Index {
        // Position of the column in the source batch's fields.
        pos: usize,
        // Cast the column to this type when the expected type differs.
        cast_type: Option<ConcreteDataType>,
    },
    /// Fill the column with a default value.
    DefaultValue {
        // Id of the missing column.
        column_id: ColumnId,
        // Default value, replicated to the batch's row count on use.
        default_vector: VectorRef,
    },
}
393
/// Helper to rewrite the encoded primary key of a batch from one encoding to
/// another.
struct RewritePrimaryKey {
    // Codec matching the encoding the data was written with.
    original: Arc<dyn PrimaryKeyCodec>,
    // Codec for the expected encoding.
    new: Arc<dyn PrimaryKeyCodec>,
    // Primary key column ids of the expected metadata, in order.
    fields: Vec<ColumnId>,
}
403
impl RewritePrimaryKey {
    /// Re-encodes the primary key of `batch` with the new codec.
    ///
    /// Decodes the key with the original codec if the batch doesn't already
    /// carry decoded pk values, caching the decoded values on the batch.
    fn compat(&self, mut batch: Batch) -> Result<Batch> {
        // Reuse decoded pk values when present; otherwise decode and cache
        // them on the batch before borrowing them back.
        let values = if let Some(pk_values) = batch.pk_values() {
            pk_values
        } else {
            let new_pk_values = self
                .original
                .decode(batch.primary_key())
                .context(DecodeSnafu)?;
            batch.set_pk_values(new_pk_values);
            // Safe to unwrap: the values were just set above.
            batch.pk_values().as_ref().unwrap()
        };

        let mut buffer = Vec::with_capacity(
            batch.primary_key().len() + self.new.estimated_size().unwrap_or_default(),
        );
        match values {
            CompositeValues::Dense(values) => {
                self.new
                    .encode_values(values.as_slice(), &mut buffer)
                    .context(EncodeSnafu)?;
            }
            CompositeValues::Sparse(values) => {
                // Project the sparse values onto the expected key columns,
                // filling nulls for columns absent from the decoded key.
                let values = self
                    .fields
                    .iter()
                    .map(|id| {
                        let value = values.get_or_null(*id);
                        (*id, value.as_value_ref())
                    })
                    .collect::<Vec<_>>();
                self.new
                    .encode_value_refs(&values, &mut buffer)
                    .context(EncodeSnafu)?;
            }
        }
        batch.set_primary_key(buffer);

        Ok(batch)
    }
}
447
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::{OpType, SemanticType};
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::ValueRef;
    use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, UInt64Vector, UInt8Vector};
    use mito_codec::row_converter::{
        DensePrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec,
    };
    use store_api::codec::PrimaryKeyEncoding;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;
    use crate::test_util::{check_reader_result, VecBatchReader};

    /// Builds a region metadata from `(column id, semantic type, data type)`
    /// triples and the primary key column ids. Column names are derived from
    /// the id (`tag_{id}`, `field_{id}`) and the timestamp column is `ts`.
    fn new_metadata(
        semantic_types: &[(ColumnId, SemanticType, ConcreteDataType)],
        primary_key: &[ColumnId],
    ) -> RegionMetadata {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        for (id, semantic_type, data_type) in semantic_types {
            let column_schema = match semantic_type {
                SemanticType::Tag => {
                    ColumnSchema::new(format!("tag_{id}"), data_type.clone(), true)
                }
                SemanticType::Field => {
                    ColumnSchema::new(format!("field_{id}"), data_type.clone(), true)
                }
                SemanticType::Timestamp => ColumnSchema::new("ts", data_type.clone(), false),
            };

            builder.push_column_metadata(ColumnMetadata {
                column_schema,
                semantic_type: *semantic_type,
                column_id: *id,
            });
        }
        builder.primary_key(primary_key.to_vec());
        builder.build().unwrap()
    }

    /// Encodes optional string key values with the dense primary key codec.
    fn encode_key(keys: &[Option<&str>]) -> Vec<u8> {
        let fields = (0..keys.len())
            .map(|_| (0, SortField::new(ConcreteDataType::string_datatype())))
            .collect();
        let converter = DensePrimaryKeyCodec::with_fields(fields);
        let row = keys.iter().map(|str_opt| match str_opt {
            Some(v) => ValueRef::String(v),
            None => ValueRef::Null,
        });

        converter.encode(row).unwrap()
    }

    /// Encodes `(column id, optional value)` pairs with the sparse primary
    /// key codec.
    fn encode_sparse_key(keys: &[(ColumnId, Option<&str>)]) -> Vec<u8> {
        let fields = (0..keys.len())
            .map(|_| (1, SortField::new(ConcreteDataType::string_datatype())))
            .collect();
        let converter = SparsePrimaryKeyCodec::with_fields(fields);
        let row = keys
            .iter()
            .map(|(id, str_opt)| match str_opt {
                Some(v) => (*id, ValueRef::String(v)),
                None => (*id, ValueRef::Null),
            })
            .collect::<Vec<_>>();
        let mut buffer = vec![];
        converter.encode_value_refs(&row, &mut buffer).unwrap();
        buffer
    }

    /// Builds a batch with the given encoded primary key and int64 field
    /// columns. Each `(id, is_null)` pair becomes a field column filled with
    /// nulls (when `is_null`) or with the column id as its value. Timestamps
    /// start at `start_ts` (millisecond precision) and sequences at 0.
    fn new_batch(
        primary_key: &[u8],
        fields: &[(ColumnId, bool)],
        start_ts: i64,
        num_rows: usize,
    ) -> Batch {
        let timestamps = Arc::new(TimestampMillisecondVector::from_values(
            start_ts..start_ts + num_rows as i64,
        ));
        let sequences = Arc::new(UInt64Vector::from_values(0..num_rows as u64));
        let op_types = Arc::new(UInt8Vector::from_vec(vec![OpType::Put as u8; num_rows]));
        let field_columns = fields
            .iter()
            .map(|(id, is_null)| {
                let data = if *is_null {
                    Arc::new(Int64Vector::from(vec![None; num_rows]))
                } else {
                    Arc::new(Int64Vector::from_vec(vec![*id as i64; num_rows]))
                };
                BatchColumn {
                    column_id: *id,
                    data,
                }
            })
            .collect();
        Batch::new(
            primary_key.to_vec(),
            timestamps,
            sequences,
            op_types,
            field_columns,
        )
        .unwrap()
    }

    // The reader's primary key has more columns than the expected one,
    // which must be rejected.
    #[test]
    fn test_invalid_pk_len() {
        let reader_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1, 2],
        );
        let expect_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        );
        may_compat_primary_key(&expect_meta, &reader_meta).unwrap_err();
    }

    // The reader's primary key is not a prefix of the expected one,
    // which must be rejected.
    #[test]
    fn test_different_pk() {
        let reader_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[2, 1],
        );
        let expect_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (4, SemanticType::Tag, ConcreteDataType::string_datatype()),
            ],
            &[1, 2, 4],
        );
        may_compat_primary_key(&expect_meta, &reader_meta).unwrap_err();
    }

    // Identical primary keys need no compat step.
    #[test]
    fn test_same_pk() {
        let reader_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        );
        assert!(may_compat_primary_key(&reader_meta, &reader_meta)
            .unwrap()
            .is_none());
    }

    // Same metadata on both sides needs no pk compat step.
    #[test]
    fn test_same_pk_encoding() {
        let reader_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
            ],
            &[1],
        ));

        assert!(may_compat_primary_key(&reader_meta, &reader_meta)
            .unwrap()
            .is_none());
    }

    // Identical field layout needs no field compat step.
    #[test]
    fn test_same_fields() {
        let reader_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        let mapper = ProjectionMapper::all(&reader_meta).unwrap();
        assert!(may_compat_fields(&mapper, &reader_meta).unwrap().is_none())
    }

    // Expected metadata adds a tag (3) and a field (4): the reader's batches
    // get a null appended to the pk and a null field column filled in.
    #[tokio::test]
    async fn test_compat_reader() {
        let reader_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        let expect_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (3, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (4, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1, 3],
        ));
        let mapper = ProjectionMapper::all(&expect_meta).unwrap();
        let k1 = encode_key(&[Some("a")]);
        let k2 = encode_key(&[Some("b")]);
        let source_reader = VecBatchReader::new(&[
            new_batch(&k1, &[(2, false)], 1000, 3),
            new_batch(&k2, &[(2, false)], 1000, 3),
        ]);

        let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
        let k1 = encode_key(&[Some("a"), None]);
        let k2 = encode_key(&[Some("b"), None]);
        check_reader_result(
            &mut compat_reader,
            &[
                new_batch(&k1, &[(2, false), (4, true)], 1000, 3),
                new_batch(&k2, &[(2, false), (4, true)], 1000, 3),
            ],
        )
        .await;
    }

    // Expected fields are in a different order (3, 2, 4): the compat reader
    // must reorder the existing column and fill the missing ones with nulls.
    #[tokio::test]
    async fn test_compat_reader_different_order() {
        let reader_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        let expect_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (4, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        let mapper = ProjectionMapper::all(&expect_meta).unwrap();
        let k1 = encode_key(&[Some("a")]);
        let k2 = encode_key(&[Some("b")]);
        let source_reader = VecBatchReader::new(&[
            new_batch(&k1, &[(2, false)], 1000, 3),
            new_batch(&k2, &[(2, false)], 1000, 3),
        ]);

        let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
        check_reader_result(
            &mut compat_reader,
            &[
                new_batch(&k1, &[(3, true), (2, false), (4, true)], 1000, 3),
                new_batch(&k2, &[(3, true), (2, false), (4, true)], 1000, 3),
            ],
        )
        .await;
    }

    // Field column 2 changed type (int64 -> string): the compat reader must
    // cast the column to the expected type.
    #[tokio::test]
    async fn test_compat_reader_different_types() {
        let actual_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        let expect_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::string_datatype()),
            ],
            &[1],
        ));
        let mapper = ProjectionMapper::all(&expect_meta).unwrap();
        let k1 = encode_key(&[Some("a")]);
        let k2 = encode_key(&[Some("b")]);
        let source_reader = VecBatchReader::new(&[
            new_batch(&k1, &[(2, false)], 1000, 3),
            new_batch(&k2, &[(2, false)], 1000, 3),
        ]);

        // Builds the expected batch by casting the field column manually.
        let fn_batch_cast = |batch: Batch| {
            let mut new_fields = batch.fields.clone();
            new_fields[0].data = new_fields[0]
                .data
                .cast(&ConcreteDataType::string_datatype())
                .unwrap();

            batch.with_fields(new_fields).unwrap()
        };
        let mut compat_reader = CompatReader::new(&mapper, actual_meta, source_reader).unwrap();
        check_reader_result(
            &mut compat_reader,
            &[
                fn_batch_cast(new_batch(&k1, &[(2, false)], 1000, 3)),
                fn_batch_cast(new_batch(&k2, &[(2, false)], 1000, 3)),
            ],
        )
        .await;
    }

    // The mapper projects a subset of columns: compat must honor the
    // projection, including the case where no source field is projected.
    #[tokio::test]
    async fn test_compat_reader_projection() {
        let reader_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        let expect_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (4, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        // Projection includes a field (2) the reader has.
        let mapper = ProjectionMapper::new(&expect_meta, [1, 3, 2].into_iter()).unwrap();
        let k1 = encode_key(&[Some("a")]);
        let source_reader = VecBatchReader::new(&[new_batch(&k1, &[(2, false)], 1000, 3)]);

        let mut compat_reader =
            CompatReader::new(&mapper, reader_meta.clone(), source_reader).unwrap();
        check_reader_result(
            &mut compat_reader,
            &[new_batch(&k1, &[(3, true), (2, false)], 1000, 3)],
        )
        .await;

        // Projection includes only fields (3, 4) the reader lacks.
        let mapper = ProjectionMapper::new(&expect_meta, [1, 4, 2].into_iter()).unwrap();
        let k1 = encode_key(&[Some("a")]);
        let source_reader = VecBatchReader::new(&[new_batch(&k1, &[], 1000, 3)]);

        let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
        check_reader_result(
            &mut compat_reader,
            &[new_batch(&k1, &[(3, true), (4, true)], 1000, 3)],
        )
        .await;
    }

    // Reader uses dense pk encoding while the expected metadata uses sparse:
    // the compat reader must re-encode the primary key.
    #[tokio::test]
    async fn test_compat_reader_different_pk_encoding() {
        let mut reader_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        );
        reader_meta.primary_key_encoding = PrimaryKeyEncoding::Dense;
        let reader_meta = Arc::new(reader_meta);
        let mut expect_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (3, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (4, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1, 3],
        );
        expect_meta.primary_key_encoding = PrimaryKeyEncoding::Sparse;
        let expect_meta = Arc::new(expect_meta);

        let mapper = ProjectionMapper::all(&expect_meta).unwrap();
        let k1 = encode_key(&[Some("a")]);
        let k2 = encode_key(&[Some("b")]);
        let source_reader = VecBatchReader::new(&[
            new_batch(&k1, &[(2, false)], 1000, 3),
            new_batch(&k2, &[(2, false)], 1000, 3),
        ]);

        let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
        let k1 = encode_sparse_key(&[(1, Some("a")), (3, None)]);
        let k2 = encode_sparse_key(&[(1, Some("b")), (3, None)]);
        check_reader_result(
            &mut compat_reader,
            &[
                new_batch(&k1, &[(2, false), (4, true)], 1000, 3),
                new_batch(&k2, &[(2, false), (4, true)], 1000, 3),
            ],
        )
        .await;
    }
}