1use std::cmp::Ordering;
18use std::collections::HashMap;
19use std::sync::Arc;
20
21use api::v1::SemanticType;
22use common_error::ext::BoxedError;
23use common_recordbatch::RecordBatch;
24use common_recordbatch::error::{DataTypesSnafu, ExternalSnafu};
25use datatypes::prelude::{ConcreteDataType, DataType};
26use datatypes::schema::{Schema, SchemaRef};
27use datatypes::value::Value;
28use datatypes::vectors::VectorRef;
29use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec, build_primary_key_codec};
30use snafu::{OptionExt, ResultExt};
31use store_api::metadata::RegionMetadataRef;
32use store_api::storage::ColumnId;
33
34use crate::cache::CacheStrategy;
35use crate::error::{InvalidRequestSnafu, Result};
36use crate::read::Batch;
37use crate::read::flat_projection::FlatProjectionMapper;
38
/// Only cache repeated vectors with no more than this many rows
/// (see [`repeated_vector_with_cache`]).
pub(crate) const MAX_VECTOR_LENGTH_TO_CACHE: usize = 16384;
41
/// Handles projection and converts batches read from a region into output
/// [`RecordBatch`]es, dispatching to one of two concrete mappers.
pub enum ProjectionMapper {
    /// Mapper for [`Batch`]es that carry an encoded primary key.
    PrimaryKey(PrimaryKeyProjectionMapper),
    /// Mapper for the flat record batch format.
    Flat(FlatProjectionMapper),
}
49
50impl ProjectionMapper {
51 pub fn new(
53 metadata: &RegionMetadataRef,
54 projection: impl Iterator<Item = usize> + Clone,
55 ) -> Result<Self> {
56 Ok(ProjectionMapper::Flat(FlatProjectionMapper::new(
57 metadata, projection,
58 )?))
59 }
60
61 pub fn new_with_read_columns(
63 metadata: &RegionMetadataRef,
64 projection: impl Iterator<Item = usize>,
65 read_column_ids: Vec<ColumnId>,
66 ) -> Result<Self> {
67 let projection: Vec<_> = projection.collect();
68 Ok(ProjectionMapper::Flat(
69 FlatProjectionMapper::new_with_read_columns(metadata, projection, read_column_ids)?,
70 ))
71 }
72
73 pub fn all(metadata: &RegionMetadataRef) -> Result<Self> {
75 Ok(ProjectionMapper::Flat(FlatProjectionMapper::all(metadata)?))
76 }
77
78 pub(crate) fn metadata(&self) -> &RegionMetadataRef {
80 match self {
81 ProjectionMapper::PrimaryKey(m) => m.metadata(),
82 ProjectionMapper::Flat(m) => m.metadata(),
83 }
84 }
85
86 pub(crate) fn has_tags(&self) -> bool {
88 match self {
89 ProjectionMapper::PrimaryKey(m) => m.has_tags(),
90 ProjectionMapper::Flat(_) => false,
91 }
92 }
93
94 pub(crate) fn column_ids(&self) -> &[ColumnId] {
97 match self {
98 ProjectionMapper::PrimaryKey(m) => m.column_ids(),
99 ProjectionMapper::Flat(m) => m.column_ids(),
100 }
101 }
102
103 pub(crate) fn output_schema(&self) -> SchemaRef {
105 match self {
106 ProjectionMapper::PrimaryKey(m) => m.output_schema(),
107 ProjectionMapper::Flat(m) => m.output_schema(),
108 }
109 }
110
111 pub fn as_primary_key(&self) -> Option<&PrimaryKeyProjectionMapper> {
113 match self {
114 ProjectionMapper::PrimaryKey(m) => Some(m),
115 ProjectionMapper::Flat(_) => None,
116 }
117 }
118
119 pub fn as_flat(&self) -> Option<&FlatProjectionMapper> {
121 match self {
122 ProjectionMapper::PrimaryKey(_) => None,
123 ProjectionMapper::Flat(m) => Some(m),
124 }
125 }
126
127 pub fn empty_record_batch(&self) -> RecordBatch {
130 match self {
131 ProjectionMapper::PrimaryKey(m) => m.empty_record_batch(),
132 ProjectionMapper::Flat(m) => m.empty_record_batch(),
133 }
134 }
135}
136
/// Maps [`Batch`]es with encoded primary keys to output [`RecordBatch`]es.
#[allow(dead_code)]
pub struct PrimaryKeyProjectionMapper {
    // Metadata of the region to read from.
    metadata: RegionMetadataRef,
    // For each output column, where to fetch its data from a `Batch`.
    batch_indices: Vec<BatchIndex>,
    // Whether any output column is a tag (primary key) column.
    has_tags: bool,
    // Codec used to decode primary keys into tag values.
    codec: Arc<dyn PrimaryKeyCodec>,
    // Schema of converted `RecordBatch`es.
    output_schema: SchemaRef,
    // Ids of the columns to read from the storage layer.
    read_column_ids: Vec<ColumnId>,
    // Ids and data types of the field columns that read batches carry.
    batch_fields: Vec<(ColumnId, ConcreteDataType)>,
    // Whether the projection is empty; an empty projection still outputs
    // row counts (see `convert()`).
    is_empty_projection: bool,
}
157
#[allow(dead_code)]
impl PrimaryKeyProjectionMapper {
    /// Creates a mapper that outputs the columns at the `projection` indices
    /// of the region schema, deriving the columns to read from the projection.
    pub fn new(
        metadata: &RegionMetadataRef,
        projection: impl Iterator<Item = usize>,
    ) -> Result<PrimaryKeyProjectionMapper> {
        let projection: Vec<_> = projection.collect();
        let read_column_ids = read_column_ids_from_projection(metadata, &projection)?;
        Self::new_with_read_columns(metadata, projection, read_column_ids)
    }

