use std::cmp::Ordering;
use std::collections::HashMap;
use std::sync::Arc;

use api::v1::SemanticType;
use common_error::ext::BoxedError;
use common_recordbatch::RecordBatch;
use common_recordbatch::error::ExternalSnafu;
use datatypes::prelude::{ConcreteDataType, DataType};
use datatypes::schema::{Schema, SchemaRef};
use datatypes::value::Value;
use datatypes::vectors::VectorRef;
use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec, build_primary_key_codec};
use snafu::{OptionExt, ResultExt};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;

use crate::cache::CacheStrategy;
use crate::error::{InvalidRequestSnafu, Result};
use crate::read::Batch;
use crate::read::flat_projection::FlatProjectionMapper;

/// Only cache repeated vectors whose length is less than or equal to this value.
const MAX_VECTOR_LENGTH_TO_CACHE: usize = 16384;

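/// Maps projected columns to the output schema, dispatching to either the
/// primary key format or the flat format implementation.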
pub enum ProjectionMapper {
    /// Mapper for the primary key format.
    PrimaryKey(PrimaryKeyProjectionMapper),
    /// Mapper for the flat format.
    Flat(FlatProjectionMapper),
}

impl ProjectionMapper {
    /// Returns a new mapper that outputs the columns in `projection`.
    ///
    /// Each index in `projection` refers to a column in `metadata`.
    pub fn new(
        metadata: &RegionMetadataRef,
        projection: impl Iterator<Item = usize> + Clone,
        flat_format: bool,
    ) -> Result<Self> {
        if flat_format {
            Ok(ProjectionMapper::Flat(FlatProjectionMapper::new(
                metadata, projection,
            )?))
        } else {
            Ok(ProjectionMapper::PrimaryKey(
                PrimaryKeyProjectionMapper::new(metadata, projection)?,
            ))
        }
    }

    /// Returns a new mapper that outputs the columns in `projection` while the
    /// underlying reader reads `read_column_ids`, which may be a superset of
    /// the projected columns.
    pub fn new_with_read_columns(
        metadata: &RegionMetadataRef,
        projection: impl Iterator<Item = usize>,
        flat_format: bool,
        read_column_ids: Vec<ColumnId>,
    ) -> Result<Self> {
        let projection: Vec<_> = projection.collect();
        if flat_format {
            Ok(ProjectionMapper::Flat(
                FlatProjectionMapper::new_with_read_columns(metadata, projection, read_column_ids)?,
            ))
        } else {
            Ok(ProjectionMapper::PrimaryKey(
                PrimaryKeyProjectionMapper::new_with_read_columns(
                    metadata,
                    projection,
                    read_column_ids,
                )?,
            ))
        }
    }

    /// Returns a mapper that projects all columns in `metadata`.
    pub fn all(metadata: &RegionMetadataRef, flat_format: bool) -> Result<Self> {
        if flat_format {
            Ok(ProjectionMapper::Flat(FlatProjectionMapper::all(metadata)?))
        } else {
            Ok(ProjectionMapper::PrimaryKey(
                PrimaryKeyProjectionMapper::all(metadata)?,
            ))
        }
    }

    /// Returns the metadata that created the mapper.
    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
        match self {
            ProjectionMapper::PrimaryKey(m) => m.metadata(),
            ProjectionMapper::Flat(m) => m.metadata(),
        }
    }

    /// Returns true if the mapper has to decode tag columns from the primary key.
    /// The flat format never does.
    pub(crate) fn has_tags(&self) -> bool {
        match self {
            ProjectionMapper::PrimaryKey(m) => m.has_tags(),
            ProjectionMapper::Flat(_) => false,
        }
    }

    /// Returns the ids of the columns to read from the source.
    pub(crate) fn column_ids(&self) -> &[ColumnId] {
        match self {
            ProjectionMapper::PrimaryKey(m) => m.column_ids(),
            ProjectionMapper::Flat(m) => m.column_ids(),
        }
    }

    /// Returns the schema of converted record batches.
    pub(crate) fn output_schema(&self) -> SchemaRef {
        match self {
            ProjectionMapper::PrimaryKey(m) => m.output_schema(),
            ProjectionMapper::Flat(m) => m.output_schema(),
        }
    }

    /// Returns the inner primary key mapper, if any.
    pub fn as_primary_key(&self) -> Option<&PrimaryKeyProjectionMapper> {
        match self {
            ProjectionMapper::PrimaryKey(m) => Some(m),
            ProjectionMapper::Flat(_) => None,
        }
    }

    /// Returns the inner flat format mapper, if any.
    pub fn as_flat(&self) -> Option<&FlatProjectionMapper> {
        match self {
            ProjectionMapper::PrimaryKey(_) => None,
            ProjectionMapper::Flat(m) => Some(m),
        }
    }

    /// Returns an empty [RecordBatch] with the output schema.
    pub fn empty_record_batch(&self) -> RecordBatch {
        match self {
            ProjectionMapper::PrimaryKey(m) => m.empty_record_batch(),
            ProjectionMapper::Flat(m) => m.empty_record_batch(),
        }
    }
}
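
// Illustrative usage sketch (hypothetical caller, mirroring the tests below):
//
//     let mapper = ProjectionMapper::new(&metadata, projection.into_iter(), false)?;
//     let record_batch = mapper
//         .as_primary_key()
//         .expect("primary key format")
//         .convert(&batch, &cache_strategy)?;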

/// Handles projection and converts a projected [Batch] to a [RecordBatch]
/// in the primary key format.
pub struct PrimaryKeyProjectionMapper {
    /// Metadata of the region.
    metadata: RegionMetadataRef,
    /// Maps each output column to its source in a [Batch].
    batch_indices: Vec<BatchIndex>,
    /// Whether the projection contains tag columns.
    has_tags: bool,
    /// Decoder for primary keys.
    codec: Arc<dyn PrimaryKeyCodec>,
    /// Schema of converted [RecordBatch]es.
    output_schema: SchemaRef,
    /// Ids of the columns to read from the source.
    read_column_ids: Vec<ColumnId>,
    /// Ids and datatypes of field columns in a [Batch].
    batch_fields: Vec<(ColumnId, ConcreteDataType)>,
    /// Whether the original projection is empty.
    is_empty_projection: bool,
}

impl PrimaryKeyProjectionMapper {
    /// Returns a new mapper for the given `projection`.
    ///
    /// Each index in `projection` refers to a column in `metadata`.
    pub fn new(
        metadata: &RegionMetadataRef,
        projection: impl Iterator<Item = usize>,
    ) -> Result<PrimaryKeyProjectionMapper> {
        let projection: Vec<_> = projection.collect();
        let read_column_ids = read_column_ids_from_projection(metadata, &projection)?;
        Self::new_with_read_columns(metadata, projection, read_column_ids)
    }

    /// Returns a new mapper that outputs `projection` while the reader reads
    /// `read_column_ids`, which may be a superset of the projected columns.
    pub fn new_with_read_columns(
        metadata: &RegionMetadataRef,
        projection: Vec<usize>,
        read_column_ids: Vec<ColumnId>,
    ) -> Result<PrimaryKeyProjectionMapper> {
        let is_empty_projection = projection.is_empty();

        let mut column_schemas = Vec::with_capacity(projection.len());
        for idx in &projection {
            // Collects the column schema of each projected column.
            column_schemas.push(
                metadata
                    .schema
                    .column_schemas()
                    .get(*idx)
                    .with_context(|| InvalidRequestSnafu {
                        region_id: metadata.region_id,
                        reason: format!("projection index {} is out of bound", idx),
                    })?
                    .clone(),
            );
        }

        let codec = build_primary_key_codec(metadata);
        // Builds the output schema. An empty projection maps to an empty schema.
        let output_schema = if is_empty_projection {
            Arc::new(Schema::new(vec![]))
        } else {
            Arc::new(Schema::new(column_schemas))
        };
        let batch_fields = Batch::projected_fields(metadata, &read_column_ids);

        // Maps a field column id to its index in `batch_fields`.
        let field_id_to_index: HashMap<_, _> = batch_fields
            .iter()
            .enumerate()
            .map(|(index, (column_id, _))| (*column_id, index))
            .collect();
        let mut batch_indices = Vec::with_capacity(projection.len());
        let mut has_tags = false;
        if !is_empty_projection {
            for idx in &projection {
                let column = &metadata.column_metadatas[*idx];
                // Finds where to get the column in a [Batch].
                let batch_index = match column.semantic_type {
                    SemanticType::Tag => {
                        // Safety: tag columns are part of the primary key.
                        let index = metadata.primary_key_index(column.column_id).unwrap();
                        has_tags = true;
                        BatchIndex::Tag((index, column.column_id))
                    }
                    SemanticType::Timestamp => BatchIndex::Timestamp,
                    SemanticType::Field => {
                        let index = *field_id_to_index.get(&column.column_id).context(
                            InvalidRequestSnafu {
                                region_id: metadata.region_id,
                                reason: format!(
                                    "field column {} is missing in read projection",
                                    column.column_schema.name
                                ),
                            },
                        )?;
                        BatchIndex::Field(index)
                    }
                };
                batch_indices.push(batch_index);
            }
        }

        Ok(PrimaryKeyProjectionMapper {
            metadata: metadata.clone(),
            batch_indices,
            has_tags,
            codec,
            output_schema,
            read_column_ids,
            batch_fields,
            is_empty_projection,
        })
    }

    /// Returns a mapper that projects all columns in `metadata`.
    pub fn all(metadata: &RegionMetadataRef) -> Result<PrimaryKeyProjectionMapper> {
        PrimaryKeyProjectionMapper::new(metadata, 0..metadata.column_metadatas.len())
    }

    /// Returns the metadata that created the mapper.
    pub(crate) fn metadata(&self) -> &RegionMetadataRef {
        &self.metadata
    }

    /// Returns true if the projection contains tag columns.
    pub(crate) fn has_tags(&self) -> bool {
        self.has_tags
    }

    /// Returns the ids of the columns to read from the source.
    pub(crate) fn column_ids(&self) -> &[ColumnId] {
        &self.read_column_ids
    }

    /// Returns ids and datatypes of the field columns in a [Batch].
    pub(crate) fn batch_fields(&self) -> &[(ColumnId, ConcreteDataType)] {
        &self.batch_fields
    }

    /// Returns the schema of converted [RecordBatch]es.
    pub(crate) fn output_schema(&self) -> SchemaRef {
        self.output_schema.clone()
    }

    /// Returns an empty [RecordBatch] with the output schema.
    pub(crate) fn empty_record_batch(&self) -> RecordBatch {
        RecordBatch::new_empty(self.output_schema.clone())
    }

    /// Converts a [Batch] to a [RecordBatch].
    ///
    /// The batch must match the projection used to build the mapper.
    pub(crate) fn convert(
        &self,
        batch: &Batch,
        cache_strategy: &CacheStrategy,
    ) -> common_recordbatch::error::Result<RecordBatch> {
        if self.is_empty_projection {
            // Nothing is projected, only the row count is carried over.
            return RecordBatch::new_with_count(self.output_schema.clone(), batch.num_rows());
        }

        debug_assert_eq!(self.batch_fields.len(), batch.fields().len());
        debug_assert!(
            self.batch_fields
                .iter()
                .zip(batch.fields())
                .all(|((id, _), batch_col)| *id == batch_col.column_id)
        );

        // Decodes primary key values only when tag columns are projected.
        let pk_values = if self.has_tags {
            match batch.pk_values() {
                Some(v) => v.clone(),
                None => self
                    .codec
                    .decode(batch.primary_key())
                    .map_err(BoxedError::new)
                    .context(ExternalSnafu)?,
            }
        } else {
            CompositeValues::Dense(vec![])
        };

        let mut columns = Vec::with_capacity(self.output_schema.num_columns());
        let num_rows = batch.num_rows();
        for (index, column_schema) in self
            .batch_indices
            .iter()
            .zip(self.output_schema.column_schemas())
        {
            match index {
                BatchIndex::Tag((idx, column_id)) => {
                    let value = match &pk_values {
                        CompositeValues::Dense(v) => &v[*idx].1,
                        CompositeValues::Sparse(v) => v.get_or_null(*column_id),
                    };
                    // Expands the tag value into a vector that repeats it for every row.
                    let vector = repeated_vector_with_cache(
                        &column_schema.data_type,
                        value,
                        num_rows,
                        cache_strategy,
                    )?;
                    columns.push(vector);
                }
                BatchIndex::Timestamp => {
                    columns.push(batch.timestamps().clone());
                }
                BatchIndex::Field(idx) => {
                    columns.push(batch.fields()[*idx].data.clone());
                }
            }
        }

        RecordBatch::new(self.output_schema.clone(), columns)
    }
}

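/// Resolves the ids of the columns to read for the given `projection` indices.
/// An empty projection falls back to reading only the time index column.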
pub(crate) fn read_column_ids_from_projection(
    metadata: &RegionMetadataRef,
    projection: &[usize],
) -> Result<Vec<ColumnId>> {
    let mut column_ids = Vec::with_capacity(projection.len().max(1));
    if projection.is_empty() {
        column_ids.push(metadata.time_index_column().column_id);
        return Ok(column_ids);
    }

    for idx in projection {
        let column = metadata
            .column_metadatas
            .get(*idx)
            .with_context(|| InvalidRequestSnafu {
                region_id: metadata.region_id,
                reason: format!("projection index {} is out of bound", idx),
            })?;
        column_ids.push(column.column_id);
    }
    Ok(column_ids)
}

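/// Location of a projected column in a [Batch].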
#[derive(Debug, Clone, Copy)]
enum BatchIndex {
    /// Index and column id of the tag in the primary key.
    Tag((usize, ColumnId)),
    /// The timestamp column.
    Timestamp,
    /// Index of the field column in the batch fields.
    Field(usize),
}

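/// Returns a vector that repeats `value` for `num_rows` times, reusing the
/// repeated vector cache when possible.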
fn repeated_vector_with_cache(
    data_type: &ConcreteDataType,
    value: &Value,
    num_rows: usize,
    cache_strategy: &CacheStrategy,
) -> common_recordbatch::error::Result<VectorRef> {
    if let Some(vector) = cache_strategy.get_repeated_vector(data_type, value) {
        // Tries to reuse the cached vector. If it is too short, falls through and
        // builds a new one; if it is long enough, returns it (sliced if needed).
        match vector.len().cmp(&num_rows) {
            Ordering::Less => (),
            Ordering::Equal => return Ok(vector),
            Ordering::Greater => return Ok(vector.slice(0, num_rows)),
        }
    }

    // Creates a new vector.
    let vector = new_repeated_vector(data_type, value, num_rows)?;
    // Updates the cache, but skips vectors that are too large to cache.
    if vector.len() <= MAX_VECTOR_LENGTH_TO_CACHE {
        cache_strategy.put_repeated_vector(value.clone(), vector.clone());
    }

    Ok(vector)
}

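/// Returns a new vector that repeats `value` for `num_rows` times.
/// For example, replicating `Value::Int64(1)` three times yields `[1, 1, 1]`.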
fn new_repeated_vector(
    data_type: &ConcreteDataType,
    value: &Value,
    num_rows: usize,
) -> common_recordbatch::error::Result<VectorRef> {
    let mut mutable_vector = data_type.create_mutable_vector(1);
    mutable_vector
        .try_push_value_ref(&value.as_value_ref())
        .map_err(BoxedError::new)
        .context(ExternalSnafu)?;
    // Converts the single-element builder into a vector and replicates it to `num_rows` rows.
    let base_vector = mutable_vector.to_vector();
    Ok(base_vector.replicate(&[num_rows]))
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::OpType;
    use datatypes::arrow::array::{Int64Array, TimestampMillisecondArray, UInt8Array, UInt64Array};
    use datatypes::arrow::datatypes::Field;
    use datatypes::arrow::util::pretty;
    use datatypes::value::ValueRef;
    use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodecExt, SortField};
    use mito_codec::test_util::TestRegionMetadataBuilder;
    use store_api::storage::consts::{
        OP_TYPE_COLUMN_NAME, PRIMARY_KEY_COLUMN_NAME, SEQUENCE_COLUMN_NAME,
    };

    use super::*;
    use crate::cache::CacheManager;
    use crate::read::BatchBuilder;

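    /// Creates a [Batch] whose primary key encodes `tags`, with `num_rows` rows
    /// starting at `ts_start` and each field value repeated for every row.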
    fn new_batch(
        ts_start: i64,
        tags: &[i64],
        fields: &[(ColumnId, i64)],
        num_rows: usize,
    ) -> Batch {
        let converter = DensePrimaryKeyCodec::with_fields(
            (0..tags.len())
                .map(|idx| {
                    (
                        idx as u32,
                        SortField::new(ConcreteDataType::int64_datatype()),
                    )
                })
                .collect(),
        );
        let primary_key = converter
            .encode(tags.iter().map(|v| ValueRef::Int64(*v)))
            .unwrap();

        let mut builder = BatchBuilder::new(primary_key);
        builder
            .timestamps_array(Arc::new(TimestampMillisecondArray::from_iter_values(
                (0..num_rows).map(|i| ts_start + i as i64 * 1000),
            )))
            .unwrap()
            .sequences_array(Arc::new(UInt64Array::from_iter_values(0..num_rows as u64)))
            .unwrap()
            .op_types_array(Arc::new(UInt8Array::from_iter_values(
                (0..num_rows).map(|_| OpType::Put as u8),
            )))
            .unwrap();
        for (column_id, field) in fields {
            builder
                .push_field_array(
                    *column_id,
                    Arc::new(Int64Array::from_iter_values(std::iter::repeat_n(
                        *field, num_rows,
                    ))),
                )
                .unwrap();
        }
        builder.build().unwrap()
    }

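    /// Pretty-prints a [RecordBatch] into a table string for assertions.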
    fn print_record_batch(record_batch: RecordBatch) -> String {
        pretty::pretty_format_batches(&[record_batch.into_df_record_batch()])
            .unwrap()
            .to_string()
    }

    #[test]
    fn test_projection_mapper_all() {
        let metadata = Arc::new(
            TestRegionMetadataBuilder::default()
                .num_tags(2)
                .num_fields(2)
                .build(),
        );
        let mapper = ProjectionMapper::all(&metadata, false).unwrap();
        assert_eq!([0, 1, 2, 3, 4], mapper.column_ids());
        assert_eq!(
            [
                (3, ConcreteDataType::int64_datatype()),
                (4, ConcreteDataType::int64_datatype())
            ],
            mapper.as_primary_key().unwrap().batch_fields()
        );

        let cache = CacheManager::builder().vector_cache_size(1024).build();
        let cache = CacheStrategy::EnableAll(Arc::new(cache));
        let batch = new_batch(0, &[1, 2], &[(3, 3), (4, 4)], 3);
        let record_batch = mapper
            .as_primary_key()
            .unwrap()
            .convert(&batch, &cache)
            .unwrap();
        let expect = "\
+---------------------+----+----+----+----+
| ts                  | k0 | k1 | v0 | v1 |
+---------------------+----+----+----+----+
| 1970-01-01T00:00:00 | 1  | 2  | 3  | 4  |
| 1970-01-01T00:00:01 | 1  | 2  | 3  | 4  |
| 1970-01-01T00:00:02 | 1  | 2  | 3  | 4  |
+---------------------+----+----+----+----+";
        assert_eq!(expect, print_record_batch(record_batch));

        assert!(
            cache
                .get_repeated_vector(&ConcreteDataType::int64_datatype(), &Value::Int64(1))
                .is_some()
        );
        assert!(
            cache
                .get_repeated_vector(&ConcreteDataType::int64_datatype(), &Value::Int64(2))
                .is_some()
        );
        assert!(
            cache
                .get_repeated_vector(&ConcreteDataType::int64_datatype(), &Value::Int64(3))
                .is_none()
        );
        let record_batch = mapper
            .as_primary_key()
            .unwrap()
            .convert(&batch, &cache)
            .unwrap();
        assert_eq!(expect, print_record_batch(record_batch));
    }

    #[test]
    fn test_projection_mapper_with_projection() {
        let metadata = Arc::new(
            TestRegionMetadataBuilder::default()
                .num_tags(2)
                .num_fields(2)
                .build(),
        );
        let mapper = ProjectionMapper::new(&metadata, [4, 1].into_iter(), false).unwrap();
        assert_eq!([4, 1], mapper.column_ids());
        assert_eq!(
            [(4, ConcreteDataType::int64_datatype())],
            mapper.as_primary_key().unwrap().batch_fields()
        );

        let batch = new_batch(0, &[1, 2], &[(4, 4)], 3);
        let cache = CacheManager::builder().vector_cache_size(1024).build();
        let cache = CacheStrategy::EnableAll(Arc::new(cache));
        let record_batch = mapper
            .as_primary_key()
            .unwrap()
            .convert(&batch, &cache)
            .unwrap();
        let expect = "\
+----+----+
| v1 | k0 |
+----+----+
| 4  | 1  |
| 4  | 1  |
| 4  | 1  |
+----+----+";
        assert_eq!(expect, print_record_batch(record_batch));
    }

    #[test]
    fn test_projection_mapper_read_superset() {
        let metadata = Arc::new(
            TestRegionMetadataBuilder::default()
                .num_tags(2)
                .num_fields(2)
                .build(),
        );
        let mapper = ProjectionMapper::new_with_read_columns(
            &metadata,
            [4, 1].into_iter(),
            false,
            vec![4, 1, 3],
        )
        .unwrap();
        assert_eq!([4, 1, 3], mapper.column_ids());

        let batch = new_batch(0, &[1, 2], &[(3, 3), (4, 4)], 3);
        let cache = CacheManager::builder().vector_cache_size(1024).build();
        let cache = CacheStrategy::EnableAll(Arc::new(cache));
        let record_batch = mapper
            .as_primary_key()
            .unwrap()
            .convert(&batch, &cache)
            .unwrap();
        let expect = "\
+----+----+
| v1 | k0 |
+----+----+
| 4  | 1  |
| 4  | 1  |
| 4  | 1  |
+----+----+";
        assert_eq!(expect, print_record_batch(record_batch));
    }

    #[test]
    fn test_projection_mapper_empty_projection() {
        let metadata = Arc::new(
            TestRegionMetadataBuilder::default()
                .num_tags(2)
                .num_fields(2)
                .build(),
        );
        let mapper = ProjectionMapper::new(&metadata, [].into_iter(), false).unwrap();
        assert_eq!([0], mapper.column_ids());
        assert!(mapper.output_schema().is_empty());
        let pk_mapper = mapper.as_primary_key().unwrap();
        assert!(pk_mapper.batch_fields().is_empty());
        assert!(!pk_mapper.has_tags);
        assert!(pk_mapper.batch_indices.is_empty());
        assert!(pk_mapper.is_empty_projection);

        let batch = new_batch(0, &[1, 2], &[], 3);
        let cache = CacheManager::builder().vector_cache_size(1024).build();
        let cache = CacheStrategy::EnableAll(Arc::new(cache));
        let record_batch = pk_mapper.convert(&batch, &cache).unwrap();
        assert_eq!(3, record_batch.num_rows());
        assert_eq!(0, record_batch.num_columns());
        assert!(record_batch.schema.is_empty());
    }

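    /// Creates an arrow record batch in the flat format: optional timestamps, tag and
    /// field columns by index, plus the encoded primary key, sequence, and op type columns.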
    fn new_flat_batch(
        ts_start: Option<i64>,
        idx_tags: &[(usize, i64)],
        idx_fields: &[(usize, i64)],
        num_rows: usize,
    ) -> datatypes::arrow::record_batch::RecordBatch {
        let mut columns = Vec::with_capacity(1 + idx_tags.len() + idx_fields.len() + 3);
        let mut fields = Vec::with_capacity(1 + idx_tags.len() + idx_fields.len() + 3);

        // Tag columns.
        for (i, tag) in idx_tags {
            let array = Arc::new(Int64Array::from_iter_values(std::iter::repeat_n(
                *tag, num_rows,
            ))) as _;
            columns.push(array);
            fields.push(Field::new(
                format!("k{i}"),
                datatypes::arrow::datatypes::DataType::Int64,
                true,
            ));
        }

        // Field columns.
        for (i, field) in idx_fields {
            let array = Arc::new(Int64Array::from_iter_values(std::iter::repeat_n(
                *field, num_rows,
            ))) as _;
            columns.push(array);
            fields.push(Field::new(
                format!("v{i}"),
                datatypes::arrow::datatypes::DataType::Int64,
                true,
            ));
        }

        // Time index column.
        if let Some(ts_start) = ts_start {
            let timestamps = Arc::new(TimestampMillisecondArray::from_iter_values(
                (0..num_rows).map(|i| ts_start + i as i64 * 1000),
            )) as _;
            columns.push(timestamps);
            fields.push(Field::new(
                "ts",
                datatypes::arrow::datatypes::DataType::Timestamp(
                    datatypes::arrow::datatypes::TimeUnit::Millisecond,
                    None,
                ),
                true,
            ));
        }

        // Encodes the tag values into a primary key.
        let converter = DensePrimaryKeyCodec::with_fields(
            (0..idx_tags.len())
                .map(|idx| {
                    (
                        idx as u32,
                        SortField::new(ConcreteDataType::int64_datatype()),
                    )
                })
                .collect(),
        );
        let encoded_pk = converter
            .encode(idx_tags.iter().map(|(_, v)| ValueRef::Int64(*v)))
            .unwrap();

        // Primary key column as a dictionary array.
        let pk_values: Vec<&[u8]> = std::iter::repeat_n(encoded_pk.as_slice(), num_rows).collect();
        let keys = datatypes::arrow::array::UInt32Array::from_iter(0..num_rows as u32);
        let values = Arc::new(datatypes::arrow::array::BinaryArray::from_vec(pk_values));
        let pk_array =
            Arc::new(datatypes::arrow::array::DictionaryArray::try_new(keys, values).unwrap()) as _;
        columns.push(pk_array);
        fields.push(Field::new_dictionary(
            PRIMARY_KEY_COLUMN_NAME,
            datatypes::arrow::datatypes::DataType::UInt32,
            datatypes::arrow::datatypes::DataType::Binary,
            false,
        ));

        // Sequence column.
        columns.push(Arc::new(UInt64Array::from_iter_values(0..num_rows as u64)) as _);
        fields.push(Field::new(
            SEQUENCE_COLUMN_NAME,
            datatypes::arrow::datatypes::DataType::UInt64,
            false,
        ));

        // Op type column.
        columns.push(Arc::new(UInt8Array::from_iter_values(
            (0..num_rows).map(|_| OpType::Put as u8),
        )) as _);
        fields.push(Field::new(
            OP_TYPE_COLUMN_NAME,
            datatypes::arrow::datatypes::DataType::UInt8,
            false,
        ));

        let schema = Arc::new(datatypes::arrow::datatypes::Schema::new(fields));

        datatypes::arrow::record_batch::RecordBatch::try_new(schema, columns).unwrap()
    }

    #[test]
    fn test_flat_projection_mapper_all() {
        let metadata = Arc::new(
            TestRegionMetadataBuilder::default()
                .num_tags(2)
                .num_fields(2)
                .build(),
        );
        let mapper = ProjectionMapper::all(&metadata, true).unwrap();
        assert_eq!([0, 1, 2, 3, 4], mapper.column_ids());
        assert_eq!(
            [
                (1, ConcreteDataType::int64_datatype()),
                (2, ConcreteDataType::int64_datatype()),
                (3, ConcreteDataType::int64_datatype()),
                (4, ConcreteDataType::int64_datatype()),
                (0, ConcreteDataType::timestamp_millisecond_datatype())
            ],
            mapper.as_flat().unwrap().batch_schema()
        );

        let batch = new_flat_batch(Some(0), &[(1, 1), (2, 2)], &[(3, 3), (4, 4)], 3);
        let record_batch = mapper.as_flat().unwrap().convert(&batch).unwrap();
        let expect = "\
+---------------------+----+----+----+----+
| ts                  | k0 | k1 | v0 | v1 |
+---------------------+----+----+----+----+
| 1970-01-01T00:00:00 | 1  | 2  | 3  | 4  |
| 1970-01-01T00:00:01 | 1  | 2  | 3  | 4  |
| 1970-01-01T00:00:02 | 1  | 2  | 3  | 4  |
+---------------------+----+----+----+----+";
        assert_eq!(expect, print_record_batch(record_batch));
    }

    #[test]
    fn test_flat_projection_mapper_with_projection() {
        let metadata = Arc::new(
            TestRegionMetadataBuilder::default()
                .num_tags(2)
                .num_fields(2)
                .build(),
        );
        let mapper = ProjectionMapper::new(&metadata, [4, 1].into_iter(), true).unwrap();
        assert_eq!([4, 1], mapper.column_ids());
        assert_eq!(
            [
                (1, ConcreteDataType::int64_datatype()),
                (4, ConcreteDataType::int64_datatype()),
                (0, ConcreteDataType::timestamp_millisecond_datatype())
            ],
            mapper.as_flat().unwrap().batch_schema()
        );

        let batch = new_flat_batch(None, &[(1, 1)], &[(4, 4)], 3);
        let record_batch = mapper.as_flat().unwrap().convert(&batch).unwrap();
        let expect = "\
+----+----+
| v1 | k0 |
+----+----+
| 4  | 1  |
| 4  | 1  |
| 4  | 1  |
+----+----+";
        assert_eq!(expect, print_record_batch(record_batch));
    }

    #[test]
    fn test_flat_projection_mapper_read_superset() {
        let metadata = Arc::new(
            TestRegionMetadataBuilder::default()
                .num_tags(2)
                .num_fields(2)
                .build(),
        );
        let mapper = ProjectionMapper::new_with_read_columns(
            &metadata,
            [4, 1].into_iter(),
            true,
            vec![4, 1, 3],
        )
        .unwrap();
        assert_eq!([4, 1, 3], mapper.column_ids());

        let batch = new_flat_batch(None, &[(1, 1)], &[(3, 3), (4, 4)], 3);
        let record_batch = mapper.as_flat().unwrap().convert(&batch).unwrap();
        let expect = "\
+----+----+
| v1 | k0 |
+----+----+
| 4  | 1  |
| 4  | 1  |
| 4  | 1  |
+----+----+";
        assert_eq!(expect, print_record_batch(record_batch));
    }

    #[test]
    fn test_flat_projection_mapper_empty_projection() {
        let metadata = Arc::new(
            TestRegionMetadataBuilder::default()
                .num_tags(2)
                .num_fields(2)
                .build(),
        );
        let mapper = ProjectionMapper::new(&metadata, [].into_iter(), true).unwrap();
        assert_eq!([0], mapper.column_ids());
        assert!(mapper.output_schema().is_empty());
        let flat_mapper = mapper.as_flat().unwrap();
        assert_eq!(
            [(0, ConcreteDataType::timestamp_millisecond_datatype())],
            flat_mapper.batch_schema()
        );

        let batch = new_flat_batch(Some(0), &[], &[], 3);
        let record_batch = flat_mapper.convert(&batch).unwrap();
        assert_eq!(3, record_batch.num_rows());
        assert_eq!(0, record_batch.num_columns());
        assert!(record_batch.schema.is_empty());
    }
}