mito2/memtable/partition_tree/data.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Data part of a shard.
16
17use std::cmp::{Ordering, Reverse};
18use std::fmt::{Debug, Formatter};
19use std::ops::Range;
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22
23use bytes::Bytes;
24use datatypes::arrow;
25use datatypes::arrow::array::{ArrayRef, RecordBatch, UInt16Array, UInt32Array, UInt64Array};
26use datatypes::arrow::datatypes::{Field, Schema, SchemaRef};
27use datatypes::data_type::DataType;
28use datatypes::prelude::{ConcreteDataType, MutableVector, ScalarVectorBuilder, Vector, VectorRef};
29use datatypes::schema::ColumnSchema;
30use datatypes::types::TimestampType;
31use datatypes::vectors::{
32    TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
33    TimestampSecondVector, UInt8Vector, UInt8VectorBuilder, UInt16Vector, UInt16VectorBuilder,
34    UInt64Vector, UInt64VectorBuilder,
35};
36use mito_codec::key_values::KeyValue;
37use parquet::arrow::ArrowWriter;
38use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
39use parquet::basic::{Compression, Encoding, ZstdLevel};
40use parquet::file::properties::{EnabledStatistics, WriterProperties};
41use parquet::schema::types::ColumnPath;
42use snafu::ResultExt;
43use store_api::metadata::RegionMetadataRef;
44use store_api::storage::consts::{OP_TYPE_COLUMN_NAME, SEQUENCE_COLUMN_NAME};
45
46use crate::error;
47use crate::error::Result;
48use crate::memtable::partition_tree::PkIndex;
49use crate::memtable::partition_tree::merger::{DataBatchKey, DataNode, DataSource, Merger};
50use crate::metrics::{
51    PARTITION_TREE_DATA_BUFFER_FREEZE_STAGE_ELAPSED, PARTITION_TREE_READ_STAGE_ELAPSED,
52};
53
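/// Name of the internal column that stores the primary key index in encoded data parts.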
54const PK_INDEX_COLUMN_NAME: &str = "__pk_index";
55
56/// Initial capacity for the data buffer.
57pub(crate) const DATA_INIT_CAP: usize = 8;
58
59/// Range of a data batch.
60#[derive(Debug, Clone, Copy)]
61pub(crate) struct DataBatchRange {
62    /// Primary key index of this batch.
63    pub(crate) pk_index: PkIndex,
64    /// Start of current primary key inside record batch.
65    pub(crate) start: usize,
66    /// End of current primary key inside record batch.
67    pub(crate) end: usize,
68}
69
70impl DataBatchRange {
71    pub(crate) fn len(&self) -> usize {
72        self.end - self.start
73    }
74}
75
76/// Data part batches returned by `DataParts::read`.
77#[derive(Debug, Clone, Copy)]
78pub struct DataBatch<'a> {
79    /// Record batch of data.
80    rb: &'a RecordBatch,
81    /// Range of current primary key inside record batch.
82    range: DataBatchRange,
83}
84
85impl<'a> DataBatch<'a> {
86    pub(crate) fn pk_index(&self) -> PkIndex {
87        self.range.pk_index
88    }
89
90    pub(crate) fn range(&self) -> DataBatchRange {
91        self.range
92    }
93
94    pub(crate) fn slice_record_batch(&self) -> RecordBatch {
95        self.rb.slice(self.range.start, self.range.len())
96    }
97
98    pub(crate) fn first_row(&self) -> (i64, u64) {
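        // Encoded column layout: 0 = __pk_index, 1 = time index, 2 = __sequence, 3 = __op_type.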
99        let ts_values = timestamp_array_to_i64_slice(self.rb.column(1));
100        let sequence_values = self
101            .rb
102            .column(2)
103            .as_any()
104            .downcast_ref::<UInt64Array>()
105            .unwrap()
106            .values();
107        (
108            ts_values[self.range.start],
109            sequence_values[self.range.start],
110        )
111    }
112
113    pub(crate) fn last_row(&self) -> (i64, u64) {
114        let ts_values = timestamp_array_to_i64_slice(self.rb.column(1));
115        let sequence_values = self
116            .rb
117            .column(2)
118            .as_any()
119            .downcast_ref::<UInt64Array>()
120            .unwrap()
121            .values();
122        (
123            ts_values[self.range.end - 1],
124            sequence_values[self.range.end - 1],
125        )
126    }
127
128    pub(crate) fn first_key(&self) -> DataBatchKey {
129        let pk_index = self.pk_index();
130        let ts_array = self.rb.column(1);
131
132        // maybe save the result somewhere.
133        let ts_values = timestamp_array_to_i64_slice(ts_array);
134        let timestamp = ts_values[self.range.start];
135        DataBatchKey {
136            pk_index,
137            timestamp,
138        }
139    }
140
141    pub(crate) fn search_key(&self, key: &DataBatchKey) -> Result<usize, usize> {
142        let DataBatchKey {
143            pk_index,
144            timestamp,
145        } = key;
146        assert_eq!(*pk_index, self.range.pk_index);
147        let ts_values = timestamp_array_to_i64_slice(self.rb.column(1));
148        let ts_values = &ts_values[self.range.start..self.range.end];
149        ts_values.binary_search(timestamp)
150    }
151
152    pub(crate) fn slice(self, offset: usize, length: usize) -> DataBatch<'a> {
153        let start = self.range.start + offset;
154        let end = start + length;
155        DataBatch {
156            rb: self.rb,
157            range: DataBatchRange {
158                pk_index: self.range.pk_index,
159                start,
160                end,
161            },
162        }
163    }
164
165    pub(crate) fn num_rows(&self) -> usize {
166        self.range.len()
167    }
168}
169
170/// Buffer for the value part (pk_index, ts, sequence, op_type, field columns) in a shard.
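/// A rough usage sketch (illustrative only; `metadata`, `pk_index`, `kv` and
/// `pk_weights: &[u16]` are assumed to be prepared by the caller):
///
/// ```ignore
/// let mut buffer = DataBuffer::with_capacity(metadata, DATA_INIT_CAP, true);
/// buffer.write_row(pk_index, &kv); // rows accumulate in per-column builders
/// // Read the buffer in place without freezing it...
/// let mut reader = buffer.read()?.build(Some(pk_weights))?;
/// // ...or freeze it into an immutable, parquet-encoded part.
/// let part = buffer.freeze(Some(pk_weights), true)?;
/// ```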
171pub struct DataBuffer {
172    metadata: RegionMetadataRef,
173    /// Schema for data part (primary keys are replaced with pk_index).
174    data_part_schema: SchemaRef,
175    /// Builder for primary key index.
176    pk_index_builder: UInt16VectorBuilder,
177    /// Builder for timestamp column.
178    ts_builder: Box<dyn MutableVector>,
179    /// Builder for sequence column.
180    sequence_builder: UInt64VectorBuilder,
181    /// Builder for op_type column.
182    op_type_builder: UInt8VectorBuilder,
183    /// Builders for field columns.
184    field_builders: Vec<LazyMutableVectorBuilder>,
185
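    /// Whether to remove duplicated rows (same pk and timestamp) when reading or freezing.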
186    dedup: bool,
187}
188
189impl DataBuffer {
190    /// Creates a `DataBuffer` instance with given schema and capacity.
191    pub fn with_capacity(metadata: RegionMetadataRef, init_capacity: usize, dedup: bool) -> Self {
192        let ts_builder = metadata
193            .time_index_column()
194            .column_schema
195            .data_type
196            .create_mutable_vector(init_capacity);
197
198        let pk_id_builder = UInt16VectorBuilder::with_capacity(init_capacity);
199        let sequence_builder = UInt64VectorBuilder::with_capacity(init_capacity);
200        let op_type_builder = UInt8VectorBuilder::with_capacity(init_capacity);
201
202        let field_builders = metadata
203            .field_columns()
204            .map(|c| LazyMutableVectorBuilder::new(c.column_schema.data_type.clone()))
205            .collect::<Vec<_>>();
206
207        let data_part_schema = memtable_schema_to_encoded_schema(&metadata);
208        Self {
209            metadata,
210            data_part_schema,
211            pk_index_builder: pk_id_builder,
212            ts_builder,
213            sequence_builder,
214            op_type_builder,
215            field_builders,
216            dedup,
217        }
218    }
219
220    /// Writes a row to data buffer.
221    pub fn write_row(&mut self, pk_index: PkIndex, kv: &KeyValue) {
222        self.ts_builder.push_value_ref(&kv.timestamp());
223        self.pk_index_builder.push(Some(pk_index));
224        self.sequence_builder.push(Some(kv.sequence()));
225        self.op_type_builder.push(Some(kv.op_type() as u8));
226
227        debug_assert_eq!(self.field_builders.len(), kv.num_fields());
228
229        for (idx, field) in kv.fields().enumerate() {
230            self.field_builders[idx]
231                .get_or_create_builder(self.ts_builder.len())
232                .push_value_ref(&field);
233        }
234    }
235
236    /// Freezes `DataBuffer` to bytes.
237    /// If `pk_weights` is present, it will be used to sort rows.
238    ///
239    /// `freeze` clears the buffers of builders.
240    pub fn freeze(
241        &mut self,
242        pk_weights: Option<&[u16]>,
243        replace_pk_index: bool,
244    ) -> Result<DataPart> {
245        let timestamp_col_name = self.metadata.time_index_column().column_schema.name.clone();
246        let encoder = DataPartEncoder::new(
247            &self.metadata,
248            pk_weights,
249            None,
250            timestamp_col_name,
251            replace_pk_index,
252            self.dedup,
253        );
254        let parts = encoder.write(self)?;
255        Ok(parts)
256    }
257
258    /// Builds a lazily initialized data buffer reader from [DataBuffer].
259    pub fn read(&self) -> Result<DataBufferReaderBuilder> {
260        let _timer = PARTITION_TREE_READ_STAGE_ELAPSED
261            .with_label_values(&["read_data_buffer"])
262            .start_timer();
263
264        let (pk_index, timestamp, sequence, op_type) = (
265            self.pk_index_builder.finish_cloned(),
266            self.ts_builder.to_vector_cloned(),
267            self.sequence_builder.finish_cloned(),
268            self.op_type_builder.finish_cloned(),
269        );
270
271        let mut fields = Vec::with_capacity(self.field_builders.len());
272        for b in self.field_builders.iter() {
273            let field = match b {
274                LazyMutableVectorBuilder::Type(ty) => LazyFieldVector::Type(ty.clone()),
275                LazyMutableVectorBuilder::Builder(builder) => {
276                    LazyFieldVector::Vector(builder.to_vector_cloned())
277                }
278            };
279            fields.push(field);
280        }
281
282        Ok(DataBufferReaderBuilder {
283            schema: self.data_part_schema.clone(),
284            pk_index,
285            timestamp,
286            sequence,
287            op_type,
288            fields,
289            dedup: self.dedup,
290        })
291    }
292
293    /// Returns the number of rows in the data buffer.
294    pub fn num_rows(&self) -> usize {
295        self.ts_builder.len()
296    }
297
298    /// Returns whether the buffer is empty.
299    pub fn is_empty(&self) -> bool {
300        self.num_rows() == 0
301    }
302}
303
304enum LazyMutableVectorBuilder {
305    Type(ConcreteDataType),
306    Builder(Box<dyn MutableVector>),
307}
308
309impl LazyMutableVectorBuilder {
310    fn new(ty: ConcreteDataType) -> Self {
311        Self::Type(ty)
312    }
313
314    fn get_or_create_builder(&mut self, init_capacity: usize) -> &mut Box<dyn MutableVector> {
315        match self {
316            LazyMutableVectorBuilder::Type(ty) => {
317                let builder = ty.create_mutable_vector(init_capacity);
318                *self = LazyMutableVectorBuilder::Builder(builder);
319                self.get_or_create_builder(init_capacity)
320            }
321            LazyMutableVectorBuilder::Builder(builder) => builder,
322        }
323    }
324}
325
326/// Converts `DataBuffer` to record batches, with rows sorted according to `pk_weights`.
327/// `dedup`: whether to remove duplicated rows inside `DataBuffer`.
328/// `replace_pk_index`: whether to replace the pk_index values with the corresponding pk weights.
329fn drain_data_buffer_to_record_batches(
330    schema: SchemaRef,
331    buffer: &mut DataBuffer,
332    pk_weights: Option<&[u16]>,
333    dedup: bool,
334    replace_pk_index: bool,
335) -> Result<RecordBatch> {
336    let num_rows = buffer.ts_builder.len();
337
338    let (pk_index_v, ts_v, sequence_v, op_type_v) = (
339        buffer.pk_index_builder.finish(),
340        buffer.ts_builder.to_vector(),
341        buffer.sequence_builder.finish(),
342        buffer.op_type_builder.finish(),
343    );
344
345    let (indices_to_take, mut columns) = build_row_sort_indices_and_columns(
346        pk_weights,
347        pk_index_v,
348        ts_v,
349        sequence_v,
350        op_type_v,
351        replace_pk_index,
352        dedup,
353        buffer.field_builders.len() + 4,
354    )?;
355
356    for b in buffer.field_builders.iter_mut() {
357        let array = match b {
358            LazyMutableVectorBuilder::Type(ty) => {
359                let mut single_null = ty.create_mutable_vector(num_rows);
360                single_null.push_nulls(num_rows);
361                single_null.to_vector().to_arrow_array()
362            }
363            LazyMutableVectorBuilder::Builder(builder) => builder.to_vector().to_arrow_array(),
364        };
365        columns.push(
366            arrow::compute::take(&array, &indices_to_take, None)
367                .context(error::ComputeArrowSnafu)?,
368        );
369    }
370
371    RecordBatch::try_new(schema, columns).context(error::NewRecordBatchSnafu)
372}
373
374#[allow(clippy::too_many_arguments)]
375fn build_row_sort_indices_and_columns(
376    pk_weights: Option<&[u16]>,
377    pk_index: UInt16Vector,
378    ts: VectorRef,
379    sequence: UInt64Vector,
380    op_type: UInt8Vector,
381    replace_pk_index: bool,
382    dedup: bool,
383    column_num: usize,
384) -> Result<(UInt32Array, Vec<ArrayRef>)> {
385    let mut rows = build_rows_to_sort(pk_weights, &pk_index, &ts, &sequence);
386
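    // Build the pk column in the original row order before sorting, so it can be
    // `take`n with the same indices as the other columns.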
387    let pk_array = if replace_pk_index {
388        // replace pk index values with pk weights.
389        Arc::new(UInt16Array::from_iter_values(
390            rows.iter().map(|(_, key)| key.pk_weight),
391        )) as Arc<_>
392    } else {
393        pk_index.to_arrow_array()
394    };
395
396    // sort and dedup
397    rows.sort_unstable_by(|l, r| l.1.cmp(&r.1));
398    if dedup {
399        rows.dedup_by(|l, r| l.1.pk_weight == r.1.pk_weight && l.1.timestamp == r.1.timestamp);
400    }
401
402    let indices_to_take = UInt32Array::from_iter_values(rows.iter().map(|(idx, _)| *idx as u32));
403
404    let mut columns = Vec::with_capacity(column_num);
405
406    columns.push(
407        arrow::compute::take(&pk_array, &indices_to_take, None)
408            .context(error::ComputeArrowSnafu)?,
409    );
410
411    columns.push(
412        arrow::compute::take(&ts.to_arrow_array(), &indices_to_take, None)
413            .context(error::ComputeArrowSnafu)?,
414    );
415
416    columns.push(
417        arrow::compute::take(&sequence.as_arrow(), &indices_to_take, None)
418            .context(error::ComputeArrowSnafu)?,
419    );
420
421    columns.push(
422        arrow::compute::take(&op_type.as_arrow(), &indices_to_take, None)
423            .context(error::ComputeArrowSnafu)?,
424    );
425
426    Ok((indices_to_take, columns))
427}
428
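/// Returns the raw `i64` values of a timestamp array.
/// All arrow timestamp arrays are backed by `i64` values; only the time unit differs.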
429pub(crate) fn timestamp_array_to_i64_slice(arr: &ArrayRef) -> &[i64] {
430    use datatypes::arrow::array::{
431        TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
432        TimestampSecondArray,
433    };
434    use datatypes::arrow::datatypes::{DataType, TimeUnit};
435
436    match arr.data_type() {
437        DataType::Timestamp(t, _) => match t {
438            TimeUnit::Second => arr
439                .as_any()
440                .downcast_ref::<TimestampSecondArray>()
441                .unwrap()
442                .values(),
443            TimeUnit::Millisecond => arr
444                .as_any()
445                .downcast_ref::<TimestampMillisecondArray>()
446                .unwrap()
447                .values(),
448            TimeUnit::Microsecond => arr
449                .as_any()
450                .downcast_ref::<TimestampMicrosecondArray>()
451                .unwrap()
452                .values(),
453            TimeUnit::Nanosecond => arr
454                .as_any()
455                .downcast_ref::<TimestampNanosecondArray>()
456                .unwrap()
457                .values(),
458        },
459        _ => unreachable!(),
460    }
461}
462
463enum LazyFieldVector {
464    Type(ConcreteDataType),
465    Vector(VectorRef),
466}
467
468pub(crate) struct DataBufferReaderBuilder {
469    schema: SchemaRef,
470    pk_index: UInt16Vector,
471    timestamp: VectorRef,
472    sequence: UInt64Vector,
473    op_type: UInt8Vector,
474    fields: Vec<LazyFieldVector>,
475    dedup: bool,
476}
477
478impl DataBufferReaderBuilder {
479    fn build_record_batch(self, pk_weights: Option<&[u16]>) -> Result<RecordBatch> {
480        let num_rows = self.timestamp.len();
481        let (indices_to_take, mut columns) = build_row_sort_indices_and_columns(
482            pk_weights,
483            self.pk_index,
484            self.timestamp,
485            self.sequence,
486            self.op_type,
487            // replace_pk_index is always set to false since:
488            // - for DataBuffer in ShardBuilder, pk dict is not frozen
489            // - for DataBuffer in Shard, values in the pk_index column have already been replaced during `freeze`.
490            false,
491            self.dedup,
492            self.fields.len() + 4,
493        )?;
494
495        for b in self.fields.iter() {
496            let array = match b {
497                LazyFieldVector::Type(ty) => {
498                    let mut single_null = ty.create_mutable_vector(num_rows);
499                    single_null.push_nulls(num_rows);
500                    single_null.to_vector().to_arrow_array()
501                }
502                LazyFieldVector::Vector(vector) => vector.to_arrow_array(),
503            };
504            columns.push(
505                arrow::compute::take(&array, &indices_to_take, None)
506                    .context(error::ComputeArrowSnafu)?,
507            );
508        }
509        RecordBatch::try_new(self.schema, columns).context(error::NewRecordBatchSnafu)
510    }
511
512    pub fn build(self, pk_weights: Option<&[u16]>) -> Result<DataBufferReader> {
513        self.build_record_batch(pk_weights)
514            .and_then(DataBufferReader::new)
515    }
516}
517
518#[derive(Debug)]
519pub(crate) struct DataBufferReader {
520    batch: RecordBatch,
521    offset: usize,
522    current_range: Option<DataBatchRange>,
523    elapsed_time: Duration,
524}
525
526impl Drop for DataBufferReader {
527    fn drop(&mut self) {
528        PARTITION_TREE_READ_STAGE_ELAPSED
529            .with_label_values(&["read_data_buffer"])
530            .observe(self.elapsed_time.as_secs_f64())
531    }
532}
533
534impl DataBufferReader {
535    pub(crate) fn new(batch: RecordBatch) -> Result<Self> {
536        let mut reader = Self {
537            batch,
538            offset: 0,
539            current_range: None,
540            elapsed_time: Duration::default(),
541        };
542        reader.next()?; // fill data batch for comparison and merge.
543        Ok(reader)
544    }
545
546    pub(crate) fn is_valid(&self) -> bool {
547        self.current_range.is_some()
548    }
549
550    /// Returns current data batch.
551    /// # Panics
552    /// If the current reader is exhausted.
553    pub(crate) fn current_data_batch(&self) -> DataBatch<'_> {
554        let range = self.current_range.unwrap();
555        DataBatch {
556            rb: &self.batch,
557            range,
558        }
559    }
560
561    /// Advances reader to next data batch.
562    pub(crate) fn next(&mut self) -> Result<()> {
563        if self.offset >= self.batch.num_rows() {
564            self.current_range = None;
565            return Ok(());
566        }
567        let start = Instant::now();
568        let pk_index_array = pk_index_array(&self.batch);
569        if let Some((next_pk, range)) = search_next_pk_range(pk_index_array, self.offset) {
570            self.offset = range.end;
571            self.current_range = Some(DataBatchRange {
572                pk_index: next_pk,
573                start: range.start,
574                end: range.end,
575            });
576        } else {
577            self.current_range = None;
578        }
579        self.elapsed_time += start.elapsed();
580        Ok(())
581    }
582}
583
584/// Gets `pk_index` array from record batch.
585/// # Panics
586/// If pk index column is not the first column or the type is not `UInt16Array`.
587fn pk_index_array(batch: &RecordBatch) -> &UInt16Array {
588    batch
589        .column(0)
590        .as_any()
591        .downcast_ref::<UInt16Array>()
592        .unwrap()
593}
594
595/// Searches for the next pk index and its offset range in a sorted `UInt16Array`.
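/// For example (mirroring `test_search_next_pk_range` below), given the sorted values
/// `[1, 1, 3, 3, 4, 6]`, `start = 2` yields `Some((3, 2..4))` and `start = 6` yields `None`.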
596fn search_next_pk_range(array: &UInt16Array, start: usize) -> Option<(PkIndex, Range<usize>)> {
597    let num_rows = array.len();
598    if start >= num_rows {
599        return None;
600    }
601
602    let values = array.values();
603    let next_pk = values[start];
604
605    for idx in start..num_rows {
606        if values[idx] != next_pk {
607            return Some((next_pk, start..idx));
608        }
609    }
610    Some((next_pk, start..num_rows))
611}
612
613#[derive(Eq, PartialEq)]
614struct InnerKey {
615    pk_weight: u16,
616    timestamp: i64,
617    sequence: u64,
618}
619
620impl PartialOrd for InnerKey {
621    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
622        Some(self.cmp(other))
623    }
624}
625
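// Within the same `pk_weight` and `timestamp`, rows are ordered by descending `sequence`,
// so after sorting, `dedup_by` keeps the first (newest) occurrence of each key.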
626impl Ord for InnerKey {
627    fn cmp(&self, other: &Self) -> Ordering {
628        (self.pk_weight, self.timestamp, Reverse(self.sequence)).cmp(&(
629            other.pk_weight,
630            other.timestamp,
631            Reverse(other.sequence),
632        ))
633    }
634}
635
636fn build_rows_to_sort(
637    pk_weights: Option<&[u16]>,
638    pk_index: &UInt16Vector,
639    ts: &VectorRef,
640    sequence: &UInt64Vector,
641) -> Vec<(usize, InnerKey)> {
642    let ts_values = match ts.data_type() {
643        ConcreteDataType::Timestamp(t) => match t {
644            TimestampType::Second(_) => ts
645                .as_any()
646                .downcast_ref::<TimestampSecondVector>()
647                .unwrap()
648                .as_arrow()
649                .values(),
650            TimestampType::Millisecond(_) => ts
651                .as_any()
652                .downcast_ref::<TimestampMillisecondVector>()
653                .unwrap()
654                .as_arrow()
655                .values(),
656            TimestampType::Microsecond(_) => ts
657                .as_any()
658                .downcast_ref::<TimestampMicrosecondVector>()
659                .unwrap()
660                .as_arrow()
661                .values(),
662            TimestampType::Nanosecond(_) => ts
663                .as_any()
664                .downcast_ref::<TimestampNanosecondVector>()
665                .unwrap()
666                .as_arrow()
667                .values(),
668        },
669        other => unreachable!("Unexpected type {:?}", other),
670    };
671    let pk_index_values = pk_index.as_arrow().values();
672    let sequence_values = sequence.as_arrow().values();
673    debug_assert_eq!(ts_values.len(), pk_index_values.len());
674    debug_assert_eq!(ts_values.len(), sequence_values.len());
675
676    ts_values
677        .iter()
678        .zip(pk_index_values.iter())
679        .zip(sequence_values.iter())
680        .enumerate()
681        .map(|(idx, ((timestamp, pk_index), sequence))| {
682            let pk_weight = if let Some(weights) = pk_weights {
683                weights[*pk_index as usize] // if pk_weights is present, sort according to weight.
684            } else {
685                *pk_index // otherwise pk_index has already been replaced by weights.
686            };
687            (
688                idx,
689                InnerKey {
690                    timestamp: *timestamp,
691                    pk_weight,
692                    sequence: *sequence,
693                },
694            )
695        })
696        .collect()
697}
698
699fn memtable_schema_to_encoded_schema(schema: &RegionMetadataRef) -> SchemaRef {
700    use datatypes::arrow::datatypes::DataType;
701    let ColumnSchema {
702        name: ts_name,
703        data_type: ts_type,
704        ..
705    } = &schema.time_index_column().column_schema;
706
707    let mut fields = vec![
708        Field::new(PK_INDEX_COLUMN_NAME, DataType::UInt16, false),
709        Field::new(ts_name, ts_type.as_arrow_type(), false),
710        Field::new(SEQUENCE_COLUMN_NAME, DataType::UInt64, false),
711        Field::new(OP_TYPE_COLUMN_NAME, DataType::UInt8, false),
712    ];
713
714    fields.extend(schema.field_columns().map(|c| {
715        Field::new(
716            &c.column_schema.name,
717            c.column_schema.data_type.as_arrow_type(),
718            c.column_schema.is_nullable(),
719        )
720    }));
721
722    Arc::new(Schema::new(fields))
723}
724
725struct DataPartEncoder<'a> {
726    schema: SchemaRef,
727    pk_weights: Option<&'a [u16]>,
728    row_group_size: Option<usize>,
729    timestamp_column_name: String,
730    replace_pk_index: bool,
731    dedup: bool,
732}
733
734impl<'a> DataPartEncoder<'a> {
735    pub fn new(
736        metadata: &RegionMetadataRef,
737        pk_weights: Option<&'a [u16]>,
738        row_group_size: Option<usize>,
739        timestamp_column_name: String,
740        replace_pk_index: bool,
741        dedup: bool,
742    ) -> DataPartEncoder<'a> {
743        let schema = memtable_schema_to_encoded_schema(metadata);
744        Self {
745            schema,
746            pk_weights,
747            row_group_size,
748            timestamp_column_name,
749            replace_pk_index,
750            dedup,
751        }
752    }
753
754    // todo(hl): more customized config according to region options.
755    fn writer_props(self) -> WriterProperties {
756        let mut builder = WriterProperties::builder();
757        if let Some(row_group_size) = self.row_group_size {
758            builder = builder.set_max_row_group_size(row_group_size)
759        }
760
761        let ts_col = ColumnPath::new(vec![self.timestamp_column_name]);
762        let pk_index_col = ColumnPath::new(vec![PK_INDEX_COLUMN_NAME.to_string()]);
763        let sequence_col = ColumnPath::new(vec![SEQUENCE_COLUMN_NAME.to_string()]);
764        let op_type_col = ColumnPath::new(vec![OP_TYPE_COLUMN_NAME.to_string()]);
765
766        builder = builder
767            .set_compression(Compression::ZSTD(ZstdLevel::default()))
768            .set_statistics_enabled(EnabledStatistics::None);
769        builder = builder
770            .set_column_encoding(ts_col.clone(), Encoding::DELTA_BINARY_PACKED)
771            .set_column_dictionary_enabled(ts_col, false)
772            .set_column_encoding(pk_index_col.clone(), Encoding::DELTA_BINARY_PACKED)
773            .set_column_dictionary_enabled(pk_index_col, true)
774            .set_column_encoding(sequence_col.clone(), Encoding::DELTA_BINARY_PACKED)
775            .set_column_dictionary_enabled(sequence_col, false)
776            .set_column_encoding(op_type_col.clone(), Encoding::DELTA_BINARY_PACKED)
777            .set_column_dictionary_enabled(op_type_col, true)
778            .set_column_index_truncate_length(None)
779            .set_statistics_truncate_length(None);
780        builder.build()
781    }
782
783    pub fn write(self, source: &mut DataBuffer) -> Result<DataPart> {
784        let mut bytes = Vec::with_capacity(1024);
785
786        let rb = {
787            let _timer = PARTITION_TREE_DATA_BUFFER_FREEZE_STAGE_ELAPSED
788                .with_label_values(&["drain_data_buffer_to_batch"])
789                .start_timer();
790            drain_data_buffer_to_record_batches(
791                self.schema.clone(),
792                source,
793                self.pk_weights,
794                self.dedup,
795                self.replace_pk_index,
796            )?
797        };
798
799        {
800            let _timer = PARTITION_TREE_DATA_BUFFER_FREEZE_STAGE_ELAPSED
801                .with_label_values(&["encode"])
802                .start_timer();
803            let mut writer =
804                ArrowWriter::try_new(&mut bytes, self.schema.clone(), Some(self.writer_props()))
805                    .context(error::EncodeMemtableSnafu)?;
806            writer.write(&rb).context(error::EncodeMemtableSnafu)?;
807            let _metadata = writer.close().context(error::EncodeMemtableSnafu)?;
808        }
809        Ok(DataPart::Parquet(ParquetPart {
810            data: Bytes::from(bytes),
811        }))
812    }
813}
814
815/// Format of immutable data part.
816pub enum DataPart {
817    Parquet(ParquetPart),
818}
819
820impl DataPart {
821    /// Reads frozen data part and yields [DataBatch]es.
822    pub fn read(&self) -> Result<DataPartReader> {
823        match self {
824            DataPart::Parquet(data_bytes) => DataPartReader::new(data_bytes.data.clone(), None),
825        }
826    }
827
828    fn is_empty(&self) -> bool {
829        match self {
830            DataPart::Parquet(p) => p.data.is_empty(),
831        }
832    }
833}
834
835pub struct DataPartReader {
836    inner: ParquetRecordBatchReader,
837    current_batch: Option<RecordBatch>,
838    current_range: Option<DataBatchRange>,
839    elapsed: Duration,
840}
841
842impl Drop for DataPartReader {
843    fn drop(&mut self) {
844        PARTITION_TREE_READ_STAGE_ELAPSED
845            .with_label_values(&["read_data_part"])
846            .observe(self.elapsed.as_secs_f64());
847    }
848}
849
850impl Debug for DataPartReader {
851    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
852        f.debug_struct("DataPartReader")
853            .field("current_range", &self.current_range)
854            .finish()
855    }
856}
857
858impl DataPartReader {
859    pub fn new(data: Bytes, batch_size: Option<usize>) -> Result<Self> {
860        let mut builder =
861            ParquetRecordBatchReaderBuilder::try_new(data).context(error::ReadDataPartSnafu)?;
862        if let Some(batch_size) = batch_size {
863            builder = builder.with_batch_size(batch_size);
864        }
865        let parquet_reader = builder.build().context(error::ReadDataPartSnafu)?;
866        let mut reader = Self {
867            inner: parquet_reader,
868            current_batch: None,
869            current_range: None,
870            elapsed: Default::default(),
871        };
872        reader.next()?;
873        Ok(reader)
874    }
875
876    /// Returns false if current reader is exhausted.
877    pub(crate) fn is_valid(&self) -> bool {
878        self.current_range.is_some()
879    }
880
881    /// Returns current data batch of reader.
882    /// # Panics
883    /// If reader is exhausted.
884    pub(crate) fn current_data_batch(&self) -> DataBatch<'_> {
885        let range = self.current_range.unwrap();
886        DataBatch {
887            rb: self.current_batch.as_ref().unwrap(),
888            range,
889        }
890    }
891
892    pub(crate) fn next(&mut self) -> Result<()> {
893        let start = Instant::now();
894        if let Some((next_pk, range)) = self.search_next_pk_range() {
895            // first try to search next pk in current record batch.
896            self.current_range = Some(DataBatchRange {
897                pk_index: next_pk,
898                start: range.start,
899                end: range.end,
900            });
901        } else {
902            // current record batch reaches eof, fetch next record batch from parquet reader.
903            if let Some(rb) = self.inner.next() {
904                let rb = rb.context(error::ComputeArrowSnafu)?;
905                self.current_batch = Some(rb);
906                self.current_range = None;
907                return self.next();
908            } else {
909                // parquet is also exhausted
910                self.current_batch = None;
911                self.current_range = None;
912            }
913        }
914        self.elapsed += start.elapsed();
915        Ok(())
916    }
917
918    /// Searches for the next primary key along with its offset range inside the record batch.
919    fn search_next_pk_range(&self) -> Option<(PkIndex, Range<usize>)> {
920        self.current_batch.as_ref().and_then(|b| {
921            // safety: PK_INDEX_COLUMN_NAME must be present in record batches yielded by the data part.
922            let pk_array = pk_index_array(b);
923            let start = self
924                .current_range
925                .as_ref()
926                .map(|range| range.end)
927                .unwrap_or(0);
928            search_next_pk_range(pk_array, start)
929        })
930    }
931}
932
933/// Parquet-encoded `DataPart`.
934pub struct ParquetPart {
935    data: Bytes,
936}
937
938/// Data parts under a shard.
939pub struct DataParts {
940    /// The active writing buffer.
941    active: DataBuffer,
943    /// Immutable (encoded) parts.
943    frozen: Vec<DataPart>,
944}
945
946impl DataParts {
947    pub(crate) fn new(metadata: RegionMetadataRef, capacity: usize, dedup: bool) -> Self {
948        Self {
949            active: DataBuffer::with_capacity(metadata, capacity, dedup),
950            frozen: Vec::new(),
951        }
952    }
953
954    pub(crate) fn with_frozen(mut self, frozen: Vec<DataPart>) -> Self {
955        self.frozen = frozen;
956        self
957    }
958
959    /// Writes a row into parts.
960    pub fn write_row(&mut self, pk_index: PkIndex, kv: &KeyValue) {
961        self.active.write_row(pk_index, kv)
962    }
963
964    /// Returns the number of rows in the active buffer.
965    pub fn num_active_rows(&self) -> usize {
966        self.active.num_rows()
967    }
968
969    /// Freezes active buffer and creates a new active buffer.
970    pub fn freeze(&mut self) -> Result<()> {
971        let part = self.active.freeze(None, false)?;
972        self.frozen.push(part);
973        Ok(())
974    }
975
976    /// Reads data from all parts including active and frozen parts.
977    /// The returned iterator yields a record batch of one primary key at a time.
978    /// The order of yielding primary keys is determined by provided weights.
979    pub fn read(&self) -> Result<DataPartsReaderBuilder> {
980        let _timer = PARTITION_TREE_READ_STAGE_ELAPSED
981            .with_label_values(&["build_data_parts_reader"])
982            .start_timer();
983
984        let buffer = self.active.read()?;
985        let mut parts = Vec::with_capacity(self.frozen.len());
986        for p in &self.frozen {
987            parts.push(p.read()?);
988        }
989        Ok(DataPartsReaderBuilder { buffer, parts })
990    }
991
992    pub(crate) fn is_empty(&self) -> bool {
993        self.active.is_empty() && self.frozen.iter().all(|part| part.is_empty())
994    }
995
996    #[cfg(test)]
997    pub(crate) fn frozen_len(&self) -> usize {
998        self.frozen.len()
999    }
1000}
1001
1002pub struct DataPartsReaderBuilder {
1003    buffer: DataBufferReaderBuilder,
1004    parts: Vec<DataPartReader>,
1005}
1006
1007impl DataPartsReaderBuilder {
1008    pub(crate) fn build(self) -> Result<DataPartsReader> {
1009        let mut nodes = Vec::with_capacity(self.parts.len() + 1);
1010        nodes.push(DataNode::new(DataSource::Buffer(
1011            // `DataParts::read` ensures that all pk_index values inside `DataBuffer` are replaced by weights,
1012            // so we pass None to sort rows directly according to pk_index.
1013            self.buffer.build(None)?,
1014        )));
1015        for p in self.parts {
1016            nodes.push(DataNode::new(DataSource::Part(p)));
1017        }
1018        let num_parts = nodes.len();
1019        let merger = Merger::try_new(nodes)?;
1020        Ok(DataPartsReader {
1021            merger,
1022            num_parts,
1023            elapsed: Default::default(),
1024        })
1025    }
1026}
1027
1028/// Reader for all parts inside a `DataParts`.
1029pub struct DataPartsReader {
1030    merger: Merger<DataNode>,
1031    num_parts: usize,
1032    elapsed: Duration,
1033}
1034
1035impl Drop for DataPartsReader {
1036    fn drop(&mut self) {
1037        PARTITION_TREE_READ_STAGE_ELAPSED
1038            .with_label_values(&["read_data_parts"])
1039            .observe(self.elapsed.as_secs_f64())
1040    }
1041}
1042
1043impl DataPartsReader {
1044    pub(crate) fn current_data_batch(&self) -> DataBatch<'_> {
1045        let batch = self.merger.current_node().current_data_batch();
1046        batch.slice(0, self.merger.current_rows())
1047    }
1048
1049    pub(crate) fn next(&mut self) -> Result<()> {
1050        let start = Instant::now();
1051        let result = self.merger.next();
1052        self.elapsed += start.elapsed();
1053        result
1054    }
1055
1056    pub(crate) fn is_valid(&self) -> bool {
1057        self.merger.is_valid()
1058    }
1059
1060    pub(crate) fn num_parts(&self) -> usize {
1061        self.num_parts
1062    }
1063}
1064
1065#[cfg(test)]
1066mod tests {
1067    use datafusion::arrow::array::Float64Array;
1068    use datatypes::arrow::array::UInt16Array;
1069    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
1070    use parquet::data_type::AsBytes;
1071
1072    use super::*;
1073    use crate::test_util::memtable_util::{
1074        extract_data_batch, metadata_for_test, write_rows_to_buffer,
1075    };
1076
1077    #[test]
1078    fn test_lazy_mutable_vector_builder() {
1079        let mut builder = LazyMutableVectorBuilder::new(ConcreteDataType::boolean_datatype());
1080        match builder {
1081            LazyMutableVectorBuilder::Type(ref t) => {
1082                assert_eq!(&ConcreteDataType::boolean_datatype(), t);
1083            }
1084            LazyMutableVectorBuilder::Builder(_) => {
1085                unreachable!()
1086            }
1087        }
1088        builder.get_or_create_builder(1);
1089        match builder {
1090            LazyMutableVectorBuilder::Type(_) => {
1091                unreachable!()
1092            }
1093            LazyMutableVectorBuilder::Builder(_) => {}
1094        }
1095    }
1096
1097    fn check_data_buffer_dedup(dedup: bool) {
1098        let metadata = metadata_for_test();
1099        let mut buffer = DataBuffer::with_capacity(metadata.clone(), 10, dedup);
1100        write_rows_to_buffer(
1101            &mut buffer,
1102            &metadata,
1103            0,
1104            vec![2, 3],
1105            vec![Some(1.0), Some(2.0)],
1106            0,
1107        );
1108        write_rows_to_buffer(
1109            &mut buffer,
1110            &metadata,
1111            0,
1112            vec![1, 2],
1113            vec![Some(1.1), Some(2.1)],
1114            2,
1115        );
1116
1117        let mut reader = buffer.read().unwrap().build(Some(&[0])).unwrap();
1118        let mut res = vec![];
1119        while reader.is_valid() {
1120            let batch = reader.current_data_batch();
1121            res.push(extract_data_batch(&batch));
1122            reader.next().unwrap();
1123        }
1124        if dedup {
1125            assert_eq!(vec![(0, vec![(1, 2), (2, 3), (3, 1)])], res);
1126        } else {
1127            assert_eq!(vec![(0, vec![(1, 2), (2, 3), (2, 0), (3, 1)])], res);
1128        }
1129    }
1130
1131    #[test]
1132    fn test_data_buffer_dedup() {
1133        check_data_buffer_dedup(true);
1134        check_data_buffer_dedup(false);
1135    }
1136
1137    fn check_data_buffer_freeze(
1138        pk_weights: Option<&[u16]>,
1139        replace_pk_weights: bool,
1140        expected: &[(u16, Vec<(i64, u64)>)],
1141    ) {
1142        let meta = metadata_for_test();
1143        let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);
1144
1145        // write rows with null values.
1146        write_rows_to_buffer(
1147            &mut buffer,
1148            &meta,
1149            0,
1150            vec![0, 1, 2],
1151            vec![Some(1.0), None, Some(3.0)],
1152            0,
1153        );
1154        write_rows_to_buffer(&mut buffer, &meta, 1, vec![1], vec![Some(2.0)], 3);
1155
1156        let mut res = Vec::with_capacity(3);
1157        let mut reader = buffer
1158            .freeze(pk_weights, replace_pk_weights)
1159            .unwrap()
1160            .read()
1161            .unwrap();
1162        while reader.is_valid() {
1163            let batch = reader.current_data_batch();
1164            res.push(extract_data_batch(&batch));
1165            reader.next().unwrap();
1166        }
1167        assert_eq!(expected, res);
1168    }
1169
1170    #[test]
1171    fn test_data_buffer_freeze() {
1172        check_data_buffer_freeze(
1173            None,
1174            false,
1175            &[(0, vec![(0, 0), (1, 1), (2, 2)]), (1, vec![(1, 3)])],
1176        );
1177
1178        check_data_buffer_freeze(
1179            Some(&[1, 2]),
1180            false,
1181            &[(0, vec![(0, 0), (1, 1), (2, 2)]), (1, vec![(1, 3)])],
1182        );
1183
1184        check_data_buffer_freeze(
1185            Some(&[3, 2]),
1186            true,
1187            &[(2, vec![(1, 3)]), (3, vec![(0, 0), (1, 1), (2, 2)])],
1188        );
1189
1190        check_data_buffer_freeze(
1191            Some(&[3, 2]),
1192            false,
1193            &[(1, vec![(1, 3)]), (0, vec![(0, 0), (1, 1), (2, 2)])],
1194        );
1195    }
1196
1197    #[test]
1198    fn test_encode_data_buffer() {
1199        let meta = metadata_for_test();
1200        let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);
1201
1202        // write rows with null values.
1203        write_rows_to_buffer(
1204            &mut buffer,
1205            &meta,
1206            2,
1207            vec![0, 1, 2],
1208            vec![Some(1.0), None, Some(3.0)],
1209            2,
1210        );
1211
1212        assert_eq!(3, buffer.num_rows());
1213
1214        write_rows_to_buffer(&mut buffer, &meta, 2, vec![1], vec![Some(2.0)], 3);
1215
1216        assert_eq!(4, buffer.num_rows());
1217
1218        let encoder = DataPartEncoder::new(
1219            &meta,
1220            Some(&[0, 1, 2]),
1221            None,
1222            meta.time_index_column().column_schema.name.clone(),
1223            true,
1224            true,
1225        );
1226        let encoded = match encoder.write(&mut buffer).unwrap() {
1227            DataPart::Parquet(data) => data.data,
1228        };
1229
1230        let s = String::from_utf8_lossy(encoded.as_bytes());
1231        assert!(s.starts_with("PAR1"));
1232        assert!(s.ends_with("PAR1"));
1233
1234        let builder = ParquetRecordBatchReaderBuilder::try_new(encoded).unwrap();
1235        let mut reader = builder.build().unwrap();
1236        let batch = reader.next().unwrap().unwrap();
1237        assert_eq!(3, batch.num_rows());
1238    }
1239
1240    fn check_buffer_values_equal(reader: &mut DataBufferReader, expected_values: &[Vec<f64>]) {
1241        let mut output = Vec::with_capacity(expected_values.len());
1242        while reader.is_valid() {
1243            let batch = reader.current_data_batch().slice_record_batch();
1244            let values = batch
1245                .column_by_name("v1")
1246                .unwrap()
1247                .as_any()
1248                .downcast_ref::<Float64Array>()
1249                .unwrap()
1250                .iter()
1251                .map(|v| v.unwrap())
1252                .collect::<Vec<_>>();
1253            output.push(values);
1254            reader.next().unwrap();
1255        }
1256        assert_eq!(expected_values, output);
1257    }
1258
1259    #[test]
1260    fn test_search_next_pk_range() {
1261        let a = UInt16Array::from_iter_values([1, 1, 3, 3, 4, 6]);
1262        assert_eq!((1, 0..2), search_next_pk_range(&a, 0).unwrap());
1263        assert_eq!((3, 2..4), search_next_pk_range(&a, 2).unwrap());
1264        assert_eq!((4, 4..5), search_next_pk_range(&a, 4).unwrap());
1265        assert_eq!((6, 5..6), search_next_pk_range(&a, 5).unwrap());
1266
1267        assert_eq!(None, search_next_pk_range(&a, 6));
1268    }
1269
1270    fn check_iter_data_buffer(pk_weights: Option<&[u16]>, expected: &[Vec<f64>]) {
1271        let meta = metadata_for_test();
1272        let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);
1273
1274        write_rows_to_buffer(
1275            &mut buffer,
1276            &meta,
1277            3,
1278            vec![1, 2, 3],
1279            vec![Some(1.1), Some(2.1), Some(3.1)],
1280            3,
1281        );
1282
1283        write_rows_to_buffer(
1284            &mut buffer,
1285            &meta,
1286            2,
1287            vec![0, 1, 2],
1288            vec![Some(1.0), Some(2.0), Some(3.0)],
1289            2,
1290        );
1291
1292        let mut iter = buffer.read().unwrap().build(pk_weights).unwrap();
1293        check_buffer_values_equal(&mut iter, expected);
1294    }
1295
1296    #[test]
1297    fn test_iter_data_buffer() {
1298        check_iter_data_buffer(None, &[vec![1.0, 2.0, 3.0], vec![1.1, 2.1, 3.1]]);
1299        check_iter_data_buffer(
1300            Some(&[0, 1, 2, 3]),
1301            &[vec![1.0, 2.0, 3.0], vec![1.1, 2.1, 3.1]],
1302        );
1303        check_iter_data_buffer(
1304            Some(&[3, 2, 1, 0]),
1305            &[vec![1.1, 2.1, 3.1], vec![1.0, 2.0, 3.0]],
1306        );
1307    }
1308
1309    #[test]
1310    fn test_iter_empty_data_buffer() {
1311        let meta = metadata_for_test();
1312        let buffer = DataBuffer::with_capacity(meta.clone(), 10, true);
1313        let mut iter = buffer.read().unwrap().build(Some(&[0, 1, 3, 2])).unwrap();
1314        check_buffer_values_equal(&mut iter, &[]);
1315    }
1316
1317    fn check_part_values_equal(iter: &mut DataPartReader, expected_values: &[Vec<f64>]) {
1318        let mut output = Vec::with_capacity(expected_values.len());
1319        while iter.is_valid() {
1320            let batch = iter.current_data_batch().slice_record_batch();
1321            let values = batch
1322                .column_by_name("v1")
1323                .unwrap()
1324                .as_any()
1325                .downcast_ref::<Float64Array>()
1326                .unwrap()
1327                .iter()
1328                .map(|v| v.unwrap())
1329                .collect::<Vec<_>>();
1330            output.push(values);
1331            iter.next().unwrap();
1332        }
1333        assert_eq!(expected_values, output);
1334    }
1335
1336    fn check_iter_data_part(weights: &[u16], expected_values: &[Vec<f64>]) {
1337        let meta = metadata_for_test();
1338        let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);
1339
1340        write_rows_to_buffer(
1341            &mut buffer,
1342            &meta,
1343            2,
1344            vec![0, 1, 2],
1345            vec![Some(1.0), Some(2.0), Some(3.0)],
1346            2,
1347        );
1348
1349        write_rows_to_buffer(
1350            &mut buffer,
1351            &meta,
1352            3,
1353            vec![1, 2, 3],
1354            vec![Some(1.1), Some(2.1), Some(3.1)],
1355            3,
1356        );
1357
1358        write_rows_to_buffer(
1359            &mut buffer,
1360            &meta,
1361            2,
1362            vec![2, 3],
1363            vec![Some(2.2), Some(2.3)],
1364            4,
1365        );
1366
1367        let encoder = DataPartEncoder::new(
1368            &meta,
1369            Some(weights),
1370            Some(4),
1371            meta.time_index_column().column_schema.name.clone(),
1372            true,
1373            true,
1374        );
1375        let encoded = encoder.write(&mut buffer).unwrap();
1376
1377        let mut iter = encoded.read().unwrap();
1378        check_part_values_equal(&mut iter, expected_values);
1379    }
1380
1381    #[test]
1382    fn test_iter_data_part() {
1383        check_iter_data_part(
1384            &[0, 1, 2, 3],
1385            &[vec![1.0, 2.0, 3.0, 2.3], vec![1.1, 2.1, 3.1]],
1386        );
1387
1388        check_iter_data_part(
1389            &[3, 2, 1, 0],
1390            &[vec![1.1, 2.1, 3.1], vec![1.0, 2.0, 3.0, 2.3]],
1391        );
1392    }
1393}