mito2/read/
projection.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Utilities for projection.
16
17use std::cmp::Ordering;
18use std::collections::HashMap;
19use std::sync::Arc;
20
21use api::v1::SemanticType;
22use common_error::ext::BoxedError;
23use common_recordbatch::error::ExternalSnafu;
24use common_recordbatch::RecordBatch;
25use datatypes::prelude::{ConcreteDataType, DataType};
26use datatypes::schema::{Schema, SchemaRef};
27use datatypes::value::Value;
28use datatypes::vectors::VectorRef;
29use snafu::{OptionExt, ResultExt};
30use store_api::metadata::RegionMetadataRef;
31use store_api::storage::ColumnId;
32
33use crate::cache::CacheStrategy;
34use crate::error::{InvalidRequestSnafu, Result};
35use crate::read::Batch;
36use crate::row_converter::{build_primary_key_codec, CompositeValues, PrimaryKeyCodec};
37
/// Upper bound (inclusive) on the length of a repeated vector that may be put
/// into the vector cache; longer vectors are never cached.
const MAX_VECTOR_LENGTH_TO_CACHE: usize = 16 * 1024;
40
41/// Handles projection and converts a projected [Batch] to a projected [RecordBatch].
42pub struct ProjectionMapper {
43    /// Metadata of the region.
44    metadata: RegionMetadataRef,
45    /// Maps column in [RecordBatch] to index in [Batch].
46    batch_indices: Vec<BatchIndex>,
47    /// Output record batch contains tags.
48    has_tags: bool,
49    /// Decoder for primary key.
50    codec: Arc<dyn PrimaryKeyCodec>,
51    /// Schema for converted [RecordBatch].
52    output_schema: SchemaRef,
53    /// Ids of columns to project. It keeps ids in the same order as the `projection`
54    /// indices to build the mapper.
55    column_ids: Vec<ColumnId>,
56    /// Ids and DataTypes of field columns in the [Batch].
57    batch_fields: Vec<(ColumnId, ConcreteDataType)>,
58    /// `true` If the original projection is empty.
59    is_empty_projection: bool,
60}
61
62impl ProjectionMapper {
63    /// Returns a new mapper with projection.
64    /// If `projection` is empty, it outputs [RecordBatch] without any column but only a row count.
65    /// `SELECT COUNT(*) FROM table` is an example that uses an empty projection. DataFusion accepts
66    /// empty `RecordBatch` and only use its row count in this query.
67    pub fn new(
68        metadata: &RegionMetadataRef,
69        projection: impl Iterator<Item = usize>,
70    ) -> Result<ProjectionMapper> {
71        let mut projection: Vec<_> = projection.collect();
72        // If the original projection is empty.
73        let is_empty_projection = projection.is_empty();
74        if is_empty_projection {
75            // If the projection is empty, we still read the time index column.
76            projection.push(metadata.time_index_column_pos());
77        }
78
79        let mut column_schemas = Vec::with_capacity(projection.len());
80        let mut column_ids = Vec::with_capacity(projection.len());
81        for idx in &projection {
82            // For each projection index, we get the column id for projection.
83            let column = metadata
84                .column_metadatas
85                .get(*idx)
86                .context(InvalidRequestSnafu {
87                    region_id: metadata.region_id,
88                    reason: format!("projection index {} is out of bound", idx),
89                })?;
90
91            column_ids.push(column.column_id);
92            // Safety: idx is valid.
93            column_schemas.push(metadata.schema.column_schemas()[*idx].clone());
94        }
95
96        let codec = build_primary_key_codec(metadata);
97        if is_empty_projection {
98            // If projection is empty, we don't output any column.
99            return Ok(ProjectionMapper {
100                metadata: metadata.clone(),
101                batch_indices: vec![],
102                has_tags: false,
103                codec,
104                output_schema: Arc::new(Schema::new(vec![])),
105                column_ids,
106                batch_fields: vec![],
107                is_empty_projection,
108            });
109        }
110
111        // Safety: Columns come from existing schema.
112        let output_schema = Arc::new(Schema::new(column_schemas));
113        // Get fields in each batch.
114        let batch_fields = Batch::projected_fields(metadata, &column_ids);
115
116        // Field column id to its index in batch.
117        let field_id_to_index: HashMap<_, _> = batch_fields
118            .iter()
119            .enumerate()
120            .map(|(index, (column_id, _))| (*column_id, index))
121            .collect();
122        // For each projected column, compute its index in batches.
123        let mut batch_indices = Vec::with_capacity(projection.len());
124        let mut has_tags = false;
125        for idx in &projection {
126            // Safety: idx is valid.
127            let column = &metadata.column_metadatas[*idx];
128            // Get column index in a batch by its semantic type and column id.
129            let batch_index = match column.semantic_type {
130                SemanticType::Tag => {
131                    // Safety: It is a primary key column.
132                    let index = metadata.primary_key_index(column.column_id).unwrap();
133                    // We need to output a tag.
134                    has_tags = true;
135                    // We always read all primary key so the column always exists and the tag
136                    // index is always valid.
137                    BatchIndex::Tag((index, column.column_id))
138                }
139                SemanticType::Timestamp => BatchIndex::Timestamp,
140                SemanticType::Field => {
141                    // Safety: It is a field column so it should be in `field_id_to_index`.
142                    let index = field_id_to_index[&column.column_id];
143                    BatchIndex::Field(index)
144                }
145            };
146            batch_indices.push(batch_index);
147        }
148
149        Ok(ProjectionMapper {
150            metadata: metadata.clone(),
151            batch_indices,
152            has_tags,
153            codec,
154            output_schema,
155            column_ids,
156            batch_fields,
157            is_empty_projection,
158        })
159    }
160
161    /// Returns a new mapper without projection.
162    pub fn all(metadata: &RegionMetadataRef) -> Result<ProjectionMapper> {
163        ProjectionMapper::new(metadata, 0..metadata.column_metadatas.len())
164    }
165
166    /// Returns the metadata that created the mapper.
167    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
168        &self.metadata
169    }
170
171    /// Returns ids of projected columns that we need to read
172    /// from memtables and SSTs.
173    pub(crate) fn column_ids(&self) -> &[ColumnId] {
174        &self.column_ids
175    }
176
177    /// Returns ids of fields in [Batch]es the mapper expects to convert.
178    pub(crate) fn batch_fields(&self) -> &[(ColumnId, ConcreteDataType)] {
179        &self.batch_fields
180    }
181
182    /// Returns the schema of converted [RecordBatch].
183    /// This is the schema that the stream will output. This schema may contain
184    /// less columns than [ProjectionMapper::column_ids()].
185    pub(crate) fn output_schema(&self) -> SchemaRef {
186        self.output_schema.clone()
187    }
188
189    /// Returns an empty [RecordBatch].
190    pub(crate) fn empty_record_batch(&self) -> RecordBatch {
191        RecordBatch::new_empty(self.output_schema.clone())
192    }
193
194    /// Converts a [Batch] to a [RecordBatch].
195    ///
196    /// The batch must match the `projection` using to build the mapper.
197    pub(crate) fn convert(
198        &self,
199        batch: &Batch,
200        cache_strategy: &CacheStrategy,
201    ) -> common_recordbatch::error::Result<RecordBatch> {
202        if self.is_empty_projection {
203            return RecordBatch::new_with_count(self.output_schema.clone(), batch.num_rows());
204        }
205
206        debug_assert_eq!(self.batch_fields.len(), batch.fields().len());
207        debug_assert!(self
208            .batch_fields
209            .iter()
210            .zip(batch.fields())
211            .all(|((id, _), batch_col)| *id == batch_col.column_id));
212
213        // Skips decoding pk if we don't need to output it.
214        let pk_values = if self.has_tags {
215            match batch.pk_values() {
216                Some(v) => v.clone(),
217                None => self
218                    .codec
219                    .decode(batch.primary_key())
220                    .map_err(BoxedError::new)
221                    .context(ExternalSnafu)?,
222            }
223        } else {
224            CompositeValues::Dense(vec![])
225        };
226
227        let mut columns = Vec::with_capacity(self.output_schema.num_columns());
228        let num_rows = batch.num_rows();
229        for (index, column_schema) in self
230            .batch_indices
231            .iter()
232            .zip(self.output_schema.column_schemas())
233        {
234            match index {
235                BatchIndex::Tag((idx, column_id)) => {
236                    let value = match &pk_values {
237                        CompositeValues::Dense(v) => &v[*idx].1,
238                        CompositeValues::Sparse(v) => v.get_or_null(*column_id),
239                    };
240                    let vector = repeated_vector_with_cache(
241                        &column_schema.data_type,
242                        value,
243                        num_rows,
244                        cache_strategy,
245                    )?;
246                    columns.push(vector);
247                }
248                BatchIndex::Timestamp => {
249                    columns.push(batch.timestamps().clone());
250                }
251                BatchIndex::Field(idx) => {
252                    columns.push(batch.fields()[*idx].data.clone());
253                }
254            }
255        }
256
257        RecordBatch::new(self.output_schema.clone(), columns)
258    }
259}
260
261/// Index of a vector in a [Batch].
262#[derive(Debug, Clone, Copy)]
263enum BatchIndex {
264    /// Index in primary keys.
265    Tag((usize, ColumnId)),
266    /// The time index column.
267    Timestamp,
268    /// Index in fields.
269    Field(usize),
270}
271
272/// Gets a vector with repeated values from specific cache or creates a new one.
273fn repeated_vector_with_cache(
274    data_type: &ConcreteDataType,
275    value: &Value,
276    num_rows: usize,
277    cache_strategy: &CacheStrategy,
278) -> common_recordbatch::error::Result<VectorRef> {
279    if let Some(vector) = cache_strategy.get_repeated_vector(data_type, value) {
280        // Tries to get the vector from cache manager. If the vector doesn't
281        // have enough length, creates a new one.
282        match vector.len().cmp(&num_rows) {
283            Ordering::Less => (),
284            Ordering::Equal => return Ok(vector),
285            Ordering::Greater => return Ok(vector.slice(0, num_rows)),
286        }
287    }
288
289    // Creates a new one.
290    let vector = new_repeated_vector(data_type, value, num_rows)?;
291    // Updates cache.
292    if vector.len() <= MAX_VECTOR_LENGTH_TO_CACHE {
293        cache_strategy.put_repeated_vector(value.clone(), vector.clone());
294    }
295
296    Ok(vector)
297}
298
299/// Returns a vector with repeated values.
300fn new_repeated_vector(
301    data_type: &ConcreteDataType,
302    value: &Value,
303    num_rows: usize,
304) -> common_recordbatch::error::Result<VectorRef> {
305    let mut mutable_vector = data_type.create_mutable_vector(1);
306    mutable_vector
307        .try_push_value_ref(value.as_value_ref())
308        .map_err(BoxedError::new)
309        .context(ExternalSnafu)?;
310    // This requires an additional allocation.
311    let base_vector = mutable_vector.to_vector();
312    Ok(base_vector.replicate(&[num_rows]))
313}
314
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::OpType;
    use datatypes::arrow::array::{Int64Array, TimestampMillisecondArray, UInt64Array, UInt8Array};
    use datatypes::arrow::util::pretty;
    use datatypes::value::ValueRef;

    use super::*;
    use crate::cache::CacheManager;
    use crate::read::BatchBuilder;
    use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
    use crate::test_util::meta_util::TestRegionMetadataBuilder;

    /// Builds the region metadata used by all tests: 2 tags and 2 fields.
    fn new_metadata() -> RegionMetadataRef {
        let metadata = TestRegionMetadataBuilder::default()
            .num_tags(2)
            .num_fields(2)
            .build();
        Arc::new(metadata)
    }

    /// Builds a cache strategy with the vector cache enabled.
    fn new_cache() -> CacheStrategy {
        let manager = CacheManager::builder().vector_cache_size(1024).build();
        CacheStrategy::EnableAll(Arc::new(manager))
    }

    /// Builds a batch with `num_rows` rows.
    ///
    /// Timestamps start from `ts_start` and increase by 1000 for each row, `tags` are
    /// encoded as an int64 primary key and each `(column id, value)` pair in `fields`
    /// becomes a field column that repeats the value.
    fn new_batch(
        ts_start: i64,
        tags: &[i64],
        fields: &[(ColumnId, i64)],
        num_rows: usize,
    ) -> Batch {
        let codec = DensePrimaryKeyCodec::with_fields(
            (0..tags.len())
                .map(|tag_idx| {
                    (
                        tag_idx as u32,
                        SortField::new(ConcreteDataType::int64_datatype()),
                    )
                })
                .collect(),
        );
        let primary_key = codec
            .encode(tags.iter().map(|tag| ValueRef::Int64(*tag)))
            .unwrap();

        let timestamps = TimestampMillisecondArray::from_iter_values(
            (0..num_rows).map(|row| ts_start + row as i64 * 1000),
        );
        let sequences = UInt64Array::from_iter_values(0..num_rows as u64);
        let op_types =
            UInt8Array::from_iter_values(std::iter::repeat_n(OpType::Put as u8, num_rows));

        let mut builder = BatchBuilder::new(primary_key);
        builder
            .timestamps_array(Arc::new(timestamps))
            .unwrap()
            .sequences_array(Arc::new(sequences))
            .unwrap()
            .op_types_array(Arc::new(op_types))
            .unwrap();
        for &(column_id, field_value) in fields {
            let values = Int64Array::from_iter_values(std::iter::repeat_n(field_value, num_rows));
            builder
                .push_field_array(column_id, Arc::new(values))
                .unwrap();
        }
        builder.build().unwrap()
    }

    /// Formats a record batch as a pretty printed table.
    fn print_record_batch(record_batch: RecordBatch) -> String {
        let df_batch = record_batch.into_df_record_batch();
        pretty::pretty_format_batches(&[df_batch])
            .unwrap()
            .to_string()
    }

    #[test]
    fn test_projection_mapper_all() {
        let metadata = new_metadata();
        let mapper = ProjectionMapper::all(&metadata).unwrap();
        assert_eq!([0, 1, 2, 3, 4], mapper.column_ids());
        assert_eq!(
            [
                (3, ConcreteDataType::int64_datatype()),
                (4, ConcreteDataType::int64_datatype())
            ],
            mapper.batch_fields()
        );

        // With vector cache.
        let cache = new_cache();
        let batch = new_batch(0, &[1, 2], &[(3, 3), (4, 4)], 3);
        let expect = "\
+---------------------+----+----+----+----+
| ts                  | k0 | k1 | v0 | v1 |
+---------------------+----+----+----+----+
| 1970-01-01T00:00:00 | 1  | 2  | 3  | 4  |
| 1970-01-01T00:00:01 | 1  | 2  | 3  | 4  |
| 1970-01-01T00:00:02 | 1  | 2  | 3  | 4  |
+---------------------+----+----+----+----+";
        let record_batch = mapper.convert(&batch, &cache).unwrap();
        assert_eq!(expect, print_record_batch(record_batch));

        // Only vectors of tags are cached.
        let int64_type = ConcreteDataType::int64_datatype();
        for tag_value in [1, 2] {
            assert!(cache
                .get_repeated_vector(&int64_type, &Value::Int64(tag_value))
                .is_some());
        }
        assert!(cache
            .get_repeated_vector(&int64_type, &Value::Int64(3))
            .is_none());

        // Converts again, now the vectors of tags are in the cache.
        let record_batch = mapper.convert(&batch, &cache).unwrap();
        assert_eq!(expect, print_record_batch(record_batch));
    }

    #[test]
    fn test_projection_mapper_with_projection() {
        let metadata = new_metadata();
        // Columns v1, k0
        let mapper = ProjectionMapper::new(&metadata, [4, 1].into_iter()).unwrap();
        assert_eq!([4, 1], mapper.column_ids());
        assert_eq!(
            [(4, ConcreteDataType::int64_datatype())],
            mapper.batch_fields()
        );

        let cache = new_cache();
        let batch = new_batch(0, &[1, 2], &[(4, 4)], 3);
        let record_batch = mapper.convert(&batch, &cache).unwrap();
        let expect = "\
+----+----+
| v1 | k0 |
+----+----+
| 4  | 1  |
| 4  | 1  |
| 4  | 1  |
+----+----+";
        assert_eq!(expect, print_record_batch(record_batch));
    }

    #[test]
    fn test_projection_mapper_empty_projection() {
        let metadata = new_metadata();
        // Empty projection
        let mapper = ProjectionMapper::new(&metadata, [].into_iter()).unwrap();
        // Should still read the time index column
        assert_eq!([0], mapper.column_ids());
        assert!(mapper.batch_fields().is_empty());
        assert!(!mapper.has_tags);
        assert!(mapper.batch_indices.is_empty());
        assert!(mapper.output_schema().is_empty());
        assert!(mapper.is_empty_projection);

        let cache = new_cache();
        let batch = new_batch(0, &[1, 2], &[], 3);
        let record_batch = mapper.convert(&batch, &cache).unwrap();
        // The record batch only carries the row count.
        assert_eq!(3, record_batch.num_rows());
        assert_eq!(0, record_batch.num_columns());
        assert!(record_batch.schema.is_empty());
    }
}