mito2/read/compat.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Utilities to adapt readers with different schemas.
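//!
//! A [CompatReader] wraps another [BatchReader] and rewrites every batch it
//! yields to match the schema the caller expects. It handles three kinds of
//! divergence between the reader's metadata and the expected metadata: a
//! different primary key encoding, key columns appended to the primary key,
//! and field columns that were added, reordered, or altered to another type.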

use std::collections::HashMap;
use std::sync::Arc;

use datatypes::data_type::ConcreteDataType;
use datatypes::value::Value;
use datatypes::vectors::VectorRef;
use mito_codec::row_converter::{
    build_primary_key_codec, build_primary_key_codec_with_fields, CompositeValues, PrimaryKeyCodec,
    SortField,
};
use snafu::{ensure, OptionExt, ResultExt};
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::ColumnId;

use crate::error::{CompatReaderSnafu, CreateDefaultSnafu, DecodeSnafu, EncodeSnafu, Result};
use crate::read::projection::ProjectionMapper;
use crate::read::{Batch, BatchColumn, BatchReader};

/// Reader that adapts the schema of the underlying reader to the expected schema.
pub struct CompatReader<R> {
    /// Underlying reader.
    reader: R,
    /// Helper to adapt batches to the expected schema.
    compat: CompatBatch,
}

impl<R> CompatReader<R> {
    /// Creates a new compat reader.
    /// - `mapper` is built from the metadata users expect to see.
    /// - `reader_meta` is the metadata of the input reader.
    /// - `reader` is the input reader.
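    ///
    /// A minimal usage sketch (hypothetical `expect_meta`, `reader_meta` and
    /// `source_reader`; see the tests below for runnable versions):
    ///
    /// ```ignore
    /// // `expect_meta` is the schema the caller wants to see, while
    /// // `reader_meta` describes what `source_reader` actually returns.
    /// let mapper = ProjectionMapper::all(&expect_meta)?;
    /// let mut reader = CompatReader::new(&mapper, reader_meta, source_reader)?;
    /// while let Some(batch) = reader.next_batch().await? {
    ///     // Batches here already match the schema of `expect_meta`.
    /// }
    /// ```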
    pub fn new(
        mapper: &ProjectionMapper,
        reader_meta: RegionMetadataRef,
        reader: R,
    ) -> Result<CompatReader<R>> {
        Ok(CompatReader {
            reader,
            compat: CompatBatch::new(mapper, reader_meta)?,
        })
    }
}

#[async_trait::async_trait]
impl<R: BatchReader> BatchReader for CompatReader<R> {
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        let Some(mut batch) = self.reader.next_batch().await? else {
            return Ok(None);
        };

        batch = self.compat.compat_batch(batch)?;

        Ok(Some(batch))
    }
}

/// A helper struct to adapt the schema of a batch to the expected schema.
pub(crate) struct CompatBatch {
    /// Optional adapter to rewrite the primary key encoding.
    rewrite_pk: Option<RewritePrimaryKey>,
    /// Optional adapter to append default values to the primary key.
    compat_pk: Option<CompatPrimaryKey>,
    /// Optional fields adapter.
    compat_fields: Option<CompatFields>,
}

impl CompatBatch {
    /// Creates a new [CompatBatch].
    /// - `mapper` is built from the metadata users expect to see.
    /// - `reader_meta` is the metadata of the input reader.
    pub(crate) fn new(mapper: &ProjectionMapper, reader_meta: RegionMetadataRef) -> Result<Self> {
        let rewrite_pk = may_rewrite_primary_key(mapper.metadata(), &reader_meta);
        let compat_pk = may_compat_primary_key(mapper.metadata(), &reader_meta)?;
        let compat_fields = may_compat_fields(mapper, &reader_meta)?;

        Ok(Self {
            rewrite_pk,
            compat_pk,
            compat_fields,
        })
    }

    /// Adapts the `batch` to the expected schema.
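    ///
    /// The adapters run in a fixed order: the primary key encoding is
    /// rewritten first, missing key columns are appended next, and field
    /// columns are adjusted last.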
    pub(crate) fn compat_batch(&self, mut batch: Batch) -> Result<Batch> {
        if let Some(rewrite_pk) = &self.rewrite_pk {
            batch = rewrite_pk.compat(batch)?;
        }
        if let Some(compat_pk) = &self.compat_pk {
            batch = compat_pk.compat(batch)?;
        }
        if let Some(compat_fields) = &self.compat_fields {
            batch = compat_fields.compat(batch);
        }

        Ok(batch)
    }
}

/// Returns true if `left` and `right` have the same columns and primary key encoding.
pub(crate) fn has_same_columns_and_pk_encoding(
    left: &RegionMetadata,
    right: &RegionMetadata,
) -> bool {
    if left.primary_key_encoding != right.primary_key_encoding {
        return false;
    }

    if left.column_metadatas.len() != right.column_metadatas.len() {
        return false;
    }

    for (left_col, right_col) in left.column_metadatas.iter().zip(&right.column_metadatas) {
        if left_col.column_id != right_col.column_id || !left_col.is_same_datatype(right_col) {
            return false;
        }
        debug_assert_eq!(
            left_col.column_schema.data_type,
            right_col.column_schema.data_type
        );
        debug_assert_eq!(left_col.semantic_type, right_col.semantic_type);
    }

    true
}

/// Helper to make the primary key compatible.
#[derive(Debug)]
struct CompatPrimaryKey {
    /// Row converter to append values to primary keys.
    converter: Arc<dyn PrimaryKeyCodec>,
    /// Default values to append.
    values: Vec<(ColumnId, Value)>,
}

impl CompatPrimaryKey {
    /// Makes the primary key of the `batch` compatible by appending encoded
    /// default values.
    fn compat(&self, mut batch: Batch) -> Result<Batch> {
        let mut buffer = Vec::with_capacity(
            batch.primary_key().len() + self.converter.estimated_size().unwrap_or_default(),
        );
        buffer.extend_from_slice(batch.primary_key());
        self.converter
            .encode_values(&self.values, &mut buffer)
            .context(EncodeSnafu)?;

        batch.set_primary_key(buffer);

        // Also update the cached primary key values.
        if let Some(pk_values) = &mut batch.pk_values {
            pk_values.extend(&self.values);
        }

        Ok(batch)
    }
}

/// Helper to make fields compatible.
#[derive(Debug)]
struct CompatFields {
    /// Column ids and data types the reader actually returns.
    actual_fields: Vec<(ColumnId, ConcreteDataType)>,
    /// Instructions to convert actual fields to expected fields.
    index_or_defaults: Vec<IndexOrDefault>,
}

impl CompatFields {
    /// Makes the fields of the `batch` compatible.
    #[must_use]
    fn compat(&self, batch: Batch) -> Batch {
        debug_assert_eq!(self.actual_fields.len(), batch.fields().len());
        debug_assert!(self
            .actual_fields
            .iter()
            .zip(batch.fields())
            .all(|((id, _), batch_column)| *id == batch_column.column_id));

        let len = batch.num_rows();
        let fields = self
            .index_or_defaults
            .iter()
            .map(|index_or_default| match index_or_default {
                IndexOrDefault::Index { pos, cast_type } => {
                    let old_column = &batch.fields()[*pos];

                    let data = if let Some(ty) = cast_type {
                        // Safety: We ensured the type is convertible when building
                        // `cast_type`, and the cast is safe: values that cannot be
                        // converted are replaced with nulls instead of failing.
                        old_column.data.cast(ty).unwrap()
                    } else {
                        old_column.data.clone()
                    };
                    BatchColumn {
                        column_id: old_column.column_id,
                        data,
                    }
                }
                IndexOrDefault::DefaultValue {
                    column_id,
                    default_vector,
                } => {
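                    // `default_vector` holds a single element; replicate it to
                    // match the number of rows in the batch.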
                    let data = default_vector.replicate(&[len]);
                    BatchColumn {
                        column_id: *column_id,
                        data,
                    }
                }
            })
            .collect();

        // Safety: We ensure all columns have the same length and the new batch should be valid.
        batch.with_fields(fields).unwrap()
    }
}

/// Creates a [RewritePrimaryKey] if the expected and actual primary key encodings differ.
fn may_rewrite_primary_key(
    expect: &RegionMetadata,
    actual: &RegionMetadata,
) -> Option<RewritePrimaryKey> {
    if expect.primary_key_encoding == actual.primary_key_encoding {
        return None;
    }

    let fields = expect.primary_key.clone();
    let original = build_primary_key_codec(actual);
    let new = build_primary_key_codec(expect);

    Some(RewritePrimaryKey {
        original,
        new,
        fields,
    })
}

/// Creates a [CompatPrimaryKey] if needed.
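///
/// The actual primary key must be a prefix of the expected one. As a
/// hypothetical example, if the expected key is `[region, host]` but the
/// reader only encodes `[region]`, the returned adapter appends the default
/// value of `host` to every primary key it reads.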
fn may_compat_primary_key(
    expect: &RegionMetadata,
    actual: &RegionMetadata,
) -> Result<Option<CompatPrimaryKey>> {
    ensure!(
        actual.primary_key.len() <= expect.primary_key.len(),
        CompatReaderSnafu {
            region_id: expect.region_id,
            reason: format!(
                "primary key has more columns {} than expect {}",
                actual.primary_key.len(),
                expect.primary_key.len()
            ),
        }
    );
    ensure!(
        actual.primary_key == expect.primary_key[..actual.primary_key.len()],
        CompatReaderSnafu {
            region_id: expect.region_id,
            reason: format!(
                "primary key has different prefix, expect: {:?}, actual: {:?}",
                expect.primary_key, actual.primary_key
            ),
        }
    );
    if actual.primary_key.len() == expect.primary_key.len() {
        return Ok(None);
    }

    // We need to append default values to the primary key.
    let to_add = &expect.primary_key[actual.primary_key.len()..];
    let mut fields = Vec::with_capacity(to_add.len());
    let mut values = Vec::with_capacity(to_add.len());
    for column_id in to_add {
        // Safety: The id comes from the expected region metadata.
        let column = expect.column_by_id(*column_id).unwrap();
        fields.push((
            *column_id,
            SortField::new(column.column_schema.data_type.clone()),
        ));
        let default_value = column
            .column_schema
            .create_default()
            .context(CreateDefaultSnafu {
                region_id: expect.region_id,
                column: &column.column_schema.name,
            })?
            .with_context(|| CompatReaderSnafu {
                region_id: expect.region_id,
                reason: format!(
                    "key column {} does not have a default value to read",
                    column.column_schema.name
                ),
            })?;
        values.push((*column_id, default_value));
    }
    // Use the expected primary key encoding to build the converter.
    let converter =
        build_primary_key_codec_with_fields(expect.primary_key_encoding, fields.into_iter());

    Ok(Some(CompatPrimaryKey { converter, values }))
}

/// Creates a [CompatFields] if needed.
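///
/// For each expected field, records either the position of the matching
/// column in the source batch (with an optional cast when the data type was
/// altered) or a single-element default vector used to fill the column when
/// the source does not have it.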
fn may_compat_fields(
    mapper: &ProjectionMapper,
    actual: &RegionMetadata,
) -> Result<Option<CompatFields>> {
    let expect_fields = mapper.batch_fields();
    let actual_fields = Batch::projected_fields(actual, mapper.column_ids());
    if expect_fields == actual_fields {
        return Ok(None);
    }

    let source_field_index: HashMap<_, _> = actual_fields
        .iter()
        .enumerate()
        .map(|(idx, (column_id, data_type))| (*column_id, (idx, data_type)))
        .collect();

    let index_or_defaults = expect_fields
        .iter()
        .map(|(column_id, expect_data_type)| {
            if let Some((index, actual_data_type)) = source_field_index.get(column_id) {
                // The source has this field. Cast it if the data type changed.
                let mut cast_type = None;
                if expect_data_type != *actual_data_type {
                    cast_type = Some(expect_data_type.clone());
                }
                Ok(IndexOrDefault::Index {
                    pos: *index,
                    cast_type,
                })
            } else {
                // Safety: mapper must have this column.
                let column = mapper.metadata().column_by_id(*column_id).unwrap();
                // Create a default vector with 1 element for that column.
                let default_vector = column
                    .column_schema
                    .create_default_vector(1)
                    .context(CreateDefaultSnafu {
                        region_id: mapper.metadata().region_id,
                        column: &column.column_schema.name,
                    })?
                    .with_context(|| CompatReaderSnafu {
                        region_id: mapper.metadata().region_id,
                        reason: format!(
                            "column {} does not have a default value to read",
                            column.column_schema.name
                        ),
                    })?;
                Ok(IndexOrDefault::DefaultValue {
                    column_id: column.column_id,
                    default_vector,
                })
            }
        })
        .collect::<Result<Vec<_>>>()?;

    Ok(Some(CompatFields {
        actual_fields,
        index_or_defaults,
    }))
}

/// Index in the source batch or a default value to fill a column.
#[derive(Debug)]
enum IndexOrDefault {
    /// Index of the column in the source batch.
    Index {
        /// Position of the column in the source batch.
        pos: usize,
        /// Type to cast the column to, if the expected data type differs.
        cast_type: Option<ConcreteDataType>,
    },
    /// Default value for the column.
    DefaultValue {
        /// Id of the column.
        column_id: ColumnId,
        /// Default value. The vector has only 1 element.
        default_vector: VectorRef,
    },
}

/// Adapter to rewrite the primary key.
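///
/// Decodes each primary key with the original codec and re-encodes it with
/// the new one, e.g. when the expected metadata uses the sparse primary key
/// encoding while the data was written with the dense encoding.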
struct RewritePrimaryKey {
    /// Original primary key codec.
    original: Arc<dyn PrimaryKeyCodec>,
    /// New primary key codec.
    new: Arc<dyn PrimaryKeyCodec>,
    /// Order of the fields in the new primary key.
    fields: Vec<ColumnId>,
}

impl RewritePrimaryKey {
    /// Rewrites the primary key of the `batch` to the new encoding.
    fn compat(&self, mut batch: Batch) -> Result<Batch> {
        let values = if let Some(pk_values) = batch.pk_values() {
            pk_values
        } else {
            let new_pk_values = self
                .original
                .decode(batch.primary_key())
                .context(DecodeSnafu)?;
            batch.set_pk_values(new_pk_values);
            // Safety: We ensure pk_values is not None.
            batch.pk_values().as_ref().unwrap()
        };

        let mut buffer = Vec::with_capacity(
            batch.primary_key().len() + self.new.estimated_size().unwrap_or_default(),
        );
        match values {
            CompositeValues::Dense(values) => {
                self.new
                    .encode_values(values.as_slice(), &mut buffer)
                    .context(EncodeSnafu)?;
            }
            CompositeValues::Sparse(values) => {
                let values = self
                    .fields
                    .iter()
                    .map(|id| {
                        let value = values.get_or_null(*id);
                        (*id, value.as_value_ref())
                    })
                    .collect::<Vec<_>>();
                self.new
                    .encode_value_refs(&values, &mut buffer)
                    .context(EncodeSnafu)?;
            }
        }
        batch.set_primary_key(buffer);

        Ok(batch)
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::{OpType, SemanticType};
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::value::ValueRef;
    use datatypes::vectors::{Int64Vector, TimestampMillisecondVector, UInt64Vector, UInt8Vector};
    use mito_codec::row_converter::{
        DensePrimaryKeyCodec, PrimaryKeyCodecExt, SparsePrimaryKeyCodec,
    };
    use store_api::codec::PrimaryKeyEncoding;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;

    use super::*;
    use crate::test_util::{check_reader_result, VecBatchReader};

    /// Creates a new [RegionMetadata].
    fn new_metadata(
        semantic_types: &[(ColumnId, SemanticType, ConcreteDataType)],
        primary_key: &[ColumnId],
    ) -> RegionMetadata {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        for (id, semantic_type, data_type) in semantic_types {
            let column_schema = match semantic_type {
                SemanticType::Tag => {
                    ColumnSchema::new(format!("tag_{id}"), data_type.clone(), true)
                }
                SemanticType::Field => {
                    ColumnSchema::new(format!("field_{id}"), data_type.clone(), true)
                }
                SemanticType::Timestamp => ColumnSchema::new("ts", data_type.clone(), false),
            };

            builder.push_column_metadata(ColumnMetadata {
                column_schema,
                semantic_type: *semantic_type,
                column_id: *id,
            });
        }
        builder.primary_key(primary_key.to_vec());
        builder.build().unwrap()
    }

    /// Encodes a dense primary key.
    fn encode_key(keys: &[Option<&str>]) -> Vec<u8> {
        let fields = (0..keys.len())
            .map(|_| (0, SortField::new(ConcreteDataType::string_datatype())))
            .collect();
        let converter = DensePrimaryKeyCodec::with_fields(fields);
        let row = keys.iter().map(|str_opt| match str_opt {
            Some(v) => ValueRef::String(v),
            None => ValueRef::Null,
        });

        converter.encode(row).unwrap()
    }

    /// Encodes a sparse primary key.
    fn encode_sparse_key(keys: &[(ColumnId, Option<&str>)]) -> Vec<u8> {
        let fields = (0..keys.len())
            .map(|_| (1, SortField::new(ConcreteDataType::string_datatype())))
            .collect();
        let converter = SparsePrimaryKeyCodec::with_fields(fields);
        let row = keys
            .iter()
            .map(|(id, str_opt)| match str_opt {
                Some(v) => (*id, ValueRef::String(v)),
                None => (*id, ValueRef::Null),
            })
            .collect::<Vec<_>>();
        let mut buffer = vec![];
        converter.encode_value_refs(&row, &mut buffer).unwrap();
        buffer
    }

    /// Creates a batch for a specific primary key.
    ///
    /// `fields`: [(column_id of the field, is null)]
    fn new_batch(
        primary_key: &[u8],
        fields: &[(ColumnId, bool)],
        start_ts: i64,
        num_rows: usize,
    ) -> Batch {
        let timestamps = Arc::new(TimestampMillisecondVector::from_values(
            start_ts..start_ts + num_rows as i64,
        ));
        let sequences = Arc::new(UInt64Vector::from_values(0..num_rows as u64));
        let op_types = Arc::new(UInt8Vector::from_vec(vec![OpType::Put as u8; num_rows]));
        let field_columns = fields
            .iter()
            .map(|(id, is_null)| {
                let data = if *is_null {
                    Arc::new(Int64Vector::from(vec![None; num_rows]))
                } else {
                    Arc::new(Int64Vector::from_vec(vec![*id as i64; num_rows]))
                };
                BatchColumn {
                    column_id: *id,
                    data,
                }
            })
            .collect();
        Batch::new(
            primary_key.to_vec(),
            timestamps,
            sequences,
            op_types,
            field_columns,
        )
        .unwrap()
    }

    #[test]
    fn test_invalid_pk_len() {
        let reader_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1, 2],
        );
        let expect_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        );
        may_compat_primary_key(&expect_meta, &reader_meta).unwrap_err();
    }

    #[test]
    fn test_different_pk() {
        let reader_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[2, 1],
        );
        let expect_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (4, SemanticType::Tag, ConcreteDataType::string_datatype()),
            ],
            &[1, 2, 4],
        );
        may_compat_primary_key(&expect_meta, &reader_meta).unwrap_err();
    }

    #[test]
    fn test_same_pk() {
        let reader_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        );
        assert!(may_compat_primary_key(&reader_meta, &reader_meta)
            .unwrap()
            .is_none());
    }

    #[test]
    fn test_same_pk_encoding() {
        let reader_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
            ],
            &[1],
        ));

        assert!(may_compat_primary_key(&reader_meta, &reader_meta)
            .unwrap()
            .is_none());
    }

    #[test]
    fn test_same_fields() {
        let reader_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        let mapper = ProjectionMapper::all(&reader_meta).unwrap();
        assert!(may_compat_fields(&mapper, &reader_meta).unwrap().is_none())
    }

    #[tokio::test]
    async fn test_compat_reader() {
        let reader_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        let expect_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (3, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (4, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1, 3],
        ));
        let mapper = ProjectionMapper::all(&expect_meta).unwrap();
        let k1 = encode_key(&[Some("a")]);
        let k2 = encode_key(&[Some("b")]);
        let source_reader = VecBatchReader::new(&[
            new_batch(&k1, &[(2, false)], 1000, 3),
            new_batch(&k2, &[(2, false)], 1000, 3),
        ]);

        let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
        let k1 = encode_key(&[Some("a"), None]);
        let k2 = encode_key(&[Some("b"), None]);
        check_reader_result(
            &mut compat_reader,
            &[
                new_batch(&k1, &[(2, false), (4, true)], 1000, 3),
                new_batch(&k2, &[(2, false), (4, true)], 1000, 3),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_compat_reader_different_order() {
        let reader_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        let expect_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (4, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        let mapper = ProjectionMapper::all(&expect_meta).unwrap();
        let k1 = encode_key(&[Some("a")]);
        let k2 = encode_key(&[Some("b")]);
        let source_reader = VecBatchReader::new(&[
            new_batch(&k1, &[(2, false)], 1000, 3),
            new_batch(&k2, &[(2, false)], 1000, 3),
        ]);

        let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
        check_reader_result(
            &mut compat_reader,
            &[
                new_batch(&k1, &[(3, true), (2, false), (4, true)], 1000, 3),
                new_batch(&k2, &[(3, true), (2, false), (4, true)], 1000, 3),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_compat_reader_different_types() {
        let actual_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        let expect_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::string_datatype()),
            ],
            &[1],
        ));
        let mapper = ProjectionMapper::all(&expect_meta).unwrap();
        let k1 = encode_key(&[Some("a")]);
        let k2 = encode_key(&[Some("b")]);
        let source_reader = VecBatchReader::new(&[
            new_batch(&k1, &[(2, false)], 1000, 3),
            new_batch(&k2, &[(2, false)], 1000, 3),
        ]);

        let fn_batch_cast = |batch: Batch| {
            let mut new_fields = batch.fields.clone();
            new_fields[0].data = new_fields[0]
                .data
                .cast(&ConcreteDataType::string_datatype())
                .unwrap();

            batch.with_fields(new_fields).unwrap()
        };
        let mut compat_reader = CompatReader::new(&mapper, actual_meta, source_reader).unwrap();
        check_reader_result(
            &mut compat_reader,
            &[
                fn_batch_cast(new_batch(&k1, &[(2, false)], 1000, 3)),
                fn_batch_cast(new_batch(&k2, &[(2, false)], 1000, 3)),
            ],
        )
        .await;
    }

    #[tokio::test]
    async fn test_compat_reader_projection() {
        let reader_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        let expect_meta = Arc::new(new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (3, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (4, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        ));
        // tag_1, field_2, field_3
        let mapper = ProjectionMapper::new(&expect_meta, [1, 3, 2].into_iter()).unwrap();
        let k1 = encode_key(&[Some("a")]);
        let source_reader = VecBatchReader::new(&[new_batch(&k1, &[(2, false)], 1000, 3)]);

        let mut compat_reader =
            CompatReader::new(&mapper, reader_meta.clone(), source_reader).unwrap();
        check_reader_result(
            &mut compat_reader,
            &[new_batch(&k1, &[(3, true), (2, false)], 1000, 3)],
        )
        .await;

        // tag_1, field_4, field_3
        let mapper = ProjectionMapper::new(&expect_meta, [1, 4, 2].into_iter()).unwrap();
        let k1 = encode_key(&[Some("a")]);
        let source_reader = VecBatchReader::new(&[new_batch(&k1, &[], 1000, 3)]);

        let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
        check_reader_result(
            &mut compat_reader,
            &[new_batch(&k1, &[(3, true), (4, true)], 1000, 3)],
        )
        .await;
    }

    #[tokio::test]
    async fn test_compat_reader_different_pk_encoding() {
        let mut reader_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1],
        );
        reader_meta.primary_key_encoding = PrimaryKeyEncoding::Dense;
        let reader_meta = Arc::new(reader_meta);
        let mut expect_meta = new_metadata(
            &[
                (
                    0,
                    SemanticType::Timestamp,
                    ConcreteDataType::timestamp_millisecond_datatype(),
                ),
                (1, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (2, SemanticType::Field, ConcreteDataType::int64_datatype()),
                (3, SemanticType::Tag, ConcreteDataType::string_datatype()),
                (4, SemanticType::Field, ConcreteDataType::int64_datatype()),
            ],
            &[1, 3],
        );
        expect_meta.primary_key_encoding = PrimaryKeyEncoding::Sparse;
        let expect_meta = Arc::new(expect_meta);

        let mapper = ProjectionMapper::all(&expect_meta).unwrap();
        let k1 = encode_key(&[Some("a")]);
        let k2 = encode_key(&[Some("b")]);
        let source_reader = VecBatchReader::new(&[
            new_batch(&k1, &[(2, false)], 1000, 3),
            new_batch(&k2, &[(2, false)], 1000, 3),
        ]);

        let mut compat_reader = CompatReader::new(&mapper, reader_meta, source_reader).unwrap();
        let k1 = encode_sparse_key(&[(1, Some("a")), (3, None)]);
        let k2 = encode_sparse_key(&[(1, Some("b")), (3, None)]);
        check_reader_result(
            &mut compat_reader,
            &[
                new_batch(&k1, &[(2, false), (4, true)], 1000, 3),
                new_batch(&k2, &[(2, false), (4, true)], 1000, 3),
            ],
        )
        .await;
    }
}