// mito2/read/projection.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Utilities for projection operations.
16
17use std::cmp::Ordering;
18use std::collections::HashMap;
19use std::sync::Arc;
20
21use api::v1::SemanticType;
22use common_error::ext::BoxedError;
23use common_recordbatch::RecordBatch;
24use common_recordbatch::error::ExternalSnafu;
25use datatypes::prelude::{ConcreteDataType, DataType};
26use datatypes::schema::{Schema, SchemaRef};
27use datatypes::value::Value;
28use datatypes::vectors::VectorRef;
29use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec, build_primary_key_codec};
30use snafu::{OptionExt, ResultExt};
31use store_api::metadata::RegionMetadataRef;
32use store_api::storage::ColumnId;
33
34use crate::cache::CacheStrategy;
35use crate::error::{InvalidRequestSnafu, Result};
36use crate::read::Batch;
37use crate::read::flat_projection::FlatProjectionMapper;
38
/// Only cache vector when its length `<=` this value.
///
/// Bounds the size of entries in the repeated-vector cache so a single
/// large batch cannot dominate cache memory.
const MAX_VECTOR_LENGTH_TO_CACHE: usize = 16384;
41
/// Wrapper enum for different projection mapper implementations.
///
/// Dispatches to the mapper matching the region's storage format:
/// primary-key encoded batches or the flat record batch format.
pub enum ProjectionMapper {
    /// Projection mapper for primary key format.
    PrimaryKey(PrimaryKeyProjectionMapper),
    /// Projection mapper for flat format.
    Flat(FlatProjectionMapper),
}
49
50impl ProjectionMapper {
51    /// Returns a new mapper with projection.
52    pub fn new(
53        metadata: &RegionMetadataRef,
54        projection: impl Iterator<Item = usize> + Clone,
55        flat_format: bool,
56    ) -> Result<Self> {
57        if flat_format {
58            Ok(ProjectionMapper::Flat(FlatProjectionMapper::new(
59                metadata, projection,
60            )?))
61        } else {
62            Ok(ProjectionMapper::PrimaryKey(
63                PrimaryKeyProjectionMapper::new(metadata, projection)?,
64            ))
65        }
66    }
67
68    /// Returns a new mapper with output projection and explicit read columns.
69    pub fn new_with_read_columns(
70        metadata: &RegionMetadataRef,
71        projection: impl Iterator<Item = usize>,
72        flat_format: bool,
73        read_column_ids: Vec<ColumnId>,
74    ) -> Result<Self> {
75        let projection: Vec<_> = projection.collect();
76        if flat_format {
77            Ok(ProjectionMapper::Flat(
78                FlatProjectionMapper::new_with_read_columns(metadata, projection, read_column_ids)?,
79            ))
80        } else {
81            Ok(ProjectionMapper::PrimaryKey(
82                PrimaryKeyProjectionMapper::new_with_read_columns(
83                    metadata,
84                    projection,
85                    read_column_ids,
86                )?,
87            ))
88        }
89    }
90
91    /// Returns a new mapper without projection.
92    pub fn all(metadata: &RegionMetadataRef, flat_format: bool) -> Result<Self> {
93        if flat_format {
94            Ok(ProjectionMapper::Flat(FlatProjectionMapper::all(metadata)?))
95        } else {
96            Ok(ProjectionMapper::PrimaryKey(
97                PrimaryKeyProjectionMapper::all(metadata)?,
98            ))
99        }
100    }
101
102    /// Returns the metadata that created the mapper.
103    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
104        match self {
105            ProjectionMapper::PrimaryKey(m) => m.metadata(),
106            ProjectionMapper::Flat(m) => m.metadata(),
107        }
108    }
109
110    /// Returns true if the projection includes any tag columns.
111    pub(crate) fn has_tags(&self) -> bool {
112        match self {
113            ProjectionMapper::PrimaryKey(m) => m.has_tags(),
114            ProjectionMapper::Flat(_) => false,
115        }
116    }
117
118    /// Returns ids of projected columns that we need to read
119    /// from memtables and SSTs.
120    pub(crate) fn column_ids(&self) -> &[ColumnId] {
121        match self {
122            ProjectionMapper::PrimaryKey(m) => m.column_ids(),
123            ProjectionMapper::Flat(m) => m.column_ids(),
124        }
125    }
126
127    /// Returns the schema of converted [RecordBatch].
128    pub(crate) fn output_schema(&self) -> SchemaRef {
129        match self {
130            ProjectionMapper::PrimaryKey(m) => m.output_schema(),
131            ProjectionMapper::Flat(m) => m.output_schema(),
132        }
133    }
134
135    /// Returns the primary key projection mapper or None if this is not a primary key mapper.
136    pub fn as_primary_key(&self) -> Option<&PrimaryKeyProjectionMapper> {
137        match self {
138            ProjectionMapper::PrimaryKey(m) => Some(m),
139            ProjectionMapper::Flat(_) => None,
140        }
141    }
142
143    /// Returns the flat projection mapper or None if this is not a flat mapper.
144    pub fn as_flat(&self) -> Option<&FlatProjectionMapper> {
145        match self {
146            ProjectionMapper::PrimaryKey(_) => None,
147            ProjectionMapper::Flat(m) => Some(m),
148        }
149    }
150
151    /// Returns an empty [RecordBatch].
152    // TODO(yingwen): This is unused now. Use it after we finishing the flat format.
153    pub fn empty_record_batch(&self) -> RecordBatch {
154        match self {
155            ProjectionMapper::PrimaryKey(m) => m.empty_record_batch(),
156            ProjectionMapper::Flat(m) => m.empty_record_batch(),
157        }
158    }
159}
160
/// Handles projection and converts a projected [Batch] to a projected [RecordBatch].
pub struct PrimaryKeyProjectionMapper {
    /// Metadata of the region.
    metadata: RegionMetadataRef,
    /// Maps each output column in the [RecordBatch] to its location in a [Batch]
    /// (tag in the primary key, the time index, or a field by index).
    batch_indices: Vec<BatchIndex>,
    /// Output record batch contains tags.
    has_tags: bool,
    /// Decoder for primary key.
    codec: Arc<dyn PrimaryKeyCodec>,
    /// Schema for converted [RecordBatch].
    output_schema: SchemaRef,
    /// Ids of columns to read from memtables and SSTs.
    /// May be a superset of the output columns.
    read_column_ids: Vec<ColumnId>,
    /// Ids and DataTypes of field columns in the read [Batch].
    batch_fields: Vec<(ColumnId, ConcreteDataType)>,
    /// `true` If the original projection is empty.
    /// In this case the output has no columns, only a row count.
    is_empty_projection: bool,
}
180
impl PrimaryKeyProjectionMapper {
    /// Returns a new mapper with projection.
    /// If `projection` is empty, it outputs [RecordBatch] without any column but only a row count.
    /// `SELECT COUNT(*) FROM table` is an example that uses an empty projection. DataFusion accepts
    /// empty `RecordBatch` and only use its row count in this query.
    pub fn new(
        metadata: &RegionMetadataRef,
        projection: impl Iterator<Item = usize>,
    ) -> Result<PrimaryKeyProjectionMapper> {
        let projection: Vec<_> = projection.collect();
        // Derive the columns to read from the output projection (falls back to the
        // time index column when the projection is empty).
        let read_column_ids = read_column_ids_from_projection(metadata, &projection)?;
        Self::new_with_read_columns(metadata, projection, read_column_ids)
    }

