1use std::collections::{HashMap, HashSet};
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use api::helper::{ColumnDataTypeWrapper, to_grpc_value};
22use api::v1::bulk_wal_entry::Body;
23use api::v1::{ArrowIpc, BulkWalEntry, Mutation, OpType};
24use bytes::Bytes;
25use common_grpc::flight::{FlightDecoder, FlightEncoder, FlightMessage};
26use common_recordbatch::DfRecordBatch as RecordBatch;
27use common_time::Timestamp;
28use datafusion_common::Column;
29use datafusion_common::pruning::PruningStatistics;
30use datafusion_expr::utils::expr_to_columns;
31use datatypes::arrow;
32use datatypes::arrow::array::{
33 Array, ArrayRef, BinaryArray, BooleanArray, StringDictionaryBuilder, UInt8Array, UInt64Array,
34};
35use datatypes::arrow::compute::{SortColumn, SortOptions, concat_batches};
36use datatypes::arrow::datatypes::{
37 DataType as ArrowDataType, Field, Schema, SchemaRef, UInt32Type,
38};
39use datatypes::data_type::DataType;
40use datatypes::extension::json::is_structured_json_field;
41use datatypes::prelude::{MutableVector, Vector};
42use datatypes::value::ValueRef;
43use datatypes::vectors::Helper;
44use mito_codec::key_values::{KeyValue, KeyValues};
45use mito_codec::row_converter::{PrimaryKeyCodec, SortField, build_primary_key_codec_with_fields};
46use parquet::arrow::ArrowWriter;
47use parquet::basic::{Compression, ZstdLevel};
48use parquet::file::metadata::ParquetMetaData;
49use parquet::file::properties::WriterProperties;
50use smallvec::SmallVec;
51use snafu::{OptionExt, ResultExt};
52use store_api::codec::PrimaryKeyEncoding;
53use store_api::metadata::{RegionMetadata, RegionMetadataRef};
54use store_api::storage::consts::PRIMARY_KEY_COLUMN_NAME;
55use store_api::storage::{ColumnId, FileId, SequenceNumber, SequenceRange};
56
57use crate::error::{
58 self, ColumnNotFoundSnafu, ComputeArrowSnafu, CreateDefaultSnafu, DataTypeMismatchSnafu,
59 EncodeMemtableSnafu, EncodeSnafu, InvalidMetadataSnafu, InvalidRequestSnafu,
60 NewRecordBatchSnafu, Result,
61};
62use crate::memtable::bulk::context::{BulkIterContext, BulkIterContextRef};
63use crate::memtable::bulk::json_align::Json2Aligner;
64use crate::memtable::bulk::part_reader::EncodedBulkPartIter;
65use crate::memtable::time_series::{ValueBuilder, Values};
66use crate::memtable::{BoxedRecordBatchIterator, MemScanMetrics, MemtableStats};
67use crate::sst::SeriesEstimator;
68use crate::sst::index::IndexOutput;
69use crate::sst::parquet::flat_format::primary_key_column_index;
70use crate::sst::parquet::format::{PrimaryKeyArray, PrimaryKeyArrayBuilder};
71use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo};
72
/// Initial number of distinct dictionary values reserved by each string primary key
/// column builder (the `value_capacity` argument of `StringDictionaryBuilder::with_capacity`).
const INIT_DICT_VALUE_CAPACITY: usize = 8;
74
/// A batch of rows written in bulk, together with the summary values used to
/// track it in a memtable and to write it to the WAL.
#[derive(Clone)]
pub struct BulkPart {
    /// Rows of this part.
    pub batch: RecordBatch,
    /// Maximum timestamp of the part, as a raw value in the unit of the region's
    /// time index type.
    pub max_timestamp: i64,
    /// Minimum timestamp of the part, as a raw value in the unit of the region's
    /// time index type.
    pub min_timestamp: i64,
    /// Sequence number of this part.
    pub sequence: u64,
    /// Position of the time index column inside `batch`.
    pub timestamp_index: usize,
    /// Arrow IPC encoding of `batch`, if already known. When present it is reused
    /// as-is when building a `BulkWalEntry` instead of re-encoding the batch.
    pub raw_data: Option<ArrowIpc>,
}
85
86impl TryFrom<BulkWalEntry> for BulkPart {
87 type Error = error::Error;
88
89 fn try_from(value: BulkWalEntry) -> std::result::Result<Self, Self::Error> {
90 match value.body.expect("Entry payload should be present") {
91 Body::ArrowIpc(ipc) => {
92 let mut decoder = FlightDecoder::try_from_schema_bytes(&ipc.schema)
93 .context(error::ConvertBulkWalEntrySnafu)?;
94 let batch = decoder
95 .try_decode_record_batch(&ipc.data_header, &ipc.payload)
96 .context(error::ConvertBulkWalEntrySnafu)?;
97 Ok(Self {
98 batch,
99 max_timestamp: value.max_ts,
100 min_timestamp: value.min_ts,
101 sequence: value.sequence,
102 timestamp_index: value.timestamp_index as usize,
103 raw_data: Some(ipc),
104 })
105 }
106 }
107 }
108}
109
110impl TryFrom<&BulkPart> for BulkWalEntry {
111 type Error = error::Error;
112
113 fn try_from(value: &BulkPart) -> Result<Self> {
114 if let Some(ipc) = &value.raw_data {
115 Ok(BulkWalEntry {
116 sequence: value.sequence,
117 max_ts: value.max_timestamp,
118 min_ts: value.min_timestamp,
119 timestamp_index: value.timestamp_index as u32,
120 body: Some(Body::ArrowIpc(ipc.clone())),
121 })
122 } else {
123 let mut encoder = FlightEncoder::default();
124 let schema_bytes = encoder
125 .encode_schema(value.batch.schema().as_ref())
126 .data_header;
127 let [rb_data] = encoder
128 .encode(FlightMessage::RecordBatch(value.batch.clone()))
129 .try_into()
130 .map_err(|_| {
131 error::UnsupportedOperationSnafu {
132 err_msg: "create BulkWalEntry from RecordBatch with dictionary arrays",
133 }
134 .build()
135 })?;
136 Ok(BulkWalEntry {
137 sequence: value.sequence,
138 max_ts: value.max_timestamp,
139 min_ts: value.min_timestamp,
140 timestamp_index: value.timestamp_index as u32,
141 body: Some(Body::ArrowIpc(ArrowIpc {
142 schema: schema_bytes,
143 data_header: rb_data.data_header,
144 payload: rb_data.data_body,
145 })),
146 })
147 }
148 }
149}
150
151impl BulkPart {
152 pub(crate) fn schema(&self) -> SchemaRef {
153 self.batch.schema()
154 }
155
156 pub(crate) fn estimated_size(&self) -> usize {
157 record_batch_estimated_size(&self.batch)
158 }
159
160 pub fn estimated_series_count(&self) -> usize {
163 let pk_column_idx = primary_key_column_index(self.batch.num_columns());
164 let pk_column = self.batch.column(pk_column_idx);
165 if let Some(dict_array) = pk_column.as_any().downcast_ref::<PrimaryKeyArray>() {
166 dict_array.values().len()
167 } else {
168 0
169 }
170 }
171
172 pub fn to_memtable_stats(&self, region_metadata: &RegionMetadataRef) -> MemtableStats {
174 let ts_type = region_metadata.time_index_type();
175 let min_ts = ts_type.create_timestamp(self.min_timestamp);
176 let max_ts = ts_type.create_timestamp(self.max_timestamp);
177
178 MemtableStats {
179 estimated_bytes: self.estimated_size(),
180 time_range: Some((min_ts, max_ts)),
181 num_rows: self.num_rows(),
182 num_ranges: 1,
183 max_sequence: self.sequence,
184 series_count: self.estimated_series_count(),
185 }
186 }
187
188 pub fn fill_missing_columns(&mut self, region_metadata: &RegionMetadata) -> Result<()> {
198 let batch_schema = self.batch.schema();
200 let batch_columns: HashSet<_> = batch_schema
201 .fields()
202 .iter()
203 .map(|f| f.name().as_str())
204 .collect();
205
206 let mut columns_to_fill = Vec::new();
208 for column_meta in ®ion_metadata.column_metadatas {
209 if !batch_columns.contains(column_meta.column_schema.name.as_str()) {
212 columns_to_fill.push(column_meta);
213 }
214 }
215
216 if columns_to_fill.is_empty() {
217 return Ok(());
218 }
219
220 let num_rows = self.batch.num_rows();
221
222 let mut new_columns = Vec::new();
223 let mut new_fields = Vec::new();
224
225 new_fields.extend(batch_schema.fields().iter().cloned());
227 new_columns.extend_from_slice(self.batch.columns());
228
229 let region_id = region_metadata.region_id;
230 for column_meta in columns_to_fill {
232 let default_vector = column_meta
233 .column_schema
234 .create_default_vector(num_rows)
235 .context(CreateDefaultSnafu {
236 region_id,
237 column: &column_meta.column_schema.name,
238 })?
239 .with_context(|| InvalidRequestSnafu {
240 region_id,
241 reason: format!(
242 "column {} does not have default value",
243 column_meta.column_schema.name
244 ),
245 })?;
246 let arrow_array = default_vector.to_arrow_array();
247 column_meta.column_schema.data_type.as_arrow_type();
248
249 new_fields.push(Arc::new(Field::new(
250 column_meta.column_schema.name.clone(),
251 column_meta.column_schema.data_type.as_arrow_type(),
252 column_meta.column_schema.is_nullable(),
253 )));
254 new_columns.push(arrow_array);
255 }
256
257 let new_schema = Arc::new(Schema::new(new_fields));
259 let new_batch =
260 RecordBatch::try_new(new_schema, new_columns).context(NewRecordBatchSnafu)?;
261
262 self.batch = new_batch;
264
265 Ok(())
266 }
267
268 pub(crate) fn to_mutation(&self, region_metadata: &RegionMetadataRef) -> Result<Mutation> {
270 let vectors = region_metadata
271 .schema
272 .column_schemas()
273 .iter()
274 .map(|col| match self.batch.column_by_name(&col.name) {
275 None => Ok(None),
276 Some(col) => Helper::try_into_vector(col).map(Some),
277 })
278 .collect::<datatypes::error::Result<Vec<_>>>()
279 .context(error::ComputeVectorSnafu)?;
280
281 let rows = (0..self.num_rows())
282 .map(|row_idx| {
283 let values = (0..self.batch.num_columns())
284 .map(|col_idx| {
285 if let Some(v) = &vectors[col_idx] {
286 to_grpc_value(v.get(row_idx))
287 } else {
288 api::v1::Value { value_data: None }
289 }
290 })
291 .collect::<Vec<_>>();
292 api::v1::Row { values }
293 })
294 .collect::<Vec<_>>();
295
296 let schema = region_metadata
297 .column_metadatas
298 .iter()
299 .map(|c| {
300 let data_type_wrapper =
301 ColumnDataTypeWrapper::try_from(c.column_schema.data_type.clone())?;
302 Ok(api::v1::ColumnSchema {
303 column_name: c.column_schema.name.clone(),
304 datatype: data_type_wrapper.datatype() as i32,
305 semantic_type: c.semantic_type as i32,
306 ..Default::default()
307 })
308 })
309 .collect::<api::error::Result<Vec<_>>>()
310 .context(error::ConvertColumnDataTypeSnafu {
311 reason: "failed to convert region metadata to column schema",
312 })?;
313
314 let rows = api::v1::Rows { schema, rows };
315
316 Ok(Mutation {
317 op_type: OpType::Put as i32,
318 sequence: self.sequence,
319 rows: Some(rows),
320 write_hint: None,
321 })
322 }
323
324 pub fn timestamps(&self) -> &ArrayRef {
325 self.batch.column(self.timestamp_index)
326 }
327
328 pub fn num_rows(&self) -> usize {
329 self.batch.num_rows()
330 }
331}
332
/// Buffer of small [`BulkPart`]s that are later concatenated and sorted into one part.
pub struct UnorderedPart {
    /// Buffered parts, in push order.
    parts: Vec<BulkPart>,
    /// Total number of rows across `parts`.
    total_rows: usize,
    /// Minimum timestamp over all buffered parts (`i64::MAX` when empty).
    min_timestamp: i64,
    /// Maximum timestamp over all buffered parts (`i64::MIN` when empty).
    max_timestamp: i64,
    /// Maximum sequence over all buffered parts (0 when empty).
    max_sequence: u64,
    /// Parts with fewer rows than this are accepted by `should_accept`.
    threshold: usize,
    /// `should_compact` returns true once `total_rows` reaches this value.
    compact_threshold: usize,
}
351
352impl Default for UnorderedPart {
353 fn default() -> Self {
354 Self::new()
355 }
356}
357
358impl UnorderedPart {
359 pub fn new() -> Self {
361 Self {
362 parts: Vec::new(),
363 total_rows: 0,
364 min_timestamp: i64::MAX,
365 max_timestamp: i64::MIN,
366 max_sequence: 0,
367 threshold: 1024,
368 compact_threshold: 4096,
369 }
370 }
371
372 pub fn set_threshold(&mut self, threshold: usize) {
374 self.threshold = threshold;
375 }
376
377 pub fn set_compact_threshold(&mut self, compact_threshold: usize) {
379 self.compact_threshold = compact_threshold;
380 }
381
382 pub fn threshold(&self) -> usize {
384 self.threshold
385 }
386
387 pub fn compact_threshold(&self) -> usize {
389 self.compact_threshold
390 }
391
392 pub fn should_accept(&self, num_rows: usize) -> bool {
394 num_rows < self.threshold
395 }
396
397 pub fn should_compact(&self) -> bool {
399 self.total_rows >= self.compact_threshold
400 }
401
402 pub fn push(&mut self, part: BulkPart) {
404 self.total_rows += part.num_rows();
405 self.min_timestamp = self.min_timestamp.min(part.min_timestamp);
406 self.max_timestamp = self.max_timestamp.max(part.max_timestamp);
407 self.max_sequence = self.max_sequence.max(part.sequence);
408 self.parts.push(part);
409 }
410
411 pub fn num_rows(&self) -> usize {
413 self.total_rows
414 }
415
416 pub fn is_empty(&self) -> bool {
418 self.parts.is_empty()
419 }
420
421 pub fn num_parts(&self) -> usize {
423 self.parts.len()
424 }
425
426 pub fn concat_and_sort(&self) -> Result<Option<RecordBatch>> {
429 if self.parts.is_empty() {
430 return Ok(None);
431 }
432
433 if self.parts.len() == 1 {
434 return Ok(Some(self.parts[0].batch.clone()));
436 }
437
438 let schema = self.parts[0].batch.schema();
440 let concatenated = if schema.fields().iter().any(is_structured_json_field) {
441 let aligner = Json2Aligner::try_new(self.parts.iter().map(|part| part.batch.schema()))?;
442 let aligned_batches =
443 aligner.align_batches(self.parts.iter().map(|part| part.batch.clone()))?;
444 concat_batches(aligner.schema(), &aligned_batches).context(ComputeArrowSnafu)?
445 } else {
446 concat_batches(&schema, self.parts.iter().map(|x| &x.batch))
447 .context(ComputeArrowSnafu)?
448 };
449
450 let sorted_batch = sort_primary_key_record_batch(&concatenated)?;
452
453 Ok(Some(sorted_batch))
454 }
455
456 pub fn to_bulk_part(&self) -> Result<Option<BulkPart>> {
459 let Some(sorted_batch) = self.concat_and_sort()? else {
460 return Ok(None);
461 };
462
463 let timestamp_index = self.parts[0].timestamp_index;
464
465 Ok(Some(BulkPart {
466 batch: sorted_batch,
467 max_timestamp: self.max_timestamp,
468 min_timestamp: self.min_timestamp,
469 sequence: self.max_sequence,
470 timestamp_index,
471 raw_data: None,
472 }))
473 }
474
475 pub fn clear(&mut self) {
477 self.parts.clear();
478 self.total_rows = 0;
479 self.min_timestamp = i64::MAX;
480 self.max_timestamp = i64::MIN;
481 self.max_sequence = 0;
482 }
483}
484
485pub fn record_batch_estimated_size(batch: &RecordBatch) -> usize {
487 batch
488 .columns()
489 .iter()
490 .map(|c| c.to_data().get_slice_memory_size().unwrap_or(0))
492 .sum()
493}
494
/// Builder for one primary key column that is stored as its own column.
enum PrimaryKeyColumnBuilder {
    /// String columns, built as a dictionary array with `UInt32` keys.
    StringDict(StringDictionaryBuilder<UInt32Type>),
    /// All other column types, built through a generic mutable vector.
    Vector(Box<dyn MutableVector>),
}
502
503impl PrimaryKeyColumnBuilder {
504 fn push_value_ref(&mut self, value: ValueRef) -> Result<()> {
506 match self {
507 PrimaryKeyColumnBuilder::StringDict(builder) => {
508 if let Some(s) = value.try_into_string().context(DataTypeMismatchSnafu)? {
509 builder.append_value(s);
511 } else {
512 builder.append_null();
513 }
514 }
515 PrimaryKeyColumnBuilder::Vector(builder) => {
516 builder.push_value_ref(&value);
517 }
518 }
519 Ok(())
520 }
521
522 fn into_arrow_array(self) -> ArrayRef {
524 match self {
525 PrimaryKeyColumnBuilder::StringDict(mut builder) => Arc::new(builder.finish()),
526 PrimaryKeyColumnBuilder::Vector(mut builder) => builder.to_vector().to_arrow_array(),
527 }
528 }
529}
530
/// Converts [`KeyValues`] into a sorted [`BulkPart`].
pub struct BulkPartConverter {
    /// Schema of the output batch.
    schema: SchemaRef,
    /// Codec used to encode primary keys.
    primary_key_codec: Arc<dyn PrimaryKeyCodec>,
    /// Reusable buffer for encoding one primary key.
    key_buf: Vec<u8>,
    /// Builder of the encoded primary key dictionary column.
    key_array_builder: PrimaryKeyArrayBuilder,
    /// Builder of the timestamp, sequence, op type and field columns.
    value_builder: ValueBuilder,
    /// Builders of individual primary key columns; empty when those columns are
    /// not stored.
    primary_key_column_builders: Vec<PrimaryKeyColumnBuilder>,

