use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::{Duration, Instant};

use api::helper::{ColumnDataTypeWrapper, to_grpc_value};
use api::v1::bulk_wal_entry::Body;
use api::v1::{ArrowIpc, BulkWalEntry, Mutation, OpType};
use bytes::Bytes;
use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
use common_recordbatch::DfRecordBatch as RecordBatch;
use common_time::Timestamp;
use datafusion_common::Column;
use datafusion_common::pruning::PruningStatistics;
use datafusion_expr::utils::expr_to_columns;
use datatypes::arrow;
use datatypes::arrow::array::{
    Array, ArrayRef, BinaryArray, BooleanArray, StringDictionaryBuilder, UInt8Array, UInt64Array,
};
use datatypes::arrow::compute::{SortColumn, SortOptions};
use datatypes::arrow::datatypes::{
    DataType as ArrowDataType, Field, Schema, SchemaRef, UInt32Type,
};
use datatypes::data_type::DataType;
use datatypes::extension::json::is_json_extension_type;
use datatypes::prelude::{MutableVector, Vector};
use datatypes::schema::ext::ArrowSchemaExt;
use datatypes::types::JsonType;
use datatypes::value::ValueRef;
use datatypes::vectors::Helper;
use datatypes::vectors::json::array::JsonArray;
use mito_codec::key_values::{KeyValue, KeyValues};
use mito_codec::row_converter::{PrimaryKeyCodec, SortField, build_primary_key_codec_with_fields};
use parquet::arrow::ArrowWriter;
use parquet::basic::{Compression, ZstdLevel};
use parquet::file::metadata::ParquetMetaData;
use parquet::file::properties::WriterProperties;
use smallvec::SmallVec;
use snafu::{OptionExt, ResultExt};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
use store_api::storage::{ColumnId, FileId, SequenceNumber, SequenceRange};

use crate::error::{
    self, ColumnNotFoundSnafu, ComputeArrowSnafu, ConvertValueSnafu, CreateDefaultSnafu,
    DataTypeMismatchSnafu, EncodeMemtableSnafu, EncodeSnafu, InvalidMetadataSnafu,
    InvalidRequestSnafu, NewRecordBatchSnafu, Result,
};
use crate::memtable::bulk::context::{BulkIterContext, BulkIterContextRef};
use crate::memtable::bulk::part_reader::EncodedBulkPartIter;
use crate::memtable::time_series::{ValueBuilder, Values};
use crate::memtable::{BoxedRecordBatchIterator, MemScanMetrics, MemtableStats};
use crate::sst::SeriesEstimator;
use crate::sst::index::IndexOutput;
use crate::sst::parquet::flat_format::primary_key_column_index;
use crate::sst::parquet::format::{PrimaryKeyArray, PrimaryKeyArrayBuilder};
use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo};

const INIT_DICT_VALUE_CAPACITY: usize = 8;

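/// An in-memory bulk write part: a [RecordBatch] together with its timestamp
/// range, the maximum sequence number, the index of the timestamp column, and
/// the raw Arrow IPC payload it was decoded from, if any.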
#[derive(Clone)]
pub struct BulkPart {
    pub batch: RecordBatch,
    pub max_timestamp: i64,
    pub min_timestamp: i64,
    pub sequence: u64,
    pub timestamp_index: usize,
    pub raw_data: Option<ArrowIpc>,
}

impl TryFrom<BulkWalEntry> for BulkPart {
    type Error = error::Error;

    fn try_from(value: BulkWalEntry) -> std::result::Result<Self, Self::Error> {
        match value.body.expect("Entry payload should be present") {
            Body::ArrowIpc(ipc) => {
                let mut decoder = FlightDecoder::try_from_schema_bytes(&ipc.schema)
                    .context(error::ConvertBulkWalEntrySnafu)?;
                let batch = decoder
                    .try_decode_record_batch(&ipc.data_header, &ipc.payload)
                    .context(error::ConvertBulkWalEntrySnafu)?;
                Ok(Self {
                    batch,
                    max_timestamp: value.max_ts,
                    min_timestamp: value.min_ts,
                    sequence: value.sequence,
                    timestamp_index: value.timestamp_index as usize,
                    raw_data: Some(ipc),
                })
            }
        }
    }
}

impl TryFrom<&BulkPart> for BulkWalEntry {
    type Error = error::Error;

    fn try_from(value: &BulkPart) -> Result<Self> {
        if let Some(ipc) = &value.raw_data {
            Ok(BulkWalEntry {
                sequence: value.sequence,
                max_ts: value.max_timestamp,
                min_ts: value.min_timestamp,
                timestamp_index: value.timestamp_index as u32,
                body: Some(Body::ArrowIpc(ipc.clone())),
            })
        } else {
            let mut encoder = FlightEncoder::default();
            let schema_bytes = encoder
                .encode_schema(value.batch.schema().as_ref())
                .data_header;
            let [rb_data] = encoder
                .encode(FlightMessage::RecordBatch(value.batch.clone()))
                .try_into()
                .map_err(|_| {
                    error::UnsupportedOperationSnafu {
                        err_msg: "create BulkWalEntry from RecordBatch with dictionary arrays",
                    }
                    .build()
                })?;
            Ok(BulkWalEntry {
                sequence: value.sequence,
                max_ts: value.max_timestamp,
                min_ts: value.min_timestamp,
                timestamp_index: value.timestamp_index as u32,
                body: Some(Body::ArrowIpc(ArrowIpc {
                    schema: schema_bytes,
                    data_header: rb_data.data_header,
                    payload: rb_data.data_body,
                })),
            })
        }
    }
}

impl BulkPart {
    pub(crate) fn estimated_size(&self) -> usize {
        record_batch_estimated_size(&self.batch)
    }

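    /// Estimates the number of series in this part from the length of the
    /// primary key dictionary values, or returns 0 if the primary key column
    /// is not a [PrimaryKeyArray].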
    pub fn estimated_series_count(&self) -> usize {
        let pk_column_idx = primary_key_column_index(self.batch.num_columns());
        let pk_column = self.batch.column(pk_column_idx);
        if let Some(dict_array) = pk_column.as_any().downcast_ref::<PrimaryKeyArray>() {
            dict_array.values().len()
        } else {
            0
        }
    }

    pub fn to_memtable_stats(&self, region_metadata: &RegionMetadataRef) -> MemtableStats {
        let ts_type = region_metadata.time_index_type();
        let min_ts = ts_type.create_timestamp(self.min_timestamp);
        let max_ts = ts_type.create_timestamp(self.max_timestamp);

        MemtableStats {
            estimated_bytes: self.estimated_size(),
            time_range: Some((min_ts, max_ts)),
            num_rows: self.num_rows(),
            num_ranges: 1,
            max_sequence: self.sequence,
            series_count: self.estimated_series_count(),
        }
    }

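    /// Appends columns that are present in `region_metadata` but missing from
    /// the batch, filling them with their default values. Returns an error if
    /// a missing column has no default.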
    pub fn fill_missing_columns(&mut self, region_metadata: &RegionMetadata) -> Result<()> {
        let batch_schema = self.batch.schema();
        let batch_columns: HashSet<_> = batch_schema
            .fields()
            .iter()
            .map(|f| f.name().as_str())
            .collect();

        let mut columns_to_fill = Vec::new();
        for column_meta in &region_metadata.column_metadatas {
            if !batch_columns.contains(column_meta.column_schema.name.as_str()) {
                columns_to_fill.push(column_meta);
            }
        }

        if columns_to_fill.is_empty() {
            return Ok(());
        }

        let num_rows = self.batch.num_rows();

        let mut new_columns = Vec::new();
        let mut new_fields = Vec::new();

        new_fields.extend(batch_schema.fields().iter().cloned());
        new_columns.extend_from_slice(self.batch.columns());

        let region_id = region_metadata.region_id;
        for column_meta in columns_to_fill {
            let default_vector = column_meta
                .column_schema
                .create_default_vector(num_rows)
                .context(CreateDefaultSnafu {
                    region_id,
                    column: &column_meta.column_schema.name,
                })?
                .with_context(|| InvalidRequestSnafu {
                    region_id,
                    reason: format!(
                        "column {} does not have default value",
                        column_meta.column_schema.name
                    ),
                })?;
            let arrow_array = default_vector.to_arrow_array();

            new_fields.push(Arc::new(Field::new(
                column_meta.column_schema.name.clone(),
                column_meta.column_schema.data_type.as_arrow_type(),
                column_meta.column_schema.is_nullable(),
            )));
            new_columns.push(arrow_array);
        }

        let new_schema = Arc::new(Schema::new(new_fields));
        let new_batch =
            RecordBatch::try_new(new_schema, new_columns).context(NewRecordBatchSnafu)?;

        self.batch = new_batch;

        Ok(())
    }

    pub(crate) fn to_mutation(&self, region_metadata: &RegionMetadataRef) -> Result<Mutation> {
        let vectors = region_metadata
            .schema
            .column_schemas()
            .iter()
            .map(|col| match self.batch.column_by_name(&col.name) {
                None => Ok(None),
                Some(col) => Helper::try_into_vector(col).map(Some),
            })
            .collect::<datatypes::error::Result<Vec<_>>>()
            .context(error::ComputeVectorSnafu)?;

        let rows = (0..self.num_rows())
            .map(|row_idx| {
                let values = (0..self.batch.num_columns())
                    .map(|col_idx| {
                        if let Some(v) = &vectors[col_idx] {
                            to_grpc_value(v.get(row_idx))
                        } else {
                            api::v1::Value { value_data: None }
                        }
                    })
                    .collect::<Vec<_>>();
                api::v1::Row { values }
            })
            .collect::<Vec<_>>();

        let schema = region_metadata
            .column_metadatas
            .iter()
            .map(|c| {
                let data_type_wrapper =
                    ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())?;
                Ok(api::v1::ColumnSchema {
                    column_name: c.column_schema.name.clone(),
                    datatype: data_type_wrapper.datatype() as i32,
                    semantic_type: c.semantic_type as i32,
                    ..Default::default()
                })
            })
            .collect::<api::error::Result<Vec<_>>>()
            .context(error::ConvertColumnDataTypeSnafu {
                reason: "failed to convert region metadata to column schema",
            })?;

        let rows = api::v1::Rows { schema, rows };

        Ok(Mutation {
            op_type: OpType::Put as i32,
            sequence: self.sequence,
            rows: Some(rows),
            write_hint: None,
        })
    }

    pub fn timestamps(&self) -> &ArrayRef {
        self.batch.column(self.timestamp_index)
    }

    pub fn num_rows(&self) -> usize {
        self.batch.num_rows()
    }
}

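/// Buffers small, unsorted [BulkPart]s and compacts them into a single sorted
/// part once enough rows accumulate.
///
/// A minimal usage sketch of the intended flow (`part` is assumed to be a
/// [BulkPart] produced by the caller):
///
/// ```ignore
/// let mut buffer = UnorderedPart::new();
/// if buffer.should_accept(part.num_rows()) {
///     buffer.push(part);
/// }
/// if buffer.should_compact() {
///     // Concatenates (aligning JSON columns if needed) and sorts all parts.
///     let compacted = buffer.to_bulk_part()?;
///     buffer.clear();
/// }
/// ```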
pub struct UnorderedPart {
    parts: Vec<BulkPart>,
    total_rows: usize,
    min_timestamp: i64,
    max_timestamp: i64,
    max_sequence: u64,
    threshold: usize,
    compact_threshold: usize,
}

impl Default for UnorderedPart {
    fn default() -> Self {
        Self::new()
    }
}

impl UnorderedPart {
    pub fn new() -> Self {
        Self {
            parts: Vec::new(),
            total_rows: 0,
            min_timestamp: i64::MAX,
            max_timestamp: i64::MIN,
            max_sequence: 0,
            threshold: 1024,
            compact_threshold: 4096,
        }
    }

    pub fn set_threshold(&mut self, threshold: usize) {
        self.threshold = threshold;
    }

    pub fn set_compact_threshold(&mut self, compact_threshold: usize) {
        self.compact_threshold = compact_threshold;
    }

    pub fn threshold(&self) -> usize {
        self.threshold
    }

    pub fn compact_threshold(&self) -> usize {
        self.compact_threshold
    }

    pub fn should_accept(&self, num_rows: usize) -> bool {
        num_rows < self.threshold
    }

    pub fn should_compact(&self) -> bool {
        self.total_rows >= self.compact_threshold
    }

    pub fn push(&mut self, part: BulkPart) {
        self.total_rows += part.num_rows();
        self.min_timestamp = self.min_timestamp.min(part.min_timestamp);
        self.max_timestamp = self.max_timestamp.max(part.max_timestamp);
        self.max_sequence = self.max_sequence.max(part.sequence);
        self.parts.push(part);
    }

    pub fn num_rows(&self) -> usize {
        self.total_rows
    }

    pub fn is_empty(&self) -> bool {
        self.parts.is_empty()
    }

    pub fn num_parts(&self) -> usize {
        self.parts.len()
    }

    pub fn concat_and_sort(&self) -> Result<Option<RecordBatch>> {
        if self.parts.is_empty() {
            return Ok(None);
        }

        if self.parts.len() == 1 {
            return Ok(Some(self.parts[0].batch.clone()));
        }

        let schema = self.parts[0].batch.schema();
        let concatenated = if schema.has_json_extension_field() {
            let (schema, batches) = align_parts(&self.parts)?;
            arrow::compute::concat_batches(&schema, &batches).context(ComputeArrowSnafu)?
        } else {
            arrow::compute::concat_batches(&schema, self.parts.iter().map(|x| &x.batch))
                .context(ComputeArrowSnafu)?
        };

        let sorted_batch = sort_primary_key_record_batch(&concatenated)?;

        Ok(Some(sorted_batch))
    }

    pub fn to_bulk_part(&self) -> Result<Option<BulkPart>> {
        let Some(sorted_batch) = self.concat_and_sort()? else {
            return Ok(None);
        };

        let timestamp_index = self.parts[0].timestamp_index;

        Ok(Some(BulkPart {
            batch: sorted_batch,
            max_timestamp: self.max_timestamp,
            min_timestamp: self.min_timestamp,
            sequence: self.max_sequence,
            timestamp_index,
            raw_data: None,
        }))
    }

    pub fn clear(&mut self) {
        self.parts.clear();
        self.total_rows = 0;
        self.min_timestamp = i64::MAX;
        self.max_timestamp = i64::MIN;
        self.max_sequence = 0;
    }
}

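/// Aligns parts whose JSON extension columns may have diverged in type:
/// merges the [JsonType]s of each JSON column across all parts and casts every
/// batch to the merged schema. All parts must have the same number of fields.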
fn align_parts(parts: &[BulkPart]) -> Result<(SchemaRef, Vec<RecordBatch>)> {
    debug_assert!(
        !parts.is_empty()
            && parts
                .windows(2)
                .all(|w| w[0].batch.schema_ref().fields().len()
                    == w[1].batch.schema_ref().fields().len())
    );

    let first = &parts[0];
    let base_schema = first.batch.schema_ref();
    let rest = &parts[1..];

    let mut merged_types = HashMap::new();
    let mut aligned_fields = Vec::with_capacity(base_schema.fields().len());
    for (i, field) in base_schema.fields().iter().enumerate() {
        if is_json_extension_type(field) {
            let mut merged = JsonType::from(field.data_type());
            rest.iter()
                .try_fold(&mut merged, |acc, x| {
                    acc.merge(&JsonType::from(x.batch.schema_ref().field(i).data_type()))?;
                    Ok(acc)
                })
                .context(DataTypeMismatchSnafu)?;
            merged_types.insert(i, merged.as_arrow_type());

            aligned_fields.push(Arc::new(
                Field::new(
                    field.name().clone(),
                    merged.as_arrow_type(),
                    field.is_nullable(),
                )
                .with_metadata(field.metadata().clone()),
            ));
        } else {
            aligned_fields.push(field.clone())
        };
    }
    let aligned_schema = Arc::new(Schema::new_with_metadata(
        aligned_fields,
        base_schema.metadata().clone(),
    ));

    let mut aligned_batches = Vec::with_capacity(parts.len());
    for part in parts {
        let mut columns = Vec::with_capacity(part.batch.num_columns());
        for (i, column) in part.batch.columns().iter().enumerate() {
            if let Some(expect) = merged_types.get(&i) {
                columns.push(
                    JsonArray::from(column)
                        .try_align(expect)
                        .context(ConvertValueSnafu)?,
                );
            } else {
                columns.push(column.clone());
            }
        }
        aligned_batches.push(
            RecordBatch::try_new(aligned_schema.clone(), columns).context(NewRecordBatchSnafu)?,
        );
    }

    Ok((aligned_schema, aligned_batches))
}

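/// Estimates the in-memory size of a record batch by summing the sliced
/// buffer sizes of all of its columns.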
pub fn record_batch_estimated_size(batch: &RecordBatch) -> usize {
    batch
        .columns()
        .iter()
        .map(|c| c.to_data().get_slice_memory_size().unwrap_or(0))
        .sum()
}

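/// Builds a single primary key column: string tags are accumulated in a
/// dictionary builder, other types in a plain mutable vector.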
enum PrimaryKeyColumnBuilder {
    StringDict(StringDictionaryBuilder<UInt32Type>),
    Vector(Box<dyn MutableVector>),
}

impl PrimaryKeyColumnBuilder {
    fn push_value_ref(&mut self, value: ValueRef) -> Result<()> {
        match self {
            PrimaryKeyColumnBuilder::StringDict(builder) => {
                if let Some(s) = value.try_into_string().context(DataTypeMismatchSnafu)? {
                    builder.append_value(s);
                } else {
                    builder.append_null();
                }
            }
            PrimaryKeyColumnBuilder::Vector(builder) => {
                builder.push_value_ref(&value);
            }
        }
        Ok(())
    }

    fn into_arrow_array(self) -> ArrayRef {
        match self {
            PrimaryKeyColumnBuilder::StringDict(mut builder) => Arc::new(builder.finish()),
            PrimaryKeyColumnBuilder::Vector(mut builder) => builder.to_vector().to_arrow_array(),
        }
    }
}

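/// Converts [KeyValues] mutations into a sorted [BulkPart]: primary keys are
/// encoded with the configured codec and the resulting batch is sorted by
/// (primary key, timestamp, sequence descending).
///
/// A minimal usage sketch (mirroring the unit tests at the bottom of this
/// file; `metadata` and `key_values` are assumed to come from the caller):
///
/// ```ignore
/// let codec = build_primary_key_codec(&metadata);
/// let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
/// let mut converter = BulkPartConverter::new(&metadata, schema, 64, codec, true);
/// converter.append_key_values(&key_values)?;
/// let part = converter.convert()?;
/// ```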
pub struct BulkPartConverter {
    schema: SchemaRef,
    primary_key_codec: Arc<dyn PrimaryKeyCodec>,
    key_buf: Vec<u8>,
    key_array_builder: PrimaryKeyArrayBuilder,
    value_builder: ValueBuilder,
    primary_key_column_builders: Vec<PrimaryKeyColumnBuilder>,

    max_ts: i64,
    min_ts: i64,
    max_sequence: SequenceNumber,
}

impl BulkPartConverter {
    pub fn new(
        region_metadata: &RegionMetadataRef,
        schema: SchemaRef,
        capacity: usize,
        primary_key_codec: Arc<dyn PrimaryKeyCodec>,
        store_primary_key_columns: bool,
    ) -> Self {
        debug_assert_eq!(
            region_metadata.primary_key_encoding,
            primary_key_codec.encoding()
        );

        let primary_key_column_builders = if store_primary_key_columns
            && region_metadata.primary_key_encoding != PrimaryKeyEncoding::Sparse
        {
            new_primary_key_column_builders(region_metadata, capacity)
        } else {
            Vec::new()
        };

        Self {
            schema,
            primary_key_codec,
            key_buf: Vec::new(),
            key_array_builder: PrimaryKeyArrayBuilder::new(),
            value_builder: ValueBuilder::new(region_metadata, capacity),
            primary_key_column_builders,
            min_ts: i64::MAX,
            max_ts: i64::MIN,
            max_sequence: SequenceNumber::MIN,
        }
    }

    pub fn append_key_values(&mut self, key_values: &KeyValues) -> Result<()> {
        for kv in key_values.iter() {
            self.append_key_value(&kv)?;
        }

        Ok(())
    }

    fn append_key_value(&mut self, kv: &KeyValue) -> Result<()> {
        if self.primary_key_codec.encoding() == PrimaryKeyEncoding::Sparse {
            let mut primary_keys = kv.primary_keys();
            if let Some(encoded) = primary_keys
                .next()
                .context(ColumnNotFoundSnafu {
                    column: PRIMARY_KEY_COLUMN_NAME,
                })?
                .try_into_binary()
                .context(DataTypeMismatchSnafu)?
            {
                self.key_array_builder
                    .append(encoded)
                    .context(ComputeArrowSnafu)?;
            } else {
                self.key_array_builder
                    .append("")
                    .context(ComputeArrowSnafu)?;
            }
        } else {
            self.key_buf.clear();
            self.primary_key_codec
                .encode_key_value(kv, &mut self.key_buf)
                .context(EncodeSnafu)?;
            self.key_array_builder
                .append(&self.key_buf)
                .context(ComputeArrowSnafu)?;
        };

        if !self.primary_key_column_builders.is_empty() {
            for (builder, pk_value) in self
                .primary_key_column_builders
                .iter_mut()
                .zip(kv.primary_keys())
            {
                builder.push_value_ref(pk_value)?;
            }
        }

        self.value_builder.push(
            kv.timestamp(),
            kv.sequence(),
            kv.op_type() as u8,
            kv.fields(),
        );

        let ts = kv
            .timestamp()
            .try_into_timestamp()
            .unwrap()
            .unwrap()
            .value();
        self.min_ts = self.min_ts.min(ts);
        self.max_ts = self.max_ts.max(ts);
        self.max_sequence = self.max_sequence.max(kv.sequence());

        Ok(())
    }

    pub fn convert(mut self) -> Result<BulkPart> {
        let values = Values::from(self.value_builder);
        let mut columns =
            Vec::with_capacity(4 + values.fields.len() + self.primary_key_column_builders.len());

        for builder in self.primary_key_column_builders {
            columns.push(builder.into_arrow_array());
        }
        columns.extend(values.fields.iter().map(|field| field.to_arrow_array()));
        let timestamp_index = columns.len();
        columns.push(values.timestamp.to_arrow_array());
        let pk_array = self.key_array_builder.finish();
        columns.push(Arc::new(pk_array));
        columns.push(values.sequence.to_arrow_array());
        columns.push(values.op_type.to_arrow_array());

        let schema = align_schema_with_json_array(self.schema, &columns);
        let batch = RecordBatch::try_new(schema, columns).context(NewRecordBatchSnafu)?;
        let batch = sort_primary_key_record_batch(&batch)?;

        Ok(BulkPart {
            batch,
            max_timestamp: self.max_ts,
            min_timestamp: self.min_ts,
            sequence: self.max_sequence,
            timestamp_index,
            raw_data: None,
        })
    }
}

fn align_schema_with_json_array(schema: SchemaRef, columns: &[ArrayRef]) -> SchemaRef {
    if schema.fields().iter().all(|f| !is_json_extension_type(f)) {
        return schema;
    }

    let mut fields = Vec::with_capacity(schema.fields().len());
    for (field, array) in schema.fields().iter().zip(columns) {
        if !is_json_extension_type(field) {
            fields.push(field.clone());
            continue;
        }

        let mut field = field.as_ref().clone();
        field.set_data_type(array.data_type().clone());
        fields.push(Arc::new(field));
    }

    Arc::new(Schema::new_with_metadata(fields, schema.metadata().clone()))
}

fn new_primary_key_column_builders(
    metadata: &RegionMetadata,
    capacity: usize,
) -> Vec<PrimaryKeyColumnBuilder> {
    metadata
        .primary_key_columns()
        .map(|col| {
            if col.column_schema.data_type.is_string() {
                PrimaryKeyColumnBuilder::StringDict(StringDictionaryBuilder::with_capacity(
                    capacity,
                    INIT_DICT_VALUE_CAPACITY,
                    capacity,
                ))
            } else {
                PrimaryKeyColumnBuilder::Vector(
                    col.column_schema.data_type.create_mutable_vector(capacity),
                )
            }
        })
        .collect()
}

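/// Sorts a record batch whose last four columns are
/// `(ts, __primary_key, __sequence, __op_type)` by primary key ascending, then
/// timestamp ascending, then sequence descending, so the newest duplicate of a
/// (key, timestamp) pair comes first.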
pub fn sort_primary_key_record_batch(batch: &RecordBatch) -> Result<RecordBatch> {
    let total_columns = batch.num_columns();
    let sort_columns = vec![
        SortColumn {
            values: batch.column(total_columns - 3).clone(),
            options: Some(SortOptions {
                descending: false,
                nulls_first: true,
            }),
        },
        SortColumn {
            values: batch.column(total_columns - 4).clone(),
            options: Some(SortOptions {
                descending: false,
                nulls_first: true,
            }),
        },
        SortColumn {
            values: batch.column(total_columns - 2).clone(),
            options: Some(SortOptions {
                descending: true,
                nulls_first: true,
            }),
        },
    ];

    let indices = datatypes::arrow::compute::lexsort_to_indices(&sort_columns, None)
        .context(ComputeArrowSnafu)?;

    datatypes::arrow::compute::take_record_batch(batch, &indices).context(ComputeArrowSnafu)
}

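/// Converts a [BulkPart] in user schema order into the flat SST layout:
/// optionally the raw primary key columns, then field columns, the time index,
/// the encoded `__primary_key` dictionary, `__sequence`, and `__op_type`,
/// sorted with [sort_primary_key_record_batch]. Returns `None` for empty parts.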
pub fn convert_bulk_part(
    part: BulkPart,
    region_metadata: &RegionMetadataRef,
    primary_key_codec: Arc<dyn PrimaryKeyCodec>,
    schema: SchemaRef,
    store_primary_key_columns: bool,
) -> Result<Option<BulkPart>> {
    if part.num_rows() == 0 {
        return Ok(None);
    }

    let num_rows = part.num_rows();
    let is_sparse = region_metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse;

    let input_schema = part.batch.schema();
    let column_indices: HashMap<&str, usize> = input_schema
        .fields()
        .iter()
        .enumerate()
        .map(|(idx, field)| (field.name().as_str(), idx))
        .collect();

    let mut output_columns = Vec::new();

    let pk_array = if is_sparse {
        None
    } else {
        let pk_vectors: Result<Vec<_>> = region_metadata
            .primary_key_columns()
            .map(|col_meta| {
                let col_idx = column_indices
                    .get(col_meta.column_schema.name.as_str())
                    .context(ColumnNotFoundSnafu {
                        column: &col_meta.column_schema.name,
                    })?;
                let col = part.batch.column(*col_idx);
                Helper::try_into_vector(col).context(error::ComputeVectorSnafu)
            })
            .collect();
        let pk_vectors = pk_vectors?;

        let mut key_array_builder = PrimaryKeyArrayBuilder::new();
        let mut encode_buf = Vec::new();

        for row_idx in 0..num_rows {
            encode_buf.clear();

            let pk_values_with_ids: Vec<_> = region_metadata
                .primary_key
                .iter()
                .zip(pk_vectors.iter())
                .map(|(col_id, vector)| (*col_id, vector.get_ref(row_idx)))
                .collect();

            primary_key_codec
                .encode_value_refs(&pk_values_with_ids, &mut encode_buf)
                .context(EncodeSnafu)?;

            key_array_builder
                .append(&encode_buf)
                .context(ComputeArrowSnafu)?;
        }

        Some(key_array_builder.finish())
    };

    if store_primary_key_columns && !is_sparse {
        for col_meta in region_metadata.primary_key_columns() {
            let col_idx = column_indices
                .get(col_meta.column_schema.name.as_str())
                .context(ColumnNotFoundSnafu {
                    column: &col_meta.column_schema.name,
                })?;
            let col = part.batch.column(*col_idx);

            let col = if col_meta.column_schema.data_type.is_string() {
                let target_type = ArrowDataType::Dictionary(
                    Box::new(ArrowDataType::UInt32),
                    Box::new(ArrowDataType::Utf8),
                );
                arrow::compute::cast(col, &target_type).context(ComputeArrowSnafu)?
            } else {
                col.clone()
            };
            output_columns.push(col);
        }
    }

    for col_meta in region_metadata.field_columns() {
        let col_idx = column_indices
            .get(col_meta.column_schema.name.as_str())
            .context(ColumnNotFoundSnafu {
                column: &col_meta.column_schema.name,
            })?;
        output_columns.push(part.batch.column(*col_idx).clone());
    }

    let new_timestamp_index = output_columns.len();
    let ts_col_idx = column_indices
        .get(
            region_metadata
                .time_index_column()
                .column_schema
                .name
                .as_str(),
        )
        .context(ColumnNotFoundSnafu {
            column: &region_metadata.time_index_column().column_schema.name,
        })?;
    output_columns.push(part.batch.column(*ts_col_idx).clone());

    let pk_dictionary = if let Some(pk_dict_array) = pk_array {
        Arc::new(pk_dict_array) as ArrayRef
    } else {
        let pk_col_idx =
            column_indices
                .get(PRIMARY_KEY_COLUMN_NAME)
                .context(ColumnNotFoundSnafu {
                    column: PRIMARY_KEY_COLUMN_NAME,
                })?;
        let col = part.batch.column(*pk_col_idx);

        let target_type = ArrowDataType::Dictionary(
            Box::new(ArrowDataType::UInt32),
            Box::new(ArrowDataType::Binary),
        );
        arrow::compute::cast(col, &target_type).context(ComputeArrowSnafu)?
    };
    output_columns.push(pk_dictionary);

    let sequence_array = UInt64Array::from(vec![part.sequence; num_rows]);
    output_columns.push(Arc::new(sequence_array) as ArrayRef);

    let op_type_array = UInt8Array::from(vec![OpType::Put as u8; num_rows]);
    output_columns.push(Arc::new(op_type_array) as ArrayRef);

    let batch = RecordBatch::try_new(schema, output_columns).context(NewRecordBatchSnafu)?;

    let sorted_batch = sort_primary_key_record_batch(&batch)?;

    Ok(Some(BulkPart {
        batch: sorted_batch,
        max_timestamp: part.max_timestamp,
        min_timestamp: part.min_timestamp,
        sequence: part.sequence,
        timestamp_index: new_timestamp_index,
        raw_data: None,
    }))
}

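/// A bulk part encoded as Parquet bytes, together with the metadata required
/// to read it back or convert it into an SST.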
#[derive(Debug, Clone)]
pub struct EncodedBulkPart {
    data: Bytes,
    metadata: BulkPartMeta,
}

impl EncodedBulkPart {
    pub fn new(data: Bytes, metadata: BulkPartMeta) -> Self {
        Self { data, metadata }
    }

    pub fn metadata(&self) -> &BulkPartMeta {
        &self.metadata
    }

    pub(crate) fn size_bytes(&self) -> usize {
        self.data.len()
    }

    pub fn data(&self) -> &Bytes {
        &self.data
    }

    pub fn to_memtable_stats(&self) -> MemtableStats {
        let meta = &self.metadata;
        let ts_type = meta.region_metadata.time_index_type();
        let min_ts = ts_type.create_timestamp(meta.min_timestamp);
        let max_ts = ts_type.create_timestamp(meta.max_timestamp);

        MemtableStats {
            estimated_bytes: self.size_bytes(),
            time_range: Some((min_ts, max_ts)),
            num_rows: meta.num_rows,
            num_ranges: 1,
            max_sequence: meta.max_sequence,
            series_count: meta.num_series as usize,
        }
    }

    pub(crate) fn to_sst_info(&self, file_id: FileId) -> SstInfo {
        let unit = self.metadata.region_metadata.time_index_type().unit();
        let max_row_group_uncompressed_size: u64 = self
            .metadata
            .parquet_metadata
            .row_groups()
            .iter()
            .map(|rg| {
                rg.columns()
                    .iter()
                    .map(|c| c.uncompressed_size() as u64)
                    .sum::<u64>()
            })
            .max()
            .unwrap_or(0);
        SstInfo {
            file_id,
            time_range: (
                Timestamp::new(self.metadata.min_timestamp, unit),
                Timestamp::new(self.metadata.max_timestamp, unit),
            ),
            file_size: self.data.len() as u64,
            max_row_group_uncompressed_size,
            num_rows: self.metadata.num_rows,
            num_row_groups: self.metadata.parquet_metadata.num_row_groups() as u64,
            file_metadata: Some(self.metadata.parquet_metadata.clone()),
            index_metadata: IndexOutput::default(),
            num_series: self.metadata.num_series,
        }
    }

    pub(crate) fn read(
        &self,
        context: BulkIterContextRef,
        sequence: Option<SequenceRange>,
        mem_scan_metrics: Option<MemScanMetrics>,
    ) -> Result<Option<BoxedRecordBatchIterator>> {
        let skip_fields_for_pruning = context.pre_filter_mode().skip_fields();

        let row_groups_to_read =
            context.row_groups_to_read(&self.metadata.parquet_metadata, skip_fields_for_pruning);

        if row_groups_to_read.is_empty() {
            return Ok(None);
        }

        let iter = EncodedBulkPartIter::try_new(
            self,
            context,
            row_groups_to_read,
            sequence,
            mem_scan_metrics,
        )?;
        Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
    }
}

#[derive(Debug, Clone)]
pub struct BulkPartMeta {
    pub num_rows: usize,
    pub max_timestamp: i64,
    pub min_timestamp: i64,
    pub parquet_metadata: Arc<ParquetMetaData>,
    pub region_metadata: RegionMetadataRef,
    pub num_series: u64,
    pub max_sequence: u64,
}

#[derive(Default, Debug)]
pub struct BulkPartEncodeMetrics {
    pub iter_cost: Duration,
    pub write_cost: Duration,
    pub raw_size: usize,
    pub encoded_size: usize,
    pub num_rows: usize,
}

pub struct BulkPartEncoder {
    metadata: RegionMetadataRef,
    writer_props: Option<WriterProperties>,
}

impl BulkPartEncoder {
    pub fn new(metadata: RegionMetadataRef, row_group_size: usize) -> Result<BulkPartEncoder> {
        let json = metadata.to_json().context(InvalidMetadataSnafu)?;
        let key_value_meta =
            parquet::file::metadata::KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

        let writer_props = Some(
            WriterProperties::builder()
                .set_key_value_metadata(Some(vec![key_value_meta]))
                .set_write_batch_size(row_group_size)
                .set_max_row_group_size(row_group_size)
                .set_compression(Compression::ZSTD(ZstdLevel::default()))
                .set_column_index_truncate_length(None)
                .set_statistics_truncate_length(None)
                .build(),
        );

        Ok(Self {
            metadata,
            writer_props,
        })
    }
}

impl BulkPartEncoder {
    pub fn encode_record_batch_iter(
        &self,
        iter: BoxedRecordBatchIterator,
        arrow_schema: SchemaRef,
        min_timestamp: i64,
        max_timestamp: i64,
        max_sequence: u64,
        metrics: &mut BulkPartEncodeMetrics,
    ) -> Result<Option<EncodedBulkPart>> {
        let mut buf = Vec::with_capacity(4096);
        let mut writer = ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
            .context(EncodeMemtableSnafu)?;
        let mut total_rows = 0;
        let mut series_estimator = SeriesEstimator::default();

        let mut iter_start = Instant::now();
        for batch_result in iter {
            metrics.iter_cost += iter_start.elapsed();
            let batch = batch_result?;
            if batch.num_rows() == 0 {
                continue;
            }

            series_estimator.update_flat(&batch);
            metrics.raw_size += record_batch_estimated_size(&batch);
            let write_start = Instant::now();
            writer.write(&batch).context(EncodeMemtableSnafu)?;
            metrics.write_cost += write_start.elapsed();
            total_rows += batch.num_rows();
            iter_start = Instant::now();
        }
        metrics.iter_cost += iter_start.elapsed();

        if total_rows == 0 {
            return Ok(None);
        }

        let close_start = Instant::now();
        let file_metadata = writer.close().context(EncodeMemtableSnafu)?;
        metrics.write_cost += close_start.elapsed();
        metrics.encoded_size += buf.len();
        metrics.num_rows += total_rows;

        let buf = Bytes::from(buf);
        let parquet_metadata = Arc::new(file_metadata);
        let num_series = series_estimator.finish();

        Ok(Some(EncodedBulkPart {
            data: buf,
            metadata: BulkPartMeta {
                num_rows: total_rows,
                max_timestamp,
                min_timestamp,
                parquet_metadata,
                region_metadata: self.metadata.clone(),
                num_series,
                max_sequence,
            },
        }))
    }

    pub fn encode_part(&self, part: &BulkPart) -> Result<Option<EncodedBulkPart>> {
        if part.batch.num_rows() == 0 {
            return Ok(None);
        }

        let mut buf = Vec::with_capacity(4096);
        let arrow_schema = part.batch.schema();

        let file_metadata = {
            let mut writer =
                ArrowWriter::try_new(&mut buf, arrow_schema, self.writer_props.clone())
                    .context(EncodeMemtableSnafu)?;
            writer.write(&part.batch).context(EncodeMemtableSnafu)?;
            writer.finish().context(EncodeMemtableSnafu)?
        };

        let buf = Bytes::from(buf);
        let parquet_metadata = Arc::new(file_metadata);

        Ok(Some(EncodedBulkPart {
            data: buf,
            metadata: BulkPartMeta {
                num_rows: part.batch.num_rows(),
                max_timestamp: part.max_timestamp,
                min_timestamp: part.min_timestamp,
                parquet_metadata,
                region_metadata: self.metadata.clone(),
                num_series: part.estimated_series_count() as u64,
                max_sequence: part.sequence,
            },
        }))
    }
}

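/// Per-batch min/max statistics of the first tag column, decoded from the
/// leftmost value of each encoded primary key; used to prune batches before
/// scanning.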
#[derive(Debug, Clone)]
struct BatchStats {
    num_batches: usize,
    first_tag_id: ColumnId,
    min_values: ArrayRef,
    max_values: ArrayRef,
}

impl BatchStats {
    fn compute(batches: &[RecordBatch], metadata: &RegionMetadata) -> Option<Self> {
        let first_tag_id = *metadata.primary_key.first()?;
        let first_tag_column = metadata.column_by_id(first_tag_id)?;
        let data_type = &first_tag_column.column_schema.data_type;

        let converter = build_primary_key_codec_with_fields(
            metadata.primary_key_encoding,
            [(first_tag_id, SortField::new(data_type.clone()))].into_iter(),
        );
        let pk_index = primary_key_column_index(batches.first()?.num_columns());

        let mut min_builder = data_type.create_mutable_vector(batches.len());
        let mut max_builder = data_type.create_mutable_vector(batches.len());

        for batch in batches {
            match Self::extract_first_tag_bounds(batch, pk_index, &*converter) {
                Some((min_val, max_val)) => {
                    min_builder.push_value_ref(&min_val.as_value_ref());
                    max_builder.push_value_ref(&max_val.as_value_ref());
                }
                None => {
                    min_builder.push_null();
                    max_builder.push_null();
                }
            }
        }

        Some(Self {
            num_batches: batches.len(),
            first_tag_id,
            min_values: min_builder.to_vector().to_arrow_array(),
            max_values: max_builder.to_vector().to_arrow_array(),
        })
    }

    fn extract_first_tag_bounds(
        batch: &RecordBatch,
        pk_index: usize,
        converter: &dyn PrimaryKeyCodec,
    ) -> Option<(datatypes::value::Value, datatypes::value::Value)> {
        if batch.num_rows() == 0 {
            return None;
        }

        let pk_dict = batch
            .column(pk_index)
            .as_any()
            .downcast_ref::<PrimaryKeyArray>()?;
        let pk_values = pk_dict.values().as_any().downcast_ref::<BinaryArray>()?;

        let keys = pk_dict.keys();
        let min_key = keys.value(0);
        let max_key = keys.value(batch.num_rows() - 1);
        let min_bytes = pk_values.value(min_key as usize);
        let max_bytes = pk_values.value(max_key as usize);

        Some((
            converter.decode_leftmost(min_bytes).ok()??,
            converter.decode_leftmost(max_bytes).ok()??,
        ))
    }
}

struct BatchPruningStats<'a> {
    stats: &'a BatchStats,
    metadata: &'a RegionMetadataRef,
}

impl PruningStatistics for BatchPruningStats<'_> {
    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
        let col = self.metadata.column_by_name(&column.name)?;
        if col.column_id == self.stats.first_tag_id {
            Some(self.stats.min_values.clone())
        } else {
            None
        }
    }

    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
        let col = self.metadata.column_by_name(&column.name)?;
        if col.column_id == self.stats.first_tag_id {
            Some(self.stats.max_values.clone())
        } else {
            None
        }
    }

    fn num_containers(&self) -> usize {
        self.stats.num_batches
    }

    fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
        None
    }

    fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
        None
    }

    fn contained(
        &self,
        _column: &Column,
        _values: &std::collections::HashSet<datafusion_common::ScalarValue>,
    ) -> Option<BooleanArray> {
        None
    }
}

fn predicate_references_column(predicate: &table::predicate::Predicate, column_name: &str) -> bool {
    let mut columns = HashSet::new();
    for expr in predicate.exprs() {
        let _ = expr_to_columns(expr, &mut columns);
    }
    columns.iter().any(|col| col.name == column_name)
}

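/// Returns true if the predicate proves, from the min/max of the first tag
/// column, that the batch cannot contain matching rows; returns false (keep
/// the batch) when there is no predicate, no primary key, or no statistics.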
pub(crate) fn should_prune_bulk_part(
    batch: &RecordBatch,
    context: &BulkIterContext,
    metadata: &RegionMetadata,
) -> bool {
    let predicate = match &context.predicate {
        Some(p) => p,
        None => return false,
    };
    let first_tag_id = match metadata.primary_key.first() {
        Some(id) => *id,
        None => return false,
    };
    let first_tag_name = &metadata
        .column_by_id(first_tag_id)
        .unwrap()
        .column_schema
        .name;
    if !predicate_references_column(predicate, first_tag_name) {
        return false;
    }
    let stats = match BatchStats::compute(std::slice::from_ref(batch), metadata) {
        Some(s) => s,
        None => return false,
    };
    let region_meta = context.read_format().metadata();
    let pruning_stats = BatchPruningStats {
        stats: &stats,
        metadata: region_meta,
    };
    let mask = predicate.prune_with_stats(&pruning_stats, region_meta.schema.arrow_schema());
    !mask.first().copied().unwrap_or(true)
}

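/// Several sorted record batches managed as one logical part, with cached
/// [BatchStats] so individual batches can be pruned at read time.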
1506#[derive(Debug, Clone)]
1512pub struct MultiBulkPart {
1513 batches: SmallVec<[RecordBatch; 4]>,
1515 total_rows: usize,
1517 max_timestamp: i64,
1519 min_timestamp: i64,
1521 max_sequence: SequenceNumber,
1523 series_count: usize,
1525 batch_stats: Option<BatchStats>,
1528}
1529
1530impl MultiBulkPart {
1531 pub fn from_bulk_part(part: BulkPart, metadata: &RegionMetadata) -> Self {
1533 let num_rows = part.num_rows();
1534 let series_count = part.estimated_series_count();
1535 let batch_stats = BatchStats::compute(std::slice::from_ref(&part.batch), metadata);
1536 let mut batches = SmallVec::new();
1537 batches.push(part.batch);
1538
1539 Self {
1540 batches,
1541 total_rows: num_rows,
1542 max_timestamp: part.max_timestamp,
1543 min_timestamp: part.min_timestamp,
1544 max_sequence: part.sequence,
1545 series_count,
1546 batch_stats,
1547 }
1548 }
1549
1550 pub fn new(
1563 batches: Vec<RecordBatch>,
1564 min_timestamp: i64,
1565 max_timestamp: i64,
1566 max_sequence: SequenceNumber,
1567 series_count: usize,
1568 metadata: &RegionMetadata,
1569 ) -> Self {
1570 assert!(!batches.is_empty(), "batches must not be empty");
1571
1572 let total_rows = batches.iter().map(|b| b.num_rows()).sum();
1573 let batch_stats = BatchStats::compute(&batches, metadata);
1574
1575 Self {
1576 batches: SmallVec::from_vec(batches),
1577 total_rows,
1578 max_timestamp,
1579 min_timestamp,
1580 max_sequence,
1581 series_count,
1582 batch_stats,
1583 }
1584 }
1585
1586 pub fn num_rows(&self) -> usize {
1588 self.total_rows
1589 }
1590
1591 pub fn min_timestamp(&self) -> i64 {
1593 self.min_timestamp
1594 }
1595
1596 pub fn max_timestamp(&self) -> i64 {
1598 self.max_timestamp
1599 }
1600
1601 pub fn max_sequence(&self) -> SequenceNumber {
1603 self.max_sequence
1604 }
1605
1606 pub fn series_count(&self) -> usize {
1608 self.series_count
1609 }
1610
1611 pub fn num_batches(&self) -> usize {
1613 self.batches.len()
1614 }
1615
1616 pub(crate) fn estimated_size(&self) -> usize {
1618 self.batches.iter().map(record_batch_estimated_size).sum()
1619 }
1620
1621 pub(crate) fn read(
1627 &self,
1628 context: BulkIterContextRef,
1629 sequence: Option<SequenceRange>,
1630 mem_scan_metrics: Option<MemScanMetrics>,
1631 ) -> Result<Option<BoxedRecordBatchIterator>> {
1632 if self.batches.is_empty() {
1633 return Ok(None);
1634 }
1635
1636 let batches_to_read = self.prune_batches(&context);
1637
1638 if batches_to_read.is_empty() {
1639 return Ok(None);
1640 }
1641
1642 let iter = crate::memtable::bulk::part_reader::BulkPartBatchIter::new(
1643 batches_to_read,
1644 context,
1645 sequence,
1646 self.series_count,
1647 mem_scan_metrics,
1648 );
1649 Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
1650 }
1651
1652 fn prune_batches(&self, context: &BulkIterContextRef) -> Vec<RecordBatch> {
1655 if let Some(stats) = &self.batch_stats
1656 && let Some(predicate) = &context.predicate
1657 {
1658 let region_meta = context.read_format().metadata();
1659 let pruning_stats = BatchPruningStats {
1660 stats,
1661 metadata: region_meta,
1662 };
1663 let mask =
1664 predicate.prune_with_stats(&pruning_stats, region_meta.schema.arrow_schema());
1665 self.batches
1666 .iter()
1667 .zip(mask.iter())
1668 .filter_map(
1669 |(batch, &selected)| {
1670 if selected { Some(batch.clone()) } else { None }
1671 },
1672 )
1673 .collect()
1674 } else {
1675 self.batches.iter().cloned().collect()
1676 }
1677 }
1678
1679 pub fn to_memtable_stats(&self, region_metadata: &RegionMetadataRef) -> MemtableStats {
1681 let ts_type = region_metadata.time_index_type();
1682 let min_ts = ts_type.create_timestamp(self.min_timestamp);
1683 let max_ts = ts_type.create_timestamp(self.max_timestamp);
1684
1685 MemtableStats {
1686 estimated_bytes: self.estimated_size(),
1687 time_range: Some((min_ts, max_ts)),
1688 num_rows: self.num_rows(),
1689 num_ranges: 1,
1690 max_sequence: self.max_sequence,
1691 series_count: self.series_count,
1692 }
1693 }
1694}
1695
1696#[cfg(test)]
1697mod tests {
1698 use api::v1::{Row, SemanticType, WriteHint};
1699 use datafusion_common::ScalarValue;
1700 use datatypes::arrow::array::{
1701 BinaryArray, DictionaryArray, Float64Array, TimestampMillisecondArray,
1702 };
1703 use datatypes::arrow::datatypes::UInt32Type;
1704 use datatypes::prelude::{ConcreteDataType, Value};
1705 use datatypes::schema::ColumnSchema;
1706 use mito_codec::row_converter::build_primary_key_codec;
1707 use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
1708 use store_api::storage::RegionId;
1709 use store_api::storage::consts::ReservedColumnId;
1710 use table::predicate::Predicate;
1711
1712 use super::*;
1713 use crate::memtable::bulk::context::BulkIterContext;
1714 use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
1715 use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};
1716
1717 struct MutationInput<'a> {
1718 k0: &'a str,
1719 k1: u32,
1720 timestamps: &'a [i64],
1721 v1: &'a [Option<f64>],
1722 sequence: u64,
1723 }
1724
1725 fn encode(input: &[MutationInput]) -> EncodedBulkPart {
1726 let metadata = metadata_for_test();
1727 let kvs = input
1728 .iter()
1729 .map(|m| {
1730 build_key_values_with_ts_seq_values(
1731 &metadata,
1732 m.k0.to_string(),
1733 m.k1,
1734 m.timestamps.iter().copied(),
1735 m.v1.iter().copied(),
1736 m.sequence,
1737 )
1738 })
1739 .collect::<Vec<_>>();
1740 let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1741 let primary_key_codec = build_primary_key_codec(&metadata);
1742 let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
1743 for kv in kvs {
1744 converter.append_key_values(&kv).unwrap();
1745 }
1746 let part = converter.convert().unwrap();
1747 let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
1748 encoder.encode_part(&part).unwrap().unwrap()
1749 }
1750
1751 #[test]
1752 fn test_write_and_read_part_projection() {
1753 let part = encode(&[
1754 MutationInput {
1755 k0: "a",
1756 k1: 0,
1757 timestamps: &[1],
1758 v1: &[Some(0.1)],
1759 sequence: 0,
1760 },
1761 MutationInput {
1762 k0: "b",
1763 k1: 0,
1764 timestamps: &[1],
1765 v1: &[Some(0.0)],
1766 sequence: 0,
1767 },
1768 MutationInput {
1769 k0: "a",
1770 k1: 0,
1771 timestamps: &[2],
1772 v1: &[Some(0.2)],
1773 sequence: 1,
1774 },
1775 ]);
1776
1777 let projection = &[4u32];
1778 let reader = part
1779 .read(
1780 Arc::new(
1781 BulkIterContext::new(
1782 part.metadata.region_metadata.clone(),
1783 Some(projection.as_slice()),
1784 None,
1785 false,
1786 )
1787 .unwrap(),
1788 ),
1789 None,
1790 None,
1791 )
1792 .unwrap()
1793 .expect("expect at least one row group");
1794
1795 let mut total_rows_read = 0;
1796 let mut field: Vec<f64> = vec![];
1797 for res in reader {
1798 let batch = res.unwrap();
1799 assert_eq!(5, batch.num_columns());
1800 field.extend_from_slice(
1801 batch
1802 .column(0)
1803 .as_any()
1804 .downcast_ref::<Float64Array>()
1805 .unwrap()
1806 .values(),
1807 );
1808 total_rows_read += batch.num_rows();
1809 }
1810 assert_eq!(3, total_rows_read);
1811 assert_eq!(vec![0.1, 0.2, 0.0], field);
1812 }
1813
1814 fn prepare(key_values: Vec<(&str, u32, (i64, i64), u64)>) -> EncodedBulkPart {
1815 let metadata = metadata_for_test();
1816 let kvs = key_values
1817 .into_iter()
1818 .map(|(k0, k1, (start, end), sequence)| {
1819 let ts = start..end;
1820 let v1 = (start..end).map(|_| None);
1821 build_key_values_with_ts_seq_values(&metadata, k0.to_string(), k1, ts, v1, sequence)
1822 })
1823 .collect::<Vec<_>>();
1824 let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1825 let primary_key_codec = build_primary_key_codec(&metadata);
1826 let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
1827 for kv in kvs {
1828 converter.append_key_values(&kv).unwrap();
1829 }
1830 let part = converter.convert().unwrap();
1831 let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
1832 encoder.encode_part(&part).unwrap().unwrap()
1833 }
1834
1835 fn check_prune_row_group(
1836 part: &EncodedBulkPart,
1837 predicate: Option<Predicate>,
1838 expected_rows: usize,
1839 ) {
1840 let context = Arc::new(
1841 BulkIterContext::new(
1842 part.metadata.region_metadata.clone(),
1843 None,
1844 predicate,
1845 false,
1846 )
1847 .unwrap(),
1848 );
1849 let reader = part
1850 .read(context, None, None)
1851 .unwrap()
1852 .expect("expect at least one row group");
1853 let mut total_rows_read = 0;
1854 for res in reader {
1855 let batch = res.unwrap();
1856 total_rows_read += batch.num_rows();
1857 }
1858 assert_eq!(expected_rows, total_rows_read);
1860 }
1861
1862 #[test]
1863 fn test_prune_row_groups() {
1864 let part = prepare(vec![
1865 ("a", 0, (0, 40), 1),
1866 ("a", 1, (0, 60), 1),
1867 ("b", 0, (0, 100), 2),
1868 ("b", 1, (100, 180), 3),
1869 ("b", 1, (180, 210), 4),
1870 ]);
1871
1872 let context = Arc::new(
1873 BulkIterContext::new(
1874 part.metadata.region_metadata.clone(),
1875 None,
1876 Some(Predicate::new(vec![datafusion_expr::col("ts").eq(
1877 datafusion_expr::lit(ScalarValue::TimestampMillisecond(Some(300), None)),
1878 )])),
1879 false,
1880 )
1881 .unwrap(),
1882 );
1883 assert!(part.read(context, None, None).unwrap().is_none());
1884
1885 check_prune_row_group(&part, None, 310);
1886
1887 check_prune_row_group(
1888 &part,
1889 Some(Predicate::new(vec![
1890 datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
1891 datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
1892 ])),
1893 40,
1894 );
1895
1896 check_prune_row_group(
1897 &part,
1898 Some(Predicate::new(vec![
1899 datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
1900 datafusion_expr::col("k1").eq(datafusion_expr::lit(1u32)),
1901 ])),
1902 60,
1903 );
1904
1905 check_prune_row_group(
1906 &part,
1907 Some(Predicate::new(vec![
1908 datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
1909 ])),
1910 100,
1911 );
1912
1913 check_prune_row_group(
1914 &part,
1915 Some(Predicate::new(vec![
1916 datafusion_expr::col("k0").eq(datafusion_expr::lit("b")),
1917 datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
1918 ])),
1919 100,
1920 );
1921
1922 check_prune_row_group(
1924 &part,
1925 Some(Predicate::new(vec![
1926 datafusion_expr::col("v0").eq(datafusion_expr::lit(150i64)),
1927 ])),
1928 1,
1929 );
1930 }
1931
1932 #[test]
1933 fn test_bulk_part_converter_append_and_convert() {
1934 let metadata = metadata_for_test();
1935 let capacity = 100;
1936 let primary_key_codec = build_primary_key_codec(&metadata);
1937 let schema = to_flat_sst_arrow_schema(
1938 &metadata,
1939 &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
1940 );
1941
1942 let mut converter =
1943 BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
1944
1945 let key_values1 = build_key_values_with_ts_seq_values(
1946 &metadata,
1947 "key1".to_string(),
1948 1u32,
1949 vec![1000, 2000].into_iter(),
1950 vec![Some(1.0), Some(2.0)].into_iter(),
1951 1,
1952 );
1953
1954 let key_values2 = build_key_values_with_ts_seq_values(
1955 &metadata,
1956 "key2".to_string(),
1957 2u32,
1958 vec![1500].into_iter(),
1959 vec![Some(3.0)].into_iter(),
1960 2,
1961 );
1962
1963 converter.append_key_values(&key_values1).unwrap();
1964 converter.append_key_values(&key_values2).unwrap();
1965
1966 let bulk_part = converter.convert().unwrap();
1967
1968 assert_eq!(bulk_part.num_rows(), 3);
1969 assert_eq!(bulk_part.min_timestamp, 1000);
1970 assert_eq!(bulk_part.max_timestamp, 2000);
1971 assert_eq!(bulk_part.sequence, 2);
1972 assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
1973
1974 let schema = bulk_part.batch.schema();
1977 let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
1978 assert_eq!(
1979 field_names,
1980 vec![
1981 "k0",
1982 "k1",
1983 "v0",
1984 "v1",
1985 "ts",
1986 "__primary_key",
1987 "__sequence",
1988 "__op_type"
1989 ]
1990 );
1991 }
1992
1993 #[test]
1994 fn test_bulk_part_converter_sorting() {
1995 let metadata = metadata_for_test();
1996 let capacity = 100;
1997 let primary_key_codec = build_primary_key_codec(&metadata);
1998 let schema = to_flat_sst_arrow_schema(
1999 &metadata,
2000 &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2001 );
2002
2003 let mut converter =
2004 BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
2005
2006 let key_values1 = build_key_values_with_ts_seq_values(
2007 &metadata,
2008 "z_key".to_string(),
2009 3u32,
2010 vec![3000].into_iter(),
2011 vec![Some(3.0)].into_iter(),
2012 3,
2013 );
2014
2015 let key_values2 = build_key_values_with_ts_seq_values(
2016 &metadata,
2017 "a_key".to_string(),
2018 1u32,
2019 vec![1000].into_iter(),
2020 vec![Some(1.0)].into_iter(),
2021 1,
2022 );
2023
2024 let key_values3 = build_key_values_with_ts_seq_values(
2025 &metadata,
2026 "m_key".to_string(),
2027 2u32,
2028 vec![2000].into_iter(),
2029 vec![Some(2.0)].into_iter(),
2030 2,
2031 );
2032
2033 converter.append_key_values(&key_values1).unwrap();
2034 converter.append_key_values(&key_values2).unwrap();
2035 converter.append_key_values(&key_values3).unwrap();
2036
2037 let bulk_part = converter.convert().unwrap();
2038
2039 assert_eq!(bulk_part.num_rows(), 3);
2040
2041 let ts_column = bulk_part.batch.column(bulk_part.timestamp_index);
2042 let seq_column = bulk_part.batch.column(bulk_part.batch.num_columns() - 2);
2043
2044 let ts_array = ts_column
2045 .as_any()
2046 .downcast_ref::<TimestampMillisecondArray>()
2047 .unwrap();
2048 let seq_array = seq_column.as_any().downcast_ref::<UInt64Array>().unwrap();
2049
2050 assert_eq!(ts_array.values(), &[1000, 2000, 3000]);
2051 assert_eq!(seq_array.values(), &[1, 2, 3]);
2052
2053 let schema = bulk_part.batch.schema();
2055 let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2056 assert_eq!(
2057 field_names,
2058 vec![
2059 "k0",
2060 "k1",
2061 "v0",
2062 "v1",
2063 "ts",
2064 "__primary_key",
2065 "__sequence",
2066 "__op_type"
2067 ]
2068 );
2069 }
2070
2071 #[test]
2072 fn test_bulk_part_converter_empty() {
2073 let metadata = metadata_for_test();
2074 let capacity = 10;
2075 let primary_key_codec = build_primary_key_codec(&metadata);
2076 let schema = to_flat_sst_arrow_schema(
2077 &metadata,
2078 &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2079 );
2080
2081 let converter =
2082 BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
2083
2084 let bulk_part = converter.convert().unwrap();
2085
2086 assert_eq!(bulk_part.num_rows(), 0);
2087 assert_eq!(bulk_part.min_timestamp, i64::MAX);
2088 assert_eq!(bulk_part.max_timestamp, i64::MIN);
2089 assert_eq!(bulk_part.sequence, SequenceNumber::MIN);
2090
2091 let schema = bulk_part.batch.schema();
2093 let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2094 assert_eq!(
2095 field_names,
2096 vec![
2097 "k0",
2098 "k1",
2099 "v0",
2100 "v1",
2101 "ts",
2102 "__primary_key",
2103 "__sequence",
2104 "__op_type"
2105 ]
2106 );
2107 }
2108
2109 #[test]
2110 fn test_bulk_part_converter_without_primary_key_columns() {
2111 let metadata = metadata_for_test();
2112 let primary_key_codec = build_primary_key_codec(&metadata);
2113 let schema = to_flat_sst_arrow_schema(
2114 &metadata,
2115 &FlatSchemaOptions {
2116 raw_pk_columns: false,
2117 string_pk_use_dict: true,
2118 },
2119 );
2120
2121 let capacity = 100;
2122 let mut converter =
2123 BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, false);
2124
2125 let key_values1 = build_key_values_with_ts_seq_values(
2126 &metadata,
2127 "key1".to_string(),
2128 1u32,
2129 vec![1000, 2000].into_iter(),
2130 vec![Some(1.0), Some(2.0)].into_iter(),
2131 1,
2132 );
2133
2134 let key_values2 = build_key_values_with_ts_seq_values(
2135 &metadata,
2136 "key2".to_string(),
2137 2u32,
2138 vec![1500].into_iter(),
2139 vec![Some(3.0)].into_iter(),
2140 2,
2141 );
2142
2143 converter.append_key_values(&key_values1).unwrap();
2144 converter.append_key_values(&key_values2).unwrap();
2145
2146 let bulk_part = converter.convert().unwrap();
2147
2148 assert_eq!(bulk_part.num_rows(), 3);
2149 assert_eq!(bulk_part.min_timestamp, 1000);
2150 assert_eq!(bulk_part.max_timestamp, 2000);
2151 assert_eq!(bulk_part.sequence, 2);
2152 assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
2153
2154 let schema = bulk_part.batch.schema();
2156 let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2157 assert_eq!(
2158 field_names,
2159 vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
2160 );
2161 }
2162
2163 #[allow(clippy::too_many_arguments)]
2164 fn build_key_values_with_sparse_encoding(
2165 metadata: &RegionMetadataRef,
2166 primary_key_codec: &Arc<dyn PrimaryKeyCodec>,
2167 table_id: u32,
2168 tsid: u64,
2169 k0: String,
2170 k1: String,
2171 timestamps: impl Iterator<Item = i64>,
2172 values: impl Iterator<Item = Option<f64>>,
2173 sequence: SequenceNumber,
2174 ) -> KeyValues {
2175 let pk_values = vec![
2177 (ReservedColumnId::table_id(), Value::UInt32(table_id)),
2178 (ReservedColumnId::tsid(), Value::UInt64(tsid)),
2179 (0, Value::String(k0.clone().into())),
2180 (1, Value::String(k1.clone().into())),
2181 ];
2182 let mut encoded_key = Vec::new();
2183 primary_key_codec
2184 .encode_values(&pk_values, &mut encoded_key)
2185 .unwrap();
2186 assert!(!encoded_key.is_empty());
2187
2188 let column_schema = vec![
2190 api::v1::ColumnSchema {
2191 column_name: PRIMARY_KEY_COLUMN_NAME.to_string(),
2192 datatype: api::helper::ColumnDataTypeWrapper::try_from(
2193 ConcreteDataType::binary_datatype(),
2194 )
2195 .unwrap()
2196 .datatype() as i32,
2197 semantic_type: api::v1::SemanticType::Tag as i32,
2198 ..Default::default()
2199 },
2200 api::v1::ColumnSchema {
2201 column_name: "ts".to_string(),
2202 datatype: api::helper::ColumnDataTypeWrapper::try_from(
2203 ConcreteDataType::timestamp_millisecond_datatype(),
2204 )
2205 .unwrap()
2206 .datatype() as i32,
2207 semantic_type: api::v1::SemanticType::Timestamp as i32,
2208 ..Default::default()
2209 },
2210 api::v1::ColumnSchema {
2211 column_name: "v0".to_string(),
2212 datatype: api::helper::ColumnDataTypeWrapper::try_from(
2213 ConcreteDataType::int64_datatype(),
2214 )
2215 .unwrap()
2216 .datatype() as i32,
2217 semantic_type: api::v1::SemanticType::Field as i32,
2218 ..Default::default()
2219 },
2220 api::v1::ColumnSchema {
2221 column_name: "v1".to_string(),
2222 datatype: api::helper::ColumnDataTypeWrapper::try_from(
2223 ConcreteDataType::float64_datatype(),
2224 )
2225 .unwrap()
2226 .datatype() as i32,
2227 semantic_type: api::v1::SemanticType::Field as i32,
2228 ..Default::default()
2229 },
2230 ];
2231
2232 let rows = timestamps
2233 .zip(values)
2234 .map(|(ts, v)| Row {
2235 values: vec![
2236 api::v1::Value {
2237 value_data: Some(api::v1::value::ValueData::BinaryValue(
2238 encoded_key.clone(),
2239 )),
2240 },
2241 api::v1::Value {
2242 value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(ts)),
2243 },
2244 api::v1::Value {
2245 value_data: Some(api::v1::value::ValueData::I64Value(ts)),
2246 },
2247 api::v1::Value {
2248 value_data: v.map(api::v1::value::ValueData::F64Value),
2249 },
2250 ],
2251 })
2252 .collect();
2253
        let mutation = api::v1::Mutation {
            op_type: OpType::Put as i32,
            sequence,
            rows: Some(api::v1::Rows {
                schema: column_schema,
                rows,
            }),
            write_hint: Some(WriteHint {
                primary_key_encoding: api::v1::PrimaryKeyEncoding::Sparse.into(),
            }),
        };
        KeyValues::new(metadata.as_ref(), mutation).unwrap()
    }

    #[test]
    fn test_bulk_part_converter_sparse_primary_key_encoding() {
        use api::v1::SemanticType;
        use datatypes::schema::ColumnSchema;
        use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
        use store_api::storage::RegionId;

        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .primary_key(vec![0, 1])
            .primary_key_encoding(PrimaryKeyEncoding::Sparse);
        let metadata = Arc::new(builder.build().unwrap());

        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        assert_eq!(metadata.primary_key_encoding, PrimaryKeyEncoding::Sparse);
        assert_eq!(primary_key_codec.encoding(), PrimaryKeyEncoding::Sparse);

        let capacity = 100;
        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec.clone(), true);

        let key_values1 = build_key_values_with_sparse_encoding(
            &metadata,
            &primary_key_codec,
            2048u32,
            100u64,
            "key11".to_string(),
            "key21".to_string(),
            vec![1000, 2000].into_iter(),
            vec![Some(1.0), Some(2.0)].into_iter(),
            1,
        );

        let key_values2 = build_key_values_with_sparse_encoding(
            &metadata,
            &primary_key_codec,
            4096u32,
            200u64,
            "key12".to_string(),
            "key22".to_string(),
            vec![1500].into_iter(),
            vec![Some(3.0)].into_iter(),
            2,
        );

        converter.append_key_values(&key_values1).unwrap();
        converter.append_key_values(&key_values2).unwrap();

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 3);
        assert_eq!(bulk_part.min_timestamp, 1000);
        assert_eq!(bulk_part.max_timestamp, 2000);
        assert_eq!(bulk_part.sequence, 2);
        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);

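        // As in the dense case, tag columns surface only through __primary_key.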
        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
        );

        let primary_key_column = bulk_part.batch.column_by_name("__primary_key").unwrap();
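        // The primary key column should be dictionary-encoded binary.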
        let dict_array = primary_key_column
            .as_any()
            .downcast_ref::<DictionaryArray<UInt32Type>>()
            .unwrap();

        assert!(!dict_array.is_empty());
        assert_eq!(dict_array.len(), 3);

        // Every dictionary value should be a non-empty encoded key.
        let values = dict_array
            .values()
            .as_any()
            .downcast_ref::<BinaryArray>()
            .unwrap();
        for i in 0..values.len() {
            assert!(
                !values.value(i).is_empty(),
                "Encoded primary key should not be empty"
            );
        }
    }

    #[test]
    fn test_convert_bulk_part_empty() {
        let metadata = metadata_for_test();
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );
        let primary_key_codec = build_primary_key_codec(&metadata);

        let empty_batch = RecordBatch::new_empty(schema.clone());
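        // Wrap the empty batch in a BulkPart; the stats fields are placeholders.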
        let empty_part = BulkPart {
            batch: empty_batch,
            max_timestamp: 0,
            min_timestamp: 0,
            sequence: 0,
            timestamp_index: 0,
            raw_data: None,
        };

        let result =
            convert_bulk_part(empty_part, &metadata, primary_key_codec, schema, true).unwrap();
        assert!(result.is_none());
    }

    #[test]
    fn test_convert_bulk_part_dense_with_pk_columns() {
        let metadata = metadata_for_test();
        let primary_key_codec = build_primary_key_codec(&metadata);

        let k0_array = Arc::new(arrow::array::StringArray::from(vec![
            "key1", "key2", "key1",
        ]));
        let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![1, 2, 1]));
        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200, 300]));
        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0, 3.0]));
        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000, 1500]));

        let input_schema = Arc::new(Schema::new(vec![
            Field::new("k0", ArrowDataType::Utf8, false),
            Field::new("k1", ArrowDataType::UInt32, false),
            Field::new("v0", ArrowDataType::Int64, true),
            Field::new("v1", ArrowDataType::Float64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
        ]));

        let input_batch = RecordBatch::try_new(
            input_schema,
            vec![k0_array, k1_array, v0_array, v1_array, ts_array],
        )
        .unwrap();

        let part = BulkPart {
            batch: input_batch,
            max_timestamp: 2000,
            min_timestamp: 1000,
            sequence: 5,
            timestamp_index: 4,
            raw_data: None,
        };

        let output_schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let result = convert_bulk_part(
            part,
            &metadata,
            primary_key_codec,
            output_schema,
            true,
        )
        .unwrap();

        let converted = result.unwrap();

        assert_eq!(converted.num_rows(), 3);
        assert_eq!(converted.max_timestamp, 2000);
        assert_eq!(converted.min_timestamp, 1000);
        assert_eq!(converted.sequence, 5);

        let schema = converted.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec![
                "k0",
                "k1",
                "v0",
                "v1",
                "ts",
                "__primary_key",
                "__sequence",
                "__op_type"
            ]
        );

        // Raw tag columns are kept; string tags are dictionary-encoded.
        let k0_col = converted.batch.column_by_name("k0").unwrap();
        assert!(matches!(
            k0_col.data_type(),
            ArrowDataType::Dictionary(_, _)
        ));

        let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
        let dict_array = pk_col
            .as_any()
            .downcast_ref::<DictionaryArray<UInt32Type>>()
            .unwrap();
        let keys = dict_array.keys();

        assert_eq!(keys.len(), 3);
    }

    #[test]
    fn test_convert_bulk_part_dense_without_pk_columns() {
        let metadata = metadata_for_test();
        let primary_key_codec = build_primary_key_codec(&metadata);

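        // The input carries raw tag columns, but the output schema drops them.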
        let k0_array = Arc::new(arrow::array::StringArray::from(vec!["key1", "key2"]));
        let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![1, 2]));
        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200]));
        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0]));
        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000]));

        let input_schema = Arc::new(Schema::new(vec![
            Field::new("k0", ArrowDataType::Utf8, false),
            Field::new("k1", ArrowDataType::UInt32, false),
            Field::new("v0", ArrowDataType::Int64, true),
            Field::new("v1", ArrowDataType::Float64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
        ]));

        let input_batch = RecordBatch::try_new(
            input_schema,
            vec![k0_array, k1_array, v0_array, v1_array, ts_array],
        )
        .unwrap();

        let part = BulkPart {
            batch: input_batch,
            max_timestamp: 2000,
            min_timestamp: 1000,
            sequence: 3,
            timestamp_index: 4,
            raw_data: None,
        };

        let output_schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions {
                raw_pk_columns: false,
                string_pk_use_dict: true,
            },
        );

        let result = convert_bulk_part(
            part,
            &metadata,
            primary_key_codec,
            output_schema,
            false,
        )
        .unwrap();

        let converted = result.unwrap();

        assert_eq!(converted.num_rows(), 2);
        assert_eq!(converted.max_timestamp, 2000);
        assert_eq!(converted.min_timestamp, 1000);
        assert_eq!(converted.sequence, 3);

        let schema = converted.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
        );

        let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
        assert!(matches!(
            pk_col.data_type(),
            ArrowDataType::Dictionary(_, _)
        ));
    }

    #[test]
    fn test_convert_bulk_part_sparse_encoding() {
        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .primary_key(vec![0, 1])
            .primary_key_encoding(PrimaryKeyEncoding::Sparse);
        let metadata = Arc::new(builder.build().unwrap());

        let primary_key_codec = build_primary_key_codec(&metadata);

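        // Sparse input already carries pre-encoded binary primary keys.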
        let pk_array = Arc::new(arrow::array::BinaryArray::from(vec![
            b"encoded_key_1".as_slice(),
            b"encoded_key_2".as_slice(),
        ]));
        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200]));
        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0]));
        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000]));

        let input_schema = Arc::new(Schema::new(vec![
            Field::new("__primary_key", ArrowDataType::Binary, false),
            Field::new("v0", ArrowDataType::Int64, true),
            Field::new("v1", ArrowDataType::Float64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
        ]));

        let input_batch =
            RecordBatch::try_new(input_schema, vec![pk_array, v0_array, v1_array, ts_array])
                .unwrap();

        let part = BulkPart {
            batch: input_batch,
            max_timestamp: 2000,
            min_timestamp: 1000,
            sequence: 7,
            timestamp_index: 3,
            raw_data: None,
        };

        let output_schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let result = convert_bulk_part(
            part,
            &metadata,
            primary_key_codec,
            output_schema,
            true,
        )
        .unwrap();

        let converted = result.unwrap();

        assert_eq!(converted.num_rows(), 2);
        assert_eq!(converted.max_timestamp, 2000);
        assert_eq!(converted.min_timestamp, 1000);
        assert_eq!(converted.sequence, 7);

        let schema = converted.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
        );

        let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
        assert!(matches!(
            pk_col.data_type(),
            ArrowDataType::Dictionary(_, _)
        ));
    }

    #[test]
    fn test_convert_bulk_part_sorting_with_multiple_series() {
        let metadata = metadata_for_test();
        let primary_key_codec = build_primary_key_codec(&metadata);

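        // Rows arrive interleaved across two series and unsorted by timestamp.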
        let k0_array = Arc::new(arrow::array::StringArray::from(vec![
            "series_b", "series_a", "series_b", "series_a",
        ]));
        let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![2, 1, 2, 1]));
        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![200, 100, 400, 300]));
        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![2.0, 1.0, 4.0, 3.0]));
        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![
            2000, 1000, 4000, 3000,
        ]));

        let input_schema = Arc::new(Schema::new(vec![
            Field::new("k0", ArrowDataType::Utf8, false),
            Field::new("k1", ArrowDataType::UInt32, false),
            Field::new("v0", ArrowDataType::Int64, true),
            Field::new("v1", ArrowDataType::Float64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
        ]));

        let input_batch = RecordBatch::try_new(
            input_schema,
            vec![k0_array, k1_array, v0_array, v1_array, ts_array],
        )
        .unwrap();

        let part = BulkPart {
            batch: input_batch,
            max_timestamp: 4000,
            min_timestamp: 1000,
            sequence: 10,
            timestamp_index: 4,
            raw_data: None,
        };

        let output_schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let result =
            convert_bulk_part(part, &metadata, primary_key_codec, output_schema, true).unwrap();

        let converted = result.unwrap();

        assert_eq!(converted.num_rows(), 4);

        let ts_col = converted.batch.column(converted.timestamp_index);
        let ts_array = ts_col
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap();

        let timestamps: Vec<i64> = ts_array.values().to_vec();
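        // Rows are sorted by encoded primary key first, then timestamp:
        // series_a (1000, 3000) precedes series_b (2000, 4000).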
        assert_eq!(timestamps, vec![1000, 3000, 2000, 4000]);
    }

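    /// Builds a converted `BulkPart` from the given mutations via `BulkPartConverter`.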
    fn build_converted_bulk_part(inputs: &[MutationInput]) -> BulkPart {
        let metadata = metadata_for_test();
        let kvs = inputs
            .iter()
            .map(|m| {
                build_key_values_with_ts_seq_values(
                    &metadata,
                    m.k0.to_string(),
                    m.k1,
                    m.timestamps.iter().copied(),
                    m.v1.iter().copied(),
                    m.sequence,
                )
            })
            .collect::<Vec<_>>();
        let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
        let primary_key_codec = build_primary_key_codec(&metadata);
        let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
        for kv in kvs {
            converter.append_key_values(&kv).unwrap();
        }
        converter.convert().unwrap()
    }

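    /// Builds a `MultiBulkPart` with one batch per mutation group, tracking the
    /// overall timestamp range and maximum sequence.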
    fn build_multi_bulk_part(groups: &[&[MutationInput]]) -> (MultiBulkPart, RegionMetadataRef) {
        let metadata = metadata_for_test();
        let mut all_batches = Vec::new();
        let mut min_ts = i64::MAX;
        let mut max_ts = i64::MIN;
        let mut max_seq = 0u64;

        for inputs in groups {
            let part = build_converted_bulk_part(inputs);
            min_ts = min_ts.min(part.min_timestamp);
            max_ts = max_ts.max(part.max_timestamp);
            max_seq = max_seq.max(part.sequence);
            all_batches.push(part.batch);
        }

        let multi = MultiBulkPart::new(
            all_batches,
            min_ts,
            max_ts,
            max_seq,
            groups.len(),
            &metadata,
        );
        (multi, metadata)
    }

    #[test]
    fn test_multi_bulk_part_prune_batches() {
        let (multi, metadata) = build_multi_bulk_part(&[
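            // Three single-series batches with disjoint key ranges.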
            &[MutationInput {
                k0: "a",
                k1: 0,
                timestamps: &[1, 2],
                v1: &[Some(1.0), Some(2.0)],
                sequence: 0,
            }],
            &[MutationInput {
                k0: "m",
                k1: 0,
                timestamps: &[3, 4],
                v1: &[Some(3.0), Some(4.0)],
                sequence: 1,
            }],
            &[MutationInput {
                k0: "z",
                k1: 0,
                timestamps: &[5, 6],
                v1: &[Some(5.0), Some(6.0)],
                sequence: 2,
            }],
        ]);
        assert_eq!(multi.num_rows(), 6);
        assert_eq!(multi.num_batches(), 3);

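        // A predicate on k0 should prune down to the single matching batch.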
        let context = Arc::new(
            BulkIterContext::new(
                metadata.clone(),
                None,
                Some(Predicate::new(vec![
                    datafusion_expr::col("k0").eq(datafusion_expr::lit("m")),
                ])),
                false,
            )
            .unwrap(),
        );
        let reader = multi
            .read(context, None, None)
            .unwrap()
            .expect("should have results");
        let total_rows: usize = reader.map(|r| r.unwrap().num_rows()).sum();
        assert_eq!(total_rows, 2);

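        // A predicate matching no series should prune every batch.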
        let context = Arc::new(
            BulkIterContext::new(
                metadata.clone(),
                None,
                Some(Predicate::new(vec![
                    datafusion_expr::col("k0").eq(datafusion_expr::lit("nonexistent")),
                ])),
                false,
            )
            .unwrap(),
        );
        assert!(multi.read(context, None, None).unwrap().is_none());

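        // Without a predicate, all batches are returned.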
        let context = Arc::new(BulkIterContext::new(metadata.clone(), None, None, false).unwrap());
        let reader = multi
            .read(context, None, None)
            .unwrap()
            .expect("should have results");
        let total_rows: usize = reader.map(|r| r.unwrap().num_rows()).sum();
        assert_eq!(total_rows, 6);
    }
}