1use std::cmp::{Ordering, Reverse};
18use std::fmt::{Debug, Formatter};
19use std::ops::Range;
20use std::sync::Arc;
21use std::time::{Duration, Instant};
22
23use bytes::Bytes;
24use datatypes::arrow;
25use datatypes::arrow::array::{ArrayRef, RecordBatch, UInt16Array, UInt32Array, UInt64Array};
26use datatypes::arrow::datatypes::{Field, Schema, SchemaRef};
27use datatypes::data_type::DataType;
28use datatypes::prelude::{ConcreteDataType, MutableVector, ScalarVectorBuilder, Vector, VectorRef};
29use datatypes::schema::ColumnSchema;
30use datatypes::types::TimestampType;
31use datatypes::vectors::{
32 TimestampMicrosecondVector, TimestampMillisecondVector, TimestampNanosecondVector,
33 TimestampSecondVector, UInt8Vector, UInt8VectorBuilder, UInt16Vector, UInt16VectorBuilder,
34 UInt64Vector, UInt64VectorBuilder,
35};
36use mito_codec::key_values::KeyValue;
37use parquet::arrow::ArrowWriter;
38use parquet::arrow::arrow_reader::{ParquetRecordBatchReader, ParquetRecordBatchReaderBuilder};
39use parquet::basic::{Compression, Encoding, ZstdLevel};
40use parquet::file::properties::{EnabledStatistics, WriterProperties};
41use parquet::schema::types::ColumnPath;
42use snafu::ResultExt;
43use store_api::metadata::RegionMetadataRef;
44use store_api::storage::consts::{OP_TYPE_COLUMN_NAME, SEQUENCE_COLUMN_NAME};
45
46use crate::error;
47use crate::error::Result;
48use crate::memtable::partition_tree::PkIndex;
49use crate::memtable::partition_tree::merger::{DataBatchKey, DataNode, DataSource, Merger};
50use crate::metrics::{
51 PARTITION_TREE_DATA_BUFFER_FREEZE_STAGE_ELAPSED, PARTITION_TREE_READ_STAGE_ELAPSED,
52};
53use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
54
/// Name of the internal column that stores the primary-key index of each row
/// in the encoded schema.
const PK_INDEX_COLUMN_NAME: &str = "__pk_index";

/// Initial capacity used by callers of this module when creating data buffers.
pub(crate) const DATA_INIT_CAP: usize = 8;
59
60#[derive(Debug, Clone, Copy)]
62pub(crate) struct DataBatchRange {
63 pub(crate) pk_index: PkIndex,
65 pub(crate) start: usize,
67 pub(crate) end: usize,
69}
70
71impl DataBatchRange {
72 pub(crate) fn len(&self) -> usize {
73 self.end - self.start
74 }
75}
76
/// A view over a contiguous run of rows in a [RecordBatch] that all share the
/// same primary-key index (see [DataBatchRange]).
#[derive(Debug, Clone, Copy)]
pub struct DataBatch<'a> {
    /// Record batch laid out in the encoded schema:
    /// column 0 = pk index, 1 = timestamp, 2 = sequence, 3 = op type, then fields.
    rb: &'a RecordBatch,
    /// Rows of `rb` covered by this batch.
    range: DataBatchRange,
}
85
86impl<'a> DataBatch<'a> {
87 pub(crate) fn pk_index(&self) -> PkIndex {
88 self.range.pk_index
89 }
90
91 pub(crate) fn range(&self) -> DataBatchRange {
92 self.range
93 }
94
95 pub(crate) fn slice_record_batch(&self) -> RecordBatch {
96 self.rb.slice(self.range.start, self.range.len())
97 }
98
99 pub(crate) fn first_row(&self) -> (i64, u64) {
100 let ts_values = timestamp_array_to_i64_slice(self.rb.column(1));
101 let sequence_values = self
102 .rb
103 .column(2)
104 .as_any()
105 .downcast_ref::<UInt64Array>()
106 .unwrap()
107 .values();
108 (
109 ts_values[self.range.start],
110 sequence_values[self.range.start],
111 )
112 }
113
114 pub(crate) fn last_row(&self) -> (i64, u64) {
115 let ts_values = timestamp_array_to_i64_slice(self.rb.column(1));
116 let sequence_values = self
117 .rb
118 .column(2)
119 .as_any()
120 .downcast_ref::<UInt64Array>()
121 .unwrap()
122 .values();
123 (
124 ts_values[self.range.end - 1],
125 sequence_values[self.range.end - 1],
126 )
127 }
128
129 pub(crate) fn first_key(&self) -> DataBatchKey {
130 let pk_index = self.pk_index();
131 let ts_array = self.rb.column(1);
132
133 let ts_values = timestamp_array_to_i64_slice(ts_array);
135 let timestamp = ts_values[self.range.start];
136 DataBatchKey {
137 pk_index,
138 timestamp,
139 }
140 }
141
142 pub(crate) fn search_key(&self, key: &DataBatchKey) -> Result<usize, usize> {
143 let DataBatchKey {
144 pk_index,
145 timestamp,
146 } = key;
147 assert_eq!(*pk_index, self.range.pk_index);
148 let ts_values = timestamp_array_to_i64_slice(self.rb.column(1));
149 let ts_values = &ts_values[self.range.start..self.range.end];
150 ts_values.binary_search(timestamp)
151 }
152
153 pub(crate) fn slice(self, offset: usize, length: usize) -> DataBatch<'a> {
154 let start = self.range.start + offset;
155 let end = start + length;
156 DataBatch {
157 rb: self.rb,
158 range: DataBatchRange {
159 pk_index: self.range.pk_index,
160 start,
161 end,
162 },
163 }
164 }
165
166 pub(crate) fn num_rows(&self) -> usize {
167 self.range.len()
168 }
169}
170
/// Mutable, in-memory buffer of rows for a partition-tree memtable shard.
pub struct DataBuffer {
    /// Region metadata used to resolve the time index and field columns.
    metadata: RegionMetadataRef,
    /// Arrow schema of the encoded layout (pk index, ts, sequence, op type, fields).
    data_part_schema: SchemaRef,
    /// Builder for the pk-index column.
    pk_index_builder: UInt16VectorBuilder,
    /// Builder for the time-index column (type depends on the region's time unit).
    ts_builder: Box<dyn MutableVector>,
    /// Builder for the sequence column.
    sequence_builder: UInt64VectorBuilder,
    /// Builder for the op-type column.
    op_type_builder: UInt8VectorBuilder,
    /// Lazily materialized builders for field columns.
    field_builders: Vec<LazyMutableVectorBuilder>,

