1use std::collections::HashMap;
18use std::sync::Arc;
19
20use datatypes::data_type::ConcreteDataType;
21use datatypes::value::Value;
22use datatypes::vectors::VectorRef;
23use snafu::{ensure, OptionExt, ResultExt};
24use store_api::metadata::{RegionMetadata, RegionMetadataRef};
25use store_api::storage::ColumnId;
26
27use crate::error::{CompatReaderSnafu, CreateDefaultSnafu, Result};
28use crate::read::projection::ProjectionMapper;
29use crate::read::{Batch, BatchColumn, BatchReader};
30use crate::row_converter::{
31 build_primary_key_codec, build_primary_key_codec_with_fields, CompositeValues, PrimaryKeyCodec,
32 SortField,
33};
34
35pub struct CompatReader<R> {
37 reader: R,
39 compat: CompatBatch,
41}
42
43impl<R> CompatReader<R> {
44 pub fn new(
49 mapper: &ProjectionMapper,
50 reader_meta: RegionMetadataRef,
51 reader: R,
52 ) -> Result<CompatReader<R>> {
53 Ok(CompatReader {
54 reader,
55 compat: CompatBatch::new(mapper, reader_meta)?,
56 })
57 }
58}
59
60#[async_trait::async_trait]
61impl<R: BatchReader> BatchReader for CompatReader<R> {
62 async fn next_batch(&mut self) -> Result<Option<Batch>> {
63 let Some(mut batch) = self.reader.next_batch().await? else {
64 return Ok(None);
65 };
66
67 batch = self.compat.compat_batch(batch)?;
68
69 Ok(Some(batch))
70 }
71}
72
73pub(crate) struct CompatBatch {
75 rewrite_pk: Option<RewritePrimaryKey>,
77 compat_pk: Option<CompatPrimaryKey>,
79 compat_fields: Option<CompatFields>,
81}
82
83impl CompatBatch {
84 pub(crate) fn new(mapper: &ProjectionMapper, reader_meta: RegionMetadataRef) -> Result<Self> {
88 let rewrite_pk = may_rewrite_primary_key(mapper.metadata(), &reader_meta);
89 let compat_pk = may_compat_primary_key(mapper.metadata(), &reader_meta)?;
90 let compat_fields = may_compat_fields(mapper, &reader_meta)?;
91
92 Ok(Self {
93 rewrite_pk,
94 compat_pk,
95 compat_fields,
96 })
97 }
98
99 pub(crate) fn compat_batch(&self, mut batch: Batch) -> Result<Batch> {
101 if let Some(rewrite_pk) = &self.rewrite_pk {
102 batch = rewrite_pk.compat(batch)?;
103 }
104 if let Some(compat_pk) = &self.compat_pk {
105 batch = compat_pk.compat(batch)?;
106 }
107 if let Some(compat_fields) = &self.compat_fields {
108 batch = compat_fields.compat(batch);
109 }
110
111 Ok(batch)
112 }
113}
114
115pub(crate) fn has_same_columns_and_pk_encoding(
117 left: &RegionMetadata,
118 right: &RegionMetadata,
119) -> bool {
120 if left.primary_key_encoding != right.primary_key_encoding {
121 return false;
122 }
123
124 if left.column_metadatas.len() != right.column_metadatas.len() {
125 return false;
126 }
127
128 for (left_col, right_col) in left.column_metadatas.iter().zip(&right.column_metadatas) {
129 if left_col.column_id != right_col.column_id || !left_col.is_same_datatype(right_col) {
130 return false;
131 }
132 debug_assert_eq!(
133 left_col.column_schema.data_type,
134 right_col.column_schema.data_type
135 );
136 debug_assert_eq!(left_col.semantic_type, right_col.semantic_type);
137 }
138
139 true
140}
141
142#[derive(Debug)]
144struct CompatPrimaryKey {
145 converter: Arc<dyn PrimaryKeyCodec>,
147 values: Vec<(ColumnId, Value)>,
149}
150
151impl CompatPrimaryKey {
152 fn compat(&self, mut batch: Batch) -> Result<Batch> {
154 let mut buffer = Vec::with_capacity(
155 batch.primary_key().len() + self.converter.estimated_size().unwrap_or_default(),
156 );
157 buffer.extend_from_slice(batch.primary_key());
158 self.converter.encode_values(&self.values, &mut buffer)?;
159
160 batch.set_primary_key(buffer);
161
162 if let Some(pk_values) = &mut batch.pk_values {
164 pk_values.extend(&self.values);
165 }
166
167 Ok(batch)
168 }
169}
170
171#[derive(Debug)]
173struct CompatFields {
174 actual_fields: Vec<(ColumnId, ConcreteDataType)>,
176 index_or_defaults: Vec<IndexOrDefault>,
178}
179
180impl CompatFields {
181 #[must_use]
183 fn compat(&self, batch: Batch) -> Batch {
184 debug_assert_eq!(self.actual_fields.len(), batch.fields().len());
185 debug_assert!(self
186 .actual_fields
187 .iter()
188 .zip(batch.fields())
189 .all(|((id, _), batch_column)| *id == batch_column.column_id));
190
191 let len = batch.num_rows();
192 let fields = self
193 .index_or_defaults
194 .iter()
195 .map(|index_or_default| match index_or_default {
196 IndexOrDefault::Index { pos, cast_type } => {
197 let old_column = &batch.fields()[*pos];
198
199 let data = if let Some(ty) = cast_type {
200 old_column.data.cast(ty).unwrap()
203 } else {
204 old_column.data.clone()
205 };
206 BatchColumn {
207 column_id: old_column.column_id,
208 data,
209 }
210 }
211 IndexOrDefault::DefaultValue {
212 column_id,
213 default_vector,
214 } => {
215 let data = default_vector.replicate(&[len]);
216 BatchColumn {
217 column_id: *column_id,
218 data,
219 }
220 }
221 })
222 .collect();
223
224 batch.with_fields(fields).unwrap()
226 }
227}
228
229fn may_rewrite_primary_key(
230 expect: &RegionMetadata,
231 actual: &RegionMetadata,
232) -> Option<RewritePrimaryKey> {
233 if expect.primary_key_encoding == actual.primary_key_encoding {
234 return None;
235 }
236
237 let fields = expect.primary_key.clone();
238 let original = build_primary_key_codec(actual);
239 let new = build_primary_key_codec(expect);
240
241 Some(RewritePrimaryKey {
242 original,
243 new,
244 fields,
245 })
246}
247
248fn may_compat_primary_key(
250 expect: &RegionMetadata,
251 actual: &RegionMetadata,
252) -> Result<Option<CompatPrimaryKey>> {
253 ensure!(
254 actual.primary_key.len() <= expect.primary_key.len(),
255 CompatReaderSnafu {
256 region_id: expect.region_id,
257 reason: format!(
258 "primary key has more columns {} than expect {}",
259 actual.primary_key.len(),
260 expect.primary_key.len()
261 ),
262 }
263 );
264 ensure!(
265 actual.primary_key == expect.primary_key[..actual.primary_key.len()],
266 CompatReaderSnafu {
267 region_id: expect.region_id,
268 reason: format!(
269 "primary key has different prefix, expect: {:?}, actual: {:?}",
270 expect.primary_key, actual.primary_key
271 ),
272 }
273 );
274 if actual.primary_key.len() == expect.primary_key.len() {
275 return Ok(None);
276 }
277
278 let to_add = &expect.primary_key[actual.primary_key.len()..];
280 let mut fields = Vec::with_capacity(to_add.len());
281 let mut values = Vec::with_capacity(to_add.len());
282 for column_id in to_add {
283 let column = expect.column_by_id(*column_id).unwrap();
285 fields.push((
286 *column_id,
287 SortField::new(column.column_schema.data_type.clone()),
288 ));
289 let default_value = column
290 .column_schema
291 .create_default()
292 .context(CreateDefaultSnafu {
293 region_id: expect.region_id,
294 column: &column.column_schema.name,
295 })?
296 .with_context(|| CompatReaderSnafu {
297 region_id: expect.region_id,
298 reason: format!(
299 "key column {} does not have a default value to read",
300 column.column_schema.name
301 ),
302 })?;
303 values.push((*column_id, default_value));
304 }
305 let converter =
307 build_primary_key_codec_with_fields(expect.primary_key_encoding, fields.into_iter());
308
309 Ok(Some(CompatPrimaryKey { converter, values }))
310}
311
312fn may_compat_fields(
314 mapper: &ProjectionMapper,
315 actual: &RegionMetadata,
316) -> Result<Option<CompatFields>> {
317 let expect_fields = mapper.batch_fields();
318 let actual_fields = Batch::projected_fields(actual, mapper.column_ids());
319 if expect_fields == actual_fields {
320 return Ok(None);
321 }
322
323 let source_field_index: HashMap<_, _> = actual_fields
324 .iter()
325 .enumerate()
326 .map(|(idx, (column_id, data_type))| (*column_id, (idx, data_type)))
327 .collect();
328
329 let index_or_defaults = expect_fields
330 .iter()
331 .map(|(column_id, expect_data_type)| {
332 if let Some((index, actual_data_type)) = source_field_index.get(column_id) {
333 let mut cast_type = None;
334
335 if expect_data_type != *actual_data_type {
336 cast_type = Some(expect_data_type.clone())
337 }
338 Ok(IndexOrDefault::Index {
340 pos: *index,
341 cast_type,
342 })
343 } else {
344 let column = mapper.metadata().column_by_id(*column_id).unwrap();
346 let default_vector = column
348 .column_schema
349 .create_default_vector(1)
350 .context(CreateDefaultSnafu {
351 region_id: mapper.metadata().region_id,
352 column: &column.column_schema.name,
353 })?
354 .with_context(|| CompatReaderSnafu {
355 region_id: mapper.metadata().region_id,
356 reason: format!(
357 "column {} does not have a default value to read",
358 column.column_schema.name
359 ),
360 })?;
361 Ok(IndexOrDefault::DefaultValue {
362 column_id: column.column_id,
363 default_vector,
364 })
365 }
366 })
367 .collect::<Result<Vec<_>>>()?;
368
369 Ok(Some(CompatFields {
370 actual_fields,
371 index_or_defaults,
372 }))
373}
374
375#[derive(Debug)]
377enum IndexOrDefault {
378 Index {
380 pos: usize,
381 cast_type: Option<ConcreteDataType>,
382 },
383 DefaultValue {
385 column_id: ColumnId,
387 default_vector: VectorRef,
389 },
390}
391
392struct RewritePrimaryKey {
394 original: Arc<dyn PrimaryKeyCodec>,
396 new: Arc<dyn PrimaryKeyCodec>,
398 fields: Vec<ColumnId>,
400}
401
402impl RewritePrimaryKey {
403 fn compat(&self, mut batch: Batch) -> Result<Batch> {
405 let values = if let Some(pk_values) = batch.pk_values() {
406 pk_values
407 } else {
408 let new_pk_values = self.original.decode(batch.primary_key())?;
409 batch.set_pk_values(new_pk_values);
410 batch.pk_values().as_ref().unwrap()
412 };
413
414 let mut buffer = Vec::with_capacity(
415 batch.primary_key().len() + self.new.estimated_size().unwrap_or_default(),
416 );
417 match values {
418 CompositeValues::Dense(values) => {
419 self.new.encode_values(values.as_slice(), &mut buffer)?;
420 }
421 CompositeValues::Sparse(values) => {
422 let values = self
423 .fields
424 .iter()
425 .map(|id| {
426 let value = values.get_or_null(*id);
427 (*id, value.as_value_ref())
428 })
429 .collect::<Vec<_>>();
430 self.new.encode_value_refs(&values, &mut buffer)?;
431 }
432 }
433 batch.set_primary_key(buffer);
434
435 Ok(batch)
436 }
437}
438
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::{OpType, SemanticType};
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::ValueRef;
    use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, UInt64Vector, UInt8Vector};
    use store_api::codec::PrimaryKeyEncoding;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;
    use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec};
    use crate::test_util::{check_reader_result, VecBatchReader};

    /// Builds a [RegionMetadata] from `(id, semantic type, data type)` triples
    /// plus the column ids forming the primary key.
    fn new_metadata(
        semantic_types: &[(ColumnId, SemanticType, ConcreteDataType)],
        primary_key: &[ColumnId],
    ) -> RegionMetadata {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        for (id, semantic_type, data_type) in semantic_types {
            // Column names are derived from the semantic type and the id.
            let column_schema = match semantic_type {
                SemanticType::Tag => {
                    ColumnSchema::new(format!("tag_{id}"), data_type.clone(), true)
                }
                SemanticType::Field => {
                    ColumnSchema::new(format!("field_{id}"), data_type.clone(), true)
                }
                SemanticType::Timestamp => ColumnSchema::new("ts", data_type.clone(), false),
            };

            builder.push_column_metadata(ColumnMetadata {
                column_schema,
                semantic_type: *semantic_type,
                column_id: *id,
            });
        }
        builder.primary_key(primary_key.to_vec());
        builder.build().unwrap()
    }

    /// Encodes string keys with the dense codec; `None` becomes a null value.
    fn encode_key(keys: &[Option<&str>]) -> Vec<u8> {
        let fields = (0..keys.len())
            .map(|_| (0, SortField::new(ConcreteDataType::string_datatype())))
            .collect();
        let converter = DensePrimaryKeyCodec::with_fields(fields);
        let row = keys.iter().map(|str_opt| match str_opt {
            Some(v) => ValueRef::String(v),
            None => ValueRef::Null,
        });

        converter.encode(row).unwrap()
    }

    /// Encodes `(column id, optional key)` pairs with the sparse codec.
    fn encode_sparse_key(keys: &[(ColumnId, Option<&str>)]) -> Vec<u8> {
        let fields = (0..keys.len())
            .map(|_| (1, SortField::new(ConcreteDataType::string_datatype())))
            .collect();
        let converter = SparsePrimaryKeyCodec::with_fields(fields);
        let row = keys
            .iter()
            .map(|(id, str_opt)| match str_opt {
                Some(v) => (*id, ValueRef::String(v)),
                None => (*id, ValueRef::Null),
            })
            .collect::<Vec<_>>();
        let mut buffer = vec![];
        converter.encode_value_refs(&row, &mut buffer).unwrap();
        buffer
    }

    /// Builds a [Batch] with int64 field columns. For each `(id, is_null)`
    /// entry, a `true` flag yields an all-null column; otherwise the column
    /// repeats its id as the value. Timestamps count up from `start_ts`.
    fn new_batch(
        primary_key: &[u8],
        fields: &[(ColumnId, bool)],
        start_ts: i64,
        num_rows: usize,
    ) -> Batch {
        let timestamps = Arc::new(TimestampMillisecondVector::from_values(
            start_ts..start_ts + num_rows as i64,
        ));
        let sequences = Arc::new(UInt64Vector::from_values(0..num_rows as u64));
        let op_types = Arc::new(UInt8Vector::from_vec(vec![OpType::Put as u8; num_rows]));
        let field_columns = fields
            .iter()
            .map(|(id, is_null)| {
                let data = if *is_null {
                    Arc::new(Int64Vector::from(vec![None; num_rows]))
                } else {
                    Arc::new(Int64Vector::from_vec(vec![*id as i64; num_rows]))
                };
                BatchColumn {
                    column_id: *id,
                    data,
                }
            })
            .collect();
        Batch::new(
            primary_key.to_vec(),
            timestamps,
            sequences,
            op_types,
            field_columns,
        )
        .unwrap()
    }

    #[test]
    fn test_invalid_pk_len() {
        // Reader has more pk columns than expected: must error.
        let reader_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1, 2],
        );
        let expect_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        );
        may_compat_primary_key(&expect_meta, &reader_meta).unwrap_err();
    }

    #[test]
    fn test_different_pk() {
        // Reader's pk is not a prefix of the expected pk: must error.
        let reader_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[2, 1],
        );
        let expect_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (4, SemanticType::Tag, ConcreteDataType::string_datatype()),
            ],
            &[1, 2, 4],
        );
        may_compat_primary_key(&expect_meta, &reader_meta).unwrap_err();
    }

    #[test]
    fn test_same_pk() {
        // Identical metadata needs no pk adaptation.
        let reader_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        );
        assert!(may_compat_primary_key(&reader_meta, &reader_meta)
            .unwrap()
            .is_none());
    }

    #[test]
    fn test_same_pk_encoding() {
        let reader_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
            ],
            &[1],
        ));

        assert!(may_compat_primary_key(&reader_meta, &reader_meta)
            .unwrap()
            .is_none());
    }

    #[test]
    fn test_same_fields() {
        // Identical metadata needs no field adaptation.
        let reader_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        let mapper = ProjectionMapper::all(&reader_meta).unwrap();
        assert!(may_compat_fields(&mapper, &reader_meta).unwrap().is_none())
    }

    #[tokio::test]
    async fn test_compat_reader() {
        // Expected schema adds a tag (3) and a field (4); both are padded.
        let reader_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        let expect_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (3, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (4, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1, 3],
        ));
        let mapper = ProjectionMapper::all(&expect_meta).unwrap();
        let k1 = encode_key(&[Some("a")]);
        let k2 = encode_key(&[Some("b")]);
        let source_reader = VecBatchReader::new(&[
            new_batch(&k1, &[(2, false)], 1000, 3),
            new_batch(&k2, &[(2, false)], 1000, 3),
        ]);

        let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
        // Output keys carry a trailing null for the new tag column.
        let k1 = encode_key(&[Some("a"), None]);
        let k2 = encode_key(&[Some("b"), None]);
        check_reader_result(
            &mut compat_reader,
            &[
                new_batch(&k1, &[(2, false), (4, true)], 1000, 3),
                new_batch(&k2, &[(2, false), (4, true)], 1000, 3),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_compat_reader_different_order() {
        // Expected fields are reordered (3, 2, 4); missing ones are null.
        let reader_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        let expect_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (4, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        let mapper = ProjectionMapper::all(&expect_meta).unwrap();
        let k1 = encode_key(&[Some("a")]);
        let k2 = encode_key(&[Some("b")]);
        let source_reader = VecBatchReader::new(&[
            new_batch(&k1, &[(2, false)], 1000, 3),
            new_batch(&k2, &[(2, false)], 1000, 3),
        ]);

        let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
        check_reader_result(
            &mut compat_reader,
            &[
                new_batch(&k1, &[(3, true), (2, false), (4, true)], 1000, 3),
                new_batch(&k2, &[(3, true), (2, false), (4, true)], 1000, 3),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_compat_reader_different_types() {
        // Field 2 changed from int64 to string; values must be cast.
        let actual_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        let expect_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::string_datatype()),
            ],
            &[1],
        ));
        let mapper = ProjectionMapper::all(&expect_meta).unwrap();
        let k1 = encode_key(&[Some("a")]);
        let k2 = encode_key(&[Some("b")]);
        let source_reader = VecBatchReader::new(&[
            new_batch(&k1, &[(2, false)], 1000, 3),
            new_batch(&k2, &[(2, false)], 1000, 3),
        ]);

        // Casts the first field column of a batch to string, mirroring what
        // the compat reader is expected to produce.
        let fn_batch_cast = |batch: Batch| {
            let mut new_fields = batch.fields.clone();
            new_fields[0].data = new_fields[0]
                .data
                .cast(&ConcreteDataType::string_datatype())
                .unwrap();

            batch.with_fields(new_fields).unwrap()
        };
        let mut compat_reader = CompatReader::new(&mapper, actual_meta, source_reader).unwrap();
        check_reader_result(
            &mut compat_reader,
            &[
                fn_batch_cast(new_batch(&k1, &[(2, false)], 1000, 3)),
                fn_batch_cast(new_batch(&k2, &[(2, false)], 1000, 3)),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_compat_reader_projection() {
        let reader_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        let expect_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (4, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        // Projection selecting one column the reader has (2) and one it lacks (3).
        let mapper = ProjectionMapper::new(&expect_meta, [1, 3, 2].into_iter()).unwrap();
        let k1 = encode_key(&[Some("a")]);
        let source_reader = VecBatchReader::new(&[new_batch(&k1, &[(2, false)], 1000, 3)]);

        let mut compat_reader =
            CompatReader::new(&mapper, reader_meta.clone(), source_reader).unwrap();
        check_reader_result(
            &mut compat_reader,
            &[new_batch(&k1, &[(3, true), (2, false)], 1000, 3)],
        )
        .await;

        // Projection selecting only columns the reader lacks (3 implicitly, 4).
        let mapper = ProjectionMapper::new(&expect_meta, [1, 4, 2].into_iter()).unwrap();
        let k1 = encode_key(&[Some("a")]);
        let source_reader = VecBatchReader::new(&[new_batch(&k1, &[], 1000, 3)]);

        let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
        check_reader_result(
            &mut compat_reader,
            &[new_batch(&k1, &[(3, true), (4, true)], 1000, 3)],
        )
        .await;
    }

    #[tokio::test]
    async fn test_compat_reader_different_pk_encoding() {
        // Reader uses dense pk encoding; expected metadata uses sparse.
        let mut reader_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        );
        reader_meta.primary_key_encoding = PrimaryKeyEncoding::Dense;
        let reader_meta = Arc::new(reader_meta);
        let mut expect_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (3, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (4, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1, 3],
        );
        expect_meta.primary_key_encoding = PrimaryKeyEncoding::Sparse;
        let expect_meta = Arc::new(expect_meta);

        let mapper = ProjectionMapper::all(&expect_meta).unwrap();
        let k1 = encode_key(&[Some("a")]);
        let k2 = encode_key(&[Some("b")]);
        let source_reader = VecBatchReader::new(&[
            new_batch(&k1, &[(2, false)], 1000, 3),
            new_batch(&k2, &[(2, false)], 1000, 3),
        ]);

        let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
        // Output keys are sparse-encoded, with a null for the new tag (3).
        let k1 = encode_sparse_key(&[(1, Some("a")), (3, None)]);
        let k2 = encode_sparse_key(&[(1, Some("b")), (3, None)]);
        check_reader_result(
            &mut compat_reader,
            &[
                new_batch(&k1, &[(2, false), (4, true)], 1000, 3),
                new_batch(&k2, &[(2, false), (4, true)], 1000, 3),
            ],
        )
        .await;
    }
}