    /// Maximum timestamp seen so far.
    max_ts: i64,
    /// Minimum timestamp seen so far.
    min_ts: i64,
    /// Maximum sequence seen so far.
    max_sequence: SequenceNumber,
}
554
555impl BulkPartConverter {
556 pub fn new(
561 region_metadata: &RegionMetadataRef,
562 schema: SchemaRef,
563 capacity: usize,
564 primary_key_codec: Arc<dyn PrimaryKeyCodec>,
565 store_primary_key_columns: bool,
566 ) -> Self {
567 debug_assert_eq!(
568 region_metadata.primary_key_encoding,
569 primary_key_codec.encoding()
570 );
571
572 let primary_key_column_builders = if store_primary_key_columns
573 && region_metadata.primary_key_encoding != PrimaryKeyEncoding::Sparse
574 {
575 new_primary_key_column_builders(region_metadata, capacity)
576 } else {
577 Vec::new()
578 };
579
580 Self {
581 schema,
582 primary_key_codec,
583 key_buf: Vec::new(),
584 key_array_builder: PrimaryKeyArrayBuilder::new(),
585 value_builder: ValueBuilder::new(region_metadata, capacity),
586 primary_key_column_builders,
587 min_ts: i64::MAX,
588 max_ts: i64::MIN,
589 max_sequence: SequenceNumber::MIN,
590 }
591 }
592
593 pub fn append_key_values(&mut self, key_values: &KeyValues) -> Result<()> {
595 for kv in key_values.iter() {
596 self.append_key_value(&kv)?;
597 }
598
599 Ok(())
600 }
601
602 fn append_key_value(&mut self, kv: &KeyValue) -> Result<()> {
606 if self.primary_key_codec.encoding() == PrimaryKeyEncoding::Sparse {
608 let mut primary_keys = kv.primary_keys();
611 if let Some(encoded) = primary_keys
612 .next()
613 .context(ColumnNotFoundSnafu {
614 column: PRIMARY_KEY_COLUMN_NAME,
615 })?
616 .try_into_binary()
617 .context(DataTypeMismatchSnafu)?
618 {
619 self.key_array_builder
620 .append(encoded)
621 .context(ComputeArrowSnafu)?;
622 } else {
623 self.key_array_builder
624 .append("")
625 .context(ComputeArrowSnafu)?;
626 }
627 } else {
628 self.key_buf.clear();
630 self.primary_key_codec
631 .encode_key_value(kv, &mut self.key_buf)
632 .context(EncodeSnafu)?;
633 self.key_array_builder
634 .append(&self.key_buf)
635 .context(ComputeArrowSnafu)?;
636 };
637
638 if !self.primary_key_column_builders.is_empty() {
640 for (builder, pk_value) in self
641 .primary_key_column_builders
642 .iter_mut()
643 .zip(kv.primary_keys())
644 {
645 builder.push_value_ref(pk_value)?;
646 }
647 }
648
649 self.value_builder.push(
651 kv.timestamp(),
652 kv.sequence(),
653 kv.op_type() as u8,
654 kv.fields(),
655 );
656
657 let ts = kv
660 .timestamp()
661 .try_into_timestamp()
662 .unwrap()
663 .unwrap()
664 .value();
665 self.min_ts = self.min_ts.min(ts);
666 self.max_ts = self.max_ts.max(ts);
667 self.max_sequence = self.max_sequence.max(kv.sequence());
668
669 Ok(())
670 }
671
672 pub fn convert(mut self) -> Result<BulkPart> {
676 let values = Values::from(self.value_builder);
677 let mut columns =
678 Vec::with_capacity(4 + values.fields.len() + self.primary_key_column_builders.len());
679
680 for builder in self.primary_key_column_builders {
682 columns.push(builder.into_arrow_array());
683 }
684 columns.extend(values.fields.iter().map(|field| field.to_arrow_array()));
686 let timestamp_index = columns.len();
688 columns.push(values.timestamp.to_arrow_array());
689 let pk_array = self.key_array_builder.finish();
691 columns.push(Arc::new(pk_array));
692 columns.push(values.sequence.to_arrow_array());
694 columns.push(values.op_type.to_arrow_array());
695
696 let schema = align_schema_with_json_array(self.schema, &columns);
699 let batch = RecordBatch::try_new(schema, columns).context(NewRecordBatchSnafu)?;
700 let batch = sort_primary_key_record_batch(&batch)?;
702
703 Ok(BulkPart {
704 batch,
705 max_timestamp: self.max_ts,
706 min_timestamp: self.min_ts,
707 sequence: self.max_sequence,
708 timestamp_index,
709 raw_data: None,
710 })
711 }
712}
713
714fn align_schema_with_json_array(schema: SchemaRef, columns: &[ArrayRef]) -> SchemaRef {
715 if schema.fields().iter().all(|f| !is_structured_json_field(f)) {
716 return schema;
717 }
718
719 let mut fields = Vec::with_capacity(schema.fields().len());
720 for (field, array) in schema.fields().iter().zip(columns) {
721 if !is_structured_json_field(field) {
722 fields.push(field.clone());
723 continue;
724 }
725
726 let mut field = field.as_ref().clone();
727 field.set_data_type(array.data_type().clone());
728 fields.push(Arc::new(field));
729 }
730
731 Arc::new(Schema::new_with_metadata(fields, schema.metadata().clone()))
732}
733
734fn new_primary_key_column_builders(
735 metadata: &RegionMetadata,
736 capacity: usize,
737) -> Vec<PrimaryKeyColumnBuilder> {
738 metadata
739 .primary_key_columns()
740 .map(|col| {
741 if col.column_schema.data_type.is_string() {
742 PrimaryKeyColumnBuilder::StringDict(StringDictionaryBuilder::with_capacity(
743 capacity,
744 INIT_DICT_VALUE_CAPACITY,
745 capacity,
746 ))
747 } else {
748 PrimaryKeyColumnBuilder::Vector(
749 col.column_schema.data_type.create_mutable_vector(capacity),
750 )
751 }
752 })
753 .collect()
754}
755
756pub fn sort_primary_key_record_batch(batch: &RecordBatch) -> Result<RecordBatch> {
758 let total_columns = batch.num_columns();
759 let sort_columns = vec![
760 SortColumn {
762 values: batch.column(total_columns - 3).clone(),
763 options: Some(SortOptions {
764 descending: false,
765 nulls_first: true,
766 }),
767 },
768 SortColumn {
770 values: batch.column(total_columns - 4).clone(),
771 options: Some(SortOptions {
772 descending: false,
773 nulls_first: true,
774 }),
775 },
776 SortColumn {
778 values: batch.column(total_columns - 2).clone(),
779 options: Some(SortOptions {
780 descending: true,
781 nulls_first: true,
782 }),
783 },
784 ];
785
786 let indices = datatypes::arrow::compute::lexsort_to_indices(&sort_columns, None)
787 .context(ComputeArrowSnafu)?;
788
789 datatypes::arrow::compute::take_record_batch(batch, &indices).context(ComputeArrowSnafu)
790}
791
792pub fn convert_bulk_part(
818 part: BulkPart,
819 region_metadata: &RegionMetadataRef,
820 primary_key_codec: Arc<dyn PrimaryKeyCodec>,
821 schema: SchemaRef,
822 store_primary_key_columns: bool,
823) -> Result<Option<BulkPart>> {
824 if part.num_rows() == 0 {
825 return Ok(None);
826 }
827
828 let num_rows = part.num_rows();
829 let is_sparse = region_metadata.primary_key_encoding == PrimaryKeyEncoding::Sparse;
830
831 let input_schema = part.batch.schema();
833 let column_indices: HashMap<&str, usize> = input_schema
834 .fields()
835 .iter()
836 .enumerate()
837 .map(|(idx, field)| (field.name().as_str(), idx))
838 .collect();
839
840 let mut output_columns = Vec::new();
842
843 let pk_array = if is_sparse {
845 None
848 } else {
849 let pk_vectors: Result<Vec<_>> = region_metadata
851 .primary_key_columns()
852 .map(|col_meta| {
853 let col_idx = column_indices
854 .get(col_meta.column_schema.name.as_str())
855 .context(ColumnNotFoundSnafu {
856 column: &col_meta.column_schema.name,
857 })?;
858 let col = part.batch.column(*col_idx);
859 Helper::try_into_vector(col).context(error::ComputeVectorSnafu)
860 })
861 .collect();
862 let pk_vectors = pk_vectors?;
863
864 let mut key_array_builder = PrimaryKeyArrayBuilder::new();
865 let mut encode_buf = Vec::new();
866
867 for row_idx in 0..num_rows {
868 encode_buf.clear();
869
870 let pk_values_with_ids: Vec<_> = region_metadata
872 .primary_key
873 .iter()
874 .zip(pk_vectors.iter())
875 .map(|(col_id, vector)| (*col_id, vector.get_ref(row_idx)))
876 .collect();
877
878 primary_key_codec
880 .encode_value_refs(&pk_values_with_ids, &mut encode_buf)
881 .context(EncodeSnafu)?;
882
883 key_array_builder
884 .append(&encode_buf)
885 .context(ComputeArrowSnafu)?;
886 }
887
888 Some(key_array_builder.finish())
889 };
890
891 if store_primary_key_columns && !is_sparse {
893 for col_meta in region_metadata.primary_key_columns() {
894 let col_idx = column_indices
895 .get(col_meta.column_schema.name.as_str())
896 .context(ColumnNotFoundSnafu {
897 column: &col_meta.column_schema.name,
898 })?;
899 let col = part.batch.column(*col_idx);
900
901 let col = if col_meta.column_schema.data_type.is_string() {
903 let target_type = ArrowDataType::Dictionary(
904 Box::new(ArrowDataType::UInt32),
905 Box::new(ArrowDataType::Utf8),
906 );
907 arrow::compute::cast(col, &target_type).context(ComputeArrowSnafu)?
908 } else {
909 col.clone()
910 };
911 output_columns.push(col);
912 }
913 }
914
915 for col_meta in region_metadata.field_columns() {
917 let col_idx = column_indices
918 .get(col_meta.column_schema.name.as_str())
919 .context(ColumnNotFoundSnafu {
920 column: &col_meta.column_schema.name,
921 })?;
922 output_columns.push(part.batch.column(*col_idx).clone());
923 }
924
925 let new_timestamp_index = output_columns.len();
927 let ts_col_idx = column_indices
928 .get(
929 region_metadata
930 .time_index_column()
931 .column_schema
932 .name
933 .as_str(),
934 )
935 .context(ColumnNotFoundSnafu {
936 column: ®ion_metadata.time_index_column().column_schema.name,
937 })?;
938 output_columns.push(part.batch.column(*ts_col_idx).clone());
939
940 let pk_dictionary = if let Some(pk_dict_array) = pk_array {
942 Arc::new(pk_dict_array) as ArrayRef
943 } else {
944 let pk_col_idx =
945 column_indices
946 .get(PRIMARY_KEY_COLUMN_NAME)
947 .context(ColumnNotFoundSnafu {
948 column: PRIMARY_KEY_COLUMN_NAME,
949 })?;
950 let col = part.batch.column(*pk_col_idx);
951
952 let target_type = ArrowDataType::Dictionary(
954 Box::new(ArrowDataType::UInt32),
955 Box::new(ArrowDataType::Binary),
956 );
957 arrow::compute::cast(col, &target_type).context(ComputeArrowSnafu)?
958 };
959 output_columns.push(pk_dictionary);
960
961 let sequence_array = UInt64Array::from(vec![part.sequence; num_rows]);
962 output_columns.push(Arc::new(sequence_array) as ArrayRef);
963
964 let op_type_array = UInt8Array::from(vec![OpType::Put as u8; num_rows]);
965 output_columns.push(Arc::new(op_type_array) as ArrayRef);
966
967 let batch = RecordBatch::try_new(schema, output_columns).context(NewRecordBatchSnafu)?;
968
969 let sorted_batch = sort_primary_key_record_batch(&batch)?;
971
972 Ok(Some(BulkPart {
973 batch: sorted_batch,
974 max_timestamp: part.max_timestamp,
975 min_timestamp: part.min_timestamp,
976 sequence: part.sequence,
977 timestamp_index: new_timestamp_index,
978 raw_data: None,
979 }))
980}
981
/// A bulk part encoded as an in-memory Parquet file.
#[derive(Debug, Clone)]
pub struct EncodedBulkPart {
    /// Encoded Parquet file bytes.
    data: Bytes,
    /// Metadata describing the encoded part.
    metadata: BulkPartMeta,
    /// Arrow schema the part was encoded with.
    schema: SchemaRef,
}
989
990impl EncodedBulkPart {
991 pub fn new(data: Bytes, metadata: BulkPartMeta, schema: SchemaRef) -> Self {
992 Self {
993 data,
994 metadata,
995 schema,
996 }
997 }
998
999 pub fn metadata(&self) -> &BulkPartMeta {
1000 &self.metadata
1001 }
1002
1003 pub(crate) fn schema(&self) -> SchemaRef {
1004 self.schema.clone()
1005 }
1006
1007 pub(crate) fn size_bytes(&self) -> usize {
1009 self.data.len()
1010 }
1011
1012 pub fn data(&self) -> &Bytes {
1014 &self.data
1015 }
1016
1017 pub fn to_memtable_stats(&self) -> MemtableStats {
1019 let meta = &self.metadata;
1020 let ts_type = meta.region_metadata.time_index_type();
1021 let min_ts = ts_type.create_timestamp(meta.min_timestamp);
1022 let max_ts = ts_type.create_timestamp(meta.max_timestamp);
1023
1024 MemtableStats {
1025 estimated_bytes: self.size_bytes(),
1026 time_range: Some((min_ts, max_ts)),
1027 num_rows: meta.num_rows,
1028 num_ranges: 1,
1029 max_sequence: meta.max_sequence,
1030 series_count: meta.num_series as usize,
1031 }
1032 }
1033
1034 pub(crate) fn to_sst_info(&self, file_id: FileId) -> SstInfo {
1042 let unit = self.metadata.region_metadata.time_index_type().unit();
1043 let max_row_group_uncompressed_size: u64 = self
1044 .metadata
1045 .parquet_metadata
1046 .row_groups()
1047 .iter()
1048 .map(|rg| {
1049 rg.columns()
1050 .iter()
1051 .map(|c| c.uncompressed_size() as u64)
1052 .sum::<u64>()
1053 })
1054 .max()
1055 .unwrap_or(0);
1056 SstInfo {
1057 file_id,
1058 time_range: (
1059 Timestamp::new(self.metadata.min_timestamp, unit),
1060 Timestamp::new(self.metadata.max_timestamp, unit),
1061 ),
1062 file_size: self.data.len() as u64,
1063 max_row_group_uncompressed_size,
1064 num_rows: self.metadata.num_rows,
1065 num_row_groups: self.metadata.parquet_metadata.num_row_groups() as u64,
1066 file_metadata: Some(self.metadata.parquet_metadata.clone()),
1067 index_metadata: IndexOutput::default(),
1068 num_series: self.metadata.num_series,
1069 }
1070 }
1071
1072 pub(crate) fn read(
1073 &self,
1074 context: BulkIterContextRef,
1075 sequence: Option<SequenceRange>,
1076 mem_scan_metrics: Option<MemScanMetrics>,
1077 ) -> Result<Option<BoxedRecordBatchIterator>> {
1078 let skip_fields_for_pruning = context.pre_filter_mode().skip_fields();
1080
1081 let row_groups_to_read =
1083 context.row_groups_to_read(&self.metadata.parquet_metadata, skip_fields_for_pruning);
1084
1085 if row_groups_to_read.is_empty() {
1086 return Ok(None);
1088 }
1089
1090 let iter = EncodedBulkPartIter::try_new(
1091 self,
1092 context,
1093 row_groups_to_read,
1094 sequence,
1095 mem_scan_metrics,
1096 )?;
1097 Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
1098 }
1099}
1100
/// Metadata of an [`EncodedBulkPart`].
#[derive(Debug, Clone)]
pub struct BulkPartMeta {
    /// Number of rows in the part.
    pub num_rows: usize,
    /// Maximum timestamp, as a raw value in the unit of the region's time index type.
    pub max_timestamp: i64,
    /// Minimum timestamp, as a raw value in the unit of the region's time index type.
    pub min_timestamp: i64,
    /// Metadata of the encoded Parquet file.
    pub parquet_metadata: Arc<ParquetMetaData>,
    /// Metadata of the region the part belongs to.
    pub region_metadata: RegionMetadataRef,
    /// Estimated number of series in the part.
    pub num_series: u64,
    /// Maximum sequence of the part.
    pub max_sequence: u64,
}
1119
/// Metrics accumulated by [`BulkPartEncoder::encode_record_batch_iter`].
#[derive(Default, Debug)]
pub struct BulkPartEncodeMetrics {
    /// Time spent pulling batches from the input iterator.
    pub iter_cost: Duration,
    /// Time spent writing batches and closing the Parquet writer.
    pub write_cost: Duration,
    /// Estimated in-memory size of the encoded input batches, in bytes.
    pub raw_size: usize,
    /// Size of the encoded Parquet output, in bytes.
    pub encoded_size: usize,
    /// Number of rows encoded.
    pub num_rows: usize,
}
1134
/// Encodes bulk parts into in-memory Parquet files.
pub struct BulkPartEncoder {
    /// Metadata of the region whose parts are encoded; copied into each part's metadata.
    metadata: RegionMetadataRef,
    /// Parquet writer properties passed to every `ArrowWriter`.
    writer_props: Option<WriterProperties>,
}
1139
1140impl BulkPartEncoder {
1141 pub fn new(metadata: RegionMetadataRef, row_group_size: usize) -> Result<BulkPartEncoder> {
1142 let json = metadata.to_json().context(InvalidMetadataSnafu)?;
1144 let key_value_meta =
1145 parquet::file::metadata::KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);
1146
1147 let writer_props = Some(
1149 WriterProperties::builder()
1150 .set_key_value_metadata(Some(vec![key_value_meta]))
1151 .set_write_batch_size(row_group_size)
1152 .set_max_row_group_row_count(Some(row_group_size))
1153 .set_compression(Compression::ZSTD(ZstdLevel::default()))
1154 .set_column_index_truncate_length(None)
1155 .set_statistics_truncate_length(None)
1156 .build(),
1157 );
1158
1159 Ok(Self {
1160 metadata,
1161 writer_props,
1162 })
1163 }
1164}
1165
1166impl BulkPartEncoder {
1167 pub fn encode_record_batch_iter(
1169 &self,
1170 iter: BoxedRecordBatchIterator,
1171 arrow_schema: SchemaRef,
1172 min_timestamp: i64,
1173 max_timestamp: i64,
1174 max_sequence: u64,
1175 metrics: &mut BulkPartEncodeMetrics,
1176 ) -> Result<Option<EncodedBulkPart>> {
1177 let mut buf = Vec::with_capacity(4096);
1178 let mut writer =
1179 ArrowWriter::try_new(&mut buf, arrow_schema.clone(), self.writer_props.clone())
1180 .context(EncodeMemtableSnafu)?;
1181 let mut total_rows = 0;
1182 let mut series_estimator = SeriesEstimator::default();
1183
1184 let mut iter_start = Instant::now();
1186 for batch_result in iter {
1187 metrics.iter_cost += iter_start.elapsed();
1188 let batch = batch_result?;
1189 if batch.num_rows() == 0 {
1190 continue;
1191 }
1192
1193 series_estimator.update_flat(&batch);
1194 metrics.raw_size += record_batch_estimated_size(&batch);
1195 let write_start = Instant::now();
1196 writer.write(&batch).context(EncodeMemtableSnafu)?;
1197 metrics.write_cost += write_start.elapsed();
1198 total_rows += batch.num_rows();
1199 iter_start = Instant::now();
1200 }
1201 metrics.iter_cost += iter_start.elapsed();
1202
1203 if total_rows == 0 {
1204 return Ok(None);
1205 }
1206
1207 let close_start = Instant::now();
1208 let file_metadata = writer.close().context(EncodeMemtableSnafu)?;
1209 metrics.write_cost += close_start.elapsed();
1210 metrics.encoded_size += buf.len();
1211 metrics.num_rows += total_rows;
1212
1213 let buf = Bytes::from(buf);
1214 let parquet_metadata = Arc::new(file_metadata);
1215 let num_series = series_estimator.finish();
1216
1217 Ok(Some(EncodedBulkPart {
1218 data: buf,
1219 metadata: BulkPartMeta {
1220 num_rows: total_rows,
1221 max_timestamp,
1222 min_timestamp,
1223 parquet_metadata,
1224 region_metadata: self.metadata.clone(),
1225 num_series,
1226 max_sequence,
1227 },
1228 schema: arrow_schema,
1229 }))
1230 }
1231
1232 pub fn encode_part(&self, part: &BulkPart) -> Result<Option<EncodedBulkPart>> {
1234 if part.batch.num_rows() == 0 {
1235 return Ok(None);
1236 }
1237
1238 let mut buf = Vec::with_capacity(4096);
1239 let arrow_schema = part.batch.schema();
1240
1241 let file_metadata = {
1242 let mut writer =
1243 ArrowWriter::try_new(&mut buf, arrow_schema.clone(), self.writer_props.clone())
1244 .context(EncodeMemtableSnafu)?;
1245 writer.write(&part.batch).context(EncodeMemtableSnafu)?;
1246 writer.finish().context(EncodeMemtableSnafu)?
1247 };
1248
1249 let buf = Bytes::from(buf);
1250 let parquet_metadata = Arc::new(file_metadata);
1251
1252 Ok(Some(EncodedBulkPart {
1253 data: buf,
1254 metadata: BulkPartMeta {
1255 num_rows: part.batch.num_rows(),
1256 max_timestamp: part.max_timestamp,
1257 min_timestamp: part.min_timestamp,
1258 parquet_metadata,
1259 region_metadata: self.metadata.clone(),
1260 num_series: part.estimated_series_count() as u64,
1261 max_sequence: part.sequence,
1262 },
1263 schema: arrow_schema,
1264 }))
1265 }
1266}
1267
/// Per-batch minimum and maximum values of the region's first primary key column.
#[derive(Debug, Clone)]
struct BatchStats {
    /// Number of batches the statistics were computed from.
    num_batches: usize,
    /// Column id of the first primary key column.
    first_tag_id: ColumnId,
    /// One minimum per batch; null when the bounds of that batch could not be
    /// extracted.
    min_values: ArrayRef,
    /// One maximum per batch; null when the bounds of that batch could not be
    /// extracted.
    max_values: ArrayRef,
}
1284
1285impl BatchStats {
1286 fn compute(batches: &[RecordBatch], metadata: &RegionMetadata) -> Option<Self> {
1291 let first_tag_id = *metadata.primary_key.first()?;
1296 let first_tag_column = metadata.column_by_id(first_tag_id)?;
1297 let data_type = &first_tag_column.column_schema.data_type;
1298
1299 let converter = build_primary_key_codec_with_fields(
1300 metadata.primary_key_encoding,
1301 [(first_tag_id, SortField::new(data_type.clone()))].into_iter(),
1302 );
1303 let pk_index = primary_key_column_index(batches.first()?.num_columns());
1304
1305 let mut min_builder = data_type.create_mutable_vector(batches.len());
1306 let mut max_builder = data_type.create_mutable_vector(batches.len());
1307
1308 for batch in batches {
1309 match Self::extract_first_tag_bounds(batch, pk_index, &*converter) {
1310 Some((min_val, max_val)) => {
1311 min_builder.push_value_ref(&min_val.as_value_ref());
1312 max_builder.push_value_ref(&max_val.as_value_ref());
1313 }
1314 None => {
1315 min_builder.push_null();
1316 max_builder.push_null();
1317 }
1318 }
1319 }
1320
1321 Some(Self {
1322 num_batches: batches.len(),
1323 first_tag_id,
1324 min_values: min_builder.to_vector().to_arrow_array(),
1325 max_values: max_builder.to_vector().to_arrow_array(),
1326 })
1327 }
1328
1329 fn extract_first_tag_bounds(
1331 batch: &RecordBatch,
1332 pk_index: usize,
1333 converter: &dyn PrimaryKeyCodec,
1334 ) -> Option<(datatypes::value::Value, datatypes::value::Value)> {
1335 if batch.num_rows() == 0 {
1336 return None;
1337 }
1338
1339 let pk_dict = batch
1340 .column(pk_index)
1341 .as_any()
1342 .downcast_ref::<PrimaryKeyArray>()?;
1343 let pk_values = pk_dict.values().as_any().downcast_ref::<BinaryArray>()?;
1344
1345 let keys = pk_dict.keys();
1346 let min_key = keys.value(0);
1347 let max_key = keys.value(batch.num_rows() - 1);
1348 let min_bytes = pk_values.value(min_key as usize);
1349 let max_bytes = pk_values.value(max_key as usize);
1350
1351 Some((
1352 converter.decode_leftmost(min_bytes).ok()??,
1353 converter.decode_leftmost(max_bytes).ok()??,
1354 ))
1355 }
1356}
1357
/// Adapter that exposes [`BatchStats`] to DataFusion's [`PruningStatistics`].
///
/// Each batch is one pruning container. Only the first primary key column has min/max values.
struct BatchPruningStats<'a> {
    /// Statistics of the batches being pruned.
    stats: &'a BatchStats,
    /// Region metadata used to resolve column names to column ids.
    metadata: &'a RegionMetadataRef,
}
1366
1367impl PruningStatistics for BatchPruningStats<'_> {
1368 fn min_values(&self, column: &Column) -> Option<ArrayRef> {
1369 let col = self.metadata.column_by_name(&column.name)?;
1370 if col.column_id == self.stats.first_tag_id {
1371 Some(self.stats.min_values.clone())
1372 } else {
1373 None
1374 }
1375 }
1376
1377 fn max_values(&self, column: &Column) -> Option<ArrayRef> {
1378 let col = self.metadata.column_by_name(&column.name)?;
1379 if col.column_id == self.stats.first_tag_id {
1380 Some(self.stats.max_values.clone())
1381 } else {
1382 None
1383 }
1384 }
1385
1386 fn num_containers(&self) -> usize {
1387 self.stats.num_batches
1388 }
1389
1390 fn null_counts(&self, _column: &Column) -> Option<ArrayRef> {
1391 None
1392 }
1393
1394 fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
1395 None
1396 }
1397
1398 fn contained(
1399 &self,
1400 _column: &Column,
1401 _values: &std::collections::HashSet<datafusion_common::ScalarValue>,
1402 ) -> Option<BooleanArray> {
1403 None
1404 }
1405}
1406
1407fn predicate_references_column(predicate: &table::predicate::Predicate, column_name: &str) -> bool {
1409 let mut columns = HashSet::new();
1410 for expr in predicate.exprs() {
1411 let _ = expr_to_columns(expr, &mut columns);
1412 }
1413 columns.iter().any(|col| col.name == column_name)
1414}
1415
1416pub(crate) fn should_prune_bulk_part(
1420 batch: &RecordBatch,
1421 context: &BulkIterContext,
1422 metadata: &RegionMetadata,
1423) -> bool {
1424 let predicate = match &context.predicate {
1425 Some(p) => p,
1426 None => return false,
1427 };
1428 let first_tag_id = match metadata.primary_key.first() {
1431 Some(id) => *id,
1432 None => return false,
1433 };
1434 let first_tag_name = &metadata
1436 .column_by_id(first_tag_id)
1437 .unwrap()
1438 .column_schema
1439 .name;
1440 if !predicate_references_column(predicate, first_tag_name) {
1441 return false;
1442 }
1443 let stats = match BatchStats::compute(std::slice::from_ref(batch), metadata) {
1444 Some(s) => s,
1445 None => return false,
1446 };
1447 let region_meta = context.read_format().metadata();
1448 let pruning_stats = BatchPruningStats {
1449 stats: &stats,
1450 metadata: region_meta,
1451 };
1452 let mask = predicate.prune_with_stats(&pruning_stats, region_meta.schema.arrow_schema());
1453 !mask.first().copied().unwrap_or(true)
1454}
1455
/// A bulk part made of one or more record batches.
#[derive(Debug, Clone)]
pub struct MultiBulkPart {
    /// Record batches of this part; up to 4 are stored inline without a heap allocation.
    batches: SmallVec<[RecordBatch; 4]>,
    /// Total number of rows across all batches.
    total_rows: usize,
    /// Maximum timestamp of the part, as provided at construction.
    max_timestamp: i64,
    /// Minimum timestamp of the part, as provided at construction.
    min_timestamp: i64,
    /// Maximum sequence number of the part.
    max_sequence: SequenceNumber,
    /// Estimated number of series in the part.
    series_count: usize,
    /// Per-batch statistics of the first primary key column used for pruning;
    /// `None` when they could not be computed.
    batch_stats: Option<BatchStats>,
}
1479
1480impl MultiBulkPart {
1481 pub fn from_bulk_part(part: BulkPart, metadata: &RegionMetadata) -> Self {
1483 let num_rows = part.num_rows();
1484 let series_count = part.estimated_series_count();
1485 let batch_stats = BatchStats::compute(std::slice::from_ref(&part.batch), metadata);
1486 let mut batches = SmallVec::new();
1487 batches.push(part.batch);
1488
1489 Self {
1490 batches,
1491 total_rows: num_rows,
1492 max_timestamp: part.max_timestamp,
1493 min_timestamp: part.min_timestamp,
1494 max_sequence: part.sequence,
1495 series_count,
1496 batch_stats,
1497 }
1498 }
1499
1500 pub fn new(
1513 batches: Vec<RecordBatch>,
1514 min_timestamp: i64,
1515 max_timestamp: i64,
1516 max_sequence: SequenceNumber,
1517 series_count: usize,
1518 metadata: &RegionMetadata,
1519 ) -> Self {
1520 assert!(!batches.is_empty(), "batches must not be empty");
1521
1522 let total_rows = batches.iter().map(|b| b.num_rows()).sum();
1523 let batch_stats = BatchStats::compute(&batches, metadata);
1524
1525 Self {
1526 batches: SmallVec::from_vec(batches),
1527 total_rows,
1528 max_timestamp,
1529 min_timestamp,
1530 max_sequence,
1531 series_count,
1532 batch_stats,
1533 }
1534 }
1535
1536 pub fn num_rows(&self) -> usize {
1538 self.total_rows
1539 }
1540
1541 pub(crate) fn schemas(&self) -> impl Iterator<Item = SchemaRef> + '_ {
1542 self.batches.iter().map(|batch| batch.schema())
1543 }
1544
1545 pub fn min_timestamp(&self) -> i64 {
1547 self.min_timestamp
1548 }
1549
1550 pub fn max_timestamp(&self) -> i64 {
1552 self.max_timestamp
1553 }
1554
1555 pub fn max_sequence(&self) -> SequenceNumber {
1557 self.max_sequence
1558 }
1559
1560 pub fn series_count(&self) -> usize {
1562 self.series_count
1563 }
1564
1565 pub fn num_batches(&self) -> usize {
1567 self.batches.len()
1568 }
1569
1570 pub(crate) fn estimated_size(&self) -> usize {
1572 self.batches.iter().map(record_batch_estimated_size).sum()
1573 }
1574
1575 pub(crate) fn read(
1581 &self,
1582 context: BulkIterContextRef,
1583 sequence: Option<SequenceRange>,
1584 mem_scan_metrics: Option<MemScanMetrics>,
1585 ) -> Result<Option<BoxedRecordBatchIterator>> {
1586 if self.batches.is_empty() {
1587 return Ok(None);
1588 }
1589
1590 let batches_to_read = self.prune_batches(&context);
1591
1592 if batches_to_read.is_empty() {
1593 return Ok(None);
1594 }
1595
1596 let iter = crate::memtable::bulk::part_reader::BulkPartBatchIter::new(
1597 batches_to_read,
1598 context,
1599 sequence,
1600 self.series_count,
1601 mem_scan_metrics,
1602 );
1603 Ok(Some(Box::new(iter) as BoxedRecordBatchIterator))
1604 }
1605
1606 fn prune_batches(&self, context: &BulkIterContextRef) -> Vec<RecordBatch> {
1609 if let Some(stats) = &self.batch_stats
1610 && let Some(predicate) = &context.predicate
1611 {
1612 let region_meta = context.read_format().metadata();
1613 let pruning_stats = BatchPruningStats {
1614 stats,
1615 metadata: region_meta,
1616 };
1617 let mask =
1618 predicate.prune_with_stats(&pruning_stats, region_meta.schema.arrow_schema());
1619 self.batches
1620 .iter()
1621 .zip(mask.iter())
1622 .filter_map(
1623 |(batch, &selected)| {
1624 if selected { Some(batch.clone()) } else { None }
1625 },
1626 )
1627 .collect()
1628 } else {
1629 self.batches.iter().cloned().collect()
1630 }
1631 }
1632
1633 pub fn to_memtable_stats(&self, region_metadata: &RegionMetadataRef) -> MemtableStats {
1635 let ts_type = region_metadata.time_index_type();
1636 let min_ts = ts_type.create_timestamp(self.min_timestamp);
1637 let max_ts = ts_type.create_timestamp(self.max_timestamp);
1638
1639 MemtableStats {
1640 estimated_bytes: self.estimated_size(),
1641 time_range: Some((min_ts, max_ts)),
1642 num_rows: self.num_rows(),
1643 num_ranges: 1,
1644 max_sequence: self.max_sequence,
1645 series_count: self.series_count,
1646 }
1647 }
1648}
1649
1650#[cfg(test)]
1651mod tests {
1652 use api::v1::{Row, SemanticType, WriteHint};
1653 use datafusion_common::ScalarValue;
1654 use datatypes::arrow::array::{
1655 BinaryArray, DictionaryArray, Float64Array, TimestampMillisecondArray,
1656 };
1657 use datatypes::arrow::datatypes::UInt32Type;
1658 use datatypes::prelude::{ConcreteDataType, Value};
1659 use datatypes::schema::ColumnSchema;
1660 use mito_codec::row_converter::build_primary_key_codec;
1661 use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
1662 use store_api::storage::RegionId;
1663 use store_api::storage::consts::ReservedColumnId;
1664 use table::predicate::Predicate;
1665
1666 use super::*;
1667 use crate::memtable::bulk::context::BulkIterContext;
1668 use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
1669 use crate::test_util::memtable_util::{build_key_values_with_ts_seq_values, metadata_for_test};
1670
    /// One mutation used to build test bulk parts: rows of the series `(k0, k1)`.
    struct MutationInput<'a> {
        /// Value of the `k0` tag.
        k0: &'a str,
        /// Value of the `k1` tag.
        k1: u32,
        /// Timestamps of the rows.
        timestamps: &'a [i64],
        /// Values of the `v1` field, presumably aligned with `timestamps`.
        v1: &'a [Option<f64>],
        /// Sequence number of the mutation.
        sequence: u64,
    }
