// mito2/read/projection.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Projection helpers shared by flat projection code.
16
17use std::cmp::Ordering;
18
19use common_recordbatch::error::DataTypesSnafu;
20use datatypes::prelude::{ConcreteDataType, DataType};
21use datatypes::value::Value;
22use datatypes::vectors::VectorRef;
23use snafu::{OptionExt, ResultExt};
24use store_api::metadata::RegionMetadataRef;
25use store_api::storage::ColumnId;
26
27use crate::cache::CacheStrategy;
28use crate::error::{InvalidRequestSnafu, Result};
29
/// Only cache vector when its length `<=` this value.
///
/// Keeps the repeated-vector cache from pinning very large vectors in memory;
/// longer results are still returned to the caller, just never cached.
pub(crate) const MAX_VECTOR_LENGTH_TO_CACHE: usize = 16384;
32
33pub(crate) fn read_column_ids_from_projection(
34    metadata: &RegionMetadataRef,
35    projection: &[usize],
36) -> Result<Vec<ColumnId>> {
37    let mut column_ids = Vec::with_capacity(projection.len().max(1));
38    if projection.is_empty() {
39        column_ids.push(metadata.time_index_column().column_id);
40        return Ok(column_ids);
41    }
42
43    for idx in projection {
44        let column = metadata
45            .column_metadatas
46            .get(*idx)
47            .with_context(|| InvalidRequestSnafu {
48                region_id: metadata.region_id,
49                reason: format!("projection index {} is out of bound", idx),
50            })?;
51        column_ids.push(column.column_id);
52    }
53    Ok(column_ids)
54}
55
56/// Gets a vector with repeated values from specific cache or creates a new one.
57pub(crate) fn repeated_vector_with_cache(
58    data_type: &ConcreteDataType,
59    value: &Value,
60    num_rows: usize,
61    cache_strategy: &CacheStrategy,
62) -> common_recordbatch::error::Result<VectorRef> {
63    if let Some(vector) = cache_strategy.get_repeated_vector(data_type, value) {
64        match vector.len().cmp(&num_rows) {
65            Ordering::Less => (),
66            Ordering::Equal => return Ok(vector),
67            Ordering::Greater => return Ok(vector.slice(0, num_rows)),
68        }
69    }
70
71    let vector = new_repeated_vector(data_type, value, num_rows)?;
72    if vector.len() <= MAX_VECTOR_LENGTH_TO_CACHE {
73        cache_strategy.put_repeated_vector(value.clone(), vector.clone());
74    }
75
76    Ok(vector)
77}
78
79/// Returns a vector with repeated values.
80pub(crate) fn new_repeated_vector(
81    data_type: &ConcreteDataType,
82    value: &Value,
83    num_rows: usize,
84) -> common_recordbatch::error::Result<VectorRef> {
85    let mut mutable_vector = data_type.create_mutable_vector(1);
86    mutable_vector
87        .try_push_value_ref(&value.as_value_ref())
88        .context(DataTypesSnafu)?;
89    let base_vector = mutable_vector.to_vector();
90    Ok(base_vector.replicate(&[num_rows]))
91}
92
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::OpType;
    use common_recordbatch::RecordBatch;
    use datatypes::arrow::array::{Int64Array, TimestampMillisecondArray, UInt8Array, UInt64Array};
    use datatypes::arrow::datatypes::Field;
    use datatypes::arrow::util::pretty;
    use datatypes::value::ValueRef;
    use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
    use mito_codec::test_util::TestRegionMetadataBuilder;
    use store_api::storage::consts::{
        OP_TYPE_COLUMN_NAME, PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME,
    };

    use super::*;
    use crate::read::flat_projection::FlatProjectionMapper;
    use crate::read::read_columns::ReadColumns;

    /// Renders a record batch as a pretty-printed ASCII table for assertions.
    fn print_record_batch(record_batch: RecordBatch) -> String {
        pretty::pretty_format_batches(&[record_batch.into_df_record_batch()])
            .unwrap()
            .to_string()
    }

    /// Builds an arrow `RecordBatch` in the flat layout consumed by
    /// `FlatProjectionMapper::convert`.
    ///
    /// - `ts_start`: when `Some`, adds a millisecond `ts` column starting at
    ///   this value with 1000 ms steps.
    /// - `idx_tags`: `(index, value)` pairs; each becomes a constant `k{index}`
    ///   Int64 tag column repeated for every row.
    /// - `idx_fields`: `(index, value)` pairs; each becomes a constant
    ///   `v{index}` Int64 field column repeated for every row.
    /// - `num_rows`: number of rows in every column.
    ///
    /// The batch also carries the internal columns: the densely encoded
    /// primary key (dictionary of binary), the sequence column and the op type
    /// column.
    fn new_flat_batch(
        ts_start: Option<i64>,
        idx_tags: &[(usize, i64)],
        idx_fields: &[(usize, i64)],
        num_rows: usize,
    ) -> datatypes::arrow::record_batch::RecordBatch {
        // 1 timestamp + tags + fields + 3 internal columns (pk, sequence, op type).
        let mut columns = Vec::with_capacity(1 + idx_tags.len() + idx_fields.len() + 3);
        let mut fields = Vec::with_capacity(1 + idx_tags.len() + idx_fields.len() + 3);

        // Tag columns: each repeats a single value across all rows.
        for (i, tag) in idx_tags {
            let array = Arc::new(Int64Array::from_iter_values(std::iter::repeat_n(
                *tag, num_rows,
            ))) as _;
            columns.push(array);
            fields.push(Field::new(
                format!("k{i}"),
                datatypes::arrow::datatypes::DataType::Int64,
                true,
            ));
        }

        // Field columns: same repeated-value scheme as tags.
        for (i, field) in idx_fields {
            let array = Arc::new(Int64Array::from_iter_values(std::iter::repeat_n(
                *field, num_rows,
            ))) as _;
            columns.push(array);
            fields.push(Field::new(
                format!("v{i}"),
                datatypes::arrow::datatypes::DataType::Int64,
                true,
            ));
        }

        // Optional timestamp column, one second apart per row.
        if let Some(ts_start) = ts_start {
            let timestamps = Arc::new(TimestampMillisecondArray::from_iter_values(
                (0..num_rows).map(|i| ts_start + i as i64 * 1000),
            )) as _;
            columns.push(timestamps);
            fields.push(Field::new(
                "ts",
                datatypes::arrow::datatypes::DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    None,
                ),
                true,
            ));
        }

        // Encode all tag values into a single dense primary key.
        let converter = DensePrimaryKeyCodec::with_fields(
            (0..idx_tags.len())
                .map(|idx| {
                    (
                        idx as u32,
                        SortField::new(ConcreteDataType::int64_datatype()),
                    )
                })
                .collect(),
        );
        let encoded_pk = converter
            .encode(idx_tags.iter().map(|(_, v)| ValueRef::Int64(*v)))
            .unwrap();

        // Primary key column: a dictionary array where every row references the
        // same encoded key bytes.
        let pk_values: Vec<&[u8]> = std::iter::repeat_n(encoded_pk.as_slice(), num_rows).collect();
        let keys = datatypes::arrow::array::UInt32Array::from_iter(0..num_rows as u32);
        let values = Arc::new(datatypes::arrow::array::BinaryArray::from_vec(pk_values));
        let pk_array =
            Arc::new(datatypes::arrow::array::DictionaryArray::try_new(keys, values).unwrap()) as _;
        columns.push(pk_array);
        fields.push(Field::new_dictionary(
            PRIMARY_KEY_COLUMN_NAME,
            datatypes::arrow::datatypes::DataType::UInt32,
            datatypes::arrow::datatypes::DataType::Binary,
            false,
        ));

        // Sequence column: monotonically increasing per row.
        columns.push(Arc::new(UInt64Array::from_iter_values(0..num_rows as u64)) as _);
        fields.push(Field::new(
            SEQUENCE_COLUMN_NAME,
            datatypes::arrow::datatypes::DataType::UInt64,
            false,
        ));

        // Op type column: every row is a Put.
        columns.push(Arc::new(UInt8Array::from_iter_values(
            (0..num_rows).map(|_| OpType::Put as u8),
        )) as _);
        fields.push(Field::new(
            OP_TYPE_COLUMN_NAME,
            datatypes::arrow::datatypes::DataType::UInt8,
            false,
        ));

        let schema = Arc::new(datatypes::arrow::datatypes::Schema::new(fields));

        datatypes::arrow::record_batch::RecordBatch::try_new(schema, columns).unwrap()
    }

    /// A full projection reads every column and outputs tags, fields, then the
    /// time index.
    #[test]
    fn test_flat_projection_mapper_all() {
        let metadata = Arc::new(
            TestRegionMetadataBuilder::default()
                .num_tags(2)
                .num_fields(2)
                .build(),
        );
        let cache = CacheStrategy::Disabled;
        let mapper = FlatProjectionMapper::all(&metadata).unwrap();
        assert_eq!(
            &[0, 1, 2, 3, 4],
            mapper.read_columns().column_ids().as_slice()
        );
        assert_eq!(
            [
                (1, ConcreteDataType::int64_datatype()),
                (2, ConcreteDataType::int64_datatype()),
                (3, ConcreteDataType::int64_datatype()),
                (4, ConcreteDataType::int64_datatype()),
                (0, ConcreteDataType::timestamp_millisecond_datatype())
            ],
            mapper.batch_schema()
        );

        let batch = new_flat_batch(Some(0), &[(1, 1), (2, 2)], &[(3, 3), (4, 4)], 3);
        let record_batch = mapper.convert(&batch, &cache).unwrap();
        let expect = "\
+---------------------+----+----+----+----+
| ts                  | k0 | k1 | v0 | v1 |
+---------------------+----+----+----+----+
| 1970-01-01T00:00:00 | 1  | 2  | 3  | 4  |
| 1970-01-01T00:00:01 | 1  | 2  | 3  | 4  |
| 1970-01-01T00:00:02 | 1  | 2  | 3  | 4  |
+---------------------+----+----+----+----+";
        assert_eq!(expect, print_record_batch(record_batch));
    }

    /// A partial projection preserves the caller's column order in the output.
    #[test]
    fn test_flat_projection_mapper_with_projection() {
        let metadata = Arc::new(
            TestRegionMetadataBuilder::default()
                .num_tags(2)
                .num_fields(2)
                .build(),
        );
        let cache = CacheStrategy::Disabled;
        let mapper = FlatProjectionMapper::new(&metadata, [4, 1]).unwrap();
        assert_eq!(&[4, 1], mapper.read_columns().column_ids().as_slice());
        assert_eq!(
            [
                (1, ConcreteDataType::int64_datatype()),
                (4, ConcreteDataType::int64_datatype()),
                (0, ConcreteDataType::timestamp_millisecond_datatype())
            ],
            mapper.batch_schema()
        );

        let batch = new_flat_batch(None, &[(1, 1)], &[(4, 4)], 3);
        let record_batch = mapper.convert(&batch, &cache).unwrap();
        let expect = "\
+----+----+
| v1 | k0 |
+----+----+
| 4  | 1  |
| 4  | 1  |
| 4  | 1  |
+----+----+";
        assert_eq!(expect, print_record_batch(record_batch));
    }

    /// Reading a superset of the projected columns still outputs only the
    /// projected ones.
    #[test]
    fn test_flat_projection_mapper_read_superset() {
        let metadata = Arc::new(
            TestRegionMetadataBuilder::default()
                .num_tags(2)
                .num_fields(2)
                .build(),
        );
        let cache = CacheStrategy::Disabled;
        let mapper = FlatProjectionMapper::new_with_read_columns(
            &metadata,
            vec![4, 1],
            ReadColumns::from_deduped_column_ids([4, 1, 3]),
        )
        .unwrap();
        assert_eq!(&[4, 1, 3], mapper.read_columns().column_ids().as_slice());

        let batch = new_flat_batch(None, &[(1, 1)], &[(3, 3), (4, 4)], 3);
        let record_batch = mapper.convert(&batch, &cache).unwrap();
        let expect = "\
+----+----+
| v1 | k0 |
+----+----+
| 4  | 1  |
| 4  | 1  |
| 4  | 1  |
+----+----+";
        assert_eq!(expect, print_record_batch(record_batch));
    }

    /// An empty projection reads only the time index and produces batches with
    /// rows but no columns.
    #[test]
    fn test_flat_projection_mapper_empty_projection() {
        let metadata = Arc::new(
            TestRegionMetadataBuilder::default()
                .num_tags(2)
                .num_fields(2)
                .build(),
        );
        let cache = CacheStrategy::Disabled;
        let mapper = FlatProjectionMapper::new(&metadata, []).unwrap();
        assert_eq!(&[0], mapper.read_columns().column_ids().as_slice());
        assert!(mapper.output_schema().is_empty());
        assert_eq!(
            [(0, ConcreteDataType::timestamp_millisecond_datatype())],
            mapper.batch_schema()
        );

        let batch = new_flat_batch(Some(0), &[], &[], 3);
        let record_batch = mapper.convert(&batch, &cache).unwrap();
        assert_eq!(3, record_batch.num_rows());
        assert_eq!(0, record_batch.num_columns());
        assert!(record_batch.schema.is_empty());
    }
}
339}