use std::collections::VecDeque;
use std::sync::Arc;

use api::v1::Mutation;
use bytes::Bytes;
use common_time::timestamp::TimeUnit;
use datafusion::arrow::array::{TimestampNanosecondArray, UInt64Builder};
use datatypes::arrow;
use datatypes::arrow::array::{
    Array, ArrayRef, BinaryBuilder, DictionaryArray, RecordBatch, TimestampMicrosecondArray,
    TimestampMillisecondArray, TimestampSecondArray, UInt32Array, UInt64Array, UInt8Array,
    UInt8Builder,
};
use datatypes::arrow::compute::TakeOptions;
use datatypes::arrow::datatypes::SchemaRef;
use datatypes::arrow_array::BinaryArray;
use datatypes::data_type::DataType;
use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector};
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
use parquet::arrow::ArrowWriter;
use parquet::data_type::AsBytes;
use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::WriterProperties;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
use table::predicate::Predicate;

use crate::error;
use crate::error::{ComputeArrowSnafu, EncodeMemtableSnafu, NewRecordBatchSnafu, Result};
use crate::memtable::bulk::context::BulkIterContextRef;
use crate::memtable::bulk::part_reader::BulkPartIter;
use crate::memtable::key_values::KeyValuesRef;
use crate::memtable::BoxedBatchIterator;
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt};
use crate::sst::parquet::format::{PrimaryKeyArray, ReadFormat};
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::to_sst_arrow_schema;

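/// A bulk part buffers rows encoded as a parquet file in memory, along with metadata
/// describing the encoded data.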
#[derive(Debug)]
pub struct BulkPart {
    data: Bytes,
    metadata: BulkPartMeta,
}

impl BulkPart {
    pub fn new(data: Bytes, metadata: BulkPartMeta) -> Self {
        Self { data, metadata }
    }

    pub(crate) fn metadata(&self) -> &BulkPartMeta {
        &self.metadata
    }

    pub(crate) fn read(
        &self,
        context: BulkIterContextRef,
        sequence: Option<SequenceNumber>,
    ) -> Result<Option<BoxedBatchIterator>> {
        let row_groups_to_read = context.row_groups_to_read(&self.metadata.parquet_metadata);

        if row_groups_to_read.is_empty() {
            return Ok(None);
        }

        let iter = BulkPartIter::try_new(
            context,
            row_groups_to_read,
            self.metadata.parquet_metadata.clone(),
            self.data.clone(),
            sequence,
        )?;
        Ok(Some(Box::new(iter) as BoxedBatchIterator))
    }
}

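/// Metadata of a [BulkPart].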
#[derive(Debug)]
pub struct BulkPartMeta {
    /// Total number of rows in the part.
    pub num_rows: usize,
    /// Max timestamp in the part.
    pub max_timestamp: i64,
    /// Min timestamp in the part.
    pub min_timestamp: i64,
    /// Metadata of the encoded parquet file.
    pub parquet_metadata: Arc<ParquetMetaData>,
    /// Metadata of the region the part belongs to.
    pub region_metadata: RegionMetadataRef,
}

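/// Encodes [Mutation]s into parquet-encoded [BulkPart]s.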
pub struct BulkPartEncoder {
    metadata: RegionMetadataRef,
    pk_encoder: DensePrimaryKeyCodec,
    row_group_size: usize,
    dedup: bool,
    writer_props: Option<WriterProperties>,
}

impl BulkPartEncoder {
    pub(crate) fn new(
        metadata: RegionMetadataRef,
        dedup: bool,
        row_group_size: usize,
    ) -> BulkPartEncoder {
        let codec = DensePrimaryKeyCodec::new(&metadata);
        let writer_props = Some(
            WriterProperties::builder()
                .set_write_batch_size(row_group_size)
                .set_max_row_group_size(row_group_size)
                .build(),
        );
        Self {
            metadata,
            pk_encoder: codec,
            row_group_size,
            dedup,
            writer_props,
        }
    }
}

impl BulkPartEncoder {
    /// Encodes mutations to a [BulkPart], returns `None` if the mutations do not contain any row.
    fn encode_mutations(&self, mutations: &[Mutation]) -> Result<Option<BulkPart>> {
        let Some((arrow_record_batch, min_ts, max_ts)) =
            mutations_to_record_batch(mutations, &self.metadata, &self.pk_encoder, self.dedup)?
        else {
            return Ok(None);
        };

        let mut buf = Vec::with_capacity(4096);
        let arrow_schema = arrow_record_batch.schema();

        let file_metadata = {
            let mut writer =
                ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
                    .context(EncodeMemtableSnafu)?;
            writer
                .write(&arrow_record_batch)
                .context(EncodeMemtableSnafu)?;
            writer.finish().context(EncodeMemtableSnafu)?
        };

        let buf = Bytes::from(buf);
        let parquet_metadata = Arc::new(parse_parquet_metadata(file_metadata)?);

        Ok(Some(BulkPart {
            data: buf,
            metadata: BulkPartMeta {
                num_rows: arrow_record_batch.num_rows(),
                max_timestamp: max_ts,
                min_timestamp: min_ts,
                parquet_metadata,
                region_metadata: self.metadata.clone(),
            },
        }))
    }
}

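/// Converts [Mutation]s to a sorted arrow [RecordBatch] in the SST format, along with the min
/// and max timestamps of all rows. Returns `None` if the mutations contain no rows.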
fn mutations_to_record_batch(
    mutations: &[Mutation],
    metadata: &RegionMetadataRef,
    pk_encoder: &DensePrimaryKeyCodec,
    dedup: bool,
) -> Result<Option<(RecordBatch, i64, i64)>> {
    let total_rows: usize = mutations
        .iter()
        .map(|m| m.rows.as_ref().map(|r| r.rows.len()).unwrap_or(0))
        .sum();

    if total_rows == 0 {
        return Ok(None);
    }

    let mut pk_builder = BinaryBuilder::with_capacity(total_rows, 0);

    let mut ts_vector: Box<dyn MutableVector> = metadata
        .time_index_column()
        .column_schema
        .data_type
        .create_mutable_vector(total_rows);
    let mut sequence_builder = UInt64Builder::with_capacity(total_rows);
    let mut op_type_builder = UInt8Builder::with_capacity(total_rows);

    let mut field_builders: Vec<Box<dyn MutableVector>> = metadata
        .field_columns()
        .map(|f| f.column_schema.data_type.create_mutable_vector(total_rows))
        .collect();

    let mut pk_buffer = vec![];
    for m in mutations {
        let Some(key_values) = KeyValuesRef::new(metadata, m) else {
            continue;
        };

        for row in key_values.iter() {
            pk_buffer.clear();
            pk_encoder.encode_to_vec(row.primary_keys(), &mut pk_buffer)?;
            pk_builder.append_value(pk_buffer.as_bytes());
            ts_vector.push_value_ref(row.timestamp());
            sequence_builder.append_value(row.sequence());
            op_type_builder.append_value(row.op_type() as u8);
            for (builder, field) in field_builders.iter_mut().zip(row.fields()) {
                builder.push_value_ref(field);
            }
        }
    }

    let arrow_schema = to_sst_arrow_schema(metadata);
    // Safety: the time index column always has a timestamp type.
    let timestamp_unit = metadata
        .time_index_column()
        .column_schema
        .data_type
        .as_timestamp()
        .unwrap()
        .unit();
    let sorter = ArraysSorter {
        encoded_primary_keys: pk_builder.finish(),
        timestamp_unit,
        timestamp: ts_vector.to_vector().to_arrow_array(),
        sequence: sequence_builder.finish(),
        op_type: op_type_builder.finish(),
        fields: field_builders
            .iter_mut()
            .map(|f| f.to_vector().to_arrow_array()),
        dedup,
        arrow_schema,
    };

    sorter.sort().map(Some)
}

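/// Helper that sorts the column arrays built from mutations by (primary key, timestamp,
/// sequence desc) and optionally removes duplicated rows.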
struct ArraysSorter<I> {
    encoded_primary_keys: BinaryArray,
    timestamp_unit: TimeUnit,
    timestamp: ArrayRef,
    sequence: UInt64Array,
    op_type: UInt8Array,
    fields: I,
    dedup: bool,
    arrow_schema: SchemaRef,
}

impl<I> ArraysSorter<I>
where
    I: Iterator<Item = ArrayRef>,
{
    /// Sorts and dedups the arrays, converting them to a [RecordBatch] along with the min and
    /// max timestamps.
    fn sort(self) -> Result<(RecordBatch, i64, i64)> {
        debug_assert!(!self.timestamp.is_empty());
        debug_assert!(self.timestamp.len() == self.sequence.len());
        debug_assert!(self.timestamp.len() == self.op_type.len());
        debug_assert!(self.timestamp.len() == self.encoded_primary_keys.len());

        let timestamp_iter = timestamp_array_to_iter(self.timestamp_unit, &self.timestamp);
        let (mut min_timestamp, mut max_timestamp) = (i64::MAX, i64::MIN);
        let mut to_sort = self
            .encoded_primary_keys
            .iter()
            .zip(timestamp_iter)
            .zip(self.sequence.iter())
            .map(|((pk, timestamp), sequence)| {
                max_timestamp = max_timestamp.max(*timestamp);
                min_timestamp = min_timestamp.min(*timestamp);
                (pk, timestamp, sequence)
            })
            .enumerate()
            .collect::<Vec<_>>();

        to_sort.sort_unstable_by(|(_, (l_pk, l_ts, l_seq)), (_, (r_pk, r_ts, r_seq))| {
            l_pk.cmp(r_pk)
                .then(l_ts.cmp(r_ts))
                .then(l_seq.cmp(r_seq).reverse())
        });

        if self.dedup {
            // Dedup by (primary key, timestamp), keeping the row with the largest sequence.
            to_sort.dedup_by(|(_, (l_pk, l_ts, _)), (_, (r_pk, r_ts, _))| {
                l_pk == r_pk && l_ts == r_ts
            });
        }

        let indices = UInt32Array::from_iter_values(to_sort.iter().map(|v| v.0 as u32));

        let pk_dictionary = Arc::new(binary_array_to_dictionary(
            arrow::compute::take(
                &self.encoded_primary_keys,
                &indices,
                Some(TakeOptions {
                    check_bounds: false,
                }),
            )
            .context(ComputeArrowSnafu)?
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap(),
        )?) as ArrayRef;

        let mut arrays = Vec::with_capacity(self.arrow_schema.fields.len());
        for arr in self.fields {
            arrays.push(
                arrow::compute::take(
                    &arr,
                    &indices,
                    Some(TakeOptions {
                        check_bounds: false,
                    }),
                )
                .context(ComputeArrowSnafu)?,
            );
        }

        let timestamp = arrow::compute::take(
            &self.timestamp,
            &indices,
            Some(TakeOptions {
                check_bounds: false,
            }),
        )
        .context(ComputeArrowSnafu)?;

        arrays.push(timestamp);
        arrays.push(pk_dictionary);
        arrays.push(
            arrow::compute::take(
                &self.sequence,
                &indices,
                Some(TakeOptions {
                    check_bounds: false,
                }),
            )
            .context(ComputeArrowSnafu)?,
        );

        arrays.push(
            arrow::compute::take(
                &self.op_type,
                &indices,
                Some(TakeOptions {
                    check_bounds: false,
                }),
            )
            .context(ComputeArrowSnafu)?,
        );

        let batch = RecordBatch::try_new(self.arrow_schema, arrays).context(NewRecordBatchSnafu)?;
        Ok((batch, min_timestamp, max_timestamp))
    }
}

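/// Converts the timestamp array to an iterator over the underlying i64 values, according to the
/// time unit of the array.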
fn timestamp_array_to_iter(
    timestamp_unit: TimeUnit,
    timestamp: &ArrayRef,
) -> impl Iterator<Item = &i64> {
    match timestamp_unit {
        TimeUnit::Second => timestamp
            .as_any()
            .downcast_ref::<TimestampSecondArray>()
            .unwrap()
            .values()
            .iter(),
        TimeUnit::Millisecond => timestamp
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap()
            .values()
            .iter(),
        TimeUnit::Microsecond => timestamp
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .unwrap()
            .values()
            .iter(),
        TimeUnit::Nanosecond => timestamp
            .as_any()
            .downcast_ref::<TimestampNanosecondArray>()
            .unwrap()
            .values()
            .iter(),
    }
}

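/// Converts a sorted [BinaryArray] of encoded primary keys to a dictionary-encoded
/// [PrimaryKeyArray]; consecutive equal values share one dictionary entry.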
fn binary_array_to_dictionary(input: &BinaryArray) -> Result<PrimaryKeyArray> {
    if input.is_empty() {
        return Ok(DictionaryArray::new(
            UInt32Array::from(Vec::<u32>::new()),
            Arc::new(BinaryArray::from_vec(vec![])) as ArrayRef,
        ));
    }
    let mut keys = Vec::with_capacity(16);
    let mut values = BinaryBuilder::new();
    let mut prev: usize = 0;
    keys.push(prev as u32);
    values.append_value(input.value(prev));

    for current_bytes in input.iter().skip(1) {
        // Safety: encoded primary keys are never null.
        let current_bytes = current_bytes.unwrap();
        let prev_bytes = input.value(prev);
        if current_bytes != prev_bytes {
            values.append_value(current_bytes);
            prev += 1;
        }
        keys.push(prev as u32);
    }

    Ok(DictionaryArray::new(
        UInt32Array::from(keys),
        Arc::new(values.finish()) as ArrayRef,
    ))
}

#[cfg(test)]
mod tests {
    use std::collections::VecDeque;

    use datafusion_common::ScalarValue;
    use datatypes::prelude::{ScalarVector, Value};
    use datatypes::vectors::{Float64Vector, TimestampMillisecondVector};

    use super::*;
    use crate::memtable::bulk::context::BulkIterContext;
    use crate::sst::parquet::format::ReadFormat;
    use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};

    fn check_binary_array_to_dictionary(
        input: &[&[u8]],
        expected_keys: &[u32],
        expected_values: &[&[u8]],
    ) {
        let input = BinaryArray::from_iter_values(input.iter());
        let array = binary_array_to_dictionary(&input).unwrap();
        assert_eq!(
            &expected_keys,
            &array.keys().iter().map(|v| v.unwrap()).collect::<Vec<_>>()
        );
        assert_eq!(
            expected_values,
            &array
                .values()
                .as_any()
                .downcast_ref::<BinaryArray>()
                .unwrap()
                .iter()
                .map(|v| v.unwrap())
                .collect::<Vec<_>>()
        );
    }

    #[test]
    fn test_binary_array_to_dictionary() {
        check_binary_array_to_dictionary(&[], &[], &[]);

        check_binary_array_to_dictionary(&["a".as_bytes()], &[0], &["a".as_bytes()]);

        check_binary_array_to_dictionary(
            &["a".as_bytes(), "a".as_bytes()],
            &[0, 0],
            &["a".as_bytes()],
        );

        check_binary_array_to_dictionary(
            &["a".as_bytes(), "a".as_bytes(), "b".as_bytes()],
            &[0, 0, 1],
            &["a".as_bytes(), "b".as_bytes()],
        );

        check_binary_array_to_dictionary(
            &[
                "a".as_bytes(),
                "a".as_bytes(),
                "b".as_bytes(),
                "c".as_bytes(),
            ],
            &[0, 0, 1, 2],
            &["a".as_bytes(), "b".as_bytes(), "c".as_bytes()],
        );
    }

    struct MutationInput<'a> {
        k0: &'a str,
        k1: u32,
        timestamps: &'a [i64],
        v1: &'a [Option<f64>],
        sequence: u64,
    }

    #[derive(Debug, PartialOrd, PartialEq)]
    struct BatchOutput<'a> {
        pk_values: &'a [Value],
        timestamps: &'a [i64],
        v1: &'a [Option<f64>],
    }

    fn check_mutations_to_record_batches(
        input: &[MutationInput],
        expected: &[BatchOutput],
        expected_timestamp: (i64, i64),
        dedup: bool,
    ) {
        let metadata = metadata_for_test();
        let mutations = input
            .iter()
            .map(|m| {
                build_key_values_with_ts_seq_values(
                    &metadata,
                    m.k0.to_string(),
                    m.k1,
                    m.timestamps.iter().copied(),
                    m.v1.iter().copied(),
                    m.sequence,
                )
                .mutation
            })
            .collect::<Vec<_>>();
        let total_rows: usize = mutations
            .iter()
            .flat_map(|m| m.rows.iter())
            .map(|r| r.rows.len())
            .sum();

        let pk_encoder = DensePrimaryKeyCodec::new(&metadata);

        let (batch, min_ts, max_ts) =
            mutations_to_record_batch(&mutations, &metadata, &pk_encoder, dedup)
                .unwrap()
                .unwrap();
        assert_eq!(expected_timestamp, (min_ts, max_ts));
        let read_format = ReadFormat::new_with_all_columns(metadata.clone());
        let mut batches = VecDeque::new();
        read_format
            .convert_record_batch(&batch, &mut batches)
            .unwrap();
        if !dedup {
            assert_eq!(
                total_rows,
                batches.iter().map(|b| { b.num_rows() }).sum::<usize>()
            );
        }
        let batch_values = batches
            .into_iter()
            .map(|b| {
                let pk_values = pk_encoder.decode(b.primary_key()).unwrap().into_dense();
                let timestamps = b
                    .timestamps()
                    .as_any()
                    .downcast_ref::<TimestampMillisecondVector>()
                    .unwrap()
                    .iter_data()
                    .map(|v| v.unwrap().0.value())
                    .collect::<Vec<_>>();
                let float_values = b.fields()[1]
                    .data
                    .as_any()
                    .downcast_ref::<Float64Vector>()
                    .unwrap()
                    .iter_data()
                    .collect::<Vec<_>>();

                (pk_values, timestamps, float_values)
            })
            .collect::<Vec<_>>();
        assert_eq!(expected.len(), batch_values.len());

        for idx in 0..expected.len() {
            assert_eq!(expected[idx].pk_values, &batch_values[idx].0);
            assert_eq!(expected[idx].timestamps, &batch_values[idx].1);
            assert_eq!(expected[idx].v1, &batch_values[idx].2);
        }
    }

    #[test]
    fn test_mutations_to_record_batch() {
        check_mutations_to_record_batches(
            &[MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[0],
                v1: &[Some(0.1)],
                sequence: 0,
            }],
            &[BatchOutput {
                pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                timestamps: &[0],
                v1: &[Some(0.1)],
            }],
            (0, 0),
            true,
        );

        check_mutations_to_record_batches(
            &[
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.1)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "b",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[1],
                    v1: &[Some(0.2)],
                    sequence: 1,
                },
                MutationInput {
                    k0: "a",
                    k1: 1,
                    timestamps: &[1],
                    v1: &[Some(0.3)],
                    sequence: 2,
                },
            ],
            &[
                BatchOutput {
                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                    timestamps: &[0, 1],
                    v1: &[Some(0.1), Some(0.2)],
                },
                BatchOutput {
                    pk_values: &[Value::String("a".into()), Value::UInt32(1)],
                    timestamps: &[1],
                    v1: &[Some(0.3)],
                },
                BatchOutput {
                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                },
            ],
            (0, 1),
            true,
        );

        check_mutations_to_record_batches(
            &[
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.1)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "b",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.2)],
                    sequence: 1,
                },
            ],
            &[
                BatchOutput {
                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                    timestamps: &[0],
                    v1: &[Some(0.2)],
                },
                BatchOutput {
                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                },
            ],
            (0, 0),
            true,
        );
        check_mutations_to_record_batches(
            &[
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.1)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "b",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.2)],
                    sequence: 1,
                },
            ],
            &[
                BatchOutput {
                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                    timestamps: &[0, 0],
                    v1: &[Some(0.2), Some(0.1)],
                },
                BatchOutput {
                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                },
            ],
            (0, 0),
            false,
        );
    }

    fn encode(input: &[MutationInput]) -> BulkPart {
        let metadata = metadata_for_test();
        let mutations = input
            .iter()
            .map(|m| {
                build_key_values_with_ts_seq_values(
                    &metadata,
                    m.k0.to_string(),
                    m.k1,
                    m.timestamps.iter().copied(),
                    m.v1.iter().copied(),
                    m.sequence,
                )
                .mutation
            })
            .collect::<Vec<_>>();
        let encoder = BulkPartEncoder::new(metadata, true, 1024);
        encoder.encode_mutations(&mutations).unwrap().unwrap()
    }

    #[test]
    fn test_write_and_read_part_projection() {
        let part = encode(&[
            MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[1],
                v1: &[Some(0.1)],
                sequence: 0,
            },
            MutationInput {
                k0: "b",
                k1: 0,
                timestamps: &[1],
                v1: &[Some(0.0)],
                sequence: 0,
            },
            MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[2],
                v1: &[Some(0.2)],
                sequence: 1,
            },
        ]);

        let projection = &[4u32];

        let reader = part
            .read(
                Arc::new(BulkIterContext::new(
                    part.metadata.region_metadata.clone(),
                    &Some(projection.as_slice()),
                    None,
                )),
                None,
            )
            .unwrap()
            .expect("expect at least one row group");

        let mut total_rows_read = 0;
        let mut field = vec![];
        for res in reader {
            let batch = res.unwrap();
            assert_eq!(1, batch.fields().len());
            assert_eq!(4, batch.fields()[0].column_id);
            field.extend(
                batch.fields()[0]
                    .data
                    .as_any()
                    .downcast_ref::<Float64Vector>()
                    .unwrap()
                    .iter_data()
                    .map(|v| v.unwrap()),
            );
            total_rows_read += batch.num_rows();
        }
        assert_eq!(3, total_rows_read);
        assert_eq!(vec![0.1, 0.2, 0.0], field);
    }

    fn prepare(key_values: Vec<(&str, u32, (i64, i64), u64)>) -> BulkPart {
        let metadata = metadata_for_test();
        let mutations = key_values
            .into_iter()
            .map(|(k0, k1, (start, end), sequence)| {
                let ts = start..end;
                let v1 = (start..end).map(|_| None);
                build_key_values_with_ts_seq_values(&metadata, k0.to_string(), k1, ts, v1, sequence)
                    .mutation
            })
            .collect::<Vec<_>>();
        let encoder = BulkPartEncoder::new(metadata, true, 100);
        encoder.encode_mutations(&mutations).unwrap().unwrap()
    }

    fn check_prune_row_group(part: &BulkPart, predicate: Option<Predicate>, expected_rows: usize) {
        let context = Arc::new(BulkIterContext::new(
            part.metadata.region_metadata.clone(),
            &None,
            predicate,
        ));
        let reader = part
            .read(context, None)
            .unwrap()
            .expect("expect at least one row group");
        let mut total_rows_read = 0;
        for res in reader {
            let batch = res.unwrap();
            total_rows_read += batch.num_rows();
        }
        assert_eq!(expected_rows, total_rows_read);
    }

    #[test]
    fn test_prune_row_groups() {
        let part = prepare(vec![
            ("a", 0, (0, 40), 1),
            ("a", 1, (0, 60), 1),
            ("b", 0, (0, 100), 2),
            ("b", 1, (100, 180), 3),
            ("b", 1, (180, 210), 4),
        ]);

        let context = Arc::new(BulkIterContext::new(
            part.metadata.region_metadata.clone(),
            &None,
            Some(Predicate::new(vec![datafusion_expr::col("ts").eq(
                datafusion_expr::lit(ScalarValue::TimestampMillisecond(Some(300), None)),
            )])),
        ));
        assert!(part.read(context, None).unwrap().is_none());

        check_prune_row_group(&part, None, 310);

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
            ])),
            40,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(1u32)),
            ])),
            60,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a"))
            ])),
            100,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("b")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
            ])),
            100,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("v0").eq(datafusion_expr::lit(150i64))
            ])),
            1,
        );
    }
}