1use std::collections::{HashMap, HashSet};
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use api::helper::{ColumnDataTypeWrapper, to_grpc_value};
22use api::v1::bulk_wal_entry::Body;
23use api::v1::{ArrowIpc, BulkWalEntry, Mutation, OpType};
24use bytes::Bytes;
25use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
26use common_recordbatch::DfRecordBatch as RecordBatch;
27use common_time::Timestamp;
28use datafusion_common::Column;
29use datafusion_common::pruning::PruningStatistics;
30use datafusion_expr::utils::expr_to_columns;
31use datatypes::arrow;
32use datatypes::arrow::array::{
33 Array, ArrayRef, BinaryArray, BooleanArray, StringDictionaryBuilder, UInt8Array, UInt64Array,
34};
35use datatypes::arrow::compute::{SortColumn, SortOptions};
36use datatypes::arrow::datatypes::{
37 DataType as ArrowDataType, Field, Schema, SchemaRef, UInt32Type,
38};
39use datatypes::data_type::DataType;
40use datatypes::extension::json::is_structured_json_field;
41use datatypes::prelude::{MutableVector, Vector};
42use datatypes::types::JsonType;
43use datatypes::value::ValueRef;
44use datatypes::vectors::Helper;
45use datatypes::vectors::json::array::JsonArray;
46use mito_codec::key_values::{KeyValue, KeyValues};
47use mito_codec::row_converter::{PrimaryKeyCodec, SortField, build_primary_key_codec_with_fields};
48use parquet::arrow::ArrowWriter;
49use parquet::basic::{Compression, ZstdLevel};
50use parquet::file::metadata::ParquetMetaData;
51use parquet::file::properties::WriterProperties;
52use smallvec::SmallVec;
53use snafu::{OptionExt, ResultExt};
54use store_api::codec::PrimaryKeyEncoding;
55use store_api::metadata::{RegionMetadata, RegionMetadataRef};
56use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
57use store_api::storage::{ColumnId, FileId, SequenceNumber, SequenceRange};
58
59use crate::error::{
60 self, ColumnNotFoundSnafu, ComputeArrowSnafu, ConvertValueSnafu, CreateDefaultSnafu,
61 DataTypeMismatchSnafu, EncodeMemtableSnafu, EncodeSnafu, InvalidMetadataSnafu,
62 InvalidRequestSnafu, NewRecordBatchSnafu, Result,
63};
64use crate::memtable::bulk::context::{BulkIterContext, BulkIterContextRef};
65use crate::memtable::bulk::part_reader::EncodedBulkPartIter;
66use crate::memtable::time_series::{ValueBuilder, Values};
67use crate::memtable::{BoxedRecordBatchIterator, MemScanMetrics, MemtableStats};
68use crate::sst::SeriesEstimator;
69use crate::sst::index::IndexOutput;
70use crate::sst::parquet::flat_format::primary_key_column_index;
71use crate::sst::parquet::format::{PrimaryKeyArray, PrimaryKeyArrayBuilder};
72use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo};
73
74const INIT_DICT_VALUE_CAPACITY: usize = 8;
75
/// A batch of rows written through the bulk path, together with summary metadata.
#[derive(Clone)]
pub struct BulkPart {
    /// The rows of this part.
    pub batch: RecordBatch,
    /// Maximum timestamp of the rows, as a raw value in the region's time index unit
    /// (it is turned into a timestamp with the region's time index type when building stats).
    pub max_timestamp: i64,
    /// Minimum timestamp of the rows, as a raw value in the region's time index unit.
    pub min_timestamp: i64,
    /// Sequence number of this part; for a part merged from several parts this is the
    /// maximum sequence of the inputs.
    pub sequence: u64,
    /// Position of the time index column inside `batch`.
    pub timestamp_index: usize,
    /// Arrow IPC message this part was decoded from, if any. When present it is reused as-is
    /// when converting the part back into a `BulkWalEntry`, avoiding a re-encode.
    pub raw_data: Option<ArrowIpc>,
}
86
87impl TryFrom<BulkWalEntry> for BulkPart {
88 type Error = error::Error;
89
90 fn try_from(value: BulkWalEntry) -> std::result::Result<Self, Self::Error> {
91 match value.body.expect("Entry payload should be present") {
92 Body::ArrowIpc(ipc) => {
93 let mut decoder = FlightDecoder::try_from_schema_bytes(&ipc.schema)
94 .context(error::ConvertBulkWalEntrySnafu)?;
95 let batch = decoder
96 .try_decode_record_batch(&ipc.data_header, &ipc.payload)
97 .context(error::ConvertBulkWalEntrySnafu)?;
98 Ok(Self {
99 batch,
100 max_timestamp: value.max_ts,
101 min_timestamp: value.min_ts,
102 sequence: value.sequence,
103 timestamp_index: value.timestamp_index as usize,
104 raw_data: Some(ipc),
105 })
106 }
107 }
108 }
109}
110
111impl TryFrom<&BulkPart> for BulkWalEntry {
112 type Error = error::Error;
113
114 fn try_from(value: &BulkPart) -> Result<Self> {
115 if let Some(ipc) = &value.raw_data {
116 Ok(BulkWalEntry {
117 sequence: value.sequence,
118 max_ts: value.max_timestamp,
119 min_ts: value.min_timestamp,
120 timestamp_index: value.timestamp_index as u32,
121 body: Some(Body::ArrowIpc(ipc.clone())),
122 })
123 } else {
124 let mut encoder = FlightEncoder::default();
125 let schema_bytes = encoder
126 .encode_schema(value.batch.schema().as_ref())
127 .data_header;
128 let [rb_data] = encoder
129 .encode(FlightMessage::RecordBatch(value.batch.clone()))
130 .try_into()
131 .map_err(|_| {
132 error::UnsupportedOperationSnafu {
133 err_msg: "create BulkWalEntry from RecordBatch with dictionary arrays",
134 }
135 .build()
136 })?;
137 Ok(BulkWalEntry {
138 sequence: value.sequence,
139 max_ts: value.max_timestamp,
140 min_ts: value.min_timestamp,
141 timestamp_index: value.timestamp_index as u32,
142 body: Some(Body::ArrowIpc(ArrowIpc {
143 schema: schema_bytes,
144 data_header: rb_data.data_header,
145 payload: rb_data.data_body,
146 })),
147 })
148 }
149 }
150}
151
152impl BulkPart {
153 pub(crate) fn estimated_size(&self) -> usize {
154 record_batch_estimated_size(&self.batch)
155 }
156
157 pub fn estimated_series_count(&self) -> usize {
160 let pk_column_idx = primary_key_column_index(self.batch.num_columns());
161 let pk_column = self.batch.column(pk_column_idx);
162 if let Some(dict_array) = pk_column.as_any().downcast_ref::<PrimaryKeyArray>() {
163 dict_array.values().len()
164 } else {
165 0
166 }
167 }
168
169 pub fn to_memtable_stats(&self, region_metadata: &RegionMetadataRef) -> MemtableStats {
171 let ts_type = region_metadata.time_index_type();
172 let min_ts = ts_type.create_timestamp(self.min_timestamp);
173 let max_ts = ts_type.create_timestamp(self.max_timestamp);
174
175 MemtableStats {
176 estimated_bytes: self.estimated_size(),
177 time_range: Some((min_ts, max_ts)),
178 num_rows: self.num_rows(),
179 num_ranges: 1,
180 max_sequence: self.sequence,
181 series_count: self.estimated_series_count(),
182 }
183 }
184
185 pub fn fill_missing_columns(&mut self, region_metadata: &RegionMetadata) -> Result<()> {
195 let batch_schema = self.batch.schema();
197 let batch_columns: HashSet<_> = batch_schema
198 .fields()
199 .iter()
200 .map(|f| f.name().as_str())
201 .collect();
202
203 let mut columns_to_fill = Vec::new();
205 for column_meta in ®ion_metadata.column_metadatas {
206 if !batch_columns.contains(column_meta.column_schema.name.as_str()) {
209 columns_to_fill.push(column_meta);
210 }
211 }
212
213 if columns_to_fill.is_empty() {
214 return Ok(());
215 }
216
217 let num_rows = self.batch.num_rows();
218
219 let mut new_columns = Vec::new();
220 let mut new_fields = Vec::new();
221
222 new_fields.extend(batch_schema.fields().iter().cloned());
224 new_columns.extend_from_slice(self.batch.columns());
225
226 let region_id = region_metadata.region_id;
227 for column_meta in columns_to_fill {
229 let default_vector = column_meta
230 .column_schema
231 .create_default_vector(num_rows)
232 .context(CreateDefaultSnafu {
233 region_id,
234 column: &column_meta.column_schema.name,
235 })?
236 .with_context(|| InvalidRequestSnafu {
237 region_id,
238 reason: format!(
239 "column {} does not have default value",
240 column_meta.column_schema.name
241 ),
242 })?;
243 let arrow_array = default_vector.to_arrow_array();
244 column_meta.column_schema.data_type.as_arrow_type();
245
246 new_fields.push(Arc::new(Field::new(
247 column_meta.column_schema.name.clone(),
248 column_meta.column_schema.data_type.as_arrow_type(),
249 column_meta.column_schema.is_nullable(),
250 )));
251 new_columns.push(arrow_array);
252 }
253
254 let new_schema = Arc::new(Schema::new(new_fields));
256 let new_batch =
257 RecordBatch::try_new(new_schema, new_columns).context(NewRecordBatchSnafu)?;
258
259 self.batch = new_batch;
261
262 Ok(())
263 }
264
265 pub(crate) fn to_mutation(&self, region_metadata: &RegionMetadataRef) -> Result<Mutation> {
267 let vectors = region_metadata
268 .schema
269 .column_schemas()
270 .iter()
271 .map(|col| match self.batch.column_by_name(&col.name) {
272 None => Ok(None),
273 Some(col) => Helper::try_into_vector(col).map(Some),
274 })
275 .collect::<datatypes::error::Result<Vec<_>>>()
276 .context(error::ComputeVectorSnafu)?;
277
278 let rows = (0..self.num_rows())
279 .map(|row_idx| {
280 let values = (0..self.batch.num_columns())
281 .map(|col_idx| {
282 if let Some(v) = &vectors[col_idx] {
283 to_grpc_value(v.get(row_idx))
284 } else {
285 api::v1::Value { value_data: None }
286 }
287 })
288 .collect::<Vec<_>>();
289 api::v1::Row { values }
290 })
291 .collect::<Vec<_>>();
292
293 let schema = region_metadata
294 .column_metadatas
295 .iter()
296 .map(|c| {
297 let data_type_wrapper =
298 ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())?;
299 Ok(api::v1::ColumnSchema {
300 column_name: c.column_schema.name.clone(),
301 datatype: data_type_wrapper.datatype() as i32,
302 semantic_type: c.semantic_type as i32,
303 ..Default::default()
304 })
305 })
306 .collect::<api::error::Result<Vec<_>>>()
307 .context(error::ConvertColumnDataTypeSnafu {
308 reason: "failed to convert region metadata to column schema",
309 })?;
310
311 let rows = api::v1::Rows { schema, rows };
312
313 Ok(Mutation {
314 op_type: OpType::Put as i32,
315 sequence: self.sequence,
316 rows: Some(rows),
317 write_hint: None,
318 })
319 }
320
321 pub fn timestamps(&self) -> &ArrayRef {
322 self.batch.column(self.timestamp_index)
323 }
324
325 pub fn num_rows(&self) -> usize {
326 self.batch.num_rows()
327 }
328}
329
/// Buffers small bulk parts so they can later be concatenated and sorted into one part.
pub struct UnorderedPart {
    /// Buffered parts, in push order.
    parts: Vec<BulkPart>,
    /// Sum of rows over all buffered parts.
    total_rows: usize,
    /// Minimum `min_timestamp` over buffered parts (`i64::MAX` when empty).
    min_timestamp: i64,
    /// Maximum `max_timestamp` over buffered parts (`i64::MIN` when empty).
    max_timestamp: i64,
    /// Maximum sequence over buffered parts (0 when empty).
    max_sequence: u64,
    /// Parts with strictly fewer rows than this are accepted (see `should_accept`).
    threshold: usize,
    /// Once `total_rows` reaches this value, `should_compact` returns true.
    compact_threshold: usize,
}
348
349impl Default for UnorderedPart {
350 fn default() -> Self {
351 Self::new()
352 }
353}
354
355impl UnorderedPart {
356 pub fn new() -> Self {
358 Self {
359 parts: Vec::new(),
360 total_rows: 0,
361 min_timestamp: i64::MAX,
362 max_timestamp: i64::MIN,
363 max_sequence: 0,
364 threshold: 1024,
365 compact_threshold: 4096,
366 }
367 }
368
369 pub fn set_threshold(&mut self, threshold: usize) {
371 self.threshold = threshold;
372 }
373
374 pub fn set_compact_threshold(&mut self, compact_threshold: usize) {
376 self.compact_threshold = compact_threshold;
377 }
378
379 pub fn threshold(&self) -> usize {
381 self.threshold
382 }
383
384 pub fn compact_threshold(&self) -> usize {
386 self.compact_threshold
387 }
388
389 pub fn should_accept(&self, num_rows: usize) -> bool {
391 num_rows < self.threshold
392 }
393
394 pub fn should_compact(&self) -> bool {
396 self.total_rows >= self.compact_threshold
397 }
398
399 pub fn push(&mut self, part: BulkPart) {
401 self.total_rows += part.num_rows();
402 self.min_timestamp = self.min_timestamp.min(part.min_timestamp);
403 self.max_timestamp = self.max_timestamp.max(part.max_timestamp);
404 self.max_sequence = self.max_sequence.max(part.sequence);
405 self.parts.push(part);
406 }
407
408 pub fn num_rows(&self) -> usize {
410 self.total_rows
411 }
412
413 pub fn is_empty(&self) -> bool {
415 self.parts.is_empty()
416 }
417
418 pub fn num_parts(&self) -> usize {
420 self.parts.len()
421 }
422
423 pub fn concat_and_sort(&self) -> Result<Option<RecordBatch>> {
426 if self.parts.is_empty() {
427 return Ok(None);
428 }
429
430 if self.parts.len() == 1 {
431 return Ok(Some(self.parts[0].batch.clone()));
433 }
434
435 let schema = self.parts[0].batch.schema();
437 let concatenated = if schema.fields().iter().any(is_structured_json_field) {
438 let (schema, batches) = align_parts(&self.parts)?;
439 arrow::compute::concat_batches(&schema, &batches).context(ComputeArrowSnafu)?
440 } else {
441 arrow::compute::concat_batches(&schema, self.parts.iter().map(|x| &x.batch))
442 .context(ComputeArrowSnafu)?
443 };
444
445 let sorted_batch = sort_primary_key_record_batch(&concatenated)?;
447
448 Ok(Some(sorted_batch))
449 }
450
451 pub fn to_bulk_part(&self) -> Result<Option<BulkPart>> {
454 let Some(sorted_batch) = self.concat_and_sort()? else {
455 return Ok(None);
456 };
457
458 let timestamp_index = self.parts[0].timestamp_index;
459
460 Ok(Some(BulkPart {
461 batch: sorted_batch,
462 max_timestamp: self.max_timestamp,
463 min_timestamp: self.min_timestamp,
464 sequence: self.max_sequence,
465 timestamp_index,
466 raw_data: None,
467 }))
468 }
469
470 pub fn clear(&mut self) {
472 self.parts.clear();
473 self.total_rows = 0;
474 self.min_timestamp = i64::MAX;
475 self.max_timestamp = i64::MIN;
476 self.max_sequence = 0;
477 }
478}
479
480fn align_parts(parts: &[BulkPart]) -> Result<(SchemaRef, Vec<RecordBatch>)> {
483 debug_assert!(
484 !parts.is_empty()
485 && parts
486 .windows(2)
487 .all(|w| w[0].batch.schema_ref().fields().len()
488 == w[1].batch.schema_ref().fields().len())
489 );
490
491 let first = &parts[0];
492 let base_schema = first.batch.schema_ref();
493 let rest = &parts[1..];
494
495 let mut merged_types = HashMap::new();
496 let mut aligned_fields = Vec::with_capacity(base_schema.fields().len());
497 for (i, field) in base_schema.fields().iter().enumerate() {
498 if is_structured_json_field(field) {
499 let mut merged = JsonType::from(field.data_type());
500 rest.iter()
501 .try_fold(&mut merged, |acc, x| {
502 acc.merge(&JsonType::from(x.batch.schema_ref().field(i).data_type()))?;
503 Ok(acc)
504 })
505 .context(DataTypeMismatchSnafu)?;
506 merged_types.insert(i, merged.as_arrow_type());
507
508 aligned_fields.push(Arc::new(
509 Field::new(
510 field.name().clone(),
511 merged.as_arrow_type(),
512 field.is_nullable(),
513 )
514 .with_metadata(field.metadata().clone()),
515 ));
516 } else {
517 aligned_fields.push(field.clone())
518 };
519 }
520 let aligned_schema = Arc::new(Schema::new_with_metadata(
521 aligned_fields,
522 base_schema.metadata().clone(),
523 ));
524
525 let mut aligned_batches = Vec::with_capacity(parts.len());
526 for part in parts {
527 let mut columns = Vec::with_capacity(part.batch.num_columns());
528 for (i, column) in part.batch.columns().iter().enumerate() {
529 if let Some(expect) = merged_types.get(&i) {
530 columns.push(
531 JsonArray::from(column)
532 .try_align(expect)
533 .context(ConvertValueSnafu)?,
534 );
535 } else {
536 columns.push(column.clone());
537 }
538 }
539 aligned_batches.push(
540 RecordBatch::try_new(aligned_schema.clone(), columns).context(NewRecordBatchSnafu)?,
541 );
542 }
543
544 Ok((aligned_schema, aligned_batches))
545}
546
547pub fn record_batch_estimated_size(batch: &RecordBatch) -> usize {
549 batch
550 .columns()
551 .iter()
552 .map(|c| c.to_data().get_slice_memory_size().unwrap_or(0))
554 .sum()
555}
556
/// Builder for one individually stored primary key column.
enum PrimaryKeyColumnBuilder {
    /// Used for string tag columns: builds a dictionary array with `u32` keys.
    StringDict(StringDictionaryBuilder<UInt32Type>),
    /// Used for all other tag column types.
    Vector(Box<dyn MutableVector>),
}
564
565impl PrimaryKeyColumnBuilder {
566 fn push_value_ref(&mut self, value: ValueRef) -> Result<()> {
568 match self {
569 PrimaryKeyColumnBuilder::StringDict(builder) => {
570 if let Some(s) = value.try_into_string().context(DataTypeMismatchSnafu)? {
571 builder.append_value(s);
573 } else {
574 builder.append_null();
575 }
576 }
577 PrimaryKeyColumnBuilder::Vector(builder) => {
578 builder.push_value_ref(&value);
579 }
580 }
581 Ok(())
582 }
583
584 fn into_arrow_array(self) -> ArrayRef {
586 match self {
587 PrimaryKeyColumnBuilder::StringDict(mut builder) => Arc::new(builder.finish()),
588 PrimaryKeyColumnBuilder::Vector(mut builder) => builder.to_vector().to_arrow_array(),
589 }
590 }
591}
592
/// Accumulates [`KeyValues`] and converts them into a sorted [`BulkPart`].
pub struct BulkPartConverter {
    /// Base schema of the produced batch; JSON field types may be adjusted to match the
    /// built arrays.
    schema: SchemaRef,
    /// Codec used to encode each row's primary key.
    primary_key_codec: Arc<dyn PrimaryKeyCodec>,
    /// Scratch buffer reused when encoding a row's primary key.
    key_buf: Vec<u8>,
    /// Builder for the encoded primary key column.
    key_array_builder: PrimaryKeyArrayBuilder,
    /// Builder for the timestamp, sequence, op type, and field columns.
    value_builder: ValueBuilder,
    /// Builders for individually stored primary key columns; empty when those columns are
    /// not stored.
    primary_key_column_builders: Vec<PrimaryKeyColumnBuilder>,