1678
1679 fn encode(input: &[MutationInput]) -> EncodedBulkPart {
1680 let metadata = metadata_for_test();
1681 let kvs = input
1682 .iter()
1683 .map(|m| {
1684 build_key_values_with_ts_seq_values(
1685 &metadata,
1686 m.k0.to_string(),
1687 m.k1,
1688 m.timestamps.iter().copied(),
1689 m.v1.iter().copied(),
1690 m.sequence,
1691 )
1692 })
1693 .collect::<Vec<_>>();
1694 let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1695 let primary_key_codec = build_primary_key_codec(&metadata);
1696 let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
1697 for kv in kvs {
1698 converter.append_key_values(&kv).unwrap();
1699 }
1700 let part = converter.convert().unwrap();
1701 let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
1702 encoder.encode_part(&part).unwrap().unwrap()
1703 }
1704
1705 #[test]
1706 fn test_write_and_read_part_projection() {
1707 let part = encode(&[
1708 MutationInput {
1709 k0: "a",
1710 k1: 0,
1711 timestamps: &[1],
1712 v1: &[Some(0.1)],
1713 sequence: 0,
1714 },
1715 MutationInput {
1716 k0: "b",
1717 k1: 0,
1718 timestamps: &[1],
1719 v1: &[Some(0.0)],
1720 sequence: 0,
1721 },
1722 MutationInput {
1723 k0: "a",
1724 k1: 0,
1725 timestamps: &[2],
1726 v1: &[Some(0.2)],
1727 sequence: 1,
1728 },
1729 ]);
1730
1731 let projection = &[4u32];
1732 let reader = part
1733 .read(
1734 Arc::new(
1735 BulkIterContext::new(
1736 part.metadata.region_metadata.clone(),
1737 Some(projection.as_slice()),
1738 None,
1739 false,
1740 )
1741 .unwrap(),
1742 ),
1743 None,
1744 None,
1745 )
1746 .unwrap()
1747 .expect("expect at least one row group");
1748
1749 let mut total_rows_read = 0;
1750 let mut field: Vec<f64> = vec![];
1751 for res in reader {
1752 let batch = res.unwrap();
1753 assert_eq!(5, batch.num_columns());
1754 field.extend_from_slice(
1755 batch
1756 .column(0)
1757 .as_any()
1758 .downcast_ref::<Float64Array>()
1759 .unwrap()
1760 .values(),
1761 );
1762 total_rows_read += batch.num_rows();
1763 }
1764 assert_eq!(3, total_rows_read);
1765 assert_eq!(vec![0.1, 0.2, 0.0], field);
1766 }
1767
1768 fn prepare(key_values: Vec<(&str, u32, (i64, i64), u64)>) -> EncodedBulkPart {
1769 let metadata = metadata_for_test();
1770 let kvs = key_values
1771 .into_iter()
1772 .map(|(k0, k1, (start, end), sequence)| {
1773 let ts = start..end;
1774 let v1 = (start..end).map(|_| None);
1775 build_key_values_with_ts_seq_values(&metadata, k0.to_string(), k1, ts, v1, sequence)
1776 })
1777 .collect::<Vec<_>>();
1778 let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
1779 let primary_key_codec = build_primary_key_codec(&metadata);
1780 let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
1781 for kv in kvs {
1782 converter.append_key_values(&kv).unwrap();
1783 }
1784 let part = converter.convert().unwrap();
1785 let encoder = BulkPartEncoder::new(metadata, 1024).unwrap();
1786 encoder.encode_part(&part).unwrap().unwrap()
1787 }
1788
1789 fn check_prune_row_group(
1790 part: &EncodedBulkPart,
1791 predicate: Option<Predicate>,
1792 expected_rows: usize,
1793 ) {
1794 let context = Arc::new(
1795 BulkIterContext::new(
1796 part.metadata.region_metadata.clone(),
1797 None,
1798 predicate,
1799 false,
1800 )
1801 .unwrap(),
1802 );
1803 let reader = part
1804 .read(context, None, None)
1805 .unwrap()
1806 .expect("expect at least one row group");
1807 let mut total_rows_read = 0;
1808 for res in reader {
1809 let batch = res.unwrap();
1810 total_rows_read += batch.num_rows();
1811 }
1812 assert_eq!(expected_rows, total_rows_read);
1814 }
1815
1816 #[test]
1817 fn test_prune_row_groups() {
1818 let part = prepare(vec![
1819 ("a", 0, (0, 40), 1),
1820 ("a", 1, (0, 60), 1),
1821 ("b", 0, (0, 100), 2),
1822 ("b", 1, (100, 180), 3),
1823 ("b", 1, (180, 210), 4),
1824 ]);
1825
1826 let context = Arc::new(
1827 BulkIterContext::new(
1828 part.metadata.region_metadata.clone(),
1829 None,
1830 Some(Predicate::new(vec![datafusion_expr::col("ts").eq(
1831 datafusion_expr::lit(ScalarValue::TimestampMillisecond(Some(300), None)),
1832 )])),
1833 false,
1834 )
1835 .unwrap(),
1836 );
1837 assert!(part.read(context, None, None).unwrap().is_none());
1838
1839 check_prune_row_group(&part, None, 310);
1840
1841 check_prune_row_group(
1842 &part,
1843 Some(Predicate::new(vec![
1844 datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
1845 datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
1846 ])),
1847 40,
1848 );
1849
1850 check_prune_row_group(
1851 &part,
1852 Some(Predicate::new(vec![
1853 datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
1854 datafusion_expr::col("k1").eq(datafusion_expr::lit(1u32)),
1855 ])),
1856 60,
1857 );
1858
1859 check_prune_row_group(
1860 &part,
1861 Some(Predicate::new(vec![
1862 datafusion_expr::col("k0").eq(datafusion_expr::lit("a")),
1863 ])),
1864 100,
1865 );
1866
1867 check_prune_row_group(
1868 &part,
1869 Some(Predicate::new(vec![
1870 datafusion_expr::col("k0").eq(datafusion_expr::lit("b")),
1871 datafusion_expr::col("k1").eq(datafusion_expr::lit(0u32)),
1872 ])),
1873 100,
1874 );
1875
1876 check_prune_row_group(
1878 &part,
1879 Some(Predicate::new(vec![
1880 datafusion_expr::col("v0").eq(datafusion_expr::lit(150i64)),
1881 ])),
1882 1,
1883 );
1884 }
1885
    /// Converts two mutations with raw primary key columns and checks the row count, timestamp
    /// range, sequence, time index position and column layout of the result.
    #[test]
    fn test_bulk_part_converter_append_and_convert() {
        let metadata = metadata_for_test();
        let capacity = 100;
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);

        let key_values1 = build_key_values_with_ts_seq_values(
            &metadata,
            "key1".to_string(),
            1u32,
            vec![1000, 2000].into_iter(),
            vec![Some(1.0), Some(2.0)].into_iter(),
            1,
        );

        let key_values2 = build_key_values_with_ts_seq_values(
            &metadata,
            "key2".to_string(),
            2u32,
            vec![1500].into_iter(),
            vec![Some(3.0)].into_iter(),
            2,
        );

        converter.append_key_values(&key_values1).unwrap();
        converter.append_key_values(&key_values2).unwrap();

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 3);
        assert_eq!(bulk_part.min_timestamp, 1000);
        assert_eq!(bulk_part.max_timestamp, 2000);
        // The larger of the two appended sequences.
        assert_eq!(bulk_part.sequence, 2);
        // `ts` is followed by `__primary_key`, `__sequence` and `__op_type`.
        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);

        // Raw tag columns first, then fields, the time index and the internal columns.
        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec![
                "k0",
                "k1",
                "v0",
                "v1",
                "ts",
                "__primary_key",
                "__sequence",
                "__op_type"
            ]
        );
    }
