// mito2/memtable/partition_tree/data.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Data part of a shard.
16
17use std::cmp::{Ordering, Reverse};
18use std::fmt::{Debug, Formatter};
19use std::ops::Range;
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22
23use bytes::Bytes;
24use datatypes::arrow;
25use datatypes::arrow::array::{ArrayRef, RecordBatch, UInt16Array, UInt32Array, UInt64Array};
26use datatypes::arrow::datatypes::{Field, Schema, SchemaRef};
27use datatypes::data_type::DataType;
28use datatypes::prelude::{ConcreteDataType, MutableVector, ScalarVectorBuilder, Vector, VectorRef};
29use datatypes::schema::ColumnSchema;
30use datatypes::types::TimestampType;
31use datatypes::vectors::{
32    TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
33    TimestampSecondVector, UInt8Vector, UInt8VectorBuilder, UInt16Vector, UInt16VectorBuilder,
34    UInt64Vector, UInt64VectorBuilder,
35};
36use mito_codec::key_values::KeyValue;
37use parquet::arrow::ArrowWriter;
38use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
39use parquet::basic::{Compression, Encoding, ZstdLevel};
40use parquet::file::properties::{EnabledStatistics, WriterProperties};
41use parquet::schema::types::ColumnPath;
42use snafu::ResultExt;
43use store_api::metadata::RegionMetadataRef;
44use store_api::storage::consts::{OP_TYPE_COLUMN_NAME, SEQUENCE_COLUMN_NAME};
45
46use crate::error;
47use crate::error::Result;
48use crate::memtable::partition_tree::PkIndex;
49use crate::memtable::partition_tree::merger::{DataBatchKey, DataNode, DataSource, Merger};
50use crate::metrics::{
51    PARTITION_TREE_DATA_BUFFER_FREEZE_STAGE_ELAPSED, PARTITION_TREE_READ_STAGE_ELAPSED,
52};
53use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
54
/// Name of the column storing the primary key index in the encoded data part schema.
const PK_INDEX_COLUMN_NAME: &str = "__pk_index";

/// Initial capacity for the data buffer.
pub(crate) const DATA_INIT_CAP: usize = 8;
59
/// Range of a data batch.
///
/// Identifies the contiguous rows belonging to one primary key inside a record batch.
#[derive(Debug, Clone, Copy)]
pub(crate) struct DataBatchRange {
    /// Primary key index of this batch.
    pub(crate) pk_index: PkIndex,
    /// Start of current primary key inside record batch (inclusive).
    pub(crate) start: usize,
    /// End of current primary key inside record batch (exclusive).
    pub(crate) end: usize,
}
70
71impl DataBatchRange {
72    pub(crate) fn len(&self) -> usize {
73        self.end - self.start
74    }
75}
76
/// Data part batches returned by `DataParts::read`.
#[derive(Debug, Clone, Copy)]
pub struct DataBatch<'a> {
    /// Record batch of data.
    rb: &'a RecordBatch,
    /// Range of current primary key inside record batch.
    range: DataBatchRange,
}
85
86impl<'a> DataBatch<'a> {
87    pub(crate) fn pk_index(&self) -> PkIndex {
88        self.range.pk_index
89    }
90
91    pub(crate) fn range(&self) -> DataBatchRange {
92        self.range
93    }
94
95    pub(crate) fn slice_record_batch(&self) -> RecordBatch {
96        self.rb.slice(self.range.start, self.range.len())
97    }
98
99    pub(crate) fn first_row(&self) -> (i64, u64) {
100        let ts_values = timestamp_array_to_i64_slice(self.rb.column(1));
101        let sequence_values = self
102            .rb
103            .column(2)
104            .as_any()
105            .downcast_ref::<UInt64Array>()
106            .unwrap()
107            .values();
108        (
109            ts_values[self.range.start],
110            sequence_values[self.range.start],
111        )
112    }
113
114    pub(crate) fn last_row(&self) -> (i64, u64) {
115        let ts_values = timestamp_array_to_i64_slice(self.rb.column(1));
116        let sequence_values = self
117            .rb
118            .column(2)
119            .as_any()
120            .downcast_ref::<UInt64Array>()
121            .unwrap()
122            .values();
123        (
124            ts_values[self.range.end - 1],
125            sequence_values[self.range.end - 1],
126        )
127    }
128
129    pub(crate) fn first_key(&self) -> DataBatchKey {
130        let pk_index = self.pk_index();
131        let ts_array = self.rb.column(1);
132
133        // maybe safe the result somewhere.
134        let ts_values = timestamp_array_to_i64_slice(ts_array);
135        let timestamp = ts_values[self.range.start];
136        DataBatchKey {
137            pk_index,
138            timestamp,
139        }
140    }
141
142    pub(crate) fn search_key(&self, key: &DataBatchKey) -> Result<usize, usize> {
143        let DataBatchKey {
144            pk_index,
145            timestamp,
146        } = key;
147        assert_eq!(*pk_index, self.range.pk_index);
148        let ts_values = timestamp_array_to_i64_slice(self.rb.column(1));
149        let ts_values = &ts_values[self.range.start..self.range.end];
150        ts_values.binary_search(timestamp)
151    }
152
153    pub(crate) fn slice(self, offset: usize, length: usize) -> DataBatch<'a> {
154        let start = self.range.start + offset;
155        let end = start + length;
156        DataBatch {
157            rb: self.rb,
158            range: DataBatchRange {
159                pk_index: self.range.pk_index,
160                start,
161                end,
162            },
163        }
164    }
165
166    pub(crate) fn num_rows(&self) -> usize {
167        self.range.len()
168    }
169}
170
/// Buffer for the value part (pk_index, ts, sequence, op_type, field columns) in a shard.
pub struct DataBuffer {
    metadata: RegionMetadataRef,
    /// Schema for data part (primary keys are replaced with pk_index)
    data_part_schema: SchemaRef,
    /// Builder for primary key index.
    pk_index_builder: UInt16VectorBuilder,
    /// Builder for timestamp column.
    ts_builder: Box<dyn MutableVector>,
    /// Builder for sequence column.
    sequence_builder: UInt64VectorBuilder,
    /// Builder for op_type column.
    op_type_builder: UInt8VectorBuilder,
    /// Builders for field columns.
    field_builders: Vec<LazyMutableVectorBuilder>,

