mito2/memtable/bulk/part.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Bulk part encoder/decoder.

use std::collections::VecDeque;
use std::sync::Arc;

use api::helper::{value_to_grpc_value, ColumnDataTypeWrapper};
use api::v1::{Mutation, OpType};
use bytes::Bytes;
use common_recordbatch::DfRecordBatch as RecordBatch;
use common_time::timestamp::TimeUnit;
use datafusion::arrow::array::{TimestampNanosecondArray, UInt64Builder};
use datatypes::arrow;
use datatypes::arrow::array::{
    Array, ArrayRef, BinaryBuilder, DictionaryArray, TimestampMicrosecondArray,
    TimestampMillisecondArray, TimestampSecondArray, UInt32Array, UInt64Array, UInt8Array,
    UInt8Builder,
};
use datatypes::arrow::compute::TakeOptions;
use datatypes::arrow::datatypes::SchemaRef;
use datatypes::arrow_array::BinaryArray;
use datatypes::data_type::DataType;
use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector};
use datatypes::value::Value;
use datatypes::vectors::Helper;
use parquet::arrow::ArrowWriter;
use parquet::data_type::AsBytes;
use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::WriterProperties;
use snafu::{OptionExt, ResultExt, Snafu};
use store_api::metadata::RegionMetadataRef;
use store_api::storage::SequenceNumber;
use table::predicate::Predicate;

use crate::error;
use crate::error::{ComputeArrowSnafu, EncodeMemtableSnafu, NewRecordBatchSnafu, Result};
use crate::memtable::bulk::context::BulkIterContextRef;
use crate::memtable::bulk::part_reader::BulkPartIter;
use crate::memtable::key_values::{KeyValue, KeyValuesRef};
use crate::memtable::BoxedBatchIterator;
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt};
use crate::sst::parquet::format::{PrimaryKeyArray, ReadFormat};
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::to_sst_arrow_schema;

/// A part of data written into the memtable in one bulk write.
#[derive(Clone)]
pub struct BulkPart {
    /// Data of this part.
    pub batch: RecordBatch,
    /// Number of rows in `batch`.
    pub num_rows: usize,
    /// Max timestamp of rows in this part.
    pub max_ts: i64,
    /// Min timestamp of rows in this part.
    pub min_ts: i64,
    /// Sequence number of this part.
    pub sequence: u64,
    /// Index of the timestamp column in `batch`.
    pub timestamp_index: usize,
}

impl BulkPart {
    /// Returns the estimated memory size of this part.
    pub(crate) fn estimated_size(&self) -> usize {
        self.batch.get_array_memory_size()
    }

    /// Converts this [BulkPart] to a [Mutation] for the fallback `write_bulk` implementation.
    pub(crate) fn to_mutation(&self, region_metadata: &RegionMetadataRef) -> Result<Mutation> {
        let vectors = region_metadata
            .schema
            .column_schemas()
            .iter()
            .map(|col| match self.batch.column_by_name(&col.name) {
                None => Ok(None),
                Some(col) => Helper::try_into_vector(col).map(Some),
            })
            .collect::<datatypes::error::Result<Vec<_>>>()
            .context(error::ComputeVectorSnafu)?;

        let rows = (0..self.num_rows)
            .map(|row_idx| {
                let values = (0..self.batch.num_columns())
                    .map(|col_idx| {
                        if let Some(v) = &vectors[col_idx] {
                            value_to_grpc_value(v.get(row_idx))
                        } else {
                            api::v1::Value { value_data: None }
                        }
                    })
                    .collect::<Vec<_>>();
                api::v1::Row { values }
            })
            .collect::<Vec<_>>();

        let schema = region_metadata
            .column_metadatas
            .iter()
            .map(|c| {
                let data_type_wrapper =
                    ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())?;
                Ok(api::v1::ColumnSchema {
                    column_name: c.column_schema.name.clone(),
                    datatype: data_type_wrapper.datatype() as i32,
                    semantic_type: c.semantic_type as i32,
                    ..Default::default()
                })
            })
            .collect::<api::error::Result<Vec<_>>>()
            .context(error::ConvertColumnDataTypeSnafu {
                reason: "failed to convert region metadata to column schema",
            })?;

        let rows = api::v1::Rows { schema, rows };

        Ok(Mutation {
            op_type: OpType::Put as i32,
            sequence: self.sequence,
            rows: Some(rows),
            write_hint: None,
        })
    }

    /// Returns the timestamp column of this part.
    pub fn timestamps(&self) -> &ArrayRef {
        self.batch.column(self.timestamp_index)
    }
}

/// A bulk part encoded as an in-memory parquet file.
#[derive(Debug)]
pub struct EncodedBulkPart {
    data: Bytes,
    metadata: BulkPartMeta,
}

impl EncodedBulkPart {
    pub fn new(data: Bytes, metadata: BulkPartMeta) -> Self {
        Self { data, metadata }
    }

    pub(crate) fn metadata(&self) -> &BulkPartMeta {
        &self.metadata
    }

    /// Reads this part, returning `None` if the predicate filters out all row groups.
    pub(crate) fn read(
        &self,
        context: BulkIterContextRef,
        sequence: Option<SequenceNumber>,
    ) -> Result<Option<BoxedBatchIterator>> {
        // Use the predicate to find row groups to read.
        let row_groups_to_read = context.row_groups_to_read(&self.metadata.parquet_metadata);

        if row_groups_to_read.is_empty() {
            // All row groups are filtered out.
            return Ok(None);
        }

        let iter = BulkPartIter::try_new(
            context,
            row_groups_to_read,
            self.metadata.parquet_metadata.clone(),
            self.data.clone(),
            sequence,
        )?;
        Ok(Some(Box::new(iter) as BoxedBatchIterator))
    }
}
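
// A minimal read sketch, mirroring `check_prune_row_group` in the tests below
// (`part` is an `EncodedBulkPart`; context construction is an assumption based
// on that test usage):
//
//     let context = Arc::new(BulkIterContext::new(
//         part.metadata().region_metadata.clone(),
//         &None, // no projection
//         None,  // no predicate
//     ));
//     if let Some(iter) = part.read(context, None)? {
//         for batch in iter {
//             let batch = batch?;
//             // consume `batch`
//         }
//     }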

/// Metadata of an [EncodedBulkPart].
#[derive(Debug)]
pub struct BulkPartMeta {
    /// Total rows in part.
    pub num_rows: usize,
    /// Max timestamp in part.
    pub max_timestamp: i64,
    /// Min timestamp in part.
    pub min_timestamp: i64,
    /// Part file metadata.
    pub parquet_metadata: Arc<ParquetMetaData>,
    /// Part region schema.
    pub region_metadata: RegionMetadataRef,
}

/// Encodes [Mutation]s into [EncodedBulkPart]s in parquet format.
pub struct BulkPartEncoder {
    metadata: RegionMetadataRef,
    pk_encoder: DensePrimaryKeyCodec,
    row_group_size: usize,
    dedup: bool,
    writer_props: Option<WriterProperties>,
}

impl BulkPartEncoder {
    pub(crate) fn new(
        metadata: RegionMetadataRef,
        dedup: bool,
        row_group_size: usize,
    ) -> BulkPartEncoder {
        let codec = DensePrimaryKeyCodec::new(&metadata);
        let writer_props = Some(
            WriterProperties::builder()
                .set_write_batch_size(row_group_size)
                .set_max_row_group_size(row_group_size)
                .build(),
        );
        Self {
            metadata,
            pk_encoder: codec,
            row_group_size,
            dedup,
            writer_props,
        }
    }
}

impl BulkPartEncoder {
    /// Encodes mutations to an [EncodedBulkPart].
    /// Returns `Ok(None)` if the mutations contain no rows.
    fn encode_mutations(&self, mutations: &[Mutation]) -> Result<Option<EncodedBulkPart>> {
        let Some((arrow_record_batch, min_ts, max_ts)) =
            mutations_to_record_batch(mutations, &self.metadata, &self.pk_encoder, self.dedup)?
        else {
            return Ok(None);
        };

        let mut buf = Vec::with_capacity(4096);
        let arrow_schema = arrow_record_batch.schema();

        let file_metadata = {
            let mut writer =
                ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
                    .context(EncodeMemtableSnafu)?;
            writer
                .write(&arrow_record_batch)
                .context(EncodeMemtableSnafu)?;
            writer.finish().context(EncodeMemtableSnafu)?
        };

        let buf = Bytes::from(buf);
        let parquet_metadata = Arc::new(parse_parquet_metadata(file_metadata)?);

        Ok(Some(EncodedBulkPart {
            data: buf,
            metadata: BulkPartMeta {
                num_rows: arrow_record_batch.num_rows(),
                max_timestamp: max_ts,
                min_timestamp: min_ts,
                parquet_metadata,
                region_metadata: self.metadata.clone(),
            },
        }))
    }
}
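
// A minimal encoding sketch, as exercised by the `encode` helper in the tests
// below (`metadata` is a `RegionMetadataRef`, `mutations` a `Vec<Mutation>`):
//
//     let encoder = BulkPartEncoder::new(metadata, /* dedup */ true, /* row_group_size */ 1024);
//     let part: Option<EncodedBulkPart> = encoder.encode_mutations(&mutations)?;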

/// Converts mutations to a sorted record batch in the SST arrow schema produced
/// by [to_sst_arrow_schema]: field columns, the time index, then the encoded
/// primary key, sequence and op type columns. Returns the batch together with
/// the min and max timestamps, or `None` if `mutations` contains no rows.
fn mutations_to_record_batch(
    mutations: &[Mutation],
    metadata: &RegionMetadataRef,
    pk_encoder: &DensePrimaryKeyCodec,
    dedup: bool,
) -> Result<Option<(RecordBatch, i64, i64)>> {
    let total_rows: usize = mutations
        .iter()
        .map(|m| m.rows.as_ref().map(|r| r.rows.len()).unwrap_or(0))
        .sum();

    if total_rows == 0 {
        return Ok(None);
    }

    let mut pk_builder = BinaryBuilder::with_capacity(total_rows, 0);

    let mut ts_vector: Box<dyn MutableVector> = metadata
        .time_index_column()
        .column_schema
        .data_type
        .create_mutable_vector(total_rows);
    let mut sequence_builder = UInt64Builder::with_capacity(total_rows);
    let mut op_type_builder = UInt8Builder::with_capacity(total_rows);

    let mut field_builders: Vec<Box<dyn MutableVector>> = metadata
        .field_columns()
        .map(|f| f.column_schema.data_type.create_mutable_vector(total_rows))
        .collect();

    let mut pk_buffer = vec![];
    for m in mutations {
        let Some(key_values) = KeyValuesRef::new(metadata, m) else {
            continue;
        };

        for row in key_values.iter() {
            pk_buffer.clear();
            pk_encoder.encode_to_vec(row.primary_keys(), &mut pk_buffer)?;
            pk_builder.append_value(pk_buffer.as_bytes());
            ts_vector.push_value_ref(row.timestamp());
            sequence_builder.append_value(row.sequence());
            op_type_builder.append_value(row.op_type() as u8);
            for (builder, field) in field_builders.iter_mut().zip(row.fields()) {
                builder.push_value_ref(field);
            }
        }
    }

    let arrow_schema = to_sst_arrow_schema(metadata);
    // Safety: the time index column must be a valid timestamp type and its values must not be None.
    let timestamp_unit = metadata
        .time_index_column()
        .column_schema
        .data_type
        .as_timestamp()
        .unwrap()
        .unit();
    let sorter = ArraysSorter {
        encoded_primary_keys: pk_builder.finish(),
        timestamp_unit,
        timestamp: ts_vector.to_vector().to_arrow_array(),
        sequence: sequence_builder.finish(),
        op_type: op_type_builder.finish(),
        fields: field_builders
            .iter_mut()
            .map(|f| f.to_vector().to_arrow_array()),
        dedup,
        arrow_schema,
    };

    sorter.sort().map(Some)
}
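/// Helper to sort rows by `(primary key, timestamp, sequence desc)` and build
/// the sorted arrays into a record batch. `fields` is an iterator of field
/// column arrays.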
struct ArraysSorter<I> {
    encoded_primary_keys: BinaryArray,
    timestamp_unit: TimeUnit,
    timestamp: ArrayRef,
    sequence: UInt64Array,
    op_type: UInt8Array,
    fields: I,
    dedup: bool,
    arrow_schema: SchemaRef,
}

impl<I> ArraysSorter<I>
where
    I: Iterator<Item = ArrayRef>,
{
    /// Sorts the arrays by `(primary key, timestamp, sequence desc)`, optionally
    /// dedups rows, and converts the arrays to a record batch along with the min
    /// and max timestamps.
    fn sort(self) -> Result<(RecordBatch, i64, i64)> {
        debug_assert!(!self.timestamp.is_empty());
        debug_assert!(self.timestamp.len() == self.sequence.len());
        debug_assert!(self.timestamp.len() == self.op_type.len());
        debug_assert!(self.timestamp.len() == self.encoded_primary_keys.len());

        let timestamp_iter = timestamp_array_to_iter(self.timestamp_unit, &self.timestamp);
        let (mut min_timestamp, mut max_timestamp) = (i64::MAX, i64::MIN);
        let mut to_sort = self
            .encoded_primary_keys
            .iter()
            .zip(timestamp_iter)
            .zip(self.sequence.iter())
            .map(|((pk, timestamp), sequence)| {
                max_timestamp = max_timestamp.max(*timestamp);
                min_timestamp = min_timestamp.min(*timestamp);
                (pk, timestamp, sequence)
            })
            .enumerate()
            .collect::<Vec<_>>();

        to_sort.sort_unstable_by(|(_, (l_pk, l_ts, l_seq)), (_, (r_pk, r_ts, r_seq))| {
            l_pk.cmp(r_pk)
                .then(l_ts.cmp(r_ts))
                .then(l_seq.cmp(r_seq).reverse())
        });

        if self.dedup {
            // Dedup by primary key and timestamp, ignoring the sequence. Rows with a
            // larger sequence sort first, so `dedup_by` keeps the latest row.
            to_sort.dedup_by(|(_, (l_pk, l_ts, _)), (_, (r_pk, r_ts, _))| {
                l_pk == r_pk && l_ts == r_ts
            });
        }

        let indices = UInt32Array::from_iter_values(to_sort.iter().map(|v| v.0 as u32));

        let pk_dictionary = Arc::new(binary_array_to_dictionary(
            // Safety: the primary key column must be a BinaryArray.
            arrow::compute::take(
                &self.encoded_primary_keys,
                &indices,
                Some(TakeOptions {
                    check_bounds: false,
                }),
            )
            .context(ComputeArrowSnafu)?
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap(),
        )?) as ArrayRef;

        let mut arrays = Vec::with_capacity(self.arrow_schema.fields.len());
        for arr in self.fields {
            arrays.push(
                arrow::compute::take(
                    &arr,
                    &indices,
                    Some(TakeOptions {
                        check_bounds: false,
                    }),
                )
                .context(ComputeArrowSnafu)?,
            );
        }

        let timestamp = arrow::compute::take(
            &self.timestamp,
            &indices,
            Some(TakeOptions {
                check_bounds: false,
            }),
        )
        .context(ComputeArrowSnafu)?;

        arrays.push(timestamp);
        arrays.push(pk_dictionary);
        arrays.push(
            arrow::compute::take(
                &self.sequence,
                &indices,
                Some(TakeOptions {
                    check_bounds: false,
                }),
            )
            .context(ComputeArrowSnafu)?,
        );

        arrays.push(
            arrow::compute::take(
                &self.op_type,
                &indices,
                Some(TakeOptions {
                    check_bounds: false,
                }),
            )
            .context(ComputeArrowSnafu)?,
        );

        let batch = RecordBatch::try_new(self.arrow_schema, arrays).context(NewRecordBatchSnafu)?;
        Ok((batch, min_timestamp, max_timestamp))
    }
}

/// Converts a timestamp array to an iterator of i64 values.
fn timestamp_array_to_iter(
    timestamp_unit: TimeUnit,
    timestamp: &ArrayRef,
) -> impl Iterator<Item = &i64> {
    // Safety: the timestamp array must match the given time unit.
    match timestamp_unit {
        TimeUnit::Second => timestamp
            .as_any()
            .downcast_ref::<TimestampSecondArray>()
            .unwrap()
            .values()
            .iter(),
        TimeUnit::Millisecond => timestamp
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap()
            .values()
            .iter(),
        TimeUnit::Microsecond => timestamp
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .unwrap()
            .values()
            .iter(),
        TimeUnit::Nanosecond => timestamp
            .as_any()
            .downcast_ref::<TimestampNanosecondArray>()
            .unwrap()
            .values()
            .iter(),
    }
}

/// Converts a **sorted** [BinaryArray] to a [DictionaryArray].
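/// Consecutive equal values map to the same dictionary key, e.g. input
/// `[b"a", b"a", b"b"]` yields keys `[0, 0, 1]` and values `[b"a", b"b"]`
/// (see `test_binary_array_to_dictionary` below).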
fn binary_array_to_dictionary(input: &BinaryArray) -> Result<PrimaryKeyArray> {
    if input.is_empty() {
        return Ok(DictionaryArray::new(
            UInt32Array::from(Vec::<u32>::new()),
            Arc::new(BinaryArray::from_vec(vec![])) as ArrayRef,
        ));
    }
    let mut keys = Vec::with_capacity(16);
    let mut values = BinaryBuilder::new();
    let mut prev: usize = 0;
    keys.push(prev as u32);
    values.append_value(input.value(prev));

    for current_bytes in input.iter().skip(1) {
        // Safety: encoded primary keys must be present.
        let current_bytes = current_bytes.unwrap();
        let prev_bytes = input.value(prev);
        if current_bytes != prev_bytes {
            values.append_value(current_bytes);
            prev += 1;
        }
        keys.push(prev as u32);
    }

    Ok(DictionaryArray::new(
        UInt32Array::from(keys),
        Arc::new(values.finish()) as ArrayRef,
    ))
}

#[cfg(test)]
mod tests {
    use std::collections::VecDeque;

    use datafusion_common::ScalarValue;
    use datatypes::prelude::{ScalarVector, Value};
    use datatypes::vectors::{Float64Vector, TimestampMillisecondVector};

    use super::*;
    use crate::memtable::bulk::context::BulkIterContext;
    use crate::sst::parquet::format::ReadFormat;
    use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};

    fn check_binary_array_to_dictionary(
        input: &[&[u8]],
        expected_keys: &[u32],
        expected_values: &[&[u8]],
    ) {
        let input = BinaryArray::from_iter_values(input.iter());
        let array = binary_array_to_dictionary(&input).unwrap();
        assert_eq!(
            &expected_keys,
            &array.keys().iter().map(|v| v.unwrap()).collect::<Vec<_>>()
        );
        assert_eq!(
            expected_values,
            &array
                .values()
                .as_any()
                .downcast_ref::<BinaryArray>()
                .unwrap()
                .iter()
                .map(|v| v.unwrap())
                .collect::<Vec<_>>()
        );
    }

    #[test]
    fn test_binary_array_to_dictionary() {
        check_binary_array_to_dictionary(&[], &[], &[]);

        check_binary_array_to_dictionary(&["a".as_bytes()], &[0], &["a".as_bytes()]);

        check_binary_array_to_dictionary(
            &["a".as_bytes(), "a".as_bytes()],
            &[0, 0],
            &["a".as_bytes()],
        );

        check_binary_array_to_dictionary(
            &["a".as_bytes(), "a".as_bytes(), "b".as_bytes()],
            &[0, 0, 1],
            &["a".as_bytes(), "b".as_bytes()],
        );

        check_binary_array_to_dictionary(
            &[
                "a".as_bytes(),
                "a".as_bytes(),
                "b".as_bytes(),
                "c".as_bytes(),
            ],
            &[0, 0, 1, 2],
            &["a".as_bytes(), "b".as_bytes(), "c".as_bytes()],
        );
    }

    struct MutationInput<'a> {
        k0: &'a str,
        k1: u32,
        timestamps: &'a [i64],
        v1: &'a [Option<f64>],
        sequence: u64,
    }

    #[derive(Debug, PartialOrd, PartialEq)]
    struct BatchOutput<'a> {
        pk_values: &'a [Value],
        timestamps: &'a [i64],
        v1: &'a [Option<f64>],
    }

    fn check_mutations_to_record_batches(
        input: &[MutationInput],
        expected: &[BatchOutput],
        expected_timestamp: (i64, i64),
        dedup: bool,
    ) {
        let metadata = metadata_for_test();
        let mutations = input
            .iter()
            .map(|m| {
                build_key_values_with_ts_seq_values(
                    &metadata,
                    m.k0.to_string(),
                    m.k1,
                    m.timestamps.iter().copied(),
                    m.v1.iter().copied(),
                    m.sequence,
                )
                .mutation
            })
            .collect::<Vec<_>>();
        let total_rows: usize = mutations
            .iter()
            .flat_map(|m| m.rows.iter())
            .map(|r| r.rows.len())
            .sum();

        let pk_encoder = DensePrimaryKeyCodec::new(&metadata);

        let (batch, min_ts, max_ts) =
            mutations_to_record_batch(&mutations, &metadata, &pk_encoder, dedup)
                .unwrap()
                .unwrap();
        // Check the min/max timestamps returned along with the batch.
        assert_eq!(expected_timestamp, (min_ts, max_ts));
        let read_format = ReadFormat::new_with_all_columns(metadata.clone());
        let mut batches = VecDeque::new();
        read_format
            .convert_record_batch(&batch, &mut batches)
            .unwrap();
        if !dedup {
            assert_eq!(
                total_rows,
                batches.iter().map(|b| b.num_rows()).sum::<usize>()
            );
        }
        let batch_values = batches
            .into_iter()
            .map(|b| {
                let pk_values = pk_encoder.decode(b.primary_key()).unwrap().into_dense();
                let timestamps = b
                    .timestamps()
                    .as_any()
                    .downcast_ref::<TimestampMillisecondVector>()
                    .unwrap()
                    .iter_data()
                    .map(|v| v.unwrap().0.value())
                    .collect::<Vec<_>>();
                let float_values = b.fields()[1]
                    .data
                    .as_any()
                    .downcast_ref::<Float64Vector>()
                    .unwrap()
                    .iter_data()
                    .collect::<Vec<_>>();

                (pk_values, timestamps, float_values)
            })
            .collect::<Vec<_>>();
        assert_eq!(expected.len(), batch_values.len());

        for idx in 0..expected.len() {
            assert_eq!(expected[idx].pk_values, &batch_values[idx].0);
            assert_eq!(expected[idx].timestamps, &batch_values[idx].1);
            assert_eq!(expected[idx].v1, &batch_values[idx].2);
        }
    }

    #[test]
    fn test_mutations_to_record_batch() {
        check_mutations_to_record_batches(
            &[MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[0],
                v1: &[Some(0.1)],
                sequence: 0,
            }],
            &[BatchOutput {
                pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                timestamps: &[0],
                v1: &[Some(0.1)],
            }],
            (0, 0),
            true,
        );

        check_mutations_to_record_batches(
            &[
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.1)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "b",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[1],
                    v1: &[Some(0.2)],
                    sequence: 1,
                },
                MutationInput {
                    k0: "a",
                    k1: 1,
                    timestamps: &[1],
                    v1: &[Some(0.3)],
                    sequence: 2,
                },
            ],
            &[
                BatchOutput {
                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                    timestamps: &[0, 1],
                    v1: &[Some(0.1), Some(0.2)],
                },
                BatchOutput {
                    pk_values: &[Value::String("a".into()), Value::UInt32(1)],
                    timestamps: &[1],
                    v1: &[Some(0.3)],
                },
                BatchOutput {
                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                },
            ],
            (0, 1),
            true,
        );

        check_mutations_to_record_batches(
            &[
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.1)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "b",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.2)],
                    sequence: 1,
                },
            ],
            &[
                BatchOutput {
                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                    timestamps: &[0],
                    v1: &[Some(0.2)],
                },
                BatchOutput {
                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                },
            ],
            (0, 0),
            true,
        );
        check_mutations_to_record_batches(
            &[
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.1)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "b",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.2)],
                    sequence: 1,
                },
            ],
            &[
                BatchOutput {
                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                    timestamps: &[0, 0],
                    v1: &[Some(0.2), Some(0.1)],
                },
                BatchOutput {
                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                },
            ],
            (0, 0),
            false,
        );
    }

    fn encode(input: &[MutationInput]) -> EncodedBulkPart {
        let metadata = metadata_for_test();
        let mutations = input
            .iter()
            .map(|m| {
                build_key_values_with_ts_seq_values(
                    &metadata,
                    m.k0.to_string(),
                    m.k1,
                    m.timestamps.iter().copied(),
                    m.v1.iter().copied(),
                    m.sequence,
                )
                .mutation
            })
            .collect::<Vec<_>>();
        let encoder = BulkPartEncoder::new(metadata, true, 1024);
        encoder.encode_mutations(&mutations).unwrap().unwrap()
    }

    #[test]
    fn test_write_and_read_part_projection() {
        let part = encode(&[
            MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[1],
                v1: &[Some(0.1)],
                sequence: 0,
            },
            MutationInput {
                k0: "b",
                k1: 0,
                timestamps: &[1],
                v1: &[Some(0.0)],
                sequence: 0,
            },
            MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[2],
                v1: &[Some(0.2)],
                sequence: 1,
            },
        ]);

        let projection = &[4u32];

        let reader = part
            .read(
                Arc::new(BulkIterContext::new(
                    part.metadata.region_metadata.clone(),
                    &Some(projection.as_slice()),
                    None,
                )),
                None,
            )
            .unwrap()
            .expect("expect at least one row group");

        let mut total_rows_read = 0;
        let mut field = vec![];
        for res in reader {
            let batch = res.unwrap();
            assert_eq!(1, batch.fields().len());
            assert_eq!(4, batch.fields()[0].column_id);
            field.extend(
                batch.fields()[0]
                    .data
                    .as_any()
                    .downcast_ref::<Float64Vector>()
                    .unwrap()
                    .iter_data()
                    .map(|v| v.unwrap()),
            );
            total_rows_read += batch.num_rows();
        }
        assert_eq!(3, total_rows_read);
        assert_eq!(vec![0.1, 0.2, 0.0], field);
    }

    fn prepare(key_values: Vec<(&str, u32, (i64, i64), u64)>) -> EncodedBulkPart {
        let metadata = metadata_for_test();
        let mutations = key_values
            .into_iter()
            .map(|(k0, k1, (start, end), sequence)| {
                let ts = start..end;
                let v1 = (start..end).map(|_| None);
                build_key_values_with_ts_seq_values(&metadata, k0.to_string(), k1, ts, v1, sequence)
                    .mutation
            })
            .collect::<Vec<_>>();
        let encoder = BulkPartEncoder::new(metadata, true, 100);
        encoder.encode_mutations(&mutations).unwrap().unwrap()
    }

    fn check_prune_row_group(
        part: &EncodedBulkPart,
        predicate: Option<Predicate>,
        expected_rows: usize,
    ) {
        let context = Arc::new(BulkIterContext::new(
            part.metadata.region_metadata.clone(),
            &None,
            predicate,
        ));
        let reader = part
            .read(context, None)
            .unwrap()
            .expect("expect at least one row group");
        let mut total_rows_read = 0;
        for res in reader {
            let batch = res.unwrap();
            total_rows_read += batch.num_rows();
        }
        // Check the number of rows read after pruning.
        assert_eq!(expected_rows, total_rows_read);
    }

    #[test]
    fn test_prune_row_groups() {
        let part = prepare(vec![
            ("a", 0, (0, 40), 1),
            ("a", 1, (0, 60), 1),
            ("b", 0, (0, 100), 2),
            ("b", 1, (100, 180), 3),
            ("b", 1, (180, 210), 4),
        ]);

        let context = Arc::new(BulkIterContext::new(
            part.metadata.region_metadata.clone(),
            &None,
            Some(Predicate::new(vec![datafusion_expr::col("ts").eq(
                datafusion_expr::lit(ScalarValue::TimestampMillisecond(Some(300), None)),
            )])),
        ));
        assert!(part.read(context, None).unwrap().is_none());

        check_prune_row_group(&part, None, 310);

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
            ])),
            40,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(1u32)),
            ])),
            60,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a"))
            ])),
            100,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("b")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
            ])),
            100,
        );

        // Predicates over a field column can do precise filtering.
        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("v0").eq(datafusion_expr::lit(150i64))
            ])),
            1,
        );
    }
}