1946
    /// Appends mutations out of key order (`z_key`, `a_key`, `m_key`) and checks that the
    /// converted rows come out in key order, as seen through their timestamps and sequences.
    #[test]
    fn test_bulk_part_converter_sorting() {
        let metadata = metadata_for_test();
        let capacity = 100;
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);

        let key_values1 = build_key_values_with_ts_seq_values(
            &metadata,
            "z_key".to_string(),
            3u32,
            vec![3000].into_iter(),
            vec![Some(3.0)].into_iter(),
            3,
        );

        let key_values2 = build_key_values_with_ts_seq_values(
            &metadata,
            "a_key".to_string(),
            1u32,
            vec![1000].into_iter(),
            vec![Some(1.0)].into_iter(),
            1,
        );

        let key_values3 = build_key_values_with_ts_seq_values(
            &metadata,
            "m_key".to_string(),
            2u32,
            vec![2000].into_iter(),
            vec![Some(2.0)].into_iter(),
            2,
        );

        converter.append_key_values(&key_values1).unwrap();
        converter.append_key_values(&key_values2).unwrap();
        converter.append_key_values(&key_values3).unwrap();

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 3);

        let ts_column = bulk_part.batch.column(bulk_part.timestamp_index);
        // `__sequence` is the second to last column.
        let seq_column = bulk_part.batch.column(bulk_part.batch.num_columns() - 2);

        let ts_array = ts_column
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .unwrap();
        let seq_array = seq_column.as_any().downcast_ref::<UInt64Array>().unwrap();

        // a_key (1000, seq 1), m_key (2000, seq 2), z_key (3000, seq 3).
        assert_eq!(ts_array.values(), &[1000, 2000, 3000]);
        assert_eq!(seq_array.values(), &[1, 2, 3]);

        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec![
                "k0",
                "k1",
                "v0",
                "v1",
                "ts",
                "__primary_key",
                "__sequence",
                "__op_type"
            ]
        );
    }
