use std::collections::VecDeque;
use std::sync::Arc;

use api::helper::{value_to_grpc_value, ColumnDataTypeWrapper};
use api::v1::bulk_wal_entry::Body;
use api::v1::{bulk_wal_entry, ArrowIpc, BulkWalEntry, Mutation, OpType};
use bytes::Bytes;
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
use common_recordbatch::DfRecordBatch as RecordBatch;
use common_time::timestamp::TimeUnit;
use datafusion::arrow::array::{TimestampNanosecondArray, UInt64Builder};
use datatypes::arrow;
use datatypes::arrow::array::{
    Array, ArrayRef, BinaryBuilder, DictionaryArray, TimestampMicrosecondArray,
    TimestampMillisecondArray, TimestampSecondArray, UInt32Array, UInt64Array, UInt8Array,
    UInt8Builder,
};
use datatypes::arrow::compute::TakeOptions;
use datatypes::arrow::datatypes::SchemaRef;
use datatypes::arrow_array::BinaryArray;
use datatypes::data_type::DataType;
use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector};
use datatypes::value::Value;
use datatypes::vectors::Helper;
use mito_codec::key_values::{KeyValue, KeyValuesRef};
use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt};
use parquet::arrow::ArrowWriter;
use parquet::data_type::AsBytes;
use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::WriterProperties;
use snafu::{OptionExt, ResultExt, Snafu};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::SequenceNumber;
use table::predicate::Predicate;

use crate::error::{
    self, ComputeArrowSnafu, EncodeMemtableSnafu, EncodeSnafu, NewRecordBatchSnafu, Result,
};
use crate::memtable::bulk::context::BulkIterContextRef;
use crate::memtable::bulk::part_reader::BulkPartIter;
use crate::memtable::BoxedBatchIterator;
use crate::sst::parquet::format::{PrimaryKeyArray, ReadFormat};
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::to_sst_arrow_schema;

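/// An in-memory bulk part: an Arrow record batch together with its timestamp
/// range, sequence number, timestamp column index and the original Arrow IPC
/// payload (present when the part was decoded from a WAL entry).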
#[derive(Clone)]
pub struct BulkPart {
    pub batch: RecordBatch,
    pub max_ts: i64,
    pub min_ts: i64,
    pub sequence: u64,
    pub timestamp_index: usize,
    pub raw_data: Option<ArrowIpc>,
}

impl TryFrom<BulkWalEntry> for BulkPart {
    type Error = error::Error;

    fn try_from(value: BulkWalEntry) -> std::result::Result<Self, Self::Error> {
        match value.body.expect("Entry payload should be present") {
            Body::ArrowIpc(ipc) => {
                let mut decoder = FlightDecoder::try_from_schema_bytes(&ipc.schema)
                    .context(error::ConvertBulkWalEntrySnafu)?;
                let batch = decoder
                    .try_decode_record_batch(&ipc.data_header, &ipc.payload)
                    .context(error::ConvertBulkWalEntrySnafu)?;
                Ok(Self {
                    batch,
                    max_ts: value.max_ts,
                    min_ts: value.min_ts,
                    sequence: value.sequence,
                    timestamp_index: value.timestamp_index as usize,
                    raw_data: Some(ipc),
                })
            }
        }
    }
}

impl From<&BulkPart> for BulkWalEntry {
    fn from(value: &BulkPart) -> Self {
        if let Some(ipc) = &value.raw_data {
            BulkWalEntry {
                sequence: value.sequence,
                max_ts: value.max_ts,
                min_ts: value.min_ts,
                timestamp_index: value.timestamp_index as u32,
                body: Some(Body::ArrowIpc(ipc.clone())),
            }
        } else {
            let mut encoder = FlightEncoder::default();
            let schema_bytes = encoder
                .encode(FlightMessage::Schema(value.batch.schema()))
                .data_header;
            let rb_data = encoder.encode(FlightMessage::RecordBatch(value.batch.clone()));
            BulkWalEntry {
                sequence: value.sequence,
                max_ts: value.max_ts,
                min_ts: value.min_ts,
                timestamp_index: value.timestamp_index as u32,
                body: Some(Body::ArrowIpc(ArrowIpc {
                    schema: schema_bytes,
                    data_header: rb_data.data_header,
                    payload: rb_data.data_body,
                })),
            }
        }
    }
}

impl BulkPart {
    pub(crate) fn estimated_size(&self) -> usize {
        self.batch.get_array_memory_size()
    }

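    /// Converts this part into a gRPC [Mutation] using the region schema.
    /// Columns present in the schema but missing from the batch are filled with null values.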
    pub(crate) fn to_mutation(&self, region_metadata: &RegionMetadataRef) -> Result<Mutation> {
        let vectors = region_metadata
            .schema
            .column_schemas()
            .iter()
            .map(|col| match self.batch.column_by_name(&col.name) {
                None => Ok(None),
                Some(col) => Helper::try_into_vector(col).map(Some),
            })
            .collect::<datatypes::error::Result<Vec<_>>>()
            .context(error::ComputeVectorSnafu)?;

        let rows = (0..self.num_rows())
            .map(|row_idx| {
                let values = (0..self.batch.num_columns())
                    .map(|col_idx| {
                        if let Some(v) = &vectors[col_idx] {
                            value_to_grpc_value(v.get(row_idx))
                        } else {
                            api::v1::Value { value_data: None }
                        }
                    })
                    .collect::<Vec<_>>();
                api::v1::Row { values }
            })
            .collect::<Vec<_>>();

        let schema = region_metadata
            .column_metadatas
            .iter()
            .map(|c| {
                let data_type_wrapper =
                    ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())?;
                Ok(api::v1::ColumnSchema {
                    column_name: c.column_schema.name.clone(),
                    datatype: data_type_wrapper.datatype() as i32,
                    semantic_type: c.semantic_type as i32,
                    ..Default::default()
                })
            })
            .collect::<api::error::Result<Vec<_>>>()
            .context(error::ConvertColumnDataTypeSnafu {
                reason: "failed to convert region metadata to column schema",
            })?;

        let rows = api::v1::Rows { schema, rows };

        Ok(Mutation {
            op_type: OpType::Put as i32,
            sequence: self.sequence,
            rows: Some(rows),
            write_hint: None,
        })
    }

    pub fn timestamps(&self) -> &ArrayRef {
        self.batch.column(self.timestamp_index)
    }

    pub fn num_rows(&self) -> usize {
        self.batch.num_rows()
    }
}

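/// A bulk part encoded as Parquet bytes, along with its metadata.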
#[derive(Debug)]
pub struct EncodedBulkPart {
    data: Bytes,
    metadata: BulkPartMeta,
}

impl EncodedBulkPart {
    pub fn new(data: Bytes, metadata: BulkPartMeta) -> Self {
        Self { data, metadata }
    }

    pub(crate) fn metadata(&self) -> &BulkPartMeta {
        &self.metadata
    }

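    /// Creates an iterator over the part under the given read context.
    /// Returns `None` if all row groups are pruned out by the context.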
    pub(crate) fn read(
        &self,
        context: BulkIterContextRef,
        sequence: Option<SequenceNumber>,
    ) -> Result<Option<BoxedBatchIterator>> {
        let row_groups_to_read = context.row_groups_to_read(&self.metadata.parquet_metadata);

        if row_groups_to_read.is_empty() {
            return Ok(None);
        }

        let iter = BulkPartIter::try_new(
            context,
            row_groups_to_read,
            self.metadata.parquet_metadata.clone(),
            self.data.clone(),
            sequence,
        )?;
        Ok(Some(Box::new(iter) as BoxedBatchIterator))
    }
}

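/// Metadata of an [EncodedBulkPart].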
#[derive(Debug)]
pub struct BulkPartMeta {
    /// Total number of rows in the part.
    pub num_rows: usize,
    /// Max timestamp in the part.
    pub max_timestamp: i64,
    /// Min timestamp in the part.
    pub min_timestamp: i64,
    /// Parquet file metadata of the encoded part.
    pub parquet_metadata: Arc<ParquetMetaData>,
    /// Region metadata of the part.
    pub region_metadata: RegionMetadataRef,
}

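/// Encodes [Mutation]s into [EncodedBulkPart]s in the SST Parquet format.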
pub struct BulkPartEncoder {
    metadata: RegionMetadataRef,
    pk_encoder: DensePrimaryKeyCodec,
    row_group_size: usize,
    dedup: bool,
    writer_props: Option<WriterProperties>,
}

impl BulkPartEncoder {
    pub(crate) fn new(
        metadata: RegionMetadataRef,
        dedup: bool,
        row_group_size: usize,
    ) -> BulkPartEncoder {
        let codec = DensePrimaryKeyCodec::new(&metadata);
        let writer_props = Some(
            WriterProperties::builder()
                .set_write_batch_size(row_group_size)
                .set_max_row_group_size(row_group_size)
                .build(),
        );
        Self {
            metadata,
            pk_encoder: codec,
            row_group_size,
            dedup,
            writer_props,
        }
    }
}

impl BulkPartEncoder {
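    /// Encodes mutations into an [EncodedBulkPart].
    /// Returns `Ok(None)` if the mutations contain no rows.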
    fn encode_mutations(&self, mutations: &[Mutation]) -> Result<Option<EncodedBulkPart>> {
        let Some((arrow_record_batch, min_ts, max_ts)) =
            mutations_to_record_batch(mutations, &self.metadata, &self.pk_encoder, self.dedup)?
        else {
            return Ok(None);
        };

        let mut buf = Vec::with_capacity(4096);
        let arrow_schema = arrow_record_batch.schema();

        let file_metadata = {
            let mut writer =
                ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
                    .context(EncodeMemtableSnafu)?;
            writer
                .write(&arrow_record_batch)
                .context(EncodeMemtableSnafu)?;
            writer.finish().context(EncodeMemtableSnafu)?
        };

        let buf = Bytes::from(buf);
        let parquet_metadata = Arc::new(parse_parquet_metadata(file_metadata)?);

        Ok(Some(EncodedBulkPart {
            data: buf,
            metadata: BulkPartMeta {
                num_rows: arrow_record_batch.num_rows(),
                max_timestamp: max_ts,
                min_timestamp: min_ts,
                parquet_metadata,
                region_metadata: self.metadata.clone(),
            },
        }))
    }
}

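/// Converts mutations into a record batch in the SST arrow layout, sorted by
/// (primary key, timestamp, sequence desc) and optionally deduplicated.
/// Returns the batch together with its min and max timestamps, or `None` if
/// the mutations contain no rows.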
fn mutations_to_record_batch(
    mutations: &[Mutation],
    metadata: &RegionMetadataRef,
    pk_encoder: &DensePrimaryKeyCodec,
    dedup: bool,
) -> Result<Option<(RecordBatch, i64, i64)>> {
    let total_rows: usize = mutations
        .iter()
        .map(|m| m.rows.as_ref().map(|r| r.rows.len()).unwrap_or(0))
        .sum();

    if total_rows == 0 {
        return Ok(None);
    }

    let mut pk_builder = BinaryBuilder::with_capacity(total_rows, 0);

    let mut ts_vector: Box<dyn MutableVector> = metadata
        .time_index_column()
        .column_schema
        .data_type
        .create_mutable_vector(total_rows);
    let mut sequence_builder = UInt64Builder::with_capacity(total_rows);
    let mut op_type_builder = UInt8Builder::with_capacity(total_rows);

    let mut field_builders: Vec<Box<dyn MutableVector>> = metadata
        .field_columns()
        .map(|f| f.column_schema.data_type.create_mutable_vector(total_rows))
        .collect();

    let mut pk_buffer = vec![];
    for m in mutations {
        let Some(key_values) = KeyValuesRef::new(metadata, m) else {
            continue;
        };

        for row in key_values.iter() {
            pk_buffer.clear();
            pk_encoder
                .encode_to_vec(row.primary_keys(), &mut pk_buffer)
                .context(EncodeSnafu)?;
            pk_builder.append_value(pk_buffer.as_bytes());
            ts_vector.push_value_ref(row.timestamp());
            sequence_builder.append_value(row.sequence());
            op_type_builder.append_value(row.op_type() as u8);
            for (builder, field) in field_builders.iter_mut().zip(row.fields()) {
                builder.push_value_ref(field);
            }
        }
    }

    let arrow_schema = to_sst_arrow_schema(metadata);
    let timestamp_unit = metadata
        .time_index_column()
        .column_schema
        .data_type
        .as_timestamp()
        .unwrap()
        .unit();
    let sorter = ArraysSorter {
        encoded_primary_keys: pk_builder.finish(),
        timestamp_unit,
        timestamp: ts_vector.to_vector().to_arrow_array(),
        sequence: sequence_builder.finish(),
        op_type: op_type_builder.finish(),
        fields: field_builders
            .iter_mut()
            .map(|f| f.to_vector().to_arrow_array()),
        dedup,
        arrow_schema,
    };

    sorter.sort().map(Some)
}

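/// Helper that sorts column arrays by (primary key, timestamp, sequence desc),
/// optionally removes duplicated (primary key, timestamp) pairs, and builds the
/// final record batch.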
struct ArraysSorter<I> {
    encoded_primary_keys: BinaryArray,
    timestamp_unit: TimeUnit,
    timestamp: ArrayRef,
    sequence: UInt64Array,
    op_type: UInt8Array,
    fields: I,
    dedup: bool,
    arrow_schema: SchemaRef,
}

impl<I> ArraysSorter<I>
where
    I: Iterator<Item = ArrayRef>,
{
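    /// Sorts all arrays and returns the record batch along with its min and max timestamps.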
    fn sort(self) -> Result<(RecordBatch, i64, i64)> {
        debug_assert!(!self.timestamp.is_empty());
        debug_assert!(self.timestamp.len() == self.sequence.len());
        debug_assert!(self.timestamp.len() == self.op_type.len());
        debug_assert!(self.timestamp.len() == self.encoded_primary_keys.len());

        let timestamp_iter = timestamp_array_to_iter(self.timestamp_unit, &self.timestamp);
        let (mut min_timestamp, mut max_timestamp) = (i64::MAX, i64::MIN);
        let mut to_sort = self
            .encoded_primary_keys
            .iter()
            .zip(timestamp_iter)
            .zip(self.sequence.iter())
            .map(|((pk, timestamp), sequence)| {
                max_timestamp = max_timestamp.max(*timestamp);
                min_timestamp = min_timestamp.min(*timestamp);
                (pk, timestamp, sequence)
            })
            .enumerate()
            .collect::<Vec<_>>();

        to_sort.sort_unstable_by(|(_, (l_pk, l_ts, l_seq)), (_, (r_pk, r_ts, r_seq))| {
            l_pk.cmp(r_pk)
                .then(l_ts.cmp(r_ts))
                .then(l_seq.cmp(r_seq).reverse())
        });

        if self.dedup {
            to_sort.dedup_by(|(_, (l_pk, l_ts, _)), (_, (r_pk, r_ts, _))| {
                l_pk == r_pk && l_ts == r_ts
            });
        }

        let indices = UInt32Array::from_iter_values(to_sort.iter().map(|v| v.0 as u32));

        let pk_dictionary = Arc::new(binary_array_to_dictionary(
            arrow::compute::take(
                &self.encoded_primary_keys,
                &indices,
                Some(TakeOptions {
                    check_bounds: false,
                }),
            )
            .context(ComputeArrowSnafu)?
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap(),
        )?) as ArrayRef;

        let mut arrays = Vec::with_capacity(self.arrow_schema.fields.len());
        for arr in self.fields {
            arrays.push(
                arrow::compute::take(
                    &arr,
                    &indices,
                    Some(TakeOptions {
                        check_bounds: false,
                    }),
                )
                .context(ComputeArrowSnafu)?,
            );
        }

        let timestamp = arrow::compute::take(
            &self.timestamp,
            &indices,
            Some(TakeOptions {
                check_bounds: false,
            }),
        )
        .context(ComputeArrowSnafu)?;

        arrays.push(timestamp);
        arrays.push(pk_dictionary);
        arrays.push(
            arrow::compute::take(
                &self.sequence,
                &indices,
                Some(TakeOptions {
                    check_bounds: false,
                }),
            )
            .context(ComputeArrowSnafu)?,
        );

        arrays.push(
            arrow::compute::take(
                &self.op_type,
                &indices,
                Some(TakeOptions {
                    check_bounds: false,
                }),
            )
            .context(ComputeArrowSnafu)?,
        );

        let batch = RecordBatch::try_new(self.arrow_schema, arrays).context(NewRecordBatchSnafu)?;
        Ok((batch, min_timestamp, max_timestamp))
    }
}

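/// Returns an iterator over the `i64` values of a timestamp array with the given time unit.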
fn timestamp_array_to_iter(
    timestamp_unit: TimeUnit,
    timestamp: &ArrayRef,
) -> impl Iterator<Item = &i64> {
    match timestamp_unit {
        TimeUnit::Second => timestamp
            .as_any()
            .downcast_ref::<TimestampSecondArray>()
            .unwrap()
            .values()
            .iter(),
        TimeUnit::Millisecond => timestamp
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap()
            .values()
            .iter(),
        TimeUnit::Microsecond => timestamp
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .unwrap()
            .values()
            .iter(),
        TimeUnit::Nanosecond => timestamp
            .as_any()
            .downcast_ref::<TimestampNanosecondArray>()
            .unwrap()
            .values()
            .iter(),
    }
}

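/// Converts a sorted `BinaryArray` of encoded primary keys into a dictionary
/// array ([PrimaryKeyArray]). Equal values are expected to be adjacent so that
/// consecutive duplicates share the same dictionary key.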
fn binary_array_to_dictionary(input: &BinaryArray) -> Result<PrimaryKeyArray> {
    if input.is_empty() {
        return Ok(DictionaryArray::new(
            UInt32Array::from(Vec::<u32>::new()),
            Arc::new(BinaryArray::from_vec(vec![])) as ArrayRef,
        ));
    }
    let mut keys = Vec::with_capacity(16);
    let mut values = BinaryBuilder::new();
    let mut prev: usize = 0;
    keys.push(prev as u32);
    values.append_value(input.value(prev));

    for current_bytes in input.iter().skip(1) {
        let current_bytes = current_bytes.unwrap();
        let prev_bytes = input.value(prev);
        if current_bytes != prev_bytes {
            values.append_value(current_bytes);
            prev += 1;
        }
        keys.push(prev as u32);
    }

    Ok(DictionaryArray::new(
        UInt32Array::from(keys),
        Arc::new(values.finish()) as ArrayRef,
    ))
}

#[cfg(test)]
mod tests {
    use std::collections::VecDeque;

    use datafusion_common::ScalarValue;
    use datatypes::prelude::{ScalarVector, Value};
    use datatypes::vectors::{Float64Vector, TimestampMillisecondVector};

    use super::*;
    use crate::memtable::bulk::context::BulkIterContext;
    use crate::sst::parquet::format::ReadFormat;
    use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};

    fn check_binary_array_to_dictionary(
        input: &[&[u8]],
        expected_keys: &[u32],
        expected_values: &[&[u8]],
    ) {
        let input = BinaryArray::from_iter_values(input.iter());
        let array = binary_array_to_dictionary(&input).unwrap();
        assert_eq!(
            &expected_keys,
            &array.keys().iter().map(|v| v.unwrap()).collect::<Vec<_>>()
        );
        assert_eq!(
            expected_values,
            &array
                .values()
                .as_any()
                .downcast_ref::<BinaryArray>()
                .unwrap()
                .iter()
                .map(|v| v.unwrap())
                .collect::<Vec<_>>()
        );
    }

    #[test]
    fn test_binary_array_to_dictionary() {
        check_binary_array_to_dictionary(&[], &[], &[]);

        check_binary_array_to_dictionary(&["a".as_bytes()], &[0], &["a".as_bytes()]);

        check_binary_array_to_dictionary(
            &["a".as_bytes(), "a".as_bytes()],
            &[0, 0],
            &["a".as_bytes()],
        );

        check_binary_array_to_dictionary(
            &["a".as_bytes(), "a".as_bytes(), "b".as_bytes()],
            &[0, 0, 1],
            &["a".as_bytes(), "b".as_bytes()],
        );

        check_binary_array_to_dictionary(
            &[
                "a".as_bytes(),
                "a".as_bytes(),
                "b".as_bytes(),
                "c".as_bytes(),
            ],
            &[0, 0, 1, 2],
            &["a".as_bytes(), "b".as_bytes(), "c".as_bytes()],
        );
    }

    struct MutationInput<'a> {
        k0: &'a str,
        k1: u32,
        timestamps: &'a [i64],
        v1: &'a [Option<f64>],
        sequence: u64,
    }

    #[derive(Debug, PartialOrd, PartialEq)]
    struct BatchOutput<'a> {
        pk_values: &'a [Value],
        timestamps: &'a [i64],
        v1: &'a [Option<f64>],
    }

    fn check_mutations_to_record_batches(
        input: &[MutationInput],
        expected: &[BatchOutput],
        expected_timestamp: (i64, i64),
        dedup: bool,
    ) {
        let metadata = metadata_for_test();
        let mutations = input
            .iter()
            .map(|m| {
                build_key_values_with_ts_seq_values(
                    &metadata,
                    m.k0.to_string(),
                    m.k1,
                    m.timestamps.iter().copied(),
                    m.v1.iter().copied(),
                    m.sequence,
                )
                .mutation
            })
            .collect::<Vec<_>>();
        let total_rows: usize = mutations
            .iter()
            .flat_map(|m| m.rows.iter())
            .map(|r| r.rows.len())
            .sum();

        let pk_encoder = DensePrimaryKeyCodec::new(&metadata);

        let (batch, min_ts, max_ts) =
            mutations_to_record_batch(&mutations, &metadata, &pk_encoder, dedup)
                .unwrap()
                .unwrap();
        assert_eq!(expected_timestamp, (min_ts, max_ts));
        let read_format = ReadFormat::new_with_all_columns(metadata.clone());
        let mut batches = VecDeque::new();
        read_format
            .convert_record_batch(&batch, &mut batches)
            .unwrap();
        if !dedup {
            assert_eq!(
                total_rows,
                batches.iter().map(|b| b.num_rows()).sum::<usize>()
            );
        }
        let batch_values = batches
            .into_iter()
            .map(|b| {
                let pk_values = pk_encoder.decode(b.primary_key()).unwrap().into_dense();
                let timestamps = b
                    .timestamps()
                    .as_any()
                    .downcast_ref::<TimestampMillisecondVector>()
                    .unwrap()
                    .iter_data()
                    .map(|v| v.unwrap().0.value())
                    .collect::<Vec<_>>();
                let float_values = b.fields()[1]
                    .data
                    .as_any()
                    .downcast_ref::<Float64Vector>()
                    .unwrap()
                    .iter_data()
                    .collect::<Vec<_>>();

                (pk_values, timestamps, float_values)
            })
            .collect::<Vec<_>>();
        assert_eq!(expected.len(), batch_values.len());

        for idx in 0..expected.len() {
            assert_eq!(expected[idx].pk_values, &batch_values[idx].0);
            assert_eq!(expected[idx].timestamps, &batch_values[idx].1);
            assert_eq!(expected[idx].v1, &batch_values[idx].2);
        }
    }

    #[test]
    fn test_mutations_to_record_batch() {
        check_mutations_to_record_batches(
            &[MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[0],
                v1: &[Some(0.1)],
                sequence: 0,
            }],
            &[BatchOutput {
                pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                timestamps: &[0],
                v1: &[Some(0.1)],
            }],
            (0, 0),
            true,
        );

        check_mutations_to_record_batches(
            &[
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.1)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "b",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[1],
                    v1: &[Some(0.2)],
                    sequence: 1,
                },
                MutationInput {
                    k0: "a",
                    k1: 1,
                    timestamps: &[1],
                    v1: &[Some(0.3)],
                    sequence: 2,
                },
            ],
            &[
                BatchOutput {
                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                    timestamps: &[0, 1],
                    v1: &[Some(0.1), Some(0.2)],
                },
                BatchOutput {
                    pk_values: &[Value::String("a".into()), Value::UInt32(1)],
                    timestamps: &[1],
                    v1: &[Some(0.3)],
                },
                BatchOutput {
                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                },
            ],
            (0, 1),
            true,
        );

        check_mutations_to_record_batches(
            &[
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.1)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "b",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.2)],
                    sequence: 1,
                },
            ],
            &[
                BatchOutput {
                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                    timestamps: &[0],
                    v1: &[Some(0.2)],
                },
                BatchOutput {
                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                },
            ],
            (0, 0),
            true,
        );
        check_mutations_to_record_batches(
            &[
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.1)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "b",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.2)],
                    sequence: 1,
                },
            ],
            &[
                BatchOutput {
                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                    timestamps: &[0, 0],
                    v1: &[Some(0.2), Some(0.1)],
                },
                BatchOutput {
                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                },
            ],
            (0, 0),
            false,
        );
    }

    fn encode(input: &[MutationInput]) -> EncodedBulkPart {
        let metadata = metadata_for_test();
        let mutations = input
            .iter()
            .map(|m| {
                build_key_values_with_ts_seq_values(
                    &metadata,
                    m.k0.to_string(),
                    m.k1,
                    m.timestamps.iter().copied(),
                    m.v1.iter().copied(),
                    m.sequence,
                )
                .mutation
            })
            .collect::<Vec<_>>();
        let encoder = BulkPartEncoder::new(metadata, true, 1024);
        encoder.encode_mutations(&mutations).unwrap().unwrap()
    }

    #[test]
    fn test_write_and_read_part_projection() {
        let part = encode(&[
            MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[1],
                v1: &[Some(0.1)],
                sequence: 0,
            },
            MutationInput {
                k0: "b",
                k1: 0,
                timestamps: &[1],
                v1: &[Some(0.0)],
                sequence: 0,
            },
            MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[2],
                v1: &[Some(0.2)],
                sequence: 1,
            },
        ]);

        let projection = &[4u32];

        let reader = part
            .read(
                Arc::new(BulkIterContext::new(
                    part.metadata.region_metadata.clone(),
                    &Some(projection.as_slice()),
                    None,
                )),
                None,
            )
            .unwrap()
            .expect("expect at least one row group");

        let mut total_rows_read = 0;
        let mut field = vec![];
        for res in reader {
            let batch = res.unwrap();
            assert_eq!(1, batch.fields().len());
            assert_eq!(4, batch.fields()[0].column_id);
            field.extend(
                batch.fields()[0]
                    .data
                    .as_any()
                    .downcast_ref::<Float64Vector>()
                    .unwrap()
                    .iter_data()
                    .map(|v| v.unwrap()),
            );
            total_rows_read += batch.num_rows();
        }
        assert_eq!(3, total_rows_read);
        assert_eq!(vec![0.1, 0.2, 0.0], field);
    }

    fn prepare(key_values: Vec<(&str, u32, (i64, i64), u64)>) -> EncodedBulkPart {
        let metadata = metadata_for_test();
        let mutations = key_values
            .into_iter()
            .map(|(k0, k1, (start, end), sequence)| {
                let ts = start..end;
                let v1 = (start..end).map(|_| None);
                build_key_values_with_ts_seq_values(&metadata, k0.to_string(), k1, ts, v1, sequence)
                    .mutation
            })
            .collect::<Vec<_>>();
        let encoder = BulkPartEncoder::new(metadata, true, 100);
        encoder.encode_mutations(&mutations).unwrap().unwrap()
    }

    fn check_prune_row_group(
        part: &EncodedBulkPart,
        predicate: Option<Predicate>,
        expected_rows: usize,
    ) {
        let context = Arc::new(BulkIterContext::new(
            part.metadata.region_metadata.clone(),
            &None,
            predicate,
        ));
        let reader = part
            .read(context, None)
            .unwrap()
            .expect("expect at least one row group");
        let mut total_rows_read = 0;
        for res in reader {
            let batch = res.unwrap();
            total_rows_read += batch.num_rows();
        }
        assert_eq!(expected_rows, total_rows_read);
    }

    #[test]
    fn test_prune_row_groups() {
        let part = prepare(vec![
            ("a", 0, (0, 40), 1),
            ("a", 1, (0, 60), 1),
            ("b", 0, (0, 100), 2),
            ("b", 1, (100, 180), 3),
            ("b", 1, (180, 210), 4),
        ]);

        let context = Arc::new(BulkIterContext::new(
            part.metadata.region_metadata.clone(),
            &None,
            Some(Predicate::new(vec![datafusion_expr::col("ts").eq(
                datafusion_expr::lit(ScalarValue::TimestampMillisecond(Some(300), None)),
            )])),
        ));
        assert!(part.read(context, None).unwrap().is_none());

        check_prune_row_group(&part, None, 310);

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
            ])),
            40,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(1u32)),
            ])),
            60,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a"))
            ])),
            100,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("b")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
            ])),
            100,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("v0").eq(datafusion_expr::lit(150i64))
            ])),
            1,
        );
    }
}