1use std::collections::HashMap;
19use std::ops::BitAnd;
20use std::sync::Arc;
21
22use api::v1::{OpType, SemanticType};
23use common_telemetry::error;
24use datafusion::physical_plan::PhysicalExpr;
25use datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr;
26use datatypes::arrow::array::{Array as _, ArrayRef, BooleanArray};
27use datatypes::arrow::buffer::BooleanBuffer;
28use datatypes::arrow::record_batch::RecordBatch;
29use datatypes::schema::Schema;
30use mito_codec::row_converter::PrimaryKeyCodec;
31use parquet::arrow::arrow_reader::RowSelection;
32use parquet::file::metadata::ParquetMetaData;
33use snafu::{OptionExt, ResultExt};
34use store_api::codec::PrimaryKeyEncoding;
35use store_api::metadata::RegionMetadataRef;
36use store_api::storage::{ColumnId, TimeSeriesRowSelector};
37use table::predicate::Predicate;
38
39use crate::cache::CacheStrategy;
40use crate::error::{
41 ComputeArrowSnafu, DecodeStatsSnafu, EvalPartitionFilterSnafu, NewRecordBatchSnafu,
42 RecordBatchSnafu, Result, StatsNotPresentSnafu, UnexpectedSnafu,
43};
44use crate::read::compat::FlatCompatBatch;
45use crate::read::flat_projection::CompactionProjectionMapper;
46use crate::read::last_row::FlatRowGroupLastRowCachedReader;
47use crate::read::prune::FlatPruneReader;
48use crate::sst::file::FileHandle;
49use crate::sst::parquet::flat_format::{
50 DecodedPrimaryKeys, FlatReadFormat, decode_primary_keys, time_index_column_index,
51};
52use crate::sst::parquet::reader::{
53 FlatRowGroupReader, MaybeFilter, RowGroupBuildContext, RowGroupReaderBuilder,
54 SimpleFilterContext,
55};
56use crate::sst::parquet::row_group::ParquetFetchMetrics;
57use crate::sst::parquet::stats::RowGroupPruningStats;
58
59pub(crate) fn row_group_contains_delete(
64 parquet_meta: &ParquetMetaData,
65 row_group_index: usize,
66 file_path: &str,
67) -> Result<bool> {
68 let row_group_metadata = &parquet_meta.row_groups()[row_group_index];
69
70 let column_metadata = &row_group_metadata.columns().last().unwrap();
72 let stats = column_metadata
73 .statistics()
74 .context(StatsNotPresentSnafu { file_path })?;
75 stats
76 .min_bytes_opt()
77 .context(StatsNotPresentSnafu { file_path })?
78 .try_into()
79 .map(i32::from_le_bytes)
80 .map(|min_op_type| min_op_type == OpType::Delete as i32)
81 .ok()
82 .context(DecodeStatsSnafu { file_path })
83}
84
/// A contiguous range of rows to read inside one SST file, scoped to a single
/// parquet row group.
#[derive(Clone)]
pub struct FileRange {
    /// Shared per-file context (reader builder and filter state).
    context: FileRangeContextRef,
    /// Index of the row group this range reads.
    row_group_idx: usize,
    /// Rows to read within the row group; `None` selects all rows.
    row_selection: Option<RowSelection>,
}
96
impl FileRange {
    /// Creates a [FileRange] for `row_group_idx` of the file described by
    /// `context`. `row_selection` restricts the rows to read; `None` means
    /// the whole row group.
    pub(crate) fn new(
        context: FileRangeContextRef,
        row_group_idx: usize,
        row_selection: Option<RowSelection>,
    ) -> Self {
        Self {
            context,
            row_group_idx,
            row_selection,
        }
    }

    /// Returns true if this range covers every row of its row group.
    fn select_all(&self) -> bool {
        let rows_in_group = self
            .context
            .reader_builder
            .parquet_metadata()
            .row_group(self.row_group_idx)
            .num_rows();

        // No explicit selection means the whole row group is selected.
        let Some(row_selection) = &self.row_selection else {
            return true;
        };
        row_selection.row_count() == rows_in_group as usize
    }