2024
2025 #[test]
2026 fn test_bulk_part_converter_empty() {
2027 let metadata = metadata_for_test();
2028 let capacity = 10;
2029 let primary_key_codec = build_primary_key_codec(&metadata);
2030 let schema = to_flat_sst_arrow_schema(
2031 &metadata,
2032 &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2033 );
2034
2035 let converter =
2036 BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, true);
2037
2038 let bulk_part = converter.convert().unwrap();
2039
2040 assert_eq!(bulk_part.num_rows(), 0);
2041 assert_eq!(bulk_part.min_timestamp, i64::MAX);
2042 assert_eq!(bulk_part.max_timestamp, i64::MIN);
2043 assert_eq!(bulk_part.sequence, SequenceNumber::MIN);
2044
2045 let schema = bulk_part.batch.schema();
2047 let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2048 assert_eq!(
2049 field_names,
2050 vec![
2051 "k0",
2052 "k1",
2053 "v0",
2054 "v1",
2055 "ts",
2056 "__primary_key",
2057 "__sequence",
2058 "__op_type"
2059 ]
2060 );
2061 }
2062
    /// Converts mutations into a schema built with `raw_pk_columns: false` and checks that only
    /// the fields, the time index and the internal columns remain.
    #[test]
    fn test_bulk_part_converter_without_primary_key_columns() {
        let metadata = metadata_for_test();
        let primary_key_codec = build_primary_key_codec(&metadata);
        let schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions {
                raw_pk_columns: false,
                string_pk_use_dict: true,
                ..Default::default()
            },
        );

        let capacity = 100;
        let mut converter =
            BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec, false);

        let key_values1 = build_key_values_with_ts_seq_values(
            &metadata,
            "key1".to_string(),
            1u32,
            vec![1000, 2000].into_iter(),
            vec![Some(1.0), Some(2.0)].into_iter(),
            1,
        );

        let key_values2 = build_key_values_with_ts_seq_values(
            &metadata,
            "key2".to_string(),
            2u32,
            vec![1500].into_iter(),
            vec![Some(3.0)].into_iter(),
            2,
        );

        converter.append_key_values(&key_values1).unwrap();
        converter.append_key_values(&key_values2).unwrap();

        let bulk_part = converter.convert().unwrap();

        assert_eq!(bulk_part.num_rows(), 3);
        assert_eq!(bulk_part.min_timestamp, 1000);
        assert_eq!(bulk_part.max_timestamp, 2000);
        assert_eq!(bulk_part.sequence, 2);
        assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);

        // No raw `k0`/`k1` columns; tags live only in the encoded `__primary_key`.
        let schema = bulk_part.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
        );
    }