    /// Creates a mapper that outputs the `projection` columns while batches
    /// are read with the `read_column_ids` set (which may be a superset of
    /// the projected columns).
    ///
    /// Returns an error if a projection index is out of bound or a projected
    /// field column is missing from the read columns.
    pub fn new_with_read_columns(
        metadata: &RegionMetadataRef,
        projection: Vec<usize>,
        read_column_ids: Vec<ColumnId>,
    ) -> Result<PrimaryKeyProjectionMapper> {
        // An empty projection still needs to report row counts (see `convert()`).
        let is_empty_projection = projection.is_empty();

        // Collect the output column schemas in projection order.
        let mut column_schemas = Vec::with_capacity(projection.len());
        for idx in &projection {
            column_schemas.push(
                metadata
                    .schema
                    .column_schemas()
                    .get(*idx)
                    .with_context(|| InvalidRequestSnafu {
                        region_id: metadata.region_id,
                        reason: format!("projection index {} is out of bound", idx),
                    })?
                    .clone(),
            );
        }

        let codec = build_primary_key_codec(metadata);
        let output_schema = if is_empty_projection {
            Arc::new(Schema::new(vec![]))
        } else {
            Arc::new(Schema::new(column_schemas))
        };
        // Field columns (id, type) that the read batches will carry.
        let batch_fields = Batch::projected_fields(metadata, &read_column_ids);

        // Maps a field column id to its position in `batch_fields`.
        let field_id_to_index: HashMap<_, _> = batch_fields
            .iter()
            .enumerate()
            .map(|(index, (column_id, _))| (*column_id, index))
            .collect();
        // For each output column, remember where to fetch it from a `Batch`.
        let mut batch_indices = Vec::with_capacity(projection.len());
        let mut has_tags = false;
        if !is_empty_projection {
            for idx in &projection {
                let column = &metadata.column_metadatas[*idx];
                let batch_index = match column.semantic_type {
                    SemanticType::Tag => {
                        // A tag column is always part of the primary key, so
                        // `primary_key_index` can't return `None` here.
                        let index = metadata.primary_key_index(column.column_id).unwrap();
                        has_tags = true;
                        BatchIndex::Tag((index, column.column_id))
                    }
                    SemanticType::Timestamp => BatchIndex::Timestamp,
                    SemanticType::Field => {
                        let index = *field_id_to_index.get(&column.column_id).context(
                            InvalidRequestSnafu {
                                region_id: metadata.region_id,
                                reason: format!(
                                    "field column {} is missing in read projection",
                                    column.column_schema.name
                                ),
                            },
                        )?;
                        BatchIndex::Field(index)
                    }
                };
                batch_indices.push(batch_index);
            }
        }

        Ok(PrimaryKeyProjectionMapper {
            metadata: metadata.clone(),
            batch_indices,
            has_tags,
            codec,
            output_schema,
            read_column_ids,
            batch_fields,
            is_empty_projection,
        })
    }