    /// Whether duplicate (pk, timestamp) rows are removed when sorting on
    /// read/freeze.
    dedup: bool,
}
189
impl DataBuffer {
    /// Creates a buffer for `metadata` with builders pre-sized to
    /// `init_capacity`. `dedup` controls whether reads/freezes drop duplicate
    /// (pk, timestamp) rows.
    pub fn with_capacity(metadata: RegionMetadataRef, init_capacity: usize, dedup: bool) -> Self {
        let ts_builder = metadata
            .time_index_column()
            .column_schema
            .data_type
            .create_mutable_vector(init_capacity);

        let pk_id_builder = UInt16VectorBuilder::with_capacity(init_capacity);
        let sequence_builder = UInt64VectorBuilder::with_capacity(init_capacity);
        let op_type_builder = UInt8VectorBuilder::with_capacity(init_capacity);

        // Field builders are created lazily (see `LazyMutableVectorBuilder`),
        // so an empty buffer allocates nothing for field columns.
        let field_builders = metadata
            .field_columns()
            .map(|c| LazyMutableVectorBuilder::new(c.column_schema.data_type.clone()))
            .collect::<Vec<_>>();

        let data_part_schema = memtable_schema_to_encoded_schema(&metadata);
        Self {
            metadata,
            data_part_schema,
            pk_index_builder: pk_id_builder,
            ts_builder,
            sequence_builder,
            op_type_builder,
            field_builders,
            dedup,
        }
    }

    /// Appends one key-value row tagged with `pk_index`.
    ///
    /// Fields must arrive in the same order as `metadata.field_columns()`;
    /// only the count is checked (debug assert).
    pub fn write_row(&mut self, pk_index: PkIndex, kv: &KeyValue) {
        self.ts_builder.push_value_ref(&kv.timestamp());
        self.pk_index_builder.push(Some(pk_index));
        self.sequence_builder.push(Some(kv.sequence()));
        self.op_type_builder.push(Some(kv.op_type() as u8));

        debug_assert_eq!(self.field_builders.len(), kv.num_fields());

        for (idx, field) in kv.fields().enumerate() {
            self.field_builders[idx]
                // Capacity hint: rows written so far. The timestamp was pushed
                // above, so `len()` already counts the current row.
                .get_or_create_builder(self.ts_builder.len())
                .push_value_ref(&field);
        }
    }

    /// Freezes the buffer into an immutable parquet-encoded [DataPart],
    /// draining all buffered rows.
    ///
    /// `pk_weights` maps each pk index to its sort weight; `replace_pk_index`
    /// additionally rewrites the pk-index column to those weights.
    pub fn freeze(
        &mut self,
        pk_weights: Option<&[u16]>,
        replace_pk_index: bool,
    ) -> Result<DataPart> {
        let timestamp_col_name = self.metadata.time_index_column().column_schema.name.clone();
        let encoder = DataPartEncoder::new(
            &self.metadata,
            pk_weights,
            None,
            timestamp_col_name,
            replace_pk_index,
            self.dedup,
        );
        let parts = encoder.write(self)?;
        Ok(parts)
    }

    /// Takes a cloned snapshot of the buffer for reading.
    ///
    /// Uses `finish_cloned`/`to_vector_cloned`, so the buffer itself is left
    /// untouched and remains writable.
    pub fn read(&self) -> Result<DataBufferReaderBuilder> {
        let _timer = PARTITION_TREE_READ_STAGE_ELAPSED
            .with_label_values(&["read_data_buffer"])
            .start_timer();

        let (pk_index, timestamp, sequence, op_type) = (
            self.pk_index_builder.finish_cloned(),
            self.ts_builder.to_vector_cloned(),
            self.sequence_builder.finish_cloned(),
            self.op_type_builder.finish_cloned(),
        );

        let mut fields = Vec::with_capacity(self.field_builders.len());
        for b in self.field_builders.iter() {
            let field = match b {
                // Builder never materialized: keep only the type; the reader
                // will emit an all-null column for it.
                LazyMutableVectorBuilder::Type(ty) => LazyFieldVector::Type(ty.clone()),
                LazyMutableVectorBuilder::Builder(builder) => {
                    LazyFieldVector::Vector(builder.to_vector_cloned())
                }
            };
            fields.push(field);
        }

        Ok(DataBufferReaderBuilder {
            schema: self.data_part_schema.clone(),
            pk_index,
            timestamp,
            sequence,
            op_type,
            fields,
            dedup: self.dedup,
        })
    }

    /// Returns the number of rows currently buffered.
    pub fn num_rows(&self) -> usize {
        self.ts_builder.len()
    }