2117
    /// Builds `KeyValues` for one series whose primary key is pre-encoded with
    /// `primary_key_codec` and stored in the `PRIMARY_KEY_COLUMN_NAME` column.
    ///
    /// The key encodes `table_id`, `tsid`, `k0` (column 0) and `k1` (column 1). Each row sets
    /// `v0` to its timestamp and `v1` to the matching item of `values`. The mutation carries a
    /// sparse primary key encoding write hint, so `primary_key_codec` is expected to be the
    /// sparse codec.
    #[allow(clippy::too_many_arguments)]
    fn build_key_values_with_sparse_encoding(
        metadata: &RegionMetadataRef,
        primary_key_codec: &Arc<dyn PrimaryKeyCodec>,
        table_id: u32,
        tsid: u64,
        k0: String,
        k1: String,
        timestamps: impl Iterator<Item = i64>,
        values: impl Iterator<Item = Option<f64>>,
        sequence: SequenceNumber,
    ) -> KeyValues {
        // Reserved columns first, then the user tags, keyed by column id.
        let pk_values = vec![
            (ReservedColumnId::table_id(), Value::UInt32(table_id)),
            (ReservedColumnId::tsid(), Value::UInt64(tsid)),
            (0, Value::String(k0.clone().into())),
            (1, Value::String(k1.clone().into())),
        ];
        let mut encoded_key = Vec::new();
        primary_key_codec
            .encode_values(&pk_values, &mut encoded_key)
            .unwrap();
        assert!(!encoded_key.is_empty());

        // The write schema carries the already-encoded primary key instead of raw tag columns.
        let column_schema = vec![
            api::v1::ColumnSchema {
                column_name: PRIMARY_KEY_COLUMN_NAME.to_string(),
                datatype: api::helper::ColumnDataTypeWrapper::try_from(
                    ConcreteDataType::binary_datatype(),
                )
                .unwrap()
                .datatype() as i32,
                semantic_type: api::v1::SemanticType::Tag as i32,
                ..Default::default()
            },
            api::v1::ColumnSchema {
                column_name: "ts".to_string(),
                datatype: api::helper::ColumnDataTypeWrapper::try_from(
                    ConcreteDataType::timestamp_millisecond_datatype(),
                )
                .unwrap()
                .datatype() as i32,
                semantic_type: api::v1::SemanticType::Timestamp as i32,
                ..Default::default()
            },
            api::v1::ColumnSchema {
                column_name: "v0".to_string(),
                datatype: api::helper::ColumnDataTypeWrapper::try_from(
                    ConcreteDataType::int64_datatype(),
                )
                .unwrap()
                .datatype() as i32,
                semantic_type: api::v1::SemanticType::Field as i32,
                ..Default::default()
            },
            api::v1::ColumnSchema {
                column_name: "v1".to_string(),
                datatype: api::helper::ColumnDataTypeWrapper::try_from(
                    ConcreteDataType::float64_datatype(),
                )
                .unwrap()
                .datatype() as i32,
                semantic_type: api::v1::SemanticType::Field as i32,
                ..Default::default()
            },
        ];

        // One row per (timestamp, value): same encoded key, `v0` mirrors the timestamp.
        let rows = timestamps
            .zip(values)
            .map(|(ts, v)| Row {
                values: vec![
                    api::v1::Value {
                        value_data: Some(api::v1::value::ValueData::BinaryValue(
                            encoded_key.clone(),
                        )),
                    },
                    api::v1::Value {
                        value_data: Some(api::v1::value::ValueData::TimestampMillisecondValue(ts)),
                    },
                    api::v1::Value {
                        value_data: Some(api::v1::value::ValueData::I64Value(ts)),
                    },
                    api::v1::Value {
                        value_data: v.map(api::v1::value::ValueData::F64Value),
                    },
                ],
            })
            .collect();

        let mutation = api::v1::Mutation {
            // NOTE(review): presumably `OpType::Put` — confirm against the proto definition.
            op_type: 1,
            sequence,
            rows: Some(api::v1::Rows {
                schema: column_schema,
                rows,
            }),
            write_hint: Some(WriteHint {
                primary_key_encoding: api::v1::PrimaryKeyEncoding::Sparse.into(),
            }),
        };
        KeyValues::new(metadata.as_ref(), mutation).unwrap()
    }