    /// Whether duplicated rows should be removed when the buffer is read or frozen.
    dedup: bool,
}
189
190impl DataBuffer {
191    /// Creates a `DataBuffer` instance with given schema and capacity.
192    pub fn with_capacity(metadata: RegionMetadataRef, init_capacity: usize, dedup: bool) -> Self {
193        let ts_builder = metadata
194            .time_index_column()
195            .column_schema
196            .data_type
197            .create_mutable_vector(init_capacity);
198
199        let pk_id_builder = UInt16VectorBuilder::with_capacity(init_capacity);
200        let sequence_builder = UInt64VectorBuilder::with_capacity(init_capacity);
201        let op_type_builder = UInt8VectorBuilder::with_capacity(init_capacity);
202
203        let field_builders = metadata
204            .field_columns()
205            .map(|c| LazyMutableVectorBuilder::new(c.column_schema.data_type.clone()))
206            .collect::<Vec<_>>();
207
208        let data_part_schema = memtable_schema_to_encoded_schema(&metadata);
209        Self {
210            metadata,
211            data_part_schema,
212            pk_index_builder: pk_id_builder,
213            ts_builder,
214            sequence_builder,
215            op_type_builder,
216            field_builders,
217            dedup,
218        }
219    }
220
221    /// Writes a row to data buffer.
222    pub fn write_row(&mut self, pk_index: PkIndex, kv: &KeyValue) {
223        self.ts_builder.push_value_ref(&kv.timestamp());
224        self.pk_index_builder.push(Some(pk_index));
225        self.sequence_builder.push(Some(kv.sequence()));
226        self.op_type_builder.push(Some(kv.op_type() as u8));
227
228        debug_assert_eq!(self.field_builders.len(), kv.num_fields());
229
230        for (idx, field) in kv.fields().enumerate() {
231            self.field_builders[idx]
232                .get_or_create_builder(self.ts_builder.len())
233                .push_value_ref(&field);
234        }
235    }
236
237    /// Freezes `DataBuffer` to bytes.
238    /// If `pk_weights` is present, it will be used to sort rows.
239    ///
240    /// `freeze` clears the buffers of builders.
241    pub fn freeze(
242        &mut self,
243        pk_weights: Option<&[u16]>,
244        replace_pk_index: bool,
245    ) -> Result<DataPart> {
246        let timestamp_col_name = self.metadata.time_index_column().column_schema.name.clone();
247        let encoder = DataPartEncoder::new(
248            &self.metadata,
249            pk_weights,
250            None,
251            timestamp_col_name,
252            replace_pk_index,
253            self.dedup,
254        );
255        let parts = encoder.write(self)?;
256        Ok(parts)
257    }
258
259    /// Builds a lazily initialized data buffer reader from [DataBuffer]
260    pub fn read(&self) -> Result<DataBufferReaderBuilder> {
261        let _timer = PARTITION_TREE_READ_STAGE_ELAPSED
262            .with_label_values(&["read_data_buffer"])
263            .start_timer();
264
265        let (pk_index, timestamp, sequence, op_type) = (
266            self.pk_index_builder.finish_cloned(),
267            self.ts_builder.to_vector_cloned(),
268            self.sequence_builder.finish_cloned(),
269            self.op_type_builder.finish_cloned(),
270        );
271
272        let mut fields = Vec::with_capacity(self.field_builders.len());
273        for b in self.field_builders.iter() {
274            let field = match b {
275                LazyMutableVectorBuilder::Type(ty) => LazyFieldVector::Type(ty.clone()),
276                LazyMutableVectorBuilder::Builder(builder) => {
277                    LazyFieldVector::Vector(builder.to_vector_cloned())
278                }
279            };
280            fields.push(field);
281        }
282
283        Ok(DataBufferReaderBuilder {
284            schema: self.data_part_schema.clone(),
285            pk_index,
286            timestamp,
287            sequence,
288            op_type,
289            fields,
290            dedup: self.dedup,
291        })
292    }
293
294    /// Returns num of rows in data buffer.
295    pub fn num_rows(&self) -> usize {
296        self.ts_builder.len()
297    }
298
299    /// Returns whether the buffer is empty.
300    pub fn is_empty(&self) -> bool {
301        self.num_rows() == 0
302    }
303}
304
/// A column builder that defers allocation until the first value is written.
enum LazyMutableVectorBuilder {
    /// No value written yet; only the column type is recorded.
    Type(ConcreteDataType),
    /// Materialized builder holding written values.
    Builder(Box<dyn MutableVector>),
}
309
310impl LazyMutableVectorBuilder {
311    fn new(ty: ConcreteDataType) -> Self {
312        Self::Type(ty)
313    }
314
315    fn get_or_create_builder(&mut self, init_capacity: usize) -> &mut Box<dyn MutableVector> {
316        match self {
317            LazyMutableVectorBuilder::Type(ty) => {
318                let builder = ty.create_mutable_vector(init_capacity);
319                *self = LazyMutableVectorBuilder::Builder(builder);
320                self.get_or_create_builder(init_capacity)
321            }
322            LazyMutableVectorBuilder::Builder(builder) => builder,
323        }
324    }
325}
326
327/// Converts `DataBuffer` to record batches, with rows sorted according to pk_weights.
328/// `dedup`: whether to true to remove the duplicated rows inside `DataBuffer`.
329/// `replace_pk_index`: whether to replace the pk_index values with corresponding pk weight.
330fn drain_data_buffer_to_record_batches(
331    schema: SchemaRef,
332    buffer: &mut DataBuffer,
333    pk_weights: Option<&[u16]>,
334    dedup: bool,
335    replace_pk_index: bool,
336) -> Result<RecordBatch> {
337    let num_rows = buffer.ts_builder.len();
338
339    let (pk_index_v, ts_v, sequence_v, op_type_v) = (
340        buffer.pk_index_builder.finish(),
341        buffer.ts_builder.to_vector(),
342        buffer.sequence_builder.finish(),
343        buffer.op_type_builder.finish(),
344    );
345
346    let (indices_to_take, mut columns) = build_row_sort_indices_and_columns(
347        pk_weights,
348        pk_index_v,
349        ts_v,
350        sequence_v,
351        op_type_v,
352        replace_pk_index,
353        dedup,
354        buffer.field_builders.len() + 4,
355    )?;
356
357    for b in buffer.field_builders.iter_mut() {
358        let array = match b {
359            LazyMutableVectorBuilder::Type(ty) => {
360                let mut single_null = ty.create_mutable_vector(num_rows);
361                single_null.push_nulls(num_rows);
362                single_null.to_vector().to_arrow_array()
363            }
364            LazyMutableVectorBuilder::Builder(builder) => builder.to_vector().to_arrow_array(),
365        };
366        columns.push(
367            arrow::compute::take(&array, &indices_to_take, None)
368                .context(error::ComputeArrowSnafu)?,
369        );
370    }
371
372    RecordBatch::try_new(schema, columns).context(error::NewRecordBatchSnafu)
373}
374
375#[allow(clippy::too_many_arguments)]
376fn build_row_sort_indices_and_columns(
377    pk_weights: Option<&[u16]>,
378    pk_index: UInt16Vector,
379    ts: VectorRef,
380    sequence: UInt64Vector,
381    op_type: UInt8Vector,
382    replace_pk_index: bool,
383    dedup: bool,
384    column_num: usize,
385) -> Result<(UInt32Array, Vec<ArrayRef>)> {
386    let mut rows = build_rows_to_sort(pk_weights, &pk_index, &ts, &sequence);
387
388    let pk_array = if replace_pk_index {
389        // replace pk index values with pk weights.
390        Arc::new(UInt16Array::from_iter_values(
391            rows.iter().map(|(_, key)| key.pk_weight),
392        )) as Arc<_>
393    } else {
394        pk_index.to_arrow_array()
395    };
396
397    // sort and dedup
398    rows.sort_unstable_by(|l, r| l.1.cmp(&r.1));
399    if dedup {
400        rows.dedup_by(|l, r| l.1.pk_weight == r.1.pk_weight && l.1.timestamp == r.1.timestamp);
401    }
402
403    let indices_to_take = UInt32Array::from_iter_values(rows.iter().map(|(idx, _)| *idx as u32));
404
405    let mut columns = Vec::with_capacity(column_num);
406
407    columns.push(
408        arrow::compute::take(&pk_array, &indices_to_take, None)
409            .context(error::ComputeArrowSnafu)?,
410    );
411
412    columns.push(
413        arrow::compute::take(&ts.to_arrow_array(), &indices_to_take, None)
414            .context(error::ComputeArrowSnafu)?,
415    );
416
417    columns.push(
418        arrow::compute::take(&sequence.as_arrow(), &indices_to_take, None)
419            .context(error::ComputeArrowSnafu)?,
420    );
421
422    columns.push(
423        arrow::compute::take(&op_type.as_arrow(), &indices_to_take, None)
424            .context(error::ComputeArrowSnafu)?,
425    );
426
427    Ok((indices_to_take, columns))
428}
429
430pub(crate) fn timestamp_array_to_i64_slice(arr: &ArrayRef) -> &[i64] {
431    use datatypes::arrow::array::{
432        TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
433        TimestampSecondArray,
434    };
435    use datatypes::arrow::datatypes::{DataType, TimeUnit};
436
437    match arr.data_type() {
438        DataType::Timestamp(t, _) => match t {
439            TimeUnit::Second => arr
440                .as_any()
441                .downcast_ref::<TimestampSecondArray>()
442                .unwrap()
443                .values(),
444            TimeUnit::Millisecond => arr
445                .as_any()
446                .downcast_ref::<TimestampMillisecondArray>()
447                .unwrap()
448                .values(),
449            TimeUnit::Microsecond => arr
450                .as_any()
451                .downcast_ref::<TimestampMicrosecondArray>()
452                .unwrap()
453                .values(),
454            TimeUnit::Nanosecond => arr
455                .as_any()
456                .downcast_ref::<TimestampNanosecondArray>()
457                .unwrap()
458                .values(),
459        },
460        _ => unreachable!(),
461    }
462}
463
/// Snapshot of a field column taken from a `DataBuffer`.
enum LazyFieldVector {
    /// Column was never written; only its type is known (materializes as all-null).
    Type(ConcreteDataType),
    /// Cloned vector of written values.
    Vector(VectorRef),
}
468
/// Builder holding a snapshot of a [DataBuffer]'s columns so the sorted record
/// batch can be materialized later, once pk weights are available.
pub(crate) struct DataBufferReaderBuilder {
    schema: SchemaRef,
    /// Snapshot of the pk index column.
    pk_index: UInt16Vector,
    /// Snapshot of the timestamp column.
    timestamp: VectorRef,
    /// Snapshot of the sequence column.
    sequence: UInt64Vector,
    /// Snapshot of the op_type column.
    op_type: UInt8Vector,
    /// Snapshots of field columns.
    fields: Vec<LazyFieldVector>,
    /// Whether to dedup rows while building the record batch.
    dedup: bool,
}
478
479impl DataBufferReaderBuilder {
480    fn build_record_batch(self, pk_weights: Option<&[u16]>) -> Result<RecordBatch> {
481        let num_rows = self.timestamp.len();
482        let (indices_to_take, mut columns) = build_row_sort_indices_and_columns(
483            pk_weights,
484            self.pk_index,
485            self.timestamp,
486            self.sequence,
487            self.op_type,
488            // replace_pk_index is always set to false since:
489            // - for DataBuffer in ShardBuilder, pk dict is not frozen
490            // - for DataBuffer in Shard, values in pk_index column has already been replaced during `freeze`.
491            false,
492            self.dedup,
493            self.fields.len() + 4,
494        )?;
495
496        for b in self.fields.iter() {
497            let array = match b {
498                LazyFieldVector::Type(ty) => {
499                    let mut single_null = ty.create_mutable_vector(num_rows);
500                    single_null.push_nulls(num_rows);
501                    single_null.to_vector().to_arrow_array()
502                }
503                LazyFieldVector::Vector(vector) => vector.to_arrow_array(),
504            };
505            columns.push(
506                arrow::compute::take(&array, &indices_to_take, None)
507                    .context(error::ComputeArrowSnafu)?,
508            );
509        }
510        RecordBatch::try_new(self.schema, columns).context(error::NewRecordBatchSnafu)
511    }
512
513    pub fn build(self, pk_weights: Option<&[u16]>) -> Result<DataBufferReader> {
514        self.build_record_batch(pk_weights)
515            .and_then(DataBufferReader::new)
516    }
517}
518
/// Reader over the sorted record batch built from a `DataBuffer`, yielding one
/// [DataBatch] per contiguous primary key run.
#[derive(Debug)]
pub(crate) struct DataBufferReader {
    batch: RecordBatch,
    /// Offset of the next unread row inside `batch`.
    offset: usize,
    /// Range of the current [DataBatch]; `None` once exhausted.
    current_range: Option<DataBatchRange>,
    /// Accumulated read time, reported to metrics on drop.
    elapsed_time: Duration,
}
526
527impl Drop for DataBufferReader {
528    fn drop(&mut self) {
529        PARTITION_TREE_READ_STAGE_ELAPSED
530            .with_label_values(&["read_data_buffer"])
531            .observe(self.elapsed_time.as_secs_f64())
532    }
533}
534
535impl DataBufferReader {
536    pub(crate) fn new(batch: RecordBatch) -> Result<Self> {
537        let mut reader = Self {
538            batch,
539            offset: 0,
540            current_range: None,
541            elapsed_time: Duration::default(),
542        };
543        reader.next()?; // fill data batch for comparison and merge.
544        Ok(reader)
545    }
546
547    pub(crate) fn is_valid(&self) -> bool {
548        self.current_range.is_some()
549    }
550
551    /// Returns current data batch.
552    /// # Panics
553    /// If Current reader is exhausted.
554    pub(crate) fn current_data_batch(&self) -> DataBatch<'_> {
555        let range = self.current_range.unwrap();
556        DataBatch {
557            rb: &self.batch,
558            range,
559        }
560    }
561
562    /// Advances reader to next data batch.
563    pub(crate) fn next(&mut self) -> Result<()> {
564        if self.offset >= self.batch.num_rows() {
565            self.current_range = None;
566            return Ok(());
567        }
568        let start = Instant::now();
569        let pk_index_array = pk_index_array(&self.batch);
570        if let Some((next_pk, range)) = search_next_pk_range(pk_index_array, self.offset) {
571            self.offset = range.end;
572            self.current_range = Some(DataBatchRange {
573                pk_index: next_pk,
574                start: range.start,
575                end: range.end,
576            });
577        } else {
578            self.current_range = None;
579        }
580        self.elapsed_time += start.elapsed();
581        Ok(())
582    }
583}
584
585/// Gets `pk_index` array from record batch.
586/// # Panics
587/// If pk index column is not the first column or the type is not `UInt16Array`.
588fn pk_index_array(batch: &RecordBatch) -> &UInt16Array {
589    batch
590        .column(0)
591        .as_any()
592        .downcast_ref::<UInt16Array>()
593        .unwrap()
594}
595
596/// Searches for next pk index, and it's offset range in a sorted `UInt16Array`.
597fn search_next_pk_range(array: &UInt16Array, start: usize) -> Option<(PkIndex, Range<usize>)> {
598    let num_rows = array.len();
599    if start >= num_rows {
600        return None;
601    }
602
603    let values = array.values();
604    let next_pk = values[start];
605
606    for idx in start..num_rows {
607        if values[idx] != next_pk {
608            return Some((next_pk, start..idx));
609        }
610    }
611    Some((next_pk, start..num_rows))
612}
613
/// Sort key of a row: orders by pk weight, then timestamp; for equal
/// timestamps the row with the larger sequence compares smaller, so it comes
/// first after sorting.
#[derive(Eq, PartialEq)]
struct InnerKey {
    pk_weight: u16,
    timestamp: i64,
    sequence: u64,
}

