mito2/memtable/bulk/part.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Bulk part encoder/decoder.

use std::collections::VecDeque;
use std::sync::Arc;

use api::v1::Mutation;
use bytes::Bytes;
use common_time::timestamp::TimeUnit;
use datafusion::arrow::array::{TimestampNanosecondArray, UInt64Builder};
use datatypes::arrow;
use datatypes::arrow::array::{
    Array, ArrayRef, BinaryBuilder, DictionaryArray, RecordBatch, TimestampMicrosecondArray,
    TimestampMillisecondArray, TimestampSecondArray, UInt32Array, UInt64Array, UInt8Array,
    UInt8Builder,
};
use datatypes::arrow::compute::TakeOptions;
use datatypes::arrow::datatypes::SchemaRef;
use datatypes::arrow_array::BinaryArray;
use datatypes::data_type::DataType;
use datatypes::prelude::{MutableVector, ScalarVectorBuilder, Vector};
use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
use parquet::arrow::ArrowWriter;
use parquet::data_type::AsBytes;
use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::WriterProperties;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, SequenceNumber};
use table::predicate::Predicate;

use crate::error;
use crate::error::{ComputeArrowSnafu, EncodeMemtableSnafu, NewRecordBatchSnafu, Result};
use crate::memtable::bulk::context::BulkIterContextRef;
use crate::memtable::bulk::part_reader::BulkPartIter;
use crate::memtable::key_values::KeyValuesRef;
use crate::memtable::BoxedBatchIterator;
use crate::row_converter::{DensePrimaryKeyCodec, PrimaryKeyCodec, PrimaryKeyCodecExt};
use crate::sst::parquet::format::{PrimaryKeyArray, ReadFormat};
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::to_sst_arrow_schema;

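/// A part of a bulk memtable: rows encoded as parquet bytes, plus the metadata
/// needed to read them back.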
#[derive(Debug)]
pub struct BulkPart {
    data: Bytes,
    metadata: BulkPartMeta,
}

impl BulkPart {
    pub fn new(data: Bytes, metadata: BulkPartMeta) -> Self {
        Self { data, metadata }
    }

    pub(crate) fn metadata(&self) -> &BulkPartMeta {
        &self.metadata
    }

    pub(crate) fn read(
        &self,
        context: BulkIterContextRef,
        sequence: Option<SequenceNumber>,
    ) -> Result<Option<BoxedBatchIterator>> {
        // Use the predicate to decide which row groups to read.
        let row_groups_to_read = context.row_groups_to_read(&self.metadata.parquet_metadata);

        if row_groups_to_read.is_empty() {
            // All row groups are filtered out.
            return Ok(None);
        }

        let iter = BulkPartIter::try_new(
            context,
            row_groups_to_read,
            self.metadata.parquet_metadata.clone(),
            self.data.clone(),
            sequence,
        )?;
        Ok(Some(Box::new(iter) as BoxedBatchIterator))
    }
}

#[derive(Debug)]
pub struct BulkPartMeta {
    /// Total rows in the part.
    pub num_rows: usize,
    /// Max timestamp in the part.
    pub max_timestamp: i64,
    /// Min timestamp in the part.
    pub min_timestamp: i64,
    /// Parquet metadata of the encoded part.
    pub parquet_metadata: Arc<ParquetMetaData>,
    /// Region metadata of the part.
    pub region_metadata: RegionMetadataRef,
}

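/// Encodes mutations into [BulkPart]s, writing parquet with a fixed row group
/// size and optional dedup.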
pub struct BulkPartEncoder {
    metadata: RegionMetadataRef,
    pk_encoder: DensePrimaryKeyCodec,
    row_group_size: usize,
    dedup: bool,
    writer_props: Option<WriterProperties>,
}

impl BulkPartEncoder {
    pub(crate) fn new(
        metadata: RegionMetadataRef,
        dedup: bool,
        row_group_size: usize,
    ) -> BulkPartEncoder {
        let codec = DensePrimaryKeyCodec::new(&metadata);
        let writer_props = Some(
            WriterProperties::builder()
                .set_write_batch_size(row_group_size)
                .set_max_row_group_size(row_group_size)
                .build(),
        );
        Self {
            metadata,
            pk_encoder: codec,
            row_group_size,
            dedup,
            writer_props,
        }
    }
}

impl BulkPartEncoder {
    /// Encodes mutations into a [BulkPart]. Returns `Ok(None)` if the
    /// mutations contain no rows.
    fn encode_mutations(&self, mutations: &[Mutation]) -> Result<Option<BulkPart>> {
        let Some((arrow_record_batch, min_ts, max_ts)) =
            mutations_to_record_batch(mutations, &self.metadata, &self.pk_encoder, self.dedup)?
        else {
            return Ok(None);
        };

        let mut buf = Vec::with_capacity(4096);
        let arrow_schema = arrow_record_batch.schema();

        let file_metadata = {
            let mut writer =
                ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
                    .context(EncodeMemtableSnafu)?;
            writer
                .write(&arrow_record_batch)
                .context(EncodeMemtableSnafu)?;
            writer.finish().context(EncodeMemtableSnafu)?
        };

        let buf = Bytes::from(buf);
        let parquet_metadata = Arc::new(parse_parquet_metadata(file_metadata)?);

        Ok(Some(BulkPart {
            data: buf,
            metadata: BulkPartMeta {
                num_rows: arrow_record_batch.num_rows(),
                max_timestamp: max_ts,
                min_timestamp: min_ts,
                parquet_metadata,
                region_metadata: self.metadata.clone(),
            },
        }))
    }
}
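
// A minimal round-trip sketch (illustration only; `region_metadata` and
// `mutations` are assumed to be provided by the caller):
//
//     use crate::memtable::bulk::context::BulkIterContext;
//
//     let encoder = BulkPartEncoder::new(region_metadata.clone(), true, 1024);
//     if let Some(part) = encoder.encode_mutations(&mutations)? {
//         let context = Arc::new(BulkIterContext::new(region_metadata, &None, None));
//         if let Some(iter) = part.read(context, None)? {
//             for batch in iter {
//                 let batch = batch?;
//                 // Consume the decoded batch here.
//             }
//         }
//     }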

/// Converts mutations to a record batch.
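///
/// Returns `None` if the mutations contain no rows, otherwise returns the
/// batch together with the min and max timestamps. Output rows are sorted by
/// `(primary key, timestamp, sequence desc)`; when `dedup` is true, rows that
/// share the same primary key and timestamp are deduplicated, keeping only the
/// row with the largest sequence.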
fn mutations_to_record_batch(
    mutations: &[Mutation],
    metadata: &RegionMetadataRef,
    pk_encoder: &DensePrimaryKeyCodec,
    dedup: bool,
) -> Result<Option<(RecordBatch, i64, i64)>> {
    let total_rows: usize = mutations
        .iter()
        .map(|m| m.rows.as_ref().map(|r| r.rows.len()).unwrap_or(0))
        .sum();

    if total_rows == 0 {
        return Ok(None);
    }

    let mut pk_builder = BinaryBuilder::with_capacity(total_rows, 0);

    let mut ts_vector: Box<dyn MutableVector> = metadata
        .time_index_column()
        .column_schema
        .data_type
        .create_mutable_vector(total_rows);
    let mut sequence_builder = UInt64Builder::with_capacity(total_rows);
    let mut op_type_builder = UInt8Builder::with_capacity(total_rows);

    let mut field_builders: Vec<Box<dyn MutableVector>> = metadata
        .field_columns()
        .map(|f| f.column_schema.data_type.create_mutable_vector(total_rows))
        .collect();

    let mut pk_buffer = vec![];
    for m in mutations {
        let Some(key_values) = KeyValuesRef::new(metadata, m) else {
            continue;
        };

        for row in key_values.iter() {
            pk_buffer.clear();
            pk_encoder.encode_to_vec(row.primary_keys(), &mut pk_buffer)?;
            pk_builder.append_value(pk_buffer.as_bytes());
            ts_vector.push_value_ref(row.timestamp());
            sequence_builder.append_value(row.sequence());
            op_type_builder.append_value(row.op_type() as u8);
            for (builder, field) in field_builders.iter_mut().zip(row.fields()) {
                builder.push_value_ref(field);
            }
        }
    }

    let arrow_schema = to_sst_arrow_schema(metadata);
    // Safety: the time index column is always a timestamp type, so
    // `as_timestamp` returns `Some`.
    let timestamp_unit = metadata
        .time_index_column()
        .column_schema
        .data_type
        .as_timestamp()
        .unwrap()
        .unit();
    let sorter = ArraysSorter {
        encoded_primary_keys: pk_builder.finish(),
        timestamp_unit,
        timestamp: ts_vector.to_vector().to_arrow_array(),
        sequence: sequence_builder.finish(),
        op_type: op_type_builder.finish(),
        fields: field_builders
            .iter_mut()
            .map(|f| f.to_vector().to_arrow_array()),
        dedup,
        arrow_schema,
    };

    sorter.sort().map(Some)
}

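/// Helper that sorts column arrays by `(primary key, timestamp, sequence desc)`
/// and assembles them into a record batch in SST layout.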
struct ArraysSorter<I> {
    encoded_primary_keys: BinaryArray,
    timestamp_unit: TimeUnit,
    timestamp: ArrayRef,
    sequence: UInt64Array,
    op_type: UInt8Array,
    fields: I,
    dedup: bool,
    arrow_schema: SchemaRef,
}

impl<I> ArraysSorter<I>
where
    I: Iterator<Item = ArrayRef>,
{
    /// Sorts the arrays and converts them to a record batch, also returning
    /// the min and max timestamps.
    fn sort(self) -> Result<(RecordBatch, i64, i64)> {
        debug_assert!(!self.timestamp.is_empty());
        debug_assert!(self.timestamp.len() == self.sequence.len());
        debug_assert!(self.timestamp.len() == self.op_type.len());
        debug_assert!(self.timestamp.len() == self.encoded_primary_keys.len());

        let timestamp_iter = timestamp_array_to_iter(self.timestamp_unit, &self.timestamp);
        let (mut min_timestamp, mut max_timestamp) = (i64::MAX, i64::MIN);
        let mut to_sort = self
            .encoded_primary_keys
            .iter()
            .zip(timestamp_iter)
            .zip(self.sequence.iter())
            .map(|((pk, timestamp), sequence)| {
                max_timestamp = max_timestamp.max(*timestamp);
                min_timestamp = min_timestamp.min(*timestamp);
                (pk, timestamp, sequence)
            })
            .enumerate()
            .collect::<Vec<_>>();
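
        // Sort by (primary key asc, timestamp asc, sequence desc) so that the
        // row with the largest sequence comes first within each (pk, ts) group
        // and survives the dedup below.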
        to_sort.sort_unstable_by(|(_, (l_pk, l_ts, l_seq)), (_, (r_pk, r_ts, r_seq))| {
            l_pk.cmp(r_pk)
                .then(l_ts.cmp(r_ts))
                .then(l_seq.cmp(r_seq).reverse())
        });

        if self.dedup {
            // Dedup by primary key and timestamp, ignoring the sequence.
            to_sort.dedup_by(|(_, (l_pk, l_ts, _)), (_, (r_pk, r_ts, _))| {
                l_pk == r_pk && l_ts == r_ts
            });
        }

        let indices = UInt32Array::from_iter_values(to_sort.iter().map(|v| v.0 as u32));

        let pk_dictionary = Arc::new(binary_array_to_dictionary(
            // safety: pk must be BinaryArray
            arrow::compute::take(
                &self.encoded_primary_keys,
                &indices,
                Some(TakeOptions {
                    check_bounds: false,
                }),
            )
            .context(ComputeArrowSnafu)?
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap(),
        )?) as ArrayRef;

        let mut arrays = Vec::with_capacity(self.arrow_schema.fields.len());
        for arr in self.fields {
            arrays.push(
                arrow::compute::take(
                    &arr,
                    &indices,
                    Some(TakeOptions {
                        check_bounds: false,
                    }),
                )
                .context(ComputeArrowSnafu)?,
            );
        }

        let timestamp = arrow::compute::take(
            &self.timestamp,
            &indices,
            Some(TakeOptions {
                check_bounds: false,
            }),
        )
        .context(ComputeArrowSnafu)?;

        arrays.push(timestamp);
        arrays.push(pk_dictionary);
        arrays.push(
            arrow::compute::take(
                &self.sequence,
                &indices,
                Some(TakeOptions {
                    check_bounds: false,
                }),
            )
            .context(ComputeArrowSnafu)?,
        );

        arrays.push(
            arrow::compute::take(
                &self.op_type,
                &indices,
                Some(TakeOptions {
                    check_bounds: false,
                }),
            )
            .context(ComputeArrowSnafu)?,
        );

        let batch = RecordBatch::try_new(self.arrow_schema, arrays).context(NewRecordBatchSnafu)?;
        Ok((batch, min_timestamp, max_timestamp))
    }
}

/// Converts a timestamp array to an iterator of i64 values.
fn timestamp_array_to_iter(
    timestamp_unit: TimeUnit,
    timestamp: &ArrayRef,
) -> impl Iterator<Item = &i64> {
    // Safety: the array type always matches `timestamp_unit`, so every
    // downcast below succeeds.
    match timestamp_unit {
        TimeUnit::Second => timestamp
            .as_any()
            .downcast_ref::<TimestampSecondArray>()
            .unwrap()
            .values()
            .iter(),
        TimeUnit::Millisecond => timestamp
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap()
            .values()
            .iter(),
        TimeUnit::Microsecond => timestamp
            .as_any()
            .downcast_ref::<TimestampMicrosecondArray>()
            .unwrap()
            .values()
            .iter(),
        TimeUnit::Nanosecond => timestamp
            .as_any()
            .downcast_ref::<TimestampNanosecondArray>()
            .unwrap()
            .values()
            .iter(),
    }
}

/// Converts a **sorted** [BinaryArray] to a [DictionaryArray].
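/// For example, a sorted input `["a", "a", "b", "c"]` maps to keys
/// `[0, 0, 1, 2]` with dictionary values `["a", "b", "c"]`.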
fn binary_array_to_dictionary(input: &BinaryArray) -> Result<PrimaryKeyArray> {
    if input.is_empty() {
        return Ok(DictionaryArray::new(
            UInt32Array::from(Vec::<u32>::new()),
            Arc::new(BinaryArray::from_vec(vec![])) as ArrayRef,
        ));
    }
    let mut keys = Vec::with_capacity(16);
    let mut values = BinaryBuilder::new();
    let mut key: u32 = 0;
    keys.push(key);
    values.append_value(input.value(0));

    for (idx, current_bytes) in input.iter().enumerate().skip(1) {
        // Safety: encoded primary keys must be present (non-null).
        let current_bytes = current_bytes.unwrap();
        // Since the input is sorted, comparing with the adjacent previous
        // element is enough to detect a new dictionary value.
        if current_bytes != input.value(idx - 1) {
            values.append_value(current_bytes);
            key += 1;
        }
        keys.push(key);
    }

    Ok(DictionaryArray::new(
        UInt32Array::from(keys),
        Arc::new(values.finish()) as ArrayRef,
    ))
}

#[cfg(test)]
mod tests {
    use std::collections::VecDeque;

    use datafusion_common::ScalarValue;
    use datatypes::prelude::{ScalarVector, Value};
    use datatypes::vectors::{Float64Vector, TimestampMillisecondVector};

    use super::*;
    use crate::memtable::bulk::context::BulkIterContext;
    use crate::sst::parquet::format::ReadFormat;
    use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};

    fn check_binary_array_to_dictionary(
        input: &[&[u8]],
        expected_keys: &[u32],
        expected_values: &[&[u8]],
    ) {
        let input = BinaryArray::from_iter_values(input.iter());
        let array = binary_array_to_dictionary(&input).unwrap();
        assert_eq!(
            &expected_keys,
            &array.keys().iter().map(|v| v.unwrap()).collect::<Vec<_>>()
        );
        assert_eq!(
            expected_values,
            &array
                .values()
                .as_any()
                .downcast_ref::<BinaryArray>()
                .unwrap()
                .iter()
                .map(|v| v.unwrap())
                .collect::<Vec<_>>()
        );
    }

    #[test]
    fn test_binary_array_to_dictionary() {
        check_binary_array_to_dictionary(&[], &[], &[]);

        check_binary_array_to_dictionary(&["a".as_bytes()], &[0], &["a".as_bytes()]);

        check_binary_array_to_dictionary(
            &["a".as_bytes(), "a".as_bytes()],
            &[0, 0],
            &["a".as_bytes()],
        );

        check_binary_array_to_dictionary(
            &["a".as_bytes(), "a".as_bytes(), "b".as_bytes()],
            &[0, 0, 1],
            &["a".as_bytes(), "b".as_bytes()],
        );

        check_binary_array_to_dictionary(
            &[
                "a".as_bytes(),
                "a".as_bytes(),
                "b".as_bytes(),
                "c".as_bytes(),
            ],
            &[0, 0, 1, 2],
            &["a".as_bytes(), "b".as_bytes(), "c".as_bytes()],
        );
    }

    struct MutationInput<'a> {
        k0: &'a str,
        k1: u32,
        timestamps: &'a [i64],
        v1: &'a [Option<f64>],
        sequence: u64,
    }

    #[derive(Debug, PartialOrd, PartialEq)]
    struct BatchOutput<'a> {
        pk_values: &'a [Value],
        timestamps: &'a [i64],
        v1: &'a [Option<f64>],
    }

    fn check_mutations_to_record_batches(
        input: &[MutationInput],
        expected: &[BatchOutput],
        expected_timestamp: (i64, i64),
        dedup: bool,
    ) {
        let metadata = metadata_for_test();
        let mutations = input
            .iter()
            .map(|m| {
                build_key_values_with_ts_seq_values(
                    &metadata,
                    m.k0.to_string(),
                    m.k1,
                    m.timestamps.iter().copied(),
                    m.v1.iter().copied(),
                    m.sequence,
                )
                .mutation
            })
            .collect::<Vec<_>>();
        let total_rows: usize = mutations
            .iter()
            .flat_map(|m| m.rows.iter())
            .map(|r| r.rows.len())
            .sum();

        let pk_encoder = DensePrimaryKeyCodec::new(&metadata);

        let (batch, min_ts, max_ts) =
            mutations_to_record_batch(&mutations, &metadata, &pk_encoder, dedup)
                .unwrap()
                .unwrap();
        // Also verify the returned min/max timestamps.
        assert_eq!(expected_timestamp, (min_ts, max_ts));
        let read_format = ReadFormat::new_with_all_columns(metadata.clone());
        let mut batches = VecDeque::new();
        read_format
            .convert_record_batch(&batch, &mut batches)
            .unwrap();
        if !dedup {
            assert_eq!(
                total_rows,
                batches.iter().map(|b| b.num_rows()).sum::<usize>()
            );
        }
        let batch_values = batches
            .into_iter()
            .map(|b| {
                let pk_values = pk_encoder.decode(b.primary_key()).unwrap().into_dense();
                let timestamps = b
                    .timestamps()
                    .as_any()
                    .downcast_ref::<TimestampMillisecondVector>()
                    .unwrap()
                    .iter_data()
                    .map(|v| v.unwrap().0.value())
                    .collect::<Vec<_>>();
                let float_values = b.fields()[1]
                    .data
                    .as_any()
                    .downcast_ref::<Float64Vector>()
                    .unwrap()
                    .iter_data()
                    .collect::<Vec<_>>();

                (pk_values, timestamps, float_values)
            })
            .collect::<Vec<_>>();
        assert_eq!(expected.len(), batch_values.len());

        for idx in 0..expected.len() {
            assert_eq!(expected[idx].pk_values, &batch_values[idx].0);
            assert_eq!(expected[idx].timestamps, &batch_values[idx].1);
            assert_eq!(expected[idx].v1, &batch_values[idx].2);
        }
    }

    #[test]
    fn test_mutations_to_record_batch() {
        check_mutations_to_record_batches(
            &[MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[0],
                v1: &[Some(0.1)],
                sequence: 0,
            }],
            &[BatchOutput {
                pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                timestamps: &[0],
                v1: &[Some(0.1)],
            }],
            (0, 0),
            true,
        );

        check_mutations_to_record_batches(
            &[
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.1)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "b",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[1],
                    v1: &[Some(0.2)],
                    sequence: 1,
                },
                MutationInput {
                    k0: "a",
                    k1: 1,
                    timestamps: &[1],
                    v1: &[Some(0.3)],
                    sequence: 2,
                },
            ],
            &[
                BatchOutput {
                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                    timestamps: &[0, 1],
                    v1: &[Some(0.1), Some(0.2)],
                },
                BatchOutput {
                    pk_values: &[Value::String("a".into()), Value::UInt32(1)],
                    timestamps: &[1],
                    v1: &[Some(0.3)],
                },
                BatchOutput {
                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                },
            ],
            (0, 1),
            true,
        );

        check_mutations_to_record_batches(
            &[
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.1)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "b",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.2)],
                    sequence: 1,
                },
            ],
            &[
                BatchOutput {
                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                    timestamps: &[0],
                    v1: &[Some(0.2)],
                },
                BatchOutput {
                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                },
            ],
            (0, 0),
            true,
        );
        check_mutations_to_record_batches(
            &[
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.1)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "b",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                    sequence: 0,
                },
                MutationInput {
                    k0: "a",
                    k1: 0,
                    timestamps: &[0],
                    v1: &[Some(0.2)],
                    sequence: 1,
                },
            ],
            &[
                BatchOutput {
                    pk_values: &[Value::String("a".into()), Value::UInt32(0)],
                    timestamps: &[0, 0],
                    v1: &[Some(0.2), Some(0.1)],
                },
                BatchOutput {
                    pk_values: &[Value::String("b".into()), Value::UInt32(0)],
                    timestamps: &[0],
                    v1: &[Some(0.0)],
                },
            ],
            (0, 0),
            false,
        );
    }

    fn encode(input: &[MutationInput]) -> BulkPart {
        let metadata = metadata_for_test();
        let mutations = input
            .iter()
            .map(|m| {
                build_key_values_with_ts_seq_values(
                    &metadata,
                    m.k0.to_string(),
                    m.k1,
                    m.timestamps.iter().copied(),
                    m.v1.iter().copied(),
                    m.sequence,
                )
                .mutation
            })
            .collect::<Vec<_>>();
        let encoder = BulkPartEncoder::new(metadata, true, 1024);
        encoder.encode_mutations(&mutations).unwrap().unwrap()
    }

    #[test]
    fn test_write_and_read_part_projection() {
        let part = encode(&[
            MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[1],
                v1: &[Some(0.1)],
                sequence: 0,
            },
            MutationInput {
                k0: "b",
                k1: 0,
                timestamps: &[1],
                v1: &[Some(0.0)],
                sequence: 0,
            },
            MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[2],
                v1: &[Some(0.2)],
                sequence: 1,
            },
        ]);

        let projection = &[4u32];

        let reader = part
            .read(
                Arc::new(BulkIterContext::new(
                    part.metadata.region_metadata.clone(),
                    &Some(projection.as_slice()),
                    None,
                )),
                None,
            )
            .unwrap()
            .expect("expect at least one row group");

        let mut total_rows_read = 0;
        let mut field = vec![];
        for res in reader {
            let batch = res.unwrap();
            assert_eq!(1, batch.fields().len());
            assert_eq!(4, batch.fields()[0].column_id);
            field.extend(
                batch.fields()[0]
                    .data
                    .as_any()
                    .downcast_ref::<Float64Vector>()
                    .unwrap()
                    .iter_data()
                    .map(|v| v.unwrap()),
            );
            total_rows_read += batch.num_rows();
        }
        assert_eq!(3, total_rows_read);
        assert_eq!(vec![0.1, 0.2, 0.0], field);
    }

    fn prepare(key_values: Vec<(&str, u32, (i64, i64), u64)>) -> BulkPart {
        let metadata = metadata_for_test();
        let mutations = key_values
            .into_iter()
            .map(|(k0, k1, (start, end), sequence)| {
                let ts = start..end;
                let v1 = (start..end).map(|_| None);
                build_key_values_with_ts_seq_values(&metadata, k0.to_string(), k1, ts, v1, sequence)
                    .mutation
            })
            .collect::<Vec<_>>();
        let encoder = BulkPartEncoder::new(metadata, true, 100);
        encoder.encode_mutations(&mutations).unwrap().unwrap()
    }

    fn check_prune_row_group(part: &BulkPart, predicate: Option<Predicate>, expected_rows: usize) {
        let context = Arc::new(BulkIterContext::new(
            part.metadata.region_metadata.clone(),
            &None,
            predicate,
        ));
        let reader = part
            .read(context, None)
            .unwrap()
            .expect("expect at least one row group");
        let mut total_rows_read = 0;
        for res in reader {
            let batch = res.unwrap();
            total_rows_read += batch.num_rows();
        }
        // Verify the number of rows that survive row group pruning.
        assert_eq!(expected_rows, total_rows_read);
    }

    #[test]
    fn test_prune_row_groups() {
        let part = prepare(vec![
            ("a", 0, (0, 40), 1),
            ("a", 1, (0, 60), 1),
            ("b", 0, (0, 100), 2),
            ("b", 1, (100, 180), 3),
            ("b", 1, (180, 210), 4),
        ]);

        let context = Arc::new(BulkIterContext::new(
            part.metadata.region_metadata.clone(),
            &None,
            Some(Predicate::new(vec![datafusion_expr::col("ts").eq(
                datafusion_expr::lit(ScalarValue::TimestampMillisecond(Some(300), None)),
            )])),
        ));
        assert!(part.read(context, None).unwrap().is_none());

        check_prune_row_group(&part, None, 310);

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
            ])),
            40,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(1u32)),
            ])),
            60,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("a"))
            ])),
            100,
        );

        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("b")),
                datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
            ])),
            100,
        );

        // Predicates on field columns can filter rows precisely.
        check_prune_row_group(
            &part,
            Some(Predicate::new(vec![
                datafusion_expr::col("v0").eq(datafusion_expr::lit(150i64))
            ])),
            1,
        );
    }
}
925}