1use std::collections::VecDeque;
16use std::time::Instant;
17
18use datatypes::arrow::array::BooleanArray;
19use datatypes::arrow::record_batch::RecordBatch;
20use parquet::arrow::ProjectionMask;
21use parquet::arrow::arrow_reader::ParquetRecordBatchReader;
22use snafu::ResultExt;
23use store_api::storage::SequenceRange;
24
25use crate::error::{self, ComputeArrowSnafu, DecodeArrowRowGroupSnafu};
26use crate::memtable::bulk::context::{BulkIterContext, BulkIterContextRef};
27use crate::memtable::bulk::part::EncodedBulkPart;
28use crate::memtable::bulk::row_group_reader::MemtableRowGroupReaderBuilder;
29use crate::memtable::{MemScanMetrics, MemScanMetricsData};
30use crate::metrics::{READ_ROWS_TOTAL, READ_STAGE_ELAPSED};
31use crate::sst::parquet::file_range::PreFilterMode;
32use crate::sst::parquet::flat_format::sequence_column_index;
33use crate::sst::parquet::reader::RowGroupReaderContext;
34
/// Iterator that reads an [`EncodedBulkPart`] (parquet-encoded memtable data)
/// row group by row group and yields filtered `RecordBatch`es.
pub struct EncodedBulkPartIter {
    /// Shared read context (read format, filters, pre-filter mode).
    context: BulkIterContextRef,
    /// Indices of row groups still to be read; consumed from the front.
    row_groups_to_read: VecDeque<usize>,
    /// Reader over the row group currently being drained, if any.
    current_reader: Option<ParquetRecordBatchReader>,
    /// Builder used to create a reader for each row group.
    builder: MemtableRowGroupReaderBuilder,
    /// Optional sequence range used to filter rows by sequence number.
    sequence: Option<SequenceRange>,
    /// Whether field columns may be skipped when evaluating filters for the
    /// current row group; recomputed whenever the iterator advances to a new one.
    current_skip_fields: bool,
    /// Locally accumulated scan metrics (rows, batches, scan cost).
    metrics: MemScanMetricsData,
    /// Metrics sink reported to exactly once; `take`n on first report.
    mem_scan_metrics: Option<MemScanMetrics>,
}
50
51impl EncodedBulkPartIter {
52 pub(crate) fn try_new(
54 encoded_part: &EncodedBulkPart,
55 context: BulkIterContextRef,
56 mut row_groups_to_read: VecDeque<usize>,
57 sequence: Option<SequenceRange>,
58 mem_scan_metrics: Option<MemScanMetrics>,
59 ) -> error::Result<Self> {
60 assert!(context.read_format().as_flat().is_some());
61
62 let parquet_meta = encoded_part.metadata().parquet_metadata.clone();
63 let data = encoded_part.data().clone();
64 let series_count = encoded_part.metadata().num_series as usize;
65
66 let projection_mask = ProjectionMask::roots(
67 parquet_meta.file_metadata().schema_descr(),
68 context.read_format().projection_indices().iter().copied(),
69 );
70 let builder =
71 MemtableRowGroupReaderBuilder::try_new(&context, projection_mask, parquet_meta, data)?;
72
73 let (init_reader, current_skip_fields) = match row_groups_to_read.pop_front() {
74 Some(first_row_group) => {
75 let skip_fields = builder.compute_skip_fields(&context, first_row_group);
76 let reader = builder.build_row_group_reader(first_row_group, None)?;
77 (Some(reader), skip_fields)
78 }
79 None => (None, false),
80 };
81
82 Ok(Self {
83 context,
84 row_groups_to_read,
85 current_reader: init_reader,
86 builder,
87 sequence,
88 current_skip_fields,
89 metrics: MemScanMetricsData {
90 total_series: series_count,
91 ..Default::default()
92 },
93 mem_scan_metrics,
94 })
95 }
96
97 fn report_mem_scan_metrics(&mut self) {
98 if let Some(mem_scan_metrics) = self.mem_scan_metrics.take() {
99 mem_scan_metrics.merge_inner(&self.metrics);
100 }
101 }
102
103 pub(crate) fn next_record_batch(&mut self) -> error::Result<Option<RecordBatch>> {
105 let start = Instant::now();
106
107 let Some(current) = &mut self.current_reader else {
108 self.metrics.scan_cost += start.elapsed();
110 return Ok(None);
111 };
112
113 for batch in current {
114 let batch = batch.context(DecodeArrowRowGroupSnafu)?;
115 if let Some(batch) = apply_combined_filters(
116 &self.context,
117 &self.sequence,
118 batch,
119 self.current_skip_fields,
120 )? {
121 self.metrics.num_batches += 1;
123 self.metrics.num_rows += batch.num_rows();
124 self.metrics.scan_cost += start.elapsed();
125 return Ok(Some(batch));
126 }
127 }
128
129 while let Some(next_row_group) = self.row_groups_to_read.pop_front() {
131 self.current_skip_fields = self
133 .builder
134 .compute_skip_fields(&self.context, next_row_group);
135
136 let next_reader = self.builder.build_row_group_reader(next_row_group, None)?;
137 let current = self.current_reader.insert(next_reader);
138
139 for batch in current {
140 let batch = batch.context(DecodeArrowRowGroupSnafu)?;
141 if let Some(batch) = apply_combined_filters(
142 &self.context,
143 &self.sequence,
144 batch,
145 self.current_skip_fields,
146 )? {
147 self.metrics.num_batches += 1;
149 self.metrics.num_rows += batch.num_rows();
150 self.metrics.scan_cost += start.elapsed();
151 return Ok(Some(batch));
152 }
153 }
154 }
155
156 self.metrics.scan_cost += start.elapsed();
157 Ok(None)
158 }
159}
160
161impl Iterator for EncodedBulkPartIter {
162 type Item = error::Result<RecordBatch>;
163
164 fn next(&mut self) -> Option<Self::Item> {
165 let result = self.next_record_batch().transpose();
166
167 if result.is_none() {
169 self.report_mem_scan_metrics();
170 }
171
172 result
173 }
174}
175
176impl Drop for EncodedBulkPartIter {
177 fn drop(&mut self) {
178 common_telemetry::debug!(
179 "EncodedBulkPartIter region: {}, metrics: total_series={}, num_rows={}, num_batches={}, scan_cost={:?}",
180 self.context.region_id(),
181 self.metrics.total_series,
182 self.metrics.num_rows,
183 self.metrics.num_batches,
184 self.metrics.scan_cost
185 );
186
187 self.report_mem_scan_metrics();
189
190 READ_ROWS_TOTAL
191 .with_label_values(&["bulk_memtable"])
192 .inc_by(self.metrics.num_rows as u64);
193 READ_STAGE_ELAPSED
194 .with_label_values(&["scan_memtable"])
195 .observe(self.metrics.scan_cost.as_secs_f64());
196 }
197}
198
/// Iterator over a single in-memory `RecordBatch` of a bulk part, applying
/// projection, predicate and sequence filtering before yielding it.
pub struct BulkPartRecordBatchIter {
    /// The batch to process; `take`n on the first call to `next`.
    record_batch: Option<RecordBatch>,
    /// Shared read context (read format, filters, pre-filter mode).
    context: BulkIterContextRef,
    /// Optional sequence range used to filter rows by sequence number.
    sequence: Option<SequenceRange>,
    /// Locally accumulated scan metrics (rows, batches, scan cost).
    metrics: MemScanMetricsData,
    /// Metrics sink reported to exactly once; `take`n on first report.
    mem_scan_metrics: Option<MemScanMetrics>,
}
212
213impl BulkPartRecordBatchIter {
214 pub fn new(
216 record_batch: RecordBatch,
217 context: BulkIterContextRef,
218 sequence: Option<SequenceRange>,
219 series_count: usize,
220 mem_scan_metrics: Option<MemScanMetrics>,
221 ) -> Self {
222 assert!(context.read_format().as_flat().is_some());
223
224 Self {
225 record_batch: Some(record_batch),
226 context,
227 sequence,
228 metrics: MemScanMetricsData {
229 total_series: series_count,
230 ..Default::default()
231 },
232 mem_scan_metrics,
233 }
234 }
235
236 fn report_mem_scan_metrics(&mut self) {
237 if let Some(mem_scan_metrics) = self.mem_scan_metrics.take() {
238 mem_scan_metrics.merge_inner(&self.metrics);
239 }
240 }
241
242 fn apply_projection(&self, record_batch: RecordBatch) -> error::Result<RecordBatch> {
244 let projection_indices = self.context.read_format().projection_indices();
245 if projection_indices.len() == record_batch.num_columns() {
246 return Ok(record_batch);
247 }
248
249 record_batch
250 .project(projection_indices)
251 .context(ComputeArrowSnafu)
252 }
253
254 fn process_batch(&mut self, record_batch: RecordBatch) -> error::Result<Option<RecordBatch>> {
255 let start = Instant::now();
256
257 let projected_batch = self.apply_projection(record_batch)?;
259 let skip_fields = match self.context.pre_filter_mode() {
262 PreFilterMode::All => false,
263 PreFilterMode::SkipFields => true,
264 PreFilterMode::SkipFieldsOnDelete => true,
265 };
266 let Some(filtered_batch) =
267 apply_combined_filters(&self.context, &self.sequence, projected_batch, skip_fields)?
268 else {
269 self.metrics.scan_cost += start.elapsed();
270 return Ok(None);
271 };
272
273 self.metrics.num_batches += 1;
275 self.metrics.num_rows += filtered_batch.num_rows();
276 self.metrics.scan_cost += start.elapsed();
277
278 Ok(Some(filtered_batch))
279 }
280}
281
282impl Iterator for BulkPartRecordBatchIter {
283 type Item = error::Result<RecordBatch>;
284
285 fn next(&mut self) -> Option<Self::Item> {
286 let Some(record_batch) = self.record_batch.take() else {
287 self.report_mem_scan_metrics();
289 return None;
290 };
291
292 let result = self.process_batch(record_batch).transpose();
293
294 if result.is_none() {
296 self.report_mem_scan_metrics();
297 }
298
299 result
300 }
301}
302
303impl Drop for BulkPartRecordBatchIter {
304 fn drop(&mut self) {
305 common_telemetry::debug!(
306 "BulkPartRecordBatchIter region: {}, metrics: total_series={}, num_rows={}, num_batches={}, scan_cost={:?}",
307 self.context.region_id(),
308 self.metrics.total_series,
309 self.metrics.num_rows,
310 self.metrics.num_batches,
311 self.metrics.scan_cost
312 );
313
314 self.report_mem_scan_metrics();
316
317 READ_ROWS_TOTAL
318 .with_label_values(&["bulk_memtable"])
319 .inc_by(self.metrics.num_rows as u64);
320 READ_STAGE_ELAPSED
321 .with_label_values(&["scan_memtable"])
322 .observe(self.metrics.scan_cost.as_secs_f64());
323 }
324}
325
326fn apply_combined_filters(
332 context: &BulkIterContext,
333 sequence: &Option<SequenceRange>,
334 record_batch: RecordBatch,
335 skip_fields: bool,
336) -> error::Result<Option<RecordBatch>> {
337 let format = context.read_format().as_flat().unwrap();
339 let record_batch = format.convert_batch(record_batch, None)?;
340
341 let num_rows = record_batch.num_rows();
342 let mut combined_filter = None;
343
344 if !context.base.filters.is_empty() {
346 let predicate_mask = context
347 .base
348 .compute_filter_mask_flat(&record_batch, skip_fields)?;
349 let Some(mask) = predicate_mask else {
351 return Ok(None);
352 };
353 combined_filter = Some(BooleanArray::from(mask));
354 }
355
356 if let Some(sequence) = sequence {
358 let sequence_column =
359 record_batch.column(sequence_column_index(record_batch.num_columns()));
360 let sequence_filter = sequence
361 .filter(&sequence_column)
362 .context(ComputeArrowSnafu)?;
363 combined_filter = match combined_filter {
365 None => Some(sequence_filter),
366 Some(existing_filter) => {
367 let and_result = datatypes::arrow::compute::and(&existing_filter, &sequence_filter)
368 .context(ComputeArrowSnafu)?;
369 Some(and_result)
370 }
371 };
372 }
373
374 let Some(filter_array) = combined_filter else {
376 return Ok(Some(record_batch));
378 };
379 let select_count = filter_array.true_count();
380 if select_count == 0 {
381 return Ok(None);
382 }
383 if select_count == num_rows {
384 return Ok(Some(record_batch));
385 }
386 let filtered_batch =
387 datatypes::arrow::compute::filter_record_batch(&record_batch, &filter_array)
388 .context(ComputeArrowSnafu)?;
389
390 Ok(Some(filtered_batch))
391}
392
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use api::v1::SemanticType;
    use datafusion_expr::{col, lit};
    use datatypes::arrow::array::{ArrayRef, Int64Array, StringArray, UInt8Array, UInt64Array};
    use datatypes::arrow::datatypes::{DataType, Field, Schema};
    use datatypes::data_type::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::RegionId;
    use table::predicate::Predicate;

    use super::*;
    use crate::memtable::bulk::context::BulkIterContext;

    /// Exercises [`BulkPartRecordBatchIter`] without filters, with a sequence
    /// filter, and with projection + predicate combined.
    #[test]
    fn test_bulk_part_record_batch_iter() {
        // Flat-format arrow schema: tag, field, timestamp plus the internal
        // __primary_key/__sequence/__op_type columns.
        let schema = Arc::new(Schema::new(vec![
            Field::new("key1", DataType::Utf8, false),
            Field::new("field1", DataType::Int64, false),
            Field::new(
                "timestamp",
                DataType::Timestamp(datatypes::arrow::datatypes::TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "__primary_key",
                DataType::Dictionary(Box::new(DataType::UInt32), Box::new(DataType::Binary)),
                false,
            ),
            Field::new("__sequence", DataType::UInt64, false),
            Field::new("__op_type", DataType::UInt8, false),
        ]));

        // Three rows, one per series.
        let key1 = Arc::new(StringArray::from_iter_values(["key1", "key2", "key3"]));
        let field1 = Arc::new(Int64Array::from(vec![11, 12, 13]));
        let timestamp = Arc::new(datatypes::arrow::array::TimestampMillisecondArray::from(
            vec![1000, 2000, 3000],
        ));

        use datatypes::arrow::array::{BinaryArray, DictionaryArray, UInt32Array};
        // Dictionary-encoded primary key column.
        let values = Arc::new(BinaryArray::from_iter_values([b"key1", b"key2", b"key3"]));
        let keys = UInt32Array::from(vec![0, 1, 2]);
        let primary_key = Arc::new(DictionaryArray::new(keys, values));

        let sequence = Arc::new(UInt64Array::from(vec![1, 2, 3]));
        let op_type = Arc::new(UInt8Array::from(vec![1, 1, 1]));
        let record_batch = RecordBatch::try_new(
            schema,
            vec![
                key1,
                field1,
                timestamp,
                primary_key.clone(),
                sequence,
                op_type,
            ],
        )
        .unwrap();

        // Region metadata matching the schema above: key1 is the primary key.
        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "key1",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "field1",
                    ConcreteDataType::int64_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Field,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "timestamp",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .primary_key(vec![0]);

        let region_metadata = builder.build().unwrap();

        // No projection, no predicate: all rows and columns pass through.
        let context = Arc::new(
            BulkIterContext::new(
                Arc::new(region_metadata.clone()),
                None, None, false,
            )
            .unwrap(),
        );
        let iter =
            BulkPartRecordBatchIter::new(record_batch.clone(), context.clone(), None, 0, None);
        let result: Vec<_> = iter.map(|rb| rb.unwrap()).collect();
        assert_eq!(1, result.len());
        assert_eq!(3, result[0].num_rows());
        assert_eq!(6, result[0].num_columns(),);

        // Sequence filter (<= 2) keeps only the first two rows.
        let iter = BulkPartRecordBatchIter::new(
            record_batch.clone(),
            context,
            Some(SequenceRange::LtEq { max: 2 }),
            0,
            None,
        );
        let result: Vec<_> = iter.map(|rb| rb.unwrap()).collect();
        assert_eq!(1, result.len());
        let expect_sequence = Arc::new(UInt64Array::from(vec![1, 2])) as ArrayRef;
        assert_eq!(
            &expect_sequence,
            result[0].column(result[0].num_columns() - 2)
        );
        assert_eq!(6, result[0].num_columns());

        // Projection to columns {0, 2} plus predicate key1 == "key2":
        // one row survives, one user column is dropped.
        let context = Arc::new(
            BulkIterContext::new(
                Arc::new(region_metadata),
                Some(&[0, 2]),
                Some(Predicate::new(vec![col("key1").eq(lit("key2"))])),
                false,
            )
            .unwrap(),
        );
        let iter =
            BulkPartRecordBatchIter::new(record_batch.clone(), context.clone(), None, 0, None);
        let result: Vec<_> = iter.map(|rb| rb.unwrap()).collect();
        assert_eq!(1, result.len());
        assert_eq!(1, result[0].num_rows());
        assert_eq!(5, result[0].num_columns());
        let expect_sequence = Arc::new(UInt64Array::from(vec![2])) as ArrayRef;
        assert_eq!(
            &expect_sequence,
            result[0].column(result[0].num_columns() - 2)
        );
    }
}