    /// Maximum timestamp appended so far.
    max_ts: i64,
    /// Minimum timestamp appended so far.
    min_ts: i64,
    /// Maximum sequence appended so far.
    max_sequence: SequenceNumber,
}
616
617impl BulkPartConverter {
618 pub fn new(
623 region_metadata: &RegionMetadataRef,
624 schema: SchemaRef,
625 capacity: usize,
626 primary_key_codec: Arc<dyn PrimaryKeyCodec>,
627 store_primary_key_columns: bool,
628 ) -> Self {
629 debug_assert_eq!(
630 region_metadata.primary_key_encoding,
631 primary_key_codec.encoding()
632 );
633
634 let primary_key_column_builders = if store_primary_key_columns
635 && region_metadata.primary_key_encoding != PrimaryKeyEncoding::Sparse
636 {
637 new_primary_key_column_builders(region_metadata, capacity)
638 } else {
639 Vec::new()
640 };
641
642 Self {
643 schema,
644 primary_key_codec,
645 key_buf: Vec::new(),
646 key_array_builder: PrimaryKeyArrayBuilder::new(),
647 value_builder: ValueBuilder::new(region_metadata, capacity),
648 primary_key_column_builders,
649 min_ts: i64::MAX,
650 max_ts: i64::MIN,
651 max_sequence: SequenceNumber::MIN,
652 }
653 }
654
655 pub fn append_key_values(&mut self, key_values: &KeyValues) -> Result<()> {
657 for kv in key_values.iter() {
658 self.append_key_value(&kv)?;
659 }
660
661 Ok(())
662 }
663
664 fn append_key_value(&mut self, kv: &KeyValue) -> Result<()> {
668 if self.primary_key_codec.encoding() == PrimaryKeyEncoding::Sparse {
670 let mut primary_keys = kv.primary_keys();
673 if let Some(encoded) = primary_keys
674 .next()
675 .context(ColumnNotFoundSnafu {
676 column: PRIMARY_KEY_COLUMN_NAME,
677 })?
678 .try_into_binary()
679 .context(DataTypeMismatchSnafu)?
680 {
681 self.key_array_builder
682 .append(encoded)
683 .context(ComputeArrowSnafu)?;
684 } else {
685 self.key_array_builder
686 .append("")
687 .context(ComputeArrowSnafu)?;
688 }
689 } else {
690 self.key_buf.clear();
692 self.primary_key_codec
693 .encode_key_value(kv, &mut self.key_buf)
694 .context(EncodeSnafu)?;
695 self.key_array_builder
696 .append(&self.key_buf)
697 .context(ComputeArrowSnafu)?;
698 };
699
700 if !self.primary_key_column_builders.is_empty() {
702 for (builder, pk_value) in self
703 .primary_key_column_builders
704 .iter_mut()
705 .zip(kv.primary_keys())
706 {
707 builder.push_value_ref(pk_value)?;
708 }
709 }
710
711 self.value_builder.push(
713 kv.timestamp(),
714 kv.sequence(),
715 kv.op_type() as u8,
716 kv.fields(),
717 );
718
719 let ts = kv
722 .timestamp()
723 .try_into_timestamp()
724 .unwrap()
725 .unwrap()
726 .value();
727 self.min_ts = self.min_ts.min(ts);
728 self.max_ts = self.max_ts.max(ts);
729 self.max_sequence = self.max_sequence.max(kv.sequence());
730
731 Ok(())
732 }
733
734 pub fn convert(mut self) -> Result<BulkPart> {
738 let values = Values::from(self.value_builder);
739 let mut columns =
740 Vec::with_capacity(4 + values.fields.len() + self.primary_key_column_builders.len());
741
742 for builder in self.primary_key_column_builders {
744 columns.push(builder.into_arrow_array());
745 }
746 columns.extend(values.fields.iter().map(|field| field.to_arrow_array()));
748 let timestamp_index = columns.len();
750 columns.push(values.timestamp.to_arrow_array());
751 let pk_array = self.key_array_builder.finish();
753 columns.push(Arc::new(pk_array));
754 columns.push(values.sequence.to_arrow_array());
756 columns.push(values.op_type.to_arrow_array());
757
758 let schema = align_schema_with_json_array(self.schema, &columns);
761 let batch = RecordBatch::try_new(schema, columns).context(NewRecordBatchSnafu)?;
762 let batch = sort_primary_key_record_batch(&batch)?;
764
765 Ok(BulkPart {
766 batch,
767 max_timestamp: self.max_ts,
768 min_timestamp: self.min_ts,
769 sequence: self.max_sequence,
770 timestamp_index,
771 raw_data: None,
772 })
773 }
774}
775
776fn align_schema_with_json_array(schema: SchemaRef, columns: &[ArrayRef]) -> SchemaRef {
777 if schema.fields().iter().all(|f| !is_structured_json_field(f)) {
778 return schema;
779 }
780
781 let mut fields = Vec::with_capacity(schema.fields().len());
782 for (field, array) in schema.fields().iter().zip(columns) {
783 if !is_structured_json_field(field) {
784 fields.push(field.clone());
785 continue;
786 }
787
788 let mut field = field.as_ref().clone();
789 field.set_data_type(array.data_type().clone());
790 fields.push(Arc::new(field));
791 }
792
793 Arc::new(Schema::new_with_metadata(fields, schema.metadata().clone()))
794}
795
796fn new_primary_key_column_builders(
797 metadata: &RegionMetadata,
798 capacity: usize,
799) -> Vec<PrimaryKeyColumnBuilder> {
800 metadata
801 .primary_key_columns()
802 .map(|col| {
803 if col.column_schema.data_type.is_string() {
804 PrimaryKeyColumnBuilder::StringDict(StringDictionaryBuilder::with_capacity(
805 capacity,
806 INIT_DICT_VALUE_CAPACITY,
807 capacity,
808 ))
809 } else {
810 PrimaryKeyColumnBuilder::Vector(
811 col.column_schema.data_type.create_mutable_vector(capacity),
812 )
813 }
814 })
815 .collect()
816}
817
818pub fn sort_primary_key_record_batch(batch: &RecordBatch) -> Result<RecordBatch> {
820 let total_columns = batch.num_columns();
821 let sort_columns = vec![
822 SortColumn {
824 values: batch.column(total_columns - 3).clone(),
825 options: Some(SortOptions {
826 descending: false,
827 nulls_first: true,
828 }),
829 },
830 SortColumn {
832 values: batch.column(total_columns - 4).clone(),
833 options: Some(SortOptions {
834 descending: false,
835 nulls_first: true,
836 }),
837 },
838 SortColumn {
840 values: batch.column(total_columns - 2).clone(),
841 options: Some(SortOptions {
842 descending: true,
843 nulls_first: true,
844 }),
845 },
846 ];
847
848 let indices = datatypes::arrow::compute::lexsort_to_indices(&sort_columns, None)
849 .context(ComputeArrowSnafu)?;
850
851 datatypes::arrow::compute::take_record_batch(batch, &indices).context(ComputeArrowSnafu)
852}
853
854pub fn convert_bulk_part(
880 part: BulkPart,
881 region_metadata: &RegionMetadataRef,
882 primary_key_codec: Arc<dyn PrimaryKeyCodec>,
883 schema: SchemaRef,
884 store_primary_key_columns: bool,
885) -> Result<Option<BulkPart>> {
886 if part.num_rows() == 0 {
887 return Ok(None);
888 }
889
890 let num_rows = part.num_rows();
891 let is_sparse = region_metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse;
892
893 let input_schema = part.batch.schema();
895 let column_indices: HashMap<&str, usize> = input_schema
896 .fields()
897 .iter()
898 .enumerate()
899 .map(|(idx, field)| (field.name().as_str(), idx))
900 .collect();
901
902 let mut output_columns = Vec::new();
904
905 let pk_array = if is_sparse {
907 None
910 } else {
911 let pk_vectors: Result<Vec<_>> = region_metadata
913 .primary_key_columns()
914 .map(|col_meta| {
915 let col_idx = column_indices
916 .get(col_meta.column_schema.name.as_str())
917 .context(ColumnNotFoundSnafu {
918 column: &col_meta.column_schema.name,
919 })?;
920 let col = part.batch.column(*col_idx);
921 Helper::try_into_vector(col).context(error::ComputeVectorSnafu)
922 })
923 .collect();
924 let pk_vectors = pk_vectors?;
925
926 let mut key_array_builder = PrimaryKeyArrayBuilder::new();
927 let mut encode_buf = Vec::new();
928
929 for row_idx in 0..num_rows {
930 encode_buf.clear();
931
932 let pk_values_with_ids: Vec<_> = region_metadata
934 .primary_key
935 .iter()
936 .zip(pk_vectors.iter())
937 .map(|(col_id, vector)| (*col_id, vector.get_ref(row_idx)))
938 .collect();
939
940 primary_key_codec
942 .encode_value_refs(&pk_values_with_ids, &mut encode_buf)
943 .context(EncodeSnafu)?;
944
945 key_array_builder
946 .append(&encode_buf)
947 .context(ComputeArrowSnafu)?;
948 }
949
950 Some(key_array_builder.finish())
951 };
952
953 if store_primary_key_columns && !is_sparse {
955 for col_meta in region_metadata.primary_key_columns() {
956 let col_idx = column_indices
957 .get(col_meta.column_schema.name.as_str())
958 .context(ColumnNotFoundSnafu {
959 column: &col_meta.column_schema.name,
960 })?;
961 let col = part.batch.column(*col_idx);
962
963 let col = if col_meta.column_schema.data_type.is_string() {
965 let target_type = ArrowDataType::Dictionary(
966 Box::new(ArrowDataType::UInt32),
967 Box::new(ArrowDataType::Utf8),
968 );
969 arrow::compute::cast(col, &target_type).context(ComputeArrowSnafu)?
970 } else {
971 col.clone()
972 };
973 output_columns.push(col);
974 }
975 }
976
977 for col_meta in region_metadata.field_columns() {
979 let col_idx = column_indices
980 .get(col_meta.column_schema.name.as_str())
981 .context(ColumnNotFoundSnafu {
982 column: &col_meta.column_schema.name,
983 })?;
984 output_columns.push(part.batch.column(*col_idx).clone());
985 }
986
987 let new_timestamp_index = output_columns.len();
989 let ts_col_idx = column_indices
990 .get(
991 region_metadata
992 .time_index_column()
993 .column_schema
994 .name
995 .as_str(),
996 )
997 .context(ColumnNotFoundSnafu {
998 column: ®ion_metadata.time_index_column().column_schema.name,
999 })?;
1000 output_columns.push(part.batch.column(*ts_col_idx).clone());
1001
1002 let pk_dictionary = if let Some(pk_dict_array) = pk_array {
1004 Arc::new(pk_dict_array) as ArrayRef
1005 } else {
1006 let pk_col_idx =
1007 column_indices
1008 .get(PRIMARY_KEY_COLUMN_NAME)
1009 .context(ColumnNotFoundSnafu {
1010 column: PRIMARY_KEY_COLUMN_NAME,
1011 })?;
1012 let col = part.batch.column(*pk_col_idx);
1013
1014 let target_type = ArrowDataType::Dictionary(
1016 Box::new(ArrowDataType::UInt32),
1017 Box::new(ArrowDataType::Binary),
1018 );
1019 arrow::compute::cast(col, &target_type).context(ComputeArrowSnafu)?
1020 };
1021 output_columns.push(pk_dictionary);
1022
1023 let sequence_array = UInt64Array::from(vec![part.sequence; num_rows]);
1024 output_columns.push(Arc::new(sequence_array) as ArrayRef);
1025
1026 let op_type_array = UInt8Array::from(vec![OpType::Put as u8; num_rows]);
1027 output_columns.push(Arc::new(op_type_array) as ArrayRef);
1028
1029 let batch = RecordBatch::try_new(schema, output_columns).context(NewRecordBatchSnafu)?;
1030
1031 let sorted_batch = sort_primary_key_record_batch(&batch)?;
1033
1034 Ok(Some(BulkPart {
1035 batch: sorted_batch,
1036 max_timestamp: part.max_timestamp,
1037 min_timestamp: part.min_timestamp,
1038 sequence: part.sequence,
1039 timestamp_index: new_timestamp_index,
1040 raw_data: None,
1041 }))
1042}
1043
/// A bulk part encoded as an in-memory parquet file.
#[derive(Debug, Clone)]
pub struct EncodedBulkPart {
    /// The encoded parquet bytes.
    data: Bytes,
    /// Summary metadata of the encoded rows.
    metadata: BulkPartMeta,
}
1049
1050impl EncodedBulkPart {
1051 pub fn new(data: Bytes, metadata: BulkPartMeta) -> Self {
1052 Self { data, metadata }
1053 }
1054
1055 pub fn metadata(&self) -> &BulkPartMeta {
1056 &self.metadata
1057 }
1058
1059 pub(crate) fn size_bytes(&self) -> usize {
1061 self.data.len()
1062 }
1063
1064 pub fn data(&self) -> &Bytes {
1066 &self.data
1067 }
1068
1069 pub fn to_memtable_stats(&self) -> MemtableStats {
1071 let meta = &self.metadata;
1072 let ts_type = meta.region_metadata.time_index_type();
1073 let min_ts = ts_type.create_timestamp(meta.min_timestamp);
1074 let max_ts = ts_type.create_timestamp(meta.max_timestamp);
1075
1076 MemtableStats {
1077 estimated_bytes: self.size_bytes(),
1078 time_range: Some((min_ts, max_ts)),
1079 num_rows: meta.num_rows,
1080 num_ranges: 1,
1081 max_sequence: meta.max_sequence,
1082 series_count: meta.num_series as usize,
1083 }
1084 }
1085
1086 pub(crate) fn to_sst_info(&self, file_id: FileId) -> SstInfo {
1094 let unit = self.metadata.region_metadata.time_index_type().unit();
1095 let max_row_group_uncompressed_size: u64 = self
1096 .metadata
1097 .parquet_metadata
1098 .row_groups()
1099 .iter()
1100 .map(|rg| {
1101 rg.columns()
1102 .iter()
1103 .map(|c| c.uncompressed_size() as u64)
1104 .sum::<u64>()
1105 })
1106 .max()
1107 .unwrap_or(0);
1108 SstInfo {
1109 file_id,
1110 time_range: (
1111 Timestamp::new(self.metadata.min_timestamp, unit),
1112 Timestamp::new(self.metadata.max_timestamp, unit),
1113 ),
1114 file_size: self.data.len() as u64,
1115 max_row_group_uncompressed_size,
1116 num_rows: self.metadata.num_rows,
1117 num_row_groups: self.metadata.parquet_metadata.num_row_groups() as u64,
1118 file_metadata: Some(self.metadata.parquet_metadata.clone()),
1119 index_metadata: IndexOutput::default(),
1120 num_series: self.metadata.num_series,
1121 }
1122 }
1123
1124 pub(crate) fn read(
1125 &self,
1126 context: BulkIterContextRef,
1127 sequence: Option<SequenceRange>,
1128 mem_scan_metrics: Option<MemScanMetrics>,
1129 ) -> Result<Option<BoxedRecordBatchIterator>> {
1130 let skip_fields_for_pruning = context.pre_filter_mode().skip_fields();
1132
1133 let row_groups_to_read =
1135 context.row_groups_to_read(&self.metadata.parquet_metadata, skip_fields_for_pruning);
1136
1137 if row_groups_to_read.is_empty() {
1138 return Ok(None);
1140 }
1141
1142 let iter = EncodedBulkPartIter::try_new(
1143 self,
1144 context,
1145 row_groups_to_read,
1146 sequence,
1147 mem_scan_metrics,
1148 )?;
1149 Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
1150 }
1151}
1152
/// Summary metadata of an [`EncodedBulkPart`].
#[derive(Debug, Clone)]
pub struct BulkPartMeta {
    /// Number of encoded rows.
    pub num_rows: usize,
    /// Maximum raw timestamp of the rows (region time index unit).
    pub max_timestamp: i64,
    /// Minimum raw timestamp of the rows (region time index unit).
    pub min_timestamp: i64,
    /// Parquet metadata returned by the writer.
    pub parquet_metadata: Arc<ParquetMetaData>,
    /// Metadata of the region the part belongs to.
    pub region_metadata: RegionMetadataRef,
    /// Estimated number of series in the part.
    pub num_series: u64,
    /// Maximum sequence of the rows.
    pub max_sequence: u64,
}
1171
/// Counters accumulated by `BulkPartEncoder::encode_record_batch_iter` (values are added to,
/// not overwritten).
#[derive(Default, Debug)]
pub struct BulkPartEncodeMetrics {
    /// Time spent waiting on the input batch iterator.
    pub iter_cost: Duration,
    /// Time spent writing batches and closing the parquet writer.
    pub write_cost: Duration,
    /// Estimated in-memory size of the non-empty input batches, in bytes.
    pub raw_size: usize,
    /// Size of the encoded parquet output, in bytes.
    pub encoded_size: usize,
    /// Number of rows encoded.
    pub num_rows: usize,
}
1186
/// Encodes bulk parts or record batch streams into in-memory parquet files.
pub struct BulkPartEncoder {
    /// Region metadata; stored in every produced [`BulkPartMeta`].
    metadata: RegionMetadataRef,
    /// Parquet writer properties (always `Some` when built through `BulkPartEncoder::new`).
    writer_props: Option<WriterProperties>,
}
1191
1192impl BulkPartEncoder {
1193 pub fn new(metadata: RegionMetadataRef, row_group_size: usize) -> Result<BulkPartEncoder> {
1194 let json = metadata.to_json().context(InvalidMetadataSnafu)?;
1196 let key_value_meta =
1197 parquet::file::metadata::KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);
1198
1199 let writer_props = Some(
1201 WriterProperties::builder()
1202 .set_key_value_metadata(Some(vec![key_value_meta]))
1203 .set_write_batch_size(row_group_size)
1204 .set_max_row_group_row_count(Some(row_group_size))
1205 .set_compression(Compression::ZSTD(ZstdLevel::default()))
1206 .set_column_index_truncate_length(None)
1207 .set_statistics_truncate_length(None)
1208 .build(),
1209 );
1210
1211 Ok(Self {
1212 metadata,
1213 writer_props,
1214 })
1215 }
1216}
1217
1218impl BulkPartEncoder {
1219 pub fn encode_record_batch_iter(
1221 &self,
1222 iter: BoxedRecordBatchIterator,
1223 arrow_schema: SchemaRef,
1224 min_timestamp: i64,
1225 max_timestamp: i64,
1226 max_sequence: u64,
1227 metrics: &mut BulkPartEncodeMetrics,
1228 ) -> Result<Option<EncodedBulkPart>> {
1229 let mut buf = Vec::with_capacity(4096);
1230 let mut writer = ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
1231 .context(EncodeMemtableSnafu)?;
1232 let mut total_rows = 0;
1233 let mut series_estimator = SeriesEstimator::default();
1234
1235 let mut iter_start = Instant::now();
1237 for batch_result in iter {
1238 metrics.iter_cost += iter_start.elapsed();
1239 let batch = batch_result?;
1240 if batch.num_rows() == 0 {
1241 continue;
1242 }
1243
1244 series_estimator.update_flat(&batch);
1245 metrics.raw_size += record_batch_estimated_size(&batch);
1246 let write_start = Instant::now();
1247 writer.write(&batch).context(EncodeMemtableSnafu)?;
1248 metrics.write_cost += write_start.elapsed();
1249 total_rows += batch.num_rows();
1250 iter_start = Instant::now();
1251 }
1252 metrics.iter_cost += iter_start.elapsed();
1253
1254 if total_rows == 0 {
1255 return Ok(None);
1256 }
1257
1258 let close_start = Instant::now();
1259 let file_metadata = writer.close().context(EncodeMemtableSnafu)?;
1260 metrics.write_cost += close_start.elapsed();
1261 metrics.encoded_size += buf.len();
1262 metrics.num_rows += total_rows;
1263
1264 let buf = Bytes::from(buf);
1265 let parquet_metadata = Arc::new(file_metadata);
1266 let num_series = series_estimator.finish();
1267
1268 Ok(Some(EncodedBulkPart {
1269 data: buf,
1270 metadata: BulkPartMeta {
1271 num_rows: total_rows,
1272 max_timestamp,
1273 min_timestamp,
1274 parquet_metadata,
1275 region_metadata: self.metadata.clone(),
1276 num_series,
1277 max_sequence,
1278 },
1279 }))
1280 }
1281
1282 pub fn encode_part(&self, part: &BulkPart) -> Result<Option<EncodedBulkPart>> {
1284 if part.batch.num_rows() == 0 {
1285 return Ok(None);
1286 }
1287
1288 let mut buf = Vec::with_capacity(4096);
1289 let arrow_schema = part.batch.schema();
1290
1291 let file_metadata = {
1292 let mut writer =
1293 ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
1294 .context(EncodeMemtableSnafu)?;
1295 writer.write(&part.batch).context(EncodeMemtableSnafu)?;
1296 writer.finish().context(EncodeMemtableSnafu)?
1297 };
1298
1299 let buf = Bytes::from(buf);
1300 let parquet_metadata = Arc::new(file_metadata);
1301
1302 Ok(Some(EncodedBulkPart {
1303 data: buf,
1304 metadata: BulkPartMeta {
1305 num_rows: part.batch.num_rows(),
1306 max_timestamp: part.max_timestamp,
1307 min_timestamp: part.min_timestamp,
1308 parquet_metadata,
1309 region_metadata: self.metadata.clone(),
1310 num_series: part.estimated_series_count() as u64,
1311 max_sequence: part.sequence,
1312 },
1313 }))
1314 }
1315}
1316
1317#[derive(Debug, Clone)]
1323struct BatchStats {
1324 num_batches: usize,
1326 first_tag_id: ColumnId,
1328 min_values: ArrayRef,
1330 max_values: ArrayRef,
1332}
1333
1334impl BatchStats {
1335 fn compute(batches: &[RecordBatch], metadata: &RegionMetadata) -> Option<Self> {
1340 let first_tag_id = *metadata.primary_key.first()?;
1345 let first_tag_column = metadata.column_by_id(first_tag_id)?;
1346 let data_type = &first_tag_column.column_schema.data_type;
1347
1348 let converter = build_primary_key_codec_with_fields(
1349 metadata.primary_key_encoding,
1350 [(first_tag_id, SortField::new(data_type.clone()))].into_iter(),
1351 );
1352 let pk_index = primary_key_column_index(batches.first()?.num_columns());
1353
1354 let mut min_builder = data_type.create_mutable_vector(batches.len());
1355 let mut max_builder = data_type.create_mutable_vector(batches.len());
1356
1357 for batch in batches {
1358 match Self::extract_first_tag_bounds(batch, pk_index, &*converter) {
1359 Some((min_val, max_val)) => {
1360 min_builder.push_value_ref(&min_val.as_value_ref());
1361 max_builder.push_value_ref(&max_val.as_value_ref());
1362 }
1363 None => {
1364 min_builder.push_null();
1365 max_builder.push_null();
1366 }
1367 }
1368 }
1369
1370 Some(Self {
1371 num_batches: batches.len(),
1372 first_tag_id,
1373 min_values: min_builder.to_vector().to_arrow_array(),
1374 max_values: max_builder.to_vector().to_arrow_array(),
1375 })
1376 }
1377
1378 fn extract_first_tag_bounds(
1380 batch: &RecordBatch,
1381 pk_index: usize,
1382 converter: &dyn PrimaryKeyCodec,
1383 ) -> Option<(datatypes::value::Value, datatypes::value::Value)> {
1384 if batch.num_rows() == 0 {
1385 return None;
1386 }
1387
1388 let pk_dict = batch
1389 .column(pk_index)
1390 .as_any()
1391 .downcast_ref::<PrimaryKeyArray>()?;
1392 let pk_values = pk_dict.values().as_any().downcast_ref::<BinaryArray>()?;
1393
1394 let keys = pk_dict.keys();
1395 let min_key = keys.value(0);
1396 let max_key = keys.value(batch.num_rows() - 1);
1397 let min_bytes = pk_values.value(min_key as usize);
1398 let max_bytes = pk_values.value(max_key as usize);
1399
1400 Some((
1401 converter.decode_leftmost(min_bytes).ok()??,
1402 converter.decode_leftmost(max_bytes).ok()??,
1403 ))
1404 }
1405}
1406
1407struct BatchPruningStats<'a> {
1412 stats: &'a BatchStats,
1413 metadata: &'a RegionMetadataRef,
1414}
1415
1416impl PruningStatistics for BatchPruningStats<'_> {
1417 fn min_values(&self, column: &Column) -> Option<ArrayRef> {
1418 let col = self.metadata.column_by_name(&column.name)?;
1419 if col.column_id == self.stats.first_tag_id {
1420 Some(self.stats.min_values.clone())
1421 } else {
1422 None
1423 }
1424 }
1425
1426 fn max_values(&self, column: &Column) -> Option<ArrayRef> {
1427 let col = self.metadata.column_by_name(&column.name)?;
1428 if col.column_id == self.stats.first_tag_id {
1429 Some(self.stats.max_values.clone())
1430 } else {
1431 None
1432 }
1433 }
1434
1435 fn num_containers(&self) -> usize {
1436 self.stats.num_batches
1437 }
1438
1439 fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
1440 None
1441 }
1442
1443 fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
1444 None
1445 }
1446
1447 fn contained(
1448 &self,
1449 _column: &Column,
1450 _values: &std::collections::HashSet<datafusion_common::ScalarValue>,
1451 ) -> Option<BooleanArray> {
1452 None
1453 }
1454}
1455
1456fn predicate_references_column(predicate: &table::predicate::Predicate, column_name: &str) -> bool {
1458 let mut columns = HashSet::new();
1459 for expr in predicate.exprs() {
1460 let _ = expr_to_columns(expr, &mut columns);
1461 }
1462 columns.iter().any(|col| col.name == column_name)
1463}
1464
1465pub(crate) fn should_prune_bulk_part(
1469 batch: &RecordBatch,
1470 context: &BulkIterContext,
1471 metadata: &RegionMetadata,
1472) -> bool {
1473 let predicate = match &context.predicate {
1474 Some(p) => p,
1475 None => return false,
1476 };
1477 let first_tag_id = match metadata.primary_key.first() {
1480 Some(id) => *id,
1481 None => return false,
1482 };
1483 let first_tag_name = &metadata
1485 .column_by_id(first_tag_id)
1486 .unwrap()
1487 .column_schema
1488 .name;
1489 if !predicate_references_column(predicate, first_tag_name) {
1490 return false;
1491 }
1492 let stats = match BatchStats::compute(std::slice::from_ref(batch), metadata) {
1493 Some(s) => s,
1494 None => return false,
1495 };
1496 let region_meta = context.read_format().metadata();
1497 let pruning_stats = BatchPruningStats {
1498 stats: &stats,
1499 metadata: region_meta,
1500 };
1501 let mask = predicate.prune_with_stats(&pruning_stats, region_meta.schema.arrow_schema());
1502 !mask.first().copied().unwrap_or(true)
1503}
1504
1505#[derive(Debug, Clone)]
1511pub struct MultiBulkPart {
1512 batches: SmallVec<[RecordBatch; 4]>,
1514 total_rows: usize,
1516 max_timestamp: i64,
1518 min_timestamp: i64,
1520 max_sequence: SequenceNumber,
1522 series_count: usize,
1524 batch_stats: Option<BatchStats>,
1527}
1528
1529impl MultiBulkPart {
1530 pub fn from_bulk_part(part: BulkPart, metadata: &RegionMetadata) -> Self {
1532 let num_rows = part.num_rows();
1533 let series_count = part.estimated_series_count();
1534 let batch_stats = BatchStats::compute(std::slice::from_ref(&part.batch), metadata);
1535 let mut batches = SmallVec::new();
1536 batches.push(part.batch);
1537
1538 Self {
1539 batches,
1540 total_rows: num_rows,
1541 max_timestamp: part.max_timestamp,
1542 min_timestamp: part.min_timestamp,
1543 max_sequence: part.sequence,
1544 series_count,
1545 batch_stats,
1546 }
1547 }
1548
1549 pub fn new(
1562 batches: Vec<RecordBatch>,
1563 min_timestamp: i64,
1564 max_timestamp: i64,
1565 max_sequence: SequenceNumber,
1566 series_count: usize,
1567 metadata: &RegionMetadata,
1568 ) -> Self {
1569 assert!(!batches.is_empty(), "batches must not be empty");
1570
1571 let total_rows = batches.iter().map(|b| b.num_rows()).sum();
1572 let batch_stats = BatchStats::compute(&batches, metadata);
1573
1574 Self {
1575 batches: SmallVec::from_vec(batches),
1576 total_rows,
1577 max_timestamp,
1578 min_timestamp,
1579 max_sequence,
1580 series_count,
1581 batch_stats,
1582 }
1583 }
1584
1585 pub fn num_rows(&self) -> usize {
1587 self.total_rows
1588 }
1589
1590 pub fn min_timestamp(&self) -> i64 {
1592 self.min_timestamp
1593 }
1594
1595 pub fn max_timestamp(&self) -> i64 {
1597 self.max_timestamp
1598 }
1599
1600 pub fn max_sequence(&self) -> SequenceNumber {
1602 self.max_sequence
1603 }
1604
1605 pub fn series_count(&self) -> usize {
1607 self.series_count
1608 }
1609
1610 pub fn num_batches(&self) -> usize {
1612 self.batches.len()
1613 }
1614
1615 pub(crate) fn estimated_size(&self) -> usize {
1617 self.batches.iter().map(record_batch_estimated_size).sum()
1618 }
1619
1620 pub(crate) fn read(
1626 &self,
1627 context: BulkIterContextRef,
1628 sequence: Option<SequenceRange>,
1629 mem_scan_metrics: Option<MemScanMetrics>,
1630 ) -> Result<Option<BoxedRecordBatchIterator>> {
1631 if self.batches.is_empty() {
1632 return Ok(None);
1633 }
1634
1635 let batches_to_read = self.prune_batches(&context);
1636
1637 if batches_to_read.is_empty() {
1638 return Ok(None);
1639 }
1640
1641 let iter = crate::memtable::bulk::part_reader::BulkPartBatchIter::new(
1642 batches_to_read,
1643 context,
1644 sequence,
1645 self.series_count,
1646 mem_scan_metrics,
1647 );
1648 Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
1649 }
1650
1651 fn prune_batches(&self, context: &BulkIterContextRef) -> Vec<RecordBatch> {
1654 if let Some(stats) = &self.batch_stats
1655 && let Some(predicate) = &context.predicate
1656 {
1657 let region_meta = context.read_format().metadata();
1658 let pruning_stats = BatchPruningStats {
1659 stats,
1660 metadata: region_meta,
1661 };
1662 let mask =
1663 predicate.prune_with_stats(&pruning_stats, region_meta.schema.arrow_schema());
1664 self.batches
1665 .iter()
1666 .zip(mask.iter())
1667 .filter_map(
1668 |(batch, &selected)| {
1669 if selected { Some(batch.clone()) } else { None }
1670 },
1671 )
1672 .collect()
1673 } else {
1674 self.batches.iter().cloned().collect()
1675 }
1676 }
1677
1678 pub fn to_memtable_stats(&self, region_metadata: &RegionMetadataRef) -> MemtableStats {
1680 let ts_type = region_metadata.time_index_type();
1681 let min_ts = ts_type.create_timestamp(self.min_timestamp);
1682 let max_ts = ts_type.create_timestamp(self.max_timestamp);
1683
1684 MemtableStats {
1685 estimated_bytes: self.estimated_size(),
1686 time_range: Some((min_ts, max_ts)),
1687 num_rows: self.num_rows(),
1688 num_ranges: 1,
1689 max_sequence: self.max_sequence,
1690 series_count: self.series_count,
1691 }
1692 }
1693}
1694
1695#[cfg(test)]
1696mod tests {
1697 use api::v1::{Row, SemanticType, WriteHint};
1698 use datafusion_common::ScalarValue;
1699 use datatypes::arrow::array::{
1700 BinaryArray, DictionaryArray, Float64Array, TimestampMillisecondArray,
1701 };
1702 use datatypes::arrow::datatypes::UInt32Type;
1703 use datatypes::prelude::{ConcreteDataType, Value};
1704 use datatypes::schema::ColumnSchema;
1705 use mito_codec::row_converter::build_primary_key_codec;
1706 use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
1707 use store_api::storage::RegionId;
1708 use store_api::storage::consts::ReservedColumnId;
1709 use table::predicate::Predicate;
1710
1711 use super::*;
1712 use crate::memtable::bulk::context::BulkIterContext;
1713 use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
1714 use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};
1715
1716 struct MutationInput<'a> {
1717 k0: &'a str,
1718 k1: u32,
1719 timestamps: &'a [i64],
1720 v1: &'a [Option<f64>],
1721 sequence: u64,
1722 }
1723
1724 fn encode(input: &[MutationInput]) -> EncodedBulkPart {
1725 let metadata = metadata_for_test();
1726 let kvs = input
1727 .iter()
1728 .map(|m| {
1729 build_key_values_with_ts_seq_values(
1730 &metadata,
1731 m.k0.to_string(),
1732 m.k1,
1733 m.timestamps.iter().copied(),
1734 m.v1.iter().copied(),
1735 m.sequence,
1736 )
1737 })
1738 .collect::<Vec<_>>();
1739 let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1740 let primary_key_codec = build_primary_key_codec(&metadata);
1741 let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
1742 for kv in kvs {
1743 converter.append_key_values(&kv).unwrap();
1744 }
1745 let part = converter.convert().unwrap();
1746 let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
1747 encoder.encode_part(&part).unwrap().unwrap()
1748 }
1749
1750 #[test]
1751 fn test_write_and_read_part_projection() {
1752 let part = encode(&[
1753 MutationInput {
1754 k0: "a",
1755 k1: 0,
1756 timestamps: &[1],
1757 v1: &[Some(0.1)],
1758 sequence: 0,
1759 },
1760 MutationInput {
1761 k0: "b",
1762 k1: 0,
1763 timestamps: &[1],
1764 v1: &[Some(0.0)],
1765 sequence: 0,
1766 },
1767 MutationInput {
1768 k0: "a",
1769 k1: 0,
1770 timestamps: &[2],
1771 v1: &[Some(0.2)],
1772 sequence: 1,
1773 },
1774 ]);
1775
1776 let projection = &[4u32];
1777 let reader = part
1778 .read(
1779 Arc::new(
1780 BulkIterContext::new(
1781 part.metadata.region_metadata.clone(),
1782 Some(projection.as_slice()),
1783 None,
1784 false,
1785 )
1786 .unwrap(),
1787 ),
1788 None,
1789 None,
1790 )
1791 .unwrap()
1792 .expect("expect at least one row group");
1793
1794 let mut total_rows_read = 0;
1795 let mut field: Vec<f64> = vec![];
1796 for res in reader {
1797 let batch = res.unwrap();
1798 assert_eq!(5, batch.num_columns());
1799 field.extend_from_slice(
1800 batch
1801 .column(0)
1802 .as_any()
1803 .downcast_ref::<Float64Array>()
1804 .unwrap()
1805 .values(),
1806 );
1807 total_rows_read += batch.num_rows();
1808 }
1809 assert_eq!(3, total_rows_read);
1810 assert_eq!(vec![0.1, 0.2, 0.0], field);
1811 }
1812
1813 fn prepare(key_values: Vec<(&str, u32, (i64, i64), u64)>) -> EncodedBulkPart {
1814 let metadata = metadata_for_test();
1815 let kvs = key_values
1816 .into_iter()
1817 .map(|(k0, k1, (start, end), sequence)| {
1818 let ts = start..end;
1819 let v1 = (start..end).map(|_| None);
1820 build_key_values_with_ts_seq_values(&metadata, k0.to_string(), k1, ts, v1, sequence)
1821 })
1822 .collect::<Vec<_>>();
1823 let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1824 let primary_key_codec = build_primary_key_codec(&metadata);
1825 let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
1826 for kv in kvs {
1827 converter.append_key_values(&kv).unwrap();
1828 }
1829 let part = converter.convert().unwrap();
1830 let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
1831 encoder.encode_part(&part).unwrap().unwrap()
1832 }
1833
1834 fn check_prune_row_group(
1835 part: &EncodedBulkPart,
1836 predicate: Option<Predicate>,
1837 expected_rows: usize,
1838 ) {
1839 let context = Arc::new(
1840 BulkIterContext::new(
1841 part.metadata.region_metadata.clone(),
1842 None,
1843 predicate,
1844 false,
1845 )
1846 .unwrap(),
1847 );
1848 let reader = part
1849 .read(context, None, None)
1850 .unwrap()
1851 .expect("expect at least one row group");
1852 let mut total_rows_read = 0;
1853 for res in reader {
1854 let batch = res.unwrap();
1855 total_rows_read += batch.num_rows();
1856 }
1857 assert_eq!(expected_rows, total_rows_read);
1859 }
1860
1861 #[test]
1862 fn test_prune_row_groups() {
1863 let part = prepare(vec![
1864 ("a", 0, (0, 40), 1),
1865 ("a", 1, (0, 60), 1),
1866 ("b", 0, (0, 100), 2),
1867 ("b", 1, (100, 180), 3),
1868 ("b", 1, (180, 210), 4),
1869 ]);
1870
1871 let context = Arc::new(
1872 BulkIterContext::new(
1873 part.metadata.region_metadata.clone(),
1874 None,
1875 Some(Predicate::new(vec![datafusion_expr::col("ts").eq(
1876 datafusion_expr::lit(ScalarValue::TimestampMillisecond(Some(300), None)),
1877 )])),
1878 false,
1879 )
1880 .unwrap(),
1881 );
1882 assert!(part.read(context, None, None).unwrap().is_none());
1883
1884 check_prune_row_group(&part, None, 310);
1885
1886 check_prune_row_group(
1887 &part,
1888 Some(Predicate::new(vec![
1889 datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
1890 datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
1891 ])),
1892 40,
1893 );
1894
1895 check_prune_row_group(
1896 &part,
1897 Some(Predicate::new(vec![
1898 datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
1899 datafusion_expr::col("k1").eq(datafusion_expr::lit(1u32)),
1900 ])),
1901 60,
1902 );
1903
1904 check_prune_row_group(
1905 &part,
1906 Some(Predicate::new(vec![
1907 datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
1908 ])),
1909 100,
1910 );
1911
1912 check_prune_row_group(
1913 &part,
1914 Some(Predicate::new(vec![
1915 datafusion_expr::col("k0").eq(datafusion_expr::lit("b")),
1916 datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
1917 ])),
1918 100,
1919 );
1920
1921 check_prune_row_group(
1923 &part,
1924 Some(Predicate::new(vec![
1925 datafusion_expr::col("v0").eq(datafusion_expr::lit(150i64)),
1926 ])),
1927 1,
1928 );
1929 }
1930
1931 #[test]
1932 fn test_bulk_part_converter_append_and_convert() {
1933 let metadata = metadata_for_test();
1934 let capacity = 100;
1935 let primary_key_codec = build_primary_key_codec(&metadata);
1936 let schema = to_flat_sst_arrow_schema(
1937 &metadata,
1938 &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1939 );
1940
1941 let mut converter =
1942 BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
1943
1944 let key_values1 = build_key_values_with_ts_seq_values(
1945 &metadata,
1946 "key1".to_string(),
1947 1u32,
1948 vec![1000, 2000].into_iter(),
1949 vec![Some(1.0), Some(2.0)].into_iter(),
1950 1,
1951 );
1952
1953 let key_values2 = build_key_values_with_ts_seq_values(
1954 &metadata,
1955 "key2".to_string(),
1956 2u32,
1957 vec![1500].into_iter(),
1958 vec![Some(3.0)].into_iter(),
1959 2,
1960 );
1961
1962 converter.append_key_values(&key_values1).unwrap();
1963 converter.append_key_values(&key_values2).unwrap();
1964
1965 let bulk_part = converter.convert().unwrap();
1966
1967 assert_eq!(bulk_part.num_rows(), 3);
1968 assert_eq!(bulk_part.min_timestamp, 1000);
1969 assert_eq!(bulk_part.max_timestamp, 2000);
1970 assert_eq!(bulk_part.sequence, 2);
1971 assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
1972
1973 let schema = bulk_part.batch.schema();
1976 let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1977 assert_eq!(
1978 field_names,
1979 vec![
1980 "k0",
1981 "k1",
1982 "v0",
1983 "v1",
1984 "ts",
1985 "__primary_key",
1986 "__sequence",
1987 "__op_type"
1988 ]
1989 );
1990 }
1991
1992 #[test]
1993 fn test_bulk_part_converter_sorting() {
1994 let metadata = metadata_for_test();
1995 let capacity = 100;
1996 let primary_key_codec = build_primary_key_codec(&metadata);
1997 let schema = to_flat_sst_arrow_schema(
1998 &metadata,
1999 &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2000 );
2001
2002 let mut converter =
2003 BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
2004
2005 let key_values1 = build_key_values_with_ts_seq_values(
2006 &metadata,
2007 "z_key".to_string(),
2008 3u32,
2009 vec![3000].into_iter(),
2010 vec![Some(3.0)].into_iter(),
2011 3,
2012 );
2013
2014 let key_values2 = build_key_values_with_ts_seq_values(
2015 &metadata,
2016 "a_key".to_string(),
2017 1u32,
2018 vec![1000].into_iter(),
2019 vec![Some(1.0)].into_iter(),
2020 1,
2021 );
2022
2023 let key_values3 = build_key_values_with_ts_seq_values(
2024 &metadata,
2025 "m_key".to_string(),
2026 2u32,
2027 vec![2000].into_iter(),
2028 vec![Some(2.0)].into_iter(),
2029 2,
2030 );
2031
2032 converter.append_key_values(&key_values1).unwrap();
2033 converter.append_key_values(&key_values2).unwrap();
2034 converter.append_key_values(&key_values3).unwrap();
2035
2036 let bulk_part = converter.convert().unwrap();
2037
2038 assert_eq!(bulk_part.num_rows(), 3);
2039
2040 let ts_column = bulk_part.batch.column(bulk_part.timestamp_index);
2041 let seq_column = bulk_part.batch.column(bulk_part.batch.num_columns() - 2);
2042
2043 let ts_array = ts_column
2044 .as_any()
2045 .downcast_ref::<TimestampMillisecondArray>()
2046 .unwrap();
2047 let seq_array = seq_column.as_any().downcast_ref::<UInt64Array>().unwrap();
2048
2049 assert_eq!(ts_array.values(), &[1000, 2000, 3000]);
2050 assert_eq!(seq_array.values(), &[1, 2, 3]);
2051
2052 let schema = bulk_part.batch.schema();
2054 let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2055 assert_eq!(
2056 field_names,
2057 vec![
2058 "k0",
2059 "k1",
2060 "v0",
2061 "v1",
2062 "ts",
2063 "__primary_key",
2064 "__sequence",
2065 "__op_type"
2066 ]
2067 );
2068 }
2069
2070 #[test]
2071 fn test_bulk_part_converter_empty() {
2072 let metadata = metadata_for_test();
2073 let capacity = 10;
2074 let primary_key_codec = build_primary_key_codec(&metadata);
2075 let schema = to_flat_sst_arrow_schema(
2076 &metadata,
2077 &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2078 );
2079
2080 let converter =
2081 BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
2082
2083 let bulk_part = converter.convert().unwrap();
2084
2085 assert_eq!(bulk_part.num_rows(), 0);
2086 assert_eq!(bulk_part.min_timestamp, i64::MAX);
2087 assert_eq!(bulk_part.max_timestamp, i64::MIN);
2088 assert_eq!(bulk_part.sequence, SequenceNumber::MIN);
2089
2090 let schema = bulk_part.batch.schema();
2092 let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2093 assert_eq!(
2094 field_names,
2095 vec![
2096 "k0",
2097 "k1",
2098 "v0",
2099 "v1",
2100 "ts",
2101 "__primary_key",
2102 "__sequence",
2103 "__op_type"
2104 ]
2105 );
2106 }
2107
2108 #[test]
2109 fn test_bulk_part_converter_without_primary_key_columns() {
2110 let metadata = metadata_for_test();
2111 let primary_key_codec = build_primary_key_codec(&metadata);
2112 let schema = to_flat_sst_arrow_schema(
2113 &metadata,
2114 &FlatSchemaOptions {
2115 raw_pk_columns: false,
2116 string_pk_use_dict: true,
2117 ..Default::default()
2118 },
2119 );
2120
2121 let capacity = 100;
2122 let mut converter =
2123 BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, false);
2124
2125 let key_values1 = build_key_values_with_ts_seq_values(
2126 &metadata,
2127 "key1".to_string(),
2128 1u32,
2129 vec![1000, 2000].into_iter(),
2130 vec![Some(1.0), Some(2.0)].into_iter(),
2131 1,
2132 );
2133
2134 let key_values2 = build_key_values_with_ts_seq_values(
2135 &metadata,
2136 "key2".to_string(),
2137 2u32,
2138 vec![1500].into_iter(),
2139 vec![Some(3.0)].into_iter(),
2140 2,
2141 );
2142
2143 converter.append_key_values(&key_values1).unwrap();
2144 converter.append_key_values(&key_values2).unwrap();
2145
2146 let bulk_part = converter.convert().unwrap();
2147
2148 assert_eq!(bulk_part.num_rows(), 3);
2149 assert_eq!(bulk_part.min_timestamp, 1000);
2150 assert_eq!(bulk_part.max_timestamp, 2000);
2151 assert_eq!(bulk_part.sequence, 2);
2152 assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
2153
2154 let schema = bulk_part.batch.schema();
2156 let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2157 assert_eq!(
2158 field_names,
2159 vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
2160 );
2161 }
2162
2163 #[allow(clippy::too_many_arguments)]
2164 fn build_key_values_with_sparse_encoding(
2165 metadata: &RegionMetadataRef,
2166 primary_key_codec: &Arc<dyn PrimaryKeyCodec>,
2167 table_id: u32,
2168 tsid: u64,
2169 k0: String,
2170 k1: String,
2171 timestamps: impl Iterator<Item = i64>,
2172 values: impl Iterator<Item = Option<f64>>,
2173 sequence: SequenceNumber,
2174 ) -> KeyValues {
2175 let pk_values = vec![
2177 (ReservedColumnId::table_id(), Value::UInt32(table_id)),
2178 (ReservedColumnId::tsid(), Value::UInt64(tsid)),
2179 (0, Value::String(k0.clone().into())),
2180 (1, Value::String(k1.clone().into())),
2181 ];
2182 let mut encoded_key = Vec::new();
2183 primary_key_codec
2184 .encode_values(&pk_values, &mut encoded_key)
2185 .unwrap();
2186 assert!(!encoded_key.is_empty());
2187
2188 let column_schema = vec![
2190 api::v1::ColumnSchema {
2191 column_name: PRIMARY_KEY_COLUMN_NAME.to_string(),
2192 datatype: api::helper::ColumnDataTypeWrapper::try_from(
2193 ConcreteDataType::binary_datatype(),
2194 )
2195 .unwrap()
2196 .datatype() as i32,
2197 semantic_type: api::v1::SemanticType::Tag as i32,
2198 ..Default::default()
2199 },
2200 api::v1::ColumnSchema {
2201 column_name: "ts".to_string(),
2202 datatype: api::helper::ColumnDataTypeWrapper::try_from(
2203 ConcreteDataType::timestamp_millisecond_datatype(),
2204 )
2205 .unwrap()
2206 .datatype() as i32,
2207 semantic_type: api::v1::SemanticType::Timestamp as i32,
2208 ..Default::default()
2209 },
2210 api::v1::ColumnSchema {
2211 column_name: "v0".to_string(),
2212 datatype: api::helper::ColumnDataTypeWrapper::try_from(
2213 ConcreteDataType::int64_datatype(),
2214 )
2215 .unwrap()
2216 .datatype() as i32,
2217 semantic_type: api::v1::SemanticType::Field as i32,
2218 ..Default::default()
2219 },
2220 api::v1::ColumnSchema {
2221 column_name: "v1".to_string(),
2222 datatype: api::helper::ColumnDataTypeWrapper::try_from(
2223 ConcreteDataType::float64_datatype(),
2224 )
2225 .unwrap()
2226 .datatype() as i32,
2227 semantic_type: api::v1::SemanticType::Field as i32,
2228 ..Default::default()
2229 },
2230 ];
2231
2232 let rows = timestamps
2233 .zip(values)
2234 .map(|(ts, v)| Row {
2235 values: vec![
2236 api::v1::Value {
2237 value_data: Some(api::v1::value::ValueData::BinaryValue(
2238 encoded_key.clone(),
2239 )),
2240 },
2241 api::v1::Value {
2242 value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(ts)),
2243 },
2244 api::v1::Value {
2245 value_data: Some(api::v1::value::ValueData::I64Value(ts)),
2246 },
2247 api::v1::Value {
2248 value_data: v.map(api::v1::value::ValueData::F64Value),
2249 },
2250 ],
2251 })
2252 .collect();
2253
2254 let mutation = api::v1::Mutation {
2255 op_type: 1,
2256 sequence,
2257 rows: Some(api::v1::Rows {
2258 schema: column_schema,
2259 rows,
2260 }),
2261 write_hint: Some(WriteHint {
2262 primary_key_encoding: api::v1::PrimaryKeyEncoding::Sparse.into(),
2263 }),
2264 };
2265 KeyValues::new(metadata.as_ref(), mutation).unwrap()
2266 }
2267
2268 #[test]
2269 fn test_bulk_part_converter_sparse_primary_key_encoding() {
2270 use api::v1::SemanticType;
2271 use datatypes::schema::ColumnSchema;
2272 use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
2273 use store_api::storage::RegionId;
2274
2275 let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
2276 builder
2277 .push_column_metadata(ColumnMetadata {
2278 column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
2279 semantic_type: SemanticType::Tag,
2280 column_id: 0,
2281 })
2282 .push_column_metadata(ColumnMetadata {
2283 column_schema: ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
2284 semantic_type: SemanticType::Tag,
2285 column_id: 1,
2286 })
2287 .push_column_metadata(ColumnMetadata {
2288 column_schema: ColumnSchema::new(
2289 "ts",
2290 ConcreteDataType::timestamp_millisecond_datatype(),
2291 false,
2292 ),
2293 semantic_type: SemanticType::Timestamp,
2294 column_id: 2,
2295 })
2296 .push_column_metadata(ColumnMetadata {
2297 column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
2298 semantic_type: SemanticType::Field,
2299 column_id: 3,
2300 })
2301 .push_column_metadata(ColumnMetadata {
2302 column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
2303 semantic_type: SemanticType::Field,
2304 column_id: 4,
2305 })
2306 .primary_key(vec![0, 1])
2307 .primary_key_encoding(PrimaryKeyEncoding::Sparse);
2308 let metadata = Arc::new(builder.build().unwrap());
2309
2310 let primary_key_codec = build_primary_key_codec(&metadata);
2311 let schema = to_flat_sst_arrow_schema(
2312 &metadata,
2313 &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2314 );
2315
2316 assert_eq!(metadata.primary_key_encoding, PrimaryKeyEncoding::Sparse);
2317 assert_eq!(primary_key_codec.encoding(), PrimaryKeyEncoding::Sparse);
2318
2319 let capacity = 100;
2320 let mut converter =
2321 BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec.clone(), true);
2322
2323 let key_values1 = build_key_values_with_sparse_encoding(
2324 &metadata,
2325 &primary_key_codec,
2326 2048u32, 100u64, "key11".to_string(),
2329 "key21".to_string(),
2330 vec![1000, 2000].into_iter(),
2331 vec![Some(1.0), Some(2.0)].into_iter(),
2332 1,
2333 );
2334
2335 let key_values2 = build_key_values_with_sparse_encoding(
2336 &metadata,
2337 &primary_key_codec,
2338 4096u32, 200u64, "key12".to_string(),
2341 "key22".to_string(),
2342 vec![1500].into_iter(),
2343 vec![Some(3.0)].into_iter(),
2344 2,
2345 );
2346
2347 converter.append_key_values(&key_values1).unwrap();
2348 converter.append_key_values(&key_values2).unwrap();
2349
2350 let bulk_part = converter.convert().unwrap();
2351
2352 assert_eq!(bulk_part.num_rows(), 3);
2353 assert_eq!(bulk_part.min_timestamp, 1000);
2354 assert_eq!(bulk_part.max_timestamp, 2000);
2355 assert_eq!(bulk_part.sequence, 2);
2356 assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
2357
2358 let schema = bulk_part.batch.schema();
2362 let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2363 assert_eq!(
2364 field_names,
2365 vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
2366 );
2367
2368 let primary_key_column = bulk_part.batch.column_by_name("__primary_key").unwrap();
2370 let dict_array = primary_key_column
2371 .as_any()
2372 .downcast_ref::<DictionaryArray<UInt32Type>>()
2373 .unwrap();
2374
2375 assert!(!dict_array.is_empty());
2377 assert_eq!(dict_array.len(), 3); let values = dict_array
2381 .values()
2382 .as_any()
2383 .downcast_ref::<BinaryArray>()
2384 .unwrap();
2385 for i in 0..values.len() {
2386 assert!(
2387 !values.value(i).is_empty(),
2388 "Encoded primary key should not be empty"
2389 );
2390 }
2391 }
2392
2393 #[test]
2394 fn test_convert_bulk_part_empty() {
2395 let metadata = metadata_for_test();
2396 let schema = to_flat_sst_arrow_schema(
2397 &metadata,
2398 &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2399 );
2400 let primary_key_codec = build_primary_key_codec(&metadata);
2401
2402 let empty_batch = RecordBatch::new_empty(schema.clone());
2404 let empty_part = BulkPart {
2405 batch: empty_batch,
2406 max_timestamp: 0,
2407 min_timestamp: 0,
2408 sequence: 0,
2409 timestamp_index: 0,
2410 raw_data: None,
2411 };
2412
2413 let result =
2414 convert_bulk_part(empty_part, &metadata, primary_key_codec, schema, true).unwrap();
2415 assert!(result.is_none());
2416 }
2417
2418 #[test]
2419 fn test_convert_bulk_part_dense_with_pk_columns() {
2420 let metadata = metadata_for_test();
2421 let primary_key_codec = build_primary_key_codec(&metadata);
2422
2423 let k0_array = Arc::new(arrow::array::StringArray::from(vec![
2424 "key1", "key2", "key1",
2425 ]));
2426 let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![1, 2, 1]));
2427 let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200, 300]));
2428 let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0, 3.0]));
2429 let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000, 1500]));
2430
2431 let input_schema = Arc::new(Schema::new(vec![
2432 Field::new("k0", ArrowDataType::Utf8, false),
2433 Field::new("k1", ArrowDataType::UInt32, false),
2434 Field::new("v0", ArrowDataType::Int64, true),
2435 Field::new("v1", ArrowDataType::Float64, true),
2436 Field::new(
2437 "ts",
2438 ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
2439 false,
2440 ),
2441 ]));
2442
2443 let input_batch = RecordBatch::try_new(
2444 input_schema,
2445 vec![k0_array, k1_array, v0_array, v1_array, ts_array],
2446 )
2447 .unwrap();
2448
2449 let part = BulkPart {
2450 batch: input_batch,
2451 max_timestamp: 2000,
2452 min_timestamp: 1000,
2453 sequence: 5,
2454 timestamp_index: 4,
2455 raw_data: None,
2456 };
2457
2458 let output_schema = to_flat_sst_arrow_schema(
2459 &metadata,
2460 &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2461 );
2462
2463 let result = convert_bulk_part(
2464 part,
2465 &metadata,
2466 primary_key_codec,
2467 output_schema,
2468 true, )
2470 .unwrap();
2471
2472 let converted = result.unwrap();
2473
2474 assert_eq!(converted.num_rows(), 3);
2475 assert_eq!(converted.max_timestamp, 2000);
2476 assert_eq!(converted.min_timestamp, 1000);
2477 assert_eq!(converted.sequence, 5);
2478
2479 let schema = converted.batch.schema();
2480 let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2481 assert_eq!(
2482 field_names,
2483 vec![
2484 "k0",
2485 "k1",
2486 "v0",
2487 "v1",
2488 "ts",
2489 "__primary_key",
2490 "__sequence",
2491 "__op_type"
2492 ]
2493 );
2494
2495 let k0_col = converted.batch.column_by_name("k0").unwrap();
2496 assert!(matches!(
2497 k0_col.data_type(),
2498 ArrowDataType::Dictionary(_, _)
2499 ));
2500
2501 let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
2502 let dict_array = pk_col
2503 .as_any()
2504 .downcast_ref::<DictionaryArray<UInt32Type>>()
2505 .unwrap();
2506 let keys = dict_array.keys();
2507
2508 assert_eq!(keys.len(), 3);
2509 }
2510
#[test]
fn test_convert_bulk_part_dense_without_pk_columns() {
    // Dense primary key encoding with `raw_pk_columns: false`: the flat output
    // should drop the raw tag columns and keep only the encoded `__primary_key`.
    let metadata = metadata_for_test();
    let primary_key_codec = build_primary_key_codec(&metadata);

    let input_schema = Arc::new(Schema::new(vec![
        Field::new("k0", ArrowDataType::Utf8, false),
        Field::new("k1", ArrowDataType::UInt32, false),
        Field::new("v0", ArrowDataType::Int64, true),
        Field::new("v1", ArrowDataType::Float64, true),
        Field::new(
            "ts",
            ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
            false,
        ),
    ]));

    let tag0 = Arc::new(arrow::array::StringArray::from(vec!["key1", "key2"]));
    let tag1 = Arc::new(arrow::array::UInt32Array::from(vec![1, 2]));
    let field0 = Arc::new(arrow::array::Int64Array::from(vec![100, 200]));
    let field1 = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0]));
    let time_index = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000]));
    let input_batch =
        RecordBatch::try_new(input_schema, vec![tag0, tag1, field0, field1, time_index])
            .unwrap();

    let part = BulkPart {
        batch: input_batch,
        max_timestamp: 2000,
        min_timestamp: 1000,
        sequence: 3,
        timestamp_index: 4,
        raw_data: None,
    };

    let flat_options = FlatSchemaOptions {
        raw_pk_columns: false,
        string_pk_use_dict: true,
        ..Default::default()
    };
    let output_schema = to_flat_sst_arrow_schema(&metadata, &flat_options);

    let converted = convert_bulk_part(part, &metadata, primary_key_codec, output_schema, false)
        .unwrap()
        .unwrap();

    assert_eq!(converted.num_rows(), 2);
    assert_eq!(converted.max_timestamp, 2000);
    assert_eq!(converted.min_timestamp, 1000);
    assert_eq!(converted.sequence, 3);

    // Only field, time index and internal columns remain in the output.
    let output_schema = converted.batch.schema();
    let column_names = output_schema
        .fields()
        .iter()
        .map(|field| field.name().as_str())
        .collect::<Vec<&str>>();
    assert_eq!(
        column_names,
        vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
    );

    // The encoded primary key column is dictionary-encoded.
    let pk_type = converted
        .batch
        .column_by_name("__primary_key")
        .unwrap()
        .data_type();
    assert!(matches!(pk_type, ArrowDataType::Dictionary(_, _)));
}
2590
#[test]
fn test_convert_bulk_part_sparse_encoding() {
    // Region with two string tags and sparse primary key encoding; the input part
    // already carries the encoded `__primary_key` column instead of raw tags.
    let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
    // (name, data type, nullable, semantic type); column ids follow table order.
    let columns = [
        ("k0", ConcreteDataType::string_datatype(), false, SemanticType::Tag),
        ("k1", ConcreteDataType::string_datatype(), false, SemanticType::Tag),
        (
            "ts",
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
            SemanticType::Timestamp,
        ),
        ("v0", ConcreteDataType::int64_datatype(), true, SemanticType::Field),
        ("v1", ConcreteDataType::float64_datatype(), true, SemanticType::Field),
    ];
    for (column_id, (name, data_type, nullable, semantic_type)) in
        columns.into_iter().enumerate()
    {
        builder.push_column_metadata(ColumnMetadata {
            column_schema: ColumnSchema::new(name, data_type, nullable),
            semantic_type,
            column_id: column_id as u32,
        });
    }
    builder
        .primary_key(vec![0, 1])
        .primary_key_encoding(PrimaryKeyEncoding::Sparse);
    let metadata = Arc::new(builder.build().unwrap());

    let primary_key_codec = build_primary_key_codec(&metadata);

    let input_schema = Arc::new(Schema::new(vec![
        Field::new("__primary_key", ArrowDataType::Binary, false),
        Field::new("v0", ArrowDataType::Int64, true),
        Field::new("v1", ArrowDataType::Float64, true),
        Field::new(
            "ts",
            ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
            false,
        ),
    ]));

    let encoded_keys = Arc::new(arrow::array::BinaryArray::from(vec![
        b"encoded_key_1".as_slice(),
        b"encoded_key_2".as_slice(),
    ]));
    let field0 = Arc::new(arrow::array::Int64Array::from(vec![100, 200]));
    let field1 = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0]));
    let time_index = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000]));
    let input_batch = RecordBatch::try_new(
        input_schema,
        vec![encoded_keys, field0, field1, time_index],
    )
    .unwrap();

    let part = BulkPart {
        batch: input_batch,
        max_timestamp: 2000,
        min_timestamp: 1000,
        sequence: 7,
        timestamp_index: 3,
        raw_data: None,
    };

    let flat_options = FlatSchemaOptions::from_encoding(metadata.primary_key_encoding);
    let output_schema = to_flat_sst_arrow_schema(&metadata, &flat_options);

    let converted = convert_bulk_part(part, &metadata, primary_key_codec, output_schema, true)
        .unwrap()
        .unwrap();

    assert_eq!(converted.num_rows(), 2);
    assert_eq!(converted.max_timestamp, 2000);
    assert_eq!(converted.min_timestamp, 1000);
    assert_eq!(converted.sequence, 7);

    // No raw tag columns appear in the output for the sparse encoding.
    let output_schema = converted.batch.schema();
    let column_names = output_schema
        .fields()
        .iter()
        .map(|field| field.name().as_str())
        .collect::<Vec<&str>>();
    assert_eq!(
        column_names,
        vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
    );

    // The binary primary key input ends up dictionary-encoded.
    let pk_type = converted
        .batch
        .column_by_name("__primary_key")
        .unwrap()
        .data_type();
    assert!(matches!(pk_type, ArrowDataType::Dictionary(_, _)));
}
2699
#[test]
fn test_convert_bulk_part_sorting_with_multiple_series() {
    // Two series, ("series_a", 1) and ("series_b", 2), whose rows are interleaved
    // and out of time order in the input part.
    let metadata = metadata_for_test();
    let primary_key_codec = build_primary_key_codec(&metadata);

    let k0_array = Arc::new(arrow::array::StringArray::from(vec![
        "series_b", "series_a", "series_b", "series_a",
    ]));
    let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![2, 1, 2, 1]));
    let v0_array = Arc::new(arrow::array::Int64Array::from(vec![200, 100, 400, 300]));
    let v1_array = Arc::new(arrow::array::Float64Array::from(vec![2.0, 1.0, 4.0, 3.0]));
    let ts_array = Arc::new(TimestampMillisecondArray::from(vec![
        2000, 1000, 4000, 3000,
    ]));

    let input_schema = Arc::new(Schema::new(vec![
        Field::new("k0", ArrowDataType::Utf8, false),
        Field::new("k1", ArrowDataType::UInt32, false),
        Field::new("v0", ArrowDataType::Int64, true),
        Field::new("v1", ArrowDataType::Float64, true),
        Field::new(
            "ts",
            ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
            false,
        ),
    ]));

    let input_batch = RecordBatch::try_new(
        input_schema,
        vec![k0_array, k1_array, v0_array, v1_array, ts_array],
    )
    .unwrap();

    let part = BulkPart {
        batch: input_batch,
        max_timestamp: 4000,
        min_timestamp: 1000,
        sequence: 10,
        timestamp_index: 4,
        raw_data: None,
    };

    let output_schema = to_flat_sst_arrow_schema(
        &metadata,
        &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
    );

    let result =
        convert_bulk_part(part, &metadata, primary_key_codec, output_schema, true).unwrap();

    let converted = result.unwrap();

    assert_eq!(converted.num_rows(), 4);

    // Rows of "series_a" (ts 1000, 3000) come first, then "series_b" (ts 2000, 4000).
    let ts_col = converted.batch.column(converted.timestamp_index);
    let ts_array = ts_col
        .as_any()
        .downcast_ref::<TimestampMillisecondArray>()
        .unwrap();
    let timestamps: Vec<i64> = ts_array.values().to_vec();
    assert_eq!(timestamps, vec![1000, 3000, 2000, 4000]);

    // Field values must move together with their timestamps. Without this check, a
    // conversion that reordered only the time index would still pass the test.
    let v0_col = converted.batch.column_by_name("v0").unwrap();
    let v0_sorted = v0_col
        .as_any()
        .downcast_ref::<arrow::array::Int64Array>()
        .unwrap();
    let v0_values: Vec<i64> = v0_sorted.values().to_vec();
    assert_eq!(v0_values, vec![100, 300, 200, 400]);
}
2768
2769 fn build_converted_bulk_part(inputs: &[MutationInput]) -> BulkPart {
2771 let metadata = metadata_for_test();
2772 let kvs = inputs
2773 .iter()
2774 .map(|m| {
2775 build_key_values_with_ts_seq_values(
2776 &metadata,
2777 m.k0.to_string(),
2778 m.k1,
2779 m.timestamps.iter().copied(),
2780 m.v1.iter().copied(),
2781 m.sequence,
2782 )
2783 })
2784 .collect::<Vec<_>>();
2785 let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
2786 let primary_key_codec = build_primary_key_codec(&metadata);
2787 let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
2788 for kv in kvs {
2789 converter.append_key_values(&kv).unwrap();
2790 }
2791 converter.convert().unwrap()
2792 }
2793
2794 fn build_multi_bulk_part(groups: &[&[MutationInput]]) -> (MultiBulkPart, RegionMetadataRef) {
2796 let metadata = metadata_for_test();
2797 let mut all_batches = Vec::new();
2798 let mut min_ts = i64::MAX;
2799 let mut max_ts = i64::MIN;
2800 let mut max_seq = 0u64;
2801
2802 for inputs in groups {
2803 let part = build_converted_bulk_part(inputs);
2804 min_ts = min_ts.min(part.min_timestamp);
2805 max_ts = max_ts.max(part.max_timestamp);
2806 max_seq = max_seq.max(part.sequence);
2807 all_batches.push(part.batch);
2808 }
2809
2810 let multi = MultiBulkPart::new(
2811 all_batches,
2812 min_ts,
2813 max_ts,
2814 max_seq,
2815 groups.len(),
2816 &metadata,
2817 );
2818 (multi, metadata)
2819 }
2820
#[test]
fn test_multi_bulk_part_prune_batches() {
    // Three batches, one per series: k0 = "a", "m" and "z", two rows each.
    let (multi, metadata) = build_multi_bulk_part(&[
        &[MutationInput {
            k0: "a",
            k1: 0,
            timestamps: &[1, 2],
            v1: &[Some(1.0), Some(2.0)],
            sequence: 0,
        }],
        &[MutationInput {
            k0: "m",
            k1: 0,
            timestamps: &[3, 4],
            v1: &[Some(3.0), Some(4.0)],
            sequence: 1,
        }],
        &[MutationInput {
            k0: "z",
            k1: 0,
            timestamps: &[5, 6],
            v1: &[Some(5.0), Some(6.0)],
            sequence: 2,
        }],
    ]);
    assert_eq!(multi.num_rows(), 6);
    assert_eq!(multi.num_batches(), 3);

    // Builds a fresh read context over the test metadata with an optional predicate.
    let context_with = |predicate: Option<Predicate>| {
        Arc::new(BulkIterContext::new(metadata.clone(), None, predicate, false).unwrap())
    };

    // k0 = "m": only that series' two rows are returned.
    let rows_for_m: usize = multi
        .read(
            context_with(Some(Predicate::new(vec![
                datafusion_expr::col("k0").eq(datafusion_expr::lit("m")),
            ]))),
            None,
            None,
        )
        .unwrap()
        .expect("should have results")
        .map(|item| item.unwrap().num_rows())
        .sum();
    assert_eq!(rows_for_m, 2);

    // No series matches, so the read yields no reader at all.
    let no_match = context_with(Some(Predicate::new(vec![
        datafusion_expr::col("k0").eq(datafusion_expr::lit("nonexistent")),
    ])));
    assert!(multi.read(no_match, None, None).unwrap().is_none());

    // Without a predicate every row of every batch is read.
    let rows_unfiltered: usize = multi
        .read(context_with(None), None, None)
        .unwrap()
        .expect("should have results")
        .map(|item| item.unwrap().num_rows())
        .sum();
    assert_eq!(rows_unfiltered, 6);
}
2892}