    /// Checks the dynamic filters against this row group's statistics.
    ///
    /// Returns true when the row group may contain matching rows (or when
    /// there are no dynamic filters), false when it can be pruned entirely.
    fn in_dynamic_filter_range(&self) -> bool {
        if self.context.base.dyn_filters.is_empty() {
            return true;
        }
        let curr_row_group = self
            .context
            .reader_builder
            .parquet_metadata()
            .row_group(self.row_group_idx);
        let read_format = self.context.read_format();
        let prune_schema = &self.context.base.prune_schema;
        // Build pruning stats over just this row group.
        let stats = RowGroupPruningStats::new(
            std::slice::from_ref(curr_row_group),
            read_format,
            self.context.base.expected_metadata.clone(),
            self.context.base.pre_filter_mode.skip_fields(),
        );

        // Only the dynamic filters participate; static exprs are empty here.
        let pred = Predicate::with_dyn_filters(vec![], self.context.base.dyn_filters.clone());

        pred.prune_with_stats(&stats, prune_schema.arrow_schema())
            .first()
            .cloned()
            // Default to keeping the row group when pruning yields no result.
            .unwrap_or(true)
    }

    /// Builds a [FlatPruneReader] for this range.
    ///
    /// Returns `Ok(None)` when the dynamic filters prune the row group.
    /// When `selector` is `LastRow`, the row group contains no deletes, and
    /// the range covers the whole row group, a cached last-row reader is used
    /// instead of the plain row-group reader.
    pub(crate) async fn flat_reader(
        &self,
        selector: Option<TimeSeriesRowSelector>,
        fetch_metrics: Option<&ParquetFetchMetrics>,
    ) -> Result<Option<FlatPruneReader>> {
        if !self.in_dynamic_filter_range() {
            return Ok(None);
        }
        let skip_fields = self.context.base.pre_filter_mode.skip_fields();
        let parquet_reader = self
            .context
            .reader_builder
            .build(self.context.build_context(
                self.row_group_idx,
                self.row_selection.clone(),
                fetch_metrics,
            ))
            .await?;

        let use_last_row_reader = if selector
            .map(|s| s == TimeSeriesRowSelector::LastRow)
            .unwrap_or(false)
        {
            // On stats decode failure, assume deletes may exist (conservative:
            // `unwrap_or(true)` then negated), falling back to the plain reader.
            let put_only = !self
                .context
                .contains_delete(self.row_group_idx)
                .inspect_err(|e| {
                    error!(e; "Failed to decode min value of op_type, fallback to FlatRowGroupReader");
                })
                .unwrap_or(true);
            put_only && self.select_all()
        } else {
            false
        };

        let flat_prune_reader = if use_last_row_reader {
            let flat_row_group_reader =
                FlatRowGroupReader::new(self.context.clone(), parquet_reader);
            // Don't cache last rows when a predicate prefilter is active, as
            // the reader output depends on the predicate.
            let cache_strategy = if self.context.reader_builder.has_predicate_prefilter() {
                CacheStrategy::Disabled
            } else {
                self.context.reader_builder.cache_strategy().clone()
            };
            let reader = FlatRowGroupLastRowCachedReader::new(
                self.file_handle().file_id().file_id(),
                self.row_group_idx,
                cache_strategy,
                self.context.read_format().parquet_read_columns(),
                flat_row_group_reader,
            );
            FlatPruneReader::new_with_last_row_reader(self.context.clone(), reader, skip_fields)
        } else {
            let flat_row_group_reader =
                FlatRowGroupReader::new(self.context.clone(), parquet_reader);
            FlatPruneReader::new_with_row_group_reader(
                self.context.clone(),
                flat_row_group_reader,
                skip_fields,
            )
        };

        Ok(Some(flat_prune_reader))
    }

    /// Returns the compatibility adapter for read batches, if any.
    pub(crate) fn compat_batch(&self) -> Option<&FlatCompatBatch> {
        self.context.compat_batch()
    }

    /// Returns the projection mapper used by compaction reads, if any.
    pub(crate) fn compaction_projection_mapper(&self) -> Option<&CompactionProjectionMapper> {
        self.context.compaction_projection_mapper()
    }

