use std::cmp::{Ordering, Reverse};
use std::fmt::{Debug, Formatter};
use std::ops::Range;
use std::sync::Arc;
use std::time::{Duration, Instant};

use bytes::Bytes;
use datatypes::arrow;
use datatypes::arrow::array::{ArrayRef, RecordBatch, UInt16Array, UInt32Array, UInt64Array};
use datatypes::arrow::datatypes::{Field, Schema, SchemaRef};
use datatypes::data_type::DataType;
use datatypes::prelude::{ConcreteDataType, MutableVector, ScalarVectorBuilder, Vector, VectorRef};
use datatypes::schema::ColumnSchema;
use datatypes::types::TimestampType;
use datatypes::vectors::{
    TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
    TimestampSecondVector, UInt8Vector, UInt8VectorBuilder, UInt16Vector, UInt16VectorBuilder,
    UInt64Vector, UInt64VectorBuilder,
};
use mito_codec::key_values::KeyValue;
use parquet::arrow::ArrowWriter;
use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::properties::{EnabledStatistics, WriterProperties};
use parquet::schema::types::ColumnPath;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::consts::{OP_TYPE_COLUMN_NAME, SEQUENCE_COLUMN_NAME};

use crate::error;
use crate::error::Result;
use crate::memtable::partition_tree::PkIndex;
use crate::memtable::partition_tree::merger::{DataBatchKey, DataNode, DataSource, Merger};
use crate::metrics::{
    PARTITION_TREE_DATA_BUFFER_FREEZE_STAGE_ELAPSED, PARTITION_TREE_READ_STAGE_ELAPSED,
};

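/// Name of the column that stores the primary key index of each row.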
const PK_INDEX_COLUMN_NAME: &str = "__pk_index";

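/// Initial capacity for builders in the data buffer.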
pub(crate) const DATA_INIT_CAP: usize = 8;

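/// Range of rows belonging to a single primary key inside a record batch.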
#[derive(Debug, Clone, Copy)]
pub(crate) struct DataBatchRange {
    /// Primary key index of the rows in this range.
    pub(crate) pk_index: PkIndex,
    /// Start (inclusive) of the range inside the record batch.
    pub(crate) start: usize,
    /// End (exclusive) of the range inside the record batch.
    pub(crate) end: usize,
}

impl DataBatchRange {
    pub(crate) fn len(&self) -> usize {
        self.end - self.start
    }
}

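/// A view over a contiguous range of rows of one primary key within a record batch.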
#[derive(Debug, Clone, Copy)]
pub struct DataBatch<'a> {
    /// Record batch the rows belong to.
    rb: &'a RecordBatch,
    /// Range of rows of the current primary key.
    range: DataBatchRange,
}

impl<'a> DataBatch<'a> {
    pub(crate) fn pk_index(&self) -> PkIndex {
        self.range.pk_index
    }

    pub(crate) fn range(&self) -> DataBatchRange {
        self.range
    }

    pub(crate) fn slice_record_batch(&self) -> RecordBatch {
        self.rb.slice(self.range.start, self.range.len())
    }

    /// Returns the timestamp and sequence of the first row in the batch.
    pub(crate) fn first_row(&self) -> (i64, u64) {
        let ts_values = timestamp_array_to_i64_slice(self.rb.column(1));
        let sequence_values = self
            .rb
            .column(2)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap()
            .values();
        (
            ts_values[self.range.start],
            sequence_values[self.range.start],
        )
    }

    /// Returns the timestamp and sequence of the last row in the batch.
    pub(crate) fn last_row(&self) -> (i64, u64) {
        let ts_values = timestamp_array_to_i64_slice(self.rb.column(1));
        let sequence_values = self
            .rb
            .column(2)
            .as_any()
            .downcast_ref::<UInt64Array>()
            .unwrap()
            .values();
        (
            ts_values[self.range.end - 1],
            sequence_values[self.range.end - 1],
        )
    }

    pub(crate) fn first_key(&self) -> DataBatchKey {
        let pk_index = self.pk_index();
        let ts_array = self.rb.column(1);

        let ts_values = timestamp_array_to_i64_slice(ts_array);
        let timestamp = ts_values[self.range.start];
        DataBatchKey {
            pk_index,
            timestamp,
        }
    }

    /// Searches for `key` within this batch by timestamp. Returns `Ok(pos)` if a row
    /// matches or `Err(pos)` with the insertion point, both relative to the batch start.
    pub(crate) fn search_key(&self, key: &DataBatchKey) -> Result<usize, usize> {
        let DataBatchKey {
            pk_index,
            timestamp,
        } = key;
        assert_eq!(*pk_index, self.range.pk_index);
        let ts_values = timestamp_array_to_i64_slice(self.rb.column(1));
        let ts_values = &ts_values[self.range.start..self.range.end];
        ts_values.binary_search(timestamp)
    }

    pub(crate) fn slice(self, offset: usize, length: usize) -> DataBatch<'a> {
        let start = self.range.start + offset;
        let end = start + length;
        DataBatch {
            rb: self.rb,
            range: DataBatchRange {
                pk_index: self.range.pk_index,
                start,
                end,
            },
        }
    }

    pub(crate) fn num_rows(&self) -> usize {
        self.range.len()
    }
}

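/// Mutable buffer for rows that are not encoded yet. Rows are appended to per-column
/// builders and can later be frozen into an encoded [DataPart].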
pub struct DataBuffer {
    /// Metadata of the region the buffer belongs to.
    metadata: RegionMetadataRef,
    /// Arrow schema of encoded data parts.
    data_part_schema: SchemaRef,
    /// Builder for the primary key index column.
    pk_index_builder: UInt16VectorBuilder,
    /// Builder for the time index column.
    ts_builder: Box<dyn MutableVector>,
    /// Builder for the sequence column.
    sequence_builder: UInt64VectorBuilder,
    /// Builder for the op type column.
    op_type_builder: UInt8VectorBuilder,
    /// Builders for field columns, created lazily on first write.
    field_builders: Vec<LazyMutableVectorBuilder>,

    /// Whether to deduplicate rows with the same primary key and timestamp.
    dedup: bool,
}

impl DataBuffer {
    /// Creates a `DataBuffer` with the given initial builder capacity.
    pub fn with_capacity(metadata: RegionMetadataRef, init_capacity: usize, dedup: bool) -> Self {
        let ts_builder = metadata
            .time_index_column()
            .column_schema
            .data_type
            .create_mutable_vector(init_capacity);

        let pk_id_builder = UInt16VectorBuilder::with_capacity(init_capacity);
        let sequence_builder = UInt64VectorBuilder::with_capacity(init_capacity);
        let op_type_builder = UInt8VectorBuilder::with_capacity(init_capacity);

        let field_builders = metadata
            .field_columns()
            .map(|c| LazyMutableVectorBuilder::new(c.column_schema.data_type.clone()))
            .collect::<Vec<_>>();

        let data_part_schema = memtable_schema_to_encoded_schema(&metadata);
        Self {
            metadata,
            data_part_schema,
            pk_index_builder: pk_id_builder,
            ts_builder,
            sequence_builder,
            op_type_builder,
            field_builders,
            dedup,
        }
    }

    /// Writes a row (pk index, timestamp, sequence, op type and fields) into the buffer.
    pub fn write_row(&mut self, pk_index: PkIndex, kv: &KeyValue) {
        self.ts_builder.push_value_ref(&kv.timestamp());
        self.pk_index_builder.push(Some(pk_index));
        self.sequence_builder.push(Some(kv.sequence()));
        self.op_type_builder.push(Some(kv.op_type() as u8));

        debug_assert_eq!(self.field_builders.len(), kv.num_fields());

        for (idx, field) in kv.fields().enumerate() {
            self.field_builders[idx]
                .get_or_create_builder(self.ts_builder.len())
                .push_value_ref(&field);
        }
    }

    /// Freezes the buffer into an encoded [DataPart], draining the underlying builders.
    /// `pk_weights` maps each pk index to its sort weight.
    pub fn freeze(
        &mut self,
        pk_weights: Option<&[u16]>,
        replace_pk_index: bool,
    ) -> Result<DataPart> {
        let timestamp_col_name = self.metadata.time_index_column().column_schema.name.clone();
        let encoder = DataPartEncoder::new(
            &self.metadata,
            pk_weights,
            None,
            timestamp_col_name,
            replace_pk_index,
            self.dedup,
        );
        let parts = encoder.write(self)?;
        Ok(parts)
    }

    /// Returns a builder for reading the current contents of the buffer.
    /// Builders are cloned, so the buffer is not drained.
    pub fn read(&self) -> Result<DataBufferReaderBuilder> {
        let _timer = PARTITION_TREE_READ_STAGE_ELAPSED
            .with_label_values(&["read_data_buffer"])
            .start_timer();

        let (pk_index, timestamp, sequence, op_type) = (
            self.pk_index_builder.finish_cloned(),
            self.ts_builder.to_vector_cloned(),
            self.sequence_builder.finish_cloned(),
            self.op_type_builder.finish_cloned(),
        );

        let mut fields = Vec::with_capacity(self.field_builders.len());
        for b in self.field_builders.iter() {
            let field = match b {
                LazyMutableVectorBuilder::Type(ty) => LazyFieldVector::Type(ty.clone()),
                LazyMutableVectorBuilder::Builder(builder) => {
                    LazyFieldVector::Vector(builder.to_vector_cloned())
                }
            };
            fields.push(field);
        }

        Ok(DataBufferReaderBuilder {
            schema: self.data_part_schema.clone(),
            pk_index,
            timestamp,
            sequence,
            op_type,
            fields,
            dedup: self.dedup,
        })
    }

    /// Returns the number of rows in the buffer.
    pub fn num_rows(&self) -> usize {
        self.ts_builder.len()
    }

    /// Returns whether the buffer is empty.
    pub fn is_empty(&self) -> bool {
        self.num_rows() == 0
    }
}

enum LazyMutableVectorBuilder {
    Type(ConcreteDataType),
    Builder(Box<dyn MutableVector>),
}

impl LazyMutableVectorBuilder {
    fn new(ty: ConcreteDataType) -> Self {
        Self::Type(ty)
    }

    fn get_or_create_builder(&mut self, init_capacity: usize) -> &mut Box<dyn MutableVector> {
        match self {
            LazyMutableVectorBuilder::Type(ty) => {
                let builder = ty.create_mutable_vector(init_capacity);
                *self = LazyMutableVectorBuilder::Builder(builder);
                self.get_or_create_builder(init_capacity)
            }
            LazyMutableVectorBuilder::Builder(builder) => builder,
        }
    }
}

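/// Drains the builders in `buffer` into a sorted [RecordBatch] with the encoded schema.
/// Rows are ordered by (pk weight, timestamp, sequence descending) and optionally
/// deduplicated by (pk weight, timestamp).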
fn drain_data_buffer_to_record_batches(
    schema: SchemaRef,
    buffer: &mut DataBuffer,
    pk_weights: Option<&[u16]>,
    dedup: bool,
    replace_pk_index: bool,
) -> Result<RecordBatch> {
    let num_rows = buffer.ts_builder.len();

    let (pk_index_v, ts_v, sequence_v, op_type_v) = (
        buffer.pk_index_builder.finish(),
        buffer.ts_builder.to_vector(),
        buffer.sequence_builder.finish(),
        buffer.op_type_builder.finish(),
    );

    let (indices_to_take, mut columns) = build_row_sort_indices_and_columns(
        pk_weights,
        pk_index_v,
        ts_v,
        sequence_v,
        op_type_v,
        replace_pk_index,
        dedup,
        buffer.field_builders.len() + 4,
    )?;

    for b in buffer.field_builders.iter_mut() {
        let array = match b {
            LazyMutableVectorBuilder::Type(ty) => {
                let mut single_null = ty.create_mutable_vector(num_rows);
                single_null.push_nulls(num_rows);
                single_null.to_vector().to_arrow_array()
            }
            LazyMutableVectorBuilder::Builder(builder) => builder.to_vector().to_arrow_array(),
        };
        columns.push(
            arrow::compute::take(&array, &indices_to_take, None)
                .context(error::ComputeArrowSnafu)?,
        );
    }

    RecordBatch::try_new(schema, columns).context(error::NewRecordBatchSnafu)
}

#[allow(clippy::too_many_arguments)]
fn build_row_sort_indices_and_columns(
    pk_weights: Option<&[u16]>,
    pk_index: UInt16Vector,
    ts: VectorRef,
    sequence: UInt64Vector,
    op_type: UInt8Vector,
    replace_pk_index: bool,
    dedup: bool,
    column_num: usize,
) -> Result<(UInt32Array, Vec<ArrayRef>)> {
    let mut rows = build_rows_to_sort(pk_weights, &pk_index, &ts, &sequence);

    let pk_array = if replace_pk_index {
        // Replace pk index values with their weights so the written column is
        // already ordered by weight.
        Arc::new(UInt16Array::from_iter_values(
            rows.iter().map(|(_, key)| key.pk_weight),
        )) as Arc<_>
    } else {
        pk_index.to_arrow_array()
    };

    // Sort rows and optionally remove duplicates with the same pk weight and timestamp.
    rows.sort_unstable_by(|l, r| l.1.cmp(&r.1));
    if dedup {
        rows.dedup_by(|l, r| l.1.pk_weight == r.1.pk_weight && l.1.timestamp == r.1.timestamp);
    }

    let indices_to_take = UInt32Array::from_iter_values(rows.iter().map(|(idx, _)| *idx as u32));

    let mut columns = Vec::with_capacity(column_num);

    columns.push(
        arrow::compute::take(&pk_array, &indices_to_take, None)
            .context(error::ComputeArrowSnafu)?,
    );

    columns.push(
        arrow::compute::take(&ts.to_arrow_array(), &indices_to_take, None)
            .context(error::ComputeArrowSnafu)?,
    );

    columns.push(
        arrow::compute::take(&sequence.as_arrow(), &indices_to_take, None)
            .context(error::ComputeArrowSnafu)?,
    );

    columns.push(
        arrow::compute::take(&op_type.as_arrow(), &indices_to_take, None)
            .context(error::ComputeArrowSnafu)?,
    );

    Ok((indices_to_take, columns))
}

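/// Converts a timestamp array of any time unit into a slice of raw `i64` values.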
pub(crate) fn timestamp_array_to_i64_slice(arr: &ArrayRef) -> &[i64] {
    use datatypes::arrow::array::{
        TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
        TimestampSecondArray,
    };
    use datatypes::arrow::datatypes::{DataType, TimeUnit};

    match arr.data_type() {
        DataType::Timestamp(t, _) => match t {
            TimeUnit::Second => arr
                .as_any()
                .downcast_ref::<TimestampSecondArray>()
                .unwrap()
                .values(),
            TimeUnit::Millisecond => arr
                .as_any()
                .downcast_ref::<TimestampMillisecondArray>()
                .unwrap()
                .values(),
            TimeUnit::Microsecond => arr
                .as_any()
                .downcast_ref::<TimestampMicrosecondArray>()
                .unwrap()
                .values(),
            TimeUnit::Nanosecond => arr
                .as_any()
                .downcast_ref::<TimestampNanosecondArray>()
                .unwrap()
                .values(),
        },
        _ => unreachable!(),
    }
}

enum LazyFieldVector {
    Type(ConcreteDataType),
    Vector(VectorRef),
}

pub(crate) struct DataBufferReaderBuilder {
    schema: SchemaRef,
    pk_index: UInt16Vector,
    timestamp: VectorRef,
    sequence: UInt64Vector,
    op_type: UInt8Vector,
    fields: Vec<LazyFieldVector>,
    dedup: bool,
}

impl DataBufferReaderBuilder {
    fn build_record_batch(self, pk_weights: Option<&[u16]>) -> Result<RecordBatch> {
        let num_rows = self.timestamp.len();
        let (indices_to_take, mut columns) = build_row_sort_indices_and_columns(
            pk_weights,
            self.pk_index,
            self.timestamp,
            self.sequence,
            self.op_type,
            // The reader keeps the original pk index values; weights are only used
            // to order rows.
            false,
            self.dedup,
            self.fields.len() + 4,
        )?;

        for b in self.fields.iter() {
            let array = match b {
                LazyFieldVector::Type(ty) => {
                    let mut single_null = ty.create_mutable_vector(num_rows);
                    single_null.push_nulls(num_rows);
                    single_null.to_vector().to_arrow_array()
                }
                LazyFieldVector::Vector(vector) => vector.to_arrow_array(),
            };
            columns.push(
                arrow::compute::take(&array, &indices_to_take, None)
                    .context(error::ComputeArrowSnafu)?,
            );
        }
        RecordBatch::try_new(self.schema, columns).context(error::NewRecordBatchSnafu)
    }

    pub fn build(self, pk_weights: Option<&[u16]>) -> Result<DataBufferReader> {
        self.build_record_batch(pk_weights)
            .and_then(DataBufferReader::new)
    }
}

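/// Reader over a sorted snapshot of a [DataBuffer], yielding one [DataBatch] per primary key.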
#[derive(Debug)]
pub(crate) struct DataBufferReader {
    batch: RecordBatch,
    offset: usize,
    current_range: Option<DataBatchRange>,
    elapsed_time: Duration,
}

impl Drop for DataBufferReader {
    fn drop(&mut self) {
        PARTITION_TREE_READ_STAGE_ELAPSED
            .with_label_values(&["read_data_buffer"])
            .observe(self.elapsed_time.as_secs_f64())
    }
}

impl DataBufferReader {
    pub(crate) fn new(batch: RecordBatch) -> Result<Self> {
        let mut reader = Self {
            batch,
            offset: 0,
            current_range: None,
            elapsed_time: Duration::default(),
        };
        // Prime the reader so `current_range` points at the first primary key.
        reader.next()?;
        Ok(reader)
    }

    pub(crate) fn is_valid(&self) -> bool {
        self.current_range.is_some()
    }

    /// Returns the current data batch.
    ///
    /// # Panics
    /// If the reader is exhausted.
    pub(crate) fn current_data_batch(&self) -> DataBatch<'_> {
        let range = self.current_range.unwrap();
        DataBatch {
            rb: &self.batch,
            range,
        }
    }

    /// Advances the reader to the row range of the next primary key.
    pub(crate) fn next(&mut self) -> Result<()> {
        if self.offset >= self.batch.num_rows() {
            self.current_range = None;
            return Ok(());
        }
        let start = Instant::now();
        let pk_index_array = pk_index_array(&self.batch);
        if let Some((next_pk, range)) = search_next_pk_range(pk_index_array, self.offset) {
            self.offset = range.end;
            self.current_range = Some(DataBatchRange {
                pk_index: next_pk,
                start: range.start,
                end: range.end,
            });
        } else {
            self.current_range = None;
        }
        self.elapsed_time += start.elapsed();
        Ok(())
    }
}

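/// Gets the `pk_index` array (the first column) of the record batch.
///
/// # Panics
/// If the first column is not a `UInt16Array`.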
fn pk_index_array(batch: &RecordBatch) -> &UInt16Array {
    batch
        .column(0)
        .as_any()
        .downcast_ref::<UInt16Array>()
        .unwrap()
}

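/// Searches for the run of rows sharing the same primary key index starting at `start`.
/// Returns the pk index and its row range, or `None` if `start` is out of bounds.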
fn search_next_pk_range(array: &UInt16Array, start: usize) -> Option<(PkIndex, Range<usize>)> {
    let num_rows = array.len();
    if start >= num_rows {
        return None;
    }

    let values = array.values();
    let next_pk = values[start];

    for idx in start..num_rows {
        if values[idx] != next_pk {
            return Some((next_pk, start..idx));
        }
    }
    Some((next_pk, start..num_rows))
}

/// Sort key of a row: rows are ordered by pk weight, then timestamp, then sequence descending.
#[derive(Eq, PartialEq)]
struct InnerKey {
    pk_weight: u16,
    timestamp: i64,
    sequence: u64,
}

impl PartialOrd for InnerKey {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for InnerKey {
    fn cmp(&self, other: &Self) -> Ordering {
        (self.pk_weight, self.timestamp, Reverse(self.sequence)).cmp(&(
            other.pk_weight,
            other.timestamp,
            Reverse(other.sequence),
        ))
    }
}

fn build_rows_to_sort(
    pk_weights: Option<&[u16]>,
    pk_index: &UInt16Vector,
    ts: &VectorRef,
    sequence: &UInt64Vector,
) -> Vec<(usize, InnerKey)> {
    let ts_values = match ts.data_type() {
        ConcreteDataType::Timestamp(t) => match t {
            TimestampType::Second(_) => ts
                .as_any()
                .downcast_ref::<TimestampSecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            TimestampType::Millisecond(_) => ts
                .as_any()
                .downcast_ref::<TimestampMillisecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            TimestampType::Microsecond(_) => ts
                .as_any()
                .downcast_ref::<TimestampMicrosecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            TimestampType::Nanosecond(_) => ts
                .as_any()
                .downcast_ref::<TimestampNanosecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
        },
        other => unreachable!("Unexpected type {:?}", other),
    };
    let pk_index_values = pk_index.as_arrow().values();
    let sequence_values = sequence.as_arrow().values();
    debug_assert_eq!(ts_values.len(), pk_index_values.len());
    debug_assert_eq!(ts_values.len(), sequence_values.len());

    ts_values
        .iter()
        .zip(pk_index_values.iter())
        .zip(sequence_values.iter())
        .enumerate()
        .map(|(idx, ((timestamp, pk_index), sequence))| {
            let pk_weight = if let Some(weights) = pk_weights {
                // Sort by the weight of each primary key when weights are provided.
                weights[*pk_index as usize]
            } else {
                // Otherwise sort by the pk index itself.
                *pk_index
            };
            (
                idx,
                InnerKey {
                    timestamp: *timestamp,
                    pk_weight,
                    sequence: *sequence,
                },
            )
        })
        .collect()
}

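/// Builds the Arrow schema used to encode data parts: pk index, time index, sequence
/// and op type columns, followed by all field columns of the region.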
fn memtable_schema_to_encoded_schema(schema: &RegionMetadataRef) -> SchemaRef {
    use datatypes::arrow::datatypes::DataType;
    let ColumnSchema {
        name: ts_name,
        data_type: ts_type,
        ..
    } = &schema.time_index_column().column_schema;

    let mut fields = vec![
        Field::new(PK_INDEX_COLUMN_NAME, DataType::UInt16, false),
        Field::new(ts_name, ts_type.as_arrow_type(), false),
        Field::new(SEQUENCE_COLUMN_NAME, DataType::UInt64, false),
        Field::new(OP_TYPE_COLUMN_NAME, DataType::UInt8, false),
    ];

    fields.extend(schema.field_columns().map(|c| {
        Field::new(
            &c.column_schema.name,
            c.column_schema.data_type.as_arrow_type(),
            c.column_schema.is_nullable(),
        )
    }));

    Arc::new(Schema::new(fields))
}

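/// Encoder that sorts the contents of a [DataBuffer] and writes them as a parquet-encoded [DataPart].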
struct DataPartEncoder<'a> {
    schema: SchemaRef,
    pk_weights: Option<&'a [u16]>,
    row_group_size: Option<usize>,
    timestamp_column_name: String,
    replace_pk_index: bool,
    dedup: bool,
}

impl<'a> DataPartEncoder<'a> {
    pub fn new(
        metadata: &RegionMetadataRef,
        pk_weights: Option<&'a [u16]>,
        row_group_size: Option<usize>,
        timestamp_column_name: String,
        replace_pk_index: bool,
        dedup: bool,
    ) -> DataPartEncoder<'a> {
        let schema = memtable_schema_to_encoded_schema(metadata);
        Self {
            schema,
            pk_weights,
            row_group_size,
            timestamp_column_name,
            replace_pk_index,
            dedup,
        }
    }

    /// Builds parquet writer properties: ZSTD compression, statistics disabled, and
    /// delta encoding for the internal columns.
    fn writer_props(self) -> WriterProperties {
        let mut builder = WriterProperties::builder();
        if let Some(row_group_size) = self.row_group_size {
            builder = builder.set_max_row_group_size(row_group_size)
        }

        let ts_col = ColumnPath::new(vec![self.timestamp_column_name]);
        let pk_index_col = ColumnPath::new(vec![PK_INDEX_COLUMN_NAME.to_string()]);
        let sequence_col = ColumnPath::new(vec![SEQUENCE_COLUMN_NAME.to_string()]);
        let op_type_col = ColumnPath::new(vec![OP_TYPE_COLUMN_NAME.to_string()]);

        builder = builder
            .set_compression(Compression::ZSTD(ZstdLevel::default()))
            .set_statistics_enabled(EnabledStatistics::None);
        builder = builder
            .set_column_encoding(ts_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(ts_col, false)
            .set_column_encoding(pk_index_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(pk_index_col, true)
            .set_column_encoding(sequence_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(sequence_col, false)
            .set_column_encoding(op_type_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(op_type_col, true)
            .set_column_index_truncate_length(None)
            .set_statistics_truncate_length(None);
        builder.build()
    }

    /// Drains `source` into a sorted record batch and encodes it as a parquet [DataPart].
    pub fn write(self, source: &mut DataBuffer) -> Result<DataPart> {
        let mut bytes = Vec::with_capacity(1024);

        let rb = {
            let _timer = PARTITION_TREE_DATA_BUFFER_FREEZE_STAGE_ELAPSED
                .with_label_values(&["drain_data_buffer_to_batch"])
                .start_timer();
            drain_data_buffer_to_record_batches(
                self.schema.clone(),
                source,
                self.pk_weights,
                self.dedup,
                self.replace_pk_index,
            )?
        };

        {
            let _timer = PARTITION_TREE_DATA_BUFFER_FREEZE_STAGE_ELAPSED
                .with_label_values(&["encode"])
                .start_timer();
            let mut writer =
                ArrowWriter::try_new(&mut bytes, self.schema.clone(), Some(self.writer_props()))
                    .context(error::EncodeMemtableSnafu)?;
            writer.write(&rb).context(error::EncodeMemtableSnafu)?;
            let _metadata = writer.close().context(error::EncodeMemtableSnafu)?;
        }
        Ok(DataPart::Parquet(ParquetPart {
            data: Bytes::from(bytes),
        }))
    }
}

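/// An encoded, immutable part of the data. Parquet is currently the only encoding.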
pub enum DataPart {
    Parquet(ParquetPart),
}

impl DataPart {
    /// Creates a reader over this part.
    pub fn read(&self) -> Result<DataPartReader> {
        match self {
            DataPart::Parquet(data_bytes) => DataPartReader::new(data_bytes.data.clone(), None),
        }
    }

    fn is_empty(&self) -> bool {
        match self {
            DataPart::Parquet(p) => p.data.is_empty(),
        }
    }
}

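/// Reader over an encoded [DataPart] that yields row ranges grouped by primary key index.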
pub struct DataPartReader {
    inner: ParquetRecordBatchReader,
    current_batch: Option<RecordBatch>,
    current_range: Option<DataBatchRange>,
    elapsed: Duration,
}

impl Drop for DataPartReader {
    fn drop(&mut self) {
        PARTITION_TREE_READ_STAGE_ELAPSED
            .with_label_values(&["read_data_part"])
            .observe(self.elapsed.as_secs_f64());
    }
}

impl Debug for DataPartReader {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DataPartReader")
            .field("current_range", &self.current_range)
            .finish()
    }
}

impl DataPartReader {
    pub fn new(data: Bytes, batch_size: Option<usize>) -> Result<Self> {
        let mut builder =
            ParquetRecordBatchReaderBuilder::try_new(data).context(error::ReadDataPartSnafu)?;
        if let Some(batch_size) = batch_size {
            builder = builder.with_batch_size(batch_size);
        }
        let parquet_reader = builder.build().context(error::ReadDataPartSnafu)?;
        let mut reader = Self {
            inner: parquet_reader,
            current_batch: None,
            current_range: None,
            elapsed: Default::default(),
        };
        reader.next()?;
        Ok(reader)
    }

    /// Returns false if the reader is exhausted.
    pub(crate) fn is_valid(&self) -> bool {
        self.current_range.is_some()
    }

    /// Returns the current data batch of the reader.
    ///
    /// # Panics
    /// If the reader is exhausted.
    pub(crate) fn current_data_batch(&self) -> DataBatch<'_> {
        let range = self.current_range.unwrap();
        DataBatch {
            rb: self.current_batch.as_ref().unwrap(),
            range,
        }
    }

    /// Advances to the next primary key range, pulling the next record batch from
    /// the parquet reader when the current one is exhausted.
    pub(crate) fn next(&mut self) -> Result<()> {
        let start = Instant::now();
        if let Some((next_pk, range)) = self.search_next_pk_range() {
            // More primary keys remain in the current record batch.
            self.current_range = Some(DataBatchRange {
                pk_index: next_pk,
                start: range.start,
                end: range.end,
            });
        } else {
            // Current record batch is exhausted, fetch the next one.
            if let Some(rb) = self.inner.next() {
                let rb = rb.context(error::ComputeArrowSnafu)?;
                self.current_batch = Some(rb);
                self.current_range = None;
                return self.next();
            } else {
                // The underlying parquet reader is exhausted.
                self.current_batch = None;
                self.current_range = None;
            }
        }
        self.elapsed += start.elapsed();
        Ok(())
    }

    /// Searches the next primary key range inside the current record batch.
    fn search_next_pk_range(&self) -> Option<(PkIndex, Range<usize>)> {
        self.current_batch.as_ref().and_then(|b| {
            let pk_array = pk_index_array(b);
            let start = self
                .current_range
                .as_ref()
                .map(|range| range.end)
                .unwrap_or(0);
            search_next_pk_range(pk_array, start)
        })
    }
}

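/// Data encoded as a parquet file held in memory.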
pub struct ParquetPart {
    data: Bytes,
}

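/// All data parts: the active writing buffer plus a list of frozen, encoded parts.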
pub struct DataParts {
    /// The active writing buffer.
    active: DataBuffer,
    /// Frozen, encoded parts.
    frozen: Vec<DataPart>,
}

impl DataParts {
    pub(crate) fn new(metadata: RegionMetadataRef, capacity: usize, dedup: bool) -> Self {
        Self {
            active: DataBuffer::with_capacity(metadata, capacity, dedup),
            frozen: Vec::new(),
        }
    }

    pub(crate) fn with_frozen(mut self, frozen: Vec<DataPart>) -> Self {
        self.frozen = frozen;
        self
    }

    /// Writes a row into the active buffer.
    pub fn write_row(&mut self, pk_index: PkIndex, kv: &KeyValue) {
        self.active.write_row(pk_index, kv)
    }

    /// Returns the number of rows in the active buffer.
    pub fn num_active_rows(&self) -> usize {
        self.active.num_rows()
    }

    /// Freezes the active buffer and appends it to the frozen parts.
    pub fn freeze(&mut self) -> Result<()> {
        let part = self.active.freeze(None, false)?;
        self.frozen.push(part);
        Ok(())
    }

    /// Builds a reader over the active buffer and all frozen parts.
    pub fn read(&self) -> Result<DataPartsReaderBuilder> {
        let _timer = PARTITION_TREE_READ_STAGE_ELAPSED
            .with_label_values(&["build_data_parts_reader"])
            .start_timer();

        let buffer = self.active.read()?;
        let mut parts = Vec::with_capacity(self.frozen.len());
        for p in &self.frozen {
            parts.push(p.read()?);
        }
        Ok(DataPartsReaderBuilder { buffer, parts })
    }

    pub(crate) fn is_empty(&self) -> bool {
        self.active.is_empty() && self.frozen.iter().all(|part| part.is_empty())
    }

    #[cfg(test)]
    pub(crate) fn frozen_len(&self) -> usize {
        self.frozen.len()
    }
}

pub struct DataPartsReaderBuilder {
    buffer: DataBufferReaderBuilder,
    parts: Vec<DataPartReader>,
}

impl DataPartsReaderBuilder {
    pub(crate) fn build(self) -> Result<DataPartsReader> {
        let mut nodes = Vec::with_capacity(self.parts.len() + 1);
        nodes.push(DataNode::new(DataSource::Buffer(
            self.buffer.build(None)?,
        )));
        for p in self.parts {
            nodes.push(DataNode::new(DataSource::Part(p)));
        }
        let num_parts = nodes.len();
        let merger = Merger::try_new(nodes)?;
        Ok(DataPartsReader {
            merger,
            num_parts,
            elapsed: Default::default(),
        })
    }
}

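/// Reader that merges the active buffer and all frozen parts into a single sorted stream of batches.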
pub struct DataPartsReader {
    merger: Merger<DataNode>,
    num_parts: usize,
    elapsed: Duration,
}

impl Drop for DataPartsReader {
    fn drop(&mut self) {
        PARTITION_TREE_READ_STAGE_ELAPSED
            .with_label_values(&["read_data_parts"])
            .observe(self.elapsed.as_secs_f64())
    }
}

impl DataPartsReader {
    pub(crate) fn current_data_batch(&self) -> DataBatch<'_> {
        let batch = self.merger.current_node().current_data_batch();
        batch.slice(0, self.merger.current_rows())
    }

    pub(crate) fn next(&mut self) -> Result<()> {
        let start = Instant::now();
        let result = self.merger.next();
        self.elapsed += start.elapsed();
        result
    }

    pub(crate) fn is_valid(&self) -> bool {
        self.merger.is_valid()
    }

    pub(crate) fn num_parts(&self) -> usize {
        self.num_parts
    }
}

#[cfg(test)]
mod tests {
    use datafusion::arrow::array::Float64Array;
    use datatypes::arrow::array::UInt16Array;
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    use parquet::data_type::AsBytes;

    use super::*;
    use crate::test_util::memtable_util::{
        extract_data_batch, metadata_for_test, write_rows_to_buffer,
    };

    #[test]
    fn test_lazy_mutable_vector_builder() {
        let mut builder = LazyMutableVectorBuilder::new(ConcreteDataType::boolean_datatype());
        match builder {
            LazyMutableVectorBuilder::Type(ref t) => {
                assert_eq!(&ConcreteDataType::boolean_datatype(), t);
            }
            LazyMutableVectorBuilder::Builder(_) => {
                unreachable!()
            }
        }
        builder.get_or_create_builder(1);
        match builder {
            LazyMutableVectorBuilder::Type(_) => {
                unreachable!()
            }
            LazyMutableVectorBuilder::Builder(_) => {}
        }
    }

    fn check_data_buffer_dedup(dedup: bool) {
        let metadata = metadata_for_test();
        let mut buffer = DataBuffer::with_capacity(metadata.clone(), 10, dedup);
        write_rows_to_buffer(
            &mut buffer,
            &metadata,
            0,
            vec![2, 3],
            vec![Some(1.0), Some(2.0)],
            0,
        );
        write_rows_to_buffer(
            &mut buffer,
            &metadata,
            0,
            vec![1, 2],
            vec![Some(1.1), Some(2.1)],
            2,
        );

        let mut reader = buffer.read().unwrap().build(Some(&[0])).unwrap();
        let mut res = vec![];
        while reader.is_valid() {
            let batch = reader.current_data_batch();
            res.push(extract_data_batch(&batch));
            reader.next().unwrap();
        }
        if dedup {
            assert_eq!(vec![(0, vec![(1, 2), (2, 3), (3, 1)])], res);
        } else {
            assert_eq!(vec![(0, vec![(1, 2), (2, 3), (2, 0), (3, 1)])], res);
        }
    }

    #[test]
    fn test_data_buffer_dedup() {
        check_data_buffer_dedup(true);
        check_data_buffer_dedup(false);
    }

    fn check_data_buffer_freeze(
        pk_weights: Option<&[u16]>,
        replace_pk_weights: bool,
        expected: &[(u16, Vec<(i64, u64)>)],
    ) {
        let meta = metadata_for_test();
        let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);

        write_rows_to_buffer(
            &mut buffer,
            &meta,
            0,
            vec![0, 1, 2],
            vec![Some(1.0), None, Some(3.0)],
            0,
        );
        write_rows_to_buffer(&mut buffer, &meta, 1, vec![1], vec![Some(2.0)], 3);

        let mut res = Vec::with_capacity(3);
        let mut reader = buffer
            .freeze(pk_weights, replace_pk_weights)
            .unwrap()
            .read()
            .unwrap();
        while reader.is_valid() {
            let batch = reader.current_data_batch();
            res.push(extract_data_batch(&batch));
            reader.next().unwrap();
        }
        assert_eq!(expected, res);
    }

    #[test]
    fn test_data_buffer_freeze() {
        check_data_buffer_freeze(
            None,
            false,
            &[(0, vec![(0, 0), (1, 1), (2, 2)]), (1, vec![(1, 3)])],
        );

        check_data_buffer_freeze(
            Some(&[1, 2]),
            false,
            &[(0, vec![(0, 0), (1, 1), (2, 2)]), (1, vec![(1, 3)])],
        );

        check_data_buffer_freeze(
            Some(&[3, 2]),
            true,
            &[(2, vec![(1, 3)]), (3, vec![(0, 0), (1, 1), (2, 2)])],
        );

        check_data_buffer_freeze(
            Some(&[3, 2]),
            false,
            &[(1, vec![(1, 3)]), (0, vec![(0, 0), (1, 1), (2, 2)])],
        );
    }

    #[test]
    fn test_encode_data_buffer() {
        let meta = metadata_for_test();
        let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);

        write_rows_to_buffer(
            &mut buffer,
            &meta,
            2,
            vec![0, 1, 2],
            vec![Some(1.0), None, Some(3.0)],
            2,
        );

        assert_eq!(3, buffer.num_rows());

        write_rows_to_buffer(&mut buffer, &meta, 2, vec![1], vec![Some(2.0)], 3);

        assert_eq!(4, buffer.num_rows());

        let encoder = DataPartEncoder::new(
            &meta,
            Some(&[0, 1, 2]),
            None,
            meta.time_index_column().column_schema.name.clone(),
            true,
            true,
        );
        let encoded = match encoder.write(&mut buffer).unwrap() {
            DataPart::Parquet(data) => data.data,
        };

        let s = String::from_utf8_lossy(encoded.as_bytes());
        assert!(s.starts_with("PAR1"));
        assert!(s.ends_with("PAR1"));

        let builder = ParquetRecordBatchReaderBuilder::try_new(encoded).unwrap();
        let mut reader = builder.build().unwrap();
        let batch = reader.next().unwrap().unwrap();
        assert_eq!(3, batch.num_rows());
    }

    fn check_buffer_values_equal(reader: &mut DataBufferReader, expected_values: &[Vec<f64>]) {
        let mut output = Vec::with_capacity(expected_values.len());
        while reader.is_valid() {
            let batch = reader.current_data_batch().slice_record_batch();
            let values = batch
                .column_by_name("v1")
                .unwrap()
                .as_any()
                .downcast_ref::<Float64Array>()
                .unwrap()
                .iter()
                .map(|v| v.unwrap())
                .collect::<Vec<_>>();
            output.push(values);
            reader.next().unwrap();
        }
        assert_eq!(expected_values, output);
    }

    #[test]
    fn test_search_next_pk_range() {
        let a = UInt16Array::from_iter_values([1, 1, 3, 3, 4, 6]);
        assert_eq!((1, 0..2), search_next_pk_range(&a, 0).unwrap());
        assert_eq!((3, 2..4), search_next_pk_range(&a, 2).unwrap());
        assert_eq!((4, 4..5), search_next_pk_range(&a, 4).unwrap());
        assert_eq!((6, 5..6), search_next_pk_range(&a, 5).unwrap());

        assert_eq!(None, search_next_pk_range(&a, 6));
    }

    fn check_iter_data_buffer(pk_weights: Option<&[u16]>, expected: &[Vec<f64>]) {
        let meta = metadata_for_test();
        let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);

        write_rows_to_buffer(
            &mut buffer,
            &meta,
            3,
            vec![1, 2, 3],
            vec![Some(1.1), Some(2.1), Some(3.1)],
            3,
        );

        write_rows_to_buffer(
            &mut buffer,
            &meta,
            2,
            vec![0, 1, 2],
            vec![Some(1.0), Some(2.0), Some(3.0)],
            2,
        );

        let mut iter = buffer.read().unwrap().build(pk_weights).unwrap();
        check_buffer_values_equal(&mut iter, expected);
    }

    #[test]
    fn test_iter_data_buffer() {
        check_iter_data_buffer(None, &[vec![1.0, 2.0, 3.0], vec![1.1, 2.1, 3.1]]);
        check_iter_data_buffer(
            Some(&[0, 1, 2, 3]),
            &[vec![1.0, 2.0, 3.0], vec![1.1, 2.1, 3.1]],
        );
        check_iter_data_buffer(
            Some(&[3, 2, 1, 0]),
            &[vec![1.1, 2.1, 3.1], vec![1.0, 2.0, 3.0]],
        );
    }

    #[test]
    fn test_iter_empty_data_buffer() {
        let meta = metadata_for_test();
        let buffer = DataBuffer::with_capacity(meta.clone(), 10, true);
        let mut iter = buffer.read().unwrap().build(Some(&[0, 1, 3, 2])).unwrap();
        check_buffer_values_equal(&mut iter, &[]);
    }

    fn check_part_values_equal(iter: &mut DataPartReader, expected_values: &[Vec<f64>]) {
        let mut output = Vec::with_capacity(expected_values.len());
        while iter.is_valid() {
            let batch = iter.current_data_batch().slice_record_batch();
            let values = batch
                .column_by_name("v1")
                .unwrap()
                .as_any()
                .downcast_ref::<Float64Array>()
                .unwrap()
                .iter()
                .map(|v| v.unwrap())
                .collect::<Vec<_>>();
            output.push(values);
            iter.next().unwrap();
        }
        assert_eq!(expected_values, output);
    }

    fn check_iter_data_part(weights: &[u16], expected_values: &[Vec<f64>]) {
        let meta = metadata_for_test();
        let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);

        write_rows_to_buffer(
            &mut buffer,
            &meta,
            2,
            vec![0, 1, 2],
            vec![Some(1.0), Some(2.0), Some(3.0)],
            2,
        );

        write_rows_to_buffer(
            &mut buffer,
            &meta,
            3,
            vec![1, 2, 3],
            vec![Some(1.1), Some(2.1), Some(3.1)],
            3,
        );

        write_rows_to_buffer(
            &mut buffer,
            &meta,
            2,
            vec![2, 3],
            vec![Some(2.2), Some(2.3)],
            4,
        );

        let encoder = DataPartEncoder::new(
            &meta,
            Some(weights),
            Some(4),
            meta.time_index_column().column_schema.name.clone(),
            true,
            true,
        );
        let encoded = encoder.write(&mut buffer).unwrap();

        let mut iter = encoded.read().unwrap();
        check_part_values_equal(&mut iter, expected_values);
    }

    #[test]
    fn test_iter_data_part() {
        check_iter_data_part(
            &[0, 1, 2, 3],
            &[vec![1.0, 2.0, 3.0, 2.3], vec![1.1, 2.1, 3.1]],
        );

        check_iter_data_part(
            &[3, 2, 1, 0],
            &[vec![1.1, 2.1, 3.1], vec![1.0, 2.0, 3.0, 2.3]],
        );
    }
}