    /// Returns a new mapper with output projection and explicit read columns.
    ///
    /// `read_column_ids` may contain columns that are read but not output.
    /// Returns an error if a projection index is out of bound or a projected
    /// field column is missing from `read_column_ids`.
    pub fn new_with_read_columns(
        metadata: &RegionMetadataRef,
        projection: Vec<usize>,
        read_column_ids: Vec<ColumnId>,
    ) -> Result<PrimaryKeyProjectionMapper> {
        // If the original projection is empty.
        let is_empty_projection = projection.is_empty();

        // Collect the output column schemas, validating every projection index.
        let mut column_schemas = Vec::with_capacity(projection.len());
        for idx in &projection {
            // For each projection index, we get the column schema for projection
            column_schemas.push(
                metadata
                    .schema
                    .column_schemas()
                    .get(*idx)
                    .with_context(|| InvalidRequestSnafu {
                        region_id: metadata.region_id,
                        reason: format!("projection index {} is out of bound", idx),
                    })?
                    .clone(),
            );
        }

        let codec = build_primary_key_codec(metadata);
        // If projection is empty, we don't output any column.
        let output_schema = if is_empty_projection {
            Arc::new(Schema::new(vec![]))
        } else {
            // Safety: Columns come from existing schema.
            Arc::new(Schema::new(column_schemas))
        };
        // Get fields in each read batch.
        let batch_fields = Batch::projected_fields(metadata, &read_column_ids);

        // Field column id to its index in batch.
        let field_id_to_index: HashMap<_, _> = batch_fields
            .iter()
            .enumerate()
            .map(|(index, (column_id, _))| (*column_id, index))
            .collect();
        // For each projected column, compute its index in batches.
        let mut batch_indices = Vec::with_capacity(projection.len());
        let mut has_tags = false;
        if !is_empty_projection {
            for idx in &projection {
                // Safety: idx is valid, it is checked when building `column_schemas`.
                let column = &metadata.column_metadatas[*idx];
                // Get column index in a batch by its semantic type and column id.
                let batch_index = match column.semantic_type {
                    SemanticType::Tag => {
                        // Safety: It is a primary key column.
                        let index = metadata.primary_key_index(column.column_id).unwrap();
                        // We need to output a tag.
                        has_tags = true;
                        // We always read all primary key so the column always exists and the tag
                        // index is always valid.
                        BatchIndex::Tag((index, column.column_id))
                    }
                    SemanticType::Timestamp => BatchIndex::Timestamp,
                    SemanticType::Field => {
                        // A projected field must also be present in the read columns.
                        let index = *field_id_to_index.get(&column.column_id).context(
                            InvalidRequestSnafu {
                                region_id: metadata.region_id,
                                reason: format!(
                                    "field column {} is missing in read projection",
                                    column.column_schema.name
                                ),
                            },
                        )?;
                        BatchIndex::Field(index)
                    }
                };
                batch_indices.push(batch_index);
            }
        }

        Ok(PrimaryKeyProjectionMapper {
            metadata: metadata.clone(),
            batch_indices,
            has_tags,
            codec,
            output_schema,
            read_column_ids,
            batch_fields,
            is_empty_projection,
        })
    }

