mito2/read/
projection.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Utilities for projection operations.
16
17use std::cmp::Ordering;
18use std::collections::HashMap;
19use std::sync::Arc;
20
21use api::v1::SemanticType;
22use common_error::ext::BoxedError;
23use common_recordbatch::RecordBatch;
24use common_recordbatch::error::ExternalSnafu;
25use datatypes::prelude::{ConcreteDataType, DataType};
26use datatypes::schema::{Schema, SchemaRef};
27use datatypes::value::Value;
28use datatypes::vectors::VectorRef;
29use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec, build_primary_key_codec};
30use snafu::{OptionExt, ResultExt};
31use store_api::metadata::RegionMetadataRef;
32use store_api::storage::ColumnId;
33
34use crate::cache::CacheStrategy;
35use crate::error::{InvalidRequestSnafu, Result};
36use crate::read::Batch;
37use crate::read::flat_projection::FlatProjectionMapper;
38
/// Only cache vector when its length `<=` this value.
///
/// Bounds the length of repeated tag vectors that `repeated_vector_with_cache` stores via
/// `CacheStrategy::put_repeated_vector`; longer vectors are built but not cached.
const MAX_VECTOR_LENGTH_TO_CACHE: usize = 16384;
41
/// Wrapper enum for different projection mapper implementations.
///
/// The variant is selected by the `flat_format` flag passed to [ProjectionMapper::new] and
/// [ProjectionMapper::all].
pub enum ProjectionMapper {
    /// Projection mapper for primary key format.
    PrimaryKey(PrimaryKeyProjectionMapper),
    /// Projection mapper for flat format.
    Flat(FlatProjectionMapper),
}
49
50impl ProjectionMapper {
51    /// Returns a new mapper with projection.
52    pub fn new(
53        metadata: &RegionMetadataRef,
54        projection: impl Iterator<Item = usize> + Clone,
55        flat_format: bool,
56    ) -> Result<Self> {
57        if flat_format {
58            Ok(ProjectionMapper::Flat(FlatProjectionMapper::new(
59                metadata, projection,
60            )?))
61        } else {
62            Ok(ProjectionMapper::PrimaryKey(
63                PrimaryKeyProjectionMapper::new(metadata, projection)?,
64            ))
65        }
66    }
67
68    /// Returns a new mapper without projection.
69    pub fn all(metadata: &RegionMetadataRef, flat_format: bool) -> Result<Self> {
70        if flat_format {
71            Ok(ProjectionMapper::Flat(FlatProjectionMapper::all(metadata)?))
72        } else {
73            Ok(ProjectionMapper::PrimaryKey(
74                PrimaryKeyProjectionMapper::all(metadata)?,
75            ))
76        }
77    }
78
79    /// Returns the metadata that created the mapper.
80    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
81        match self {
82            ProjectionMapper::PrimaryKey(m) => m.metadata(),
83            ProjectionMapper::Flat(m) => m.metadata(),
84        }
85    }
86
87    /// Returns ids of projected columns that we need to read
88    /// from memtables and SSTs.
89    pub(crate) fn column_ids(&self) -> &[ColumnId] {
90        match self {
91            ProjectionMapper::PrimaryKey(m) => m.column_ids(),
92            ProjectionMapper::Flat(m) => m.column_ids(),
93        }
94    }
95
96    /// Returns the schema of converted [RecordBatch].
97    pub(crate) fn output_schema(&self) -> SchemaRef {
98        match self {
99            ProjectionMapper::PrimaryKey(m) => m.output_schema(),
100            ProjectionMapper::Flat(m) => m.output_schema(),
101        }
102    }
103
104    /// Returns the primary key projection mapper or None if this is not a primary key mapper.
105    pub fn as_primary_key(&self) -> Option<&PrimaryKeyProjectionMapper> {
106        match self {
107            ProjectionMapper::PrimaryKey(m) => Some(m),
108            ProjectionMapper::Flat(_) => None,
109        }
110    }
111
112    /// Returns the flat projection mapper or None if this is not a flat mapper.
113    pub fn as_flat(&self) -> Option<&FlatProjectionMapper> {
114        match self {
115            ProjectionMapper::PrimaryKey(_) => None,
116            ProjectionMapper::Flat(m) => Some(m),
117        }
118    }
119
120    /// Returns an empty [RecordBatch].
121    // TODO(yingwen): This is unused now. Use it after we finishing the flat format.
122    pub fn empty_record_batch(&self) -> RecordBatch {
123        match self {
124            ProjectionMapper::PrimaryKey(m) => m.empty_record_batch(),
125            ProjectionMapper::Flat(m) => m.empty_record_batch(),
126        }
127    }
128}
129
/// Handles projection and converts a projected [Batch] to a projected [RecordBatch].
///
/// This mapper serves the primary key format: tag values are not stored as columns in the
/// [Batch] but come from the batch's primary key (decoded with `codec` when the batch does
/// not already carry decoded values).
pub struct PrimaryKeyProjectionMapper {
    /// Metadata of the region.
    metadata: RegionMetadataRef,
    /// Maps column in [RecordBatch] to index in [Batch].
    /// One entry per output column; empty when the projection is empty.
    batch_indices: Vec<BatchIndex>,
    /// Output record batch contains tags.
    /// When `false`, the primary key is never decoded during conversion.
    has_tags: bool,
    /// Decoder for primary key.
    codec: Arc<dyn PrimaryKeyCodec>,
    /// Schema for converted [RecordBatch].
    output_schema: SchemaRef,
    /// Ids of columns to project. It keeps ids in the same order as the `projection`
    /// indices to build the mapper.
    /// For an empty projection this holds only the time index column id.
    column_ids: Vec<ColumnId>,
    /// Ids and DataTypes of field columns in the [Batch], in the order the batch stores them.
    batch_fields: Vec<(ColumnId, ConcreteDataType)>,
    /// `true` if the original projection is empty; converted [RecordBatch]es then carry
    /// no columns, only a row count.
    is_empty_projection: bool,
}
150
151impl PrimaryKeyProjectionMapper {
152    /// Returns a new mapper with projection.
153    /// If `projection` is empty, it outputs [RecordBatch] without any column but only a row count.
154    /// `SELECT COUNT(*) FROM table` is an example that uses an empty projection. DataFusion accepts
155    /// empty `RecordBatch` and only use its row count in this query.
156    pub fn new(
157        metadata: &RegionMetadataRef,
158        projection: impl Iterator<Item = usize>,
159    ) -> Result<PrimaryKeyProjectionMapper> {
160        let mut projection: Vec<_> = projection.collect();
161        // If the original projection is empty.
162        let is_empty_projection = projection.is_empty();
163        if is_empty_projection {
164            // If the projection is empty, we still read the time index column.
165            projection.push(metadata.time_index_column_pos());
166        }
167
168        let mut column_schemas = Vec::with_capacity(projection.len());
169        let mut column_ids = Vec::with_capacity(projection.len());
170        for idx in &projection {
171            // For each projection index, we get the column id for projection.
172            let column = metadata
173                .column_metadatas
174                .get(*idx)
175                .context(InvalidRequestSnafu {
176                    region_id: metadata.region_id,
177                    reason: format!("projection index {} is out of bound", idx),
178                })?;
179
180            column_ids.push(column.column_id);
181            // Safety: idx is valid.
182            column_schemas.push(metadata.schema.column_schemas()[*idx].clone());
183        }
184
185        let codec = build_primary_key_codec(metadata);
186        if is_empty_projection {
187            // If projection is empty, we don't output any column.
188            return Ok(PrimaryKeyProjectionMapper {
189                metadata: metadata.clone(),
190                batch_indices: vec![],
191                has_tags: false,
192                codec,
193                output_schema: Arc::new(Schema::new(vec![])),
194                column_ids,
195                batch_fields: vec![],
196                is_empty_projection,
197            });
198        }
199
200        // Safety: Columns come from existing schema.
201        let output_schema = Arc::new(Schema::new(column_schemas));
202        // Get fields in each batch.
203        let batch_fields = Batch::projected_fields(metadata, &column_ids);
204
205        // Field column id to its index in batch.
206        let field_id_to_index: HashMap<_, _> = batch_fields
207            .iter()
208            .enumerate()
209            .map(|(index, (column_id, _))| (*column_id, index))
210            .collect();
211        // For each projected column, compute its index in batches.
212        let mut batch_indices = Vec::with_capacity(projection.len());
213        let mut has_tags = false;
214        for idx in &projection {
215            // Safety: idx is valid.
216            let column = &metadata.column_metadatas[*idx];
217            // Get column index in a batch by its semantic type and column id.
218            let batch_index = match column.semantic_type {
219                SemanticType::Tag => {
220                    // Safety: It is a primary key column.
221                    let index = metadata.primary_key_index(column.column_id).unwrap();
222                    // We need to output a tag.
223                    has_tags = true;
224                    // We always read all primary key so the column always exists and the tag
225                    // index is always valid.
226                    BatchIndex::Tag((index, column.column_id))
227                }
228                SemanticType::Timestamp => BatchIndex::Timestamp,
229                SemanticType::Field => {
230                    // Safety: It is a field column so it should be in `field_id_to_index`.
231                    let index = field_id_to_index[&column.column_id];
232                    BatchIndex::Field(index)
233                }
234            };
235            batch_indices.push(batch_index);
236        }
237
238        Ok(PrimaryKeyProjectionMapper {
239            metadata: metadata.clone(),
240            batch_indices,
241            has_tags,
242            codec,
243            output_schema,
244            column_ids,
245            batch_fields,
246            is_empty_projection,
247        })
248    }
249
250    /// Returns a new mapper without projection.
251    pub fn all(metadata: &RegionMetadataRef) -> Result<PrimaryKeyProjectionMapper> {
252        PrimaryKeyProjectionMapper::new(metadata, 0..metadata.column_metadatas.len())
253    }
254
255    /// Returns the metadata that created the mapper.
256    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
257        &self.metadata
258    }
259
260    /// Returns ids of projected columns that we need to read
261    /// from memtables and SSTs.
262    pub(crate) fn column_ids(&self) -> &[ColumnId] {
263        &self.column_ids
264    }
265
266    /// Returns ids of fields in [Batch]es the mapper expects to convert.
267    pub(crate) fn batch_fields(&self) -> &[(ColumnId, ConcreteDataType)] {
268        &self.batch_fields
269    }
270
271    /// Returns the schema of converted [RecordBatch].
272    /// This is the schema that the stream will output. This schema may contain
273    /// less columns than [PrimaryKeyProjectionMapper::column_ids()].
274    pub(crate) fn output_schema(&self) -> SchemaRef {
275        self.output_schema.clone()
276    }
277
278    /// Returns an empty [RecordBatch].
279    pub(crate) fn empty_record_batch(&self) -> RecordBatch {
280        RecordBatch::new_empty(self.output_schema.clone())
281    }
282
283    /// Converts a [Batch] to a [RecordBatch].
284    ///
285    /// The batch must match the `projection` using to build the mapper.
286    pub(crate) fn convert(
287        &self,
288        batch: &Batch,
289        cache_strategy: &CacheStrategy,
290    ) -> common_recordbatch::error::Result<RecordBatch> {
291        if self.is_empty_projection {
292            return RecordBatch::new_with_count(self.output_schema.clone(), batch.num_rows());
293        }
294
295        debug_assert_eq!(self.batch_fields.len(), batch.fields().len());
296        debug_assert!(
297            self.batch_fields
298                .iter()
299                .zip(batch.fields())
300                .all(|((id, _), batch_col)| *id == batch_col.column_id)
301        );
302
303        // Skips decoding pk if we don't need to output it.
304        let pk_values = if self.has_tags {
305            match batch.pk_values() {
306                Some(v) => v.clone(),
307                None => self
308                    .codec
309                    .decode(batch.primary_key())
310                    .map_err(BoxedError::new)
311                    .context(ExternalSnafu)?,
312            }
313        } else {
314            CompositeValues::Dense(vec![])
315        };
316
317        let mut columns = Vec::with_capacity(self.output_schema.num_columns());
318        let num_rows = batch.num_rows();
319        for (index, column_schema) in self
320            .batch_indices
321            .iter()
322            .zip(self.output_schema.column_schemas())
323        {
324            match index {
325                BatchIndex::Tag((idx, column_id)) => {
326                    let value = match &pk_values {
327                        CompositeValues::Dense(v) => &v[*idx].1,
328                        CompositeValues::Sparse(v) => v.get_or_null(*column_id),
329                    };
330                    let vector = repeated_vector_with_cache(
331                        &column_schema.data_type,
332                        value,
333                        num_rows,
334                        cache_strategy,
335                    )?;
336                    columns.push(vector);
337                }
338                BatchIndex::Timestamp => {
339                    columns.push(batch.timestamps().clone());
340                }
341                BatchIndex::Field(idx) => {
342                    columns.push(batch.fields()[*idx].data.clone());
343                }
344            }
345        }
346
347        RecordBatch::new(self.output_schema.clone(), columns)
348    }
349}
350
/// Index of a vector in a [Batch].
///
/// Tells `PrimaryKeyProjectionMapper::convert` where the data of one output column comes from.
#[derive(Debug, Clone, Copy)]
enum BatchIndex {
    /// Index in primary keys.
    ///
    /// The `usize` is the tag's position in the primary key, used with dense primary key
    /// values; the [ColumnId] is used to look the tag up in sparse primary key values.
    Tag((usize, ColumnId)),
    /// The time index column.
    Timestamp,
    /// Index in fields.
    Field(usize),
}
361
362/// Gets a vector with repeated values from specific cache or creates a new one.
363fn repeated_vector_with_cache(
364    data_type: &ConcreteDataType,
365    value: &Value,
366    num_rows: usize,
367    cache_strategy: &CacheStrategy,
368) -> common_recordbatch::error::Result<VectorRef> {
369    if let Some(vector) = cache_strategy.get_repeated_vector(data_type, value) {
370        // Tries to get the vector from cache manager. If the vector doesn't
371        // have enough length, creates a new one.
372        match vector.len().cmp(&num_rows) {
373            Ordering::Less => (),
374            Ordering::Equal => return Ok(vector),
375            Ordering::Greater => return Ok(vector.slice(0, num_rows)),
376        }
377    }
378
379    // Creates a new one.
380    let vector = new_repeated_vector(data_type, value, num_rows)?;
381    // Updates cache.
382    if vector.len() <= MAX_VECTOR_LENGTH_TO_CACHE {
383        cache_strategy.put_repeated_vector(value.clone(), vector.clone());
384    }
385
386    Ok(vector)
387}
388
389/// Returns a vector with repeated values.
390fn new_repeated_vector(
391    data_type: &ConcreteDataType,
392    value: &Value,
393    num_rows: usize,
394) -> common_recordbatch::error::Result<VectorRef> {
395    let mut mutable_vector = data_type.create_mutable_vector(1);
396    mutable_vector
397        .try_push_value_ref(&value.as_value_ref())
398        .map_err(BoxedError::new)
399        .context(ExternalSnafu)?;
400    // This requires an additional allocation.
401    let base_vector = mutable_vector.to_vector();
402    Ok(base_vector.replicate(&[num_rows]))
403}
404
#[cfg(test)]
mod tests {
    // Tests for `ProjectionMapper` in both the primary key format and the flat format.
    // `TestRegionMetadataBuilder` with 2 tags and 2 fields yields columns
    // ts (id 0), k0 (id 1), k1 (id 2), v0 (id 3), v1 (id 4), as the assertions below show.

