// File: mito2/src/read/projection.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Utilities for projection operations.
16
17use std::cmp::Ordering;
18use std::collections::HashMap;
19use std::sync::Arc;
20
21use api::v1::SemanticType;
22use common_error::ext::BoxedError;
23use common_recordbatch::RecordBatch;
24use common_recordbatch::error::{DataTypesSnafu, ExternalSnafu};
25use datatypes::prelude::{ConcreteDataType, DataType};
26use datatypes::schema::{Schema, SchemaRef};
27use datatypes::value::Value;
28use datatypes::vectors::VectorRef;
29use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec, build_primary_key_codec};
30use snafu::{OptionExt, ResultExt};
31use store_api::metadata::RegionMetadataRef;
32use store_api::storage::ColumnId;
33
34use crate::cache::CacheStrategy;
35use crate::error::{InvalidRequestSnafu, Result};
36use crate::read::Batch;
37use crate::read::flat_projection::FlatProjectionMapper;
38
/// Only cache vector when its length `<=` this value.
///
/// Bounds the size of cached repeated vectors so the cache does not hold
/// arbitrarily large allocations.
pub(crate) const MAX_VECTOR_LENGTH_TO_CACHE: usize = 16384;
41
/// Wrapper enum for different projection mapper implementations.
///
/// Dispatches between the primary-key encoded format and the flat format.
pub enum ProjectionMapper {
    /// Projection mapper for primary key format.
    PrimaryKey(PrimaryKeyProjectionMapper),
    /// Projection mapper for flat format.
    Flat(FlatProjectionMapper),
}
49
50impl ProjectionMapper {
51    /// Returns a new mapper with projection.
52    pub fn new(
53        metadata: &RegionMetadataRef,
54        projection: impl Iterator<Item = usize> + Clone,
55    ) -> Result<Self> {
56        Ok(ProjectionMapper::Flat(FlatProjectionMapper::new(
57            metadata, projection,
58        )?))
59    }
60
61    /// Returns a new mapper with output projection and explicit read columns.
62    pub fn new_with_read_columns(
63        metadata: &RegionMetadataRef,
64        projection: impl Iterator<Item = usize>,
65        read_column_ids: Vec<ColumnId>,
66    ) -> Result<Self> {
67        let projection: Vec<_> = projection.collect();
68        Ok(ProjectionMapper::Flat(
69            FlatProjectionMapper::new_with_read_columns(metadata, projection, read_column_ids)?,
70        ))
71    }
72
73    /// Returns a new mapper without projection.
74    pub fn all(metadata: &RegionMetadataRef) -> Result<Self> {
75        Ok(ProjectionMapper::Flat(FlatProjectionMapper::all(metadata)?))
76    }
77
78    /// Returns the metadata that created the mapper.
79    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
80        match self {
81            ProjectionMapper::PrimaryKey(m) => m.metadata(),
82            ProjectionMapper::Flat(m) => m.metadata(),
83        }
84    }
85
86    /// Returns true if the projection includes any tag columns.
87    pub(crate) fn has_tags(&self) -> bool {
88        match self {
89            ProjectionMapper::PrimaryKey(m) => m.has_tags(),
90            ProjectionMapper::Flat(_) => false,
91        }
92    }
93
94    /// Returns ids of projected columns that we need to read
95    /// from memtables and SSTs.
96    pub(crate) fn column_ids(&self) -> &[ColumnId] {
97        match self {
98            ProjectionMapper::PrimaryKey(m) => m.column_ids(),
99            ProjectionMapper::Flat(m) => m.column_ids(),
100        }
101    }
102
103    /// Returns the schema of converted [RecordBatch].
104    pub(crate) fn output_schema(&self) -> SchemaRef {
105        match self {
106            ProjectionMapper::PrimaryKey(m) => m.output_schema(),
107            ProjectionMapper::Flat(m) => m.output_schema(),
108        }
109    }
110
111    /// Returns the primary key projection mapper or None if this is not a primary key mapper.
112    pub fn as_primary_key(&self) -> Option<&PrimaryKeyProjectionMapper> {
113        match self {
114            ProjectionMapper::PrimaryKey(m) => Some(m),
115            ProjectionMapper::Flat(_) => None,
116        }
117    }
118
119    /// Returns the flat projection mapper or None if this is not a flat mapper.
120    pub fn as_flat(&self) -> Option<&FlatProjectionMapper> {
121        match self {
122            ProjectionMapper::PrimaryKey(_) => None,
123            ProjectionMapper::Flat(m) => Some(m),
124        }
125    }
126
127    /// Returns an empty [RecordBatch].
128    // TODO(yingwen): This is unused now. Use it after we finishing the flat format.
129    pub fn empty_record_batch(&self) -> RecordBatch {
130        match self {
131            ProjectionMapper::PrimaryKey(m) => m.empty_record_batch(),
132            ProjectionMapper::Flat(m) => m.empty_record_batch(),
133        }
134    }
135}
136
/// Handles projection and converts a projected [Batch] to a projected [RecordBatch].
#[allow(dead_code)]
pub struct PrimaryKeyProjectionMapper {
    /// Metadata of the region.
    metadata: RegionMetadataRef,
    /// Maps column in [RecordBatch] to index in [Batch].
    batch_indices: Vec<BatchIndex>,
    /// Output record batch contains tags.
    has_tags: bool,
    /// Decoder for primary key.
    codec: Arc<dyn PrimaryKeyCodec>,
    /// Schema for converted [RecordBatch].
    output_schema: SchemaRef,
    /// Ids of columns to read from memtables and SSTs.
    /// May be a superset of the columns in `output_schema`.
    read_column_ids: Vec<ColumnId>,
    /// Ids and DataTypes of field columns in the read [Batch].
    batch_fields: Vec<(ColumnId, ConcreteDataType)>,
    /// `true` If the original projection is empty.
    /// In that case `convert()` only propagates the row count.
    is_empty_projection: bool,
}
157
#[allow(dead_code)]
impl PrimaryKeyProjectionMapper {
    /// Returns a new mapper with projection.
    /// If `projection` is empty, it outputs [RecordBatch] without any column but only a row count.
    /// `SELECT COUNT(*) FROM table` is an example that uses an empty projection. DataFusion accepts
    /// empty `RecordBatch` and only use its row count in this query.
    pub fn new(
        metadata: &RegionMetadataRef,
        projection: impl Iterator<Item = usize>,
    ) -> Result<PrimaryKeyProjectionMapper> {
        let projection: Vec<_> = projection.collect();
        // Derive the set of columns to read from the output projection.
        let read_column_ids = read_column_ids_from_projection(metadata, &projection)?;
        Self::new_with_read_columns(metadata, projection, read_column_ids)
    }

