1use std::cmp::Ordering;
18
19use common_recordbatch::error::DataTypesSnafu;
20use datatypes::prelude::{ConcreteDataType, DataType};
21use datatypes::value::Value;
22use datatypes::vectors::VectorRef;
23use snafu::{OptionExt, ResultExt};
24use store_api::metadata::RegionMetadataRef;
25use store_api::storage::ColumnId;
26
27use crate::cache::CacheStrategy;
28use crate::error::{InvalidRequestSnafu, Result};
29
/// Upper bound (in elements) on vectors eligible for the repeated-vector cache;
/// `repeated_vector_with_cache` returns longer vectors without caching them.
pub(crate) const MAX_VECTOR_LENGTH_TO_CACHE: usize = 16384;
32
33pub(crate) fn read_column_ids_from_projection(
34 metadata: &RegionMetadataRef,
35 projection: &[usize],
36) -> Result<Vec<ColumnId>> {
37 let mut column_ids = Vec::with_capacity(projection.len().max(1));
38 if projection.is_empty() {
39 column_ids.push(metadata.time_index_column().column_id);
40 return Ok(column_ids);
41 }
42
43 for idx in projection {
44 let column = metadata
45 .column_metadatas
46 .get(*idx)
47 .with_context(|| InvalidRequestSnafu {
48 region_id: metadata.region_id,
49 reason: format!("projection index {} is out of bound", idx),
50 })?;
51 column_ids.push(column.column_id);
52 }
53 Ok(column_ids)
54}
55
56pub(crate) fn repeated_vector_with_cache(
58 data_type: &ConcreteDataType,
59 value: &Value,
60 num_rows: usize,
61 cache_strategy: &CacheStrategy,
62) -> common_recordbatch::error::Result<VectorRef> {
63 if let Some(vector) = cache_strategy.get_repeated_vector(data_type, value) {
64 match vector.len().cmp(&num_rows) {
65 Ordering::Less => (),
66 Ordering::Equal => return Ok(vector),
67 Ordering::Greater => return Ok(vector.slice(0, num_rows)),
68 }
69 }
70
71 let vector = new_repeated_vector(data_type, value, num_rows)?;
72 if vector.len() <= MAX_VECTOR_LENGTH_TO_CACHE {
73 cache_strategy.put_repeated_vector(value.clone(), vector.clone());
74 }
75
76 Ok(vector)
77}
78
79pub(crate) fn new_repeated_vector(
81 data_type: &ConcreteDataType,
82 value: &Value,
83 num_rows: usize,
84) -> common_recordbatch::error::Result<VectorRef> {
85 let mut mutable_vector = data_type.create_mutable_vector(1);
86 mutable_vector
87 .try_push_value_ref(&value.as_value_ref())
88 .context(DataTypesSnafu)?;
89 let base_vector = mutable_vector.to_vector();
90 Ok(base_vector.replicate(&[num_rows]))
91}
92
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::OpType;
    use common_recordbatch::RecordBatch;
    use datatypes::arrow::array::{Int64Array, TimestampMillisecondArray, UInt8Array, UInt64Array};
    use datatypes::arrow::datatypes::Field;
    use datatypes::arrow::util::pretty;
    use datatypes::value::ValueRef;
    use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
    use mito_codec::test_util::TestRegionMetadataBuilder;
    use store_api::storage::consts::{
        OP_TYPE_COLUMN_NAME, PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME,
    };

    use super::*;
    use crate::read::flat_projection::FlatProjectionMapper;
    use crate::read::read_columns::ReadColumns;

    /// Renders a record batch as an ASCII table for snapshot-style assertions.
    fn print_record_batch(record_batch: RecordBatch) -> String {
        pretty::pretty_format_batches(&[record_batch.into_df_record_batch()])
            .unwrap()
            .to_string()
    }

    /// Builds an arrow `RecordBatch` in the flat layout consumed by
    /// `FlatProjectionMapper::convert`.
    ///
    /// Column order: one `k{i}` Int64 tag column per `(i, value)` entry in
    /// `idx_tags`, one `v{i}` Int64 field column per entry in `idx_fields`,
    /// an optional `ts` millisecond-timestamp column (starting at `ts_start`
    /// and advancing 1000 ms per row), followed by the dictionary-encoded
    /// primary key column, the sequence column and the op type column.
    /// Every tag/field column repeats its single value for all `num_rows`.
    fn new_flat_batch(
        ts_start: Option<i64>,
        idx_tags: &[(usize, i64)],
        idx_fields: &[(usize, i64)],
        num_rows: usize,
    ) -> datatypes::arrow::record_batch::RecordBatch {
        let mut columns = Vec::with_capacity(1 + idx_tags.len() + idx_fields.len() + 3);
        let mut fields = Vec::with_capacity(1 + idx_tags.len() + idx_fields.len() + 3);

        // Tag columns: each repeats its value `num_rows` times.
        for (i, tag) in idx_tags {
            let array = Arc::new(Int64Array::from_iter_values(std::iter::repeat_n(
                *tag, num_rows,
            ))) as _;
            columns.push(array);
            fields.push(Field::new(
                format!("k{i}"),
                datatypes::arrow::datatypes::DataType::Int64,
                true,
            ));
        }

        // Field columns: same repetition scheme as the tags.
        for (i, field) in idx_fields {
            let array = Arc::new(Int64Array::from_iter_values(std::iter::repeat_n(
                *field, num_rows,
            ))) as _;
            columns.push(array);
            fields.push(Field::new(
                format!("v{i}"),
                datatypes::arrow::datatypes::DataType::Int64,
                true,
            ));
        }

        // Optional time index column: one-second (1000 ms) steps from `ts_start`.
        if let Some(ts_start) = ts_start {
            let timestamps = Arc::new(TimestampMillisecondArray::from_iter_values(
                (0..num_rows).map(|i| ts_start + i as i64 * 1000),
            )) as _;
            columns.push(timestamps);
            fields.push(Field::new(
                "ts",
                datatypes::arrow::datatypes::DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    None,
                ),
                true,
            ));
        }

        // Encode all tag values into a single dense primary key; every row
        // shares the same encoded key bytes.
        let converter = DensePrimaryKeyCodec::with_fields(
            (0..idx_tags.len())
                .map(|idx| {
                    (
                        idx as u32,
                        SortField::new(ConcreteDataType::int64_datatype()),
                    )
                })
                .collect(),
        );
        let encoded_pk = converter
            .encode(idx_tags.iter().map(|(_, v)| ValueRef::Int64(*v)))
            .unwrap();

        // Primary key column: dictionary of binary values keyed by row index
        // (each key maps to an identical encoded-pk entry).
        let pk_values: Vec<&[u8]> = std::iter::repeat_n(encoded_pk.as_slice(), num_rows).collect();
        let keys = datatypes::arrow::array::UInt32Array::from_iter(0..num_rows as u32);
        let values = Arc::new(datatypes::arrow::array::BinaryArray::from_vec(pk_values));
        let pk_array =
            Arc::new(datatypes::arrow::array::DictionaryArray::try_new(keys, values).unwrap()) as _;
        columns.push(pk_array);
        fields.push(Field::new_dictionary(
            PRIMARY_KEY_COLUMN_NAME,
            datatypes::arrow::datatypes::DataType::UInt32,
            datatypes::arrow::datatypes::DataType::Binary,
            false,
        ));

        // Sequence column: monotonically increasing per row.
        columns.push(Arc::new(UInt64Array::from_iter_values(0..num_rows as u64)) as _);
        fields.push(Field::new(
            SEQUENCE_COLUMN_NAME,
            datatypes::arrow::datatypes::DataType::UInt64,
            false,
        ));

        // Op type column: every row is a `Put`.
        columns.push(Arc::new(UInt8Array::from_iter_values(
            (0..num_rows).map(|_| OpType::Put as u8),
        )) as _);
        fields.push(Field::new(
            OP_TYPE_COLUMN_NAME,
            datatypes::arrow::datatypes::DataType::UInt8,
            false,
        ));

        let schema = Arc::new(datatypes::arrow::datatypes::Schema::new(fields));

        datatypes::arrow::record_batch::RecordBatch::try_new(schema, columns).unwrap()
    }

    /// `FlatProjectionMapper::all` reads every column and outputs
    /// ts, tags, then fields.
    #[test]
    fn test_flat_projection_mapper_all() {
        let metadata = Arc::new(
            TestRegionMetadataBuilder::default()
                .num_tags(2)
                .num_fields(2)
                .build(),
        );
        let cache = CacheStrategy::Disabled;
        let mapper = FlatProjectionMapper::all(&metadata).unwrap();
        assert_eq!(
            &[0, 1, 2, 3, 4],
            mapper.read_columns().column_ids().as_slice()
        );
        // Batch schema lists tags/fields first, time index last.
        assert_eq!(
            [
                (1, ConcreteDataType::int64_datatype()),
                (2, ConcreteDataType::int64_datatype()),
                (3, ConcreteDataType::int64_datatype()),
                (4, ConcreteDataType::int64_datatype()),
                (0, ConcreteDataType::timestamp_millisecond_datatype())
            ],
            mapper.batch_schema()
        );

        let batch = new_flat_batch(Some(0), &[(1, 1), (2, 2)], &[(3, 3), (4, 4)], 3);
        let record_batch = mapper.convert(&batch, &cache).unwrap();
        let expect = "\
+---------------------+----+----+----+----+
| ts                  | k0 | k1 | v0 | v1 |
+---------------------+----+----+----+----+
| 1970-01-01T00:00:00 | 1  | 2  | 3  | 4  |
| 1970-01-01T00:00:01 | 1  | 2  | 3  | 4  |
| 1970-01-01T00:00:02 | 1  | 2  | 3  | 4  |
+---------------------+----+----+----+----+";
        assert_eq!(expect, print_record_batch(record_batch));
    }

    /// An explicit projection preserves the caller's column order
    /// (field 4 before tag 1) in the output.
    #[test]
    fn test_flat_projection_mapper_with_projection() {
        let metadata = Arc::new(
            TestRegionMetadataBuilder::default()
                .num_tags(2)
                .num_fields(2)
                .build(),
        );
        let cache = CacheStrategy::Disabled;
        let mapper = FlatProjectionMapper::new(&metadata, [4, 1]).unwrap();
        assert_eq!(&[4, 1], mapper.read_columns().column_ids().as_slice());
        assert_eq!(
            [
                (1, ConcreteDataType::int64_datatype()),
                (4, ConcreteDataType::int64_datatype()),
                (0, ConcreteDataType::timestamp_millisecond_datatype())
            ],
            mapper.batch_schema()
        );

        // No ts column is produced: the projection selects only v1 and k0.
        let batch = new_flat_batch(None, &[(1, 1)], &[(4, 4)], 3);
        let record_batch = mapper.convert(&batch, &cache).unwrap();
        let expect = "\
+----+----+
| v1 | k0 |
+----+----+
| 4  | 1  |
| 4  | 1  |
| 4  | 1  |
+----+----+";
        assert_eq!(expect, print_record_batch(record_batch));
    }

    /// The mapper may read a superset of columns (here column 3 as well);
    /// extra read columns are dropped from the converted output.
    #[test]
    fn test_flat_projection_mapper_read_superset() {
        let metadata = Arc::new(
            TestRegionMetadataBuilder::default()
                .num_tags(2)
                .num_fields(2)
                .build(),
        );
        let cache = CacheStrategy::Disabled;
        let mapper = FlatProjectionMapper::new_with_read_columns(
            &metadata,
            vec![4, 1],
            ReadColumns::from_deduped_column_ids([4, 1, 3]),
        )
        .unwrap();
        assert_eq!(&[4, 1, 3], mapper.read_columns().column_ids().as_slice());

        let batch = new_flat_batch(None, &[(1, 1)], &[(3, 3), (4, 4)], 3);
        let record_batch = mapper.convert(&batch, &cache).unwrap();
        let expect = "\
+----+----+
| v1 | k0 |
+----+----+
| 4  | 1  |
| 4  | 1  |
| 4  | 1  |
+----+----+";
        assert_eq!(expect, print_record_batch(record_batch));
    }

    /// An empty projection still reads the time index column internally but
    /// yields a record batch with rows and no columns.
    #[test]
    fn test_flat_projection_mapper_empty_projection() {
        let metadata = Arc::new(
            TestRegionMetadataBuilder::default()
                .num_tags(2)
                .num_fields(2)
                .build(),
        );
        let cache = CacheStrategy::Disabled;
        let mapper = FlatProjectionMapper::new(&metadata, []).unwrap();
        assert_eq!(&[0], mapper.read_columns().column_ids().as_slice());
        assert!(mapper.output_schema().is_empty());
        assert_eq!(
            [(0, ConcreteDataType::timestamp_millisecond_datatype())],
            mapper.batch_schema()
        );

        let batch = new_flat_batch(Some(0), &[], &[], 3);
        let record_batch = mapper.convert(&batch, &cache).unwrap();
        // Row count is preserved even though no columns are projected.
        assert_eq!(3, record_batch.num_rows());
        assert_eq!(0, record_batch.num_columns());
        assert!(record_batch.schema.is_empty());
    }
}
339}