2222
2223 #[test]
2224 fn test_bulk_part_converter_sparse_primary_key_encoding() {
2225 use api::v1::SemanticType;
2226 use datatypes::schema::ColumnSchema;
2227 use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
2228 use store_api::storage::RegionId;
2229
2230 let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
2231 builder
2232 .push_column_metadata(ColumnMetadata {
2233 column_schema: ColumnSchema::new("k0", ConcreteDataType::string_datatype(), false),
2234 semantic_type: SemanticType::Tag,
2235 column_id: 0,
2236 })
2237 .push_column_metadata(ColumnMetadata {
2238 column_schema: ColumnSchema::new("k1", ConcreteDataType::string_datatype(), false),
2239 semantic_type: SemanticType::Tag,
2240 column_id: 1,
2241 })
2242 .push_column_metadata(ColumnMetadata {
2243 column_schema: ColumnSchema::new(
2244 "ts",
2245 ConcreteDataType::timestamp_millisecond_datatype(),
2246 false,
2247 ),
2248 semantic_type: SemanticType::Timestamp,
2249 column_id: 2,
2250 })
2251 .push_column_metadata(ColumnMetadata {
2252 column_schema: ColumnSchema::new("v0", ConcreteDataType::int64_datatype(), true),
2253 semantic_type: SemanticType::Field,
2254 column_id: 3,
2255 })
2256 .push_column_metadata(ColumnMetadata {
2257 column_schema: ColumnSchema::new("v1", ConcreteDataType::float64_datatype(), true),
2258 semantic_type: SemanticType::Field,
2259 column_id: 4,
2260 })
2261 .primary_key(vec![0, 1])
2262 .primary_key_encoding(PrimaryKeyEncoding::Sparse);
2263 let metadata = Arc::new(builder.build().unwrap());
2264
2265 let primary_key_codec = build_primary_key_codec(&metadata);
2266 let schema = to_flat_sst_arrow_schema(
2267 &metadata,
2268 &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2269 );
2270
2271 assert_eq!(metadata.primary_key_encoding, PrimaryKeyEncoding::Sparse);
2272 assert_eq!(primary_key_codec.encoding(), PrimaryKeyEncoding::Sparse);
2273
2274 let capacity = 100;
2275 let mut converter =
2276 BulkPartConverter::new(&metadata, schema, capacity, primary_key_codec.clone(), true);
2277
2278 let key_values1 = build_key_values_with_sparse_encoding(
2279 &metadata,
2280 &primary_key_codec,
2281 2048u32, 100u64, "key11".to_string(),
2284 "key21".to_string(),
2285 vec![1000, 2000].into_iter(),
2286 vec![Some(1.0), Some(2.0)].into_iter(),
2287 1,
2288 );
2289
2290 let key_values2 = build_key_values_with_sparse_encoding(
2291 &metadata,
2292 &primary_key_codec,
2293 4096u32, 200u64, "key12".to_string(),
2296 "key22".to_string(),
2297 vec![1500].into_iter(),
2298 vec![Some(3.0)].into_iter(),
2299 2,
2300 );
2301
2302 converter.append_key_values(&key_values1).unwrap();
2303 converter.append_key_values(&key_values2).unwrap();
2304
2305 let bulk_part = converter.convert().unwrap();
2306
2307 assert_eq!(bulk_part.num_rows(), 3);
2308 assert_eq!(bulk_part.min_timestamp, 1000);
2309 assert_eq!(bulk_part.max_timestamp, 2000);
2310 assert_eq!(bulk_part.sequence, 2);
2311 assert_eq!(bulk_part.timestamp_index, bulk_part.batch.num_columns() - 4);
2312
2313 let schema = bulk_part.batch.schema();
2317 let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
2318 assert_eq!(
2319 field_names,
2320 vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
2321 );
2322
2323 let primary_key_column = bulk_part.batch.column_by_name("__primary_key").unwrap();
2325 let dict_array = primary_key_column
2326 .as_any()
2327 .downcast_ref::<DictionaryArray<UInt32Type>>()
2328 .unwrap();
2329
2330 assert!(!dict_array.is_empty());
2332 assert_eq!(dict_array.len(), 3); let values = dict_array
2336 .values()
2337 .as_any()
2338 .downcast_ref::<BinaryArray>()
2339 .unwrap();
2340 for i in 0..values.len() {
2341 assert!(
2342 !values.value(i).is_empty(),
2343 "Encoded primary key should not be empty"
2344 );
2345 }
2346 }
2347
2348 #[test]
2349 fn test_convert_bulk_part_empty() {
2350 let metadata = metadata_for_test();
2351 let schema = to_flat_sst_arrow_schema(
2352 &metadata,
2353 &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
2354 );
2355 let primary_key_codec = build_primary_key_codec(&metadata);
2356
2357 let empty_batch = RecordBatch::new_empty(schema.clone());
2359 let empty_part = BulkPart {
2360 batch: empty_batch,
2361 max_timestamp: 0,
2362 min_timestamp: 0,
2363 sequence: 0,
2364 timestamp_index: 0,
2365 raw_data: None,
2366 };
2367
2368 let result =
2369 convert_bulk_part(empty_part, &metadata, primary_key_codec, schema, true).unwrap();
2370 assert!(result.is_none());
2371 }
2372
    /// Converts a raw Arrow batch (tags, fields and time index) for a dense-encoded region,
    /// keeping the raw primary key columns.
    ///
    /// Checks the resulting layout, that the string tag becomes dictionary-encoded, and that a
    /// `__primary_key` dictionary column with one key per row is added.
    #[test]
    fn test_convert_bulk_part_dense_with_pk_columns() {
        let metadata = metadata_for_test();
        let primary_key_codec = build_primary_key_codec(&metadata);

        let k0_array = Arc::new(arrow::array::StringArray::from(vec![
            "key1", "key2", "key1",
        ]));
        let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![1, 2, 1]));
        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200, 300]));
        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0, 3.0]));
        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000, 1500]));

        let input_schema = Arc::new(Schema::new(vec![
            Field::new("k0", ArrowDataType::Utf8, false),
            Field::new("k1", ArrowDataType::UInt32, false),
            Field::new("v0", ArrowDataType::Int64, true),
            Field::new("v1", ArrowDataType::Float64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
        ]));

        let input_batch = RecordBatch::try_new(
            input_schema,
            vec![k0_array, k1_array, v0_array, v1_array, ts_array],
        )
        .unwrap();

        let part = BulkPart {
            batch: input_batch,
            max_timestamp: 2000,
            min_timestamp: 1000,
            sequence: 5,
            timestamp_index: 4,
            raw_data: None,
        };

        let output_schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
        );

        let result = convert_bulk_part(
            part,
            &metadata,
            primary_key_codec,
            output_schema,
            true, // NOTE(review): presumably "keep raw primary key columns" — confirm.
        )
        .unwrap();

        let converted = result.unwrap();

        assert_eq!(converted.num_rows(), 3);
        assert_eq!(converted.max_timestamp, 2000);
        assert_eq!(converted.min_timestamp, 1000);
        assert_eq!(converted.sequence, 5);

        let schema = converted.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec![
                "k0",
                "k1",
                "v0",
                "v1",
                "ts",
                "__primary_key",
                "__sequence",
                "__op_type"
            ]
        );

        // The string tag is stored dictionary-encoded in the output.
        let k0_col = converted.batch.column_by_name("k0").unwrap();
        assert!(matches!(
            k0_col.data_type(),
            ArrowDataType::Dictionary(_, _)
        ));

        let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
        let dict_array = pk_col
            .as_any()
            .downcast_ref::<DictionaryArray<UInt32Type>>()
            .unwrap();
        let keys = dict_array.keys();

        // One dictionary key per input row.
        assert_eq!(keys.len(), 3);
    }
