mito2/read/
projection.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Utilities for projection operations.
16
17use std::cmp::Ordering;
18use std::collections::HashMap;
19use std::sync::Arc;
20
21use api::v1::SemanticType;
22use common_error::ext::BoxedError;
23use common_recordbatch::error::ExternalSnafu;
24use common_recordbatch::RecordBatch;
25use datatypes::prelude::{ConcreteDataType, DataType};
26use datatypes::schema::{Schema, SchemaRef};
27use datatypes::value::Value;
28use datatypes::vectors::VectorRef;
29use mito_codec::row_converter::{build_primary_key_codec, CompositeValues, PrimaryKeyCodec};
30use snafu::{OptionExt, ResultExt};
31use store_api::metadata::RegionMetadataRef;
32use store_api::storage::ColumnId;
33
34use crate::cache::CacheStrategy;
35use crate::error::{InvalidRequestSnafu, Result};
36use crate::read::flat_projection::FlatProjectionMapper;
37use crate::read::Batch;
38
/// Maximum length of a repeated vector that is still put into the cache.
/// Longer vectors are returned to the caller without being cached.
const MAX_VECTOR_LENGTH_TO_CACHE: usize = 16 * 1024;
41
42/// Wrapper enum for different projection mapper implementations.
43pub enum ProjectionMapper {
44    /// Projection mapper for primary key format.
45    PrimaryKey(PrimaryKeyProjectionMapper),
46    /// Projection mapper for flat format.
47    Flat(FlatProjectionMapper),
48}
49
50impl ProjectionMapper {
51    /// Returns a new mapper with projection.
52    pub fn new(
53        metadata: &RegionMetadataRef,
54        projection: impl Iterator<Item = usize> + Clone,
55        flat_format: bool,
56    ) -> Result<Self> {
57        if flat_format {
58            Ok(ProjectionMapper::Flat(FlatProjectionMapper::new(
59                metadata, projection,
60            )?))
61        } else {
62            Ok(ProjectionMapper::PrimaryKey(
63                PrimaryKeyProjectionMapper::new(metadata, projection)?,
64            ))
65        }
66    }
67
68    /// Returns a new mapper without projection.
69    pub fn all(metadata: &RegionMetadataRef, flat_format: bool) -> Result<Self> {
70        if flat_format {
71            Ok(ProjectionMapper::Flat(FlatProjectionMapper::all(metadata)?))
72        } else {
73            Ok(ProjectionMapper::PrimaryKey(
74                PrimaryKeyProjectionMapper::all(metadata)?,
75            ))
76        }
77    }
78
79    /// Returns the metadata that created the mapper.
80    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
81        match self {
82            ProjectionMapper::PrimaryKey(m) => m.metadata(),
83            ProjectionMapper::Flat(m) => m.metadata(),
84        }
85    }
86
87    /// Returns ids of projected columns that we need to read
88    /// from memtables and SSTs.
89    pub(crate) fn column_ids(&self) -> &[ColumnId] {
90        match self {
91            ProjectionMapper::PrimaryKey(m) => m.column_ids(),
92            ProjectionMapper::Flat(m) => m.column_ids(),
93        }
94    }
95
96    /// Returns the schema of converted [RecordBatch].
97    pub(crate) fn output_schema(&self) -> SchemaRef {
98        match self {
99            ProjectionMapper::PrimaryKey(m) => m.output_schema(),
100            ProjectionMapper::Flat(m) => m.output_schema(),
101        }
102    }
103
104    /// Returns the primary key projection mapper or None if this is not a primary key mapper.
105    pub fn as_primary_key(&self) -> Option<&PrimaryKeyProjectionMapper> {
106        match self {
107            ProjectionMapper::PrimaryKey(m) => Some(m),
108            ProjectionMapper::Flat(_) => None,
109        }
110    }
111
112    /// Returns the flat projection mapper or None if this is not a flat mapper.
113    pub fn as_flat(&self) -> Option<&FlatProjectionMapper> {
114        match self {
115            ProjectionMapper::PrimaryKey(_) => None,
116            ProjectionMapper::Flat(m) => Some(m),
117        }
118    }
119
120    /// Returns an empty [RecordBatch].
121    // TODO(yingwen): This is unused now. Use it after we finishing the flat format.
122    pub fn empty_record_batch(&self) -> RecordBatch {
123        match self {
124            ProjectionMapper::PrimaryKey(m) => m.empty_record_batch(),
125            ProjectionMapper::Flat(m) => m.empty_record_batch(),
126        }
127    }
128}
129
130/// Handles projection and converts a projected [Batch] to a projected [RecordBatch].
131pub struct PrimaryKeyProjectionMapper {
132    /// Metadata of the region.
133    metadata: RegionMetadataRef,
134    /// Maps column in [RecordBatch] to index in [Batch].
135    batch_indices: Vec<BatchIndex>,
136    /// Output record batch contains tags.
137    has_tags: bool,
138    /// Decoder for primary key.
139    codec: Arc<dyn PrimaryKeyCodec>,
140    /// Schema for converted [RecordBatch].
141    output_schema: SchemaRef,
142    /// Ids of columns to project. It keeps ids in the same order as the `projection`
143    /// indices to build the mapper.
144    column_ids: Vec<ColumnId>,
145    /// Ids and DataTypes of field columns in the [Batch].
146    batch_fields: Vec<(ColumnId, ConcreteDataType)>,
147    /// `true` If the original projection is empty.
148    is_empty_projection: bool,
149}
150
151impl PrimaryKeyProjectionMapper {
152    /// Returns a new mapper with projection.
153    /// If `projection` is empty, it outputs [RecordBatch] without any column but only a row count.
154    /// `SELECT COUNT(*) FROM table` is an example that uses an empty projection. DataFusion accepts
155    /// empty `RecordBatch` and only use its row count in this query.
156    pub fn new(
157        metadata: &RegionMetadataRef,
158        projection: impl Iterator<Item = usize>,
159    ) -> Result<PrimaryKeyProjectionMapper> {
160        let mut projection: Vec<_> = projection.collect();
161        // If the original projection is empty.
162        let is_empty_projection = projection.is_empty();
163        if is_empty_projection {
164            // If the projection is empty, we still read the time index column.
165            projection.push(metadata.time_index_column_pos());
166        }
167
168        let mut column_schemas = Vec::with_capacity(projection.len());
169        let mut column_ids = Vec::with_capacity(projection.len());
170        for idx in &projection {
171            // For each projection index, we get the column id for projection.
172            let column = metadata
173                .column_metadatas
174                .get(*idx)
175                .context(InvalidRequestSnafu {
176                    region_id: metadata.region_id,
177                    reason: format!("projection index {} is out of bound", idx),
178                })?;
179
180            column_ids.push(column.column_id);
181            // Safety: idx is valid.
182            column_schemas.push(metadata.schema.column_schemas()[*idx].clone());
183        }
184
185        let codec = build_primary_key_codec(metadata);
186        if is_empty_projection {
187            // If projection is empty, we don't output any column.
188            return Ok(PrimaryKeyProjectionMapper {
189                metadata: metadata.clone(),
190                batch_indices: vec![],
191                has_tags: false,
192                codec,
193                output_schema: Arc::new(Schema::new(vec![])),
194                column_ids,
195                batch_fields: vec![],
196                is_empty_projection,
197            });
198        }
199
200        // Safety: Columns come from existing schema.
201        let output_schema = Arc::new(Schema::new(column_schemas));
202        // Get fields in each batch.
203        let batch_fields = Batch::projected_fields(metadata, &column_ids);
204
205        // Field column id to its index in batch.
206        let field_id_to_index: HashMap<_, _> = batch_fields
207            .iter()
208            .enumerate()
209            .map(|(index, (column_id, _))| (*column_id, index))
210            .collect();
211        // For each projected column, compute its index in batches.
212        let mut batch_indices = Vec::with_capacity(projection.len());
213        let mut has_tags = false;
214        for idx in &projection {
215            // Safety: idx is valid.
216            let column = &metadata.column_metadatas[*idx];
217            // Get column index in a batch by its semantic type and column id.
218            let batch_index = match column.semantic_type {
219                SemanticType::Tag => {
220                    // Safety: It is a primary key column.
221                    let index = metadata.primary_key_index(column.column_id).unwrap();
222                    // We need to output a tag.
223                    has_tags = true;
224                    // We always read all primary key so the column always exists and the tag
225                    // index is always valid.
226                    BatchIndex::Tag((index, column.column_id))
227                }
228                SemanticType::Timestamp => BatchIndex::Timestamp,
229                SemanticType::Field => {
230                    // Safety: It is a field column so it should be in `field_id_to_index`.
231                    let index = field_id_to_index[&column.column_id];
232                    BatchIndex::Field(index)
233                }
234            };
235            batch_indices.push(batch_index);
236        }
237
238        Ok(PrimaryKeyProjectionMapper {
239            metadata: metadata.clone(),
240            batch_indices,
241            has_tags,
242            codec,
243            output_schema,
244            column_ids,
245            batch_fields,
246            is_empty_projection,
247        })
248    }
249
250    /// Returns a new mapper without projection.
251    pub fn all(metadata: &RegionMetadataRef) -> Result<PrimaryKeyProjectionMapper> {
252        PrimaryKeyProjectionMapper::new(metadata, 0..metadata.column_metadatas.len())
253    }
254
255    /// Returns the metadata that created the mapper.
256    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
257        &self.metadata
258    }
259
260    /// Returns ids of projected columns that we need to read
261    /// from memtables and SSTs.
262    pub(crate) fn column_ids(&self) -> &[ColumnId] {
263        &self.column_ids
264    }
265
266    /// Returns ids of fields in [Batch]es the mapper expects to convert.
267    pub(crate) fn batch_fields(&self) -> &[(ColumnId, ConcreteDataType)] {
268        &self.batch_fields
269    }
270
271    /// Returns the schema of converted [RecordBatch].
272    /// This is the schema that the stream will output. This schema may contain
273    /// less columns than [PrimaryKeyProjectionMapper::column_ids()].
274    pub(crate) fn output_schema(&self) -> SchemaRef {
275        self.output_schema.clone()
276    }
277
278    /// Returns an empty [RecordBatch].
279    pub(crate) fn empty_record_batch(&self) -> RecordBatch {
280        RecordBatch::new_empty(self.output_schema.clone())
281    }
282
283    /// Converts a [Batch] to a [RecordBatch].
284    ///
285    /// The batch must match the `projection` using to build the mapper.
286    pub(crate) fn convert(
287        &self,
288        batch: &Batch,
289        cache_strategy: &CacheStrategy,
290    ) -> common_recordbatch::error::Result<RecordBatch> {
291        if self.is_empty_projection {
292            return RecordBatch::new_with_count(self.output_schema.clone(), batch.num_rows());
293        }
294
295        debug_assert_eq!(self.batch_fields.len(), batch.fields().len());
296        debug_assert!(self
297            .batch_fields
298            .iter()
299            .zip(batch.fields())
300            .all(|((id, _), batch_col)| *id == batch_col.column_id));
301
302        // Skips decoding pk if we don't need to output it.
303        let pk_values = if self.has_tags {
304            match batch.pk_values() {
305                Some(v) => v.clone(),
306                None => self
307                    .codec
308                    .decode(batch.primary_key())
309                    .map_err(BoxedError::new)
310                    .context(ExternalSnafu)?,
311            }
312        } else {
313            CompositeValues::Dense(vec![])
314        };
315
316        let mut columns = Vec::with_capacity(self.output_schema.num_columns());
317        let num_rows = batch.num_rows();
318        for (index, column_schema) in self
319            .batch_indices
320            .iter()
321            .zip(self.output_schema.column_schemas())
322        {
323            match index {
324                BatchIndex::Tag((idx, column_id)) => {
325                    let value = match &pk_values {
326                        CompositeValues::Dense(v) => &v[*idx].1,
327                        CompositeValues::Sparse(v) => v.get_or_null(*column_id),
328                    };
329                    let vector = repeated_vector_with_cache(
330                        &column_schema.data_type,
331                        value,
332                        num_rows,
333                        cache_strategy,
334                    )?;
335                    columns.push(vector);
336                }
337                BatchIndex::Timestamp => {
338                    columns.push(batch.timestamps().clone());
339                }
340                BatchIndex::Field(idx) => {
341                    columns.push(batch.fields()[*idx].data.clone());
342                }
343            }
344        }
345
346        RecordBatch::new(self.output_schema.clone(), columns)
347    }
348}
349
350/// Index of a vector in a [Batch].
351#[derive(Debug, Clone, Copy)]
352enum BatchIndex {
353    /// Index in primary keys.
354    Tag((usize, ColumnId)),
355    /// The time index column.
356    Timestamp,
357    /// Index in fields.
358    Field(usize),
359}
360
361/// Gets a vector with repeated values from specific cache or creates a new one.
362fn repeated_vector_with_cache(
363    data_type: &ConcreteDataType,
364    value: &Value,
365    num_rows: usize,
366    cache_strategy: &CacheStrategy,
367) -> common_recordbatch::error::Result<VectorRef> {
368    if let Some(vector) = cache_strategy.get_repeated_vector(data_type, value) {
369        // Tries to get the vector from cache manager. If the vector doesn't
370        // have enough length, creates a new one.
371        match vector.len().cmp(&num_rows) {
372            Ordering::Less => (),
373            Ordering::Equal => return Ok(vector),
374            Ordering::Greater => return Ok(vector.slice(0, num_rows)),
375        }
376    }
377
378    // Creates a new one.
379    let vector = new_repeated_vector(data_type, value, num_rows)?;
380    // Updates cache.
381    if vector.len() <= MAX_VECTOR_LENGTH_TO_CACHE {
382        cache_strategy.put_repeated_vector(value.clone(), vector.clone());
383    }
384
385    Ok(vector)
386}
387
388/// Returns a vector with repeated values.
389fn new_repeated_vector(
390    data_type: &ConcreteDataType,
391    value: &Value,
392    num_rows: usize,
393) -> common_recordbatch::error::Result<VectorRef> {
394    let mut mutable_vector = data_type.create_mutable_vector(1);
395    mutable_vector
396        .try_push_value_ref(value.as_value_ref())
397        .map_err(BoxedError::new)
398        .context(ExternalSnafu)?;
399    // This requires an additional allocation.
400    let base_vector = mutable_vector.to_vector();
401    Ok(base_vector.replicate(&[num_rows]))
402}
403
404#[cfg(test)]
405mod tests {
406    use std::sync::Arc;
407
408    use api::v1::OpType;
409    use datatypes::arrow::array::{Int64Array, TimestampMillisecondArray, UInt64Array, UInt8Array};
410    use datatypes::arrow::datatypes::Field;
411    use datatypes::arrow::util::pretty;
412    use datatypes::value::ValueRef;
413    use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
414    use mito_codec::test_util::TestRegionMetadataBuilder;
415    use store_api::storage::consts::{
416        OP_TYPE_COLUMN_NAME, PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME,
417    };
418
419    use super::*;
420    use crate::cache::CacheManager;
421    use crate::read::BatchBuilder;
422
423    fn new_batch(
424        ts_start: i64,
425        tags: &[i64],
426        fields: &[(ColumnId, i64)],
427        num_rows: usize,
428    ) -> Batch {
429        let converter = DensePrimaryKeyCodec::with_fields(
430            (0..tags.len())
431                .map(|idx| {
432                    (
433                        idx as u32,
434                        SortField::new(ConcreteDataType::int64_datatype()),
435                    )
436                })
437                .collect(),
438        );
439        let primary_key = converter
440            .encode(tags.iter().map(|v| ValueRef::Int64(*v)))
441            .unwrap();
442
443        let mut builder = BatchBuilder::new(primary_key);
444        builder
445            .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
446                (0..num_rows).map(|i| ts_start + i as i64 * 1000),
447            )))
448            .unwrap()
449            .sequences_array(Arc::new(UInt64Array::from_iter_values(0..num_rows as u64)))
450            .unwrap()
451            .op_types_array(Arc::new(UInt8Array::from_iter_values(
452                (0..num_rows).map(|_| OpType::Put as u8),
453            )))
454            .unwrap();
455        for (column_id, field) in fields {
456            builder
457                .push_field_array(
458                    *column_id,
459                    Arc::new(Int64Array::from_iter_values(std::iter::repeat_n(
460                        *field, num_rows,
461                    ))),
462                )
463                .unwrap();
464        }
465        builder.build().unwrap()
466    }
467
468    fn print_record_batch(record_batch: RecordBatch) -> String {
469        pretty::pretty_format_batches(&[record_batch.into_df_record_batch()])
470            .unwrap()
471            .to_string()
472    }
473
474    #[test]
475    fn test_projection_mapper_all() {
476        let metadata = Arc::new(
477            TestRegionMetadataBuilder::default()
478                .num_tags(2)
479                .num_fields(2)
480                .build(),
481        );
482        // Create the enum wrapper with default format (primary key)
483        let mapper = ProjectionMapper::all(&metadata, false).unwrap();
484        assert_eq!([0, 1, 2, 3, 4], mapper.column_ids());
485        assert_eq!(
486            [
487                (3, ConcreteDataType::int64_datatype()),
488                (4, ConcreteDataType::int64_datatype())
489            ],
490            mapper.as_primary_key().unwrap().batch_fields()
491        );
492
493        // With vector cache.
494        let cache = CacheManager::builder().vector_cache_size(1024).build();
495        let cache = CacheStrategy::EnableAll(Arc::new(cache));
496        let batch = new_batch(0, &[1, 2], &[(3, 3), (4, 4)], 3);
497        let record_batch = mapper
498            .as_primary_key()
499            .unwrap()
500            .convert(&batch, &cache)
501            .unwrap();
502        let expect = "\
503+---------------------+----+----+----+----+
504| ts                  | k0 | k1 | v0 | v1 |
505+---------------------+----+----+----+----+
506| 1970-01-01T00:00:00 | 1  | 2  | 3  | 4  |
507| 1970-01-01T00:00:01 | 1  | 2  | 3  | 4  |
508| 1970-01-01T00:00:02 | 1  | 2  | 3  | 4  |
509+---------------------+----+----+----+----+";
510        assert_eq!(expect, print_record_batch(record_batch));
511
512        assert!(cache
513            .get_repeated_vector(&ConcreteDataType::int64_datatype(), &Value::Int64(1))
514            .is_some());
515        assert!(cache
516            .get_repeated_vector(&ConcreteDataType::int64_datatype(), &Value::Int64(2))
517            .is_some());
518        assert!(cache
519            .get_repeated_vector(&ConcreteDataType::int64_datatype(), &Value::Int64(3))
520            .is_none());
521        let record_batch = mapper
522            .as_primary_key()
523            .unwrap()
524            .convert(&batch, &cache)
525            .unwrap();
526        assert_eq!(expect, print_record_batch(record_batch));
527    }
528
529    #[test]
530    fn test_projection_mapper_with_projection() {
531        let metadata = Arc::new(
532            TestRegionMetadataBuilder::default()
533                .num_tags(2)
534                .num_fields(2)
535                .build(),
536        );
537        // Columns v1, k0
538        let mapper = ProjectionMapper::new(&metadata, [4, 1].into_iter(), false).unwrap();
539        assert_eq!([4, 1], mapper.column_ids());
540        assert_eq!(
541            [(4, ConcreteDataType::int64_datatype())],
542            mapper.as_primary_key().unwrap().batch_fields()
543        );
544
545        let batch = new_batch(0, &[1, 2], &[(4, 4)], 3);
546        let cache = CacheManager::builder().vector_cache_size(1024).build();
547        let cache = CacheStrategy::EnableAll(Arc::new(cache));
548        let record_batch = mapper
549            .as_primary_key()
550            .unwrap()
551            .convert(&batch, &cache)
552            .unwrap();
553        let expect = "\
554+----+----+
555| v1 | k0 |
556+----+----+
557| 4  | 1  |
558| 4  | 1  |
559| 4  | 1  |
560+----+----+";
561        assert_eq!(expect, print_record_batch(record_batch));
562    }
563
564    #[test]
565    fn test_projection_mapper_empty_projection() {
566        let metadata = Arc::new(
567            TestRegionMetadataBuilder::default()
568                .num_tags(2)
569                .num_fields(2)
570                .build(),
571        );
572        // Empty projection
573        let mapper = ProjectionMapper::new(&metadata, [].into_iter(), false).unwrap();
574        assert_eq!([0], mapper.column_ids()); // Should still read the time index column
575        assert!(mapper.output_schema().is_empty());
576        let pk_mapper = mapper.as_primary_key().unwrap();
577        assert!(pk_mapper.batch_fields().is_empty());
578        assert!(!pk_mapper.has_tags);
579        assert!(pk_mapper.batch_indices.is_empty());
580        assert!(pk_mapper.is_empty_projection);
581
582        let batch = new_batch(0, &[1, 2], &[], 3);
583        let cache = CacheManager::builder().vector_cache_size(1024).build();
584        let cache = CacheStrategy::EnableAll(Arc::new(cache));
585        let record_batch = pk_mapper.convert(&batch, &cache).unwrap();
586        assert_eq!(3, record_batch.num_rows());
587        assert_eq!(0, record_batch.num_columns());
588        assert!(record_batch.schema.is_empty());
589    }
590
591    fn new_flat_batch(
592        ts_start: Option<i64>,
593        idx_tags: &[(usize, i64)],
594        idx_fields: &[(usize, i64)],
595        num_rows: usize,
596    ) -> datatypes::arrow::record_batch::RecordBatch {
597        let mut columns = Vec::with_capacity(1 + idx_tags.len() + idx_fields.len() + 3);
598        let mut fields = Vec::with_capacity(1 + idx_tags.len() + idx_fields.len() + 3);
599
600        // Flat format: primary key columns, field columns, time index, __primary_key, __sequence, __op_type
601
602        // Primary key columns first
603        for (i, tag) in idx_tags {
604            let array = Arc::new(Int64Array::from_iter_values(std::iter::repeat_n(
605                *tag, num_rows,
606            ))) as _;
607            columns.push(array);
608            fields.push(Field::new(
609                format!("k{i}"),
610                datatypes::arrow::datatypes::DataType::Int64,
611                true,
612            ));
613        }
614
615        // Field columns
616        for (i, field) in idx_fields {
617            let array = Arc::new(Int64Array::from_iter_values(std::iter::repeat_n(
618                *field, num_rows,
619            ))) as _;
620            columns.push(array);
621            fields.push(Field::new(
622                format!("v{i}"),
623                datatypes::arrow::datatypes::DataType::Int64,
624                true,
625            ));
626        }
627
628        // Time index
629        if let Some(ts_start) = ts_start {
630            let timestamps = Arc::new(TimestampMillisecondArray::from_iter_values(
631                (0..num_rows).map(|i| ts_start + i as i64 * 1000),
632            )) as _;
633            columns.push(timestamps);
634            fields.push(Field::new(
635                "ts",
636                datatypes::arrow::datatypes::DataType::Timestamp(
637                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
638                    None,
639                ),
640                true,
641            ));
642        }
643
644        // __primary_key column (encoded primary key as dictionary)
645        // Create encoded primary key
646        let converter = DensePrimaryKeyCodec::with_fields(
647            (0..idx_tags.len())
648                .map(|idx| {
649                    (
650                        idx as u32,
651                        SortField::new(ConcreteDataType::int64_datatype()),
652                    )
653                })
654                .collect(),
655        );
656        let encoded_pk = converter
657            .encode(idx_tags.iter().map(|(_, v)| ValueRef::Int64(*v)))
658            .unwrap();
659
660        // Create dictionary array for the encoded primary key
661        let pk_values: Vec<&[u8]> = std::iter::repeat_n(encoded_pk.as_slice(), num_rows).collect();
662        let keys = datatypes::arrow::array::UInt32Array::from_iter(0..num_rows as u32);
663        let values = Arc::new(datatypes::arrow::array::BinaryArray::from_vec(pk_values));
664        let pk_array =
665            Arc::new(datatypes::arrow::array::DictionaryArray::try_new(keys, values).unwrap()) as _;
666        columns.push(pk_array);
667        fields.push(Field::new_dictionary(
668            PRIMARY_KEY_COLUMN_NAME,
669            datatypes::arrow::datatypes::DataType::UInt32,
670            datatypes::arrow::datatypes::DataType::Binary,
671            false,
672        ));
673
674        // __sequence column
675        columns.push(Arc::new(UInt64Array::from_iter_values(0..num_rows as u64)) as _);
676        fields.push(Field::new(
677            SEQUENCE_COLUMN_NAME,
678            datatypes::arrow::datatypes::DataType::UInt64,
679            false,
680        ));
681
682        // __op_type column
683        columns.push(Arc::new(UInt8Array::from_iter_values(
684            (0..num_rows).map(|_| OpType::Put as u8),
685        )) as _);
686        fields.push(Field::new(
687            OP_TYPE_COLUMN_NAME,
688            datatypes::arrow::datatypes::DataType::UInt8,
689            false,
690        ));
691
692        let schema = Arc::new(datatypes::arrow::datatypes::Schema::new(fields));
693
694        datatypes::arrow::record_batch::RecordBatch::try_new(schema, columns).unwrap()
695    }
696
697    #[test]
698    fn test_flat_projection_mapper_all() {
699        let metadata = Arc::new(
700            TestRegionMetadataBuilder::default()
701                .num_tags(2)
702                .num_fields(2)
703                .build(),
704        );
705        let mapper = ProjectionMapper::all(&metadata, true).unwrap();
706        assert_eq!([0, 1, 2, 3, 4], mapper.column_ids());
707        assert_eq!(
708            [
709                (1, ConcreteDataType::int64_datatype()),
710                (2, ConcreteDataType::int64_datatype()),
711                (3, ConcreteDataType::int64_datatype()),
712                (4, ConcreteDataType::int64_datatype()),
713                (0, ConcreteDataType::timestamp_millisecond_datatype())
714            ],
715            mapper.as_flat().unwrap().batch_schema()
716        );
717
718        let batch = new_flat_batch(Some(0), &[(1, 1), (2, 2)], &[(3, 3), (4, 4)], 3);
719        let record_batch = mapper.as_flat().unwrap().convert(&batch).unwrap();
720        let expect = "\
721+---------------------+----+----+----+----+
722| ts                  | k0 | k1 | v0 | v1 |
723+---------------------+----+----+----+----+
724| 1970-01-01T00:00:00 | 1  | 2  | 3  | 4  |
725| 1970-01-01T00:00:01 | 1  | 2  | 3  | 4  |
726| 1970-01-01T00:00:02 | 1  | 2  | 3  | 4  |
727+---------------------+----+----+----+----+";
728        assert_eq!(expect, print_record_batch(record_batch));
729    }
730
731    #[test]
732    fn test_flat_projection_mapper_with_projection() {
733        let metadata = Arc::new(
734            TestRegionMetadataBuilder::default()
735                .num_tags(2)
736                .num_fields(2)
737                .build(),
738        );
739        // Columns v1, k0
740        let mapper = ProjectionMapper::new(&metadata, [4, 1].into_iter(), true).unwrap();
741        assert_eq!([4, 1], mapper.column_ids());
742        assert_eq!(
743            [
744                (1, ConcreteDataType::int64_datatype()),
745                (4, ConcreteDataType::int64_datatype())
746            ],
747            mapper.as_flat().unwrap().batch_schema()
748        );
749
750        let batch = new_flat_batch(None, &[(1, 1)], &[(4, 4)], 3);
751        let record_batch = mapper.as_flat().unwrap().convert(&batch).unwrap();
752        let expect = "\
753+----+----+
754| v1 | k0 |
755+----+----+
756| 4  | 1  |
757| 4  | 1  |
758| 4  | 1  |
759+----+----+";
760        assert_eq!(expect, print_record_batch(record_batch));
761    }
762
763    #[test]
764    fn test_flat_projection_mapper_empty_projection() {
765        let metadata = Arc::new(
766            TestRegionMetadataBuilder::default()
767                .num_tags(2)
768                .num_fields(2)
769                .build(),
770        );
771        // Empty projection
772        let mapper = ProjectionMapper::new(&metadata, [].into_iter(), true).unwrap();
773        assert_eq!([0], mapper.column_ids()); // Should still read the time index column
774        assert!(mapper.output_schema().is_empty());
775        let flat_mapper = mapper.as_flat().unwrap();
776        assert!(flat_mapper.batch_schema().is_empty());
777
778        let batch = new_flat_batch(Some(0), &[], &[], 3);
779        let record_batch = flat_mapper.convert(&batch).unwrap();
780        assert_eq!(3, record_batch.num_rows());
781        assert_eq!(0, record_batch.num_columns());
782        assert!(record_batch.schema.is_empty());
783    }
784}