1use std::cmp::Ordering;
18use std::collections::HashMap;
19use std::sync::Arc;
20
21use api::v1::SemanticType;
22use common_error::ext::BoxedError;
23use common_recordbatch::error::ExternalSnafu;
24use common_recordbatch::RecordBatch;
25use datatypes::prelude::{ConcreteDataType, DataType};
26use datatypes::schema::{Schema, SchemaRef};
27use datatypes::value::Value;
28use datatypes::vectors::VectorRef;
29use snafu::{OptionExt, ResultExt};
30use store_api::metadata::RegionMetadataRef;
31use store_api::storage::ColumnId;
32
33use crate::cache::CacheStrategy;
34use crate::error::{InvalidRequestSnafu, Result};
35use crate::read::Batch;
36use crate::row_converter::{build_primary_key_codec, CompositeValues, PrimaryKeyCodec};
37
/// Upper bound (in rows) for a repeated tag vector to be stored in the vector cache.
///
/// Vectors built by `repeated_vector_with_cache` that are longer than this are returned
/// to the caller but not cached.
const MAX_VECTOR_LENGTH_TO_CACHE: usize = 16384;
40
/// Handles projection and converts a projected [Batch] to a projected [RecordBatch].
pub struct ProjectionMapper {
    /// Metadata of the region.
    metadata: RegionMetadataRef,
    /// For each output column, where to read it from in a [Batch]
    /// (primary key tag, time index, or field position).
    batch_indices: Vec<BatchIndex>,
    /// Whether any output column is a tag, in which case `convert` needs the
    /// primary key values of the batch.
    has_tags: bool,
    /// Codec used to decode the primary key when the batch has no decoded values.
    codec: Arc<dyn PrimaryKeyCodec>,
    /// Schema of the converted [RecordBatch].
    output_schema: SchemaRef,
    /// Ids of the columns to read, in the same order as the projection indices used to
    /// build the mapper. For an empty projection it only holds the time index column id.
    column_ids: Vec<ColumnId>,
    /// Ids and data types of the field columns expected in the [Batch], in batch order.
    batch_fields: Vec<(ColumnId, ConcreteDataType)>,
    /// `true` if the projection passed to the constructor was empty.
    is_empty_projection: bool,
}
61
62impl ProjectionMapper {
63 pub fn new(
68 metadata: &RegionMetadataRef,
69 projection: impl Iterator<Item = usize>,
70 ) -> Result<ProjectionMapper> {
71 let mut projection: Vec<_> = projection.collect();
72 let is_empty_projection = projection.is_empty();
74 if is_empty_projection {
75 projection.push(metadata.time_index_column_pos());
77 }
78
79 let mut column_schemas = Vec::with_capacity(projection.len());
80 let mut column_ids = Vec::with_capacity(projection.len());
81 for idx in &projection {
82 let column = metadata
84 .column_metadatas
85 .get(*idx)
86 .context(InvalidRequestSnafu {
87 region_id: metadata.region_id,
88 reason: format!("projection index {} is out of bound", idx),
89 })?;
90
91 column_ids.push(column.column_id);
92 column_schemas.push(metadata.schema.column_schemas()[*idx].clone());
94 }
95
96 let codec = build_primary_key_codec(metadata);
97 if is_empty_projection {
98 return Ok(ProjectionMapper {
100 metadata: metadata.clone(),
101 batch_indices: vec![],
102 has_tags: false,
103 codec,
104 output_schema: Arc::new(Schema::new(vec![])),
105 column_ids,
106 batch_fields: vec![],
107 is_empty_projection,
108 });
109 }
110
111 let output_schema = Arc::new(Schema::new(column_schemas));
113 let batch_fields = Batch::projected_fields(metadata, &column_ids);
115
116 let field_id_to_index: HashMap<_, _> = batch_fields
118 .iter()
119 .enumerate()
120 .map(|(index, (column_id, _))| (*column_id, index))
121 .collect();
122 let mut batch_indices = Vec::with_capacity(projection.len());
124 let mut has_tags = false;
125 for idx in &projection {
126 let column = &metadata.column_metadatas[*idx];
128 let batch_index = match column.semantic_type {
130 SemanticType::Tag => {
131 let index = metadata.primary_key_index(column.column_id).unwrap();
133 has_tags = true;
135 BatchIndex::Tag((index, column.column_id))
138 }
139 SemanticType::Timestamp => BatchIndex::Timestamp,
140 SemanticType::Field => {
141 let index = field_id_to_index[&column.column_id];
143 BatchIndex::Field(index)
144 }
145 };
146 batch_indices.push(batch_index);
147 }
148
149 Ok(ProjectionMapper {
150 metadata: metadata.clone(),
151 batch_indices,
152 has_tags,
153 codec,
154 output_schema,
155 column_ids,
156 batch_fields,
157 is_empty_projection,
158 })
159 }
160
161 pub fn all(metadata: &RegionMetadataRef) -> Result<ProjectionMapper> {
163 ProjectionMapper::new(metadata, 0..metadata.column_metadatas.len())
164 }
165
166 pub(crate) fn metadata(&self) -> &RegionMetadataRef {
168 &self.metadata
169 }
170
171 pub(crate) fn column_ids(&self) -> &[ColumnId] {
174 &self.column_ids
175 }
176
177 pub(crate) fn batch_fields(&self) -> &[(ColumnId, ConcreteDataType)] {
179 &self.batch_fields
180 }
181
182 pub(crate) fn output_schema(&self) -> SchemaRef {
186 self.output_schema.clone()
187 }
188
189 pub(crate) fn empty_record_batch(&self) -> RecordBatch {
191 RecordBatch::new_empty(self.output_schema.clone())
192 }
193
194 pub(crate) fn convert(
198 &self,
199 batch: &Batch,
200 cache_strategy: &CacheStrategy,
201 ) -> common_recordbatch::error::Result<RecordBatch> {
202 if self.is_empty_projection {
203 return RecordBatch::new_with_count(self.output_schema.clone(), batch.num_rows());
204 }
205
206 debug_assert_eq!(self.batch_fields.len(), batch.fields().len());
207 debug_assert!(self
208 .batch_fields
209 .iter()
210 .zip(batch.fields())
211 .all(|((id, _), batch_col)| *id == batch_col.column_id));
212
213 let pk_values = if self.has_tags {
215 match batch.pk_values() {
216 Some(v) => v.clone(),
217 None => self
218 .codec
219 .decode(batch.primary_key())
220 .map_err(BoxedError::new)
221 .context(ExternalSnafu)?,
222 }
223 } else {
224 CompositeValues::Dense(vec![])
225 };
226
227 let mut columns = Vec::with_capacity(self.output_schema.num_columns());
228 let num_rows = batch.num_rows();
229 for (index, column_schema) in self
230 .batch_indices
231 .iter()
232 .zip(self.output_schema.column_schemas())
233 {
234 match index {
235 BatchIndex::Tag((idx, column_id)) => {
236 let value = match &pk_values {
237 CompositeValues::Dense(v) => &v[*idx].1,
238 CompositeValues::Sparse(v) => v.get_or_null(*column_id),
239 };
240 let vector = repeated_vector_with_cache(
241 &column_schema.data_type,
242 value,
243 num_rows,
244 cache_strategy,
245 )?;
246 columns.push(vector);
247 }
248 BatchIndex::Timestamp => {
249 columns.push(batch.timestamps().clone());
250 }
251 BatchIndex::Field(idx) => {
252 columns.push(batch.fields()[*idx].data.clone());
253 }
254 }
255 }
256
257 RecordBatch::new(self.output_schema.clone(), columns)
258 }
259}
260
/// Location of an output column inside a [Batch].
#[derive(Debug, Clone, Copy)]
enum BatchIndex {
    /// A tag column: its position among the primary key columns and its column id.
    Tag((usize, ColumnId)),
    /// The time index column (read from `Batch::timestamps`).
    Timestamp,
    /// A field column: its position in `Batch::fields`.
    Field(usize),
}
271
272fn repeated_vector_with_cache(
274 data_type: &ConcreteDataType,
275 value: &Value,
276 num_rows: usize,
277 cache_strategy: &CacheStrategy,
278) -> common_recordbatch::error::Result<VectorRef> {
279 if let Some(vector) = cache_strategy.get_repeated_vector(data_type, value) {
280 match vector.len().cmp(&num_rows) {
283 Ordering::Less => (),
284 Ordering::Equal => return Ok(vector),
285 Ordering::Greater => return Ok(vector.slice(0, num_rows)),
286 }
287 }
288
289 let vector = new_repeated_vector(data_type, value, num_rows)?;
291 if vector.len() <= MAX_VECTOR_LENGTH_TO_CACHE {
293 cache_strategy.put_repeated_vector(value.clone(), vector.clone());
294 }
295
296 Ok(vector)
297}
298
299fn new_repeated_vector(
301 data_type: &ConcreteDataType,
302 value: &Value,
303 num_rows: usize,
304) -> common_recordbatch::error::Result<VectorRef> {
305 let mut mutable_vector = data_type.create_mutable_vector(1);
306 mutable_vector
307 .try_push_value_ref(value.as_value_ref())
308 .map_err(BoxedError::new)
309 .context(ExternalSnafu)?;
310 let base_vector = mutable_vector.to_vector();
312 Ok(base_vector.replicate(&[num_rows]))
313}
314
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::OpType;
    use datatypes::arrow::array::{Int64Array, TimestampMillisecondArray, UInt64Array, UInt8Array};
    use datatypes::arrow::util::pretty;
    use datatypes::value::ValueRef;

    use super::*;
    use crate::cache::CacheManager;
    use crate::read::BatchBuilder;
    use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
    use crate::test_util::meta_util::TestRegionMetadataBuilder;

    /// Builds the region metadata shared by these tests: a time index `ts`, two tags
    /// (`k0`, `k1`) and two fields (`v0`, `v1`).
    fn new_metadata() -> RegionMetadataRef {
        Arc::new(
            TestRegionMetadataBuilder::default()
                .num_tags(2)
                .num_fields(2)
                .build(),
        )
    }

    /// Builds a cache strategy with the vector cache enabled.
    fn new_cache() -> CacheStrategy {
        let cache = CacheManager::builder().vector_cache_size(1024).build();
        CacheStrategy::EnableAll(Arc::new(cache))
    }

    /// Builds a batch of `num_rows` rows with int64 `tags` encoded into the primary key,
    /// timestamps starting at `ts_start` (ms, step 1000), and constant int64 `fields`.
    fn new_batch(
        ts_start: i64,
        tags: &[i64],
        fields: &[(ColumnId, i64)],
        num_rows: usize,
    ) -> Batch {
        let converter = DensePrimaryKeyCodec::with_fields(
            (0..tags.len())
                .map(|idx| {
                    (
                        idx as u32,
                        SortField::new(ConcreteDataType::int64_datatype()),
                    )
                })
                .collect(),
        );
        let primary_key = converter
            .encode(tags.iter().map(|v| ValueRef::Int64(*v)))
            .unwrap();

        let mut builder = BatchBuilder::new(primary_key);
        builder
            .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
                (0..num_rows).map(|i| ts_start + i as i64 * 1000),
            )))
            .unwrap()
            .sequences_array(Arc::new(UInt64Array::from_iter_values(0..num_rows as u64)))
            .unwrap()
            .op_types_array(Arc::new(UInt8Array::from_iter_values(
                (0..num_rows).map(|_| OpType::Put as u8),
            )))
            .unwrap();
        for (column_id, field) in fields {
            builder
                .push_field_array(
                    *column_id,
                    Arc::new(Int64Array::from_iter_values(std::iter::repeat_n(
                        *field, num_rows,
                    ))),
                )
                .unwrap();
        }
        builder.build().unwrap()
    }

    /// Renders a record batch as an arrow pretty-printed table.
    fn print_record_batch(record_batch: RecordBatch) -> String {
        pretty::pretty_format_batches(&[record_batch.into_df_record_batch()])
            .unwrap()
            .to_string()
    }

    #[test]
    fn test_projection_mapper_all() {
        let metadata = new_metadata();
        let mapper = ProjectionMapper::all(&metadata).unwrap();
        assert_eq!([0, 1, 2, 3, 4], mapper.column_ids());
        assert_eq!(
            [
                (3, ConcreteDataType::int64_datatype()),
                (4, ConcreteDataType::int64_datatype())
            ],
            mapper.batch_fields()
        );

        let cache = new_cache();
        let batch = new_batch(0, &[1, 2], &[(3, 3), (4, 4)], 3);
        let record_batch = mapper.convert(&batch, &cache).unwrap();
        let expect = "\
+---------------------+----+----+----+----+
| ts                  | k0 | k1 | v0 | v1 |
+---------------------+----+----+----+----+
| 1970-01-01T00:00:00 | 1  | 2  | 3  | 4  |
| 1970-01-01T00:00:01 | 1  | 2  | 3  | 4  |
| 1970-01-01T00:00:02 | 1  | 2  | 3  | 4  |
+---------------------+----+----+----+----+";
        assert_eq!(expect, print_record_batch(record_batch));

        // Tag values are cached, field values are not.
        assert!(cache
            .get_repeated_vector(&ConcreteDataType::int64_datatype(), &Value::Int64(1))
            .is_some());
        assert!(cache
            .get_repeated_vector(&ConcreteDataType::int64_datatype(), &Value::Int64(2))
            .is_some());
        assert!(cache
            .get_repeated_vector(&ConcreteDataType::int64_datatype(), &Value::Int64(3))
            .is_none());
        // Converting again reuses the cached vectors and yields the same output.
        let record_batch = mapper.convert(&batch, &cache).unwrap();
        assert_eq!(expect, print_record_batch(record_batch));
    }

    #[test]
    fn test_projection_mapper_with_projection() {
        let metadata = new_metadata();
        // Columns v1, k0
        let mapper = ProjectionMapper::new(&metadata, [4, 1].into_iter()).unwrap();
        assert_eq!([4, 1], mapper.column_ids());
        assert_eq!(
            [(4, ConcreteDataType::int64_datatype())],
            mapper.batch_fields()
        );

        let batch = new_batch(0, &[1, 2], &[(4, 4)], 3);
        let cache = new_cache();
        let record_batch = mapper.convert(&batch, &cache).unwrap();
        let expect = "\
+----+----+
| v1 | k0 |
+----+----+
| 4  | 1  |
| 4  | 1  |
| 4  | 1  |
+----+----+";
        assert_eq!(expect, print_record_batch(record_batch));
    }

    #[test]
    fn test_projection_mapper_empty_projection() {
        let metadata = new_metadata();
        let mapper = ProjectionMapper::new(&metadata, [].into_iter()).unwrap();
        // The time index column is still read.
        assert_eq!([0], mapper.column_ids());
        assert!(mapper.batch_fields().is_empty());
        assert!(!mapper.has_tags);
        assert!(mapper.batch_indices.is_empty());
        assert!(mapper.output_schema().is_empty());
        assert!(mapper.is_empty_projection);

        let batch = new_batch(0, &[1, 2], &[], 3);
        let cache = new_cache();
        let record_batch = mapper.convert(&batch, &cache).unwrap();
        assert_eq!(3, record_batch.num_rows());
        assert_eq!(0, record_batch.num_columns());
        assert!(record_batch.schema.is_empty());
    }

    #[test]
    fn test_projection_mapper_out_of_bound() {
        let metadata = new_metadata();
        // The region only has 5 columns, so index 5 is invalid.
        assert!(ProjectionMapper::new(&metadata, [0, 5].into_iter()).is_err());
    }

    #[test]
    fn test_projection_mapper_reuse_cached_vector() {
        let metadata = new_metadata();
        // Column k0 only.
        let mapper = ProjectionMapper::new(&metadata, [1].into_iter()).unwrap();
        let cache = new_cache();

        // Populates the cache with a 3-row vector for tag value 1.
        let batch = new_batch(0, &[1, 2], &[], 3);
        let record_batch = mapper.convert(&batch, &cache).unwrap();
        assert_eq!(3, record_batch.num_rows());

        // A shorter batch is served from a slice of the cached vector.
        let batch = new_batch(0, &[1, 2], &[], 2);
        let record_batch = mapper.convert(&batch, &cache).unwrap();
        let expect = "\
+----+
| k0 |
+----+
| 1  |
| 1  |
+----+";
        assert_eq!(expect, print_record_batch(record_batch));

        // A longer batch needs a freshly built vector.
        let batch = new_batch(0, &[1, 2], &[], 5);
        let record_batch = mapper.convert(&batch, &cache).unwrap();
        let expect = "\
+----+
| k0 |
+----+
| 1  |
| 1  |
| 1  |
| 1  |
| 1  |
+----+";
        assert_eq!(expect, print_record_batch(record_batch));
    }
}