impl PartialOrd for InnerKey {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for InnerKey {
    fn cmp(&self, other: &Self) -> Ordering {
        self.pk_weight
            .cmp(&other.pk_weight)
            .then_with(|| self.timestamp.cmp(&other.timestamp))
            // Descending by sequence: equivalent to comparing `Reverse(sequence)`.
            .then_with(|| other.sequence.cmp(&self.sequence))
    }
}
636
/// Builds `(original row index, sort key)` pairs for every row in the buffer.
///
/// If `pk_weights` is `Some`, the key's `pk_weight` is looked up through it;
/// otherwise the pk_index values are used directly (they are assumed to have
/// been replaced with weights already — see the caller's comments).
/// # Panics
/// If `ts` is not a timestamp vector, or a pk_index is out of bounds of
/// `pk_weights`.
fn build_rows_to_sort(
    pk_weights: Option<&[u16]>,
    pk_index: &UInt16Vector,
    ts: &VectorRef,
    sequence: &UInt64Vector,
) -> Vec<(usize, InnerKey)> {
    // Downcast the timestamp vector to its concrete unit to borrow raw i64 values.
    let ts_values = match ts.data_type() {
        ConcreteDataType::Timestamp(t) => match t {
            TimestampType::Second(_) => ts
                .as_any()
                .downcast_ref::<TimestampSecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            TimestampType::Millisecond(_) => ts
                .as_any()
                .downcast_ref::<TimestampMillisecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            TimestampType::Microsecond(_) => ts
                .as_any()
                .downcast_ref::<TimestampMicrosecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            TimestampType::Nanosecond(_) => ts
                .as_any()
                .downcast_ref::<TimestampNanosecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
        },
        other => unreachable!("Unexpected type {:?}", other),
    };
    let pk_index_values = pk_index.as_arrow().values();
    let sequence_values = sequence.as_arrow().values();
    // All columns must have one value per row.
    debug_assert_eq!(ts_values.len(), pk_index_values.len());
    debug_assert_eq!(ts_values.len(), sequence_values.len());

    ts_values
        .iter()
        .zip(pk_index_values.iter())
        .zip(sequence_values.iter())
        .enumerate()
        .map(|(idx, ((timestamp, pk_index), sequence))| {
            let pk_weight = if let Some(weights) = pk_weights {
                weights[*pk_index as usize] // if pk_weights is present, sort according to weight.
            } else {
                *pk_index // otherwise pk_index has already been replaced by weights.
            };
            (
                idx,
                InnerKey {
                    timestamp: *timestamp,
                    pk_weight,
                    sequence: *sequence,
                },
            )
        })
        .collect()
}
699
700fn memtable_schema_to_encoded_schema(schema: &RegionMetadataRef) -> SchemaRef {
701    use datatypes::arrow::datatypes::DataType;
702    let ColumnSchema {
703        name: ts_name,
704        data_type: ts_type,
705        ..
706    } = &schema.time_index_column().column_schema;
707
708    let mut fields = vec![
709        Field::new(PK_INDEX_COLUMN_NAME, DataType::UInt16, false),
710        Field::new(ts_name, ts_type.as_arrow_type(), false),
711        Field::new(SEQUENCE_COLUMN_NAME, DataType::UInt64, false),
712        Field::new(OP_TYPE_COLUMN_NAME, DataType::UInt8, false),
713    ];
714
715    fields.extend(schema.field_columns().map(|c| {
716        Field::new(
717            &c.column_schema.name,
718            c.column_schema.data_type.as_arrow_type(),
719            c.column_schema.is_nullable(),
720        )
721    }));
722
723    Arc::new(Schema::new(fields))
724}
725
/// Encoder that freezes a `DataBuffer` into a parquet-encoded [DataPart].
struct DataPartEncoder<'a> {
    /// Encoded data part schema derived from region metadata.
    schema: SchemaRef,
    /// Optional weights used to sort rows by primary key.
    pk_weights: Option<&'a [u16]>,
    /// Max parquet row group size; parquet default when `None`.
    row_group_size: Option<usize>,
    /// Name of the time index column (needed to set per-column properties).
    timestamp_column_name: String,
    /// Whether to replace pk_index values with pk weights while encoding.
    replace_pk_index: bool,
    /// Whether to drop duplicated (pk, timestamp) rows while encoding.
    dedup: bool,
}
734
impl<'a> DataPartEncoder<'a> {
    /// Creates an encoder, deriving the encoded schema from `metadata`.
    pub fn new(
        metadata: &RegionMetadataRef,
        pk_weights: Option<&'a [u16]>,
        row_group_size: Option<usize>,
        timestamp_column_name: String,
        replace_pk_index: bool,
        dedup: bool,
    ) -> DataPartEncoder<'a> {
        let schema = memtable_schema_to_encoded_schema(metadata);
        Self {
            schema,
            pk_weights,
            row_group_size,
            timestamp_column_name,
            replace_pk_index,
            dedup,
        }
    }

    /// Builds parquet writer properties for the encoded part: ZSTD compression,
    /// statistics disabled, and delta-binary-packed encoding for the four
    /// fixed columns (with dictionaries only where values repeat heavily).
    // todo(hl): more customized config according to region options.
    fn writer_props(self) -> WriterProperties {
        let mut builder = WriterProperties::builder();
        if let Some(row_group_size) = self.row_group_size {
            builder = builder.set_max_row_group_size(row_group_size)
        }

        let ts_col = ColumnPath::new(vec![self.timestamp_column_name]);
        let pk_index_col = ColumnPath::new(vec![PK_INDEX_COLUMN_NAME.to_string()]);
        let sequence_col = ColumnPath::new(vec![SEQUENCE_COLUMN_NAME.to_string()]);
        let op_type_col = ColumnPath::new(vec![OP_TYPE_COLUMN_NAME.to_string()]);

        builder = builder
            .set_compression(Compression::ZSTD(ZstdLevel::default()))
            .set_statistics_enabled(EnabledStatistics::None);
        builder = builder
            .set_column_encoding(ts_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(ts_col, false)
            .set_column_encoding(pk_index_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(pk_index_col, true)
            .set_column_encoding(sequence_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(sequence_col, false)
            .set_column_encoding(op_type_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(op_type_col, true)
            .set_column_index_truncate_length(None)
            .set_statistics_truncate_length(None);
        builder.build()
    }

    /// Drains `source` into a sorted record batch and encodes it as a single
    /// in-memory parquet file, returning the resulting [DataPart].
    pub fn write(self, source: &mut DataBuffer) -> Result<DataPart> {
        let mut bytes = Vec::with_capacity(1024);

        let rb = {
            let _timer = PARTITION_TREE_DATA_BUFFER_FREEZE_STAGE_ELAPSED
                .with_label_values(&["drain_data_buffer_to_batch"])
                .start_timer();
            drain_data_buffer_to_record_batches(
                self.schema.clone(),
                source,
                self.pk_weights,
                self.dedup,
                self.replace_pk_index,
            )?
        };

        {
            let _timer = PARTITION_TREE_DATA_BUFFER_FREEZE_STAGE_ELAPSED
                .with_label_values(&["encode"])
                .start_timer();
            let mut writer =
                ArrowWriter::try_new(&mut bytes, self.schema.clone(), Some(self.writer_props()))
                    .context(error::EncodeMemtableSnafu)?;
            writer.write(&rb).context(error::EncodeMemtableSnafu)?;
            let _metadata = writer.close().context(error::EncodeMemtableSnafu)?;
        }
        Ok(DataPart::Parquet(ParquetPart {
            data: Bytes::from(bytes),
        }))
    }
}
815
/// Format of immutable data part.
pub enum DataPart {
    /// Part encoded as an in-memory parquet file.
    Parquet(ParquetPart),
}
820
821impl DataPart {
822    /// Reads frozen data part and yields [DataBatch]es.
823    pub fn read(&self) -> Result<DataPartReader> {
824        match self {
825            // Keep encoded memtable scans aligned with mito/DataFusion batch sizing instead of
826            // parquet-rs's implicit 1024-row default.
827            DataPart::Parquet(data_bytes) => {
828                DataPartReader::new(data_bytes.data.clone(), Some(DEFAULT_READ_BATCH_SIZE))
829            }
830        }
831    }
832
833    fn is_empty(&self) -> bool {
834        match self {
835            DataPart::Parquet(p) => p.data.is_empty(),
836        }
837    }
838}
839
/// Reader over a parquet-encoded [DataPart], yielding one [DataBatch] per
/// contiguous primary key run.
pub struct DataPartReader {
    /// Underlying parquet record batch reader.
    inner: ParquetRecordBatchReader,
    /// Record batch currently being iterated; `None` once exhausted.
    current_batch: Option<RecordBatch>,
    /// Range of the current [DataBatch]; `None` once exhausted.
    current_range: Option<DataBatchRange>,
    /// Accumulated read time, reported to metrics on drop.
    elapsed: Duration,
}
846
847impl Drop for DataPartReader {
848    fn drop(&mut self) {
849        PARTITION_TREE_READ_STAGE_ELAPSED
850            .with_label_values(&["read_data_part"])
851            .observe(self.elapsed.as_secs_f64());
852    }
853}
854
855impl Debug for DataPartReader {
856    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
857        f.debug_struct("DataPartReader")
858            .field("current_range", &self.current_range)
859            .finish()
860    }
861}
862
impl DataPartReader {
    /// Creates a reader over the parquet bytes in `data` and advances it to the
    /// first data batch. `batch_size` overrides parquet-rs's default when set.
    pub fn new(data: Bytes, batch_size: Option<usize>) -> Result<Self> {
        let mut builder =
            ParquetRecordBatchReaderBuilder::try_new(data).context(error::ReadDataPartSnafu)?;
        if let Some(batch_size) = batch_size {
            builder = builder.with_batch_size(batch_size);
        }
        let parquet_reader = builder.build().context(error::ReadDataPartSnafu)?;
        let mut reader = Self {
            inner: parquet_reader,
            current_batch: None,
            current_range: None,
            elapsed: Default::default(),
        };
        // Position the reader on the first data batch.
        reader.next()?;
        Ok(reader)
    }

    /// Returns false if current reader is exhausted.
    pub(crate) fn is_valid(&self) -> bool {
        self.current_range.is_some()
    }

    /// Returns current data batch of reader.
    /// # Panics
    /// If reader is exhausted.
    pub(crate) fn current_data_batch(&self) -> DataBatch<'_> {
        let range = self.current_range.unwrap();
        DataBatch {
            rb: self.current_batch.as_ref().unwrap(),
            range,
        }
    }

    /// Advances the reader to the next data batch, pulling a new record batch
    /// from the parquet reader when the current one is exhausted.
    pub(crate) fn next(&mut self) -> Result<()> {
        let start = Instant::now();
        if let Some((next_pk, range)) = self.search_next_pk_range() {
            // first try to search next pk in current record batch.
            self.current_range = Some(DataBatchRange {
                pk_index: next_pk,
                start: range.start,
                end: range.end,
            });
        } else {
            // current record batch reaches eof, fetch next record batch from parquet reader.
            if let Some(rb) = self.inner.next() {
                let rb = rb.context(error::ComputeArrowSnafu)?;
                self.current_batch = Some(rb);
                // Reset the range so the recursive call scans the new batch from offset 0.
                self.current_range = None;
                return self.next();
            } else {
                // parquet is also exhausted
                self.current_batch = None;
                self.current_range = None;
            }
        }
        self.elapsed += start.elapsed();
        Ok(())
    }

    /// Searches next primary key along with it's offset range inside record batch.
    fn search_next_pk_range(&self) -> Option<(PkIndex, Range<usize>)> {
        self.current_batch.as_ref().and_then(|b| {
            // safety: PK_INDEX_COLUMN_NAME must present in record batch yielded by data part.
            let pk_array = pk_index_array(b);
            // Resume scanning right after the previous range; 0 for a fresh batch.
            let start = self
                .current_range
                .as_ref()
                .map(|range| range.end)
                .unwrap_or(0);
            search_next_pk_range(pk_array, start)
        })
    }
}
937
/// Parquet-encoded `DataPart`.
pub struct ParquetPart {
    /// In-memory bytes of the encoded parquet file.
    data: Bytes,
}
942
/// Data parts under a shard.
pub struct DataParts {
    /// The active writing buffer.
    active: DataBuffer,
    /// Immutable (encoded) parts produced by freezing earlier active buffers.
    frozen: Vec<DataPart>,
}
950
951impl DataParts {
952    pub(crate) fn new(metadata: RegionMetadataRef, capacity: usize, dedup: bool) -> Self {
953        Self {
954            active: DataBuffer::with_capacity(metadata, capacity, dedup),
955            frozen: Vec::new(),
956        }
957    }
958
959    pub(crate) fn with_frozen(mut self, frozen: Vec<DataPart>) -> Self {
960        self.frozen = frozen;
961        self
962    }
963
964    /// Writes a row into parts.
965    pub fn write_row(&mut self, pk_index: PkIndex, kv: &KeyValue) {
966        self.active.write_row(pk_index, kv)
967    }
968
969    /// Returns the number of rows in the active buffer.
970    pub fn num_active_rows(&self) -> usize {
971        self.active.num_rows()
972    }
973
974    /// Freezes active buffer and creates a new active buffer.
975    pub fn freeze(&mut self) -> Result<()> {
976        let part = self.active.freeze(None, false)?;
977        self.frozen.push(part);
978        Ok(())
979    }
980
981    /// Reads data from all parts including active and frozen parts.
982    /// The returned iterator yields a record batch of one primary key at a time.
983    /// The order of yielding primary keys is determined by provided weights.
984    pub fn read(&self) -> Result<DataPartsReaderBuilder> {
985        let _timer = PARTITION_TREE_READ_STAGE_ELAPSED
986            .with_label_values(&["build_data_parts_reader"])
987            .start_timer();
988
989        let buffer = self.active.read()?;
990        let mut parts = Vec::with_capacity(self.frozen.len());
991        for p in &self.frozen {
992            parts.push(p.read()?);
993        }
994        Ok(DataPartsReaderBuilder { buffer, parts })
995    }
996
997    pub(crate) fn is_empty(&self) -> bool {
998        self.active.is_empty() && self.frozen.iter().all(|part| part.is_empty())
999    }
1000
1001    #[cfg(test)]
1002    pub(crate) fn frozen_len(&self) -> usize {
1003        self.frozen.len()
1004    }
1005}
1006
/// Builder that assembles a `DataPartsReader` from readers of the active
/// buffer and all frozen parts (created by `DataParts::read`).
pub struct DataPartsReaderBuilder {
    /// Builder for the active buffer's reader.
    buffer: DataBufferReaderBuilder,
    /// Readers over the frozen (encoded) parts.
    parts: Vec<DataPartReader>,
}
1011
1012impl DataPartsReaderBuilder {
1013    pub(crate) fn build(self) -> Result<DataPartsReader> {
1014        let mut nodes = Vec::with_capacity(self.parts.len() + 1);
1015        nodes.push(DataNode::new(DataSource::Buffer(
1016            // `DataPars::read` ensures that all pk_index inside `DataBuffer` are replaced by weights.
1017            // then we pass None to sort rows directly according to pk_index.
1018            self.buffer.build(None)?,
1019        )));
1020        for p in self.parts {
1021            nodes.push(DataNode::new(DataSource::Part(p)));
1022        }
1023        let num_parts = nodes.len();
1024        let merger = Merger::try_new(nodes)?;
1025        Ok(DataPartsReader {
1026            merger,
1027            num_parts,
1028            elapsed: Default::default(),
1029        })
1030    }
1031}
1032
/// Reader for all parts inside a `DataParts`.
pub struct DataPartsReader {
    /// Merger over the buffer reader and all part readers.
    merger: Merger<DataNode>,
    /// Number of merged sources (active buffer + frozen parts).
    num_parts: usize,
    /// Accumulated time spent advancing the merger; reported as a metric on drop.
    elapsed: Duration,
}
1039
1040impl Drop for DataPartsReader {
1041    fn drop(&mut self) {
1042        PARTITION_TREE_READ_STAGE_ELAPSED
1043            .with_label_values(&["read_data_parts"])
1044            .observe(self.elapsed.as_secs_f64())
1045    }
1046}
1047
1048impl DataPartsReader {
1049    pub(crate) fn current_data_batch(&self) -> DataBatch<'_> {
1050        let batch = self.merger.current_node().current_data_batch();
1051        batch.slice(0, self.merger.current_rows())
1052    }
1053
1054    pub(crate) fn next(&mut self) -> Result<()> {
1055        let start = Instant::now();
1056        let result = self.merger.next();
1057        self.elapsed += start.elapsed();
1058        result
1059    }
1060
1061    pub(crate) fn is_valid(&self) -> bool {
1062        self.merger.is_valid()
1063    }
1064
1065    pub(crate) fn num_parts(&self) -> usize {
1066        self.num_parts
1067    }
1068}
1069
#[cfg(test)]
mod tests {
    use datafusion::arrow::array::Float64Array;
    use datatypes::arrow::array::UInt16Array;
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    use parquet::data_type::AsBytes;