    /// Returns the handle of the SST file this range belongs to.
    pub(crate) fn file_handle(&self) -> &FileHandle {
        self.context.reader_builder.file_handle()
    }
}
242
/// Shared context for all [FileRange]s of a single SST file.
pub(crate) struct FileRangeContext {
    /// Builds row-group readers for the underlying parquet file.
    reader_builder: RowGroupReaderBuilder,
    /// Filter and format state shared by all ranges of the file.
    base: RangeBase,
}
250
/// Shared, reference-counted [FileRangeContext].
pub(crate) type FileRangeContextRef = Arc<FileRangeContext>;
252
253impl FileRangeContext {
254 pub(crate) fn new(reader_builder: RowGroupReaderBuilder, base: RangeBase) -> Self {
256 Self {
257 reader_builder,
258 base,
259 }
260 }
261
262 pub(crate) fn filters(&self) -> &[SimpleFilterContext] {
264 &self.base.filters
265 }
266
267 pub(crate) fn has_partition_filter(&self) -> bool {
269 self.base.partition_filter.is_some()
270 }
271
272 pub(crate) fn read_format(&self) -> &FlatReadFormat {
274 &self.base.read_format
275 }
276
277 pub(crate) fn reader_builder(&self) -> &RowGroupReaderBuilder {
279 &self.reader_builder
280 }
281
282 pub(crate) fn compat_batch(&self) -> Option<&FlatCompatBatch> {
284 self.base.compat_batch.as_ref()
285 }
286
287 pub(crate) fn compaction_projection_mapper(&self) -> Option<&CompactionProjectionMapper> {
289 self.base.compaction_projection_mapper.as_ref()
290 }
291
292 pub(crate) fn set_compat_batch(&mut self, compat: Option<FlatCompatBatch>) {
294 self.base.compat_batch = compat;
295 }
296
297 pub(crate) fn precise_filter_flat(
301 &self,
302 input: RecordBatch,
303 skip_fields: bool,
304 ) -> Result<Option<RecordBatch>> {
305 self.base.precise_filter_flat(input, skip_fields)
306 }
307
308 pub(crate) fn pre_filter_mode(&self) -> PreFilterMode {
309 self.base.pre_filter_mode
310 }
311
312 pub(crate) fn contains_delete(&self, row_group_index: usize) -> Result<bool> {
314 let metadata = self.reader_builder.parquet_metadata();
315 row_group_contains_delete(metadata, row_group_index, self.reader_builder.file_path())
316 }
317
318 pub(crate) fn build_context<'a>(
320 &'a self,
321 row_group_idx: usize,
322 row_selection: Option<RowSelection>,
323 fetch_metrics: Option<&'a ParquetFetchMetrics>,
324 ) -> RowGroupBuildContext<'a> {
325 RowGroupBuildContext {
326 row_group_idx,
327 row_selection,
328 fetch_metrics,
329 }
330 }
331
332 pub(crate) fn memory_size(&self) -> usize {
335 crate::cache::cache_size::parquet_meta_size(self.reader_builder.parquet_metadata())
336 }
337}
338
/// Controls which columns participate in pre-filtering.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PreFilterMode {
    /// Apply filters on all columns.
    All,
    /// Skip filters on field columns; only non-field columns are filtered.
    SkipFields,
}
347
348impl PreFilterMode {
349 pub(crate) fn skip_fields(self) -> bool {
350 matches!(self, Self::SkipFields)
351 }
352}
353
/// State needed to evaluate a region partition filter against record batches.
pub(crate) struct PartitionFilterContext {
    /// Physical expression of the region partition rule; must evaluate to a
    /// boolean array (downcast enforced at evaluation time).
    pub(crate) region_partition_physical_expr: Arc<dyn PhysicalExpr>,
    /// Schema of the record batch the partition expression is evaluated on.
    pub(crate) partition_schema: Arc<Schema>,
}
361
/// Filter and format state shared by the file ranges of one SST file.
pub(crate) struct RangeBase {
    /// Simple (per-column) filters applied to read batches.
    pub(crate) filters: Vec<SimpleFilterContext>,
    /// Dynamic filters used to prune row groups at read time.
    pub(crate) dyn_filters: Vec<Arc<DynamicFilterPhysicalExpr>>,
    /// Read format of the SST (projection and region metadata).
    pub(crate) read_format: FlatReadFormat,
    /// Expected region metadata used when building pruning statistics,
    /// if it differs from the file's own metadata.
    pub(crate) expected_metadata: Option<RegionMetadataRef>,
    /// Schema used when pruning row groups with statistics.
    pub(crate) prune_schema: Arc<Schema>,
    /// Codec to decode primary keys into tag columns.
    pub(crate) codec: Arc<dyn PrimaryKeyCodec>,
    /// Optional adapter to make read batches compatible with the expected schema.
    pub(crate) compat_batch: Option<FlatCompatBatch>,
    /// Optional projection mapper used by compaction reads.
    pub(crate) compaction_projection_mapper: Option<CompactionProjectionMapper>,
    /// Whether filters on field columns are applied during pre-filtering.
    pub(crate) pre_filter_mode: PreFilterMode,
    /// Optional partition filter evaluated on top of the simple filters.
    pub(crate) partition_filter: Option<PartitionFilterContext>,
}
384
/// Caches primary-key decoding work while filtering a single record batch.
pub(crate) struct TagDecodeState {
    /// Lazily decoded primary keys of the current batch.
    decoded_pks: Option<DecodedPrimaryKeys>,
    /// Decoded tag columns keyed by column id, reused across filters.
    decoded_tag_cache: HashMap<ColumnId, ArrayRef>,
}
389
390impl TagDecodeState {
391 pub(crate) fn new() -> Self {
392 Self {
393 decoded_pks: None,
394 decoded_tag_cache: HashMap::new(),
395 }
396 }
397}
398
impl RangeBase {
    /// Applies the simple filters (and the partition filter, if any) to
    /// `input` and returns the filtered batch.
    ///
    /// Returns `Ok(None)` when every row is filtered out. When `skip_fields`
    /// is true, filters on field columns are not applied.
    pub(crate) fn precise_filter_flat(
        &self,
        input: RecordBatch,
        skip_fields: bool,
    ) -> Result<Option<RecordBatch>> {
        // Decode state is shared between mask computation and partition-filter
        // projection so primary keys are decoded at most once per batch.
        let mut tag_decode_state = TagDecodeState::new();
        let mask = self.compute_filter_mask_flat(&input, skip_fields, &mut tag_decode_state)?;

        // `None` means a filter pruned the whole batch.
        let Some(mut mask) = mask else {
            return Ok(None);
        };

        if let Some(partition_filter) = &self.partition_filter {
            // Project a batch matching the partition schema, then AND the
            // partition filter's boolean result into the mask.
            let record_batch = self.project_record_batch_for_pruning_flat(
                &input,
                &partition_filter.partition_schema,
                &mut tag_decode_state,
            )?;
            let partition_mask = self.evaluate_partition_filter(&record_batch, partition_filter)?;
            mask = mask.bitand(&partition_mask);
        }

        // Fast path: nothing selected, skip the filter kernel entirely.
        if mask.count_set_bits() == 0 {
            return Ok(None);
        }

        let filtered_batch =
            datatypes::arrow::compute::filter_record_batch(&input, &BooleanArray::from(mask))
                .context(ComputeArrowSnafu)?;

        if filtered_batch.num_rows() > 0 {
            Ok(Some(filtered_batch))
        } else {
            Ok(None)
        }
    }