    /// Returns whether the buffer holds no rows.
    pub fn is_empty(&self) -> bool {
        self.num_rows() == 0
    }
}
304
/// A column builder that defers allocation until the first value is written:
/// starts as the bare column type and is replaced by a real builder on first use.
enum LazyMutableVectorBuilder {
    Type(ConcreteDataType),
    Builder(Box<dyn MutableVector>),
}
309
310impl LazyMutableVectorBuilder {
311 fn new(ty: ConcreteDataType) -> Self {
312 Self::Type(ty)
313 }
314
315 fn get_or_create_builder(&mut self, init_capacity: usize) -> &mut Box<dyn MutableVector> {
316 match self {
317 LazyMutableVectorBuilder::Type(ty) => {
318 let builder = ty.create_mutable_vector(init_capacity);
319 *self = LazyMutableVectorBuilder::Builder(builder);
320 self.get_or_create_builder(init_capacity)
321 }
322 LazyMutableVectorBuilder::Builder(builder) => builder,
323 }
324 }
325}
326
/// Drains all rows from `buffer` and converts them into a single sorted
/// (and, when `dedup` is set, deduplicated) [RecordBatch] in the encoded schema.
///
/// The buffer's builders are consumed via `finish`/`to_vector`, which is what
/// empties the buffer.
fn drain_data_buffer_to_record_batches(
    schema: SchemaRef,
    buffer: &mut DataBuffer,
    pk_weights: Option<&[u16]>,
    dedup: bool,
    replace_pk_index: bool,
) -> Result<RecordBatch> {
    let num_rows = buffer.ts_builder.len();

    let (pk_index_v, ts_v, sequence_v, op_type_v) = (
        buffer.pk_index_builder.finish(),
        buffer.ts_builder.to_vector(),
        buffer.sequence_builder.finish(),
        buffer.op_type_builder.finish(),
    );

    let (indices_to_take, mut columns) = build_row_sort_indices_and_columns(
        pk_weights,
        pk_index_v,
        ts_v,
        sequence_v,
        op_type_v,
        replace_pk_index,
        dedup,
        // 4 internal columns (pk index, ts, sequence, op type) + field columns.
        buffer.field_builders.len() + 4,
    )?;

    for b in buffer.field_builders.iter_mut() {
        let array = match b {
            // Builder never materialized: emit an all-null column of the
            // proper type and length.
            LazyMutableVectorBuilder::Type(ty) => {
                let mut single_null = ty.create_mutable_vector(num_rows);
                single_null.push_nulls(num_rows);
                single_null.to_vector().to_arrow_array()
            }
            LazyMutableVectorBuilder::Builder(builder) => builder.to_vector().to_arrow_array(),
        };
        // Reorder (and possibly shrink, when deduping) the field column to
        // match the sorted row permutation.
        columns.push(
            arrow::compute::take(&array, &indices_to_take, None)
                .context(error::ComputeArrowSnafu)?,
        );
    }

    RecordBatch::try_new(schema, columns).context(error::NewRecordBatchSnafu)
}
374
/// Computes the row permutation that sorts (and optionally dedups) the rows,
/// returning the take-indices together with the four reordered internal
/// columns (pk index, timestamp, sequence, op type).
#[allow(clippy::too_many_arguments)]
fn build_row_sort_indices_and_columns(
    pk_weights: Option<&[u16]>,
    pk_index: UInt16Vector,
    ts: VectorRef,
    sequence: UInt64Vector,
    op_type: UInt8Vector,
    replace_pk_index: bool,
    dedup: bool,
    column_num: usize,
) -> Result<(UInt32Array, Vec<ArrayRef>)> {
    let mut rows = build_rows_to_sort(pk_weights, &pk_index, &ts, &sequence);

    // Build the pk column BEFORE sorting: element i must still correspond to
    // original row i so the `take(indices_to_take)` below reorders it
    // consistently with the other columns.
    let pk_array = if replace_pk_index {
        // Rewrite pk indices to their weights.
        Arc::new(UInt16Array::from_iter_values(
            rows.iter().map(|(_, key)| key.pk_weight),
        )) as Arc<_>
    } else {
        pk_index.to_arrow_array()
    };

    // Sort by (pk weight, timestamp, sequence desc); see `InnerKey`'s `Ord`.
    rows.sort_unstable_by(|l, r| l.1.cmp(&r.1));
    if dedup {
        // Keep only the first row of each (pk weight, timestamp) run, which is
        // the one with the largest sequence due to the sort order above.
        rows.dedup_by(|l, r| l.1.pk_weight == r.1.pk_weight && l.1.timestamp == r.1.timestamp);
    }

    let indices_to_take = UInt32Array::from_iter_values(rows.iter().map(|(idx, _)| *idx as u32));

    let mut columns = Vec::with_capacity(column_num);

    columns.push(
        arrow::compute::take(&pk_array, &indices_to_take, None)
            .context(error::ComputeArrowSnafu)?,
    );

    columns.push(
        arrow::compute::take(&ts.to_arrow_array(), &indices_to_take, None)
            .context(error::ComputeArrowSnafu)?,
    );

    columns.push(
        arrow::compute::take(&sequence.as_arrow(), &indices_to_take, None)
            .context(error::ComputeArrowSnafu)?,
    );

    columns.push(
        arrow::compute::take(&op_type.as_arrow(), &indices_to_take, None)
            .context(error::ComputeArrowSnafu)?,
    );

    Ok((indices_to_take, columns))
}
429
430pub(crate) fn timestamp_array_to_i64_slice(arr: &ArrayRef) -> &[i64] {
431 use datatypes::arrow::array::{
432 TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
433 TimestampSecondArray,
434 };
435 use datatypes::arrow::datatypes::{DataType, TimeUnit};
436
437 match arr.data_type() {
438 DataType::Timestamp(t, _) => match t {
439 TimeUnit::Second => arr
440 .as_any()
441 .downcast_ref::<TimestampSecondArray>()
442 .unwrap()
443 .values(),
444 TimeUnit::Millisecond => arr
445 .as_any()
446 .downcast_ref::<TimestampMillisecondArray>()
447 .unwrap()
448 .values(),
449 TimeUnit::Microsecond => arr
450 .as_any()
451 .downcast_ref::<TimestampMicrosecondArray>()
452 .unwrap()
453 .values(),
454 TimeUnit::Nanosecond => arr
455 .as_any()
456 .downcast_ref::<TimestampNanosecondArray>()
457 .unwrap()
458 .values(),
459 },
460 _ => unreachable!(),
461 }
462}
463
/// Snapshot of a field column taken by `DataBuffer::read`: either the bare
/// column type (its builder was never materialized) or a cloned vector.
enum LazyFieldVector {
    Type(ConcreteDataType),
    Vector(VectorRef),
}
468
/// Cloned snapshot of a [DataBuffer], from which a sorted [DataBufferReader]
/// can be built.
pub(crate) struct DataBufferReaderBuilder {
    /// Encoded arrow schema of the resulting record batch.
    schema: SchemaRef,
    /// Snapshot of the pk-index column.
    pk_index: UInt16Vector,
    /// Snapshot of the time-index column.
    timestamp: VectorRef,
    /// Snapshot of the sequence column.
    sequence: UInt64Vector,
    /// Snapshot of the op-type column.
    op_type: UInt8Vector,
    /// Snapshots of field columns (possibly unmaterialized).
    fields: Vec<LazyFieldVector>,
    /// Whether duplicate (pk, timestamp) rows are dropped while sorting.
    dedup: bool,
}
478
impl DataBufferReaderBuilder {
    /// Sorts the snapshot (optionally ordering keys by `pk_weights`) and
    /// assembles the encoded record batch, emitting all-null columns for
    /// fields whose builders were never materialized.
    fn build_record_batch(self, pk_weights: Option<&[u16]>) -> Result<RecordBatch> {
        let num_rows = self.timestamp.len();
        let (indices_to_take, mut columns) = build_row_sort_indices_and_columns(
            pk_weights,
            self.pk_index,
            self.timestamp,
            self.sequence,
            self.op_type,
            // Reads never replace pk index values with weights.
            false,
            self.dedup,
            self.fields.len() + 4,
        )?;

        for b in self.fields.iter() {
            let array = match b {
                // Unmaterialized field: produce an all-null column.
                LazyFieldVector::Type(ty) => {
                    let mut single_null = ty.create_mutable_vector(num_rows);
                    single_null.push_nulls(num_rows);
                    single_null.to_vector().to_arrow_array()
                }
                LazyFieldVector::Vector(vector) => vector.to_arrow_array(),
            };
            columns.push(
                arrow::compute::take(&array, &indices_to_take, None)
                    .context(error::ComputeArrowSnafu)?,
            );
        }
        RecordBatch::try_new(self.schema, columns).context(error::NewRecordBatchSnafu)
    }

