use std::collections::VecDeque;
use std::time::Instant;

use datatypes::arrow::array::BooleanArray;
use datatypes::arrow::record_batch::RecordBatch;
use mito_codec::row_converter::PrimaryKeyFilter;
use parquet::arrow::ProjectionMask;
use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
use snafu::ResultExt;
use store_api::storage::SequenceRange;

use crate::error::{self, ComputeArrowSnafu, DecodeArrowRowGroupSnafu};
use crate::memtable::bulk::context::{BulkIterContext, BulkIterContextRef};
use crate::memtable::bulk::part::EncodedBulkPart;
use crate::memtable::bulk::row_group_reader::MemtableRowGroupReaderBuilder;
use crate::memtable::{MemScanMetrics, MemScanMetricsData};
use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
use crate::sst::parquet::file_range::TagDecodeState;
use crate::sst::parquet::flat_format::{primary_key_column_index, sequence_column_index};
use crate::sst::parquet::prefilter::{CachedPrimaryKeyFilter, prefilter_flat_batch_by_primary_key};

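/// Iterator over the row groups of an [`EncodedBulkPart`] that lazily decodes
/// Parquet data and yields filtered [`RecordBatch`]es in the flat format.
///
/// A minimal usage sketch (assuming an encoded part named `part`, a prepared
/// `context` and the row group indices to read):
///
/// ```ignore
/// let mut iter = EncodedBulkPartIter::try_new(&part, context, row_groups, None, None)?;
/// while let Some(batch) = iter.next_record_batch()? {
///     // Consume the filtered RecordBatch.
/// }
/// ```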
pub struct EncodedBulkPartIter {
    context: BulkIterContextRef,
    /// Row groups that remain to be read.
    row_groups_to_read: VecDeque<usize>,
    /// Reader over the row group currently being consumed.
    current_reader: Option<ParquetRecordBatchReader>,
    builder: MemtableRowGroupReaderBuilder,
    /// Optional range to filter rows by sequence number.
    sequence: Option<SequenceRange>,
    /// Whether field columns can be skipped when evaluating predicates
    /// on the current row group.
    current_skip_fields: bool,
    /// Optional filter to prune rows by primary key.
    pk_filter: Option<CachedPrimaryKeyFilter>,
    /// Metrics collected by this iterator.
    metrics: MemScanMetricsData,
    /// Shared scan metrics to report to once iteration finishes.
    mem_scan_metrics: Option<MemScanMetrics>,
}

impl EncodedBulkPartIter {
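    /// Creates a new [`EncodedBulkPartIter`] that reads the given row groups
    /// of `encoded_part`.
    ///
    /// # Panics
    /// Panics if the context's read format is not the flat format.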
    pub fn try_new(
        encoded_part: &EncodedBulkPart,
        context: BulkIterContextRef,
        mut row_groups_to_read: VecDeque<usize>,
        sequence: Option<SequenceRange>,
        mem_scan_metrics: Option<MemScanMetrics>,
    ) -> error::Result<Self> {
        assert!(context.read_format().as_flat().is_some());

        let parquet_meta = encoded_part.metadata().parquet_metadata.clone();
        let data = encoded_part.data().clone();
        let series_count = encoded_part.metadata().num_series as usize;

        let projection_mask = ProjectionMask::roots(
            parquet_meta.file_metadata().schema_descr(),
            context.read_format().projection_indices().iter().copied(),
        );
        let builder =
            MemtableRowGroupReaderBuilder::try_new(&context, projection_mask, parquet_meta, data)?;

        let pk_filter = context.build_pk_filter();

        // Eagerly builds the reader for the first row group so the first call
        // to `next_record_batch` can return data immediately.
        let (init_reader, current_skip_fields) = match row_groups_to_read.pop_front() {
            Some(first_row_group) => {
                let skip_fields = context.pre_filter_mode().skip_fields();
                let reader = builder.build_row_group_reader(first_row_group, None)?;
                (Some(reader), skip_fields)
            }
            None => (None, false),
        };

        Ok(Self {
            context,
            row_groups_to_read,
            current_reader: init_reader,
            builder,
            sequence,
            current_skip_fields,
            pk_filter,
            metrics: MemScanMetricsData {
                total_series: series_count,
                ..Default::default()
            },
            mem_scan_metrics,
        })
    }

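    /// Reports collected metrics to the shared [`MemScanMetrics`], at most once.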
    fn report_mem_scan_metrics(&mut self) {
        if let Some(mem_scan_metrics) = self.mem_scan_metrics.take() {
            mem_scan_metrics.merge_inner(&self.metrics);
        }
    }

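    /// Returns the next filtered [`RecordBatch`], advancing to the next row
    /// group once the current one is exhausted, or `Ok(None)` when all row
    /// groups have been read.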
    pub(crate) fn next_record_batch(&mut self) -> error::Result<Option<RecordBatch>> {
        let start = Instant::now();

        let Some(current) = &mut self.current_reader else {
            self.metrics.scan_cost += start.elapsed();
            return Ok(None);
        };

        // Drains the current row group first.
        for batch in current {
            let batch = batch.context(DecodeArrowRowGroupSnafu)?;
            if let Some(batch) = apply_combined_filters(
                &self.context,
                &self.sequence,
                batch,
                self.current_skip_fields,
                self.pk_filter
                    .as_mut()
                    .map(|f| f as &mut dyn PrimaryKeyFilter),
                &mut self.metrics,
            )? {
                self.metrics.num_batches += 1;
                self.metrics.num_rows += batch.num_rows();
                self.metrics.scan_cost += start.elapsed();
                return Ok(Some(batch));
            }
        }

        // The current row group is exhausted; moves on to the remaining ones.
        while let Some(next_row_group) = self.row_groups_to_read.pop_front() {
            self.current_skip_fields = self.context.pre_filter_mode().skip_fields();

            let next_reader = self.builder.build_row_group_reader(next_row_group, None)?;
            let current = self.current_reader.insert(next_reader);

            for batch in current {
                let batch = batch.context(DecodeArrowRowGroupSnafu)?;
                if let Some(batch) = apply_combined_filters(
                    &self.context,
                    &self.sequence,
                    batch,
                    self.current_skip_fields,
                    self.pk_filter
                        .as_mut()
                        .map(|f| f as &mut dyn PrimaryKeyFilter),
                    &mut self.metrics,
                )? {
                    self.metrics.num_batches += 1;
                    self.metrics.num_rows += batch.num_rows();
                    self.metrics.scan_cost += start.elapsed();
                    return Ok(Some(batch));
                }
            }
        }

        self.metrics.scan_cost += start.elapsed();
        Ok(None)
    }
}

impl Iterator for EncodedBulkPartIter {
    type Item = error::Result<RecordBatch>;

    fn next(&mut self) -> Option<Self::Item> {
        let result = self.next_record_batch().transpose();

        // Reports metrics as soon as the iterator is exhausted instead of
        // waiting for drop.
        if result.is_none() {
            self.report_mem_scan_metrics();
        }

        result
    }
}

impl Drop for EncodedBulkPartIter {
    fn drop(&mut self) {
        common_telemetry::debug!(
            "EncodedBulkPartIter region: {}, metrics: total_series={}, num_rows={}, num_batches={}, scan_cost={:?}, prefilter_cost={:?}, prefilter_rows_filtered={}",
            self.context.region_id(),
            self.metrics.total_series,
            self.metrics.num_rows,
            self.metrics.num_batches,
            self.metrics.scan_cost,
            self.metrics.prefilter_cost,
            self.metrics.prefilter_rows_filtered
        );

        self.report_mem_scan_metrics();

        READ_ROWS_TOTAL
            .with_label_values(&["bulk_memtable"])
            .inc_by(self.metrics.num_rows as u64);
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_memtable"])
            .observe(self.metrics.scan_cost.as_secs_f64());
    }
}

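/// Iterator over in-memory [`RecordBatch`]es of a bulk part that applies
/// projection and the same combined filters as [`EncodedBulkPartIter`].
///
/// A minimal usage sketch (assuming a flat-format `batch` and a prepared
/// `context`, mirroring the tests below):
///
/// ```ignore
/// let iter = BulkPartBatchIter::from_single(batch, context, None, 0, None);
/// for batch in iter {
///     let batch = batch.unwrap();
///     // Consume the filtered RecordBatch.
/// }
/// ```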
pub struct BulkPartBatchIter {
    /// Batches that remain to be processed.
    batches: VecDeque<RecordBatch>,
    context: BulkIterContextRef,
    /// Optional range to filter rows by sequence number.
    sequence: Option<SequenceRange>,
    /// Optional filter to prune rows by primary key.
    pk_filter: Option<CachedPrimaryKeyFilter>,
    /// Metrics collected by this iterator.
    metrics: MemScanMetricsData,
    /// Shared scan metrics to report to once iteration finishes.
    mem_scan_metrics: Option<MemScanMetrics>,
}

impl BulkPartBatchIter {
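    /// Creates a new [`BulkPartBatchIter`] from `batches`.
    ///
    /// # Panics
    /// Panics if the context's read format is not the flat format.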
    pub fn new(
        batches: Vec<RecordBatch>,
        context: BulkIterContextRef,
        sequence: Option<SequenceRange>,
        series_count: usize,
        mem_scan_metrics: Option<MemScanMetrics>,
    ) -> Self {
        assert!(context.read_format().as_flat().is_some());

        let pk_filter = context.build_pk_filter();

        Self {
            batches: VecDeque::from(batches),
            context,
            sequence,
            pk_filter,
            metrics: MemScanMetricsData {
                total_series: series_count,
                ..Default::default()
            },
            mem_scan_metrics,
        }
    }

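    /// Convenience constructor for iterating over a single [`RecordBatch`].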
    pub fn from_single(
        record_batch: RecordBatch,
        context: BulkIterContextRef,
        sequence: Option<SequenceRange>,
        series_count: usize,
        mem_scan_metrics: Option<MemScanMetrics>,
    ) -> Self {
        Self::new(
            vec![record_batch],
            context,
            sequence,
            series_count,
            mem_scan_metrics,
        )
    }

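    /// Reports collected metrics to the shared [`MemScanMetrics`], at most once.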
    fn report_mem_scan_metrics(&mut self) {
        if let Some(mem_scan_metrics) = self.mem_scan_metrics.take() {
            mem_scan_metrics.merge_inner(&self.metrics);
        }
    }

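    /// Projects `record_batch` to the columns required by the read format,
    /// skipping the copy when the batch already matches the projection.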
    fn apply_projection(&self, record_batch: RecordBatch) -> error::Result<RecordBatch> {
        let projection_indices = self.context.read_format().projection_indices();
        if projection_indices.len() == record_batch.num_columns() {
            return Ok(record_batch);
        }

        record_batch
            .project(projection_indices)
            .context(ComputeArrowSnafu)
    }

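    /// Projects and filters a single batch. Returns `Ok(None)` when every row
    /// is filtered out.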
    fn process_batch(&mut self, record_batch: RecordBatch) -> error::Result<Option<RecordBatch>> {
        let start = Instant::now();

        let projected_batch = self.apply_projection(record_batch)?;

        let skip_fields = self.context.pre_filter_mode().skip_fields();

        let Some(filtered_batch) = apply_combined_filters(
            &self.context,
            &self.sequence,
            projected_batch,
            skip_fields,
            self.pk_filter
                .as_mut()
                .map(|f| f as &mut dyn PrimaryKeyFilter),
            &mut self.metrics,
        )?
        else {
            self.metrics.scan_cost += start.elapsed();
            return Ok(None);
        };

        self.metrics.num_batches += 1;
        self.metrics.num_rows += filtered_batch.num_rows();
        self.metrics.scan_cost += start.elapsed();

        Ok(Some(filtered_batch))
    }
}

impl Iterator for BulkPartBatchIter {
    type Item = error::Result<RecordBatch>;

    fn next(&mut self) -> Option<Self::Item> {
        while let Some(batch) = self.batches.pop_front() {
            match self.process_batch(batch) {
                Ok(Some(result)) => return Some(Ok(result)),
                // All rows filtered out; tries the next batch.
                Ok(None) => continue,
                Err(e) => {
                    self.report_mem_scan_metrics();
                    return Some(Err(e));
                }
            }
        }

        self.report_mem_scan_metrics();
        None
    }
}

impl Drop for BulkPartBatchIter {
    fn drop(&mut self) {
        common_telemetry::debug!(
            "BulkPartBatchIter region: {}, metrics: total_series={}, num_rows={}, num_batches={}, scan_cost={:?}, prefilter_cost={:?}, prefilter_rows_filtered={}",
            self.context.region_id(),
            self.metrics.total_series,
            self.metrics.num_rows,
            self.metrics.num_batches,
            self.metrics.scan_cost,
            self.metrics.prefilter_cost,
            self.metrics.prefilter_rows_filtered
        );

        self.report_mem_scan_metrics();

        READ_ROWS_TOTAL
            .with_label_values(&["bulk_memtable"])
            .inc_by(self.metrics.num_rows as u64);
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_memtable"])
            .observe(self.metrics.scan_cost.as_secs_f64());
    }
}

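/// Applies the primary key prefilter, the predicate filters and the sequence
/// filter to `record_batch`, converting it with the flat read format in
/// between. Returns `Ok(None)` when all rows are filtered out.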
fn apply_combined_filters(
    context: &BulkIterContext,
    sequence: &Option<SequenceRange>,
    record_batch: RecordBatch,
    skip_fields: bool,
    pk_filter: Option<&mut dyn PrimaryKeyFilter>,
    metrics: &mut MemScanMetricsData,
) -> error::Result<Option<RecordBatch>> {
    // Prunes rows by primary key before the more expensive conversion below.
    let has_pk_prefilter = pk_filter.is_some();
    let record_batch = if let Some(pk_filter) = pk_filter {
        let rows_before = record_batch.num_rows();
        let prefilter_start = Instant::now();
        let pk_col_idx = primary_key_column_index(record_batch.num_columns());
        match prefilter_flat_batch_by_primary_key(record_batch, pk_col_idx, pk_filter)? {
            Some(batch) => {
                metrics.prefilter_cost += prefilter_start.elapsed();
                metrics.prefilter_rows_filtered += rows_before - batch.num_rows();
                batch
            }
            None => {
                metrics.prefilter_cost += prefilter_start.elapsed();
                metrics.prefilter_rows_filtered += rows_before;
                return Ok(None);
            }
        }
    } else {
        record_batch
    };

    // Safety: callers assert that the read format is flat.
    let format = context.read_format().as_flat().unwrap();
    let record_batch = format.convert_batch(record_batch, None)?;

    let num_rows = record_batch.num_rows();
    let mut combined_filter = None;
    let mut tag_decode_state = TagDecodeState::new();

    // Evaluates predicate filters, if any.
    if !context.base.filters.is_empty() {
        let predicate_mask = context.base.compute_filter_mask_flat(
            &record_batch,
            skip_fields,
            has_pk_prefilter,
            &mut tag_decode_state,
        )?;
        // The whole batch is filtered out.
        let Some(mask) = predicate_mask else {
            return Ok(None);
        };
        combined_filter = Some(BooleanArray::from(mask));
    }

    // Combines the sequence filter with the predicate mask.
    if let Some(sequence) = sequence {
        let sequence_column =
            record_batch.column(sequence_column_index(record_batch.num_columns()));
        let sequence_filter = sequence
            .filter(&sequence_column)
            .context(ComputeArrowSnafu)?;
        combined_filter = match combined_filter {
            None => Some(sequence_filter),
            Some(existing_filter) => {
                let and_result = datatypes::arrow::compute::and(&existing_filter, &sequence_filter)
                    .context(ComputeArrowSnafu)?;
                Some(and_result)
            }
        };
    }

    // No filter to apply; returns the batch as is.
    let Some(filter_array) = combined_filter else {
        return Ok(Some(record_batch));
    };
    let select_count = filter_array.true_count();
    if select_count == 0 {
        return Ok(None);
    }
    if select_count == num_rows {
        return Ok(Some(record_batch));
    }
    let filtered_batch =
        datatypes::arrow::compute::filter_record_batch(&record_batch, &filter_array)
            .context(ComputeArrowSnafu)?;

    Ok(Some(filtered_batch))
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::SemanticType;
    use datafusion_expr::{col, lit};
    use datatypes::arrow::array::{
        ArrayRef, BinaryArray, DictionaryArray, Int64Array, StringArray, UInt8Array, UInt32Array,
        UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType, Field, Schema};
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;
    use table::predicate::Predicate;

    use super::*;
    use crate::memtable::bulk::context::BulkIterContext;
    use crate::test_util::sst_util::new_primary_key;

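    /// Exercises projection, sequence filtering and predicate filtering over a
    /// single batch.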
    #[test]
    fn test_bulk_part_batch_iter() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("key1", DataType::Utf8, false),
            Field::new("field1", DataType::Int64, false),
            Field::new(
                "timestamp",
                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
                false,
            ),
            Field::new("__sequence", DataType::UInt64, false),
            Field::new("__op_type", DataType::UInt8, false),
        ]));

        let key1 = Arc::new(StringArray::from_iter_values(["key1", "key2", "key3"]));
        let field1 = Arc::new(Int64Array::from(vec![11, 12, 13]));
        let timestamp = Arc::new(datatypes::arrow::array::TimestampMillisecondArray::from(
            vec![1000, 2000, 3000],
        ));

        // Builds the dictionary-encoded __primary_key column.
        let pk1 = new_primary_key(&["key1"]);
        let pk2 = new_primary_key(&["key2"]);
        let pk3 = new_primary_key(&["key3"]);
        let values = Arc::new(BinaryArray::from_iter_values([
            pk1.as_slice(),
            pk2.as_slice(),
            pk3.as_slice(),
        ]));
        let keys = UInt32Array::from(vec![0, 1, 2]);
        let primary_key = Arc::new(DictionaryArray::new(keys, values));

        let sequence = Arc::new(UInt64Array::from(vec![1, 2, 3]));
        let op_type = Arc::new(UInt8Array::from(vec![1, 1, 1]));

        let record_batch = RecordBatch::try_new(
            schema,
            vec![
                key1,
                field1,
                timestamp,
                primary_key.clone(),
                sequence,
                op_type,
            ],
        )
        .unwrap();

        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "key1",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field1",
                    ConcreteDataType::int64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "timestamp",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .primary_key(vec![0]);

        let region_metadata = builder.build().unwrap();

        // No projection and no predicate: all rows and columns are kept.
        let context = Arc::new(
            BulkIterContext::new(
                Arc::new(region_metadata.clone()),
                None,
                None,
                false,
            )
            .unwrap(),
        );
        let iter =
            BulkPartBatchIter::from_single(record_batch.clone(), context.clone(), None, 0, None);
        let result: Vec<_> = iter.map(|rb| rb.unwrap()).collect();
        assert_eq!(1, result.len());
        assert_eq!(3, result[0].num_rows());
        assert_eq!(6, result[0].num_columns());

        // The sequence filter keeps rows with sequence <= 2.
        let iter = BulkPartBatchIter::from_single(
            record_batch.clone(),
            context,
            Some(SequenceRange::LtEq { max: 2 }),
            0,
            None,
        );
        let result: Vec<_> = iter.map(|rb| rb.unwrap()).collect();
        assert_eq!(1, result.len());
        let expect_sequence = Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef;
        assert_eq!(
            &expect_sequence,
            result[0].column(result[0].num_columns() - 2)
        );
        assert_eq!(6, result[0].num_columns());

        // Projection plus a predicate on the tag column.
        let context = Arc::new(
            BulkIterContext::new(
                Arc::new(region_metadata),
                Some(&[0, 2]),
                Some(Predicate::new(vec![col("key1").eq(lit("key2"))])),
                false,
            )
            .unwrap(),
        );
        let iter =
            BulkPartBatchIter::from_single(record_batch.clone(), context.clone(), None, 0, None);
        let result: Vec<_> = iter.map(|rb| rb.unwrap()).collect();
        assert_eq!(1, result.len());
        assert_eq!(1, result[0].num_rows());
        assert_eq!(5, result[0].num_columns());
        let expect_sequence = Arc::new(UInt64Array::from(vec![2])) as ArrayRef;
        assert_eq!(
            &expect_sequence,
            result[0].column(result[0].num_columns() - 2)
        );
    }

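    /// Verifies that multiple batches pass through unchanged when no filters
    /// are configured.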
    #[test]
    fn test_bulk_part_batch_iter_multiple_batches() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("key1", DataType::Utf8, false),
            Field::new("field1", DataType::Int64, false),
            Field::new(
                "timestamp",
                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
                false,
            ),
            Field::new("__sequence", DataType::UInt64, false),
            Field::new("__op_type", DataType::UInt8, false),
        ]));

        let pk1 = new_primary_key(&["key1"]);
        let pk2 = new_primary_key(&["key2"]);
        let key1_1 = Arc::new(StringArray::from_iter_values(["key1", "key2"]));
        let field1_1 = Arc::new(Int64Array::from(vec![11, 12]));
        let timestamp_1 = Arc::new(datatypes::arrow::array::TimestampMillisecondArray::from(
            vec![1000, 2000],
        ));
        let values_1 = Arc::new(BinaryArray::from_iter_values([
            pk1.as_slice(),
            pk2.as_slice(),
        ]));
        let keys_1 = UInt32Array::from(vec![0, 1]);
        let primary_key_1 = Arc::new(DictionaryArray::new(keys_1, values_1));
        let sequence_1 = Arc::new(UInt64Array::from(vec![1, 2]));
        let op_type_1 = Arc::new(UInt8Array::from(vec![1, 1]));

        let batch1 = RecordBatch::try_new(
            schema.clone(),
            vec![
                key1_1,
                field1_1,
                timestamp_1,
                primary_key_1,
                sequence_1,
                op_type_1,
            ],
        )
        .unwrap();

        let pk3 = new_primary_key(&["key3"]);
        let pk4 = new_primary_key(&["key4"]);
        let pk5 = new_primary_key(&["key5"]);
        let key1_2 = Arc::new(StringArray::from_iter_values(["key3", "key4", "key5"]));
        let field1_2 = Arc::new(Int64Array::from(vec![13, 14, 15]));
        let timestamp_2 = Arc::new(datatypes::arrow::array::TimestampMillisecondArray::from(
            vec![3000, 4000, 5000],
        ));
        let values_2 = Arc::new(BinaryArray::from_iter_values([
            pk3.as_slice(),
            pk4.as_slice(),
            pk5.as_slice(),
        ]));
        let keys_2 = UInt32Array::from(vec![0, 1, 2]);
        let primary_key_2 = Arc::new(DictionaryArray::new(keys_2, values_2));
        let sequence_2 = Arc::new(UInt64Array::from(vec![3, 4, 5]));
        let op_type_2 = Arc::new(UInt8Array::from(vec![1, 1, 1]));

        let batch2 = RecordBatch::try_new(
            schema.clone(),
            vec![
                key1_2,
                field1_2,
                timestamp_2,
                primary_key_2,
                sequence_2,
                op_type_2,
            ],
        )
        .unwrap();

        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "key1",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field1",
                    ConcreteDataType::int64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "timestamp",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .primary_key(vec![0]);

        let region_metadata = builder.build().unwrap();

        let context = Arc::new(
            BulkIterContext::new(
                Arc::new(region_metadata),
                None,
                None,
                false,
            )
            .unwrap(),
        );

        let expect_batches = vec![batch1, batch2];
        let iter = BulkPartBatchIter::new(expect_batches.clone(), context.clone(), None, 0, None);

        // No filters are configured, so batches pass through unchanged.
        let result: Vec<_> = iter.map(|rb| rb.unwrap()).collect();
        assert_eq!(expect_batches, result);
    }
}