use std::borrow::Cow;
use std::collections::HashSet;
use std::sync::Arc;

use api::v1::SemanticType;
use datatypes::arrow::array::{ArrayRef, BinaryArray, DictionaryArray, UInt32Array};
use datatypes::arrow::datatypes::{Field, SchemaRef};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::prelude::{ConcreteDataType, DataType, Vector};
use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::ColumnId;

use crate::error::{
    DataTypeMismatchSnafu, DecodeSnafu, EvalPartitionFilterSnafu, NewRecordBatchSnafu, Result,
};
use crate::memtable::BoxedBatchIterator;
use crate::read::Batch;
use crate::sst::{internal_fields, tag_maybe_to_dictionary_field};

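/// Adapts a [`BoxedBatchIterator`] of [`Batch`]es into Arrow [`RecordBatch`]es whose
/// columns are: projected primary key columns, field columns, the time index column,
/// and the internal encoded primary key / sequence / op type columns.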
pub struct BatchToRecordBatchAdapter {
    iter: BoxedBatchIterator,
    codec: Arc<dyn PrimaryKeyCodec>,
    output_schema: SchemaRef,
    projected_pk: Vec<ProjectedPkColumn>,
}

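/// A primary key column selected by the projection.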
struct ProjectedPkColumn {
    column_id: ColumnId,
    pk_index: usize,
    data_type: ConcreteDataType,
}

impl BatchToRecordBatchAdapter {
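    /// Creates an adapter that projects the primary key columns listed in
    /// `read_column_ids` and builds the output schema from `metadata`.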
    pub fn new(
        iter: BoxedBatchIterator,
        metadata: RegionMetadataRef,
        codec: Arc<dyn PrimaryKeyCodec>,
        read_column_ids: &[ColumnId],
    ) -> Self {
        let read_column_id_set: HashSet<_> = read_column_ids.iter().copied().collect();
        let projected_pk = metadata
            .primary_key_columns()
            .enumerate()
            .filter(|(_, column_metadata)| read_column_id_set.contains(&column_metadata.column_id))
            .map(|(pk_index, column_metadata)| ProjectedPkColumn {
                column_id: column_metadata.column_id,
                pk_index,
                data_type: column_metadata.column_schema.data_type.clone(),
            })
            .collect();
        let output_schema = compute_output_arrow_schema(&metadata, &read_column_id_set);

        Self {
            iter,
            codec,
            output_schema,
            projected_pk,
        }
    }

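    /// Converts a non-empty [`Batch`] into a [`RecordBatch`] with `output_schema`.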
    fn convert_batch(&self, batch: &Batch) -> Result<RecordBatch> {
        let num_rows = batch.num_rows();

        // Reuses decoded primary key values if present, otherwise decodes them from the
        // encoded primary key.
        let pk_values = if let Some(vals) = batch.pk_values() {
            Cow::Borrowed(vals)
        } else {
            Cow::Owned(
                self.codec
                    .decode(batch.primary_key())
                    .context(DecodeSnafu)?,
            )
        };

        let mut columns: Vec<ArrayRef> = Vec::with_capacity(self.output_schema.fields().len());
        for pk_column in &self.projected_pk {
            let value = get_pk_value(&pk_values, pk_column.column_id, pk_column.pk_index);
            if pk_column.data_type.is_string() {
                columns.push(build_string_tag_dict_array(
                    value,
                    &pk_column.data_type,
                    num_rows,
                ));
            } else {
                let array = build_repeated_value_array(value, &pk_column.data_type, num_rows)?;
                columns.push(array);
            }
        }
        for batch_col in batch.fields() {
            columns.push(batch_col.data.to_arrow_array());
        }

        columns.push(batch.timestamps().to_arrow_array());

        // The encoded primary key column: a dictionary with a single value repeated for
        // every row.
        let pk_bytes = batch.primary_key();
        let values = Arc::new(BinaryArray::from_iter_values([pk_bytes]));
        let keys = UInt32Array::from(vec![0u32; num_rows]);
        let pk_dict: ArrayRef = Arc::new(DictionaryArray::new(keys, values));
        columns.push(pk_dict);

        columns.push(batch.sequences().to_arrow_array());

        columns.push(batch.op_types().to_arrow_array());

        RecordBatch::try_new(self.output_schema.clone(), columns).context(NewRecordBatchSnafu)
    }
}

impl Iterator for BatchToRecordBatchAdapter {
    type Item = Result<RecordBatch>;

    fn next(&mut self) -> Option<Self::Item> {
        loop {
            match self.iter.next()? {
                Ok(batch) => {
                    if batch.is_empty() {
                        // Skips empty batches instead of emitting empty record batches.
                        continue;
                    }
                    return Some(self.convert_batch(&batch));
                }
                Err(e) => return Some(Err(e)),
            }
        }
    }
}

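/// Returns the decoded primary key value for `column_id` / `pk_index`, or `Value::Null`
/// when it is absent.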
fn get_pk_value(
    pk_values: &CompositeValues,
    column_id: ColumnId,
    pk_index: usize,
) -> &datatypes::value::Value {
    match pk_values {
        CompositeValues::Dense(dense) => {
            if pk_index < dense.len() {
                &dense[pk_index].1
            } else {
                &datatypes::value::Value::Null
            }
        }
        CompositeValues::Sparse(sparse) => sparse.get_or_null(column_id),
    }
}

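/// Builds an array that repeats `value` for `num_rows` rows.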
fn build_repeated_value_array(
    value: &datatypes::value::Value,
    data_type: &ConcreteDataType,
    num_rows: usize,
) -> Result<ArrayRef> {
    let scalar = value
        .try_to_scalar_value(data_type)
        .context(DataTypeMismatchSnafu)?;
    scalar
        .to_array_of_size(num_rows)
        .context(EvalPartitionFilterSnafu)
}

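/// Builds a dictionary array for a string tag: a single dictionary value whose key is
/// repeated for `num_rows` rows.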
fn build_string_tag_dict_array(
    value: &datatypes::value::Value,
    data_type: &ConcreteDataType,
    num_rows: usize,
) -> ArrayRef {
    let mut builder = data_type.create_mutable_vector(1);
    builder.push_value_ref(&value.as_value_ref());
    let values = builder.to_vector().to_arrow_array();

    let keys = UInt32Array::from(vec![0u32; num_rows]);
    Arc::new(DictionaryArray::new(keys, values))
}

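/// Computes the output arrow schema: projected primary key columns, projected field
/// columns, the time index column, and the internal columns.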
fn compute_output_arrow_schema(
    metadata: &RegionMetadataRef,
    read_column_id_set: &HashSet<ColumnId>,
) -> SchemaRef {
    let mut fields = Vec::new();

    for column_metadata in metadata.primary_key_columns() {
        if !read_column_id_set.contains(&column_metadata.column_id) {
            continue;
        }
        let field = Arc::new(Field::new(
            &column_metadata.column_schema.name,
            column_metadata.column_schema.data_type.as_arrow_type(),
            column_metadata.column_schema.is_nullable(),
        ));
        let field = if column_metadata.semantic_type == SemanticType::Tag {
            tag_maybe_to_dictionary_field(&column_metadata.column_schema.data_type, &field)
        } else {
            field
        };
        fields.push(field);
    }

    for column_metadata in metadata.field_columns() {
        if !read_column_id_set.contains(&column_metadata.column_id) {
            continue;
        }
        let field = Arc::new(Field::new(
            &column_metadata.column_schema.name,
            column_metadata.column_schema.data_type.as_arrow_type(),
            column_metadata.column_schema.is_nullable(),
        ));
        fields.push(field);
    }

    let time_index = metadata.time_index_column();
    let time_index_field = Arc::new(Field::new(
        &time_index.column_schema.name,
        time_index.column_schema.data_type.as_arrow_type(),
        time_index.column_schema.is_nullable(),
    ));
    fields.push(time_index_field);
    fields.extend(internal_fields().iter().cloned());

    Arc::new(datatypes::arrow::datatypes::Schema::new(fields))
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::{OpType, SemanticType};
    use datatypes::arrow::array::{Array, TimestampMillisecondArray, UInt64Array, UInt8Array};
    use datatypes::arrow::datatypes::UInt32Type;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use mito_codec::row_converter::{PrimaryKeyCodec, build_primary_key_codec};
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef};
    use store_api::storage::RegionId;

    use super::*;
    use crate::read::flat_projection::FlatProjectionMapper;
    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
    use crate::test_util::new_batch_builder;
    use crate::test_util::sst_util::{new_primary_key, sst_region_metadata};

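    /// Builds an adapter that reads every column of `metadata`.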
    fn build_adapter(
        batches: Vec<Batch>,
        metadata: &RegionMetadataRef,
        codec: &Arc<dyn PrimaryKeyCodec>,
    ) -> BatchToRecordBatchAdapter {
        let read_column_ids = metadata
            .column_metadatas
            .iter()
            .map(|column| column.column_id)
            .collect::<Vec<_>>();
        let iter: BoxedBatchIterator = Box::new(batches.into_iter().map(Ok));
        BatchToRecordBatchAdapter::new(
            iter,
            Arc::clone(metadata),
            Arc::clone(codec),
            &read_column_ids,
        )
    }

    #[test]
    fn test_single_batch_two_tags() {
        let metadata = Arc::new(sst_region_metadata());
        let codec = build_primary_key_codec(&metadata);

        let pk = new_primary_key(&["host-1", "region-a"]);
        let batch = new_batch_builder(
            &pk,
            &[1, 2, 3],
            &[100, 100, 100],
            &[OpType::Put, OpType::Put, OpType::Put],
            2,
            &[10, 20, 30],
        )
        .build()
        .unwrap();

        let adapter = build_adapter(vec![batch], &metadata, &codec);
        let results: Vec<_> = adapter.collect();
        assert_eq!(1, results.len());

        let rb = results[0].as_ref().unwrap();
        let expected_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        assert_eq!(rb.schema(), expected_schema);
        assert_eq!(3, rb.num_rows());
        // Tags + field + time index + internal columns.
        assert_eq!(7, rb.num_columns());
    }

    #[test]
    fn test_multiple_batches() {
        let metadata = Arc::new(sst_region_metadata());
        let codec = build_primary_key_codec(&metadata);

        let pk1 = new_primary_key(&["a", "b"]);
        let batch1 = new_batch_builder(
            &pk1,
            &[1, 2],
            &[100, 100],
            &[OpType::Put, OpType::Put],
            2,
            &[10, 20],
        )
        .build()
        .unwrap();

        let pk2 = new_primary_key(&["c", "d"]);
        let batch2 = new_batch_builder(
            &pk2,
            &[3, 4],
            &[200, 200],
            &[OpType::Put, OpType::Put],
            2,
            &[30, 40],
        )
        .build()
        .unwrap();

        let adapter = build_adapter(vec![batch1, batch2], &metadata, &codec);
        let results: Vec<_> = adapter.map(|r| r.unwrap()).collect();
        assert_eq!(2, results.len());

        assert_eq!(2, results[0].num_rows());
        assert_eq!(2, results[1].num_rows());
    }

    #[test]
    fn test_empty_batch_skipped() {
        let metadata = Arc::new(sst_region_metadata());
        let codec = build_primary_key_codec(&metadata);

        let empty = Batch::empty();
        let pk = new_primary_key(&["x", "y"]);
        let batch = new_batch_builder(&pk, &[1], &[1], &[OpType::Put], 2, &[42])
            .build()
            .unwrap();

        let adapter = build_adapter(vec![empty, batch], &metadata, &codec);
        let results: Vec<_> = adapter.map(|r| r.unwrap()).collect();
        assert_eq!(1, results.len());
        assert_eq!(1, results[0].num_rows());
    }

    #[test]
    fn test_no_tags() {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(0, 0));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field_0".to_string(),
                    ConcreteDataType::uint64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts".to_string(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 1,
            });
        builder.primary_key(vec![]);
        let metadata = Arc::new(builder.build().unwrap());
        let codec = build_primary_key_codec(&metadata);

        let pk = vec![];
        let batch = new_batch_builder(
            &pk,
            &[1, 2],
            &[100, 100],
            &[OpType::Put, OpType::Put],
            0,
            &[10, 20],
        )
        .build()
        .unwrap();

        let adapter = build_adapter(vec![batch], &metadata, &codec);
        let results: Vec<_> = adapter.map(|r| r.unwrap()).collect();
        assert_eq!(1, results.len());

        let rb = &results[0];
        let expected_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        assert_eq!(rb.schema(), expected_schema);
        // Field + time index + internal columns.
        assert_eq!(5, rb.num_columns());
        assert_eq!(2, rb.num_rows());
    }

    #[test]
    fn test_primary_key_dict_column() {
        let metadata = Arc::new(sst_region_metadata());
        let codec = build_primary_key_codec(&metadata);

        let pk = new_primary_key(&["host", "az"]);
        let batch = new_batch_builder(
            &pk,
            &[1, 2],
            &[1, 1],
            &[OpType::Put, OpType::Put],
            2,
            &[5, 6],
        )
        .build()
        .unwrap();

        let adapter = build_adapter(vec![batch.clone()], &metadata, &codec);
        let rb = adapter.into_iter().next().unwrap().unwrap();

        // The encoded primary key column is the third column from the end.
        let pk_col_idx = rb.num_columns() - 3;
        let pk_array = rb
            .column(pk_col_idx)
            .as_any()
            .downcast_ref::<DictionaryArray<UInt32Type>>()
            .expect("should be DictionaryArray<UInt32>");

        assert_eq!(2, pk_array.len());
        assert_eq!(0, pk_array.keys().value(0));
        assert_eq!(0, pk_array.keys().value(1));

        let values = pk_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        assert_eq!(1, values.len());
        assert_eq!(batch.primary_key(), values.value(0));
    }

    #[test]
    fn test_sequence_and_op_type_columns() {
        let metadata = Arc::new(sst_region_metadata());
        let codec = build_primary_key_codec(&metadata);

        let pk = new_primary_key(&["a", "b"]);
        let batch = new_batch_builder(
            &pk,
            &[10, 20, 30],
            &[1, 2, 3],
            &[OpType::Put, OpType::Delete, OpType::Put],
            2,
            &[100, 200, 300],
        )
        .build()
        .unwrap();

        let adapter = build_adapter(vec![batch], &metadata, &codec);
        let rb = adapter.into_iter().next().unwrap().unwrap();

        // The sequence column is the second column from the end.
        let seq_idx = rb.num_columns() - 2;
        let seq_array = rb
            .column(seq_idx)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(&[1u64, 2, 3], seq_array.values().as_ref());

        // The op type column is the last column.
        let op_idx = rb.num_columns() - 1;
        let op_array = rb
            .column(op_idx)
            .as_any()
            .downcast_ref::<UInt8Array>()
            .unwrap();
        assert_eq!(
            &[OpType::Put as u8, OpType::Delete as u8, OpType::Put as u8],
            op_array.values().as_ref()
        );
    }

    #[test]
    fn test_integer_tag_column() {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(0, 0));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_0".to_string(),
                    ConcreteDataType::uint32_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field_0".to_string(),
                    ConcreteDataType::uint64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts".to_string(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            });
        builder.primary_key(vec![0]);
        let metadata = Arc::new(builder.build().unwrap());
        let codec = build_primary_key_codec(&metadata);

        // Encodes a primary key with a single u32 tag value.
        let pk = {
            use datatypes::value::ValueRef;
            use mito_codec::row_converter::PrimaryKeyCodecExt;
            let codec_ext = mito_codec::row_converter::DensePrimaryKeyCodec::with_fields(vec![(
                0,
                mito_codec::row_converter::SortField::new(ConcreteDataType::uint32_datatype()),
            )]);
            codec_ext
                .encode([ValueRef::UInt32(42)].into_iter())
                .unwrap()
        };
        let batch = new_batch_builder(
            &pk,
            &[1, 2],
            &[1, 1],
            &[OpType::Put, OpType::Put],
            1,
            &[10, 20],
        )
        .build()
        .unwrap();

        let adapter = build_adapter(vec![batch], &metadata, &codec);
        let rb = adapter.into_iter().next().unwrap().unwrap();

        let expected_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        assert_eq!(rb.schema(), expected_schema);

        // Non-string tags are repeated as plain arrays instead of dictionaries.
        let tag_array = rb
            .column(0)
            .as_any()
            .downcast_ref::<UInt32Array>()
            .expect("integer tag should be a plain UInt32Array");
        assert_eq!(&[42u32, 42], tag_array.values().as_ref());
    }

    #[test]
    fn test_with_precomputed_pk_values() {
        // Exercises the path where the batch already carries decoded primary key values.
        let metadata = Arc::new(sst_region_metadata());
        let codec = build_primary_key_codec(&metadata);

        let pk = new_primary_key(&["pre", "computed"]);
        let mut batch = new_batch_builder(&pk, &[1], &[1], &[OpType::Put], 2, &[99])
            .build()
            .unwrap();

        let decoded = codec.decode(&pk).unwrap();
        batch.set_pk_values(decoded);

        let adapter = build_adapter(vec![batch], &metadata, &codec);
        let rb = adapter.into_iter().next().unwrap().unwrap();
        assert_eq!(1, rb.num_rows());

        let expected_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        assert_eq!(rb.schema(), expected_schema);
    }

    #[test]
    fn test_partial_projection_schema_matches_mapper() {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(0, 0));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_0".to_string(),
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_1".to_string(),
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field_0".to_string(),
                    ConcreteDataType::uint64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field_1".to_string(),
                    ConcreteDataType::uint64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts".to_string(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 4,
            });
        builder.primary_key(vec![0, 1]);
        let metadata = Arc::new(builder.build().unwrap());
        let codec = build_primary_key_codec(&metadata);

        // Only reads tag_0 and field_1.
        let read_column_ids = vec![0, 3];

        let pk = new_primary_key(&["host-1", "region-a"]);
        let batch = new_batch_builder(
            &pk,
            &[1, 2, 3],
            &[100, 100, 100],
            &[OpType::Put, OpType::Put, OpType::Put],
            3,
            &[10, 20, 30],
        )
        .build()
        .unwrap();

        let iter: BoxedBatchIterator = Box::new(vec![Ok(batch)].into_iter());
        let adapter =
            BatchToRecordBatchAdapter::new(iter, metadata.clone(), codec, &read_column_ids);
        let rb = adapter.into_iter().next().unwrap().unwrap();

        let mapper = FlatProjectionMapper::new(&metadata, [0, 3].into_iter()).unwrap();
        assert_eq!(rb.schema(), mapper.input_arrow_schema(false));
        // tag_0 + field_1 + time index + internal columns.
        assert_eq!(6, rb.num_columns());
        assert_eq!(3, rb.num_rows());

        let field_1 = rb.column(1).as_any().downcast_ref::<UInt64Array>().unwrap();
        assert_eq!(&[10u64, 20, 30], field_1.values().as_ref());

        let ts = rb
            .column(2)
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap();
        assert_eq!(&[1i64, 2, 3], ts.values().as_ref());
    }
}