    /// Builds a [DataBufferReader] positioned at the first pk range.
    pub fn build(self, pk_weights: Option<&[u16]>) -> Result<DataBufferReader> {
        self.build_record_batch(pk_weights)
            .and_then(DataBufferReader::new)
    }
}
518
/// Iterator-like reader over a sorted buffer snapshot, yielding one
/// [DataBatch] per contiguous run of rows with the same pk index.
#[derive(Debug)]
pub(crate) struct DataBufferReader {
    /// Sorted record batch being scanned.
    batch: RecordBatch,
    /// Offset of the next row to scan.
    offset: usize,
    /// Row range of the current batch; `None` when exhausted.
    current_range: Option<DataBatchRange>,
    /// Accumulated scan time, reported to metrics on drop.
    elapsed_time: Duration,
}

impl Drop for DataBufferReader {
    // Reports the total read time once per reader lifetime.
    fn drop(&mut self) {
        PARTITION_TREE_READ_STAGE_ELAPSED
            .with_label_values(&["read_data_buffer"])
            .observe(self.elapsed_time.as_secs_f64())
    }
}
534
535impl DataBufferReader {
536 pub(crate) fn new(batch: RecordBatch) -> Result<Self> {
537 let mut reader = Self {
538 batch,
539 offset: 0,
540 current_range: None,
541 elapsed_time: Duration::default(),
542 };
543 reader.next()?; Ok(reader)
545 }
546
547 pub(crate) fn is_valid(&self) -> bool {
548 self.current_range.is_some()
549 }
550
551 pub(crate) fn current_data_batch(&self) -> DataBatch<'_> {
555 let range = self.current_range.unwrap();
556 DataBatch {
557 rb: &self.batch,
558 range,
559 }
560 }
561
562 pub(crate) fn next(&mut self) -> Result<()> {
564 if self.offset >= self.batch.num_rows() {
565 self.current_range = None;
566 return Ok(());
567 }
568 let start = Instant::now();
569 let pk_index_array = pk_index_array(&self.batch);
570 if let Some((next_pk, range)) = search_next_pk_range(pk_index_array, self.offset) {
571 self.offset = range.end;
572 self.current_range = Some(DataBatchRange {
573 pk_index: next_pk,
574 start: range.start,
575 end: range.end,
576 });
577 } else {
578 self.current_range = None;
579 }
580 self.elapsed_time += start.elapsed();
581 Ok(())
582 }
583}
584
/// Returns the pk-index array, which is always column 0 of the encoded schema.
///
/// # Panics
/// Panics if column 0 is not a `UInt16Array`.
fn pk_index_array(batch: &RecordBatch) -> &UInt16Array {
    batch
        .column(0)
        .as_any()
        .downcast_ref::<UInt16Array>()
        .unwrap()
}
595
596fn search_next_pk_range(array: &UInt16Array, start: usize) -> Option<(PkIndex, Range<usize>)> {
598 let num_rows = array.len();
599 if start >= num_rows {
600 return None;
601 }
602
603 let values = array.values();
604 let next_pk = values[start];
605
606 for idx in start..num_rows {
607 if values[idx] != next_pk {
608 return Some((next_pk, start..idx));
609 }
610 }
611 Some((next_pk, start..num_rows))
612}
613
/// Composite sort key for buffered rows.
///
/// Ordering: ascending pk weight, then ascending timestamp, then *descending*
/// sequence — so among duplicate (pk, timestamp) rows the one with the largest
/// sequence sorts first and survives `dedup_by`, which keeps the first of each
/// run of equal keys.
#[derive(Eq, PartialEq)]
struct InnerKey {
    pk_weight: u16,
    timestamp: i64,
    sequence: u64,
}