    use super::*;
    use crate::test_util::memtable_util::{
        extract_data_batch, metadata_for_test, write_rows_to_buffer,
    };

    /// Verifies that `LazyMutableVectorBuilder` stays in the `Type` state until
    /// a builder is first requested, then switches to the `Builder` state.
    #[test]
    fn test_lazy_mutable_vector_builder() {
        let mut builder = LazyMutableVectorBuilder::new(ConcreteDataType::boolean_datatype());
        match builder {
            LazyMutableVectorBuilder::Type(ref t) => {
                assert_eq!(&ConcreteDataType::boolean_datatype(), t);
            }
            LazyMutableVectorBuilder::Builder(_) => {
                unreachable!()
            }
        }
        // Requesting a builder must transition the enum to the `Builder` variant.
        builder.get_or_create_builder(1);
        match builder {
            LazyMutableVectorBuilder::Type(_) => {
                unreachable!()
            }
            LazyMutableVectorBuilder::Builder(_) => {}
        }
    }

    /// Writes overlapping timestamps into a buffer and checks that reading back
    /// dedups (keeps the row with the larger sequence) only when `dedup` is set.
    fn check_data_buffer_dedup(dedup: bool) {
        let metadata = metadata_for_test();
        let mut buffer = DataBuffer::with_capacity(metadata.clone(), 10, dedup);
        // First write: timestamps 2 and 3 at sequence 0.
        write_rows_to_buffer(
            &mut buffer,
            &metadata,
            0,
            vec![2, 3],
            vec![Some(1.0), Some(2.0)],
            0,
        );
        // Second write: timestamps 1 and 2 at sequence 2; ts=2 duplicates the first write.
        write_rows_to_buffer(
            &mut buffer,
            &metadata,
            0,
            vec![1, 2],
            vec![Some(1.1), Some(2.1)],
            2,
        );

        let mut reader = buffer.read().unwrap().build(Some(&[0])).unwrap();
        let mut res = vec![];
        while reader.is_valid() {
            let batch = reader.current_data_batch();
            res.push(extract_data_batch(&batch));
            reader.next().unwrap();
        }
        if dedup {
            // Duplicated ts=2 keeps only the row with the newer sequence (3).
            assert_eq!(vec![(0, vec![(1, 2), (2, 3), (3, 1)])], res);
        } else {
            // Without dedup both rows for ts=2 survive, newer sequence first.
            assert_eq!(vec![(0, vec![(1, 2), (2, 3), (2, 0), (3, 1)])], res);
        }
    }