    use std::sync::Arc;

    use api::v1::OpType;
    use datatypes::arrow::array::{Int64Array, TimestampMillisecondArray, UInt8Array, UInt64Array};
    use datatypes::arrow::datatypes::Field;
    use datatypes::arrow::util::pretty;
    use datatypes::value::ValueRef;
    use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
    use mito_codec::test_util::TestRegionMetadataBuilder;
    use store_api::storage::consts::{
        OP_TYPE_COLUMN_NAME, PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME,
    };

    use super::*;
    use crate::cache::CacheManager;
    use crate::read::BatchBuilder;

    /// Builds a primary-key-format [Batch] with `num_rows` rows.
    ///
    /// Tags are encoded into the primary key with a dense codec, timestamps start at
    /// `ts_start` and step by 1000 ms, sequences count from 0, every op type is `Put`, and
    /// each `(column_id, value)` in `fields` becomes a field column repeating `value`.
    fn new_batch(
        ts_start: i64,
        tags: &[i64],
        fields: &[(ColumnId, i64)],
        num_rows: usize,
    ) -> Batch {
        let converter = DensePrimaryKeyCodec::with_fields(
            (0..tags.len())
                .map(|idx| {
                    (
                        idx as u32,
                        SortField::new(ConcreteDataType::int64_datatype()),
                    )
                })
                .collect(),
        );
        let primary_key = converter
            .encode(tags.iter().map(|v| ValueRef::Int64(*v)))
            .unwrap();

        let mut builder = BatchBuilder::new(primary_key);
        builder
            .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
                (0..num_rows).map(|i| ts_start + i as i64 * 1000),
            )))
            .unwrap()
            .sequences_array(Arc::new(UInt64Array::from_iter_values(0..num_rows as u64)))
            .unwrap()
            .op_types_array(Arc::new(UInt8Array::from_iter_values(
                (0..num_rows).map(|_| OpType::Put as u8),
            )))
            .unwrap();
        for (column_id, field) in fields {
            builder
                .push_field_array(
                    *column_id,
                    Arc::new(Int64Array::from_iter_values(std::iter::repeat_n(
                        *field, num_rows,
                    ))),
                )
                .unwrap();
        }
        builder.build().unwrap()
    }

    /// Renders a [RecordBatch] as an arrow pretty-printed table for string comparison.
    fn print_record_batch(record_batch: RecordBatch) -> String {
        pretty::pretty_format_batches(&[record_batch.into_df_record_batch()])
            .unwrap()
            .to_string()
    }

    /// Converting with a full projection outputs every column and fills the
    /// repeated-vector cache for tag values only.
    #[test]
    fn test_projection_mapper_all() {
        let metadata = Arc::new(
            TestRegionMetadataBuilder::default()
                .num_tags(2)
                .num_fields(2)
                .build(),
        );
        // Create the enum wrapper with default format (primary key)
        let mapper = ProjectionMapper::all(&metadata, false).unwrap();
        assert_eq!([0, 1, 2, 3, 4], mapper.column_ids());
        assert_eq!(
            [
                (3, ConcreteDataType::int64_datatype()),
                (4, ConcreteDataType::int64_datatype())
            ],
            mapper.as_primary_key().unwrap().batch_fields()
        );

        // With vector cache.
        let cache = CacheManager::builder().vector_cache_size(1024).build();
        let cache = CacheStrategy::EnableAll(Arc::new(cache));
        let batch = new_batch(0, &[1, 2], &[(3, 3), (4, 4)], 3);
        let record_batch = mapper
            .as_primary_key()
            .unwrap()
            .convert(&batch, &cache)
            .unwrap();
        let expect = "\
+---------------------+----+----+----+----+
| ts                  | k0 | k1 | v0 | v1 |
+---------------------+----+----+----+----+
| 1970-01-01T00:00:00 | 1  | 2  | 3  | 4  |
| 1970-01-01T00:00:01 | 1  | 2  | 3  | 4  |
| 1970-01-01T00:00:02 | 1  | 2  | 3  | 4  |
+---------------------+----+----+----+----+";
        assert_eq!(expect, print_record_batch(record_batch));

        // Tag values 1 and 2 went through the repeated-vector cache; field value 3 did not,
        // because field columns are copied from the batch instead.
        assert!(
            cache
                .get_repeated_vector(&ConcreteDataType::int64_datatype(), &Value::Int64(1))
                .is_some()
        );
        assert!(
            cache
                .get_repeated_vector(&ConcreteDataType::int64_datatype(), &Value::Int64(2))
                .is_some()
        );
        assert!(
            cache
                .get_repeated_vector(&ConcreteDataType::int64_datatype(), &Value::Int64(3))
                .is_none()
        );
        // Converting again (now hitting the cache) must produce the same output.
        let record_batch = mapper
            .as_primary_key()
            .unwrap()
            .convert(&batch, &cache)
            .unwrap();
        assert_eq!(expect, print_record_batch(record_batch));
    }

    /// A projection outputs only the selected columns, in projection order.
    #[test]
    fn test_projection_mapper_with_projection() {
        let metadata = Arc::new(
            TestRegionMetadataBuilder::default()
                .num_tags(2)
                .num_fields(2)
                .build(),
        );
        // Columns v1, k0
        let mapper = ProjectionMapper::new(&metadata, [4, 1].into_iter(), false).unwrap();
        assert_eq!([4, 1], mapper.column_ids());
        assert_eq!(
            [(4, ConcreteDataType::int64_datatype())],
            mapper.as_primary_key().unwrap().batch_fields()
        );

        let batch = new_batch(0, &[1, 2], &[(4, 4)], 3);
        let cache = CacheManager::builder().vector_cache_size(1024).build();
        let cache = CacheStrategy::EnableAll(Arc::new(cache));
        let record_batch = mapper
            .as_primary_key()
            .unwrap()
            .convert(&batch, &cache)
            .unwrap();
        let expect = "\
+----+----+
| v1 | k0 |
+----+----+
| 4  | 1  |
| 4  | 1  |
| 4  | 1  |
+----+----+";
        assert_eq!(expect, print_record_batch(record_batch));
    }

    /// An empty projection reads the time index but outputs a column-less batch that only
    /// carries the row count.
    #[test]
    fn test_projection_mapper_empty_projection() {
        let metadata = Arc::new(
            TestRegionMetadataBuilder::default()
                .num_tags(2)
                .num_fields(2)
                .build(),
        );
        // Empty projection
        let mapper = ProjectionMapper::new(&metadata, [].into_iter(), false).unwrap();
        assert_eq!([0], mapper.column_ids()); // Should still read the time index column
        assert!(mapper.output_schema().is_empty());
        let pk_mapper = mapper.as_primary_key().unwrap();
        assert!(pk_mapper.batch_fields().is_empty());
        assert!(!pk_mapper.has_tags);
        assert!(pk_mapper.batch_indices.is_empty());
        assert!(pk_mapper.is_empty_projection);

        let batch = new_batch(0, &[1, 2], &[], 3);
        let cache = CacheManager::builder().vector_cache_size(1024).build();
        let cache = CacheStrategy::EnableAll(Arc::new(cache));
        let record_batch = pk_mapper.convert(&batch, &cache).unwrap();
        assert_eq!(3, record_batch.num_rows());
        assert_eq!(0, record_batch.num_columns());
        assert!(record_batch.schema.is_empty());
    }

    /// Builds a flat-format arrow record batch with `num_rows` rows.
    ///
    /// Columns are laid out as: tag columns `k{i}`, field columns `v{i}`, an optional
    /// millisecond `ts` column (starting at `ts_start`, stepping by 1000 ms), then the
    /// internal `__primary_key` (dictionary of the dense-encoded tags), sequence, and op type
    /// columns.
    fn new_flat_batch(
        ts_start: Option<i64>,
        idx_tags: &[(usize, i64)],
        idx_fields: &[(usize, i64)],
        num_rows: usize,
    ) -> datatypes::arrow::record_batch::RecordBatch {
        let mut columns = Vec::with_capacity(1 + idx_tags.len() + idx_fields.len() + 3);
        let mut fields = Vec::with_capacity(1 + idx_tags.len() + idx_fields.len() + 3);

        // Flat format: primary key columns, field columns, time index, __primary_key, __sequence, __op_type

        // Primary key columns first
        for (i, tag) in idx_tags {
            let array = Arc::new(Int64Array::from_iter_values(std::iter::repeat_n(
                *tag, num_rows,
            ))) as _;
            columns.push(array);
            fields.push(Field::new(
                format!("k{i}"),
                datatypes::arrow::datatypes::DataType::Int64,
                true,
            ));
        }

        // Field columns
        for (i, field) in idx_fields {
            let array = Arc::new(Int64Array::from_iter_values(std::iter::repeat_n(
                *field, num_rows,
            ))) as _;
            columns.push(array);
            fields.push(Field::new(
                format!("v{i}"),
                datatypes::arrow::datatypes::DataType::Int64,
                true,
            ));
        }

        // Time index
        if let Some(ts_start) = ts_start {
            let timestamps = Arc::new(TimestampMillisecondArray::from_iter_values(
                (0..num_rows).map(|i| ts_start + i as i64 * 1000),
            )) as _;
            columns.push(timestamps);
            fields.push(Field::new(
                "ts",
                datatypes::arrow::datatypes::DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    None,
                ),
                true,
            ));
        }

        // __primary_key column (encoded primary key as dictionary)
        // Create encoded primary key
        let converter = DensePrimaryKeyCodec::with_fields(
            (0..idx_tags.len())
                .map(|idx| {
                    (
                        idx as u32,
                        SortField::new(ConcreteDataType::int64_datatype()),
                    )
                })
                .collect(),
        );
        let encoded_pk = converter
            .encode(idx_tags.iter().map(|(_, v)| ValueRef::Int64(*v)))
            .unwrap();

        // Create dictionary array for the encoded primary key
        let pk_values: Vec<&[u8]> = std::iter::repeat_n(encoded_pk.as_slice(), num_rows).collect();
        let keys = datatypes::arrow::array::UInt32Array::from_iter(0..num_rows as u32);
        let values = Arc::new(datatypes::arrow::array::BinaryArray::from_vec(pk_values));
        let pk_array =
            Arc::new(datatypes::arrow::array::DictionaryArray::try_new(keys, values).unwrap()) as _;
        columns.push(pk_array);
        fields.push(Field::new_dictionary(
            PRIMARY_KEY_COLUMN_NAME,
            datatypes::arrow::datatypes::DataType::UInt32,
            datatypes::arrow::datatypes::DataType::Binary,
            false,
        ));

        // __sequence column
        columns.push(Arc::new(UInt64Array::from_iter_values(0..num_rows as u64)) as _);
        fields.push(Field::new(
            SEQUENCE_COLUMN_NAME,
            datatypes::arrow::datatypes::DataType::UInt64,
            false,
        ));

        // __op_type column
        columns.push(Arc::new(UInt8Array::from_iter_values(
            (0..num_rows).map(|_| OpType::Put as u8),
        )) as _);
        fields.push(Field::new(
            OP_TYPE_COLUMN_NAME,
            datatypes::arrow::datatypes::DataType::UInt8,
            false,
        ));

        let schema = Arc::new(datatypes::arrow::datatypes::Schema::new(fields));

        datatypes::arrow::record_batch::RecordBatch::try_new(schema, columns).unwrap()
    }

    /// The flat mapper without projection outputs every column in region schema order.
    #[test]
    fn test_flat_projection_mapper_all() {
        let metadata = Arc::new(
            TestRegionMetadataBuilder::default()
                .num_tags(2)
                .num_fields(2)
                .build(),
        );
        let mapper = ProjectionMapper::all(&metadata, true).unwrap();
        assert_eq!([0, 1, 2, 3, 4], mapper.column_ids());
        assert_eq!(
            [
                (1, ConcreteDataType::int64_datatype()),
                (2, ConcreteDataType::int64_datatype()),
                (3, ConcreteDataType::int64_datatype()),
                (4, ConcreteDataType::int64_datatype()),
                (0, ConcreteDataType::timestamp_millisecond_datatype())
            ],
            mapper.as_flat().unwrap().batch_schema()
        );

        let batch = new_flat_batch(Some(0), &[(1, 1), (2, 2)], &[(3, 3), (4, 4)], 3);
        let record_batch = mapper.as_flat().unwrap().convert(&batch).unwrap();
        let expect = "\
+---------------------+----+----+----+----+
| ts                  | k0 | k1 | v0 | v1 |
+---------------------+----+----+----+----+
| 1970-01-01T00:00:00 | 1  | 2  | 3  | 4  |
| 1970-01-01T00:00:01 | 1  | 2  | 3  | 4  |
| 1970-01-01T00:00:02 | 1  | 2  | 3  | 4  |
+---------------------+----+----+----+----+";
        assert_eq!(expect, print_record_batch(record_batch));
    }

    /// The flat mapper with a projection outputs only the selected columns in projection
    /// order, while its batch schema still includes the time index.
    #[test]
    fn test_flat_projection_mapper_with_projection() {
        let metadata = Arc::new(
            TestRegionMetadataBuilder::default()
                .num_tags(2)
                .num_fields(2)
                .build(),
        );
        // Columns v1, k0
        let mapper = ProjectionMapper::new(&metadata, [4, 1].into_iter(), true).unwrap();
        assert_eq!([4, 1], mapper.column_ids());
        assert_eq!(
            [
                (1, ConcreteDataType::int64_datatype()),
                (4, ConcreteDataType::int64_datatype()),
                (0, ConcreteDataType::timestamp_millisecond_datatype())
            ],
            mapper.as_flat().unwrap().batch_schema()
        );

        let batch = new_flat_batch(None, &[(1, 1)], &[(4, 4)], 3);
        let record_batch = mapper.as_flat().unwrap().convert(&batch).unwrap();
        let expect = "\
+----+----+
| v1 | k0 |
+----+----+
| 4  | 1  |
| 4  | 1  |
| 4  | 1  |
+----+----+";
        assert_eq!(expect, print_record_batch(record_batch));
    }

    /// The flat mapper with an empty projection outputs a column-less batch carrying only
    /// the row count.
    #[test]
    fn test_flat_projection_mapper_empty_projection() {
        let metadata = Arc::new(
            TestRegionMetadataBuilder::default()
                .num_tags(2)
                .num_fields(2)
                .build(),
        );
        // Empty projection
        let mapper = ProjectionMapper::new(&metadata, [].into_iter(), true).unwrap();
        assert_eq!([0], mapper.column_ids()); // Should still read the time index column
        assert!(mapper.output_schema().is_empty());
        let flat_mapper = mapper.as_flat().unwrap();
        assert!(flat_mapper.batch_schema().is_empty());

        let batch = new_flat_batch(Some(0), &[], &[], 3);
        let record_batch = flat_mapper.convert(&batch).unwrap();
        assert_eq!(3, record_batch.num_rows());
        assert_eq!(0, record_batch.num_columns());
        assert!(record_batch.schema.is_empty());
    }
}