mito2/memtable/bulk/part.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Bulk part encoder/decoder.

use std::collections::VecDeque;
use std::sync::Arc;

use api::helper::{value_to_grpc_value, ColumnDataTypeWrapper};
use api::v1::bulk_wal_entry::Body;
use api::v1::{bulk_wal_entry, ArrowIpc, BulkWalEntry, Mutation, OpType};
use bytes::Bytes;
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
use common_recordbatch::DfRecordBatch as RecordBatch;
use common_time::timestamp::TimeUnit;
use datafusion::arrow::array::{TimestampNanosecondArray, UInt64Builder};
use datatypes::arrow;
use datatypes::arrow::array::{
    Array, ArrayRef, BinaryBuilder, DictionaryArray, TimestampMicrosecondArray,
    TimestampMillisecondArray, TimestampSecondArray, UInt32Array, UInt64Array, UInt8Array,
    UInt8Builder,
};
use datatypes::arrow::compute::TakeOptions;
use datatypes::arrow::datatypes::SchemaRef;
use datatypes::arrow_array::BinaryArray;
use datatypes::data_type::DataType;
use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector};
use datatypes::value::Value;
use datatypes::vectors::Helper;
use mito_codec::key_values::{KeyValue, KeyValuesRef};
use mito_codec::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt};
use parquet::arrow::ArrowWriter;
use parquet::data_type::AsBytes;
use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::WriterProperties;
use snafu::{OptionExt, ResultExt, Snafu};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::SequenceNumber;
use table::predicate::Predicate;

use crate::error::{
    self, ComputeArrowSnafu, EncodeMemtableSnafu, EncodeSnafu, NewRecordBatchSnafu, Result,
};
use crate::memtable::bulk::context::BulkIterContextRef;
use crate::memtable::bulk::part_reader::BulkPartIter;
use crate::memtable::BoxedBatchIterator;
use crate::sst::parquet::format::{PrimaryKeyArray, ReadFormat};
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::to_sst_arrow_schema;

#[derive(Clone)]
pub struct BulkPart {
    pub batch: RecordBatch,
    pub max_ts: i64,
    pub min_ts: i64,
    pub sequence: u64,
    pub timestamp_index: usize,
    pub raw_data: Option<ArrowIpc>,
}

impl TryFrom<BulkWalEntry> for BulkPart {
    type Error = error::Error;

    fn try_from(value: BulkWalEntry) -> std::result::Result<Self, Self::Error> {
        match value.body.expect("Entry payload should be present") {
            Body::ArrowIpc(ipc) => {
                let mut decoder = FlightDecoder::try_from_schema_bytes(&ipc.schema)
                    .context(error::ConvertBulkWalEntrySnafu)?;
                let batch = decoder
                    .try_decode_record_batch(&ipc.data_header, &ipc.payload)
                    .context(error::ConvertBulkWalEntrySnafu)?;
                Ok(Self {
                    batch,
                    max_ts: value.max_ts,
                    min_ts: value.min_ts,
                    sequence: value.sequence,
                    timestamp_index: value.timestamp_index as usize,
                    raw_data: Some(ipc),
                })
            }
        }
    }
}

impl From<&BulkPart> for BulkWalEntry {
    fn from(value: &BulkPart) -> Self {
        if let Some(ipc) = &value.raw_data {
            BulkWalEntry {
                sequence: value.sequence,
                max_ts: value.max_ts,
                min_ts: value.min_ts,
                timestamp_index: value.timestamp_index as u32,
                body: Some(Body::ArrowIpc(ipc.clone())),
            }
        } else {
            let mut encoder = FlightEncoder::default();
            let schema_bytes = encoder
                .encode(FlightMessage::Schema(value.batch.schema()))
                .data_header;
            let rb_data = encoder.encode(FlightMessage::RecordBatch(value.batch.clone()));
            BulkWalEntry {
                sequence: value.sequence,
                max_ts: value.max_ts,
                min_ts: value.min_ts,
                timestamp_index: value.timestamp_index as u32,
                body: Some(Body::ArrowIpc(ArrowIpc {
                    schema: schema_bytes,
                    data_header: rb_data.data_header,
                    payload: rb_data.data_body,
                })),
            }
        }
    }
}

impl BulkPart {
    pub(crate) fn estimated_size(&self) -> usize {
        self.batch.get_array_memory_size()
    }

    /// Converts [BulkPart] to [Mutation] for the fallback `write_bulk` implementation.
    pub(crate) fn to_mutation(&self, region_metadata: &RegionMetadataRef) -> Result<Mutation> {
        let vectors = region_metadata
            .schema
            .column_schemas()
            .iter()
            .map(|col| match self.batch.column_by_name(&col.name) {
                None => Ok(None),
                Some(col) => Helper::try_into_vector(col).map(Some),
            })
            .collect::<datatypes::error::Result<Vec<_>>>()
            .context(error::ComputeVectorSnafu)?;

        let rows = (0..self.num_rows())
            .map(|row_idx| {
                // Emit one value per region schema column so that row arity matches
                // the schema below; columns absent from the batch become nulls.
                let values = (0..vectors.len())
                    .map(|col_idx| {
                        if let Some(v) = &vectors[col_idx] {
                            value_to_grpc_value(v.get(row_idx))
                        } else {
                            api::v1::Value { value_data: None }
                        }
                    })
                    .collect::<Vec<_>>();
                api::v1::Row { values }
            })
            .collect::<Vec<_>>();

        let schema = region_metadata
            .column_metadatas
            .iter()
            .map(|c| {
                let data_type_wrapper =
                    ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())?;
                Ok(api::v1::ColumnSchema {
                    column_name: c.column_schema.name.clone(),
                    datatype: data_type_wrapper.datatype() as i32,
                    semantic_type: c.semantic_type as i32,
                    ..Default::default()
                })
            })
            .collect::<api::error::Result<Vec<_>>>()
            .context(error::ConvertColumnDataTypeSnafu {
                reason: "failed to convert region metadata to column schema",
            })?;

        let rows = api::v1::Rows { schema, rows };

        Ok(Mutation {
            op_type: OpType::Put as i32,
            sequence: self.sequence,
            rows: Some(rows),
            write_hint: None,
        })
    }

    pub fn timestamps(&self) -> &ArrayRef {
        self.batch.column(self.timestamp_index)
    }

    pub fn num_rows(&self) -> usize {
        self.batch.num_rows()
    }
}

#[derive(Debug)]
pub struct EncodedBulkPart {
    data: Bytes,
    metadata: BulkPartMeta,
}

impl EncodedBulkPart {
    pub fn new(data: Bytes, metadata: BulkPartMeta) -> Self {
        Self { data, metadata }
    }

    pub(crate) fn metadata(&self) -> &BulkPartMeta {
        &self.metadata
    }

    pub(crate) fn read(
        &self,
        context: BulkIterContextRef,
        sequence: Option<SequenceNumber>,
    ) -> Result<Option<BoxedBatchIterator>> {
        // Use the predicate to find row groups to read.
        let row_groups_to_read = context.row_groups_to_read(&self.metadata.parquet_metadata);

        if row_groups_to_read.is_empty() {
            // All row groups are filtered out.
            return Ok(None);
        }

        let iter = BulkPartIter::try_new(
            context,
            row_groups_to_read,
            self.metadata.parquet_metadata.clone(),
            self.data.clone(),
            sequence,
        )?;
        Ok(Some(Box::new(iter) as BoxedBatchIterator))
    }
}

#[derive(Debug)]
pub struct BulkPartMeta {
    /// Total rows in part.
    pub num_rows: usize,
    /// Max timestamp in part.
    pub max_timestamp: i64,
    /// Min timestamp in part.
    pub min_timestamp: i64,
    /// Part file metadata.
    pub parquet_metadata: Arc<ParquetMetaData>,
    /// Part region schema.
    pub region_metadata: RegionMetadataRef,
}

pub struct BulkPartEncoder {
    metadata: RegionMetadataRef,
    pk_encoder: DensePrimaryKeyCodec,
    row_group_size: usize,
    dedup: bool,
    writer_props: Option<WriterProperties>,
}

impl BulkPartEncoder {
    pub(crate) fn new(
        metadata: RegionMetadataRef,
        dedup: bool,
        row_group_size: usize,
    ) -> BulkPartEncoder {
        let codec = DensePrimaryKeyCodec::new(&metadata);
        let writer_props = Some(
            WriterProperties::builder()
                .set_write_batch_size(row_group_size)
                .set_max_row_group_size(row_group_size)
                .build(),
        );
        Self {
            metadata,
            pk_encoder: codec,
            row_group_size,
            dedup,
            writer_props,
        }
    }
}

impl BulkPartEncoder {
    /// Encodes mutations to an [EncodedBulkPart].
    /// Returns `Ok(None)` if the mutations contain no rows to encode.
    fn encode_mutations(&self, mutations: &[Mutation]) -> Result<Option<EncodedBulkPart>> {
        let Some((arrow_record_batch, min_ts, max_ts)) =
            mutations_to_record_batch(mutations, &self.metadata, &self.pk_encoder, self.dedup)?
        else {
            return Ok(None);
        };

        let mut buf = Vec::with_capacity(4096);
        let arrow_schema = arrow_record_batch.schema();

        let file_metadata = {
            let mut writer =
                ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
                    .context(EncodeMemtableSnafu)?;
            writer
                .write(&arrow_record_batch)
                .context(EncodeMemtableSnafu)?;
            writer.finish().context(EncodeMemtableSnafu)?
        };

        let buf = Bytes::from(buf);
        let parquet_metadata = Arc::new(parse_parquet_metadata(file_metadata)?);

        Ok(Some(EncodedBulkPart {
            data: buf,
            metadata: BulkPartMeta {
                num_rows: arrow_record_batch.num_rows(),
                max_timestamp: max_ts,
                min_timestamp: min_ts,
                parquet_metadata,
                region_metadata: self.metadata.clone(),
            },
        }))
    }
}

/// Converts mutations to a sorted record batch in SST layout, returning the
/// batch together with its min and max timestamps.
/// Returns `None` if the mutations contain no rows.
fn mutations_to_record_batch(
    mutations: &[Mutation],
    metadata: &RegionMetadataRef,
    pk_encoder: &DensePrimaryKeyCodec,
    dedup: bool,
) -> Result<Option<(RecordBatch, i64, i64)>> {
    let total_rows: usize = mutations
        .iter()
        .map(|m| m.rows.as_ref().map(|r| r.rows.len()).unwrap_or(0))
        .sum();

    if total_rows == 0 {
        return Ok(None);
    }

    let mut pk_builder = BinaryBuilder::with_capacity(total_rows, 0);

    let mut ts_vector: Box<dyn MutableVector> = metadata
        .time_index_column()
        .column_schema
        .data_type
        .create_mutable_vector(total_rows);
    let mut sequence_builder = UInt64Builder::with_capacity(total_rows);
    let mut op_type_builder = UInt8Builder::with_capacity(total_rows);

    let mut field_builders: Vec<Box<dyn MutableVector>> = metadata
        .field_columns()
        .map(|f| f.column_schema.data_type.create_mutable_vector(total_rows))
        .collect();

    let mut pk_buffer = vec![];
    for m in mutations {
        let Some(key_values) = KeyValuesRef::new(metadata, m) else {
            continue;
        };

        for row in key_values.iter() {
            pk_buffer.clear();
            pk_encoder
                .encode_to_vec(row.primary_keys(), &mut pk_buffer)
                .context(EncodeSnafu)?;
            pk_builder.append_value(pk_buffer.as_bytes());
            ts_vector.push_value_ref(row.timestamp());
            sequence_builder.append_value(row.sequence());
            op_type_builder.append_value(row.op_type() as u8);
            for (builder, field) in field_builders.iter_mut().zip(row.fields()) {
                builder.push_value_ref(field);
            }
        }
    }

    let arrow_schema = to_sst_arrow_schema(metadata);
    // safety: the time index column must be a timestamp type, so `as_timestamp()`
    // must not return None.
    let timestamp_unit = metadata
        .time_index_column()
        .column_schema
        .data_type
        .as_timestamp()
        .unwrap()
        .unit();
    let sorter = ArraysSorter {
        encoded_primary_keys: pk_builder.finish(),
        timestamp_unit,
        timestamp: ts_vector.to_vector().to_arrow_array(),
        sequence: sequence_builder.finish(),
        op_type: op_type_builder.finish(),
        fields: field_builders
            .iter_mut()
            .map(|f| f.to_vector().to_arrow_array()),
        dedup,
        arrow_schema,
    };

    sorter.sort().map(Some)
}

struct ArraysSorter<I> {
    encoded_primary_keys: BinaryArray,
    timestamp_unit: TimeUnit,
    timestamp: ArrayRef,
    sequence: UInt64Array,
    op_type: UInt8Array,
    fields: I,
    dedup: bool,
    arrow_schema: SchemaRef,
}

impl<I> ArraysSorter<I>
where
    I: Iterator<Item = ArrayRef>,
{
    /// Sorts arrays by (primary key, timestamp, sequence desc), optionally
    /// dedups rows, and converts the arrays to a record batch.
    fn sort(self) -> Result<(RecordBatch, i64, i64)> {
        debug_assert!(!self.timestamp.is_empty());
        debug_assert!(self.timestamp.len() == self.sequence.len());
        debug_assert!(self.timestamp.len() == self.op_type.len());
        debug_assert!(self.timestamp.len() == self.encoded_primary_keys.len());

        let timestamp_iter = timestamp_array_to_iter(self.timestamp_unit, &self.timestamp);
        let (mut min_timestamp, mut max_timestamp) = (i64::MAX, i64::MIN);
        let mut to_sort = self
            .encoded_primary_keys
            .iter()
            .zip(timestamp_iter)
            .zip(self.sequence.iter())
            .map(|((pk, timestamp), sequence)| {
                max_timestamp = max_timestamp.max(*timestamp);
                min_timestamp = min_timestamp.min(*timestamp);
                (pk, timestamp, sequence)
            })
            .enumerate()
            .collect::<Vec<_>>();

        to_sort.sort_unstable_by(|(_, (l_pk, l_ts, l_seq)), (_, (r_pk, r_ts, r_seq))| {
            l_pk.cmp(r_pk)
                .then(l_ts.cmp(r_ts))
                .then(l_seq.cmp(r_seq).reverse())
        });

        if self.dedup {
            // Dedup by primary key and timestamp, ignoring sequence. Rows are
            // sorted by sequence in descending order, so the first (latest) row
            // of each (pk, ts) pair is kept.
            to_sort.dedup_by(|(_, (l_pk, l_ts, _)), (_, (r_pk, r_ts, _))| {
                l_pk == r_pk && l_ts == r_ts
            });
        }

        let indices = UInt32Array::from_iter_values(to_sort.iter().map(|v| v.0 as u32));

        let pk_dictionary = Arc::new(binary_array_to_dictionary(
            // safety: pk must be BinaryArray
            arrow::compute::take(
                &self.encoded_primary_keys,
                &indices,
                Some(TakeOptions {
                    check_bounds: false,
                }),
            )
            .context(ComputeArrowSnafu)?
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap(),
        )?) as ArrayRef;

        let mut arrays = Vec::with_capacity(self.arrow_schema.fields.len());
        for arr in self.fields {
            arrays.push(
                arrow::compute::take(
                    &arr,
                    &indices,
                    Some(TakeOptions {
                        check_bounds: false,
                    }),
                )
                .context(ComputeArrowSnafu)?,
            );
        }

        let timestamp = arrow::compute::take(
            &self.timestamp,
            &indices,
            Some(TakeOptions {
                check_bounds: false,
            }),
        )
        .context(ComputeArrowSnafu)?;

        arrays.push(timestamp);
        arrays.push(pk_dictionary);
        arrays.push(
            arrow::compute::take(
                &self.sequence,
                &indices,
                Some(TakeOptions {
                    check_bounds: false,
                }),
            )
            .context(ComputeArrowSnafu)?,
        );

        arrays.push(
            arrow::compute::take(
                &self.op_type,
                &indices,
                Some(TakeOptions {
                    check_bounds: false,
                }),
            )
            .context(ComputeArrowSnafu)?,
        );

        let batch = RecordBatch::try_new(self.arrow_schema, arrays).context(NewRecordBatchSnafu)?;
        Ok((batch, min_timestamp, max_timestamp))
    }
}

/// Converts timestamp array to an iter of i64 values.
fn timestamp_array_to_iter(
    timestamp_unit: TimeUnit,
    timestamp: &ArrayRef,
) -> impl Iterator<Item = &i64> {
    match timestamp_unit {
        // safety: timestamp column must be valid.
        TimeUnit::Second => timestamp
            .as_any()
            .downcast_ref::<TimestampSecondArray>()
            .unwrap()
            .values()
            .iter(),
        TimeUnit::Millisecond => timestamp
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap()
            .values()
            .iter(),
        TimeUnit::Microsecond => timestamp
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .unwrap()
            .values()
            .iter(),
        TimeUnit::Nanosecond => timestamp
            .as_any()
            .downcast_ref::<TimestampNanosecondArray>()
            .unwrap()
            .values()
            .iter(),
    }
}

/// Converts a **sorted** [BinaryArray] to a [DictionaryArray] whose values are
/// the distinct primary keys.
fn binary_array_to_dictionary(input: &BinaryArray) -> Result<PrimaryKeyArray> {
    if input.is_empty() {
        return Ok(DictionaryArray::new(
            UInt32Array::from(Vec::<u32>::new()),
            Arc::new(BinaryArray::from_vec(vec![])) as ArrayRef,
        ));
    }
    let mut keys = Vec::with_capacity(16);
    let mut values = BinaryBuilder::new();
    // Index where the current run of equal values starts, and the dictionary
    // key assigned to that run. Since the input is sorted, comparing against
    // the value at the run start is enough to detect a new value.
    let mut run_start: usize = 0;
    let mut key: u32 = 0;
    keys.push(key);
    values.append_value(input.value(0));

    for (idx, current_bytes) in input.iter().enumerate().skip(1) {
        // safety: encoded pk must be present.
        let current_bytes = current_bytes.unwrap();
        if current_bytes != input.value(run_start) {
            values.append_value(current_bytes);
            key += 1;
            run_start = idx;
        }
        keys.push(key);
    }

    Ok(DictionaryArray::new(
        UInt32Array::from(keys),
        Arc::new(values.finish()) as ArrayRef,
    ))
}

#[cfg(test)]
mod tests {
    use std::collections::VecDeque;

    use datafusion_common::ScalarValue;
    use datatypes::prelude::{ScalarVector, Value};
    use datatypes::vectors::{Float64Vector, TimestampMillisecondVector};

    use super::*;
    use crate::memtable::bulk::context::BulkIterContext;
    use crate::sst::parquet::format::ReadFormat;
    use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};

    fn check_binary_array_to_dictionary(
        input: &[&[u8]],
        expected_keys: &[u32],
        expected_values: &[&[u8]],
    ) {
        let input = BinaryArray::from_iter_values(input.iter());
        let array = binary_array_to_dictionary(&input).unwrap();
        assert_eq!(
            &expected_keys,
            &array.keys().iter().map(|v| v.unwrap()).collect::<Vec<_>>()
        );
        assert_eq!(
            expected_values,
            &array
                .values()
                .as_any()
                .downcast_ref::<BinaryArray>()
                .unwrap()
                .iter()
                .map(|v| v.unwrap())
                .collect::<Vec<_>>()
        );
    }

    #[test]
    fn test_binary_array_to_dictionary() {
        check_binary_array_to_dictionary(&[], &[], &[]);

        check_binary_array_to_dictionary(&["a".as_bytes()], &[0], &["a".as_bytes()]);

        check_binary_array_to_dictionary(
            &["a".as_bytes(), "a".as_bytes()],
            &[0, 0],
            &["a".as_bytes()],
        );

        check_binary_array_to_dictionary(
            &["a".as_bytes(), "a".as_bytes(), "b".as_bytes()],
            &[0, 0, 1],
            &["a".as_bytes(), "b".as_bytes()],
        );

        check_binary_array_to_dictionary(
            &[
                "a".as_bytes(),
                "a".as_bytes(),
                "b".as_bytes(),
                "c".as_bytes(),
            ],
            &[0, 0, 1, 2],
            &["a".as_bytes(), "b".as_bytes(), "c".as_bytes()],
        );
    }
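
    // Runs of equal values longer than one after the first run exercise the
    // run-start tracking in `binary_array_to_dictionary`: each distinct value
    // must appear exactly once in the dictionary.
    #[test]
    fn test_binary_array_to_dictionary_repeated_runs() {
        check_binary_array_to_dictionary(
            &[
                "a".as_bytes(),
                "a".as_bytes(),
                "b".as_bytes(),
                "b".as_bytes(),
            ],
            &[0, 0, 1, 1],
            &["a".as_bytes(), "b".as_bytes()],
        );
    }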

    struct MutationInput<'a> {
        k0: &'a str,
        k1: u32,
        timestamps: &'a [i64],
        v1: &'a [Option<f64>],
        sequence: u64,
    }

    #[derive(Debug, PartialOrd, PartialEq)]
    struct BatchOutput<'a> {
        pk_values: &'a [Value],
        timestamps: &'a [i64],
        v1: &'a [Option<f64>],
    }

    fn check_mutations_to_record_batches(
        input: &[MutationInput],
        expected: &[BatchOutput],
        expected_timestamp: (i64, i64),
        dedup: bool,
    ) {
        let metadata = metadata_for_test();
        let mutations = input
            .iter()
            .map(|m| {
                build_key_values_with_ts_seq_values(
                    &metadata,
                    m.k0.to_string(),
                    m.k1,
                    m.timestamps.iter().copied(),
                    m.v1.iter().copied(),
                    m.sequence,
                )
                .mutation
            })
            .collect::<Vec<_>>();
        let total_rows: usize = mutations
            .iter()
            .flat_map(|m| m.rows.iter())
            .map(|r| r.rows.len())
            .sum();

        let pk_encoder = DensePrimaryKeyCodec::new(&metadata);

        let (batch, min_ts, max_ts) =
            mutations_to_record_batch(&mutations, &metadata, &pk_encoder, dedup)
                .unwrap()
                .unwrap();
        assert_eq!(expected_timestamp, (min_ts, max_ts));
        let read_format = ReadFormat::new_with_all_columns(metadata.clone());
        let mut batches = VecDeque::new();
        read_format
            .convert_record_batch(&batch, &mut batches)
            .unwrap();
        if !dedup {
            assert_eq!(
                total_rows,
                batches.iter().map(|b| b.num_rows()).sum::<usize>()
            );
        }
        let batch_values = batches
            .into_iter()
            .map(|b| {
                let pk_values = pk_encoder.decode(b.primary_key()).unwrap().into_dense();
                let timestamps = b
                    .timestamps()
                    .as_any()
                    .downcast_ref::<TimestampMillisecondVector>()
                    .unwrap()
                    .iter_data()
                    .map(|v| v.unwrap().0.value())
                    .collect::<Vec<_>>();
                let float_values = b.fields()[1]
                    .data
                    .as_any()
                    .downcast_ref::<Float64Vector>()
                    .unwrap()
                    .iter_data()
                    .collect::<Vec<_>>();

                (pk_values, timestamps, float_values)
            })
            .collect::<Vec<_>>();
        assert_eq!(expected.len(), batch_values.len());

        for idx in 0..expected.len() {
            assert_eq!(expected[idx].pk_values, &batch_values[idx].0);
            assert_eq!(expected[idx].timestamps, &batch_values[idx].1);
            assert_eq!(expected[idx].v1, &batch_values[idx].2);
        }
    }

    #[test]
    fn test_mutations_to_record_batch() {
        check_mutations_to_record_batches(
            &[MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[0],
                v1: &[Some(0.1)],
                sequence: 0,
            }],
            &[BatchOutput {
                pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                timestamps: &[0],
                v1: &[Some(0.1)],
            }],
            (0, 0),
            true,
        );

        check_mutations_to_record_batches(
            &[
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.1)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "b",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[1],
                    v1: &[Some(0.2)],
                    sequence: 1,
                },
                MutationInput {
                    k0: "a",
                    k1: 1,
                    timestamps: &[1],
                    v1: &[Some(0.3)],
                    sequence: 2,
                },
            ],
            &[
                BatchOutput {
                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                    timestamps: &[0, 1],
                    v1: &[Some(0.1), Some(0.2)],
                },
                BatchOutput {
                    pk_values: &[Value::String("a".into()), Value::UInt32(1)],
                    timestamps: &[1],
                    v1: &[Some(0.3)],
                },
                BatchOutput {
                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                },
            ],
            (0, 1),
            true,
        );

        check_mutations_to_record_batches(
            &[
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.1)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "b",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.2)],
                    sequence: 1,
                },
            ],
            &[
                BatchOutput {
                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                    timestamps: &[0],
                    v1: &[Some(0.2)],
                },
                BatchOutput {
                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                },
            ],
            (0, 0),
            true,
        );
        check_mutations_to_record_batches(
            &[
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.1)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "b",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.2)],
                    sequence: 1,
                },
            ],
            &[
                BatchOutput {
                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                    timestamps: &[0, 0],
                    v1: &[Some(0.2), Some(0.1)],
                },
                BatchOutput {
                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                },
            ],
            (0, 0),
            false,
        );
    }
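
    // A minimal sketch of the WAL entry round trip (`BulkPart` -> `BulkWalEntry`
    // -> `BulkPart`). The single-column batch below is a hypothetical payload;
    // any Arrow batch with a timestamp column works.
    #[test]
    fn test_bulk_wal_entry_round_trip() {
        use datatypes::arrow::datatypes::{
            DataType as ArrowDataType, Field, Schema, TimeUnit as ArrowTimeUnit,
        };

        let schema = Arc::new(Schema::new(vec![Field::new(
            "ts",
            ArrowDataType::Timestamp(ArrowTimeUnit::Millisecond, None),
            false,
        )]));
        let batch = RecordBatch::try_new(
            schema,
            vec![Arc::new(TimestampMillisecondArray::from(vec![1, 2, 3])) as ArrayRef],
        )
        .unwrap();
        let part = BulkPart {
            batch,
            max_ts: 3,
            min_ts: 1,
            sequence: 42,
            timestamp_index: 0,
            raw_data: None,
        };

        // Encoding goes through the Flight encoder since `raw_data` is None;
        // decoding rebuilds the record batch from the IPC payload.
        let entry = BulkWalEntry::from(&part);
        let decoded = BulkPart::try_from(entry).unwrap();
        assert_eq!(part.num_rows(), decoded.num_rows());
        assert_eq!(part.sequence, decoded.sequence);
        assert_eq!((part.min_ts, part.max_ts), (decoded.min_ts, decoded.max_ts));
    }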

    fn encode(input: &[MutationInput]) -> EncodedBulkPart {
        let metadata = metadata_for_test();
        let mutations = input
            .iter()
            .map(|m| {
                build_key_values_with_ts_seq_values(
                    &metadata,
                    m.k0.to_string(),
                    m.k1,
                    m.timestamps.iter().copied(),
                    m.v1.iter().copied(),
                    m.sequence,
                )
                .mutation
            })
            .collect::<Vec<_>>();
        let encoder = BulkPartEncoder::new(metadata, true, 1024);
        encoder.encode_mutations(&mutations).unwrap().unwrap()
    }

    #[test]
    fn test_write_and_read_part_projection() {
        let part = encode(&[
            MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[1],
                v1: &[Some(0.1)],
                sequence: 0,
            },
            MutationInput {
                k0: "b",
                k1: 0,
                timestamps: &[1],
                v1: &[Some(0.0)],
                sequence: 0,
            },
            MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[2],
                v1: &[Some(0.2)],
                sequence: 1,
            },
        ]);

        let projection = &[4u32];

        let reader = part
            .read(
                Arc::new(BulkIterContext::new(
                    part.metadata.region_metadata.clone(),
                    &Some(projection.as_slice()),
                    None,
                )),
                None,
            )
            .unwrap()
            .expect("expect at least one row group");

        let mut total_rows_read = 0;
        let mut field = vec![];
        for res in reader {
            let batch = res.unwrap();
            assert_eq!(1, batch.fields().len());
            assert_eq!(4, batch.fields()[0].column_id);
            field.extend(
                batch.fields()[0]
                    .data
                    .as_any()
                    .downcast_ref::<Float64Vector>()
                    .unwrap()
                    .iter_data()
                    .map(|v| v.unwrap()),
            );
            total_rows_read += batch.num_rows();
        }
        assert_eq!(3, total_rows_read);
        assert_eq!(vec![0.1, 0.2, 0.0], field);
    }

    fn prepare(key_values: Vec<(&str, u32, (i64, i64), u64)>) -> EncodedBulkPart {
        let metadata = metadata_for_test();
        let mutations = key_values
            .into_iter()
            .map(|(k0, k1, (start, end), sequence)| {
                let ts = start..end;
                let v1 = (start..end).map(|_| None);
                build_key_values_with_ts_seq_values(&metadata, k0.to_string(), k1, ts, v1, sequence)
                    .mutation
            })
            .collect::<Vec<_>>();
        let encoder = BulkPartEncoder::new(metadata, true, 100);
        encoder.encode_mutations(&mutations).unwrap().unwrap()
    }

    fn check_prune_row_group(
        part: &EncodedBulkPart,
        predicate: Option<Predicate>,
        expected_rows: usize,
    ) {
        let context = Arc::new(BulkIterContext::new(
            part.metadata.region_metadata.clone(),
            &None,
            predicate,
        ));
        let reader = part
            .read(context, None)
            .unwrap()
            .expect("expect at least one row group");
        let mut total_rows_read = 0;
        for res in reader {
            let batch = res.unwrap();
            total_rows_read += batch.num_rows();
        }
        // Count the rows surviving row group pruning and precise filtering.
        assert_eq!(expected_rows, total_rows_read);
    }

    #[test]
    fn test_prune_row_groups() {
        let part = prepare(vec![
            ("a", 0, (0, 40), 1),
            ("a", 1, (0, 60), 1),
            ("b", 0, (0, 100), 2),
            ("b", 1, (100, 180), 3),
            ("b", 1, (180, 210), 4),
        ]);

        let context = Arc::new(BulkIterContext::new(
            part.metadata.region_metadata.clone(),
            &None,
            Some(Predicate::new(vec![datafusion_expr::col("ts").eq(
                datafusion_expr::lit(ScalarValue::TimestampMillisecond(Some(300), None)),
            )])),
        ));
        assert!(part.read(context, None).unwrap().is_none());

        check_prune_row_group(&part, None, 310);

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
            ])),
            40,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(1u32)),
            ])),
            60,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a"))
            ])),
            100,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("b")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
            ])),
            100,
        );

        // Predicates on field columns can filter rows precisely.
        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("v0").eq(datafusion_expr::lit(150i64))
            ])),
            1,
        );
    }
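
    // A small sketch of the fallback `write_bulk` path: `to_mutation` emits one
    // value per region schema column, so columns absent from the (hypothetical)
    // single-column batch must come back as nulls.
    #[test]
    fn test_to_mutation_fills_missing_columns() {
        use datatypes::arrow::datatypes::{
            DataType as ArrowDataType, Field, Schema, TimeUnit as ArrowTimeUnit,
        };

        let metadata = metadata_for_test();
        let schema = Arc::new(Schema::new(vec![Field::new(
            "ts",
            ArrowDataType::Timestamp(ArrowTimeUnit::Millisecond, None),
            false,
        )]));
        let batch = RecordBatch::try_new(
            schema,
            vec![Arc::new(TimestampMillisecondArray::from(vec![1])) as ArrayRef],
        )
        .unwrap();
        let part = BulkPart {
            batch,
            max_ts: 1,
            min_ts: 1,
            sequence: 7,
            timestamp_index: 0,
            raw_data: None,
        };

        let mutation = part.to_mutation(&metadata).unwrap();
        let rows = mutation.rows.unwrap();
        assert_eq!(1, rows.rows.len());
        // One value per region column; all but the time index are null.
        assert_eq!(rows.schema.len(), rows.rows[0].values.len());
    }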
}