    /// Computes a boolean mask of rows in `input` that satisfy all simple
    /// filters.
    ///
    /// Returns `Ok(None)` when a filter is already known to prune the whole
    /// range (`MaybeFilter::Pruned`). Filters on field columns are skipped
    /// when `skip_fields` is true. Tag columns absent from the projected
    /// batch are decoded from the primary key on demand and cached in
    /// `tag_decode_state`.
    pub(crate) fn compute_filter_mask_flat(
        &self,
        input: &RecordBatch,
        skip_fields: bool,
        tag_decode_state: &mut TagDecodeState,
    ) -> Result<Option<BooleanBuffer>> {
        // Start with all rows selected and AND in each filter's result.
        let mut mask = BooleanBuffer::new_set(input.num_rows());

        let metadata = self.read_format.metadata();

        for filter_ctx in &self.filters {
            let filter = match filter_ctx.filter() {
                MaybeFilter::Filter(f) => f,
                // Already satisfied; no per-row work needed.
                MaybeFilter::Matched => continue,
                // The whole range is filtered out.
                MaybeFilter::Pruned => return Ok(None),
            };

            if skip_fields && filter_ctx.semantic_type() == SemanticType::Field {
                continue;
            }

            // Prefer the column if the projected batch carries it directly.
            let column_idx = self
                .read_format
                .projected_index_by_id(filter_ctx.column_id());
            if let Some(idx) = column_idx {
                let column = &input.columns().get(idx).unwrap();
                let result = filter.evaluate_array(column).context(RecordBatchSnafu)?;
                mask = mask.bitand(&result);
            } else if filter_ctx.semantic_type() == SemanticType::Tag {
                let column_id = filter_ctx.column_id();

                // Tag columns may be encoded in the primary key; decode lazily.
                // Filters on tags that cannot be decoded are skipped.
                if let Some(tag_column) =
                    self.maybe_decode_tag_column(metadata, column_id, input, tag_decode_state)?
                {
                    let result = filter
                        .evaluate_array(&tag_column)
                        .context(RecordBatchSnafu)?;
                    mask = mask.bitand(&result);
                }
            } else if filter_ctx.semantic_type() == SemanticType::Timestamp {
                // The time index column position is derived from batch width.
                let time_index_pos = time_index_column_index(input.num_columns());
                let column = &input.columns()[time_index_pos];
                let result = filter.evaluate_array(column).context(RecordBatchSnafu)?;
                mask = mask.bitand(&result);
            }
        }

        Ok(Some(mask))
    }