impl PartialOrd for InnerKey {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for InnerKey {
    fn cmp(&self, other: &Self) -> Ordering {
        self.pk_weight
            .cmp(&other.pk_weight)
            .then_with(|| self.timestamp.cmp(&other.timestamp))
            // Operands swapped on purpose: larger sequences compare as smaller.
            .then_with(|| other.sequence.cmp(&self.sequence))
    }
}
636
/// Builds `(original_row_index, InnerKey)` pairs for every buffered row; these
/// are later sorted to compute the row permutation.
///
/// When `pk_weights` is `None`, the pk index itself serves as the weight.
fn build_rows_to_sort(
    pk_weights: Option<&[u16]>,
    pk_index: &UInt16Vector,
    ts: &VectorRef,
    sequence: &UInt64Vector,
) -> Vec<(usize, InnerKey)> {
    // Extract the raw i64 timestamp values regardless of the time unit.
    let ts_values = match ts.data_type() {
        ConcreteDataType::Timestamp(t) => match t {
            TimestampType::Second(_) => ts
                .as_any()
                .downcast_ref::<TimestampSecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            TimestampType::Millisecond(_) => ts
                .as_any()
                .downcast_ref::<TimestampMillisecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            TimestampType::Microsecond(_) => ts
                .as_any()
                .downcast_ref::<TimestampMicrosecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
            TimestampType::Nanosecond(_) => ts
                .as_any()
                .downcast_ref::<TimestampNanosecondVector>()
                .unwrap()
                .as_arrow()
                .values(),
        },
        other => unreachable!("Unexpected type {:?}", other),
    };
    let pk_index_values = pk_index.as_arrow().values();
    let sequence_values = sequence.as_arrow().values();
    debug_assert_eq!(ts_values.len(), pk_index_values.len());
    debug_assert_eq!(ts_values.len(), sequence_values.len());

    ts_values
        .iter()
        .zip(pk_index_values.iter())
        .zip(sequence_values.iter())
        .enumerate()
        .map(|(idx, ((timestamp, pk_index), sequence))| {
            // Panics if a pk index is out of bounds of `pk_weights`.
            let pk_weight = if let Some(weights) = pk_weights {
                weights[*pk_index as usize]
            } else {
                *pk_index
            };
            (
                idx,
                InnerKey {
                    timestamp: *timestamp,
                    pk_weight,
                    sequence: *sequence,
                },
            )
        })
        .collect()
}
699
700fn memtable_schema_to_encoded_schema(schema: &RegionMetadataRef) -> SchemaRef {
701 use datatypes::arrow::datatypes::DataType;
702 let ColumnSchema {
703 name: ts_name,
704 data_type: ts_type,
705 ..
706 } = &schema.time_index_column().column_schema;
707
708 let mut fields = vec![
709 Field::new(PK_INDEX_COLUMN_NAME, DataType::UInt16, false),
710 Field::new(ts_name, ts_type.as_arrow_type(), false),
711 Field::new(SEQUENCE_COLUMN_NAME, DataType::UInt64, false),
712 Field::new(OP_TYPE_COLUMN_NAME, DataType::UInt8, false),
713 ];
714
715 fields.extend(schema.field_columns().map(|c| {
716 Field::new(
717 &c.column_schema.name,
718 c.column_schema.data_type.as_arrow_type(),
719 c.column_schema.is_nullable(),
720 )
721 }));
722
723 Arc::new(Schema::new(fields))
724}
725
/// Encoder that drains a [DataBuffer] into an in-memory parquet-encoded
/// [DataPart].
struct DataPartEncoder<'a> {
    /// Encoded arrow schema to write.
    schema: SchemaRef,
    /// Optional sort weights per pk index.
    pk_weights: Option<&'a [u16]>,
    /// Optional parquet row group size override.
    row_group_size: Option<usize>,
    /// Name of the time index column (needed for per-column writer props).
    timestamp_column_name: String,
    /// Whether the pk index column is rewritten to pk weights.
    replace_pk_index: bool,
    /// Whether duplicate (pk, timestamp) rows are dropped while sorting.
    dedup: bool,
}
734
impl<'a> DataPartEncoder<'a> {
    /// Creates an encoder for buffers that use `metadata`'s schema.
    pub fn new(
        metadata: &RegionMetadataRef,
        pk_weights: Option<&'a [u16]>,
        row_group_size: Option<usize>,
        timestamp_column_name: String,
        replace_pk_index: bool,
        dedup: bool,
    ) -> DataPartEncoder<'a> {
        let schema = memtable_schema_to_encoded_schema(metadata);
        Self {
            schema,
            pk_weights,
            row_group_size,
            timestamp_column_name,
            replace_pk_index,
            dedup,
        }
    }

    /// Builds parquet writer properties for in-memory parts: ZSTD compression,
    /// statistics disabled, DELTA_BINARY_PACKED encoding on all four internal
    /// columns, with dictionaries enabled only for pk index and op type.
    fn writer_props(self) -> WriterProperties {
        let mut builder = WriterProperties::builder();
        if let Some(row_group_size) = self.row_group_size {
            builder = builder.set_max_row_group_size(row_group_size)
        }

        let ts_col = ColumnPath::new(vec![self.timestamp_column_name]);
        let pk_index_col = ColumnPath::new(vec![PK_INDEX_COLUMN_NAME.to_string()]);
        let sequence_col = ColumnPath::new(vec![SEQUENCE_COLUMN_NAME.to_string()]);
        let op_type_col = ColumnPath::new(vec![OP_TYPE_COLUMN_NAME.to_string()]);

        builder = builder
            .set_compression(Compression::ZSTD(ZstdLevel::default()))
            .set_statistics_enabled(EnabledStatistics::None);
        builder = builder
            .set_column_encoding(ts_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(ts_col, false)
            .set_column_encoding(pk_index_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(pk_index_col, true)
            .set_column_encoding(sequence_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(sequence_col, false)
            .set_column_encoding(op_type_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(op_type_col, true)
            .set_column_index_truncate_length(None)
            .set_statistics_truncate_length(None);
        builder.build()
    }

    /// Drains `source` into a sorted record batch and encodes it as an
    /// in-memory parquet file; both stages are timed separately.
    pub fn write(self, source: &mut DataBuffer) -> Result<DataPart> {
        let mut bytes = Vec::with_capacity(1024);

        let rb = {
            let _timer = PARTITION_TREE_DATA_BUFFER_FREEZE_STAGE_ELAPSED
                .with_label_values(&["drain_data_buffer_to_batch"])
                .start_timer();
            drain_data_buffer_to_record_batches(
                self.schema.clone(),
                source,
                self.pk_weights,
                self.dedup,
                self.replace_pk_index,
            )?
        };

        {
            let _timer = PARTITION_TREE_DATA_BUFFER_FREEZE_STAGE_ELAPSED
                .with_label_values(&["encode"])
                .start_timer();
            let mut writer =
                ArrowWriter::try_new(&mut bytes, self.schema.clone(), Some(self.writer_props()))
                    .context(error::EncodeMemtableSnafu)?;
            writer.write(&rb).context(error::EncodeMemtableSnafu)?;
            // Closing finalizes the parquet footer; the metadata is not needed.
            let _metadata = writer.close().context(error::EncodeMemtableSnafu)?;
        }
        Ok(DataPart::Parquet(ParquetPart {
            data: Bytes::from(bytes),
        }))
    }
}
815
/// An immutable, frozen chunk of memtable data. Parquet is currently the only
/// encoding.
pub enum DataPart {
    Parquet(ParquetPart),
}
820
821impl DataPart {
822 pub fn read(&self) -> Result<DataPartReader> {
824 match self {
825 DataPart::Parquet(data_bytes) => {
828 DataPartReader::new(data_bytes.data.clone(), Some(DEFAULT_READ_BATCH_SIZE))
829 }
830 }
831 }
832
833 fn is_empty(&self) -> bool {
834 match self {
835 DataPart::Parquet(p) => p.data.is_empty(),
836 }
837 }
838}
839
/// Iterator-like reader over a parquet-encoded [DataPart], yielding one
/// [DataBatch] per contiguous run of rows with the same pk index.
pub struct DataPartReader {
    /// Underlying parquet record batch reader.
    inner: ParquetRecordBatchReader,
    /// The record batch currently being scanned, if any.
    current_batch: Option<RecordBatch>,
    /// Row range of the current batch; `None` when exhausted.
    current_range: Option<DataBatchRange>,
    /// Accumulated read time, reported to metrics on drop.
    elapsed: Duration,
}

impl Drop for DataPartReader {
    // Reports the total read time once per reader lifetime.
    fn drop(&mut self) {
        PARTITION_TREE_READ_STAGE_ELAPSED
            .with_label_values(&["read_data_part"])
            .observe(self.elapsed.as_secs_f64());
    }
}

impl Debug for DataPartReader {
    // Only the position is shown; the parquet reader has no useful Debug form.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("DataPartReader")
            .field("current_range", &self.current_range)
            .finish()
    }
}
862
863impl DataPartReader {
864 pub fn new(data: Bytes, batch_size: Option<usize>) -> Result<Self> {
865 let mut builder =
866 ParquetRecordBatchReaderBuilder::try_new(data).context(error::ReadDataPartSnafu)?;
867 if let Some(batch_size) = batch_size {
868 builder = builder.with_batch_size(batch_size);
869 }
870 let parquet_reader = builder.build().context(error::ReadDataPartSnafu)?;
871 let mut reader = Self {
872 inner: parquet_reader,
873 current_batch: None,
874 current_range: None,
875 elapsed: Default::default(),
876 };
877 reader.next()?;
878 Ok(reader)
879 }
880
881 pub(crate) fn is_valid(&self) -> bool {
883 self.current_range.is_some()
884 }
885
886 pub(crate) fn current_data_batch(&self) -> DataBatch<'_> {
890 let range = self.current_range.unwrap();
891 DataBatch {
892 rb: self.current_batch.as_ref().unwrap(),
893 range,
894 }
895 }
896
897 pub(crate) fn next(&mut self) -> Result<()> {
898 let start = Instant::now();
899 if let Some((next_pk, range)) = self.search_next_pk_range() {
900 self.current_range = Some(DataBatchRange {
902 pk_index: next_pk,
903 start: range.start,
904 end: range.end,
905 });
906 } else {
907 if let Some(rb) = self.inner.next() {
909 let rb = rb.context(error::ComputeArrowSnafu)?;
910 self.current_batch = Some(rb);
911 self.current_range = None;
912 return self.next();
913 } else {
914 self.current_batch = None;
916 self.current_range = None;
917 }
918 }
919 self.elapsed += start.elapsed();
920 Ok(())
921 }
922
923 fn search_next_pk_range(&self) -> Option<(PkIndex, Range<usize>)> {
925 self.current_batch.as_ref().and_then(|b| {
926 let pk_array = pk_index_array(b);
928 let start = self
929 .current_range
930 .as_ref()
931 .map(|range| range.end)
932 .unwrap_or(0);
933 search_next_pk_range(pk_array, start)
934 })
935 }
936}
937
/// A frozen part stored as a complete parquet file in memory.
pub struct ParquetPart {
    /// Raw bytes of the parquet file.
    data: Bytes,
}

/// All data of a shard: one mutable active buffer plus the frozen parts.
pub struct DataParts {
    /// Buffer currently accepting writes.
    active: DataBuffer,
    /// Immutable frozen parts.
    frozen: Vec<DataPart>,
}
950
951impl DataParts {
952 pub(crate) fn new(metadata: RegionMetadataRef, capacity: usize, dedup: bool) -> Self {
953 Self {
954 active: DataBuffer::with_capacity(metadata, capacity, dedup),
955 frozen: Vec::new(),
956 }
957 }
958
959 pub(crate) fn with_frozen(mut self, frozen: Vec<DataPart>) -> Self {
960 self.frozen = frozen;
961 self
962 }
963
964 pub fn write_row(&mut self, pk_index: PkIndex, kv: &KeyValue) {
966 self.active.write_row(pk_index, kv)
967 }
968
969 pub fn num_active_rows(&self) -> usize {
971 self.active.num_rows()
972 }
973
974 pub fn freeze(&mut self) -> Result<()> {
976 let part = self.active.freeze(None, false)?;
977 self.frozen.push(part);
978 Ok(())
979 }
980
981 pub fn read(&self) -> Result<DataPartsReaderBuilder> {
985 let _timer = PARTITION_TREE_READ_STAGE_ELAPSED
986 .with_label_values(&["build_data_parts_reader"])
987 .start_timer();
988
989 let buffer = self.active.read()?;
990 let mut parts = Vec::with_capacity(self.frozen.len());
991 for p in &self.frozen {
992 parts.push(p.read()?);
993 }
994 Ok(DataPartsReaderBuilder { buffer, parts })
995 }
996
997 pub(crate) fn is_empty(&self) -> bool {
998 self.active.is_empty() && self.frozen.iter().all(|part| part.is_empty())
999 }
1000
1001 #[cfg(test)]
1002 pub(crate) fn frozen_len(&self) -> usize {
1003 self.frozen.len()
1004 }
1005}
1006
/// Collected readers for one shard: the active buffer snapshot plus one reader
/// per frozen part, merged by [DataPartsReaderBuilder::build].
pub struct DataPartsReaderBuilder {
    buffer: DataBufferReaderBuilder,
    parts: Vec<DataPartReader>,
}
1011
1012impl DataPartsReaderBuilder {
1013 pub(crate) fn build(self) -> Result<DataPartsReader> {
1014 let mut nodes = Vec::with_capacity(self.parts.len() + 1);
1015 nodes.push(DataNode::new(DataSource::Buffer(
1016 self.buffer.build(None)?,
1019 )));
1020 for p in self.parts {
1021 nodes.push(DataNode::new(DataSource::Part(p)));
1022 }
1023 let num_parts = nodes.len();
1024 let merger = Merger::try_new(nodes)?;
1025 Ok(DataPartsReader {
1026 merger,
1027 num_parts,
1028 elapsed: Default::default(),
1029 })
1030 }
1031}
1032
/// Merged reader over all data of a shard (active buffer + frozen parts).
pub struct DataPartsReader {
    /// K-way merger over all sources.
    merger: Merger<DataNode>,
    /// Total number of merge sources (buffer + frozen parts).
    num_parts: usize,
    /// Accumulated read time, reported to metrics on drop.
    elapsed: Duration,
}

impl Drop for DataPartsReader {
    // Reports the total merged-read time once per reader lifetime.
    fn drop(&mut self) {
        PARTITION_TREE_READ_STAGE_ELAPSED
            .with_label_values(&["read_data_parts"])
            .observe(self.elapsed.as_secs_f64())
    }
}
1047
impl DataPartsReader {
    /// Returns the current batch, truncated to the number of rows the merger
    /// reports as yieldable at the current position.
    pub(crate) fn current_data_batch(&self) -> DataBatch<'_> {
        let batch = self.merger.current_node().current_data_batch();
        batch.slice(0, self.merger.current_rows())
    }

    /// Advances the merger, recording time spent for the read metric.
    pub(crate) fn next(&mut self) -> Result<()> {
        let start = Instant::now();
        let result = self.merger.next();
        self.elapsed += start.elapsed();
        result
    }

    /// Returns true while the merger still has rows to produce.
    pub(crate) fn is_valid(&self) -> bool {
        self.merger.is_valid()
    }

    /// Returns the number of merged sources (active buffer + frozen parts).
    pub(crate) fn num_parts(&self) -> usize {
        self.num_parts
    }
}
1069
#[cfg(test)]
mod tests {
    // NOTE(review): `Float64Array` is imported via `datafusion::arrow` while
    // the rest of this file uses `datatypes::arrow`; consider unifying.
    use datafusion::arrow::array::Float64Array;
    use datatypes::arrow::array::UInt16Array;
    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
    use parquet::data_type::AsBytes;