    #[test]
    fn test_data_buffer_dedup() {
        check_data_buffer_dedup(true);
        check_data_buffer_dedup(false);
    }

    /// Freezes a buffer with the given pk weights and compares the rows read
    /// back from the frozen part against `expected` `(pk, [(ts, seq)])` tuples.
    fn check_data_buffer_freeze(
        pk_weights: Option<&[u16]>,
        replace_pk_weights: bool,
        expected: &[(u16, Vec<(i64, u64)>)],
    ) {
        let meta = metadata_for_test();
        let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);

        // write rows with null values.
        write_rows_to_buffer(
            &mut buffer,
            &meta,
            0,
            vec![0, 1, 2],
            vec![Some(1.0), None, Some(3.0)],
            0,
        );
        write_rows_to_buffer(&mut buffer, &meta, 1, vec![1], vec![Some(2.0)], 3);

        let mut res = Vec::with_capacity(3);
        let mut reader = buffer
            .freeze(pk_weights, replace_pk_weights)
            .unwrap()
            .read()
            .unwrap();
        while reader.is_valid() {
            let batch = reader.current_data_batch();
            res.push(extract_data_batch(&batch));
            reader.next().unwrap();
        }
        assert_eq!(expected, res);
    }

    #[test]
    fn test_data_buffer_freeze() {
        // No weights: rows stay ordered by original pk_index.
        check_data_buffer_freeze(
            None,
            false,
            &[(0, vec![(0, 0), (1, 1), (2, 2)]), (1, vec![(1, 3)])],
        );

        // Weights given but not replacing pk_index: order unchanged.
        check_data_buffer_freeze(
            Some(&[1, 2]),
            false,
            &[(0, vec![(0, 0), (1, 1), (2, 2)]), (1, vec![(1, 3)])],
        );

        // Replacing pk_index with weights reorders output by weight (2 before 3).
        check_data_buffer_freeze(
            Some(&[3, 2]),
            true,
            &[(2, vec![(1, 3)]), (3, vec![(0, 0), (1, 1), (2, 2)])],
        );

        // Weights used for sorting only; original pk_index values are kept.
        check_data_buffer_freeze(
            Some(&[3, 2]),
            false,
            &[(1, vec![(1, 3)]), (0, vec![(0, 0), (1, 1), (2, 2)])],
        );
    }

    /// Encodes a buffer to parquet and checks the output is a valid parquet
    /// file whose duplicated rows were deduplicated during encoding.
    #[test]
    fn test_encode_data_buffer() {
        let meta = metadata_for_test();
        let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);

        // write rows with null values.
        write_rows_to_buffer(
            &mut buffer,
            &meta,
            2,
            vec![0, 1, 2],
            vec![Some(1.0), None, Some(3.0)],
            2,
        );

        assert_eq!(3, buffer.num_rows());

        write_rows_to_buffer(&mut buffer, &meta, 2, vec![1], vec![Some(2.0)], 3);

        assert_eq!(4, buffer.num_rows());

        let encoder = DataPartEncoder::new(
            &meta,
            Some(&[0, 1, 2]),
            None,
            meta.time_index_column().column_schema.name.clone(),
            true,
            true,
        );
        let encoded = match encoder.write(&mut buffer).unwrap() {
            DataPart::Parquet(data) => data.data,
        };

        // A parquet file starts and ends with the `PAR1` magic bytes.
        let s = String::from_utf8_lossy(encoded.as_bytes());
        assert!(s.starts_with("PAR1"));
        assert!(s.ends_with("PAR1"));

        let builder = ParquetRecordBatchReaderBuilder::try_new(encoded).unwrap();
        let mut reader = builder.build().unwrap();
        let batch = reader.next().unwrap().unwrap();
        // 4 written rows dedup to 3 (ts=1 was written twice).
        assert_eq!(3, batch.num_rows());
    }

    /// Drains `reader` and asserts the `v1` column values of each yielded batch.
    fn check_buffer_values_equal(reader: &mut DataBufferReader, expected_values: &[Vec<f64>]) {
        let mut output = Vec::with_capacity(expected_values.len());
        while reader.is_valid() {
            let batch = reader.current_data_batch().slice_record_batch();
            let values = batch
                .column_by_name("v1")
                .unwrap()
                .as_any()
                .downcast_ref::<Float64Array>()
                .unwrap()
                .iter()
                .map(|v| v.unwrap())
                .collect::<Vec<_>>();
            output.push(values);
            reader.next().unwrap();
        }
        assert_eq!(expected_values, output);
    }

    /// Checks that `search_next_pk_range` returns each run of equal pk values
    /// with its offset range, and `None` once the array is exhausted.
    #[test]
    fn test_search_next_pk_range() {
        let a = UInt16Array::from_iter_values([1, 1, 3, 3, 4, 6]);
        assert_eq!((1, 0..2), search_next_pk_range(&a, 0).unwrap());
        assert_eq!((3, 2..4), search_next_pk_range(&a, 2).unwrap());
        assert_eq!((4, 4..5), search_next_pk_range(&a, 4).unwrap());
        assert_eq!((6, 5..6), search_next_pk_range(&a, 5).unwrap());

        assert_eq!(None, search_next_pk_range(&a, 6));
    }

    /// Builds a buffer reader with optional pk weights and asserts the order
    /// of the yielded `v1` values.
    fn check_iter_data_buffer(pk_weights: Option<&[u16]>, expected: &[Vec<f64>]) {
        let meta = metadata_for_test();
        let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);

        write_rows_to_buffer(
            &mut buffer,
            &meta,
            3,
            vec![1, 2, 3],
            vec![Some(1.1), Some(2.1), Some(3.1)],
            3,
        );

        write_rows_to_buffer(
            &mut buffer,
            &meta,
            2,
            vec![0, 1, 2],
            vec![Some(1.0), Some(2.0), Some(3.0)],
            2,
        );

        let mut iter = buffer.read().unwrap().build(pk_weights).unwrap();
        check_buffer_values_equal(&mut iter, expected);
    }

    #[test]
    fn test_iter_data_buffer() {
        // Without weights (or with identity weights) pk 2 sorts before pk 3.
        check_iter_data_buffer(None, &[vec![1.0, 2.0, 3.0], vec![1.1, 2.1, 3.1]]);
        check_iter_data_buffer(
            Some(&[0, 1, 2, 3]),
            &[vec![1.0, 2.0, 3.0], vec![1.1, 2.1, 3.1]],
        );
        // Reversed weights flip the pk order.
        check_iter_data_buffer(
            Some(&[3, 2, 1, 0]),
            &[vec![1.1, 2.1, 3.1], vec![1.0, 2.0, 3.0]],
        );
    }

    /// Reading an empty buffer must yield no batches.
    #[test]
    fn test_iter_empty_data_buffer() {
        let meta = metadata_for_test();
        let buffer = DataBuffer::with_capacity(meta.clone(), 10, true);
        let mut iter = buffer.read().unwrap().build(Some(&[0, 1, 3, 2])).unwrap();
        check_buffer_values_equal(&mut iter, &[]);
    }

    /// Drains a `DataPartReader` and asserts the `v1` values of each batch.
    fn check_part_values_equal(iter: &mut DataPartReader, expected_values: &[Vec<f64>]) {
        let mut output = Vec::with_capacity(expected_values.len());
        while iter.is_valid() {
            let batch = iter.current_data_batch().slice_record_batch();
            let values = batch
                .column_by_name("v1")
                .unwrap()
                .as_any()
                .downcast_ref::<Float64Array>()
                .unwrap()
                .iter()
                .map(|v| v.unwrap())
                .collect::<Vec<_>>();
            output.push(values);
            iter.next().unwrap();
        }
        assert_eq!(expected_values, output);
    }

    /// Encodes a buffer to a part using the given pk weights (with a small
    /// row-group size of 4) and checks the decoded `v1` values.
    fn check_iter_data_part(weights: &[u16], expected_values: &[Vec<f64>]) {
        let meta = metadata_for_test();
        let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);

        write_rows_to_buffer(
            &mut buffer,
            &meta,
            2,
            vec![0, 1, 2],
            vec![Some(1.0), Some(2.0), Some(3.0)],
            2,
        );

        write_rows_to_buffer(
            &mut buffer,
            &meta,
            3,
            vec![1, 2, 3],
            vec![Some(1.1), Some(2.1), Some(3.1)],
            3,
        );

        write_rows_to_buffer(
            &mut buffer,
            &meta,
            2,
            vec![2, 3],
            vec![Some(2.2), Some(2.3)],
            4,
        );

        let encoder = DataPartEncoder::new(
            &meta,
            Some(weights),
            Some(4),
            meta.time_index_column().column_schema.name.clone(),
            true,
            true,
        );
        let encoded = encoder.write(&mut buffer).unwrap();

        let mut iter = encoded.read().unwrap();
        check_part_values_equal(&mut iter, expected_values);
    }

    #[test]
    fn test_iter_data_part() {
        // Identity weights: pk 2's rows (deduped, newer seq wins for ts=2) come first.
        check_iter_data_part(
            &[0, 1, 2, 3],
            &[vec![1.0, 2.0, 3.0, 2.3], vec![1.1, 2.1, 3.1]],
        );

        // Reversed weights flip the pk order in the encoded part.
        check_iter_data_part(
            &[3, 2, 1, 0],
            &[vec![1.1, 2.1, 3.1], vec![1.0, 2.0, 3.0, 2.3]],
        );
    }
}