    /// Decodes the tag column `column_id` from the primary keys of `input`,
    /// caching both the decoded primary keys and the resulting array in
    /// `tag_decode_state`.
    ///
    /// Returns `Ok(None)` when the column is not part of the primary key or
    /// is unknown to the region metadata.
    fn maybe_decode_tag_column(
        &self,
        metadata: &RegionMetadataRef,
        column_id: ColumnId,
        input: &RecordBatch,
        tag_decode_state: &mut TagDecodeState,
    ) -> Result<Option<ArrayRef>> {
        let Some(pk_index) = metadata.primary_key_index(column_id) else {
            return Ok(None);
        };

        // Reuse a previously decoded array for this column if available.
        if let Some(cached_column) = tag_decode_state.decoded_tag_cache.get(&column_id) {
            return Ok(Some(cached_column.clone()));
        }

        // Decode all primary keys at most once per batch.
        if tag_decode_state.decoded_pks.is_none() {
            tag_decode_state.decoded_pks = Some(decode_primary_keys(self.codec.as_ref(), input)?);
        }

        // Sparse primary-key encoding does not use positional pk indices.
        let pk_index = if self.codec.encoding() == PrimaryKeyEncoding::Sparse {
            None
        } else {
            Some(pk_index)
        };
        let Some(column_index) = metadata.column_index_by_id(column_id) else {
            return Ok(None);
        };
        let Some(decoded) = tag_decode_state.decoded_pks.as_ref() else {
            return Ok(None);
        };

        let column_metadata = &metadata.column_metadatas[column_index];
        let tag_column = decoded.get_tag_column(
            column_id,
            pk_index,
            &column_metadata.column_schema.data_type,
        )?;
        tag_decode_state
            .decoded_tag_cache
            .insert(column_id, tag_column.clone());

        Ok(Some(tag_column))
    }

    /// Evaluates the partition physical expression against `record_batch` and
    /// returns the resulting boolean mask. Null results count as not matching.
    fn evaluate_partition_filter(
        &self,
        record_batch: &RecordBatch,
        partition_filter: &PartitionFilterContext,
    ) -> Result<BooleanBuffer> {
        let columnar_value = partition_filter
            .region_partition_physical_expr
            .evaluate(record_batch)
            .context(EvalPartitionFilterSnafu)?;
        let array = columnar_value
            .into_array(record_batch.num_rows())
            .context(EvalPartitionFilterSnafu)?;
        let boolean_array =
            array
                .as_any()
                .downcast_ref::<BooleanArray>()
                .context(UnexpectedSnafu {
                    reason: "Failed to downcast to BooleanArray".to_string(),
                })?;

        // AND with the validity bitmap so null slots become false.
        let mut mask = boolean_array.values().clone();
        if let Some(nulls) = boolean_array.nulls() {
            mask = mask.bitand(nulls.inner());
        }

        Ok(mask)
    }

