use std::collections::VecDeque;
use std::sync::Arc;

use api::helper::{value_to_grpc_value, ColumnDataTypeWrapper};
use api::v1::{Mutation, OpType};
use bytes::Bytes;
use common_recordbatch::DfRecordBatch as RecordBatch;
use common_time::timestamp::TimeUnit;
use datafusion::arrow::array::{TimestampNanosecondArray, UInt64Builder};
use datatypes::arrow;
use datatypes::arrow::array::{
    Array, ArrayRef, BinaryBuilder, DictionaryArray, TimestampMicrosecondArray,
    TimestampMillisecondArray, TimestampSecondArray, UInt32Array, UInt64Array, UInt8Array,
    UInt8Builder,
};
use datatypes::arrow::compute::TakeOptions;
use datatypes::arrow::datatypes::SchemaRef;
use datatypes::arrow_array::BinaryArray;
use datatypes::data_type::DataType;
use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector};
use datatypes::value::Value;
use datatypes::vectors::Helper;
use parquet::arrow::ArrowWriter;
use parquet::data_type::AsBytes;
use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::WriterProperties;
use snafu::{OptionExt, ResultExt, Snafu};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::SequenceNumber;
use table::predicate::Predicate;

use crate::error;
use crate::error::{ComputeArrowSnafu, EncodeMemtableSnafu, NewRecordBatchSnafu, Result};
use crate::memtable::bulk::context::BulkIterContextRef;
use crate::memtable::bulk::part_reader::BulkPartIter;
use crate::memtable::key_values::{KeyValue, KeyValuesRef};
use crate::memtable::BoxedBatchIterator;
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt};
use crate::sst::parquet::format::{PrimaryKeyArray, ReadFormat};
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::to_sst_arrow_schema;
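/// A bulk part of data held as an unencoded Arrow [RecordBatch], together with its row
/// count, timestamp range, sequence number and the index of the timestamp column.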
#[derive(Clone)]
pub struct BulkPart {
    pub batch: RecordBatch,
    pub num_rows: usize,
    pub max_ts: i64,
    pub min_ts: i64,
    pub sequence: u64,
    pub timestamp_index: usize,
}

impl BulkPart {
    pub(crate) fn estimated_size(&self) -> usize {
        self.batch.get_array_memory_size()
    }
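    /// Converts this part to a gRPC [Mutation] of [OpType::Put], using the given region
    /// metadata to build the request schema.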
    pub(crate) fn to_mutation(&self, region_metadata: &RegionMetadataRef) -> Result<Mutation> {
        let vectors = region_metadata
            .schema
            .column_schemas()
            .iter()
            .map(|col| match self.batch.column_by_name(&col.name) {
                None => Ok(None),
                Some(col) => Helper::try_into_vector(col).map(Some),
            })
            .collect::<datatypes::error::Result<Vec<_>>>()
            .context(error::ComputeVectorSnafu)?;

        let rows = (0..self.num_rows)
            .map(|row_idx| {
                let values = (0..self.batch.num_columns())
                    .map(|col_idx| {
                        if let Some(v) = &vectors[col_idx] {
                            value_to_grpc_value(v.get(row_idx))
                        } else {
                            api::v1::Value { value_data: None }
                        }
                    })
                    .collect::<Vec<_>>();
                api::v1::Row { values }
            })
            .collect::<Vec<_>>();

        let schema = region_metadata
            .column_metadatas
            .iter()
            .map(|c| {
                let data_type_wrapper =
                    ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())?;
                Ok(api::v1::ColumnSchema {
                    column_name: c.column_schema.name.clone(),
                    datatype: data_type_wrapper.datatype() as i32,
                    semantic_type: c.semantic_type as i32,
                    ..Default::default()
                })
            })
            .collect::<api::error::Result<Vec<_>>>()
            .context(error::ConvertColumnDataTypeSnafu {
                reason: "failed to convert region metadata to column schema",
            })?;

        let rows = api::v1::Rows { schema, rows };

        Ok(Mutation {
            op_type: OpType::Put as i32,
            sequence: self.sequence,
            rows: Some(rows),
            write_hint: None,
        })
    }

    pub fn timestamps(&self) -> &ArrayRef {
        self.batch.column(self.timestamp_index)
    }
}
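/// A bulk part encoded as parquet bytes, along with its [BulkPartMeta].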
#[derive(Debug)]
pub struct EncodedBulkPart {
    data: Bytes,
    metadata: BulkPartMeta,
}

impl EncodedBulkPart {
    pub fn new(data: Bytes, metadata: BulkPartMeta) -> Self {
        Self { data, metadata }
    }

    pub(crate) fn metadata(&self) -> &BulkPartMeta {
        &self.metadata
    }
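    /// Creates an iterator over batches in this part, pruning row groups with the given
    /// context. Returns `None` if every row group is pruned out.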
    pub(crate) fn read(
        &self,
        context: BulkIterContextRef,
        sequence: Option<SequenceNumber>,
    ) -> Result<Option<BoxedBatchIterator>> {
        let row_groups_to_read = context.row_groups_to_read(&self.metadata.parquet_metadata);

        if row_groups_to_read.is_empty() {
            return Ok(None);
        }

        let iter = BulkPartIter::try_new(
            context,
            row_groups_to_read,
            self.metadata.parquet_metadata.clone(),
            self.data.clone(),
            sequence,
        )?;
        Ok(Some(Box::new(iter) as BoxedBatchIterator))
    }
}
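/// Metadata of an [EncodedBulkPart].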
#[derive(Debug)]
pub struct BulkPartMeta {
    /// Total number of rows in the part.
    pub num_rows: usize,
    /// Max timestamp in the part.
    pub max_timestamp: i64,
    /// Min timestamp in the part.
    pub min_timestamp: i64,
    /// Metadata of the encoded parquet file.
    pub parquet_metadata: Arc<ParquetMetaData>,
    /// Metadata of the region this part belongs to.
    pub region_metadata: RegionMetadataRef,
}
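/// Encoder that converts mutations into parquet-encoded [EncodedBulkPart]s.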
pub struct BulkPartEncoder {
    metadata: RegionMetadataRef,
    pk_encoder: DensePrimaryKeyCodec,
    row_group_size: usize,
    dedup: bool,
    writer_props: Option<WriterProperties>,
}

impl BulkPartEncoder {
    pub(crate) fn new(
        metadata: RegionMetadataRef,
        dedup: bool,
        row_group_size: usize,
    ) -> BulkPartEncoder {
        let codec = DensePrimaryKeyCodec::new(&metadata);
        let writer_props = Some(
            WriterProperties::builder()
                .set_write_batch_size(row_group_size)
                .set_max_row_group_size(row_group_size)
                .build(),
        );
        Self {
            metadata,
            pk_encoder: codec,
            row_group_size,
            dedup,
            writer_props,
        }
    }
}

impl BulkPartEncoder {
    /// Encodes mutations to an [EncodedBulkPart], returning `None` if the mutations are empty.
    fn encode_mutations(&self, mutations: &[Mutation]) -> Result<Option<EncodedBulkPart>> {
        let Some((arrow_record_batch, min_ts, max_ts)) =
            mutations_to_record_batch(mutations, &self.metadata, &self.pk_encoder, self.dedup)?
        else {
            return Ok(None);
        };

        let mut buf = Vec::with_capacity(4096);
        let arrow_schema = arrow_record_batch.schema();

        let file_metadata = {
            let mut writer =
                ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
                    .context(EncodeMemtableSnafu)?;
            writer
                .write(&arrow_record_batch)
                .context(EncodeMemtableSnafu)?;
            writer.finish().context(EncodeMemtableSnafu)?
        };

        let buf = Bytes::from(buf);
        let parquet_metadata = Arc::new(parse_parquet_metadata(file_metadata)?);

        Ok(Some(EncodedBulkPart {
            data: buf,
            metadata: BulkPartMeta {
                num_rows: arrow_record_batch.num_rows(),
                max_timestamp: max_ts,
                min_timestamp: min_ts,
                parquet_metadata,
                region_metadata: self.metadata.clone(),
            },
        }))
    }
}
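/// Converts mutations to a record batch sorted by primary key, timestamp and descending
/// sequence, deduplicating rows with the same key and timestamp when `dedup` is true.
/// Returns the batch together with its min and max timestamps, or `None` if the mutations
/// contain no rows.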
fn mutations_to_record_batch(
    mutations: &[Mutation],
    metadata: &RegionMetadataRef,
    pk_encoder: &DensePrimaryKeyCodec,
    dedup: bool,
) -> Result<Option<(RecordBatch, i64, i64)>> {
    let total_rows: usize = mutations
        .iter()
        .map(|m| m.rows.as_ref().map(|r| r.rows.len()).unwrap_or(0))
        .sum();

    if total_rows == 0 {
        return Ok(None);
    }

    let mut pk_builder = BinaryBuilder::with_capacity(total_rows, 0);

    let mut ts_vector: Box<dyn MutableVector> = metadata
        .time_index_column()
        .column_schema
        .data_type
        .create_mutable_vector(total_rows);
    let mut sequence_builder = UInt64Builder::with_capacity(total_rows);
    let mut op_type_builder = UInt8Builder::with_capacity(total_rows);

    let mut field_builders: Vec<Box<dyn MutableVector>> = metadata
        .field_columns()
        .map(|f| f.column_schema.data_type.create_mutable_vector(total_rows))
        .collect();

    let mut pk_buffer = vec![];
    for m in mutations {
        let Some(key_values) = KeyValuesRef::new(metadata, m) else {
            continue;
        };

        for row in key_values.iter() {
            pk_buffer.clear();
            pk_encoder.encode_to_vec(row.primary_keys(), &mut pk_buffer)?;
            pk_builder.append_value(pk_buffer.as_bytes());
            ts_vector.push_value_ref(row.timestamp());
            sequence_builder.append_value(row.sequence());
            op_type_builder.append_value(row.op_type() as u8);
            for (builder, field) in field_builders.iter_mut().zip(row.fields()) {
                builder.push_value_ref(field);
            }
        }
    }

    let arrow_schema = to_sst_arrow_schema(metadata);
    let timestamp_unit = metadata
        .time_index_column()
        .column_schema
        .data_type
        .as_timestamp()
        .unwrap()
        .unit();
    let sorter = ArraysSorter {
        encoded_primary_keys: pk_builder.finish(),
        timestamp_unit,
        timestamp: ts_vector.to_vector().to_arrow_array(),
        sequence: sequence_builder.finish(),
        op_type: op_type_builder.finish(),
        fields: field_builders
            .iter_mut()
            .map(|f| f.to_vector().to_arrow_array()),
        dedup,
        arrow_schema,
    };

    sorter.sort().map(Some)
}
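/// Helper that sorts the column arrays of a bulk part and assembles the final record batch.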
struct ArraysSorter<I> {
    encoded_primary_keys: BinaryArray,
    timestamp_unit: TimeUnit,
    timestamp: ArrayRef,
    sequence: UInt64Array,
    op_type: UInt8Array,
    fields: I,
    dedup: bool,
    arrow_schema: SchemaRef,
}

impl<I> ArraysSorter<I>
where
    I: Iterator<Item = ArrayRef>,
{
    fn sort(self) -> Result<(RecordBatch, i64, i64)> {
        debug_assert!(!self.timestamp.is_empty());
        debug_assert!(self.timestamp.len() == self.sequence.len());
        debug_assert!(self.timestamp.len() == self.op_type.len());
        debug_assert!(self.timestamp.len() == self.encoded_primary_keys.len());

        let timestamp_iter = timestamp_array_to_iter(self.timestamp_unit, &self.timestamp);
        let (mut min_timestamp, mut max_timestamp) = (i64::MAX, i64::MIN);
        let mut to_sort = self
            .encoded_primary_keys
            .iter()
            .zip(timestamp_iter)
            .zip(self.sequence.iter())
            .map(|((pk, timestamp), sequence)| {
                max_timestamp = max_timestamp.max(*timestamp);
                min_timestamp = min_timestamp.min(*timestamp);
                (pk, timestamp, sequence)
            })
            .enumerate()
            .collect::<Vec<_>>();

        // Sort by primary key and timestamp in ascending order, and by sequence in
        // descending order, so the latest row for a key comes first.
        to_sort.sort_unstable_by(|(_, (l_pk, l_ts, l_seq)), (_, (r_pk, r_ts, r_seq))| {
            l_pk.cmp(r_pk)
                .then(l_ts.cmp(r_ts))
                .then(l_seq.cmp(r_seq).reverse())
        });

        if self.dedup {
            // Rows are sorted, so duplicates of the same (primary key, timestamp) are
            // adjacent; keeping the first occurrence keeps the largest sequence.
            to_sort.dedup_by(|(_, (l_pk, l_ts, _)), (_, (r_pk, r_ts, _))| {
                l_pk == r_pk && l_ts == r_ts
            });
        }

        // Indices to take from the original arrays, in sorted order.
        let indices = UInt32Array::from_iter_values(to_sort.iter().map(|v| v.0 as u32));

        let pk_dictionary = Arc::new(binary_array_to_dictionary(
            arrow::compute::take(
                &self.encoded_primary_keys,
                &indices,
                Some(TakeOptions {
                    check_bounds: false,
                }),
            )
            .context(ComputeArrowSnafu)?
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap(),
        )?) as ArrayRef;

        let mut arrays = Vec::with_capacity(self.arrow_schema.fields.len());
        for arr in self.fields {
            arrays.push(
                arrow::compute::take(
                    &arr,
                    &indices,
                    Some(TakeOptions {
                        check_bounds: false,
                    }),
                )
                .context(ComputeArrowSnafu)?,
            );
        }

        let timestamp = arrow::compute::take(
            &self.timestamp,
            &indices,
            Some(TakeOptions {
                check_bounds: false,
            }),
        )
        .context(ComputeArrowSnafu)?;

        arrays.push(timestamp);
        arrays.push(pk_dictionary);
        arrays.push(
            arrow::compute::take(
                &self.sequence,
                &indices,
                Some(TakeOptions {
                    check_bounds: false,
                }),
            )
            .context(ComputeArrowSnafu)?,
        );

        arrays.push(
            arrow::compute::take(
                &self.op_type,
                &indices,
                Some(TakeOptions {
                    check_bounds: false,
                }),
            )
            .context(ComputeArrowSnafu)?,
        );

        let batch = RecordBatch::try_new(self.arrow_schema, arrays).context(NewRecordBatchSnafu)?;
        Ok((batch, min_timestamp, max_timestamp))
    }
}
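/// Converts a timestamp array of the given unit to an iterator over its primitive `i64` values.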
fn timestamp_array_to_iter(
    timestamp_unit: TimeUnit,
    timestamp: &ArrayRef,
) -> impl Iterator<Item = &i64> {
    match timestamp_unit {
        TimeUnit::Second => timestamp
            .as_any()
            .downcast_ref::<TimestampSecondArray>()
            .unwrap()
            .values()
            .iter(),
        TimeUnit::Millisecond => timestamp
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap()
            .values()
            .iter(),
        TimeUnit::Microsecond => timestamp
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .unwrap()
            .values()
            .iter(),
        TimeUnit::Nanosecond => timestamp
            .as_any()
            .downcast_ref::<TimestampNanosecondArray>()
            .unwrap()
            .values()
            .iter(),
    }
}
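/// Converts a [BinaryArray] of encoded primary keys to a dictionary-encoded [PrimaryKeyArray].
/// The input is expected to be sorted so that equal keys are adjacent.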
fn binary_array_to_dictionary(input: &BinaryArray) -> Result<PrimaryKeyArray> {
    if input.is_empty() {
        return Ok(DictionaryArray::new(
            UInt32Array::from(Vec::<u32>::new()),
            Arc::new(BinaryArray::from_vec(vec![])) as ArrayRef,
        ));
    }
    let mut keys = Vec::with_capacity(16);
    let mut values = BinaryBuilder::new();
    let mut prev: usize = 0;
    keys.push(prev as u32);
    values.append_value(input.value(prev));

    for current_bytes in input.iter().skip(1) {
        // Safety: encoded primary keys are never null.
        let current_bytes = current_bytes.unwrap();
        let prev_bytes = input.value(prev);
        if current_bytes != prev_bytes {
            values.append_value(current_bytes);
            prev += 1;
        }
        keys.push(prev as u32);
    }

    Ok(DictionaryArray::new(
        UInt32Array::from(keys),
        Arc::new(values.finish()) as ArrayRef,
    ))
}
#[cfg(test)]
mod tests {
    use std::collections::VecDeque;

    use datafusion_common::ScalarValue;
    use datatypes::prelude::{ScalarVector, Value};
    use datatypes::vectors::{Float64Vector, TimestampMillisecondVector};

    use super::*;
    use crate::memtable::bulk::context::BulkIterContext;
    use crate::sst::parquet::format::ReadFormat;
    use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};

    fn check_binary_array_to_dictionary(
        input: &[&[u8]],
        expected_keys: &[u32],
        expected_values: &[&[u8]],
    ) {
        let input = BinaryArray::from_iter_values(input.iter());
        let array = binary_array_to_dictionary(&input).unwrap();
        assert_eq!(
            &expected_keys,
            &array.keys().iter().map(|v| v.unwrap()).collect::<Vec<_>>()
        );
        assert_eq!(
            expected_values,
            &array
                .values()
                .as_any()
                .downcast_ref::<BinaryArray>()
                .unwrap()
                .iter()
                .map(|v| v.unwrap())
                .collect::<Vec<_>>()
        );
    }

    #[test]
    fn test_binary_array_to_dictionary() {
        check_binary_array_to_dictionary(&[], &[], &[]);

        check_binary_array_to_dictionary(&["a".as_bytes()], &[0], &["a".as_bytes()]);

        check_binary_array_to_dictionary(
            &["a".as_bytes(), "a".as_bytes()],
            &[0, 0],
            &["a".as_bytes()],
        );

        check_binary_array_to_dictionary(
            &["a".as_bytes(), "a".as_bytes(), "b".as_bytes()],
            &[0, 0, 1],
            &["a".as_bytes(), "b".as_bytes()],
        );

        check_binary_array_to_dictionary(
            &[
                "a".as_bytes(),
                "a".as_bytes(),
                "b".as_bytes(),
                "c".as_bytes(),
            ],
            &[0, 0, 1, 2],
            &["a".as_bytes(), "b".as_bytes(), "c".as_bytes()],
        );
    }
    struct MutationInput<'a> {
        k0: &'a str,
        k1: u32,
        timestamps: &'a [i64],
        v1: &'a [Option<f64>],
        sequence: u64,
    }

    #[derive(Debug, PartialOrd, PartialEq)]
    struct BatchOutput<'a> {
        pk_values: &'a [Value],
        timestamps: &'a [i64],
        v1: &'a [Option<f64>],
    }

    fn check_mutations_to_record_batches(
        input: &[MutationInput],
        expected: &[BatchOutput],
        expected_timestamp: (i64, i64),
        dedup: bool,
    ) {
        let metadata = metadata_for_test();
        let mutations = input
            .iter()
            .map(|m| {
                build_key_values_with_ts_seq_values(
                    &metadata,
                    m.k0.to_string(),
                    m.k1,
                    m.timestamps.iter().copied(),
                    m.v1.iter().copied(),
                    m.sequence,
                )
                .mutation
            })
            .collect::<Vec<_>>();
        let total_rows: usize = mutations
            .iter()
            .flat_map(|m| m.rows.iter())
            .map(|r| r.rows.len())
            .sum();

        let pk_encoder = DensePrimaryKeyCodec::new(&metadata);

        let (batch, min_ts, max_ts) =
            mutations_to_record_batch(&mutations, &metadata, &pk_encoder, dedup)
                .unwrap()
                .unwrap();
        assert_eq!(expected_timestamp, (min_ts, max_ts));
        let read_format = ReadFormat::new_with_all_columns(metadata.clone());
        let mut batches = VecDeque::new();
        read_format
            .convert_record_batch(&batch, &mut batches)
            .unwrap();
        if !dedup {
            assert_eq!(
                total_rows,
                batches.iter().map(|b| b.num_rows()).sum::<usize>()
            );
        }
        let batch_values = batches
            .into_iter()
            .map(|b| {
                let pk_values = pk_encoder.decode(b.primary_key()).unwrap().into_dense();
                let timestamps = b
                    .timestamps()
                    .as_any()
                    .downcast_ref::<TimestampMillisecondVector>()
                    .unwrap()
                    .iter_data()
                    .map(|v| v.unwrap().0.value())
                    .collect::<Vec<_>>();
                let float_values = b.fields()[1]
                    .data
                    .as_any()
                    .downcast_ref::<Float64Vector>()
                    .unwrap()
                    .iter_data()
                    .collect::<Vec<_>>();

                (pk_values, timestamps, float_values)
            })
            .collect::<Vec<_>>();
        assert_eq!(expected.len(), batch_values.len());

        for idx in 0..expected.len() {
            assert_eq!(expected[idx].pk_values, &batch_values[idx].0);
            assert_eq!(expected[idx].timestamps, &batch_values[idx].1);
            assert_eq!(expected[idx].v1, &batch_values[idx].2);
        }
    }
    #[test]
    fn test_mutations_to_record_batch() {
        check_mutations_to_record_batches(
            &[MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[0],
                v1: &[Some(0.1)],
                sequence: 0,
            }],
            &[BatchOutput {
                pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                timestamps: &[0],
                v1: &[Some(0.1)],
            }],
            (0, 0),
            true,
        );

        check_mutations_to_record_batches(
            &[
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.1)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "b",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[1],
                    v1: &[Some(0.2)],
                    sequence: 1,
                },
                MutationInput {
                    k0: "a",
                    k1: 1,
                    timestamps: &[1],
                    v1: &[Some(0.3)],
                    sequence: 2,
                },
            ],
            &[
                BatchOutput {
                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                    timestamps: &[0, 1],
                    v1: &[Some(0.1), Some(0.2)],
                },
                BatchOutput {
                    pk_values: &[Value::String("a".into()), Value::UInt32(1)],
                    timestamps: &[1],
                    v1: &[Some(0.3)],
                },
                BatchOutput {
                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                },
            ],
            (0, 1),
            true,
        );

        check_mutations_to_record_batches(
            &[
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.1)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "b",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.2)],
                    sequence: 1,
                },
            ],
            &[
                BatchOutput {
                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                    timestamps: &[0],
                    v1: &[Some(0.2)],
                },
                BatchOutput {
                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                },
            ],
            (0, 0),
            true,
        );
        check_mutations_to_record_batches(
            &[
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.1)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "b",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.2)],
                    sequence: 1,
                },
            ],
            &[
                BatchOutput {
                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                    timestamps: &[0, 0],
                    v1: &[Some(0.2), Some(0.1)],
                },
                BatchOutput {
                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                },
            ],
            (0, 0),
            false,
        );
    }
    fn encode(input: &[MutationInput]) -> EncodedBulkPart {
        let metadata = metadata_for_test();
        let mutations = input
            .iter()
            .map(|m| {
                build_key_values_with_ts_seq_values(
                    &metadata,
                    m.k0.to_string(),
                    m.k1,
                    m.timestamps.iter().copied(),
                    m.v1.iter().copied(),
                    m.sequence,
                )
                .mutation
            })
            .collect::<Vec<_>>();
        let encoder = BulkPartEncoder::new(metadata, true, 1024);
        encoder.encode_mutations(&mutations).unwrap().unwrap()
    }

    #[test]
    fn test_write_and_read_part_projection() {
        let part = encode(&[
            MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[1],
                v1: &[Some(0.1)],
                sequence: 0,
            },
            MutationInput {
                k0: "b",
                k1: 0,
                timestamps: &[1],
                v1: &[Some(0.0)],
                sequence: 0,
            },
            MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[2],
                v1: &[Some(0.2)],
                sequence: 1,
            },
        ]);

        // Only read the field column with column id 4.
        let projection = &[4u32];

        let reader = part
            .read(
                Arc::new(BulkIterContext::new(
                    part.metadata.region_metadata.clone(),
                    &Some(projection.as_slice()),
                    None,
                )),
                None,
            )
            .unwrap()
            .expect("expect at least one row group");

        let mut total_rows_read = 0;
        let mut field = vec![];
        for res in reader {
            let batch = res.unwrap();
            assert_eq!(1, batch.fields().len());
            assert_eq!(4, batch.fields()[0].column_id);
            field.extend(
                batch.fields()[0]
                    .data
                    .as_any()
                    .downcast_ref::<Float64Vector>()
                    .unwrap()
                    .iter_data()
                    .map(|v| v.unwrap()),
            );
            total_rows_read += batch.num_rows();
        }
        assert_eq!(3, total_rows_read);
        assert_eq!(vec![0.1, 0.2, 0.0], field);
    }
    fn prepare(key_values: Vec<(&str, u32, (i64, i64), u64)>) -> EncodedBulkPart {
        let metadata = metadata_for_test();
        let mutations = key_values
            .into_iter()
            .map(|(k0, k1, (start, end), sequence)| {
                let ts = start..end;
                let v1 = (start..end).map(|_| None);
                build_key_values_with_ts_seq_values(&metadata, k0.to_string(), k1, ts, v1, sequence)
                    .mutation
            })
            .collect::<Vec<_>>();
        let encoder = BulkPartEncoder::new(metadata, true, 100);
        encoder.encode_mutations(&mutations).unwrap().unwrap()
    }

    fn check_prune_row_group(
        part: &EncodedBulkPart,
        predicate: Option<Predicate>,
        expected_rows: usize,
    ) {
        let context = Arc::new(BulkIterContext::new(
            part.metadata.region_metadata.clone(),
            &None,
            predicate,
        ));
        let reader = part
            .read(context, None)
            .unwrap()
            .expect("expect at least one row group");
        let mut total_rows_read = 0;
        for res in reader {
            let batch = res.unwrap();
            total_rows_read += batch.num_rows();
        }
        assert_eq!(expected_rows, total_rows_read);
    }

    #[test]
    fn test_prune_row_groups() {
        let part = prepare(vec![
            ("a", 0, (0, 40), 1),
            ("a", 1, (0, 60), 1),
            ("b", 0, (0, 100), 2),
            ("b", 1, (100, 180), 3),
            ("b", 1, (180, 210), 4),
        ]);

        let context = Arc::new(BulkIterContext::new(
            part.metadata.region_metadata.clone(),
            &None,
            Some(Predicate::new(vec![datafusion_expr::col("ts").eq(
                datafusion_expr::lit(ScalarValue::TimestampMillisecond(Some(300), None)),
            )])),
        ));
        // The predicate matches no rows, so every row group is pruned out.
        assert!(part.read(context, None).unwrap().is_none());

        check_prune_row_group(&part, None, 310);

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
            ])),
            40,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(1u32)),
            ])),
            60,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a"))
            ])),
            100,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("b")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
            ])),
            100,
        );

        // Only the single row with v0 == 150 should be read.
        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("v0").eq(datafusion_expr::lit(150i64))
            ])),
            1,
        );
    }
}