mito2/read/
projection.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Utilities for projection operations.
16
17use std::cmp::Ordering;
18use std::collections::HashMap;
19use std::sync::Arc;
20
21use api::v1::SemanticType;
22use common_error::ext::BoxedError;
23use common_recordbatch::RecordBatch;
24use common_recordbatch::error::ExternalSnafu;
25use datatypes::prelude::{ConcreteDataType, DataType};
26use datatypes::schema::{Schema, SchemaRef};
27use datatypes::value::Value;
28use datatypes::vectors::VectorRef;
29use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec, build_primary_key_codec};
30use snafu::{OptionExt, ResultExt};
31use store_api::metadata::RegionMetadataRef;
32use store_api::storage::ColumnId;
33
34use crate::cache::CacheStrategy;
35use crate::error::{InvalidRequestSnafu, Result};
36use crate::read::Batch;
37use crate::read::flat_projection::FlatProjectionMapper;
38
/// Upper bound on the length of a repeated vector that may be put into the cache.
/// A newly created vector is cached only when its length is `<=` this value.
const MAX_VECTOR_LENGTH_TO_CACHE: usize = 16384;
41
/// Wrapper enum for different projection mapper implementations.
///
/// The variant is selected by the `flat_format` flag passed to
/// [`ProjectionMapper::new`] or [`ProjectionMapper::all`].
pub enum ProjectionMapper {
    /// Projection mapper for primary key format.
    PrimaryKey(PrimaryKeyProjectionMapper),
    /// Projection mapper for flat format.
    Flat(FlatProjectionMapper),
}
49
50impl ProjectionMapper {
51    /// Returns a new mapper with projection.
52    pub fn new(
53        metadata: &RegionMetadataRef,
54        projection: impl Iterator<Item = usize> + Clone,
55        flat_format: bool,
56    ) -> Result<Self> {
57        if flat_format {
58            Ok(ProjectionMapper::Flat(FlatProjectionMapper::new(
59                metadata, projection,
60            )?))
61        } else {
62            Ok(ProjectionMapper::PrimaryKey(
63                PrimaryKeyProjectionMapper::new(metadata, projection)?,
64            ))
65        }
66    }
67
68    /// Returns a new mapper without projection.
69    pub fn all(metadata: &RegionMetadataRef, flat_format: bool) -> Result<Self> {
70        if flat_format {
71            Ok(ProjectionMapper::Flat(FlatProjectionMapper::all(metadata)?))
72        } else {
73            Ok(ProjectionMapper::PrimaryKey(
74                PrimaryKeyProjectionMapper::all(metadata)?,
75            ))
76        }
77    }
78
79    /// Returns the metadata that created the mapper.
80    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
81        match self {
82            ProjectionMapper::PrimaryKey(m) => m.metadata(),
83            ProjectionMapper::Flat(m) => m.metadata(),
84        }
85    }
86
87    /// Returns true if the projection includes any tag columns.
88    pub(crate) fn has_tags(&self) -> bool {
89        match self {
90            ProjectionMapper::PrimaryKey(m) => m.has_tags(),
91            ProjectionMapper::Flat(_) => false,
92        }
93    }
94
95    /// Returns ids of projected columns that we need to read
96    /// from memtables and SSTs.
97    pub(crate) fn column_ids(&self) -> &[ColumnId] {
98        match self {
99            ProjectionMapper::PrimaryKey(m) => m.column_ids(),
100            ProjectionMapper::Flat(m) => m.column_ids(),
101        }
102    }
103
104    /// Returns the schema of converted [RecordBatch].
105    pub(crate) fn output_schema(&self) -> SchemaRef {
106        match self {
107            ProjectionMapper::PrimaryKey(m) => m.output_schema(),
108            ProjectionMapper::Flat(m) => m.output_schema(),
109        }
110    }
111
112    /// Returns the primary key projection mapper or None if this is not a primary key mapper.
113    pub fn as_primary_key(&self) -> Option<&PrimaryKeyProjectionMapper> {
114        match self {
115            ProjectionMapper::PrimaryKey(m) => Some(m),
116            ProjectionMapper::Flat(_) => None,
117        }
118    }
119
120    /// Returns the flat projection mapper or None if this is not a flat mapper.
121    pub fn as_flat(&self) -> Option<&FlatProjectionMapper> {
122        match self {
123            ProjectionMapper::PrimaryKey(_) => None,
124            ProjectionMapper::Flat(m) => Some(m),
125        }
126    }
127
128    /// Returns an empty [RecordBatch].
129    // TODO(yingwen): This is unused now. Use it after we finishing the flat format.
130    pub fn empty_record_batch(&self) -> RecordBatch {
131        match self {
132            ProjectionMapper::PrimaryKey(m) => m.empty_record_batch(),
133            ProjectionMapper::Flat(m) => m.empty_record_batch(),
134        }
135    }
136}
137
/// Handles projection and converts a projected [Batch] to a projected [RecordBatch].
pub struct PrimaryKeyProjectionMapper {
    /// Metadata of the region.
    metadata: RegionMetadataRef,
    /// Maps column in [RecordBatch] to index in [Batch].
    /// It is empty when the original projection is empty.
    batch_indices: Vec<BatchIndex>,
    /// Output record batch contains tags.
    /// When it is `false`, converting a batch skips decoding the primary key.
    has_tags: bool,
    /// Decoder for primary key.
    codec: Arc<dyn PrimaryKeyCodec>,
    /// Schema for converted [RecordBatch].
    /// It has no column when the original projection is empty.
    output_schema: SchemaRef,
    /// Ids of columns to project. It keeps ids in the same order as the `projection`
    /// indices to build the mapper.
    /// When the original projection is empty, it only contains the id of the
    /// time index column.
    column_ids: Vec<ColumnId>,
    /// Ids and DataTypes of field columns in the [Batch].
    batch_fields: Vec<(ColumnId, ConcreteDataType)>,
    /// `true` if the original projection is empty.
    is_empty_projection: bool,
}
158
159impl PrimaryKeyProjectionMapper {
160    /// Returns a new mapper with projection.
161    /// If `projection` is empty, it outputs [RecordBatch] without any column but only a row count.
162    /// `SELECT COUNT(*) FROM table` is an example that uses an empty projection. DataFusion accepts
163    /// empty `RecordBatch` and only use its row count in this query.
164    pub fn new(
165        metadata: &RegionMetadataRef,
166        projection: impl Iterator<Item = usize>,
167    ) -> Result<PrimaryKeyProjectionMapper> {
168        let mut projection: Vec<_> = projection.collect();
169        // If the original projection is empty.
170        let is_empty_projection = projection.is_empty();
171        if is_empty_projection {
172            // If the projection is empty, we still read the time index column.
173            projection.push(metadata.time_index_column_pos());
174        }
175
176        let mut column_schemas = Vec::with_capacity(projection.len());
177        let mut column_ids = Vec::with_capacity(projection.len());
178        for idx in &projection {
179            // For each projection index, we get the column id for projection.
180            let column = metadata
181                .column_metadatas
182                .get(*idx)
183                .context(InvalidRequestSnafu {
184                    region_id: metadata.region_id,
185                    reason: format!("projection index {} is out of bound", idx),
186                })?;
187
188            column_ids.push(column.column_id);
189            // Safety: idx is valid.
190            column_schemas.push(metadata.schema.column_schemas()[*idx].clone());
191        }
192
193        let codec = build_primary_key_codec(metadata);
194        if is_empty_projection {
195            // If projection is empty, we don't output any column.
196            return Ok(PrimaryKeyProjectionMapper {
197                metadata: metadata.clone(),
198                batch_indices: vec![],
199                has_tags: false,
200                codec,
201                output_schema: Arc::new(Schema::new(vec![])),
202                column_ids,
203                batch_fields: vec![],
204                is_empty_projection,
205            });
206        }
207
208        // Safety: Columns come from existing schema.
209        let output_schema = Arc::new(Schema::new(column_schemas));
210        // Get fields in each batch.
211        let batch_fields = Batch::projected_fields(metadata, &column_ids);
212
213        // Field column id to its index in batch.
214        let field_id_to_index: HashMap<_, _> = batch_fields
215            .iter()
216            .enumerate()
217            .map(|(index, (column_id, _))| (*column_id, index))
218            .collect();
219        // For each projected column, compute its index in batches.
220        let mut batch_indices = Vec::with_capacity(projection.len());
221        let mut has_tags = false;
222        for idx in &projection {
223            // Safety: idx is valid.
224            let column = &metadata.column_metadatas[*idx];
225            // Get column index in a batch by its semantic type and column id.
226            let batch_index = match column.semantic_type {
227                SemanticType::Tag => {
228                    // Safety: It is a primary key column.
229                    let index = metadata.primary_key_index(column.column_id).unwrap();
230                    // We need to output a tag.
231                    has_tags = true;
232                    // We always read all primary key so the column always exists and the tag
233                    // index is always valid.
234                    BatchIndex::Tag((index, column.column_id))
235                }
236                SemanticType::Timestamp => BatchIndex::Timestamp,
237                SemanticType::Field => {
238                    // Safety: It is a field column so it should be in `field_id_to_index`.
239                    let index = field_id_to_index[&column.column_id];
240                    BatchIndex::Field(index)
241                }
242            };
243            batch_indices.push(batch_index);
244        }
245
246        Ok(PrimaryKeyProjectionMapper {
247            metadata: metadata.clone(),
248            batch_indices,
249            has_tags,
250            codec,
251            output_schema,
252            column_ids,
253            batch_fields,
254            is_empty_projection,
255        })
256    }
257
258    /// Returns a new mapper without projection.
259    pub fn all(metadata: &RegionMetadataRef) -> Result<PrimaryKeyProjectionMapper> {
260        PrimaryKeyProjectionMapper::new(metadata, 0..metadata.column_metadatas.len())
261    }
262
263    /// Returns the metadata that created the mapper.
264    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
265        &self.metadata
266    }
267
268    /// Returns true if the projection includes any tag columns.
269    pub(crate) fn has_tags(&self) -> bool {
270        self.has_tags
271    }
272
273    /// Returns ids of projected columns that we need to read
274    /// from memtables and SSTs.
275    pub(crate) fn column_ids(&self) -> &[ColumnId] {
276        &self.column_ids
277    }
278
279    /// Returns ids of fields in [Batch]es the mapper expects to convert.
280    pub(crate) fn batch_fields(&self) -> &[(ColumnId, ConcreteDataType)] {
281        &self.batch_fields
282    }
283
284    /// Returns the schema of converted [RecordBatch].
285    /// This is the schema that the stream will output. This schema may contain
286    /// less columns than [PrimaryKeyProjectionMapper::column_ids()].
287    pub(crate) fn output_schema(&self) -> SchemaRef {
288        self.output_schema.clone()
289    }
290
291    /// Returns an empty [RecordBatch].
292    pub(crate) fn empty_record_batch(&self) -> RecordBatch {
293        RecordBatch::new_empty(self.output_schema.clone())
294    }
295
296    /// Converts a [Batch] to a [RecordBatch].
297    ///
298    /// The batch must match the `projection` using to build the mapper.
299    pub(crate) fn convert(
300        &self,
301        batch: &Batch,
302        cache_strategy: &CacheStrategy,
303    ) -> common_recordbatch::error::Result<RecordBatch> {
304        if self.is_empty_projection {
305            return RecordBatch::new_with_count(self.output_schema.clone(), batch.num_rows());
306        }
307
308        debug_assert_eq!(self.batch_fields.len(), batch.fields().len());
309        debug_assert!(
310            self.batch_fields
311                .iter()
312                .zip(batch.fields())
313                .all(|((id, _), batch_col)| *id == batch_col.column_id)
314        );
315
316        // Skips decoding pk if we don't need to output it.
317        let pk_values = if self.has_tags {
318            match batch.pk_values() {
319                Some(v) => v.clone(),
320                None => self
321                    .codec
322                    .decode(batch.primary_key())
323                    .map_err(BoxedError::new)
324                    .context(ExternalSnafu)?,
325            }
326        } else {
327            CompositeValues::Dense(vec![])
328        };
329
330        let mut columns = Vec::with_capacity(self.output_schema.num_columns());
331        let num_rows = batch.num_rows();
332        for (index, column_schema) in self
333            .batch_indices
334            .iter()
335            .zip(self.output_schema.column_schemas())
336        {
337            match index {
338                BatchIndex::Tag((idx, column_id)) => {
339                    let value = match &pk_values {
340                        CompositeValues::Dense(v) => &v[*idx].1,
341                        CompositeValues::Sparse(v) => v.get_or_null(*column_id),
342                    };
343                    let vector = repeated_vector_with_cache(
344                        &column_schema.data_type,
345                        value,
346                        num_rows,
347                        cache_strategy,
348                    )?;
349                    columns.push(vector);
350                }
351                BatchIndex::Timestamp => {
352                    columns.push(batch.timestamps().clone());
353                }
354                BatchIndex::Field(idx) => {
355                    columns.push(batch.fields()[*idx].data.clone());
356                }
357            }
358        }
359
360        RecordBatch::new(self.output_schema.clone(), columns)
361    }
362}
363
/// Index of a vector in a [Batch].
///
/// There is one entry for each column of the output [RecordBatch].
#[derive(Debug, Clone, Copy)]
enum BatchIndex {
    /// Index in primary keys and the id of the tag column.
    /// The index locates the value in dense primary key values and the
    /// column id locates it in sparse ones.
    Tag((usize, ColumnId)),
    /// The time index column.
    Timestamp,
    /// Index in fields.
    Field(usize),
}
374
375/// Gets a vector with repeated values from specific cache or creates a new one.
376fn repeated_vector_with_cache(
377    data_type: &ConcreteDataType,
378    value: &Value,
379    num_rows: usize,
380    cache_strategy: &CacheStrategy,
381) -> common_recordbatch::error::Result<VectorRef> {
382    if let Some(vector) = cache_strategy.get_repeated_vector(data_type, value) {
383        // Tries to get the vector from cache manager. If the vector doesn't
384        // have enough length, creates a new one.
385        match vector.len().cmp(&num_rows) {
386            Ordering::Less => (),
387            Ordering::Equal => return Ok(vector),
388            Ordering::Greater => return Ok(vector.slice(0, num_rows)),
389        }
390    }
391
392    // Creates a new one.
393    let vector = new_repeated_vector(data_type, value, num_rows)?;
394    // Updates cache.
395    if vector.len() <= MAX_VECTOR_LENGTH_TO_CACHE {
396        cache_strategy.put_repeated_vector(value.clone(), vector.clone());
397    }
398
399    Ok(vector)
400}
401
402/// Returns a vector with repeated values.
403fn new_repeated_vector(
404    data_type: &ConcreteDataType,
405    value: &Value,
406    num_rows: usize,
407) -> common_recordbatch::error::Result<VectorRef> {
408    let mut mutable_vector = data_type.create_mutable_vector(1);
409    mutable_vector
410        .try_push_value_ref(&value.as_value_ref())
411        .map_err(BoxedError::new)
412        .context(ExternalSnafu)?;
413    // This requires an additional allocation.
414    let base_vector = mutable_vector.to_vector();
415    Ok(base_vector.replicate(&[num_rows]))
416}
417
418#[cfg(test)]
419mod tests {
420    use std::sync::Arc;
421
422    use api::v1::OpType;
423    use datatypes::arrow::array::{Int64Array, TimestampMillisecondArray, UInt8Array, UInt64Array};
424    use datatypes::arrow::datatypes::Field;
425    use datatypes::arrow::util::pretty;
426    use datatypes::value::ValueRef;
427    use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
428    use mito_codec::test_util::TestRegionMetadataBuilder;
429    use store_api::storage::consts::{
430        OP_TYPE_COLUMN_NAME, PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME,
431    };
432
433    use super::*;
434    use crate::cache::CacheManager;
435    use crate::read::BatchBuilder;
436
437    fn new_batch(
438        ts_start: i64,
439        tags: &[i64],
440        fields: &[(ColumnId, i64)],
441        num_rows: usize,
442    ) -> Batch {
443        let converter = DensePrimaryKeyCodec::with_fields(
444            (0..tags.len())
445                .map(|idx| {
446                    (
447                        idx as u32,
448                        SortField::new(ConcreteDataType::int64_datatype()),
449                    )
450                })
451                .collect(),
452        );
453        let primary_key = converter
454            .encode(tags.iter().map(|v| ValueRef::Int64(*v)))
455            .unwrap();
456
457        let mut builder = BatchBuilder::new(primary_key);
458        builder
459            .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
460                (0..num_rows).map(|i| ts_start + i as i64 * 1000),
461            )))
462            .unwrap()
463            .sequences_array(Arc::new(UInt64Array::from_iter_values(0..num_rows as u64)))
464            .unwrap()
465            .op_types_array(Arc::new(UInt8Array::from_iter_values(
466                (0..num_rows).map(|_| OpType::Put as u8),
467            )))
468            .unwrap();
469        for (column_id, field) in fields {
470            builder
471                .push_field_array(
472                    *column_id,
473                    Arc::new(Int64Array::from_iter_values(std::iter::repeat_n(
474                        *field, num_rows,
475                    ))),
476                )
477                .unwrap();
478        }
479        builder.build().unwrap()
480    }
481
482    fn print_record_batch(record_batch: RecordBatch) -> String {
483        pretty::pretty_format_batches(&[record_batch.into_df_record_batch()])
484            .unwrap()
485            .to_string()
486    }
487
488    #[test]
489    fn test_projection_mapper_all() {
490        let metadata = Arc::new(
491            TestRegionMetadataBuilder::default()
492                .num_tags(2)
493                .num_fields(2)
494                .build(),
495        );
496        // Create the enum wrapper with default format (primary key)
497        let mapper = ProjectionMapper::all(&metadata, false).unwrap();
498        assert_eq!([0, 1, 2, 3, 4], mapper.column_ids());
499        assert_eq!(
500            [
501                (3, ConcreteDataType::int64_datatype()),
502                (4, ConcreteDataType::int64_datatype())
503            ],
504            mapper.as_primary_key().unwrap().batch_fields()
505        );
506
507        // With vector cache.
508        let cache = CacheManager::builder().vector_cache_size(1024).build();
509        let cache = CacheStrategy::EnableAll(Arc::new(cache));
510        let batch = new_batch(0, &[1, 2], &[(3, 3), (4, 4)], 3);
511        let record_batch = mapper
512            .as_primary_key()
513            .unwrap()
514            .convert(&batch, &cache)
515            .unwrap();
516        let expect = "\
517+---------------------+----+----+----+----+
518| ts                  | k0 | k1 | v0 | v1 |
519+---------------------+----+----+----+----+
520| 1970-01-01T00:00:00 | 1  | 2  | 3  | 4  |
521| 1970-01-01T00:00:01 | 1  | 2  | 3  | 4  |
522| 1970-01-01T00:00:02 | 1  | 2  | 3  | 4  |
523+---------------------+----+----+----+----+";
524        assert_eq!(expect, print_record_batch(record_batch));
525
526        assert!(
527            cache
528                .get_repeated_vector(&ConcreteDataType::int64_datatype(), &Value::Int64(1))
529                .is_some()
530        );
531        assert!(
532            cache
533                .get_repeated_vector(&ConcreteDataType::int64_datatype(), &Value::Int64(2))
534                .is_some()
535        );
536        assert!(
537            cache
538                .get_repeated_vector(&ConcreteDataType::int64_datatype(), &Value::Int64(3))
539                .is_none()
540        );
541        let record_batch = mapper
542            .as_primary_key()
543            .unwrap()
544            .convert(&batch, &cache)
545            .unwrap();
546        assert_eq!(expect, print_record_batch(record_batch));
547    }
548
549    #[test]
550    fn test_projection_mapper_with_projection() {
551        let metadata = Arc::new(
552            TestRegionMetadataBuilder::default()
553                .num_tags(2)
554                .num_fields(2)
555                .build(),
556        );
557        // Columns v1, k0
558        let mapper = ProjectionMapper::new(&metadata, [4, 1].into_iter(), false).unwrap();
559        assert_eq!([4, 1], mapper.column_ids());
560        assert_eq!(
561            [(4, ConcreteDataType::int64_datatype())],
562            mapper.as_primary_key().unwrap().batch_fields()
563        );
564
565        let batch = new_batch(0, &[1, 2], &[(4, 4)], 3);
566        let cache = CacheManager::builder().vector_cache_size(1024).build();
567        let cache = CacheStrategy::EnableAll(Arc::new(cache));
568        let record_batch = mapper
569            .as_primary_key()
570            .unwrap()
571            .convert(&batch, &cache)
572            .unwrap();
573        let expect = "\
574+----+----+
575| v1 | k0 |
576+----+----+
577| 4  | 1  |
578| 4  | 1  |
579| 4  | 1  |
580+----+----+";
581        assert_eq!(expect, print_record_batch(record_batch));
582    }
583
584    #[test]
585    fn test_projection_mapper_empty_projection() {
586        let metadata = Arc::new(
587            TestRegionMetadataBuilder::default()
588                .num_tags(2)
589                .num_fields(2)
590                .build(),
591        );
592        // Empty projection
593        let mapper = ProjectionMapper::new(&metadata, [].into_iter(), false).unwrap();
594        assert_eq!([0], mapper.column_ids()); // Should still read the time index column
595        assert!(mapper.output_schema().is_empty());
596        let pk_mapper = mapper.as_primary_key().unwrap();
597        assert!(pk_mapper.batch_fields().is_empty());
598        assert!(!pk_mapper.has_tags);
599        assert!(pk_mapper.batch_indices.is_empty());
600        assert!(pk_mapper.is_empty_projection);
601
602        let batch = new_batch(0, &[1, 2], &[], 3);
603        let cache = CacheManager::builder().vector_cache_size(1024).build();
604        let cache = CacheStrategy::EnableAll(Arc::new(cache));
605        let record_batch = pk_mapper.convert(&batch, &cache).unwrap();
606        assert_eq!(3, record_batch.num_rows());
607        assert_eq!(0, record_batch.num_columns());
608        assert!(record_batch.schema.is_empty());
609    }
610
611    fn new_flat_batch(
612        ts_start: Option<i64>,
613        idx_tags: &[(usize, i64)],
614        idx_fields: &[(usize, i64)],
615        num_rows: usize,
616    ) -> datatypes::arrow::record_batch::RecordBatch {
617        let mut columns = Vec::with_capacity(1 + idx_tags.len() + idx_fields.len() + 3);
618        let mut fields = Vec::with_capacity(1 + idx_tags.len() + idx_fields.len() + 3);
619
620        // Flat format: primary key columns, field columns, time index, __primary_key, __sequence, __op_type
621
622        // Primary key columns first
623        for (i, tag) in idx_tags {
624            let array = Arc::new(Int64Array::from_iter_values(std::iter::repeat_n(
625                *tag, num_rows,
626            ))) as _;
627            columns.push(array);
628            fields.push(Field::new(
629                format!("k{i}"),
630                datatypes::arrow::datatypes::DataType::Int64,
631                true,
632            ));
633        }
634
635        // Field columns
636        for (i, field) in idx_fields {
637            let array = Arc::new(Int64Array::from_iter_values(std::iter::repeat_n(
638                *field, num_rows,
639            ))) as _;
640            columns.push(array);
641            fields.push(Field::new(
642                format!("v{i}"),
643                datatypes::arrow::datatypes::DataType::Int64,
644                true,
645            ));
646        }
647
648        // Time index
649        if let Some(ts_start) = ts_start {
650            let timestamps = Arc::new(TimestampMillisecondArray::from_iter_values(
651                (0..num_rows).map(|i| ts_start + i as i64 * 1000),
652            )) as _;
653            columns.push(timestamps);
654            fields.push(Field::new(
655                "ts",
656                datatypes::arrow::datatypes::DataType::Timestamp(
657                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
658                    None,
659                ),
660                true,
661            ));
662        }
663
664        // __primary_key column (encoded primary key as dictionary)
665        // Create encoded primary key
666        let converter = DensePrimaryKeyCodec::with_fields(
667            (0..idx_tags.len())
668                .map(|idx| {
669                    (
670                        idx as u32,
671                        SortField::new(ConcreteDataType::int64_datatype()),
672                    )
673                })
674                .collect(),
675        );
676        let encoded_pk = converter
677            .encode(idx_tags.iter().map(|(_, v)| ValueRef::Int64(*v)))
678            .unwrap();
679
680        // Create dictionary array for the encoded primary key
681        let pk_values: Vec<&[u8]> = std::iter::repeat_n(encoded_pk.as_slice(), num_rows).collect();
682        let keys = datatypes::arrow::array::UInt32Array::from_iter(0..num_rows as u32);
683        let values = Arc::new(datatypes::arrow::array::BinaryArray::from_vec(pk_values));
684        let pk_array =
685            Arc::new(datatypes::arrow::array::DictionaryArray::try_new(keys, values).unwrap()) as _;
686        columns.push(pk_array);
687        fields.push(Field::new_dictionary(
688            PRIMARY_KEY_COLUMN_NAME,
689            datatypes::arrow::datatypes::DataType::UInt32,
690            datatypes::arrow::datatypes::DataType::Binary,
691            false,
692        ));
693
694        // __sequence column
695        columns.push(Arc::new(UInt64Array::from_iter_values(0..num_rows as u64)) as _);
696        fields.push(Field::new(
697            SEQUENCE_COLUMN_NAME,
698            datatypes::arrow::datatypes::DataType::UInt64,
699            false,
700        ));
701
702        // __op_type column
703        columns.push(Arc::new(UInt8Array::from_iter_values(
704            (0..num_rows).map(|_| OpType::Put as u8),
705        )) as _);
706        fields.push(Field::new(
707            OP_TYPE_COLUMN_NAME,
708            datatypes::arrow::datatypes::DataType::UInt8,
709            false,
710        ));
711
712        let schema = Arc::new(datatypes::arrow::datatypes::Schema::new(fields));
713
714        datatypes::arrow::record_batch::RecordBatch::try_new(schema, columns).unwrap()
715    }
716
717    #[test]
718    fn test_flat_projection_mapper_all() {
719        let metadata = Arc::new(
720            TestRegionMetadataBuilder::default()
721                .num_tags(2)
722                .num_fields(2)
723                .build(),
724        );
725        let mapper = ProjectionMapper::all(&metadata, true).unwrap();
726        assert_eq!([0, 1, 2, 3, 4], mapper.column_ids());
727        assert_eq!(
728            [
729                (1, ConcreteDataType::int64_datatype()),
730                (2, ConcreteDataType::int64_datatype()),
731                (3, ConcreteDataType::int64_datatype()),
732                (4, ConcreteDataType::int64_datatype()),
733                (0, ConcreteDataType::timestamp_millisecond_datatype())
734            ],
735            mapper.as_flat().unwrap().batch_schema()
736        );
737
738        let batch = new_flat_batch(Some(0), &[(1, 1), (2, 2)], &[(3, 3), (4, 4)], 3);
739        let record_batch = mapper.as_flat().unwrap().convert(&batch).unwrap();
740        let expect = "\
741+---------------------+----+----+----+----+
742| ts                  | k0 | k1 | v0 | v1 |
743+---------------------+----+----+----+----+
744| 1970-01-01T00:00:00 | 1  | 2  | 3  | 4  |
745| 1970-01-01T00:00:01 | 1  | 2  | 3  | 4  |
746| 1970-01-01T00:00:02 | 1  | 2  | 3  | 4  |
747+---------------------+----+----+----+----+";
748        assert_eq!(expect, print_record_batch(record_batch));
749    }
750
751    #[test]
752    fn test_flat_projection_mapper_with_projection() {
753        let metadata = Arc::new(
754            TestRegionMetadataBuilder::default()
755                .num_tags(2)
756                .num_fields(2)
757                .build(),
758        );
759        // Columns v1, k0
760        let mapper = ProjectionMapper::new(&metadata, [4, 1].into_iter(), true).unwrap();
761        assert_eq!([4, 1], mapper.column_ids());
762        assert_eq!(
763            [
764                (1, ConcreteDataType::int64_datatype()),
765                (4, ConcreteDataType::int64_datatype()),
766                (0, ConcreteDataType::timestamp_millisecond_datatype())
767            ],
768            mapper.as_flat().unwrap().batch_schema()
769        );
770
771        let batch = new_flat_batch(None, &[(1, 1)], &[(4, 4)], 3);
772        let record_batch = mapper.as_flat().unwrap().convert(&batch).unwrap();
773        let expect = "\
774+----+----+
775| v1 | k0 |
776+----+----+
777| 4  | 1  |
778| 4  | 1  |
779| 4  | 1  |
780+----+----+";
781        assert_eq!(expect, print_record_batch(record_batch));
782    }
783
784    #[test]
785    fn test_flat_projection_mapper_empty_projection() {
786        let metadata = Arc::new(
787            TestRegionMetadataBuilder::default()
788                .num_tags(2)
789                .num_fields(2)
790                .build(),
791        );
792        // Empty projection
793        let mapper = ProjectionMapper::new(&metadata, [].into_iter(), true).unwrap();
794        assert_eq!([0], mapper.column_ids()); // Should still read the time index column
795        assert!(mapper.output_schema().is_empty());
796        let flat_mapper = mapper.as_flat().unwrap();
797        assert!(flat_mapper.batch_schema().is_empty());
798
799        let batch = new_flat_batch(Some(0), &[], &[], 3);
800        let record_batch = flat_mapper.convert(&batch).unwrap();
801        assert_eq!(3, record_batch.num_rows());
802        assert_eq!(0, record_batch.num_columns());
803        assert!(record_batch.schema.is_empty());
804    }
805}