1use std::borrow::Cow;
20use std::collections::HashSet;
21use std::sync::Arc;
22
23use api::v1::SemanticType;
24use datatypes::arrow::array::{ArrayRef, BinaryArray, DictionaryArray, UInt32Array};
25use datatypes::arrow::datatypes::{Field, SchemaRef};
26use datatypes::arrow::record_batch::RecordBatch;
27use datatypes::prelude::{ConcreteDataType, DataType, Vector};
28use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
29use snafu::ResultExt;
30use store_api::metadata::RegionMetadataRef;
31use store_api::storage::ColumnId;
32
33use crate::error::{
34 DataTypeMismatchSnafu, DecodeSnafu, EvalPartitionFilterSnafu, NewRecordBatchSnafu, Result,
35};
36use crate::memtable::BoxedBatchIterator;
37use crate::read::Batch;
38use crate::sst::{internal_fields, tag_maybe_to_dictionary_field, with_field_id};
39
/// Adapts an iterator of [`Batch`]es into flat arrow [`RecordBatch`]es.
///
/// Output columns follow the order produced by `compute_output_arrow_schema`:
/// projected primary-key columns, field columns, the time index, then the
/// internal columns (encoded primary key, sequence, op type).
pub struct BatchToRecordBatchAdapter {
    /// Inner iterator yielding the batches to convert.
    iter: BoxedBatchIterator,
    /// Codec used to decode primary-key bytes when a batch does not carry
    /// pre-decoded pk values.
    codec: Arc<dyn PrimaryKeyCodec>,
    /// Arrow schema of the emitted record batches.
    output_schema: SchemaRef,
    /// Primary-key columns selected by the read projection, in pk order.
    projected_pk: Vec<ProjectedPkColumn>,
}
48
/// A primary-key column retained by the read projection.
struct ProjectedPkColumn {
    /// Id of the column in the region metadata.
    column_id: ColumnId,
    /// Position of the column inside the encoded primary key.
    pk_index: usize,
    /// Data type of the column.
    data_type: ConcreteDataType,
}
54
55impl BatchToRecordBatchAdapter {
56 pub fn new(
63 iter: BoxedBatchIterator,
64 metadata: RegionMetadataRef,
65 codec: Arc<dyn PrimaryKeyCodec>,
66 read_column_ids: &[ColumnId],
67 ) -> Self {
68 let read_column_id_set: HashSet<_> = read_column_ids.iter().copied().collect();
69 let projected_pk = metadata
70 .primary_key_columns()
71 .enumerate()
72 .filter(|(_, column_metadata)| read_column_id_set.contains(&column_metadata.column_id))
73 .map(|(pk_index, column_metadata)| ProjectedPkColumn {
74 column_id: column_metadata.column_id,
75 pk_index,
76 data_type: column_metadata.column_schema.data_type.clone(),
77 })
78 .collect();
79 let output_schema = compute_output_arrow_schema(&metadata, &read_column_id_set);
80
81 Self {
82 iter,
83 codec,
84 output_schema,
85 projected_pk,
86 }
87 }
88
89 fn convert_batch(&self, batch: &Batch) -> Result<RecordBatch> {
91 let num_rows = batch.num_rows();
92
93 let pk_values = if let Some(vals) = batch.pk_values() {
94 Cow::Borrowed(vals)
95 } else {
96 Cow::Owned(
97 self.codec
98 .decode(batch.primary_key())
99 .context(DecodeSnafu)?,
100 )
101 };
102
103 let mut columns: Vec<ArrayRef> = Vec::with_capacity(self.output_schema.fields().len());
104 for pk_column in &self.projected_pk {
105 if pk_column.data_type.is_string() {
106 let value = get_pk_value(&pk_values, pk_column.column_id, pk_column.pk_index);
107 columns.push(build_string_tag_dict_array(
108 value,
109 &pk_column.data_type,
110 num_rows,
111 ));
112 } else {
113 let value = get_pk_value(&pk_values, pk_column.column_id, pk_column.pk_index);
114 let array = build_repeated_value_array(value, &pk_column.data_type, num_rows)?;
115 columns.push(array);
116 }
117 }
118 for batch_col in batch.fields() {
119 columns.push(batch_col.data.to_arrow_array());
120 }
121
122 columns.push(batch.timestamps().to_arrow_array());
123
124 let pk_bytes = batch.primary_key();
126 let values = Arc::new(BinaryArray::from_iter_values([pk_bytes]));
127 let keys = UInt32Array::from(vec![0u32; num_rows]);
128 let pk_dict: ArrayRef = Arc::new(DictionaryArray::new(keys, values));
129 columns.push(pk_dict);
130
131 columns.push(batch.sequences().to_arrow_array());
133
134 columns.push(batch.op_types().to_arrow_array());
136
137 RecordBatch::try_new(self.output_schema.clone(), columns).context(NewRecordBatchSnafu)
138 }
139}
140
141impl Iterator for BatchToRecordBatchAdapter {
142 type Item = Result<RecordBatch>;
143
144 fn next(&mut self) -> Option<Self::Item> {
145 loop {
146 match self.iter.next()? {
147 Ok(batch) => {
148 if batch.is_empty() {
149 continue;
150 }
151 return Some(self.convert_batch(&batch));
152 }
153 Err(e) => return Some(Err(e)),
154 }
155 }
156 }
157}
158
159fn get_pk_value(
161 pk_values: &CompositeValues,
162 column_id: ColumnId,
163 pk_index: usize,
164) -> &datatypes::value::Value {
165 match pk_values {
166 CompositeValues::Dense(dense) => {
167 if pk_index < dense.len() {
168 &dense[pk_index].1
169 } else {
170 &datatypes::value::Value::Null
171 }
172 }
173 CompositeValues::Sparse(sparse) => sparse.get_or_null(column_id),
174 }
175}
176
177fn build_repeated_value_array(
179 value: &datatypes::value::Value,
180 data_type: &ConcreteDataType,
181 num_rows: usize,
182) -> Result<ArrayRef> {
183 let scalar = value
184 .try_to_scalar_value(data_type)
185 .context(DataTypeMismatchSnafu)?;
186 scalar
187 .to_array_of_size(num_rows)
188 .context(EvalPartitionFilterSnafu)
189}
190
191fn build_string_tag_dict_array(
193 value: &datatypes::value::Value,
194 data_type: &ConcreteDataType,
195 num_rows: usize,
196) -> ArrayRef {
197 let mut builder = data_type.create_mutable_vector(1);
198 builder.push_value_ref(&value.as_value_ref());
199 let values = builder.to_vector().to_arrow_array();
200
201 let keys = UInt32Array::from(vec![0u32; num_rows]);
202 Arc::new(DictionaryArray::new(keys, values))
203}
204
205fn compute_output_arrow_schema(
206 metadata: &RegionMetadataRef,
207 read_column_id_set: &HashSet<ColumnId>,
208) -> SchemaRef {
209 let mut fields = Vec::new();
210
211 for column_metadata in metadata.primary_key_columns() {
212 if !read_column_id_set.contains(&column_metadata.column_id) {
213 continue;
214 }
215 let field = Field::new(
216 &column_metadata.column_schema.name,
217 column_metadata.column_schema.data_type.as_arrow_type(),
218 column_metadata.column_schema.is_nullable(),
219 );
220 let field = with_field_id(field, column_metadata.column_id);
221
222 if column_metadata.semantic_type == SemanticType::Tag {
223 fields.push(tag_maybe_to_dictionary_field(
224 &column_metadata.column_schema.data_type,
225 &Arc::new(field),
226 ));
227 } else {
228 fields.push(Arc::new(field));
229 }
230 }
231
232 for column_metadata in metadata.field_columns() {
233 if !read_column_id_set.contains(&column_metadata.column_id) {
234 continue;
235 }
236 let field = Field::new(
237 &column_metadata.column_schema.name,
238 column_metadata.column_schema.data_type.as_arrow_type(),
239 column_metadata.column_schema.is_nullable(),
240 );
241 fields.push(Arc::new(with_field_id(field, column_metadata.column_id)));
242 }
243
244 let time_index = metadata.time_index_column();
245 let time_index_field = Field::new(
246 &time_index.column_schema.name,
247 time_index.column_schema.data_type.as_arrow_type(),
248 time_index.column_schema.is_nullable(),
249 );
250 fields.push(Arc::new(with_field_id(
251 time_index_field,
252 time_index.column_id,
253 )));
254 fields.extend(internal_fields().iter().cloned());
255
256 Arc::new(datatypes::arrow::datatypes::Schema::new(fields))
257}
258
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::{OpType, SemanticType};
    use datatypes::arrow::array::{Array, TimestampMillisecondArray, UInt8Array, UInt64Array};
    use datatypes::arrow::datatypes::UInt32Type;
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use mito_codec::row_converter::{PrimaryKeyCodec, build_primary_key_codec};
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder, RegionMetadataRef};
    use store_api::storage::RegionId;

    use super::*;
    use crate::read::flat_projection::FlatProjectionMapper;
    use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
    use crate::test_util::new_batch_builder;
    use crate::test_util::sst_util::{new_primary_key, sst_region_metadata};

    /// Builds an adapter over `batches` that projects every column of
    /// `metadata`.
    fn build_adapter(
        batches: Vec<Batch>,
        metadata: &RegionMetadataRef,
        codec: &Arc<dyn PrimaryKeyCodec>,
    ) -> BatchToRecordBatchAdapter {
        let read_column_ids = metadata
            .column_metadatas
            .iter()
            .map(|column| column.column_id)
            .collect::<Vec<_>>();
        let iter: BoxedBatchIterator = Box::new(batches.into_iter().map(Ok));
        BatchToRecordBatchAdapter::new(
            iter,
            Arc::clone(metadata),
            Arc::clone(codec),
            &read_column_ids,
        )
    }

    /// A full-projection conversion of one batch must match the flat SST
    /// arrow schema and keep row/column counts.
    #[test]
    fn test_single_batch_two_tags() {
        let metadata = Arc::new(sst_region_metadata());
        let codec = build_primary_key_codec(&metadata);

        let pk = new_primary_key(&["host-1", "region-a"]);
        let batch = new_batch_builder(
            &pk,
            &[1, 2, 3],
            &[100, 100, 100],
            &[OpType::Put, OpType::Put, OpType::Put],
            2,
            &[10, 20, 30],
        )
        .build()
        .unwrap();

        let adapter = build_adapter(vec![batch], &metadata, &codec);
        let results: Vec<_> = adapter.collect::<Vec<_>>();
        assert_eq!(1, results.len());

        let rb = results[0].as_ref().unwrap();
        let expected_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        assert_eq!(rb.schema(), expected_schema);
        assert_eq!(3, rb.num_rows());
        assert_eq!(7, rb.num_columns());
    }

    /// Each input batch yields its own record batch.
    #[test]
    fn test_multiple_batches() {
        let metadata = Arc::new(sst_region_metadata());
        let codec = build_primary_key_codec(&metadata);

        let pk1 = new_primary_key(&["a", "b"]);
        let batch1 = new_batch_builder(
            &pk1,
            &[1, 2],
            &[100, 100],
            &[OpType::Put, OpType::Put],
            2,
            &[10, 20],
        )
        .build()
        .unwrap();

        let pk2 = new_primary_key(&["c", "d"]);
        let batch2 = new_batch_builder(
            &pk2,
            &[3, 4],
            &[200, 200],
            &[OpType::Put, OpType::Put],
            2,
            &[30, 40],
        )
        .build()
        .unwrap();

        let adapter = build_adapter(vec![batch1, batch2], &metadata, &codec);
        let results: Vec<_> = adapter.map(|r| r.unwrap()).collect();
        assert_eq!(2, results.len());

        assert_eq!(2, results[0].num_rows());
        assert_eq!(2, results[1].num_rows());
    }

    /// Empty batches are silently skipped by the iterator.
    #[test]
    fn test_empty_batch_skipped() {
        let metadata = Arc::new(sst_region_metadata());
        let codec = build_primary_key_codec(&metadata);

        let empty = Batch::empty();
        let pk = new_primary_key(&["x", "y"]);
        let batch = new_batch_builder(&pk, &[1], &[1], &[OpType::Put], 2, &[42])
            .build()
            .unwrap();

        let adapter = build_adapter(vec![empty, batch], &metadata, &codec);
        let results: Vec<_> = adapter.map(|r| r.unwrap()).collect();
        assert_eq!(1, results.len());
        assert_eq!(1, results[0].num_rows());
    }

    /// A region without tag columns still converts; only field, ts and the
    /// internal columns appear.
    #[test]
    fn test_no_tags() {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(0, 0));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field_0".to_string(),
                    ConcreteDataType::uint64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts".to_string(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 1,
            });
        builder.primary_key(vec![]);
        let metadata = Arc::new(builder.build().unwrap());
        let codec = build_primary_key_codec(&metadata);

        // Empty primary key: the region has no tag columns.
        let pk = vec![];
        let batch = new_batch_builder(
            &pk,
            &[1, 2],
            &[100, 100],
            &[OpType::Put, OpType::Put],
            0,
            &[10, 20],
        )
        .build()
        .unwrap();

        let adapter = build_adapter(vec![batch], &metadata, &codec);
        let results: Vec<_> = adapter.map(|r| r.unwrap()).collect();
        assert_eq!(1, results.len());

        let rb = &results[0];
        let expected_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        assert_eq!(rb.schema(), expected_schema);
        assert_eq!(5, rb.num_columns());
        assert_eq!(2, rb.num_rows());
    }

    /// The encoded primary-key column is a `DictionaryArray<UInt32>` with a
    /// single value and all keys pointing at it.
    #[test]
    fn test_primary_key_dict_column() {
        let metadata = Arc::new(sst_region_metadata());
        let codec = build_primary_key_codec(&metadata);

        let pk = new_primary_key(&["host", "az"]);
        let batch = new_batch_builder(
            &pk,
            &[1, 2],
            &[1, 1],
            &[OpType::Put, OpType::Put],
            2,
            &[5, 6],
        )
        .build()
        .unwrap();

        let adapter = build_adapter(vec![batch.clone()], &metadata, &codec);
        let rb = adapter.into_iter().next().unwrap().unwrap();

        // The pk column sits third from the end (pk, sequence, op type).
        let pk_col_idx = rb.num_columns() - 3;
        let pk_array = rb
            .column(pk_col_idx)
            .as_any()
            .downcast_ref::<DictionaryArray<UInt32Type>>()
            .expect("should be DictionaryArray<UInt32>");

        assert_eq!(2, pk_array.len());
        assert_eq!(0, pk_array.keys().value(0));
        assert_eq!(0, pk_array.keys().value(1));

        let values = pk_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        assert_eq!(1, values.len());
        assert_eq!(batch.primary_key(), values.value(0));
    }

    /// Sequence and op-type internal columns carry the batch's values through
    /// unchanged.
    #[test]
    fn test_sequence_and_op_type_columns() {
        let metadata = Arc::new(sst_region_metadata());
        let codec = build_primary_key_codec(&metadata);

        let pk = new_primary_key(&["a", "b"]);
        let batch = new_batch_builder(
            &pk,
            &[10, 20, 30],
            &[1, 2, 3],
            &[OpType::Put, OpType::Delete, OpType::Put],
            2,
            &[100, 200, 300],
        )
        .build()
        .unwrap();

        let adapter = build_adapter(vec![batch], &metadata, &codec);
        let rb = adapter.into_iter().next().unwrap().unwrap();

        // Sequence column is second from the end.
        let seq_idx = rb.num_columns() - 2;
        let seq_array = rb
            .column(seq_idx)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap();
        assert_eq!(&[1u64, 2, 3], seq_array.values().as_ref());

        // Op-type column is last.
        let op_idx = rb.num_columns() - 1;
        let op_array = rb
            .column(op_idx)
            .as_any()
            .downcast_ref::<UInt8Array>()
            .unwrap();
        assert_eq!(
            &[OpType::Put as u8, OpType::Delete as u8, OpType::Put as u8],
            op_array.values().as_ref()
        );
    }

    /// Non-string tags are repeated as plain (non-dictionary) arrays.
    #[test]
    fn test_integer_tag_column() {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(0, 0));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_0".to_string(),
                    ConcreteDataType::uint32_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field_0".to_string(),
                    ConcreteDataType::uint64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts".to_string(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            });
        builder.primary_key(vec![0]);
        let metadata = Arc::new(builder.build().unwrap());
        let codec = build_primary_key_codec(&metadata);

        // Encode a single u32 tag value as the primary key.
        let pk = {
            use datatypes::value::ValueRef;
            use mito_codec::row_converter::PrimaryKeyCodecExt;
            let codec_ext = mito_codec::row_converter::DensePrimaryKeyCodec::with_fields(vec![(
                0,
                mito_codec::row_converter::SortField::new(ConcreteDataType::uint32_datatype()),
            )]);
            codec_ext
                .encode([ValueRef::UInt32(42)].into_iter())
                .unwrap()
        };
        let batch = new_batch_builder(
            &pk,
            &[1, 2],
            &[1, 1],
            &[OpType::Put, OpType::Put],
            1,
            &[10, 20],
        )
        .build()
        .unwrap();

        let adapter = build_adapter(vec![batch], &metadata, &codec);
        let rb = adapter.into_iter().next().unwrap().unwrap();

        let expected_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        assert_eq!(rb.schema(), expected_schema);

        let tag_array = rb
            .column(0)
            .as_any()
            .downcast_ref::<UInt32Array>()
            .expect("integer tag should be a plain UInt32Array");
        assert_eq!(&[42u32, 42], tag_array.values().as_ref());
    }

    /// Batches carrying pre-decoded pk values take the `Cow::Borrowed` path
    /// and convert identically.
    #[test]
    fn test_with_precomputed_pk_values() {
        let metadata = Arc::new(sst_region_metadata());
        let codec = build_primary_key_codec(&metadata);

        let pk = new_primary_key(&["pre", "computed"]);
        let mut batch = new_batch_builder(&pk, &[1], &[1], &[OpType::Put], 2, &[99])
            .build()
            .unwrap();

        // Attach decoded pk values so the adapter skips decoding.
        let decoded = codec.decode(&pk).unwrap();
        batch.set_pk_values(decoded);

        let adapter = build_adapter(vec![batch], &metadata, &codec);
        let rb = adapter.into_iter().next().unwrap().unwrap();
        assert_eq!(1, rb.num_rows());

        let expected_schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        assert_eq!(rb.schema(), expected_schema);
    }

    /// A partial projection (one tag + one field) must produce the same
    /// schema as `FlatProjectionMapper` for the same column ids.
    #[test]
    fn test_partial_projection_schema_matches_mapper() {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(0, 0));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_0".to_string(),
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag_1".to_string(),
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field_0".to_string(),
                    ConcreteDataType::uint64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field_1".to_string(),
                    ConcreteDataType::uint64_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts".to_string(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 4,
            });
        builder.primary_key(vec![0, 1]);
        let metadata = Arc::new(builder.build().unwrap());
        let codec = build_primary_key_codec(&metadata);

        // Project only tag_0 (id 0) and field_1 (id 3).
        let read_column_ids = vec![0, 3];

        let pk = new_primary_key(&["host-1", "region-a"]);
        let batch = new_batch_builder(
            &pk,
            &[1, 2, 3],
            &[100, 100, 100],
            &[OpType::Put, OpType::Put, OpType::Put],
            3,
            &[10, 20, 30],
        )
        .build()
        .unwrap();

        let iter: BoxedBatchIterator = Box::new(vec![Ok(batch)].into_iter());
        let adapter =
            BatchToRecordBatchAdapter::new(iter, metadata.clone(), codec, &read_column_ids);
        let rb = adapter.into_iter().next().unwrap().unwrap();

        let mapper = FlatProjectionMapper::new(&metadata, [0, 3]).unwrap();
        assert_eq!(rb.schema(), mapper.input_arrow_schema(false));
        // tag_0, field_1, ts + 3 internal columns.
        assert_eq!(6, rb.num_columns());
        assert_eq!(3, rb.num_rows());

        let field_1 = rb.column(1).as_any().downcast_ref::<UInt64Array>().unwrap();
        assert_eq!(&[10u64, 20, 30], field_1.values().as_ref());

        let ts = rb
            .column(2)
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap();
        assert_eq!(&[1i64, 2, 3], ts.values().as_ref());
    }
}