    use super::*;
    use crate::test_util::memtable_util::{
        extract_data_batch, metadata_for_test, write_rows_to_buffer,
    };

    // A lazy builder starts as `Type` and becomes `Builder` after first access.
    #[test]
    fn test_lazy_mutable_vector_builder() {
        let mut builder = LazyMutableVectorBuilder::new(ConcreteDataType::boolean_datatype());
        match builder {
            LazyMutableVectorBuilder::Type(ref t) => {
                assert_eq!(&ConcreteDataType::boolean_datatype(), t);
            }
            LazyMutableVectorBuilder::Builder(_) => {
                unreachable!()
            }
        }
        builder.get_or_create_builder(1);
        match builder {
            LazyMutableVectorBuilder::Type(_) => {
                unreachable!()
            }
            LazyMutableVectorBuilder::Builder(_) => {}
        }
    }

    // Writes an overlapping timestamp (ts=2) for pk 0 and checks that reading
    // either dedups (keeping the later sequence) or returns all rows.
    fn check_data_buffer_dedup(dedup: bool) {
        let metadata = metadata_for_test();
        let mut buffer = DataBuffer::with_capacity(metadata.clone(), 10, dedup);
        write_rows_to_buffer(
            &mut buffer,
            &metadata,
            0,
            vec![2, 3],
            vec![Some(1.0), Some(2.0)],
            0,
        );
        write_rows_to_buffer(
            &mut buffer,
            &metadata,
            0,
            vec![1, 2],
            vec![Some(1.1), Some(2.1)],
            2,
        );

        let mut reader = buffer.read().unwrap().build(Some(&[0])).unwrap();
        let mut res = vec![];
        while reader.is_valid() {
            let batch = reader.current_data_batch();
            res.push(extract_data_batch(&batch));
            reader.next().unwrap();
        }
        if dedup {
            assert_eq!(vec![(0, vec![(1, 2), (2, 3), (3, 1)])], res);
        } else {
            assert_eq!(vec![(0, vec![(1, 2), (2, 3), (2, 0), (3, 1)])], res);
        }
    }