    /// Returns a new mapper with output projection and explicit read columns.
    ///
    /// `read_column_ids` may include columns that are read but not output;
    /// returns an error if a projection index is out of bound or a projected
    /// field is missing from the read columns.
    pub fn new_with_read_columns(
        metadata: &RegionMetadataRef,
        projection: Vec<usize>,
        read_column_ids: Vec<ColumnId>,
    ) -> Result<PrimaryKeyProjectionMapper> {
        // If the original projection is empty.
        let is_empty_projection = projection.is_empty();

        let mut column_schemas = Vec::with_capacity(projection.len());
        for idx in &projection {
            // For each projection index, we get the column schema for projection
            column_schemas.push(
                metadata
                    .schema
                    .column_schemas()
                    .get(*idx)
                    .with_context(|| InvalidRequestSnafu {
                        region_id: metadata.region_id,
                        reason: format!("projection index {} is out of bound", idx),
                    })?
                    .clone(),
            );
        }

        let codec = build_primary_key_codec(metadata);
        // If projection is empty, we don't output any column.
        let output_schema = if is_empty_projection {
            Arc::new(Schema::new(vec![]))
        } else {
            // Safety: Columns come from existing schema.
            Arc::new(Schema::new(column_schemas))
        };
        // Get fields in each read batch.
        let batch_fields = Batch::projected_fields(metadata, &read_column_ids);

        // Field column id to its index in batch.
        let field_id_to_index: HashMap<_, _> = batch_fields
            .iter()
            .enumerate()
            .map(|(index, (column_id, _))| (*column_id, index))
            .collect();
        // For each projected column, compute its index in batches.
        let mut batch_indices = Vec::with_capacity(projection.len());
        let mut has_tags = false;
        if !is_empty_projection {
            for idx in &projection {
                // Safety: idx is valid.
                let column = &metadata.column_metadatas[*idx];
                // Get column index in a batch by its semantic type and column id.
                let batch_index = match column.semantic_type {
                    SemanticType::Tag => {
                        // Safety: It is a primary key column.
                        let index = metadata.primary_key_index(column.column_id).unwrap();
                        // We need to output a tag.
                        has_tags = true;
                        // We always read all primary key so the column always exists and the tag
                        // index is always valid.
                        BatchIndex::Tag((index, column.column_id))
                    }
                    SemanticType::Timestamp => BatchIndex::Timestamp,
                    SemanticType::Field => {
                        let index = *field_id_to_index.get(&column.column_id).context(
                            InvalidRequestSnafu {
                                region_id: metadata.region_id,
                                reason: format!(
                                    "field column {} is missing in read projection",
                                    column.column_schema.name
                                ),
                            },
                        )?;
                        BatchIndex::Field(index)
                    }
                };
                batch_indices.push(batch_index);
            }
        }

        Ok(PrimaryKeyProjectionMapper {
            metadata: metadata.clone(),
            batch_indices,
            has_tags,
            codec,
            output_schema,
            read_column_ids,
            batch_fields,
            is_empty_projection,
        })
    }