    /// Creates a mapper that projects every column of the region.
    pub fn all(metadata: &RegionMetadataRef) -> Result<PrimaryKeyProjectionMapper> {
        PrimaryKeyProjectionMapper::new(metadata, 0..metadata.column_metadatas.len())
    }

    /// Returns the metadata of the region to read.
    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
        &self.metadata
    }

    /// Returns true if any projected column is a tag column.
    pub(crate) fn has_tags(&self) -> bool {
        self.has_tags
    }

    /// Returns the ids of the columns to read from the storage layer.
    pub(crate) fn column_ids(&self) -> &[ColumnId] {
        &self.read_column_ids
    }

    /// Returns the ids and types of the field columns that batches carry.
    pub(crate) fn batch_fields(&self) -> &[(ColumnId, ConcreteDataType)] {
        &self.batch_fields
    }

    /// Returns the schema of converted [`RecordBatch`]es.
    pub(crate) fn output_schema(&self) -> SchemaRef {
        self.output_schema.clone()
    }

    /// Returns an empty [`RecordBatch`] with the output schema.
    pub(crate) fn empty_record_batch(&self) -> RecordBatch {
        RecordBatch::new_empty(self.output_schema.clone())
    }

    /// Converts a [`Batch`] to a [`RecordBatch`] with the output schema.
    ///
    /// Tag values are decoded from the primary key and expanded into repeated
    /// vectors (optionally served from `cache_strategy`); timestamp and field
    /// columns are taken from the batch directly. The batch's field columns
    /// must match `batch_fields` (checked by debug assertions only).
    pub(crate) fn convert(
        &self,
        batch: &Batch,
        cache_strategy: &CacheStrategy,
    ) -> common_recordbatch::error::Result<RecordBatch> {
        // With an empty projection the output only carries the row count.
        if self.is_empty_projection {
            return RecordBatch::new_with_count(self.output_schema.clone(), batch.num_rows());
        }

        debug_assert_eq!(self.batch_fields.len(), batch.fields().len());
        debug_assert!(
            self.batch_fields
                .iter()
                .zip(batch.fields())
                .all(|((id, _), batch_col)| *id == batch_col.column_id)
        );

        // Decode primary key values only when tags are actually projected;
        // reuse values already cached on the batch when present.
        let pk_values = if self.has_tags {
            match batch.pk_values() {
                Some(v) => v.clone(),
                None => self
                    .codec
                    .decode(batch.primary_key())
                    .map_err(BoxedError::new)
                    .context(ExternalSnafu)?,
            }
        } else {
            CompositeValues::Dense(vec![])
        };

        let mut columns = Vec::with_capacity(self.output_schema.num_columns());
        let num_rows = batch.num_rows();
        for (index, column_schema) in self
            .batch_indices
            .iter()
            .zip(self.output_schema.column_schemas())
        {
            match index {
                BatchIndex::Tag((idx, column_id)) => {
                    let value = match &pk_values {
                        CompositeValues::Dense(v) => &v[*idx].1,
                        CompositeValues::Sparse(v) => v.get_or_null(*column_id),
                    };
                    // A tag has one value per batch; repeat it for every row.
                    let vector = repeated_vector_with_cache(
                        &column_schema.data_type,
                        value,
                        num_rows,
                        cache_strategy,
                    )?;
                    columns.push(vector);
                }
                BatchIndex::Timestamp => {
                    columns.push(batch.timestamps().clone());
                }
                BatchIndex::Field(idx) => {
                    columns.push(batch.fields()[*idx].data.clone());
                }
            }
        }

        RecordBatch::new(self.output_schema.clone(), columns)
    }
}
368
369pub(crate) fn read_column_ids_from_projection(
370 metadata: &RegionMetadataRef,
371 projection: &[usize],
372) -> Result<Vec<ColumnId>> {
373 let mut column_ids = Vec::with_capacity(projection.len().max(1));
374 if projection.is_empty() {
375 column_ids.push(metadata.time_index_column().column_id);
376 return Ok(column_ids);
377 }
378
379 for idx in projection {
380 let column = metadata
381 .column_metadatas
382 .get(*idx)
383 .with_context(|| InvalidRequestSnafu {
384 region_id: metadata.region_id,
385 reason: format!("projection index {} is out of bound", idx),
386 })?;
387 column_ids.push(column.column_id);
388 }
389 Ok(column_ids)
390}
391
/// Location of an output column's data within a [`Batch`].
#[derive(Debug, Clone, Copy)]
#[allow(dead_code)]
enum BatchIndex {
    /// A tag column: (index within the primary key, column id).
    Tag((usize, ColumnId)),
    /// The timestamp column of the batch.
    Timestamp,
    /// A field column at the given index of the batch's field columns.
    Field(usize),
}
403
404pub(crate) fn repeated_vector_with_cache(
406 data_type: &ConcreteDataType,
407 value: &Value,
408 num_rows: usize,
409 cache_strategy: &CacheStrategy,
410) -> common_recordbatch::error::Result<VectorRef> {
411 if let Some(vector) = cache_strategy.get_repeated_vector(data_type, value) {
412 match vector.len().cmp(&num_rows) {
415 Ordering::Less => (),
416 Ordering::Equal => return Ok(vector),
417 Ordering::Greater => return Ok(vector.slice(0, num_rows)),
418 }
419 }
420
421 let vector = new_repeated_vector(data_type, value, num_rows)?;
423 if vector.len() <= MAX_VECTOR_LENGTH_TO_CACHE {
425 cache_strategy.put_repeated_vector(value.clone(), vector.clone());
426 }
427
428 Ok(vector)
429}
430
431pub(crate) fn new_repeated_vector(
433 data_type: &ConcreteDataType,
434 value: &Value,
435 num_rows: usize,
436) -> common_recordbatch::error::Result<VectorRef> {
437 let mut mutable_vector = data_type.create_mutable_vector(1);
438 mutable_vector
439 .try_push_value_ref(&value.as_value_ref())
440 .context(DataTypesSnafu)?;
441 let base_vector = mutable_vector.to_vector();
443 Ok(base_vector.replicate(&[num_rows]))
444}
445
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::OpType;
    use datatypes::arrow::array::{Int64Array, TimestampMillisecondArray, UInt8Array, UInt64Array};
    use datatypes::arrow::datatypes::Field;
    use datatypes::arrow::util::pretty;
    use datatypes::value::ValueRef;
    use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
    use mito_codec::test_util::TestRegionMetadataBuilder;
    use store_api::storage::consts::{
        OP_TYPE_COLUMN_NAME, PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME,
    };

    use super::*;

    // Renders a record batch as a pretty-printed table for string assertions.
    fn print_record_batch(record_batch: RecordBatch) -> String {
        pretty::pretty_format_batches(&[record_batch.into_df_record_batch()])
            .unwrap()
            .to_string()
    }

    // Builds an arrow record batch in the flat layout used by the tests:
    // tag columns, field columns, an optional timestamp column, then the
    // primary key (dictionary of encoded keys), sequence and op type columns.
    // `idx_tags` / `idx_fields` are (column index, repeated value) pairs.
    fn new_flat_batch(
        ts_start: Option<i64>,
        idx_tags: &[(usize, i64)],
        idx_fields: &[(usize, i64)],
        num_rows: usize,
    ) -> datatypes::arrow::record_batch::RecordBatch {
        let mut columns = Vec::with_capacity(1 + idx_tags.len() + idx_fields.len() + 3);
        let mut fields = Vec::with_capacity(1 + idx_tags.len() + idx_fields.len() + 3);

        // Tag columns `k{i}`, each repeating a single value for all rows.
        for (i, tag) in idx_tags {
            let array = Arc::new(Int64Array::from_iter_values(std::iter::repeat_n(
                *tag, num_rows,
            ))) as _;
            columns.push(array);
            fields.push(Field::new(
                format!("k{i}"),
                datatypes::arrow::datatypes::DataType::Int64,
                true,
            ));
        }

        // Field columns `v{i}`, each repeating a single value for all rows.
        for (i, field) in idx_fields {
            let array = Arc::new(Int64Array::from_iter_values(std::iter::repeat_n(
                *field, num_rows,
            ))) as _;
            columns.push(array);
            fields.push(Field::new(
                format!("v{i}"),
                datatypes::arrow::datatypes::DataType::Int64,
                true,
            ));
        }

        // Optional `ts` column with one-second (1000 ms) steps.
        if let Some(ts_start) = ts_start {
            let timestamps = Arc::new(TimestampMillisecondArray::from_iter_values(
                (0..num_rows).map(|i| ts_start + i as i64 * 1000),
            )) as _;
            columns.push(timestamps);
            fields.push(Field::new(
                "ts",
                datatypes::arrow::datatypes::DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    None,
                ),
                true,
            ));
        }

        // Encode the tag values into a dense primary key.
        let converter = DensePrimaryKeyCodec::with_fields(
            (0..idx_tags.len())
                .map(|idx| {
                    (
                        idx as u32,
                        SortField::new(ConcreteDataType::int64_datatype()),
                    )
                })
                .collect(),
        );
        let encoded_pk = converter
            .encode(idx_tags.iter().map(|(_, v)| ValueRef::Int64(*v)))
            .unwrap();

        // Primary key column: a dictionary whose values repeat the encoded key.
        let pk_values: Vec<&[u8]> = std::iter::repeat_n(encoded_pk.as_slice(), num_rows).collect();
        let keys = datatypes::arrow::array::UInt32Array::from_iter(0..num_rows as u32);
        let values = Arc::new(datatypes::arrow::array::BinaryArray::from_vec(pk_values));
        let pk_array =
            Arc::new(datatypes::arrow::array::DictionaryArray::try_new(keys, values).unwrap()) as _;
        columns.push(pk_array);
        fields.push(Field::new_dictionary(
            PRIMARY_KEY_COLUMN_NAME,
            datatypes::arrow::datatypes::DataType::UInt32,
            datatypes::arrow::datatypes::DataType::Binary,
            false,
        ));

        // Sequence column: one distinct sequence number per row.
        columns.push(Arc::new(UInt64Array::from_iter_values(0..num_rows as u64)) as _);
        fields.push(Field::new(
            SEQUENCE_COLUMN_NAME,
            datatypes::arrow::datatypes::DataType::UInt64,
            false,
        ));

        // Op type column: every row is a `Put`.
        columns.push(Arc::new(UInt8Array::from_iter_values(
            (0..num_rows).map(|_| OpType::Put as u8),
        )) as _);
        fields.push(Field::new(
            OP_TYPE_COLUMN_NAME,
            datatypes::arrow::datatypes::DataType::UInt8,
            false,
        ));

        let schema = Arc::new(datatypes::arrow::datatypes::Schema::new(fields));

        datatypes::arrow::record_batch::RecordBatch::try_new(schema, columns).unwrap()
    }

    // Projecting all columns keeps every column id and outputs them in
    // region schema order (ts first in the output).
    #[test]
    fn test_flat_projection_mapper_all() {
        let metadata = Arc::new(
            TestRegionMetadataBuilder::default()
                .num_tags(2)
                .num_fields(2)
                .build(),
        );
        let cache = CacheStrategy::Disabled;
        let mapper = ProjectionMapper::all(&metadata).unwrap();
        assert_eq!([0, 1, 2, 3, 4], mapper.column_ids());
        assert_eq!(
            [
                (1, ConcreteDataType::int64_datatype()),
                (2, ConcreteDataType::int64_datatype()),
                (3, ConcreteDataType::int64_datatype()),
                (4, ConcreteDataType::int64_datatype()),
                (0, ConcreteDataType::timestamp_millisecond_datatype())
            ],
            mapper.as_flat().unwrap().batch_schema()
        );

        let batch = new_flat_batch(Some(0), &[(1, 1), (2, 2)], &[(3, 3), (4, 4)], 3);
        let record_batch = mapper.as_flat().unwrap().convert(&batch, &cache).unwrap();
        let expect = "\
+---------------------+----+----+----+----+
| ts                  | k0 | k1 | v0 | v1 |
+---------------------+----+----+----+----+
| 1970-01-01T00:00:00 | 1  | 2  | 3  | 4  |
| 1970-01-01T00:00:01 | 1  | 2  | 3  | 4  |
| 1970-01-01T00:00:02 | 1  | 2  | 3  | 4  |
+---------------------+----+----+----+----+";
        assert_eq!(expect, print_record_batch(record_batch));
    }

    // A projection of a subset of columns preserves the projection order in
    // the output, independent of the batch's physical column order.
    #[test]
    fn test_flat_projection_mapper_with_projection() {
        let metadata = Arc::new(
            TestRegionMetadataBuilder::default()
                .num_tags(2)
                .num_fields(2)
                .build(),
        );
        let cache = CacheStrategy::Disabled;
        let mapper = ProjectionMapper::new(&metadata, [4, 1].into_iter()).unwrap();
        assert_eq!([4, 1], mapper.column_ids());
        assert_eq!(
            [
                (1, ConcreteDataType::int64_datatype()),
                (4, ConcreteDataType::int64_datatype()),
                (0, ConcreteDataType::timestamp_millisecond_datatype())
            ],
            mapper.as_flat().unwrap().batch_schema()
        );

        let batch = new_flat_batch(None, &[(1, 1)], &[(4, 4)], 3);
        let record_batch = mapper.as_flat().unwrap().convert(&batch, &cache).unwrap();
        let expect = "\
+----+----+
| v1 | k0 |
+----+----+
| 4  | 1  |
| 4  | 1  |
| 4  | 1  |
+----+----+";
        assert_eq!(expect, print_record_batch(record_batch));
    }

    // Reading a superset of the projected columns (extra column id 3) must
    // not leak the extra column into the output.
    #[test]
    fn test_flat_projection_mapper_read_superset() {
        let metadata = Arc::new(
            TestRegionMetadataBuilder::default()
                .num_tags(2)
                .num_fields(2)
                .build(),
        );
        let cache = CacheStrategy::Disabled;
        let mapper =
            ProjectionMapper::new_with_read_columns(&metadata, [4, 1].into_iter(), vec![4, 1, 3])
                .unwrap();
        assert_eq!([4, 1, 3], mapper.column_ids());

        let batch = new_flat_batch(None, &[(1, 1)], &[(3, 3), (4, 4)], 3);
        let record_batch = mapper.as_flat().unwrap().convert(&batch, &cache).unwrap();
        let expect = "\
+----+----+
| v1 | k0 |
+----+----+
| 4  | 1  |
| 4  | 1  |
| 4  | 1  |
+----+----+";
        assert_eq!(expect, print_record_batch(record_batch));
    }

    // An empty projection still reads the time index column internally but
    // produces schema-less record batches that only carry a row count.
    #[test]
    fn test_flat_projection_mapper_empty_projection() {
        let metadata = Arc::new(
            TestRegionMetadataBuilder::default()
                .num_tags(2)
                .num_fields(2)
                .build(),
        );
        let cache = CacheStrategy::Disabled;
        let mapper = ProjectionMapper::new(&metadata, [].into_iter()).unwrap();
        assert_eq!([0], mapper.column_ids());
        assert!(mapper.output_schema().is_empty());
        let flat_mapper = mapper.as_flat().unwrap();
        assert_eq!(
            [(0, ConcreteDataType::timestamp_millisecond_datatype())],
            flat_mapper.batch_schema()
        );

        let batch = new_flat_batch(Some(0), &[], &[], 3);
        let record_batch = flat_mapper.convert(&batch, &cache).unwrap();
        assert_eq!(3, record_batch.num_rows());
        assert_eq!(0, record_batch.num_columns());
        assert!(record_batch.schema.is_empty());
    }
}