    #[test]
    fn test_data_buffer_dedup() {
        check_data_buffer_dedup(true);
        check_data_buffer_dedup(false);
    }

    // Freezes a buffer with the given weights/replace flag and compares the
    // (pk, [(ts, sequence)]) batches produced when reading the frozen part.
    fn check_data_buffer_freeze(
        pk_weights: Option<&[u16]>,
        replace_pk_weights: bool,
        expected: &[(u16, Vec<(i64, u64)>)],
    ) {
        let meta = metadata_for_test();
        let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);

        write_rows_to_buffer(
            &mut buffer,
            &meta,
            0,
            vec![0, 1, 2],
            vec![Some(1.0), None, Some(3.0)],
            0,
        );
        write_rows_to_buffer(&mut buffer, &meta, 1, vec![1], vec![Some(2.0)], 3);

        let mut res = Vec::with_capacity(3);
        let mut reader = buffer
            .freeze(pk_weights, replace_pk_weights)
            .unwrap()
            .read()
            .unwrap();
        while reader.is_valid() {
            let batch = reader.current_data_batch();
            res.push(extract_data_batch(&batch));
            reader.next().unwrap();
        }
        assert_eq!(expected, res);
    }

    #[test]
    fn test_data_buffer_freeze() {
        // No weights: order follows pk indices.
        check_data_buffer_freeze(
            None,
            false,
            &[(0, vec![(0, 0), (1, 1), (2, 2)]), (1, vec![(1, 3)])],
        );

        // Weights preserving the original order.
        check_data_buffer_freeze(
            Some(&[1, 2]),
            false,
            &[(0, vec![(0, 0), (1, 1), (2, 2)]), (1, vec![(1, 3)])],
        );

        // Reversing weights + replacing pk indices with their weights.
        check_data_buffer_freeze(
            Some(&[3, 2]),
            true,
            &[(2, vec![(1, 3)]), (3, vec![(0, 0), (1, 1), (2, 2)])],
        );

        // Reversing weights but keeping the original pk indices.
        check_data_buffer_freeze(
            Some(&[3, 2]),
            false,
            &[(1, vec![(1, 3)]), (0, vec![(0, 0), (1, 1), (2, 2)])],
        );
    }

    // Encodes a buffer to parquet and validates magic bytes, dedup and row count.
    #[test]
    fn test_encode_data_buffer() {
        let meta = metadata_for_test();
        let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);

        write_rows_to_buffer(
            &mut buffer,
            &meta,
            2,
            vec![0, 1, 2],
            vec![Some(1.0), None, Some(3.0)],
            2,
        );

        assert_eq!(3, buffer.num_rows());

        // Overwrites ts=1 with a later sequence; dedup keeps only 3 rows.
        write_rows_to_buffer(&mut buffer, &meta, 2, vec![1], vec![Some(2.0)], 3);

        assert_eq!(4, buffer.num_rows());

        let encoder = DataPartEncoder::new(
            &meta,
            Some(&[0, 1, 2]),
            None,
            meta.time_index_column().column_schema.name.clone(),
            true,
            true,
        );
        let encoded = match encoder.write(&mut buffer).unwrap() {
            DataPart::Parquet(data) => data.data,
        };

        // Parquet files start and end with the "PAR1" magic bytes.
        let s = String::from_utf8_lossy(encoded.as_bytes());
        assert!(s.starts_with("PAR1"));
        assert!(s.ends_with("PAR1"));

        let builder = ParquetRecordBatchReaderBuilder::try_new(encoded).unwrap();
        let mut reader = builder.build().unwrap();
        let batch = reader.next().unwrap().unwrap();
        assert_eq!(3, batch.num_rows());
    }

    // Collects the "v1" field column of every batch and compares to `expected_values`.
    fn check_buffer_values_equal(reader: &mut DataBufferReader, expected_values: &[Vec<f64>]) {
        let mut output = Vec::with_capacity(expected_values.len());
        while reader.is_valid() {
            let batch = reader.current_data_batch().slice_record_batch();
            let values = batch
                .column_by_name("v1")
                .unwrap()
                .as_any()
                .downcast_ref::<Float64Array>()
                .unwrap()
                .iter()
                .map(|v| v.unwrap())
                .collect::<Vec<_>>();
            output.push(values);
            reader.next().unwrap();
        }
        assert_eq!(expected_values, output);
    }

    #[test]
    fn test_search_next_pk_range() {
        let a = UInt16Array::from_iter_values([1, 1, 3, 3, 4, 6]);
        assert_eq!((1, 0..2), search_next_pk_range(&a, 0).unwrap());
        assert_eq!((3, 2..4), search_next_pk_range(&a, 2).unwrap());
        assert_eq!((4, 4..5), search_next_pk_range(&a, 4).unwrap());
        assert_eq!((6, 5..6), search_next_pk_range(&a, 5).unwrap());

        // Out-of-bounds start yields None.
        assert_eq!(None, search_next_pk_range(&a, 6));
    }

    // Reads a buffer with two pk indices and checks per-key iteration order.
    fn check_iter_data_buffer(pk_weights: Option<&[u16]>, expected: &[Vec<f64>]) {
        let meta = metadata_for_test();
        let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);

        write_rows_to_buffer(
            &mut buffer,
            &meta,
            3,
            vec![1, 2, 3],
            vec![Some(1.1), Some(2.1), Some(3.1)],
            3,
        );

        write_rows_to_buffer(
            &mut buffer,
            &meta,
            2,
            vec![0, 1, 2],
            vec![Some(1.0), Some(2.0), Some(3.0)],
            2,
        );

        let mut iter = buffer.read().unwrap().build(pk_weights).unwrap();
        check_buffer_values_equal(&mut iter, expected);
    }

    #[test]
    fn test_iter_data_buffer() {
        check_iter_data_buffer(None, &[vec![1.0, 2.0, 3.0], vec![1.1, 2.1, 3.1]]);
        check_iter_data_buffer(
            Some(&[0, 1, 2, 3]),
            &[vec![1.0, 2.0, 3.0], vec![1.1, 2.1, 3.1]],
        );
        // Reversed weights flip the key iteration order.
        check_iter_data_buffer(
            Some(&[3, 2, 1, 0]),
            &[vec![1.1, 2.1, 3.1], vec![1.0, 2.0, 3.0]],
        );
    }

    #[test]
    fn test_iter_empty_data_buffer() {
        let meta = metadata_for_test();
        let buffer = DataBuffer::with_capacity(meta.clone(), 10, true);
        let mut iter = buffer.read().unwrap().build(Some(&[0, 1, 3, 2])).unwrap();
        check_buffer_values_equal(&mut iter, &[]);
    }

    // Same as check_buffer_values_equal, but for a frozen-part reader.
    fn check_part_values_equal(iter: &mut DataPartReader, expected_values: &[Vec<f64>]) {
        let mut output = Vec::with_capacity(expected_values.len());
        while iter.is_valid() {
            let batch = iter.current_data_batch().slice_record_batch();
            let values = batch
                .column_by_name("v1")
                .unwrap()
                .as_any()
                .downcast_ref::<Float64Array>()
                .unwrap()
                .iter()
                .map(|v| v.unwrap())
                .collect::<Vec<_>>();
            output.push(values);
            iter.next().unwrap();
        }
        assert_eq!(expected_values, output);
    }

    // Encodes a buffer (row group size 4, dedup on) and iterates the part.
    fn check_iter_data_part(weights: &[u16], expected_values: &[Vec<f64>]) {
        let meta = metadata_for_test();
        let mut buffer = DataBuffer::with_capacity(meta.clone(), 10, true);

        write_rows_to_buffer(
            &mut buffer,
            &meta,
            2,
            vec![0, 1, 2],
            vec![Some(1.0), Some(2.0), Some(3.0)],
            2,
        );

        write_rows_to_buffer(
            &mut buffer,
            &meta,
            3,
            vec![1, 2, 3],
            vec![Some(1.1), Some(2.1), Some(3.1)],
            3,
        );

        write_rows_to_buffer(
            &mut buffer,
            &meta,
            2,
            vec![2, 3],
            vec![Some(2.2), Some(2.3)],
            4,
        );

        let encoder = DataPartEncoder::new(
            &meta,
            Some(weights),
            Some(4),
            meta.time_index_column().column_schema.name.clone(),
            true,
            true,
        );
        let encoded = encoder.write(&mut buffer).unwrap();

        let mut iter = encoded.read().unwrap();
        check_part_values_equal(&mut iter, expected_values);
    }

    #[test]
    fn test_iter_data_part() {
        check_iter_data_part(
            &[0, 1, 2, 3],
            &[vec![1.0, 2.0, 3.0, 2.3], vec![1.1, 2.1, 3.1]],
        );

        check_iter_data_part(
            &[3, 2, 1, 0],
            &[vec![1.1, 2.1, 3.1], vec![1.0, 2.0, 3.0, 2.3]],
        );
    }
}