// mito2/read/compat.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Utilities to adapt readers with different schema.
16
17use std::collections::HashMap;
18use std::sync::Arc;
19
20use datatypes::data_type::ConcreteDataType;
21use datatypes::value::Value;
22use datatypes::vectors::VectorRef;
23use snafu::{ensure, OptionExt, ResultExt};
24use store_api::metadata::{RegionMetadata, RegionMetadataRef};
25use store_api::storage::ColumnId;
26
27use crate::error::{CompatReaderSnafu, CreateDefaultSnafu, Result};
28use crate::read::projection::ProjectionMapper;
29use crate::read::{Batch, BatchColumn, BatchReader};
30use crate::row_converter::{
31    build_primary_key_codec, build_primary_key_codec_with_fields, CompositeValues, PrimaryKeyCodec,
32    SortField,
33};
34
/// Reader to adapt schema of underlying reader to expected schema.
///
/// Wraps another [BatchReader] and applies a [CompatBatch] to every batch it
/// yields, so callers always observe batches in the expected schema.
pub struct CompatReader<R> {
    /// Underlying reader producing batches in the reader's (possibly older) schema.
    reader: R,
    /// Helper that adapts each batch to the expected schema.
    compat: CompatBatch,
}
42
43impl<R> CompatReader<R> {
44    /// Creates a new compat reader.
45    /// - `mapper` is built from the metadata users expect to see.
46    /// - `reader_meta` is the metadata of the input reader.
47    /// - `reader` is the input reader.
48    pub fn new(
49        mapper: &ProjectionMapper,
50        reader_meta: RegionMetadataRef,
51        reader: R,
52    ) -> Result<CompatReader<R>> {
53        Ok(CompatReader {
54            reader,
55            compat: CompatBatch::new(mapper, reader_meta)?,
56        })
57    }
58}
59
60#[async_trait::async_trait]
61impl<R: BatchReader> BatchReader for CompatReader<R> {
62    async fn next_batch(&mut self) -> Result<Option<Batch>> {
63        let Some(mut batch) = self.reader.next_batch().await? else {
64            return Ok(None);
65        };
66
67        batch = self.compat.compat_batch(batch)?;
68
69        Ok(Some(batch))
70    }
71}
72
/// A helper struct to adapt schema of the batch to an expected schema.
pub(crate) struct CompatBatch {
    /// Optional adapter that re-encodes the primary key when the reader and
    /// the expected metadata use different primary key encodings.
    rewrite_pk: Option<RewritePrimaryKey>,
    /// Optional adapter that appends default values for primary key columns
    /// present in the expected schema but missing from the reader's schema.
    compat_pk: Option<CompatPrimaryKey>,
    /// Optional adapter that reorders, casts, or fills default field columns.
    compat_fields: Option<CompatFields>,
}
82
83impl CompatBatch {
84    /// Creates a new [CompatBatch].
85    /// - `mapper` is built from the metadata users expect to see.
86    /// - `reader_meta` is the metadata of the input reader.
87    pub(crate) fn new(mapper: &ProjectionMapper, reader_meta: RegionMetadataRef) -> Result<Self> {
88        let rewrite_pk = may_rewrite_primary_key(mapper.metadata(), &reader_meta);
89        let compat_pk = may_compat_primary_key(mapper.metadata(), &reader_meta)?;
90        let compat_fields = may_compat_fields(mapper, &reader_meta)?;
91
92        Ok(Self {
93            rewrite_pk,
94            compat_pk,
95            compat_fields,
96        })
97    }
98
99    /// Adapts the `batch` to the expected schema.
100    pub(crate) fn compat_batch(&self, mut batch: Batch) -> Result<Batch> {
101        if let Some(rewrite_pk) = &self.rewrite_pk {
102            batch = rewrite_pk.compat(batch)?;
103        }
104        if let Some(compat_pk) = &self.compat_pk {
105            batch = compat_pk.compat(batch)?;
106        }
107        if let Some(compat_fields) = &self.compat_fields {
108            batch = compat_fields.compat(batch);
109        }
110
111        Ok(batch)
112    }
113}
114
115/// Returns true if `left` and `right` have same columns and primary key encoding.
116pub(crate) fn has_same_columns_and_pk_encoding(
117    left: &RegionMetadata,
118    right: &RegionMetadata,
119) -> bool {
120    if left.primary_key_encoding != right.primary_key_encoding {
121        return false;
122    }
123
124    if left.column_metadatas.len() != right.column_metadatas.len() {
125        return false;
126    }
127
128    for (left_col, right_col) in left.column_metadatas.iter().zip(&right.column_metadatas) {
129        if left_col.column_id != right_col.column_id || !left_col.is_same_datatype(right_col) {
130            return false;
131        }
132        debug_assert_eq!(
133            left_col.column_schema.data_type,
134            right_col.column_schema.data_type
135        );
136        debug_assert_eq!(left_col.semantic_type, right_col.semantic_type);
137    }
138
139    true
140}
141
/// Helper to make primary key compatible.
///
/// Appends encoded default values for the expected key columns that the
/// reader's primary key lacks.
#[derive(Debug)]
struct CompatPrimaryKey {
    /// Row converter to append values to primary keys.
    converter: Arc<dyn PrimaryKeyCodec>,
    /// Default values to append, one per missing key column.
    values: Vec<(ColumnId, Value)>,
}
150
151impl CompatPrimaryKey {
152    /// Make primary key of the `batch` compatible.
153    fn compat(&self, mut batch: Batch) -> Result<Batch> {
154        let mut buffer = Vec::with_capacity(
155            batch.primary_key().len() + self.converter.estimated_size().unwrap_or_default(),
156        );
157        buffer.extend_from_slice(batch.primary_key());
158        self.converter.encode_values(&self.values, &mut buffer)?;
159
160        batch.set_primary_key(buffer);
161
162        // update cache
163        if let Some(pk_values) = &mut batch.pk_values {
164            pk_values.extend(&self.values);
165        }
166
167        Ok(batch)
168    }
169}
170
/// Helper to make fields compatible.
///
/// Built once per reader; applied to every batch to reorder, cast, or fill
/// default field columns.
#[derive(Debug)]
struct CompatFields {
    /// Column Ids and DataTypes the reader actually returns.
    actual_fields: Vec<(ColumnId, ConcreteDataType)>,
    /// Indices to convert actual fields to expect fields.
    index_or_defaults: Vec<IndexOrDefault>,
}
179
180impl CompatFields {
181    /// Make fields of the `batch` compatible.
182    #[must_use]
183    fn compat(&self, batch: Batch) -> Batch {
184        debug_assert_eq!(self.actual_fields.len(), batch.fields().len());
185        debug_assert!(self
186            .actual_fields
187            .iter()
188            .zip(batch.fields())
189            .all(|((id, _), batch_column)| *id == batch_column.column_id));
190
191        let len = batch.num_rows();
192        let fields = self
193            .index_or_defaults
194            .iter()
195            .map(|index_or_default| match index_or_default {
196                IndexOrDefault::Index { pos, cast_type } => {
197                    let old_column = &batch.fields()[*pos];
198
199                    let data = if let Some(ty) = cast_type {
200                        // Safety: We ensure type can be converted and the new batch should be valid.
201                        // Tips: `safe` must be true in `CastOptions`, which will replace the specific value with null when it cannot be converted.
202                        old_column.data.cast(ty).unwrap()
203                    } else {
204                        old_column.data.clone()
205                    };
206                    BatchColumn {
207                        column_id: old_column.column_id,
208                        data,
209                    }
210                }
211                IndexOrDefault::DefaultValue {
212                    column_id,
213                    default_vector,
214                } => {
215                    let data = default_vector.replicate(&[len]);
216                    BatchColumn {
217                        column_id: *column_id,
218                        data,
219                    }
220                }
221            })
222            .collect();
223
224        // Safety: We ensure all columns have the same length and the new batch should be valid.
225        batch.with_fields(fields).unwrap()
226    }
227}
228
229fn may_rewrite_primary_key(
230    expect: &RegionMetadata,
231    actual: &RegionMetadata,
232) -> Option<RewritePrimaryKey> {
233    if expect.primary_key_encoding == actual.primary_key_encoding {
234        return None;
235    }
236
237    let fields = expect.primary_key.clone();
238    let original = build_primary_key_codec(actual);
239    let new = build_primary_key_codec(expect);
240
241    Some(RewritePrimaryKey {
242        original,
243        new,
244        fields,
245    })
246}
247
248/// Creates a [CompatPrimaryKey] if needed.
249fn may_compat_primary_key(
250    expect: &RegionMetadata,
251    actual: &RegionMetadata,
252) -> Result<Option<CompatPrimaryKey>> {
253    ensure!(
254        actual.primary_key.len() <= expect.primary_key.len(),
255        CompatReaderSnafu {
256            region_id: expect.region_id,
257            reason: format!(
258                "primary key has more columns {} than expect {}",
259                actual.primary_key.len(),
260                expect.primary_key.len()
261            ),
262        }
263    );
264    ensure!(
265        actual.primary_key == expect.primary_key[..actual.primary_key.len()],
266        CompatReaderSnafu {
267            region_id: expect.region_id,
268            reason: format!(
269                "primary key has different prefix, expect: {:?}, actual: {:?}",
270                expect.primary_key, actual.primary_key
271            ),
272        }
273    );
274    if actual.primary_key.len() == expect.primary_key.len() {
275        return Ok(None);
276    }
277
278    // We need to append default values to the primary key.
279    let to_add = &expect.primary_key[actual.primary_key.len()..];
280    let mut fields = Vec::with_capacity(to_add.len());
281    let mut values = Vec::with_capacity(to_add.len());
282    for column_id in to_add {
283        // Safety: The id comes from expect region metadata.
284        let column = expect.column_by_id(*column_id).unwrap();
285        fields.push((
286            *column_id,
287            SortField::new(column.column_schema.data_type.clone()),
288        ));
289        let default_value = column
290            .column_schema
291            .create_default()
292            .context(CreateDefaultSnafu {
293                region_id: expect.region_id,
294                column: &column.column_schema.name,
295            })?
296            .with_context(|| CompatReaderSnafu {
297                region_id: expect.region_id,
298                reason: format!(
299                    "key column {} does not have a default value to read",
300                    column.column_schema.name
301                ),
302            })?;
303        values.push((*column_id, default_value));
304    }
305    // Using expect primary key encoding to build the converter
306    let converter =
307        build_primary_key_codec_with_fields(expect.primary_key_encoding, fields.into_iter());
308
309    Ok(Some(CompatPrimaryKey { converter, values }))
310}
311
312/// Creates a [CompatFields] if needed.
313fn may_compat_fields(
314    mapper: &ProjectionMapper,
315    actual: &RegionMetadata,
316) -> Result<Option<CompatFields>> {
317    let expect_fields = mapper.batch_fields();
318    let actual_fields = Batch::projected_fields(actual, mapper.column_ids());
319    if expect_fields == actual_fields {
320        return Ok(None);
321    }
322
323    let source_field_index: HashMap<_, _> = actual_fields
324        .iter()
325        .enumerate()
326        .map(|(idx, (column_id, data_type))| (*column_id, (idx, data_type)))
327        .collect();
328
329    let index_or_defaults = expect_fields
330        .iter()
331        .map(|(column_id, expect_data_type)| {
332            if let Some((index, actual_data_type)) = source_field_index.get(column_id) {
333                let mut cast_type = None;
334
335                if expect_data_type != *actual_data_type {
336                    cast_type = Some(expect_data_type.clone())
337                }
338                // Source has this field.
339                Ok(IndexOrDefault::Index {
340                    pos: *index,
341                    cast_type,
342                })
343            } else {
344                // Safety: mapper must have this column.
345                let column = mapper.metadata().column_by_id(*column_id).unwrap();
346                // Create a default vector with 1 element for that column.
347                let default_vector = column
348                    .column_schema
349                    .create_default_vector(1)
350                    .context(CreateDefaultSnafu {
351                        region_id: mapper.metadata().region_id,
352                        column: &column.column_schema.name,
353                    })?
354                    .with_context(|| CompatReaderSnafu {
355                        region_id: mapper.metadata().region_id,
356                        reason: format!(
357                            "column {} does not have a default value to read",
358                            column.column_schema.name
359                        ),
360                    })?;
361                Ok(IndexOrDefault::DefaultValue {
362                    column_id: column.column_id,
363                    default_vector,
364                })
365            }
366        })
367        .collect::<Result<Vec<_>>>()?;
368
369    Ok(Some(CompatFields {
370        actual_fields,
371        index_or_defaults,
372    }))
373}
374
/// Index in source batch or a default value to fill a column.
#[derive(Debug)]
enum IndexOrDefault {
    /// Index of the column in source batch.
    Index {
        /// Position of the column in the reader's field list.
        pos: usize,
        /// Cast the source column to this type when `Some` (types differ).
        cast_type: Option<ConcreteDataType>,
    },
    /// Default value for the column.
    DefaultValue {
        /// Id of the column.
        column_id: ColumnId,
        /// Default value. The vector has only 1 element.
        default_vector: VectorRef,
    },
}
391
/// Adapter to rewrite primary key.
///
/// Decodes keys with the original codec and re-encodes them with the new one
/// when the reader and the expected metadata use different encodings.
struct RewritePrimaryKey {
    /// Original primary key codec.
    original: Arc<dyn PrimaryKeyCodec>,
    /// New primary key codec.
    new: Arc<dyn PrimaryKeyCodec>,
    /// Order of the fields in the new primary key.
    fields: Vec<ColumnId>,
}
401
impl RewritePrimaryKey {
    /// Make primary key of the `batch` compatible.
    ///
    /// Obtains decoded key values (from the batch cache when present, otherwise
    /// by decoding with the original codec) and re-encodes them with the new
    /// codec before installing the key on the batch.
    fn compat(&self, mut batch: Batch) -> Result<Batch> {
        let values = if let Some(pk_values) = batch.pk_values() {
            pk_values
        } else {
            // Decode with the original codec and cache the result on the
            // batch so later consumers can reuse it.
            let new_pk_values = self.original.decode(batch.primary_key())?;
            batch.set_pk_values(new_pk_values);
            // Safety: We ensure pk_values is not None.
            batch.pk_values().as_ref().unwrap()
        };

        let mut buffer = Vec::with_capacity(
            batch.primary_key().len() + self.new.estimated_size().unwrap_or_default(),
        );
        match values {
            CompositeValues::Dense(values) => {
                // Dense values are already ordered; encode them directly.
                self.new.encode_values(values.as_slice(), &mut buffer)?;
            }
            CompositeValues::Sparse(values) => {
                // Project the sparse values into the new key's field order,
                // substituting null for any missing column.
                let values = self
                    .fields
                    .iter()
                    .map(|id| {
                        let value = values.get_or_null(*id);
                        (*id, value.as_value_ref())
                    })
                    .collect::<Vec<_>>();
                self.new.encode_value_refs(&values, &mut buffer)?;
            }
        }
        batch.set_primary_key(buffer);

        Ok(batch)
    }
}
438
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::{OpType, SemanticType};
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::ValueRef;
    use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, UInt64Vector, UInt8Vector};
    use store_api::codec::PrimaryKeyEncoding;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;
    use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec};
    use crate::test_util::{check_reader_result, VecBatchReader};

    /// Creates a new [RegionMetadata] from `(id, semantic type, data type)`
    /// triples and the list of primary key column ids.
    fn new_metadata(
        semantic_types: &[(ColumnId, SemanticType, ConcreteDataType)],
        primary_key: &[ColumnId],
    ) -> RegionMetadata {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        for (id, semantic_type, data_type) in semantic_types {
            // Tags and fields get generated names (`tag_<id>` / `field_<id>`);
            // the timestamp column is always named `ts` and is non-nullable.
            let column_schema = match semantic_type {
                SemanticType::Tag => {
                    ColumnSchema::new(format!("tag_{id}"), data_type.clone(), true)
                }
                SemanticType::Field => {
                    ColumnSchema::new(format!("field_{id}"), data_type.clone(), true)
                }
                SemanticType::Timestamp => ColumnSchema::new("ts", data_type.clone(), false),
            };

            builder.push_column_metadata(ColumnMetadata {
                column_schema,
                semantic_type: *semantic_type,
                column_id: *id,
            });
        }
        builder.primary_key(primary_key.to_vec());
        builder.build().unwrap()
    }

    /// Encode primary key with the dense codec; every key column is a string.
    fn encode_key(keys: &[Option<&str>]) -> Vec<u8> {
        let fields = (0..keys.len())
            .map(|_| (0, SortField::new(ConcreteDataType::string_datatype())))
            .collect();
        let converter = DensePrimaryKeyCodec::with_fields(fields);
        let row = keys.iter().map(|str_opt| match str_opt {
            Some(v) => ValueRef::String(v),
            None => ValueRef::Null,
        });

        converter.encode(row).unwrap()
    }

    /// Encode sparse primary key from `(column id, optional string)` pairs.
    fn encode_sparse_key(keys: &[(ColumnId, Option<&str>)]) -> Vec<u8> {
        let fields = (0..keys.len())
            .map(|_| (1, SortField::new(ConcreteDataType::string_datatype())))
            .collect();
        let converter = SparsePrimaryKeyCodec::with_fields(fields);
        let row = keys
            .iter()
            .map(|(id, str_opt)| match str_opt {
                Some(v) => (*id, ValueRef::String(v)),
                None => (*id, ValueRef::Null),
            })
            .collect::<Vec<_>>();
        let mut buffer = vec![];
        converter.encode_value_refs(&row, &mut buffer).unwrap();
        buffer
    }

    /// Creates a batch for the specific encoded `primary_key`.
    ///
    /// `fields`: [(column_id of the field, is null)]
    fn new_batch(
        primary_key: &[u8],
        fields: &[(ColumnId, bool)],
        start_ts: i64,
        num_rows: usize,
    ) -> Batch {
        let timestamps = Arc::new(TimestampMillisecondVector::from_values(
            start_ts..start_ts + num_rows as i64,
        ));
        let sequences = Arc::new(UInt64Vector::from_values(0..num_rows as u64));
        let op_types = Arc::new(UInt8Vector::from_vec(vec![OpType::Put as u8; num_rows]));
        // Non-null field columns are filled with the column id, which makes
        // expected batches easy to distinguish in assertions.
        let field_columns = fields
            .iter()
            .map(|(id, is_null)| {
                let data = if *is_null {
                    Arc::new(Int64Vector::from(vec![None; num_rows]))
                } else {
                    Arc::new(Int64Vector::from_vec(vec![*id as i64; num_rows]))
                };
                BatchColumn {
                    column_id: *id,
                    data,
                }
            })
            .collect();
        Batch::new(
            primary_key.to_vec(),
            timestamps,
            sequences,
            op_types,
            field_columns,
        )
        .unwrap()
    }

    /// The reader's primary key must not have more columns than expected.
    #[test]
    fn test_invalid_pk_len() {
        let reader_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1, 2],
        );
        let expect_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        );
        may_compat_primary_key(&expect_meta, &reader_meta).unwrap_err();
    }

    /// The reader's primary key must be a prefix of the expected primary key.
    #[test]
    fn test_different_pk() {
        let reader_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[2, 1],
        );
        let expect_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (4, SemanticType::Tag, ConcreteDataType::string_datatype()),
            ],
            &[1, 2, 4],
        );
        may_compat_primary_key(&expect_meta, &reader_meta).unwrap_err();
    }

    /// Identical primary keys need no adapter.
    #[test]
    fn test_same_pk() {
        let reader_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        );
        assert!(may_compat_primary_key(&reader_meta, &reader_meta)
            .unwrap()
            .is_none());
    }

    /// Same metadata (hence same encoding) needs no primary key adapter.
    #[test]
    fn test_same_pk_encoding() {
        let reader_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
            ],
            &[1],
        ));

        assert!(may_compat_primary_key(&reader_meta, &reader_meta)
            .unwrap()
            .is_none());
    }

    /// Identical field lists need no fields adapter.
    #[test]
    fn test_same_fields() {
        let reader_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        let mapper = ProjectionMapper::all(&reader_meta).unwrap();
        assert!(may_compat_fields(&mapper, &reader_meta).unwrap().is_none())
    }

    /// Expected schema adds a key column (3) and a field column (4); the
    /// reader's batches get a null-padded key and a null field column.
    #[tokio::test]
    async fn test_compat_reader() {
        let reader_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        let expect_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (3, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (4, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1, 3],
        ));
        let mapper = ProjectionMapper::all(&expect_meta).unwrap();
        let k1 = encode_key(&[Some("a")]);
        let k2 = encode_key(&[Some("b")]);
        let source_reader = VecBatchReader::new(&[
            new_batch(&k1, &[(2, false)], 1000, 3),
            new_batch(&k2, &[(2, false)], 1000, 3),
        ]);

        let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
        // Output keys carry a trailing null for the appended key column 3.
        let k1 = encode_key(&[Some("a"), None]);
        let k2 = encode_key(&[Some("b"), None]);
        check_reader_result(
            &mut compat_reader,
            &[
                new_batch(&k1, &[(2, false), (4, true)], 1000, 3),
                new_batch(&k2, &[(2, false), (4, true)], 1000, 3),
            ],
        )
        .await;
    }

    /// Expected fields interleave new columns (3, 4) around the existing
    /// column 2; output preserves the expected order.
    #[tokio::test]
    async fn test_compat_reader_different_order() {
        let reader_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        let expect_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (4, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        let mapper = ProjectionMapper::all(&expect_meta).unwrap();
        let k1 = encode_key(&[Some("a")]);
        let k2 = encode_key(&[Some("b")]);
        let source_reader = VecBatchReader::new(&[
            new_batch(&k1, &[(2, false)], 1000, 3),
            new_batch(&k2, &[(2, false)], 1000, 3),
        ]);

        let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
        check_reader_result(
            &mut compat_reader,
            &[
                new_batch(&k1, &[(3, true), (2, false), (4, true)], 1000, 3),
                new_batch(&k2, &[(3, true), (2, false), (4, true)], 1000, 3),
            ],
        )
        .await;
    }

    /// Field 2 changes type (int64 -> string); the reader's column is cast.
    #[tokio::test]
    async fn test_compat_reader_different_types() {
        let actual_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        let expect_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::string_datatype()),
            ],
            &[1],
        ));
        let mapper = ProjectionMapper::all(&expect_meta).unwrap();
        let k1 = encode_key(&[Some("a")]);
        let k2 = encode_key(&[Some("b")]);
        let source_reader = VecBatchReader::new(&[
            new_batch(&k1, &[(2, false)], 1000, 3),
            new_batch(&k2, &[(2, false)], 1000, 3),
        ]);

        // Builds the expected batch by casting the int64 column to string.
        let fn_batch_cast = |batch: Batch| {
            let mut new_fields = batch.fields.clone();
            new_fields[0].data = new_fields[0]
                .data
                .cast(&ConcreteDataType::string_datatype())
                .unwrap();

            batch.with_fields(new_fields).unwrap()
        };
        let mut compat_reader = CompatReader::new(&mapper, actual_meta, source_reader).unwrap();
        check_reader_result(
            &mut compat_reader,
            &[
                fn_batch_cast(new_batch(&k1, &[(2, false)], 1000, 3)),
                fn_batch_cast(new_batch(&k2, &[(2, false)], 1000, 3)),
            ],
        )
        .await;
    }

    /// Projection selects a subset/reordering of the expected columns; the
    /// projection indices below are positions in `expect_meta`, not column ids.
    #[tokio::test]
    async fn test_compat_reader_projection() {
        let reader_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        let expect_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (4, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        // tag_1, field_2, field_3
        let mapper = ProjectionMapper::new(&expect_meta, [1, 3, 2].into_iter()).unwrap();
        let k1 = encode_key(&[Some("a")]);
        let source_reader = VecBatchReader::new(&[new_batch(&k1, &[(2, false)], 1000, 3)]);

        let mut compat_reader =
            CompatReader::new(&mapper, reader_meta.clone(), source_reader).unwrap();
        check_reader_result(
            &mut compat_reader,
            &[new_batch(&k1, &[(3, true), (2, false)], 1000, 3)],
        )
        .await;

        // tag_1, field_4, field_3
        let mapper = ProjectionMapper::new(&expect_meta, [1, 4, 2].into_iter()).unwrap();
        let k1 = encode_key(&[Some("a")]);
        let source_reader = VecBatchReader::new(&[new_batch(&k1, &[], 1000, 3)]);

        let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
        check_reader_result(
            &mut compat_reader,
            &[new_batch(&k1, &[(3, true), (4, true)], 1000, 3)],
        )
        .await;
    }

    /// Reader uses dense key encoding while the expected metadata uses sparse;
    /// keys are decoded and re-encoded (with the appended null for column 3).
    #[tokio::test]
    async fn test_compat_reader_different_pk_encoding() {
        let mut reader_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        );
        reader_meta.primary_key_encoding = PrimaryKeyEncoding::Dense;
        let reader_meta = Arc::new(reader_meta);
        let mut expect_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (3, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (4, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1, 3],
        );
        expect_meta.primary_key_encoding = PrimaryKeyEncoding::Sparse;
        let expect_meta = Arc::new(expect_meta);

        let mapper = ProjectionMapper::all(&expect_meta).unwrap();
        let k1 = encode_key(&[Some("a")]);
        let k2 = encode_key(&[Some("b")]);
        let source_reader = VecBatchReader::new(&[
            new_batch(&k1, &[(2, false)], 1000, 3),
            new_batch(&k2, &[(2, false)], 1000, 3),
        ]);

        let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
        let k1 = encode_sparse_key(&[(1, Some("a")), (3, None)]);
        let k2 = encode_sparse_key(&[(1, Some("b")), (3, None)]);
        check_reader_result(
            &mut compat_reader,
            &[
                new_batch(&k1, &[(2, false), (4, true)], 1000, 3),
                new_batch(&k2, &[(2, false), (4, true)], 1000, 3),
            ],
        )
        .await;
    }
}