use std::collections::VecDeque;
use std::time::Instant;

use datatypes::arrow::array::BooleanArray;
use datatypes::arrow::record_batch::RecordBatch;
use parquet::arrow::ProjectionMask;
use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
use snafu::ResultExt;
use store_api::storage::SequenceRange;

use crate::error::{self, ComputeArrowSnafu, DecodeArrowRowGroupSnafu};
use crate::memtable::bulk::context::{BulkIterContext, BulkIterContextRef};
use crate::memtable::bulk::part::EncodedBulkPart;
use crate::memtable::bulk::row_group_reader::MemtableRowGroupReaderBuilder;
use crate::memtable::{MemScanMetrics, MemScanMetricsData};
use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
use crate::sst::parquet::file_range::{PreFilterMode, TagDecodeState};
use crate::sst::parquet::flat_format::sequence_column_index;
use crate::sst::parquet::reader::RowGroupReaderContext;

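/// Iterator of an [`EncodedBulkPart`] that decodes and filters one parquet row
/// group at a time.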
pub struct EncodedBulkPartIter {
    /// Context of the iteration.
    context: BulkIterContextRef,
    /// Indices of the row groups that remain to be read.
    row_groups_to_read: VecDeque<usize>,
    /// Reader of the row group currently being read.
    current_reader: Option<ParquetRecordBatchReader>,
    /// Builder that creates readers for individual row groups.
    builder: MemtableRowGroupReaderBuilder,
    /// Optional sequence range to filter rows by.
    sequence: Option<SequenceRange>,
    /// Whether field columns can be skipped when filtering the current row group.
    current_skip_fields: bool,
    /// Metrics of this scan.
    metrics: MemScanMetricsData,
    /// Optional shared metrics collector to report to when the scan completes.
    mem_scan_metrics: Option<MemScanMetrics>,
}

impl EncodedBulkPartIter {
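    /// Creates an [`EncodedBulkPartIter`] that reads the given row groups of the
    /// encoded part.
    ///
    /// # Panics
    /// Panics if the context is not built for the flat read format.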
    pub(crate) fn try_new(
        encoded_part: &EncodedBulkPart,
        context: BulkIterContextRef,
        mut row_groups_to_read: VecDeque<usize>,
        sequence: Option<SequenceRange>,
        mem_scan_metrics: Option<MemScanMetrics>,
    ) -> error::Result<Self> {
        assert!(context.read_format().as_flat().is_some());

        let parquet_meta = encoded_part.metadata().parquet_metadata.clone();
        let data = encoded_part.data().clone();
        let series_count = encoded_part.metadata().num_series as usize;

        let projection_mask = ProjectionMask::roots(
            parquet_meta.file_metadata().schema_descr(),
            context.read_format().projection_indices().iter().copied(),
        );
        let builder =
            MemtableRowGroupReaderBuilder::try_new(&context, projection_mask, parquet_meta, data)?;

        // Eagerly builds the reader of the first row group, if any.
        let (init_reader, current_skip_fields) = match row_groups_to_read.pop_front() {
            Some(first_row_group) => {
                let skip_fields = builder.compute_skip_fields(&context, first_row_group);
                let reader = builder.build_row_group_reader(first_row_group, None)?;
                (Some(reader), skip_fields)
            }
            None => (None, false),
        };

        Ok(Self {
            context,
            row_groups_to_read,
            current_reader: init_reader,
            builder,
            sequence,
            current_skip_fields,
            metrics: MemScanMetricsData {
                total_series: series_count,
                ..Default::default()
            },
            mem_scan_metrics,
        })
    }

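    /// Reports scan metrics to the shared collector, at most once.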
    fn report_mem_scan_metrics(&mut self) {
        if let Some(mem_scan_metrics) = self.mem_scan_metrics.take() {
            mem_scan_metrics.merge_inner(&self.metrics);
        }
    }

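    /// Fetches the next non-empty [`RecordBatch`], advancing to the next row
    /// group when the current reader is exhausted.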
    pub(crate) fn next_record_batch(&mut self) -> error::Result<Option<RecordBatch>> {
        let start = Instant::now();

        let Some(current) = &mut self.current_reader else {
            // All row groups are exhausted.
            self.metrics.scan_cost += start.elapsed();
            return Ok(None);
        };

        // Drains the current row group reader first.
        for batch in current {
            let batch = batch.context(DecodeArrowRowGroupSnafu)?;
            if let Some(batch) = apply_combined_filters(
                &self.context,
                &self.sequence,
                batch,
                self.current_skip_fields,
            )? {
                self.metrics.num_batches += 1;
                self.metrics.num_rows += batch.num_rows();
                self.metrics.scan_cost += start.elapsed();
                return Ok(Some(batch));
            }
        }

        // The current reader is exhausted, advances to the remaining row groups.
        while let Some(next_row_group) = self.row_groups_to_read.pop_front() {
            self.current_skip_fields = self
                .builder
                .compute_skip_fields(&self.context, next_row_group);

            let next_reader = self.builder.build_row_group_reader(next_row_group, None)?;
            let current = self.current_reader.insert(next_reader);

            for batch in current {
                let batch = batch.context(DecodeArrowRowGroupSnafu)?;
                if let Some(batch) = apply_combined_filters(
                    &self.context,
                    &self.sequence,
                    batch,
                    self.current_skip_fields,
                )? {
                    self.metrics.num_batches += 1;
                    self.metrics.num_rows += batch.num_rows();
                    self.metrics.scan_cost += start.elapsed();
                    return Ok(Some(batch));
                }
            }
        }

        self.metrics.scan_cost += start.elapsed();
        Ok(None)
    }
}

impl Iterator for EncodedBulkPartIter {
    type Item = error::Result<RecordBatch>;

    fn next(&mut self) -> Option<Self::Item> {
        let result = self.next_record_batch().transpose();

        if result.is_none() {
            // The iterator is exhausted, reports metrics to the shared collector.
            self.report_mem_scan_metrics();
        }

        result
    }
}

impl Drop for EncodedBulkPartIter {
    fn drop(&mut self) {
        common_telemetry::debug!(
            "EncodedBulkPartIter region: {}, metrics: total_series={}, num_rows={}, num_batches={}, scan_cost={:?}",
            self.context.region_id(),
            self.metrics.total_series,
            self.metrics.num_rows,
            self.metrics.num_batches,
            self.metrics.scan_cost
        );

        self.report_mem_scan_metrics();

        // Updates global read metrics.
        READ_ROWS_TOTAL
            .with_label_values(&["bulk_memtable"])
            .inc_by(self.metrics.num_rows as u64);
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_memtable"])
            .observe(self.metrics.scan_cost.as_secs_f64());
    }
}

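/// Iterator over record batches that are already decoded in memory.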
pub struct BulkPartBatchIter {
    /// Record batches that remain to be processed.
    batches: VecDeque<RecordBatch>,
    /// Context of the iteration.
    context: BulkIterContextRef,
    /// Optional sequence range to filter rows by.
    sequence: Option<SequenceRange>,
    /// Metrics of this scan.
    metrics: MemScanMetricsData,
    /// Optional shared metrics collector to report to when the scan completes.
    mem_scan_metrics: Option<MemScanMetrics>,
}

impl BulkPartBatchIter {
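    /// Creates a [`BulkPartBatchIter`] from decoded record batches.
    ///
    /// # Panics
    /// Panics if the context is not built for the flat read format.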
    pub fn new(
        batches: Vec<RecordBatch>,
        context: BulkIterContextRef,
        sequence: Option<SequenceRange>,
        series_count: usize,
        mem_scan_metrics: Option<MemScanMetrics>,
    ) -> Self {
        assert!(context.read_format().as_flat().is_some());

        Self {
            batches: VecDeque::from(batches),
            context,
            sequence,
            metrics: MemScanMetricsData {
                total_series: series_count,
                ..Default::default()
            },
            mem_scan_metrics,
        }
    }

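    /// Creates an iterator from a single record batch.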
    pub fn from_single(
        record_batch: RecordBatch,
        context: BulkIterContextRef,
        sequence: Option<SequenceRange>,
        series_count: usize,
        mem_scan_metrics: Option<MemScanMetrics>,
    ) -> Self {
        Self::new(
            vec![record_batch],
            context,
            sequence,
            series_count,
            mem_scan_metrics,
        )
    }

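    /// Reports scan metrics to the shared collector, at most once.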
    fn report_mem_scan_metrics(&mut self) {
        if let Some(mem_scan_metrics) = self.mem_scan_metrics.take() {
            mem_scan_metrics.merge_inner(&self.metrics);
        }
    }

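    /// Projects the batch to the columns selected by the context, if needed.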
    fn apply_projection(&self, record_batch: RecordBatch) -> error::Result<RecordBatch> {
        let projection_indices = self.context.read_format().projection_indices();
        // Fast path: all columns are selected, nothing to project.
        if projection_indices.len() == record_batch.num_columns() {
            return Ok(record_batch);
        }

        record_batch
            .project(projection_indices)
            .context(ComputeArrowSnafu)
    }

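    /// Projects and filters one batch, returning `None` when no row passes the
    /// filters.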
    fn process_batch(&mut self, record_batch: RecordBatch) -> error::Result<Option<RecordBatch>> {
        let start = Instant::now();

        let projected_batch = self.apply_projection(record_batch)?;

        // Decides whether field columns can be skipped when evaluating filters.
        let skip_fields = match self.context.pre_filter_mode() {
            PreFilterMode::All => false,
            PreFilterMode::SkipFields => true,
            PreFilterMode::SkipFieldsOnDelete => true,
        };

        let Some(filtered_batch) =
            apply_combined_filters(&self.context, &self.sequence, projected_batch, skip_fields)?
        else {
            self.metrics.scan_cost += start.elapsed();
            return Ok(None);
        };

        self.metrics.num_batches += 1;
        self.metrics.num_rows += filtered_batch.num_rows();
        self.metrics.scan_cost += start.elapsed();

        Ok(Some(filtered_batch))
    }
}

impl Iterator for BulkPartBatchIter {
    type Item = error::Result<RecordBatch>;

    fn next(&mut self) -> Option<Self::Item> {
        while let Some(batch) = self.batches.pop_front() {
            match self.process_batch(batch) {
                Ok(Some(result)) => return Some(Ok(result)),
                // Skips batches that are filtered out entirely.
                Ok(None) => continue,
                Err(e) => {
                    self.report_mem_scan_metrics();
                    return Some(Err(e));
                }
            }
        }

        // All batches are consumed.
        self.report_mem_scan_metrics();
        None
    }
}

impl Drop for BulkPartBatchIter {
    fn drop(&mut self) {
        common_telemetry::debug!(
            "BulkPartBatchIter region: {}, metrics: total_series={}, num_rows={}, num_batches={}, scan_cost={:?}",
            self.context.region_id(),
            self.metrics.total_series,
            self.metrics.num_rows,
            self.metrics.num_batches,
            self.metrics.scan_cost
        );

        self.report_mem_scan_metrics();

        // Updates global read metrics.
        READ_ROWS_TOTAL
            .with_label_values(&["bulk_memtable"])
            .inc_by(self.metrics.num_rows as u64);
        READ_STAGE_ELAPSED
            .with_label_values(&["scan_memtable"])
            .observe(self.metrics.scan_cost.as_secs_f64());
    }
}

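/// Applies the context's predicates and the optional sequence filter to the
/// record batch.
///
/// The batch is converted to the flat format first. Returns `Ok(None)` when
/// all rows are filtered out, and returns the batch unchanged when no row is
/// filtered.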
fn apply_combined_filters(
    context: &BulkIterContext,
    sequence: &Option<SequenceRange>,
    record_batch: RecordBatch,
    skip_fields: bool,
) -> error::Result<Option<RecordBatch>> {
    let format = context.read_format().as_flat().unwrap();
    let record_batch = format.convert_batch(record_batch, None)?;

    let num_rows = record_batch.num_rows();
    let mut combined_filter = None;
    let mut tag_decode_state = TagDecodeState::new();

    // Evaluates the pushed-down predicates, if any.
    if !context.base.filters.is_empty() {
        let predicate_mask = context.base.compute_filter_mask_flat(
            &record_batch,
            skip_fields,
            &mut tag_decode_state,
        )?;
        // A `None` mask means the predicates filter out the whole batch.
        let Some(mask) = predicate_mask else {
            return Ok(None);
        };
        combined_filter = Some(BooleanArray::from(mask));
    }

    // ANDs the sequence filter into the combined filter, if a range is given.
    if let Some(sequence) = sequence {
        let sequence_column =
            record_batch.column(sequence_column_index(record_batch.num_columns()));
        let sequence_filter = sequence
            .filter(&sequence_column)
            .context(ComputeArrowSnafu)?;
        combined_filter = match combined_filter {
            None => Some(sequence_filter),
            Some(existing_filter) => {
                let and_result = datatypes::arrow::compute::and(&existing_filter, &sequence_filter)
                    .context(ComputeArrowSnafu)?;
                Some(and_result)
            }
        };
    }

    // Nothing to filter by.
    let Some(filter_array) = combined_filter else {
        return Ok(Some(record_batch));
    };
    let select_count = filter_array.true_count();
    if select_count == 0 {
        return Ok(None);
    }
    if select_count == num_rows {
        return Ok(Some(record_batch));
    }
    let filtered_batch =
        datatypes::arrow::compute::filter_record_batch(&record_batch, &filter_array)
            .context(ComputeArrowSnafu)?;

    Ok(Some(filtered_batch))
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::SemanticType;
    use datafusion_expr::{col, lit};
    use datatypes::arrow::array::{
        ArrayRef, BinaryArray, DictionaryArray, Int64Array, StringArray, UInt8Array, UInt32Array,
        UInt64Array,
    };
    use datatypes::arrow::datatypes::{DataType, Field, Schema};
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;
    use table::predicate::Predicate;

    use super::*;
    use crate::memtable::bulk::context::BulkIterContext;

    #[test]
    fn test_bulk_part_batch_iter() {
        // Flat format schema: tag, field, timestamp, then internal columns.
        let schema = Arc::new(Schema::new(vec![
            Field::new("key1", DataType::Utf8, false),
            Field::new("field1", DataType::Int64, false),
            Field::new(
                "timestamp",
                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
                false,
            ),
            Field::new("__sequence", DataType::UInt64, false),
            Field::new("__op_type", DataType::UInt8, false),
        ]));

        let key1 = Arc::new(StringArray::from_iter_values(["key1", "key2", "key3"]));
        let field1 = Arc::new(Int64Array::from(vec![11, 12, 13]));
        let timestamp = Arc::new(datatypes::arrow::array::TimestampMillisecondArray::from(
            vec![1000, 2000, 3000],
        ));

        // Dictionary-encoded primary key column.
        let values = Arc::new(BinaryArray::from_iter_values([b"key1", b"key2", b"key3"]));
        let keys = UInt32Array::from(vec![0, 1, 2]);
        let primary_key = Arc::new(DictionaryArray::new(keys, values));

        let sequence = Arc::new(UInt64Array::from(vec![1, 2, 3]));
        let op_type = Arc::new(UInt8Array::from(vec![1, 1, 1]));

        let record_batch = RecordBatch::try_new(
            schema,
            vec![
                key1,
                field1,
                timestamp,
                primary_key.clone(),
                sequence,
                op_type,
            ],
        )
        .unwrap();

        // Region metadata with a tag, a field and a timestamp column.
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "key1",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field1",
                    ConcreteDataType::int64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "timestamp",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .primary_key(vec![0]);

        let region_metadata = builder.build().unwrap();

        // No projection and no predicate: all rows and columns pass through.
        let context = Arc::new(
            BulkIterContext::new(
                Arc::new(region_metadata.clone()),
                None,
                None,
                false,
            )
            .unwrap(),
        );
        let iter =
            BulkPartBatchIter::from_single(record_batch.clone(), context.clone(), None, 0, None);
        let result: Vec<_> = iter.map(|rb| rb.unwrap()).collect();
        assert_eq!(1, result.len());
        assert_eq!(3, result[0].num_rows());
        assert_eq!(6, result[0].num_columns());

        // The sequence filter keeps rows with sequence <= 2.
        let iter = BulkPartBatchIter::from_single(
            record_batch.clone(),
            context,
            Some(SequenceRange::LtEq { max: 2 }),
            0,
            None,
        );
        let result: Vec<_> = iter.map(|rb| rb.unwrap()).collect();
        assert_eq!(1, result.len());
        let expect_sequence = Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef;
        assert_eq!(
            &expect_sequence,
            result[0].column(result[0].num_columns() - 2)
        );
        assert_eq!(6, result[0].num_columns());

        // Projection plus a predicate on the tag column.
        let context = Arc::new(
            BulkIterContext::new(
                Arc::new(region_metadata),
                Some(&[0, 2]),
                Some(Predicate::new(vec![col("key1").eq(lit("key2"))])),
                false,
            )
            .unwrap(),
        );
        let iter =
            BulkPartBatchIter::from_single(record_batch.clone(), context.clone(), None, 0, None);
        let result: Vec<_> = iter.map(|rb| rb.unwrap()).collect();
        assert_eq!(1, result.len());
        assert_eq!(1, result[0].num_rows());
        assert_eq!(5, result[0].num_columns());
        let expect_sequence = Arc::new(UInt64Array::from(vec![2])) as ArrayRef;
        assert_eq!(
            &expect_sequence,
            result[0].column(result[0].num_columns() - 2)
        );
    }

    #[test]
    fn test_bulk_part_batch_iter_multiple_batches() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("key1", DataType::Utf8, false),
            Field::new("field1", DataType::Int64, false),
            Field::new(
                "timestamp",
                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
                false,
            ),
            Field::new("__sequence", DataType::UInt64, false),
            Field::new("__op_type", DataType::UInt8, false),
        ]));

        // First batch with two rows.
        let key1_1 = Arc::new(StringArray::from_iter_values(["key1", "key2"]));
        let field1_1 = Arc::new(Int64Array::from(vec![11, 12]));
        let timestamp_1 = Arc::new(datatypes::arrow::array::TimestampMillisecondArray::from(
            vec![1000, 2000],
        ));
        let values_1 = Arc::new(BinaryArray::from_iter_values([b"key1", b"key2"]));
        let keys_1 = UInt32Array::from(vec![0, 1]);
        let primary_key_1 = Arc::new(DictionaryArray::new(keys_1, values_1));
        let sequence_1 = Arc::new(UInt64Array::from(vec![1, 2]));
        let op_type_1 = Arc::new(UInt8Array::from(vec![1, 1]));

        let batch1 = RecordBatch::try_new(
            schema.clone(),
            vec![
                key1_1,
                field1_1,
                timestamp_1,
                primary_key_1,
                sequence_1,
                op_type_1,
            ],
        )
        .unwrap();

        // Second batch with three rows.
        let key1_2 = Arc::new(StringArray::from_iter_values(["key3", "key4", "key5"]));
        let field1_2 = Arc::new(Int64Array::from(vec![13, 14, 15]));
        let timestamp_2 = Arc::new(datatypes::arrow::array::TimestampMillisecondArray::from(
            vec![3000, 4000, 5000],
        ));
        let values_2 = Arc::new(BinaryArray::from_iter_values([b"key3", b"key4", b"key5"]));
        let keys_2 = UInt32Array::from(vec![0, 1, 2]);
        let primary_key_2 = Arc::new(DictionaryArray::new(keys_2, values_2));
        let sequence_2 = Arc::new(UInt64Array::from(vec![3, 4, 5]));
        let op_type_2 = Arc::new(UInt8Array::from(vec![1, 1, 1]));

        let batch2 = RecordBatch::try_new(
            schema.clone(),
            vec![
                key1_2,
                field1_2,
                timestamp_2,
                primary_key_2,
                sequence_2,
                op_type_2,
            ],
        )
        .unwrap();

        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "key1",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field1",
                    ConcreteDataType::int64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "timestamp",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .primary_key(vec![0]);

        let region_metadata = builder.build().unwrap();

        let context = Arc::new(
            BulkIterContext::new(
                Arc::new(region_metadata),
                None,
                None,
                false,
            )
            .unwrap(),
        );

        // Without filters, batches should pass through unchanged.
        let expect_batches = vec![batch1, batch2];
        let iter = BulkPartBatchIter::new(expect_batches.clone(), context.clone(), None, 0, None);

        let result: Vec<_> = iter.map(|rb| rb.unwrap()).collect();
        assert_eq!(expect_batches, result);
    }
}