    /// Returns a new mapper without projection (all columns, in metadata order).
    pub fn all(metadata: &RegionMetadataRef) -> Result<PrimaryKeyProjectionMapper> {
        PrimaryKeyProjectionMapper::new(metadata, 0..metadata.column_metadatas.len())
    }

    /// Returns the metadata that created the mapper.
    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
        &self.metadata
    }

    /// Returns true if the projection includes any tag columns.
    pub(crate) fn has_tags(&self) -> bool {
        self.has_tags
    }

    /// Returns ids of projected columns that we need to read
    /// from memtables and SSTs.
    pub(crate) fn column_ids(&self) -> &[ColumnId] {
        &self.read_column_ids
    }

    /// Returns ids of fields in [Batch]es the mapper expects to convert.
    pub(crate) fn batch_fields(&self) -> &[(ColumnId, ConcreteDataType)] {
        &self.batch_fields
    }

    /// Returns the schema of converted [RecordBatch].
    /// This is the schema that the stream will output. This schema may contain
    /// less columns than [PrimaryKeyProjectionMapper::column_ids()].
    pub(crate) fn output_schema(&self) -> SchemaRef {
        self.output_schema.clone()
    }

    /// Returns an empty [RecordBatch] with the output schema.
    pub(crate) fn empty_record_batch(&self) -> RecordBatch {
        RecordBatch::new_empty(self.output_schema.clone())
    }

    /// Converts a [Batch] to a [RecordBatch].
    ///
    /// The batch must match the `projection` using to build the mapper.
    /// Tag columns are materialized by decoding the primary key and repeating
    /// each value `num_rows` times (served from `cache_strategy` when possible).
    pub(crate) fn convert(
        &self,
        batch: &Batch,
        cache_strategy: &CacheStrategy,
    ) -> common_recordbatch::error::Result<RecordBatch> {
        // Empty projection: only the row count matters.
        if self.is_empty_projection {
            return RecordBatch::new_with_count(self.output_schema.clone(), batch.num_rows());
        }

        debug_assert_eq!(self.batch_fields.len(), batch.fields().len());
        debug_assert!(
            self.batch_fields
                .iter()
                .zip(batch.fields())
                .all(|((id, _), batch_col)| *id == batch_col.column_id)
        );

        // Skips decoding pk if we don't need to output it.
        let pk_values = if self.has_tags {
            match batch.pk_values() {
                // Reuse values already decoded by an earlier stage.
                Some(v) => v.clone(),
                None => self
                    .codec
                    .decode(batch.primary_key())
                    .map_err(BoxedError::new)
                    .context(ExternalSnafu)?,
            }
        } else {
            CompositeValues::Dense(vec![])
        };

        let mut columns = Vec::with_capacity(self.output_schema.num_columns());
        let num_rows = batch.num_rows();
        for (index, column_schema) in self
            .batch_indices
            .iter()
            .zip(self.output_schema.column_schemas())
        {
            match index {
                BatchIndex::Tag((idx, column_id)) => {
                    // Dense values are addressed by primary key position;
                    // sparse values are looked up by column id.
                    let value = match &pk_values {
                        CompositeValues::Dense(v) => &v[*idx].1,
                        CompositeValues::Sparse(v) => v.get_or_null(*column_id),
                    };
                    let vector = repeated_vector_with_cache(
                        &column_schema.data_type,
                        value,
                        num_rows,
                        cache_strategy,
                    )?;
                    columns.push(vector);
                }
                BatchIndex::Timestamp => {
                    columns.push(batch.timestamps().clone());
                }
                BatchIndex::Field(idx) => {
                    columns.push(batch.fields()[*idx].data.clone());
                }
            }
        }

        RecordBatch::new(self.output_schema.clone(), columns)
    }
}
368
369pub(crate) fn read_column_ids_from_projection(
370    metadata: &RegionMetadataRef,
371    projection: &[usize],
372) -> Result<Vec<ColumnId>> {
373    let mut column_ids = Vec::with_capacity(projection.len().max(1));
374    if projection.is_empty() {
375        column_ids.push(metadata.time_index_column().column_id);
376        return Ok(column_ids);
377    }
378
379    for idx in projection {
380        let column = metadata
381            .column_metadatas
382            .get(*idx)
383            .with_context(|| InvalidRequestSnafu {
384                region_id: metadata.region_id,
385                reason: format!("projection index {} is out of bound", idx),
386            })?;
387        column_ids.push(column.column_id);
388    }
389    Ok(column_ids)
390}
391
/// Index of a vector in a [Batch].
#[derive(Debug, Clone, Copy)]
#[allow(dead_code)]
enum BatchIndex {
    /// Index in primary keys, paired with the id of the tag column.
    Tag((usize, ColumnId)),
    /// The time index column.
    Timestamp,
    /// Index in fields.
    Field(usize),
}
403
404/// Gets a vector with repeated values from specific cache or creates a new one.
405pub(crate) fn repeated_vector_with_cache(
406    data_type: &ConcreteDataType,
407    value: &Value,
408    num_rows: usize,
409    cache_strategy: &CacheStrategy,
410) -> common_recordbatch::error::Result<VectorRef> {
411    if let Some(vector) = cache_strategy.get_repeated_vector(data_type, value) {
412        // Tries to get the vector from cache manager. If the vector doesn't
413        // have enough length, creates a new one.
414        match vector.len().cmp(&num_rows) {
415            Ordering::Less => (),
416            Ordering::Equal => return Ok(vector),
417            Ordering::Greater => return Ok(vector.slice(0, num_rows)),
418        }
419    }
420
421    // Creates a new one.
422    let vector = new_repeated_vector(data_type, value, num_rows)?;
423    // Updates cache.
424    if vector.len() <= MAX_VECTOR_LENGTH_TO_CACHE {
425        cache_strategy.put_repeated_vector(value.clone(), vector.clone());
426    }
427
428    Ok(vector)
429}
430
431/// Returns a vector with repeated values.
432pub(crate) fn new_repeated_vector(
433    data_type: &ConcreteDataType,
434    value: &Value,
435    num_rows: usize,
436) -> common_recordbatch::error::Result<VectorRef> {
437    let mut mutable_vector = data_type.create_mutable_vector(1);
438    mutable_vector
439        .try_push_value_ref(&value.as_value_ref())
440        .context(DataTypesSnafu)?;
441    // This requires an additional allocation.
442    let base_vector = mutable_vector.to_vector();
443    Ok(base_vector.replicate(&[num_rows]))
444}
445
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::OpType;
    use datatypes::arrow::array::{Int64Array, TimestampMillisecondArray, UInt8Array, UInt64Array};
    use datatypes::arrow::datatypes::Field;
    use datatypes::arrow::util::pretty;
    use datatypes::value::ValueRef;
    use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
    use mito_codec::test_util::TestRegionMetadataBuilder;
    use store_api::storage::consts::{
        OP_TYPE_COLUMN_NAME, PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME,
    };

    use super::*;

    /// Renders a record batch as a pretty-printed table for string assertions.
    fn print_record_batch(record_batch: RecordBatch) -> String {
        pretty::pretty_format_batches(&[record_batch.into_df_record_batch()])
            .unwrap()
            .to_string()
    }

    /// Builds an arrow record batch in the flat read format.
    ///
    /// `idx_tags` / `idx_fields` are `(column index, repeated value)` pairs;
    /// `ts_start` (millis) seeds a 1s-spaced time index when `Some`.
    fn new_flat_batch(
        ts_start: Option<i64>,
        idx_tags: &[(usize, i64)],
        idx_fields: &[(usize, i64)],
        num_rows: usize,
    ) -> datatypes::arrow::record_batch::RecordBatch {
        let mut columns = Vec::with_capacity(1 + idx_tags.len() + idx_fields.len() + 3);
        let mut fields = Vec::with_capacity(1 + idx_tags.len() + idx_fields.len() + 3);

        // Flat format: primary key columns, field columns, time index, __primary_key, __sequence, __op_type

        // Primary key columns first
        for (i, tag) in idx_tags {
            let array = Arc::new(Int64Array::from_iter_values(std::iter::repeat_n(
                *tag, num_rows,
            ))) as _;
            columns.push(array);
            fields.push(Field::new(
                format!("k{i}"),
                datatypes::arrow::datatypes::DataType::Int64,
                true,
            ));
        }

        // Field columns
        for (i, field) in idx_fields {
            let array = Arc::new(Int64Array::from_iter_values(std::iter::repeat_n(
                *field, num_rows,
            ))) as _;
            columns.push(array);
            fields.push(Field::new(
                format!("v{i}"),
                datatypes::arrow::datatypes::DataType::Int64,
                true,
            ));
        }

        // Time index
        if let Some(ts_start) = ts_start {
            let timestamps = Arc::new(TimestampMillisecondArray::from_iter_values(
                (0..num_rows).map(|i| ts_start + i as i64 * 1000),
            )) as _;
            columns.push(timestamps);
            fields.push(Field::new(
                "ts",
                datatypes::arrow::datatypes::DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    None,
                ),
                true,
            ));
        }

        // __primary_key column (encoded primary key as dictionary)
        // Create encoded primary key
        let converter = DensePrimaryKeyCodec::with_fields(
            (0..idx_tags.len())
                .map(|idx| {
                    (
                        idx as u32,
                        SortField::new(ConcreteDataType::int64_datatype()),
                    )
                })
                .collect(),
        );
        let encoded_pk = converter
            .encode(idx_tags.iter().map(|(_, v)| ValueRef::Int64(*v)))
            .unwrap();

        // Create dictionary array for the encoded primary key
        let pk_values: Vec<&[u8]> = std::iter::repeat_n(encoded_pk.as_slice(), num_rows).collect();
        let keys = datatypes::arrow::array::UInt32Array::from_iter(0..num_rows as u32);
        let values = Arc::new(datatypes::arrow::array::BinaryArray::from_vec(pk_values));
        let pk_array =
            Arc::new(datatypes::arrow::array::DictionaryArray::try_new(keys, values).unwrap()) as _;
        columns.push(pk_array);
        fields.push(Field::new_dictionary(
            PRIMARY_KEY_COLUMN_NAME,
            datatypes::arrow::datatypes::DataType::UInt32,
            datatypes::arrow::datatypes::DataType::Binary,
            false,
        ));

        // __sequence column
        columns.push(Arc::new(UInt64Array::from_iter_values(0..num_rows as u64)) as _);
        fields.push(Field::new(
            SEQUENCE_COLUMN_NAME,
            datatypes::arrow::datatypes::DataType::UInt64,
            false,
        ));

        // __op_type column
        columns.push(Arc::new(UInt8Array::from_iter_values(
            (0..num_rows).map(|_| OpType::Put as u8),
        )) as _);
        fields.push(Field::new(
            OP_TYPE_COLUMN_NAME,
            datatypes::arrow::datatypes::DataType::UInt8,
            false,
        ));

        let schema = Arc::new(datatypes::arrow::datatypes::Schema::new(fields));

        datatypes::arrow::record_batch::RecordBatch::try_new(schema, columns).unwrap()
    }

    /// Full projection: every column is read and output.
    #[test]
    fn test_flat_projection_mapper_all() {
        let metadata = Arc::new(
            TestRegionMetadataBuilder::default()
                .num_tags(2)
                .num_fields(2)
                .build(),
        );
        let cache = CacheStrategy::Disabled;
        let mapper = ProjectionMapper::all(&metadata).unwrap();
        assert_eq!([0, 1, 2, 3, 4], mapper.column_ids());
        assert_eq!(
            [
                (1, ConcreteDataType::int64_datatype()),
                (2, ConcreteDataType::int64_datatype()),
                (3, ConcreteDataType::int64_datatype()),
                (4, ConcreteDataType::int64_datatype()),
                (0, ConcreteDataType::timestamp_millisecond_datatype())
            ],
            mapper.as_flat().unwrap().batch_schema()
        );

        let batch = new_flat_batch(Some(0), &[(1, 1), (2, 2)], &[(3, 3), (4, 4)], 3);
        let record_batch = mapper.as_flat().unwrap().convert(&batch, &cache).unwrap();
        let expect = "\
+---------------------+----+----+----+----+
| ts                  | k0 | k1 | v0 | v1 |
+---------------------+----+----+----+----+
| 1970-01-01T00:00:00 | 1  | 2  | 3  | 4  |
| 1970-01-01T00:00:01 | 1  | 2  | 3  | 4  |
| 1970-01-01T00:00:02 | 1  | 2  | 3  | 4  |
+---------------------+----+----+----+----+";
        assert_eq!(expect, print_record_batch(record_batch));
    }

    /// Partial projection: output column order follows the projection order.
    #[test]
    fn test_flat_projection_mapper_with_projection() {
        let metadata = Arc::new(
            TestRegionMetadataBuilder::default()
                .num_tags(2)
                .num_fields(2)
                .build(),
        );
        let cache = CacheStrategy::Disabled;
        // Columns v1, k0
        let mapper = ProjectionMapper::new(&metadata, [4, 1].into_iter()).unwrap();
        assert_eq!([4, 1], mapper.column_ids());
        assert_eq!(
            [
                (1, ConcreteDataType::int64_datatype()),
                (4, ConcreteDataType::int64_datatype()),
                (0, ConcreteDataType::timestamp_millisecond_datatype())
            ],
            mapper.as_flat().unwrap().batch_schema()
        );

        let batch = new_flat_batch(None, &[(1, 1)], &[(4, 4)], 3);
        let record_batch = mapper.as_flat().unwrap().convert(&batch, &cache).unwrap();
        let expect = "\
+----+----+
| v1 | k0 |
+----+----+
| 4  | 1  |
| 4  | 1  |
| 4  | 1  |
+----+----+";
        assert_eq!(expect, print_record_batch(record_batch));
    }

    /// Read columns may be a superset of the output: v0 is read but not output.
    #[test]
    fn test_flat_projection_mapper_read_superset() {
        let metadata = Arc::new(
            TestRegionMetadataBuilder::default()
                .num_tags(2)
                .num_fields(2)
                .build(),
        );
        let cache = CacheStrategy::Disabled;
        // Output columns v1, k0. Read also includes v0.
        let mapper =
            ProjectionMapper::new_with_read_columns(&metadata, [4, 1].into_iter(), vec![4, 1, 3])
                .unwrap();
        assert_eq!([4, 1, 3], mapper.column_ids());

        let batch = new_flat_batch(None, &[(1, 1)], &[(3, 3), (4, 4)], 3);
        let record_batch = mapper.as_flat().unwrap().convert(&batch, &cache).unwrap();
        let expect = "\
+----+----+
| v1 | k0 |
+----+----+
| 4  | 1  |
| 4  | 1  |
| 4  | 1  |
+----+----+";
        assert_eq!(expect, print_record_batch(record_batch));
    }

    /// Empty projection: no output columns, only a row count (COUNT(*) case).
    #[test]
    fn test_flat_projection_mapper_empty_projection() {
        let metadata = Arc::new(
            TestRegionMetadataBuilder::default()
                .num_tags(2)
                .num_fields(2)
                .build(),
        );
        let cache = CacheStrategy::Disabled;
        // Empty projection
        let mapper = ProjectionMapper::new(&metadata, [].into_iter()).unwrap();
        assert_eq!([0], mapper.column_ids()); // Should still read the time index column
        assert!(mapper.output_schema().is_empty());
        let flat_mapper = mapper.as_flat().unwrap();
        assert_eq!(
            [(0, ConcreteDataType::timestamp_millisecond_datatype())],
            flat_mapper.batch_schema()
        );

        let batch = new_flat_batch(Some(0), &[], &[], 3);
        let record_batch = flat_mapper.convert(&batch, &cache).unwrap();
        assert_eq!(3, record_batch.num_rows());
        assert_eq!(0, record_batch.num_columns());
        assert!(record_batch.schema.is_empty());
    }
}
697}