    /// Returns a new mapper without projection (projects every column).
    pub fn all(metadata: &RegionMetadataRef) -> Result<PrimaryKeyProjectionMapper> {
        PrimaryKeyProjectionMapper::new(metadata, 0..metadata.column_metadatas.len())
    }

    /// Returns the metadata that created the mapper.
    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
        &self.metadata
    }

    /// Returns true if the projection includes any tag columns.
    pub(crate) fn has_tags(&self) -> bool {
        self.has_tags
    }

    /// Returns ids of projected columns that we need to read
    /// from memtables and SSTs.
    pub(crate) fn column_ids(&self) -> &[ColumnId] {
        &self.read_column_ids
    }

    /// Returns ids of fields in [Batch]es the mapper expects to convert.
    pub(crate) fn batch_fields(&self) -> &[(ColumnId, ConcreteDataType)] {
        &self.batch_fields
    }

    /// Returns the schema of converted [RecordBatch].
    /// This is the schema that the stream will output. This schema may contain
    /// less columns than [PrimaryKeyProjectionMapper::column_ids()].
    pub(crate) fn output_schema(&self) -> SchemaRef {
        self.output_schema.clone()
    }

    /// Returns an empty [RecordBatch].
    pub(crate) fn empty_record_batch(&self) -> RecordBatch {
        RecordBatch::new_empty(self.output_schema.clone())
    }

    /// Converts a [Batch] to a [RecordBatch].
    ///
    /// The batch must match the `projection` using to build the mapper.
    /// Tag columns are materialized by decoding the primary key (or reusing the
    /// batch's cached pk values) and replicating each value across all rows.
    pub(crate) fn convert(
        &self,
        batch: &Batch,
        cache_strategy: &CacheStrategy,
    ) -> common_recordbatch::error::Result<RecordBatch> {
        // Empty projection: output no columns, only the row count.
        if self.is_empty_projection {
            return RecordBatch::new_with_count(self.output_schema.clone(), batch.num_rows());
        }

        // The batch's field columns must match what the mapper was built for.
        debug_assert_eq!(self.batch_fields.len(), batch.fields().len());
        debug_assert!(
            self.batch_fields
                .iter()
                .zip(batch.fields())
                .all(|((id, _), batch_col)| *id == batch_col.column_id)
        );

        // Skips decoding pk if we don't need to output it.
        let pk_values = if self.has_tags {
            match batch.pk_values() {
                Some(v) => v.clone(),
                None => self
                    .codec
                    .decode(batch.primary_key())
                    .map_err(BoxedError::new)
                    .context(ExternalSnafu)?,
            }
        } else {
            CompositeValues::Dense(vec![])
        };

        let mut columns = Vec::with_capacity(self.output_schema.num_columns());
        let num_rows = batch.num_rows();
        for (index, column_schema) in self
            .batch_indices
            .iter()
            .zip(self.output_schema.column_schemas())
        {
            match index {
                BatchIndex::Tag((idx, column_id)) => {
                    // Look up the decoded tag value by position (dense) or id (sparse).
                    let value = match &pk_values {
                        CompositeValues::Dense(v) => &v[*idx].1,
                        CompositeValues::Sparse(v) => v.get_or_null(*column_id),
                    };
                    // A tag has a single value per batch; replicate it into a vector,
                    // reusing a cached repeated vector when possible.
                    let vector = repeated_vector_with_cache(
                        &column_schema.data_type,
                        value,
                        num_rows,
                        cache_strategy,
                    )?;
                    columns.push(vector);
                }
                BatchIndex::Timestamp => {
                    columns.push(batch.timestamps().clone());
                }
                BatchIndex::Field(idx) => {
                    columns.push(batch.fields()[*idx].data.clone());
                }
            }
        }

        RecordBatch::new(self.output_schema.clone(), columns)
    }
}
390
391pub(crate) fn read_column_ids_from_projection(
392    metadata: &RegionMetadataRef,
393    projection: &[usize],
394) -> Result<Vec<ColumnId>> {
395    let mut column_ids = Vec::with_capacity(projection.len().max(1));
396    if projection.is_empty() {
397        column_ids.push(metadata.time_index_column().column_id);
398        return Ok(column_ids);
399    }
400
401    for idx in projection {
402        let column = metadata
403            .column_metadatas
404            .get(*idx)
405            .with_context(|| InvalidRequestSnafu {
406                region_id: metadata.region_id,
407                reason: format!("projection index {} is out of bound", idx),
408            })?;
409        column_ids.push(column.column_id);
410    }
411    Ok(column_ids)
412}
413
/// Index of a vector in a [Batch].
#[derive(Debug, Clone, Copy)]
enum BatchIndex {
    /// Index in primary keys, paired with the column id of the tag.
    Tag((usize, ColumnId)),
    /// The time index column.
    Timestamp,
    /// Index in fields.
    Field(usize),
}
424
425/// Gets a vector with repeated values from specific cache or creates a new one.
426fn repeated_vector_with_cache(
427    data_type: &ConcreteDataType,
428    value: &Value,
429    num_rows: usize,
430    cache_strategy: &CacheStrategy,
431) -> common_recordbatch::error::Result<VectorRef> {
432    if let Some(vector) = cache_strategy.get_repeated_vector(data_type, value) {
433        // Tries to get the vector from cache manager. If the vector doesn't
434        // have enough length, creates a new one.
435        match vector.len().cmp(&num_rows) {
436            Ordering::Less => (),
437            Ordering::Equal => return Ok(vector),
438            Ordering::Greater => return Ok(vector.slice(0, num_rows)),
439        }
440    }
441
442    // Creates a new one.
443    let vector = new_repeated_vector(data_type, value, num_rows)?;
444    // Updates cache.
445    if vector.len() <= MAX_VECTOR_LENGTH_TO_CACHE {
446        cache_strategy.put_repeated_vector(value.clone(), vector.clone());
447    }
448
449    Ok(vector)
450}
451
452/// Returns a vector with repeated values.
453fn new_repeated_vector(
454    data_type: &ConcreteDataType,
455    value: &Value,
456    num_rows: usize,
457) -> common_recordbatch::error::Result<VectorRef> {
458    let mut mutable_vector = data_type.create_mutable_vector(1);
459    mutable_vector
460        .try_push_value_ref(&value.as_value_ref())
461        .map_err(BoxedError::new)
462        .context(ExternalSnafu)?;
463    // This requires an additional allocation.
464    let base_vector = mutable_vector.to_vector();
465    Ok(base_vector.replicate(&[num_rows]))
466}
467
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::OpType;
    use datatypes::arrow::array::{Int64Array, TimestampMillisecondArray, UInt8Array, UInt64Array};
    use datatypes::arrow::datatypes::Field;
    use datatypes::arrow::util::pretty;
    use datatypes::value::ValueRef;
    use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
    use mito_codec::test_util::TestRegionMetadataBuilder;
    use store_api::storage::consts::{
        OP_TYPE_COLUMN_NAME, PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME,
    };

    use super::*;
    use crate::cache::CacheManager;
    use crate::read::BatchBuilder;

    /// Builds a primary-key-format [Batch]: `tags` are encoded into the primary
    /// key as int64 columns, timestamps start at `ts_start` with 1s steps, and
    /// each `(column_id, value)` in `fields` becomes a constant field column.
    fn new_batch(
        ts_start: i64,
        tags: &[i64],
        fields: &[(ColumnId, i64)],
        num_rows: usize,
    ) -> Batch {
        // Dense codec with one int64 sort field per tag.
        let converter = DensePrimaryKeyCodec::with_fields(
            (0..tags.len())
                .map(|idx| {
                    (
                        idx as u32,
                        SortField::new(ConcreteDataType::int64_datatype()),
                    )
                })
                .collect(),
        );
        let primary_key = converter
            .encode(tags.iter().map(|v| ValueRef::Int64(*v)))
            .unwrap();

        let mut builder = BatchBuilder::new(primary_key);
        builder
            .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
                (0..num_rows).map(|i| ts_start + i as i64 * 1000),
            )))
            .unwrap()
            .sequences_array(Arc::new(UInt64Array::from_iter_values(0..num_rows as u64)))
            .unwrap()
            .op_types_array(Arc::new(UInt8Array::from_iter_values(
                (0..num_rows).map(|_| OpType::Put as u8),
            )))
            .unwrap();
        for (column_id, field) in fields {
            builder
                .push_field_array(
                    *column_id,
                    Arc::new(Int64Array::from_iter_values(std::iter::repeat_n(
                        *field, num_rows,
                    ))),
                )
                .unwrap();
        }
        builder.build().unwrap()
    }

    /// Renders a [RecordBatch] as an ASCII table for comparison in assertions.
    fn print_record_batch(record_batch: RecordBatch) -> String {
        pretty::pretty_format_batches(&[record_batch.into_df_record_batch()])
            .unwrap()
            .to_string()
    }

    #[test]
    fn test_projection_mapper_all() {
        let metadata = Arc::new(
            TestRegionMetadataBuilder::default()
                .num_tags(2)
                .num_fields(2)
                .build(),
        );
        // Create the enum wrapper with default format (primary key)
        let mapper = ProjectionMapper::all(&metadata, false).unwrap();
        assert_eq!([0, 1, 2, 3, 4], mapper.column_ids());
        assert_eq!(
            [
                (3, ConcreteDataType::int64_datatype()),
                (4, ConcreteDataType::int64_datatype())
            ],
            mapper.as_primary_key().unwrap().batch_fields()
        );

        // With vector cache.
        let cache = CacheManager::builder().vector_cache_size(1024).build();
        let cache = CacheStrategy::EnableAll(Arc::new(cache));
        let batch = new_batch(0, &[1, 2], &[(3, 3), (4, 4)], 3);
        let record_batch = mapper
            .as_primary_key()
            .unwrap()
            .convert(&batch, &cache)
            .unwrap();
        let expect = "\
+---------------------+----+----+----+----+
| ts                  | k0 | k1 | v0 | v1 |
+---------------------+----+----+----+----+
| 1970-01-01T00:00:00 | 1  | 2  | 3  | 4  |
| 1970-01-01T00:00:01 | 1  | 2  | 3  | 4  |
| 1970-01-01T00:00:02 | 1  | 2  | 3  | 4  |
+---------------------+----+----+----+----+";
        assert_eq!(expect, print_record_batch(record_batch));

        // Tag values (1 and 2) were cached as repeated vectors; field value 3 was not.
        assert!(
            cache
                .get_repeated_vector(&ConcreteDataType::int64_datatype(), &Value::Int64(1))
                .is_some()
        );
        assert!(
            cache
                .get_repeated_vector(&ConcreteDataType::int64_datatype(), &Value::Int64(2))
                .is_some()
        );
        assert!(
            cache
                .get_repeated_vector(&ConcreteDataType::int64_datatype(), &Value::Int64(3))
                .is_none()
        );
        // A second conversion (now hitting the cache) produces the same output.
        let record_batch = mapper
            .as_primary_key()
            .unwrap()
            .convert(&batch, &cache)
            .unwrap();
        assert_eq!(expect, print_record_batch(record_batch));
    }

    #[test]
    fn test_projection_mapper_with_projection() {
        let metadata = Arc::new(
            TestRegionMetadataBuilder::default()
                .num_tags(2)
                .num_fields(2)
                .build(),
        );
        // Columns v1, k0
        let mapper = ProjectionMapper::new(&metadata, [4, 1].into_iter(), false).unwrap();
        assert_eq!([4, 1], mapper.column_ids());
        assert_eq!(
            [(4, ConcreteDataType::int64_datatype())],
            mapper.as_primary_key().unwrap().batch_fields()
        );

        let batch = new_batch(0, &[1, 2], &[(4, 4)], 3);
        let cache = CacheManager::builder().vector_cache_size(1024).build();
        let cache = CacheStrategy::EnableAll(Arc::new(cache));
        let record_batch = mapper
            .as_primary_key()
            .unwrap()
            .convert(&batch, &cache)
            .unwrap();
        // Output column order follows the projection, not the region schema.
        let expect = "\
+----+----+
| v1 | k0 |
+----+----+
| 4  | 1  |
| 4  | 1  |
| 4  | 1  |
+----+----+";
        assert_eq!(expect, print_record_batch(record_batch));
    }

    #[test]
    fn test_projection_mapper_read_superset() {
        let metadata = Arc::new(
            TestRegionMetadataBuilder::default()
                .num_tags(2)
                .num_fields(2)
                .build(),
        );
        // Output columns v1, k0. Read also includes v0.
        let mapper = ProjectionMapper::new_with_read_columns(
            &metadata,
            [4, 1].into_iter(),
            false,
            vec![4, 1, 3],
        )
        .unwrap();
        assert_eq!([4, 1, 3], mapper.column_ids());

        let batch = new_batch(0, &[1, 2], &[(3, 3), (4, 4)], 3);
        let cache = CacheManager::builder().vector_cache_size(1024).build();
        let cache = CacheStrategy::EnableAll(Arc::new(cache));
        let record_batch = mapper
            .as_primary_key()
            .unwrap()
            .convert(&batch, &cache)
            .unwrap();
        // v0 is read but not output.
        let expect = "\
+----+----+
| v1 | k0 |
+----+----+
| 4  | 1  |
| 4  | 1  |
| 4  | 1  |
+----+----+";
        assert_eq!(expect, print_record_batch(record_batch));
    }

    #[test]
    fn test_projection_mapper_empty_projection() {
        let metadata = Arc::new(
            TestRegionMetadataBuilder::default()
                .num_tags(2)
                .num_fields(2)
                .build(),
        );
        // Empty projection
        let mapper = ProjectionMapper::new(&metadata, [].into_iter(), false).unwrap();
        assert_eq!([0], mapper.column_ids()); // Should still read the time index column
        assert!(mapper.output_schema().is_empty());
        let pk_mapper = mapper.as_primary_key().unwrap();
        assert!(pk_mapper.batch_fields().is_empty());
        assert!(!pk_mapper.has_tags);
        assert!(pk_mapper.batch_indices.is_empty());
        assert!(pk_mapper.is_empty_projection);

        let batch = new_batch(0, &[1, 2], &[], 3);
        let cache = CacheManager::builder().vector_cache_size(1024).build();
        let cache = CacheStrategy::EnableAll(Arc::new(cache));
        // Conversion yields only a row count, no columns.
        let record_batch = pk_mapper.convert(&batch, &cache).unwrap();
        assert_eq!(3, record_batch.num_rows());
        assert_eq!(0, record_batch.num_columns());
        assert!(record_batch.schema.is_empty());
    }

    /// Builds an arrow [RecordBatch](datatypes::arrow::record_batch::RecordBatch)
    /// in the flat format: tag columns, field columns, optional time index, then
    /// the internal `__primary_key` (dictionary-encoded), `__sequence`, and
    /// `__op_type` columns.
    fn new_flat_batch(
        ts_start: Option<i64>,
        idx_tags: &[(usize, i64)],
        idx_fields: &[(usize, i64)],
        num_rows: usize,
    ) -> datatypes::arrow::record_batch::RecordBatch {
        let mut columns = Vec::with_capacity(1 + idx_tags.len() + idx_fields.len() + 3);
        let mut fields = Vec::with_capacity(1 + idx_tags.len() + idx_fields.len() + 3);

        // Flat format: primary key columns, field columns, time index, __primary_key, __sequence, __op_type

        // Primary key columns first
        for (i, tag) in idx_tags {
            let array = Arc::new(Int64Array::from_iter_values(std::iter::repeat_n(
                *tag, num_rows,
            ))) as _;
            columns.push(array);
            fields.push(Field::new(
                format!("k{i}"),
                datatypes::arrow::datatypes::DataType::Int64,
                true,
            ));
        }

        // Field columns
        for (i, field) in idx_fields {
            let array = Arc::new(Int64Array::from_iter_values(std::iter::repeat_n(
                *field, num_rows,
            ))) as _;
            columns.push(array);
            fields.push(Field::new(
                format!("v{i}"),
                datatypes::arrow::datatypes::DataType::Int64,
                true,
            ));
        }

        // Time index
        if let Some(ts_start) = ts_start {
            let timestamps = Arc::new(TimestampMillisecondArray::from_iter_values(
                (0..num_rows).map(|i| ts_start + i as i64 * 1000),
            )) as _;
            columns.push(timestamps);
            fields.push(Field::new(
                "ts",
                datatypes::arrow::datatypes::DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    None,
                ),
                true,
            ));
        }

        // __primary_key column (encoded primary key as dictionary)
        // Create encoded primary key
        let converter = DensePrimaryKeyCodec::with_fields(
            (0..idx_tags.len())
                .map(|idx| {
                    (
                        idx as u32,
                        SortField::new(ConcreteDataType::int64_datatype()),
                    )
                })
                .collect(),
        );
        let encoded_pk = converter
            .encode(idx_tags.iter().map(|(_, v)| ValueRef::Int64(*v)))
            .unwrap();

        // Create dictionary array for the encoded primary key
        let pk_values: Vec<&[u8]> = std::iter::repeat_n(encoded_pk.as_slice(), num_rows).collect();
        let keys = datatypes::arrow::array::UInt32Array::from_iter(0..num_rows as u32);
        let values = Arc::new(datatypes::arrow::array::BinaryArray::from_vec(pk_values));
        let pk_array =
            Arc::new(datatypes::arrow::array::DictionaryArray::try_new(keys, values).unwrap()) as _;
        columns.push(pk_array);
        fields.push(Field::new_dictionary(
            PRIMARY_KEY_COLUMN_NAME,
            datatypes::arrow::datatypes::DataType::UInt32,
            datatypes::arrow::datatypes::DataType::Binary,
            false,
        ));

        // __sequence column
        columns.push(Arc::new(UInt64Array::from_iter_values(0..num_rows as u64)) as _);
        fields.push(Field::new(
            SEQUENCE_COLUMN_NAME,
            datatypes::arrow::datatypes::DataType::UInt64,
            false,
        ));

        // __op_type column
        columns.push(Arc::new(UInt8Array::from_iter_values(
            (0..num_rows).map(|_| OpType::Put as u8),
        )) as _);
        fields.push(Field::new(
            OP_TYPE_COLUMN_NAME,
            datatypes::arrow::datatypes::DataType::UInt8,
            false,
        ));

        let schema = Arc::new(datatypes::arrow::datatypes::Schema::new(fields));

        datatypes::arrow::record_batch::RecordBatch::try_new(schema, columns).unwrap()
    }

    #[test]
    fn test_flat_projection_mapper_all() {
        let metadata = Arc::new(
            TestRegionMetadataBuilder::default()
                .num_tags(2)
                .num_fields(2)
                .build(),
        );
        let mapper = ProjectionMapper::all(&metadata, true).unwrap();
        assert_eq!([0, 1, 2, 3, 4], mapper.column_ids());
        // Flat batch schema: tags, fields, then the time index column.
        assert_eq!(
            [
                (1, ConcreteDataType::int64_datatype()),
                (2, ConcreteDataType::int64_datatype()),
                (3, ConcreteDataType::int64_datatype()),
                (4, ConcreteDataType::int64_datatype()),
                (0, ConcreteDataType::timestamp_millisecond_datatype())
            ],
            mapper.as_flat().unwrap().batch_schema()
        );

        let batch = new_flat_batch(Some(0), &[(1, 1), (2, 2)], &[(3, 3), (4, 4)], 3);
        let record_batch = mapper.as_flat().unwrap().convert(&batch).unwrap();
        let expect = "\
+---------------------+----+----+----+----+
| ts                  | k0 | k1 | v0 | v1 |
+---------------------+----+----+----+----+
| 1970-01-01T00:00:00 | 1  | 2  | 3  | 4  |
| 1970-01-01T00:00:01 | 1  | 2  | 3  | 4  |
| 1970-01-01T00:00:02 | 1  | 2  | 3  | 4  |
+---------------------+----+----+----+----+";
        assert_eq!(expect, print_record_batch(record_batch));
    }

    #[test]
    fn test_flat_projection_mapper_with_projection() {
        let metadata = Arc::new(
            TestRegionMetadataBuilder::default()
                .num_tags(2)
                .num_fields(2)
                .build(),
        );
        // Columns v1, k0
        let mapper = ProjectionMapper::new(&metadata, [4, 1].into_iter(), true).unwrap();
        assert_eq!([4, 1], mapper.column_ids());
        assert_eq!(
            [
                (1, ConcreteDataType::int64_datatype()),
                (4, ConcreteDataType::int64_datatype()),
                (0, ConcreteDataType::timestamp_millisecond_datatype())
            ],
            mapper.as_flat().unwrap().batch_schema()
        );

        let batch = new_flat_batch(None, &[(1, 1)], &[(4, 4)], 3);
        let record_batch = mapper.as_flat().unwrap().convert(&batch).unwrap();
        let expect = "\
+----+----+
| v1 | k0 |
+----+----+
| 4  | 1  |
| 4  | 1  |
| 4  | 1  |
+----+----+";
        assert_eq!(expect, print_record_batch(record_batch));
    }

    #[test]
    fn test_flat_projection_mapper_read_superset() {
        let metadata = Arc::new(
            TestRegionMetadataBuilder::default()
                .num_tags(2)
                .num_fields(2)
                .build(),
        );
        // Output columns v1, k0. Read also includes v0.
        let mapper = ProjectionMapper::new_with_read_columns(
            &metadata,
            [4, 1].into_iter(),
            true,
            vec![4, 1, 3],
        )
        .unwrap();
        assert_eq!([4, 1, 3], mapper.column_ids());

        let batch = new_flat_batch(None, &[(1, 1)], &[(3, 3), (4, 4)], 3);
        let record_batch = mapper.as_flat().unwrap().convert(&batch).unwrap();
        let expect = "\
+----+----+
| v1 | k0 |
+----+----+
| 4  | 1  |
| 4  | 1  |
| 4  | 1  |
+----+----+";
        assert_eq!(expect, print_record_batch(record_batch));
    }

    #[test]
    fn test_flat_projection_mapper_empty_projection() {
        let metadata = Arc::new(
            TestRegionMetadataBuilder::default()
                .num_tags(2)
                .num_fields(2)
                .build(),
        );
        // Empty projection
        let mapper = ProjectionMapper::new(&metadata, [].into_iter(), true).unwrap();
        assert_eq!([0], mapper.column_ids()); // Should still read the time index column
        assert!(mapper.output_schema().is_empty());
        let flat_mapper = mapper.as_flat().unwrap();
        assert_eq!(
            [(0, ConcreteDataType::timestamp_millisecond_datatype())],
            flat_mapper.batch_schema()
        );

        let batch = new_flat_batch(Some(0), &[], &[], 3);
        // Conversion yields only a row count, no columns.
        let record_batch = flat_mapper.convert(&batch).unwrap();
        assert_eq!(3, record_batch.num_rows());
        assert_eq!(0, record_batch.num_columns());
        assert!(record_batch.schema.is_empty());
    }
}