2465
    /// Converts a raw Arrow batch for a dense-encoded region into a schema without raw primary
    /// key columns.
    ///
    /// Checks that only the fields, the time index and the internal columns remain, and that
    /// `__primary_key` is dictionary-encoded.
    #[test]
    fn test_convert_bulk_part_dense_without_pk_columns() {
        let metadata = metadata_for_test();
        let primary_key_codec = build_primary_key_codec(&metadata);

        // The input still carries the raw tag columns; they are dropped by the conversion.
        let k0_array = Arc::new(arrow::array::StringArray::from(vec!["key1", "key2"]));
        let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![1, 2]));
        let v0_array = Arc::new(arrow::array::Int64Array::from(vec![100, 200]));
        let v1_array = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0]));
        let ts_array = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000]));

        let input_schema = Arc::new(Schema::new(vec![
            Field::new("k0", ArrowDataType::Utf8, false),
            Field::new("k1", ArrowDataType::UInt32, false),
            Field::new("v0", ArrowDataType::Int64, true),
            Field::new("v1", ArrowDataType::Float64, true),
            Field::new(
                "ts",
                ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
        ]));

        let input_batch = RecordBatch::try_new(
            input_schema,
            vec![k0_array, k1_array, v0_array, v1_array, ts_array],
        )
        .unwrap();

        let part = BulkPart {
            batch: input_batch,
            max_timestamp: 2000,
            min_timestamp: 1000,
            sequence: 3,
            timestamp_index: 4,
            raw_data: None,
        };

        let output_schema = to_flat_sst_arrow_schema(
            &metadata,
            &FlatSchemaOptions {
                raw_pk_columns: false,
                string_pk_use_dict: true,
                ..Default::default()
            },
        );

        let result = convert_bulk_part(
            part,
            &metadata,
            primary_key_codec,
            output_schema,
            false, // NOTE(review): presumably "do not keep raw primary key columns" — confirm.
        )
        .unwrap();

        let converted = result.unwrap();

        assert_eq!(converted.num_rows(), 2);
        assert_eq!(converted.max_timestamp, 2000);
        assert_eq!(converted.min_timestamp, 1000);
        assert_eq!(converted.sequence, 3);

        let schema = converted.batch.schema();
        let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
        assert_eq!(
            field_names,
            vec!["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
        );

        // The encoded primary key column is dictionary-encoded.
        let pk_col = converted.batch.column_by_name("__primary_key").unwrap();
        assert!(matches!(
            pk_col.data_type(),
            ArrowDataType::Dictionary(_, _)
        ));
    }
2545
/// Checks conversion for a region that uses sparse primary-key encoding.
///
/// The input carries a binary `__primary_key` column in place of the `k0`/`k1` tag
/// columns. The test verifies that the converted part keeps both rows, the time
/// range and the sequence, and that it lays out `__primary_key` as a dictionary
/// column after the field and time-index columns.
#[test]
fn test_convert_bulk_part_sparse_encoding() {
    // (name, type, nullable, semantic type); column ids are assigned 0, 1, 2, ... in order.
    let column_defs = [
        ("k0", ConcreteDataType::string_datatype(), false, SemanticType::Tag),
        ("k1", ConcreteDataType::string_datatype(), false, SemanticType::Tag),
        (
            "ts",
            ConcreteDataType::timestamp_millisecond_datatype(),
            false,
            SemanticType::Timestamp,
        ),
        ("v0", ConcreteDataType::int64_datatype(), true, SemanticType::Field),
        ("v1", ConcreteDataType::float64_datatype(), true, SemanticType::Field),
    ];

    let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
    for (column_id, (name, data_type, nullable, semantic_type)) in (0..).zip(column_defs) {
        builder.push_column_metadata(ColumnMetadata {
            column_schema: ColumnSchema::new(name, data_type, nullable),
            semantic_type,
            column_id,
        });
    }
    builder
        .primary_key(vec![0, 1])
        .primary_key_encoding(PrimaryKeyEncoding::Sparse);
    let metadata = Arc::new(builder.build().unwrap());

    let primary_key_codec = build_primary_key_codec(&metadata);

    let input_schema = Arc::new(Schema::new(vec![
        Field::new("__primary_key", ArrowDataType::Binary, false),
        Field::new("v0", ArrowDataType::Int64, true),
        Field::new("v1", ArrowDataType::Float64, true),
        Field::new(
            "ts",
            ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
            false,
        ),
    ]));

    // Placeholder key bytes rather than real codec output.
    let encoded_pk = Arc::new(arrow::array::BinaryArray::from(vec![
        b"encoded_key_1".as_slice(),
        b"encoded_key_2".as_slice(),
    ]));
    let field_v0 = Arc::new(arrow::array::Int64Array::from(vec![100, 200]));
    let field_v1 = Arc::new(arrow::array::Float64Array::from(vec![1.0, 2.0]));
    let time_index = Arc::new(TimestampMillisecondArray::from(vec![1000, 2000]));

    let input_batch = RecordBatch::try_new(
        input_schema,
        vec![encoded_pk, field_v0, field_v1, time_index],
    )
    .unwrap();

    // `ts` is the fourth input column, hence index 3.
    let part = BulkPart {
        batch: input_batch,
        timestamp_index: 3,
        min_timestamp: 1000,
        max_timestamp: 2000,
        sequence: 7,
        raw_data: None,
    };

    let options = FlatSchemaOptions::from_encoding(metadata.primary_key_encoding);
    let output_schema = to_flat_sst_arrow_schema(&metadata, &options);

    // NOTE(review): the meaning of the trailing boolean flag is defined by
    // `convert_bulk_part`, which is not visible here.
    let converted = convert_bulk_part(part, &metadata, primary_key_codec, output_schema, true)
        .unwrap()
        .unwrap();

    assert_eq!(converted.num_rows(), 2);
    assert_eq!(
        (converted.min_timestamp, converted.max_timestamp, converted.sequence),
        (1000, 2000, 7)
    );

    let schema = converted.batch.schema();
    let names: Vec<&str> = schema
        .fields()
        .iter()
        .map(|field| field.name().as_str())
        .collect();
    assert_eq!(
        names,
        ["v0", "v1", "ts", "__primary_key", "__sequence", "__op_type"]
    );

    let pk_type = converted
        .batch
        .column_by_name("__primary_key")
        .unwrap()
        .data_type();
    assert!(matches!(pk_type, ArrowDataType::Dictionary(_, _)));
}
2654
/// Checks that `convert_bulk_part` reorders rows that interleave two series.
///
/// The input mixes rows of `("series_a", 1)` and `("series_b", 2)`, out of
/// timestamp order. After conversion the rows are expected to be grouped by
/// series, ordered by timestamp within each series, and to carry their field
/// values with them.
#[test]
fn test_convert_bulk_part_sorting_with_multiple_series() {
    let metadata = metadata_for_test();
    let primary_key_codec = build_primary_key_codec(&metadata);

    // Rows alternate between the two series, and timestamps are out of order.
    let k0_array = Arc::new(arrow::array::StringArray::from(vec![
        "series_b", "series_a", "series_b", "series_a",
    ]));
    let k1_array = Arc::new(arrow::array::UInt32Array::from(vec![2, 1, 2, 1]));
    let v0_array = Arc::new(arrow::array::Int64Array::from(vec![200, 100, 400, 300]));
    let v1_array = Arc::new(arrow::array::Float64Array::from(vec![2.0, 1.0, 4.0, 3.0]));
    let ts_array = Arc::new(TimestampMillisecondArray::from(vec![
        2000, 1000, 4000, 3000,
    ]));

    let input_schema = Arc::new(Schema::new(vec![
        Field::new("k0", ArrowDataType::Utf8, false),
        Field::new("k1", ArrowDataType::UInt32, false),
        Field::new("v0", ArrowDataType::Int64, true),
        Field::new("v1", ArrowDataType::Float64, true),
        Field::new(
            "ts",
            ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Millisecond, None),
            false,
        ),
    ]));

    let input_batch = RecordBatch::try_new(
        input_schema,
        vec![k0_array, k1_array, v0_array, v1_array, ts_array],
    )
    .unwrap();

    let part = BulkPart {
        batch: input_batch,
        max_timestamp: 4000,
        min_timestamp: 1000,
        sequence: 10,
        timestamp_index: 4,
        raw_data: None,
    };

    let output_schema = to_flat_sst_arrow_schema(
        &metadata,
        &FlatSchemaOptions::from_encoding(metadata.primary_key_encoding),
    );

    let result =
        convert_bulk_part(part, &metadata, primary_key_codec, output_schema, true).unwrap();

    let converted = result.unwrap();

    assert_eq!(converted.num_rows(), 4);

    // NOTE(review): the expected order assumes the encoded primary key sorts
    // ("series_a", 1) before ("series_b", 2), with rows ordered by timestamp
    // inside each series.
    let ts_col = converted.batch.column(converted.timestamp_index);
    let ts_array = ts_col
        .as_any()
        .downcast_ref::<TimestampMillisecondArray>()
        .unwrap();

    let timestamps: Vec<i64> = ts_array.values().to_vec();
    assert_eq!(timestamps, vec![1000, 3000, 2000, 4000]);

    // Field columns must be permuted together with the time index. Checking
    // only `ts` would not catch a conversion that sorts one column in isolation.
    let sorted_v0 = converted
        .batch
        .column_by_name("v0")
        .unwrap()
        .as_any()
        .downcast_ref::<arrow::array::Int64Array>()
        .unwrap();
    let v0_values: Vec<i64> = sorted_v0.values().to_vec();
    assert_eq!(v0_values, vec![100, 300, 200, 400]);
}
2723
2724 fn build_converted_bulk_part(inputs: &[MutationInput]) -> BulkPart {
2726 let metadata = metadata_for_test();
2727 let kvs = inputs
2728 .iter()
2729 .map(|m| {
2730 build_key_values_with_ts_seq_values(
2731 &metadata,
2732 m.k0.to_string(),
2733 m.k1,
2734 m.timestamps.iter().copied(),
2735 m.v1.iter().copied(),
2736 m.sequence,
2737 )
2738 })
2739 .collect::<Vec<_>>();
2740 let schema = to_flat_sst_arrow_schema(&metadata, &FlatSchemaOptions::default());
2741 let primary_key_codec = build_primary_key_codec(&metadata);
2742 let mut converter = BulkPartConverter::new(&metadata, schema, 64, primary_key_codec, true);
2743 for kv in kvs {
2744 converter.append_key_values(&kv).unwrap();
2745 }
2746 converter.convert().unwrap()
2747 }
2748
2749 fn build_multi_bulk_part(groups: &[&[MutationInput]]) -> (MultiBulkPart, RegionMetadataRef) {
2751 let metadata = metadata_for_test();
2752 let mut all_batches = Vec::new();
2753 let mut min_ts = i64::MAX;
2754 let mut max_ts = i64::MIN;
2755 let mut max_seq = 0u64;
2756
2757 for inputs in groups {
2758 let part = build_converted_bulk_part(inputs);
2759 min_ts = min_ts.min(part.min_timestamp);
2760 max_ts = max_ts.max(part.max_timestamp);
2761 max_seq = max_seq.max(part.sequence);
2762 all_batches.push(part.batch);
2763 }
2764
2765 let multi = MultiBulkPart::new(
2766 all_batches,
2767 min_ts,
2768 max_ts,
2769 max_seq,
2770 groups.len(),
2771 &metadata,
2772 );
2773 (multi, metadata)
2774 }
2775
/// Reads a three-batch `MultiBulkPart` under different predicates.
///
/// Each batch holds two rows of a distinct `k0` value ("a", "m", "z"). The test checks:
/// - an equality predicate on "m" yields two rows;
/// - a predicate on an absent value makes `read` return `None`;
/// - no predicate yields all six rows.
#[test]
fn test_multi_bulk_part_prune_batches() {
    let (multi, metadata) = build_multi_bulk_part(&[
        &[MutationInput {
            k0: "a",
            k1: 0,
            timestamps: &[1, 2],
            v1: &[Some(1.0), Some(2.0)],
            sequence: 0,
        }],
        &[MutationInput {
            k0: "m",
            k1: 0,
            timestamps: &[3, 4],
            v1: &[Some(3.0), Some(4.0)],
            sequence: 1,
        }],
        &[MutationInput {
            k0: "z",
            k1: 0,
            timestamps: &[5, 6],
            v1: &[Some(5.0), Some(6.0)],
            sequence: 2,
        }],
    ]);
    assert_eq!(multi.num_rows(), 6);
    assert_eq!(multi.num_batches(), 3);

    // Reads `multi` under `predicate`. Returns `None` when `read` yields no
    // reader, otherwise the total number of rows the reader produces.
    let count_rows = |predicate: Option<Predicate>| -> Option<usize> {
        let context =
            Arc::new(BulkIterContext::new(metadata.clone(), None, predicate, false).unwrap());
        multi
            .read(context, None, None)
            .unwrap()
            .map(|reader| reader.map(|batch| batch.unwrap().num_rows()).sum())
    };
    let k0_equals = |value: &'static str| {
        Some(Predicate::new(vec![
            datafusion_expr::col("k0").eq(datafusion_expr::lit(value)),
        ]))
    };

    assert_eq!(count_rows(k0_equals("m")), Some(2));
    assert_eq!(count_rows(k0_equals("nonexistent")), None);
    assert_eq!(count_rows(None), Some(6));
}
2847}