    /// Builds a record batch matching `schema` from the columns of `input`,
    /// decoding tag columns from the primary key when they are not projected.
    ///
    /// Errors when a column required by `schema` is missing from the region
    /// metadata or cannot be recovered from the batch.
    fn project_record_batch_for_pruning_flat(
        &self,
        input: &RecordBatch,
        schema: &Arc<Schema>,
        tag_decode_state: &mut TagDecodeState,
    ) -> Result<RecordBatch> {
        let arrow_schema = schema.arrow_schema();
        let mut columns = Vec::with_capacity(arrow_schema.fields().len());

        let metadata = self.read_format.metadata();

        for field in arrow_schema.fields() {
            let column_id = metadata.column_by_name(field.name()).map(|c| c.column_id);

            let Some(column_id) = column_id else {
                return UnexpectedSnafu {
                    reason: format!(
                        "Partition pruning schema expects column '{}' but it is missing in \
                         region metadata",
                        field.name()
                    ),
                }
                .fail();
            };

            // Column is directly available in the projected batch.
            if let Some(idx) = self.read_format.projected_index_by_id(column_id) {
                columns.push(input.column(idx).clone());
                continue;
            }

            // The time index column position is derived from batch width.
            if metadata.time_index_column().column_id == column_id {
                let time_index_pos = time_index_column_index(input.num_columns());
                columns.push(input.column(time_index_pos).clone());
                continue;
            }

            // Otherwise try decoding it from the primary key.
            if let Some(tag_column) =
                self.maybe_decode_tag_column(metadata, column_id, input, tag_decode_state)?
            {
                columns.push(tag_column);
                continue;
            }

            return UnexpectedSnafu {
                reason: format!(
                    "Partition pruning schema expects column '{}' (id {}) but it is not \
                     present in projected record batch",
                    field.name(),
                    column_id
                ),
            }
            .fail();
        }

        RecordBatch::try_new(arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
    }
}
650
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datafusion_expr::{col, lit};

    use super::*;
    use crate::read::read_columns::ReadColumns;
    use crate::sst::parquet::flat_format::FlatReadFormat;
    use crate::test_util::sst_util::{new_record_batch_with_custom_sequence, sst_region_metadata};

    /// Builds a `RangeBase` over the test SST region metadata with the given
    /// simple filters and all other state at its simplest configuration.
    fn new_test_range_base(filters: Vec<SimpleFilterContext>) -> RangeBase {
        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());

        // Project every column of the test region.
        let read_format = FlatReadFormat::new(
            metadata.clone(),
            ReadColumns::from_deduped_column_ids(
                metadata.column_metadatas.iter().map(|c| c.column_id),
            ),
            None,
            "test",
            true,
        )
        .unwrap();

        RangeBase {
            filters,
            dyn_filters: vec![],
            read_format,
            expected_metadata: None,
            prune_schema: metadata.schema.clone(),
            codec: mito_codec::row_converter::build_primary_key_codec(metadata.as_ref()),
            compat_batch: None,
            compaction_projection_mapper: None,
            pre_filter_mode: PreFilterMode::All,
            partition_filter: None,
        }
    }

    /// Simple filters must be applied per row: a batch whose rows do not
    /// satisfy the filters yields an all-false mask.
    #[test]
    fn test_compute_filter_mask_flat_applies_remaining_simple_filters() {
        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
        let filters = vec![
            SimpleFilterContext::new_opt(&metadata, None, &col("tag_0").eq(lit("a"))).unwrap(),
            SimpleFilterContext::new_opt(&metadata, None, &col("field_0").gt(lit(1_u64))).unwrap(),
        ];
        let base = new_test_range_base(filters);
        let batch = new_record_batch_with_custom_sequence(&["b", "x"], 0, 4, 1);

        let mask = base
            .compute_filter_mask_flat(&batch, false, &mut TagDecodeState::new())
            .unwrap()
            .unwrap();
        // tag_0 is "b", not "a", so no row passes.
        assert_eq!(mask.count_set_bits(), 0);
    }

    /// Physical filters (pushed down to the parquet reader) must not be
    /// re-applied by `compute_filter_mask_flat`: with no simple filters the
    /// mask keeps every row, even though a physical filter exists.
    #[test]
    fn test_compute_filter_mask_flat_does_not_postfilter_physical_filters() {
        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
        let read_format = FlatReadFormat::new(
            metadata.clone(),
            ReadColumns::from_deduped_column_ids(
                metadata.column_metadatas.iter().map(|c| c.column_id),
            ),
            None,
            "test",
            true,
        )
        .unwrap();
        let physical_filter = crate::sst::parquet::reader::PhysicalFilterContext::new_opt(
            &metadata,
            None,
            &read_format,
            &col("field_0").in_list(vec![lit(1_u64), lit(2_u64)], false),
        );
        assert!(physical_filter.is_some());
        let base = new_test_range_base(vec![]);
        let batch = new_record_batch_with_custom_sequence(&["b", "x"], 0, 4, 1);

        let mask = base
            .compute_filter_mask_flat(&batch, false, &mut TagDecodeState::new())
            .unwrap()
            .unwrap();
        // All 4 rows remain selected.
        assert_eq!(mask.count_set_bits(), 4);
    }
}