use std::collections::HashMap;
use std::ops::BitAnd;
use std::sync::Arc;

use api::v1::{OpType, SemanticType};
use common_telemetry::error;
use datafusion::physical_plan::PhysicalExpr;
use datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr;
use datatypes::arrow::array::{Array as _, ArrayRef, BooleanArray};
use datatypes::arrow::buffer::BooleanBuffer;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::prelude::ConcreteDataType;
use datatypes::schema::Schema;
use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
use parquet::arrow::arrow_reader::RowSelection;
use parquet::file::metadata::ParquetMetaData;
use snafu::{OptionExt, ResultExt};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, TimeSeriesRowSelector};
use table::predicate::Predicate;

use crate::cache::CacheStrategy;
use crate::error::{
    ComputeArrowSnafu, DataTypeMismatchSnafu, DecodeSnafu, DecodeStatsSnafu,
    EvalPartitionFilterSnafu, NewRecordBatchSnafu, RecordBatchSnafu, Result, StatsNotPresentSnafu,
    UnexpectedSnafu,
};
use crate::read::Batch;
use crate::read::compat::CompatBatch;
use crate::read::flat_projection::CompactionProjectionMapper;
use crate::read::last_row::{FlatRowGroupLastRowCachedReader, RowGroupLastRowCachedReader};
use crate::read::prune::{FlatPruneReader, PruneReader};
use crate::sst::file::FileHandle;
use crate::sst::parquet::flat_format::{
    DecodedPrimaryKeys, decode_primary_keys, time_index_column_index,
};
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::reader::{
    FlatRowGroupReader, MaybeFilter, RowGroupBuildContext, RowGroupReader, RowGroupReaderBuilder,
    SimpleFilterContext,
};
use crate::sst::parquet::row_group::ParquetFetchMetrics;
use crate::sst::parquet::stats::RowGroupPruningStats;

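/// Checks whether the row group at `row_group_index` may contain deleted rows.
///
/// It reads the min statistics of the last column (the op type column) and returns
/// whether the minimum op type equals `OpType::Delete`.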
pub(crate) fn row_group_contains_delete(
    parquet_meta: &ParquetMetaData,
    row_group_index: usize,
    file_path: &str,
) -> Result<bool> {
    let row_group_metadata = &parquet_meta.row_groups()[row_group_index];

    // The op type column is the last column in the SST.
    let column_metadata = &row_group_metadata.columns().last().unwrap();
    let stats = column_metadata
        .statistics()
        .context(StatsNotPresentSnafu { file_path })?;
    stats
        .min_bytes_opt()
        .context(StatsNotPresentSnafu { file_path })?
        .try_into()
        .map(i32::from_le_bytes)
        .map(|min_op_type| min_op_type == OpType::Delete as i32)
        .ok()
        .context(DecodeStatsSnafu { file_path })
}

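/// A range of rows to read in a parquet SST, identified by a row group index and an
/// optional row selection inside that row group.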
#[derive(Clone)]
pub struct FileRange {
    /// Shared context of the SST file to read.
    context: FileRangeContextRef,
    /// Index of the row group to read.
    row_group_idx: usize,
    /// Rows to select in the row group. `None` means all rows.
    row_selection: Option<RowSelection>,
}

impl FileRange {
    /// Creates a new [FileRange] to read the given row group.
    pub(crate) fn new(
        context: FileRangeContextRef,
        row_group_idx: usize,
        row_selection: Option<RowSelection>,
    ) -> Self {
        Self {
            context,
            row_group_idx,
            row_selection,
        }
    }

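    /// Returns whether this range selects all rows in the row group.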
    fn select_all(&self) -> bool {
        let rows_in_group = self
            .context
            .reader_builder
            .parquet_metadata()
            .row_group(self.row_group_idx)
            .num_rows();

        let Some(row_selection) = &self.row_selection else {
            return true;
        };
        row_selection.row_count() == rows_in_group as usize
    }

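    /// Returns whether the row group may contain rows that satisfy the dynamic filters.
    ///
    /// Returns `true` when there is no dynamic filter. Otherwise it prunes this single
    /// row group with its statistics and the dynamic filters.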
    fn in_dynamic_filter_range(&self) -> bool {
        if self.context.base.dyn_filters.is_empty() {
            return true;
        }
        let curr_row_group = self
            .context
            .reader_builder
            .parquet_metadata()
            .row_group(self.row_group_idx);
        let read_format = self.context.read_format();
        let prune_schema = &self.context.base.prune_schema;
        let stats = RowGroupPruningStats::new(
            std::slice::from_ref(curr_row_group),
            read_format,
            self.context.base.expected_metadata.clone(),
            self.context.base.pre_filter_mode.skip_fields(),
        );

        // Prunes the row group with the dynamic filters only.
        let pred = Predicate::with_dyn_filters(vec![], self.context.base.dyn_filters.clone());

        pred.prune_with_stats(&stats, prune_schema.arrow_schema())
            .first()
            .cloned()
            .unwrap_or(true)
    }

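    /// Returns a reader over this range in the primary key format, or `None` if the range
    /// is pruned by the dynamic filters.
    ///
    /// When the `LastRow` selector is requested, the row group has no delete rows and all
    /// rows are selected, it wraps the reader with a cached last-row reader.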
    #[allow(dead_code)]
    pub(crate) async fn reader(
        &self,
        selector: Option<TimeSeriesRowSelector>,
        fetch_metrics: Option<&ParquetFetchMetrics>,
    ) -> Result<Option<PruneReader>> {
        if !self.in_dynamic_filter_range() {
            return Ok(None);
        }
        let skip_fields = self.context.base.pre_filter_mode.skip_fields();
        let parquet_reader = self
            .context
            .reader_builder
            .build(self.context.build_context(
                self.row_group_idx,
                self.row_selection.clone(),
                fetch_metrics,
                skip_fields,
            ))
            .await?;

        let use_last_row_reader = if selector
            .map(|s| s == TimeSeriesRowSelector::LastRow)
            .unwrap_or(false)
        {
            // Only uses the last-row reader when the row group has no delete ops
            // and all rows are selected.
            let put_only = !self
                .context
                .contains_delete(self.row_group_idx)
                .inspect_err(|e| {
                    error!(e; "Failed to decode min value of op_type, fallback to RowGroupReader");
                })
                .unwrap_or(true);
            put_only && self.select_all()
        } else {
            false
        };

        let prune_reader = if use_last_row_reader {
            // Reads the last row of each series in the row group with a cache.
            let reader = RowGroupLastRowCachedReader::new(
                self.file_handle().file_id().file_id(),
                self.row_group_idx,
                self.context.reader_builder.cache_strategy().clone(),
                RowGroupReader::new(self.context.clone(), parquet_reader),
            );
            PruneReader::new_with_last_row_reader(self.context.clone(), reader, skip_fields)
        } else {
            // Reads all rows in the row group.
            PruneReader::new_with_row_group_reader(
                self.context.clone(),
                RowGroupReader::new(self.context.clone(), parquet_reader),
                skip_fields,
            )
        };

        Ok(Some(prune_reader))
    }

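    /// Returns a reader over this range in the flat format, or `None` if the range is
    /// pruned by the dynamic filters.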
    pub(crate) async fn flat_reader(
        &self,
        selector: Option<TimeSeriesRowSelector>,
        fetch_metrics: Option<&ParquetFetchMetrics>,
    ) -> Result<Option<FlatPruneReader>> {
        if !self.in_dynamic_filter_range() {
            return Ok(None);
        }
        let skip_fields = self.context.base.pre_filter_mode.skip_fields();
        let parquet_reader = self
            .context
            .reader_builder
            .build(self.context.build_context(
                self.row_group_idx,
                self.row_selection.clone(),
                fetch_metrics,
                skip_fields,
            ))
            .await?;

        let use_last_row_reader = if selector
            .map(|s| s == TimeSeriesRowSelector::LastRow)
            .unwrap_or(false)
        {
            // Only uses the last-row reader when the row group has no delete ops
            // and all rows are selected.
            let put_only = !self
                .context
                .contains_delete(self.row_group_idx)
                .inspect_err(|e| {
                    error!(e; "Failed to decode min value of op_type, fallback to FlatRowGroupReader");
                })
                .unwrap_or(true);
            put_only && self.select_all()
        } else {
            false
        };

        let flat_prune_reader = if use_last_row_reader {
            let flat_row_group_reader =
                FlatRowGroupReader::new(self.context.clone(), parquet_reader);
            // The primary key prefilter may skip rows of the row group, so the cached
            // last rows could be incomplete. Disables the cache in that case.
            let cache_strategy = if self.context.reader_builder.has_flat_primary_key_prefilter() {
                CacheStrategy::Disabled
            } else {
                self.context.reader_builder.cache_strategy().clone()
            };
            let reader = FlatRowGroupLastRowCachedReader::new(
                self.file_handle().file_id().file_id(),
                self.row_group_idx,
                cache_strategy,
                self.context.read_format().projection_indices(),
                flat_row_group_reader,
            );
            FlatPruneReader::new_with_last_row_reader(self.context.clone(), reader, skip_fields)
        } else {
            let flat_row_group_reader =
                FlatRowGroupReader::new(self.context.clone(), parquet_reader);
            FlatPruneReader::new_with_row_group_reader(
                self.context.clone(),
                flat_row_group_reader,
                skip_fields,
            )
        };

        Ok(Some(flat_prune_reader))
    }

    /// Returns the helper to make batches compatible with the expected schema, if any.
    pub(crate) fn compat_batch(&self) -> Option<&CompatBatch> {
        self.context.compat_batch()
    }

    /// Returns the projection mapper for compaction, if any.
    pub(crate) fn compaction_projection_mapper(&self) -> Option<&CompactionProjectionMapper> {
        self.context.compaction_projection_mapper()
    }

    /// Returns the handle of the file to read.
    pub(crate) fn file_handle(&self) -> &FileHandle {
        self.context.reader_builder.file_handle()
    }
}

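/// Context shared by all [FileRange]s of a parquet SST file.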
pub(crate) struct FileRangeContext {
    /// Builder to create row group readers for the file.
    reader_builder: RowGroupReaderBuilder,
    /// Base of the context for pruning and filtering rows.
    base: RangeBase,
}

pub(crate) type FileRangeContextRef = Arc<FileRangeContext>;

impl FileRangeContext {
    /// Creates a new [FileRangeContext].
    pub(crate) fn new(reader_builder: RowGroupReaderBuilder, base: RangeBase) -> Self {
        Self {
            reader_builder,
            base,
        }
    }

    /// Returns the path of the file to read.
    pub(crate) fn file_path(&self) -> &str {
        self.reader_builder.file_path()
    }

    /// Returns the filters pushed down to this range.
    pub(crate) fn filters(&self) -> &[SimpleFilterContext] {
        &self.base.filters
    }

    /// Returns whether the context has a partition filter.
    pub(crate) fn has_partition_filter(&self) -> bool {
        self.base.partition_filter.is_some()
    }

    /// Returns the format helper to read the SST.
    pub(crate) fn read_format(&self) -> &ReadFormat {
        &self.base.read_format
    }

    /// Returns the builder of row group readers.
    pub(crate) fn reader_builder(&self) -> &RowGroupReaderBuilder {
        &self.reader_builder
    }

    /// Returns the helper to make batches compatible with the expected schema, if any.
    pub(crate) fn compat_batch(&self) -> Option<&CompatBatch> {
        self.base.compat_batch.as_ref()
    }

    /// Returns the projection mapper for compaction, if any.
    pub(crate) fn compaction_projection_mapper(&self) -> Option<&CompactionProjectionMapper> {
        self.base.compaction_projection_mapper.as_ref()
    }

    /// Sets the helper to make batches compatible with the expected schema.
    pub(crate) fn set_compat_batch(&mut self, compat: Option<CompatBatch>) {
        self.base.compat_batch = compat;
    }

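    /// Filters the input batch by the pushed-down predicates and returns `None` if all
    /// rows are filtered out. Skips filters on field columns if `skip_fields` is true.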
    pub(crate) fn precise_filter(&self, input: Batch, skip_fields: bool) -> Result<Option<Batch>> {
        self.base.precise_filter(input, skip_fields)
    }

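    /// Filters the input record batch in the flat format by the pushed-down predicates
    /// and returns `None` if all rows are filtered out.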
    pub(crate) fn precise_filter_flat(
        &self,
        input: RecordBatch,
        skip_fields: bool,
    ) -> Result<Option<RecordBatch>> {
        self.base.precise_filter_flat(
            input,
            skip_fields,
            self.reader_builder.has_flat_primary_key_prefilter(),
        )
    }

    /// Returns the pre-filter mode of the range.
    pub(crate) fn pre_filter_mode(&self) -> PreFilterMode {
        self.base.pre_filter_mode
    }

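    /// Returns whether the given row group may contain delete rows, based on the op type
    /// statistics.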
    pub(crate) fn contains_delete(&self, row_group_index: usize) -> Result<bool> {
        let metadata = self.reader_builder.parquet_metadata();
        row_group_contains_delete(metadata, row_group_index, self.reader_builder.file_path())
    }

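    /// Builds the context to read the given row group.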
    pub(crate) fn build_context<'a>(
        &'a self,
        row_group_idx: usize,
        row_selection: Option<RowSelection>,
        fetch_metrics: Option<&'a ParquetFetchMetrics>,
        skip_fields: bool,
    ) -> RowGroupBuildContext<'a> {
        RowGroupBuildContext {
            filters: &self.base.filters,
            skip_fields,
            row_group_idx,
            row_selection,
            fetch_metrics,
        }
    }

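    /// Returns the estimated memory size of the context, approximated by the size of the
    /// cached parquet metadata.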
    pub(crate) fn memory_size(&self) -> usize {
        crate::cache::cache_size::parquet_meta_size(self.reader_builder.parquet_metadata())
    }
}

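/// Mode that controls which pushed-down filters are applied while reading row groups.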
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum PreFilterMode {
    /// Applies filters on all columns.
    All,
    /// Skips filters on field columns.
    SkipFields,
}

impl PreFilterMode {
    /// Returns whether filters on field columns should be skipped.
    pub(crate) fn skip_fields(self) -> bool {
        matches!(self, Self::SkipFields)
    }
}

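/// Context to evaluate the partition expression of the region against rows read from
/// the file.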
pub(crate) struct PartitionFilterContext {
    /// Physical expression of the region's partition rule.
    pub(crate) region_partition_physical_expr: Arc<dyn PhysicalExpr>,
    /// Schema of the record batch to evaluate the expression against.
    pub(crate) partition_schema: Arc<Schema>,
}

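/// Common base of a range for pruning and precise filtering of rows.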
pub(crate) struct RangeBase {
    /// Filters pushed down from the query.
    pub(crate) filters: Vec<SimpleFilterContext>,
    /// Dynamic filters provided by DataFusion at runtime.
    pub(crate) dyn_filters: Vec<Arc<DynamicFilterPhysicalExpr>>,
    /// Helper to read the SST.
    pub(crate) read_format: ReadFormat,
    /// Expected metadata of the region to read, if any.
    pub(crate) expected_metadata: Option<RegionMetadataRef>,
    /// Schema used to prune row groups.
    pub(crate) prune_schema: Arc<Schema>,
    /// Codec to decode primary keys.
    pub(crate) codec: Arc<dyn PrimaryKeyCodec>,
    /// Optional helper to make batches compatible with the expected schema.
    pub(crate) compat_batch: Option<CompatBatch>,
    /// Optional mapper to project columns for compaction.
    pub(crate) compaction_projection_mapper: Option<CompactionProjectionMapper>,
    /// Mode that controls which filters to apply while reading.
    pub(crate) pre_filter_mode: PreFilterMode,
    /// Optional context to filter rows by the region's partition rule.
    pub(crate) partition_filter: Option<PartitionFilterContext>,
}

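/// State to lazily decode tag columns from primary keys, with a cache of decoded columns.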
pub(crate) struct TagDecodeState {
    /// Primary keys decoded from the record batch, if decoded.
    decoded_pks: Option<DecodedPrimaryKeys>,
    /// Cache of decoded tag columns by column id.
    decoded_tag_cache: HashMap<ColumnId, ArrayRef>,
}

impl TagDecodeState {
    /// Creates an empty [TagDecodeState].
    pub(crate) fn new() -> Self {
        Self {
            decoded_pks: None,
            decoded_tag_cache: HashMap::new(),
        }
    }
}

impl RangeBase {
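    /// Evaluates the pushed-down filters against the input [Batch] and returns the
    /// filtered batch, or `None` if no row matches.
    ///
    /// Tag filters are evaluated on scalar values decoded from the primary key, while
    /// field and time index filters are evaluated on the column vectors. Field filters
    /// are skipped when `skip_fields` is true. If a partition filter is present, it is
    /// applied to the remaining rows as well.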
    pub(crate) fn precise_filter(
        &self,
        mut input: Batch,
        skip_fields: bool,
    ) -> Result<Option<Batch>> {
        let mut mask = BooleanBuffer::new_set(input.num_rows());

        // Evaluates each filter against the batch and intersects the results.
        for filter_ctx in &self.filters {
            let filter = match filter_ctx.filter() {
                MaybeFilter::Filter(f) => f,
                // The filter always matches, skips it.
                MaybeFilter::Matched => continue,
                // The filter never matches, prunes the whole batch.
                MaybeFilter::Pruned => return Ok(None),
            };
            let result = match filter_ctx.semantic_type() {
                SemanticType::Tag => {
                    let pk_values = if let Some(pk_values) = input.pk_values() {
                        pk_values
                    } else {
                        input.set_pk_values(
                            self.codec
                                .decode(input.primary_key())
                                .context(DecodeSnafu)?,
                        );
                        input.pk_values().unwrap()
                    };
                    let pk_value = match pk_values {
                        CompositeValues::Dense(v) => {
                            // Safety: the filter is on a tag column so it has a
                            // primary key index.
                            let pk_index = self
                                .read_format
                                .metadata()
                                .primary_key_index(filter_ctx.column_id())
                                .unwrap();
                            v[pk_index]
                                .1
                                .try_to_scalar_value(filter_ctx.data_type())
                                .context(DataTypeMismatchSnafu)?
                        }
                        CompositeValues::Sparse(v) => {
                            let v = v.get_or_null(filter_ctx.column_id());
                            v.try_to_scalar_value(filter_ctx.data_type())
                                .context(DataTypeMismatchSnafu)?
                        }
                    };
                    if filter
                        .evaluate_scalar(&pk_value)
                        .context(RecordBatchSnafu)?
                    {
                        continue;
                    } else {
                        // All rows in the batch share the same primary key, so the
                        // whole batch can be pruned.
                        return Ok(None);
                    }
                }
                SemanticType::Field => {
                    if skip_fields {
                        continue;
                    }
                    let Some(field_index) = self
                        .read_format
                        .as_primary_key()
                        .unwrap()
                        .field_index_by_id(filter_ctx.column_id())
                    else {
                        continue;
                    };
                    let field_col = &input.fields()[field_index].data;
                    filter
                        .evaluate_vector(field_col)
                        .context(RecordBatchSnafu)?
                }
                SemanticType::Timestamp => filter
                    .evaluate_vector(input.timestamps())
                    .context(RecordBatchSnafu)?,
            };

            mask = mask.bitand(&result);
        }

        if mask.count_set_bits() == 0 {
            return Ok(None);
        }

        // Applies the partition filter to the remaining rows.
        if let Some(partition_filter) = &self.partition_filter {
            let record_batch = self
                .build_record_batch_for_pruning(&mut input, &partition_filter.partition_schema)?;
            let partition_mask = self.evaluate_partition_filter(&record_batch, partition_filter)?;
            mask = mask.bitand(&partition_mask);
        }

        if mask.count_set_bits() == 0 {
            Ok(None)
        } else {
            input.filter(&BooleanArray::from(mask).into())?;
            Ok(Some(input))
        }
    }

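    /// Evaluates the pushed-down filters against a [RecordBatch] in the flat format and
    /// returns the filtered batch, or `None` if no row matches.
    ///
    /// Field filters are skipped when `skip_fields` is true, and primary key filters
    /// already applied by the prefilter are skipped when `skip_prefiltered_pk_filters`
    /// is true. The partition filter, if present, is applied as well.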
    pub(crate) fn precise_filter_flat(
        &self,
        input: RecordBatch,
        skip_fields: bool,
        skip_prefiltered_pk_filters: bool,
    ) -> Result<Option<RecordBatch>> {
        let mut tag_decode_state = TagDecodeState::new();
        let mask = self.compute_filter_mask_flat(
            &input,
            skip_fields,
            skip_prefiltered_pk_filters,
            &mut tag_decode_state,
        )?;

        // `None` means the whole batch is pruned.
        let Some(mut mask) = mask else {
            return Ok(None);
        };

        // Applies the partition filter to the remaining rows.
        if let Some(partition_filter) = &self.partition_filter {
            let record_batch = self.project_record_batch_for_pruning_flat(
                &input,
                &partition_filter.partition_schema,
                &mut tag_decode_state,
            )?;
            let partition_mask = self.evaluate_partition_filter(&record_batch, partition_filter)?;
            mask = mask.bitand(&partition_mask);
        }

        if mask.count_set_bits() == 0 {
            return Ok(None);
        }

        let filtered_batch =
            datatypes::arrow::compute::filter_record_batch(&input, &BooleanArray::from(mask))
                .context(ComputeArrowSnafu)?;

        if filtered_batch.num_rows() > 0 {
            Ok(Some(filtered_batch))
        } else {
            Ok(None)
        }
    }

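    /// Computes the filter mask for a [RecordBatch] in the flat format.
    ///
    /// Returns `None` if a filter prunes the whole batch, otherwise returns a boolean
    /// mask with one bit per row. Tag columns that are not materialized in the batch
    /// are decoded from the primary key on demand via `tag_decode_state`.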
    pub(crate) fn compute_filter_mask_flat(
        &self,
        input: &RecordBatch,
        skip_fields: bool,
        skip_prefiltered_pk_filters: bool,
        tag_decode_state: &mut TagDecodeState,
    ) -> Result<Option<BooleanBuffer>> {
        let mut mask = BooleanBuffer::new_set(input.num_rows());

        let flat_format = self
            .read_format
            .as_flat()
            .context(crate::error::UnexpectedSnafu {
                reason: "Expected flat format for precise_filter_flat",
            })?;
        let metadata = flat_format.metadata();

        for filter_ctx in &self.filters {
            let filter = match filter_ctx.filter() {
                MaybeFilter::Filter(f) => f,
                // The filter always matches, skips it.
                MaybeFilter::Matched => continue,
                // The filter never matches, prunes the whole batch.
                MaybeFilter::Pruned => return Ok(None),
            };

            if skip_fields && filter_ctx.semantic_type() == SemanticType::Field {
                continue;
            }

            // Skips primary key filters already applied by the prefilter.
            if skip_prefiltered_pk_filters && filter_ctx.usable_primary_key_filter() {
                continue;
            }

            // Evaluates the filter on the projected column if it is in the batch,
            // otherwise decodes the tag column from the primary key.
            let column_idx = flat_format.projected_index_by_id(filter_ctx.column_id());
            if let Some(idx) = column_idx {
                let column = &input.columns().get(idx).unwrap();
                let result = filter.evaluate_array(column).context(RecordBatchSnafu)?;
                mask = mask.bitand(&result);
            } else if filter_ctx.semantic_type() == SemanticType::Tag {
                let column_id = filter_ctx.column_id();

                if let Some(tag_column) =
                    self.maybe_decode_tag_column(metadata, column_id, input, tag_decode_state)?
                {
                    let result = filter
                        .evaluate_array(&tag_column)
                        .context(RecordBatchSnafu)?;
                    mask = mask.bitand(&result);
                }
            } else if filter_ctx.semantic_type() == SemanticType::Timestamp {
                let time_index_pos = time_index_column_index(input.num_columns());
                let column = &input.columns()[time_index_pos];
                let result = filter.evaluate_array(column).context(RecordBatchSnafu)?;
                mask = mask.bitand(&result);
            }
        }

        Ok(Some(mask))
    }

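    /// Decodes the tag column with `column_id` from the primary keys in `input`, caching
    /// the result in `tag_decode_state`. Returns `None` if the column is not a tag column.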
    fn maybe_decode_tag_column(
        &self,
        metadata: &RegionMetadataRef,
        column_id: ColumnId,
        input: &RecordBatch,
        tag_decode_state: &mut TagDecodeState,
    ) -> Result<Option<ArrayRef>> {
        let Some(pk_index) = metadata.primary_key_index(column_id) else {
            return Ok(None);
        };

        if let Some(cached_column) = tag_decode_state.decoded_tag_cache.get(&column_id) {
            return Ok(Some(cached_column.clone()));
        }

        // Decodes the primary keys once and reuses them for other tag columns.
        if tag_decode_state.decoded_pks.is_none() {
            tag_decode_state.decoded_pks = Some(decode_primary_keys(self.codec.as_ref(), input)?);
        }

        // The positional index only applies to the dense encoding.
        let pk_index = if self.codec.encoding() == PrimaryKeyEncoding::Sparse {
            None
        } else {
            Some(pk_index)
        };
        let Some(column_index) = metadata.column_index_by_id(column_id) else {
            return Ok(None);
        };
        let Some(decoded) = tag_decode_state.decoded_pks.as_ref() else {
            return Ok(None);
        };

        let column_metadata = &metadata.column_metadatas[column_index];
        let tag_column = decoded.get_tag_column(
            column_id,
            pk_index,
            &column_metadata.column_schema.data_type,
        )?;
        tag_decode_state
            .decoded_tag_cache
            .insert(column_id, tag_column.clone());

        Ok(Some(tag_column))
    }

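    /// Evaluates the partition filter expression against `record_batch` and returns a
    /// boolean mask where null results are treated as false.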
    fn evaluate_partition_filter(
        &self,
        record_batch: &RecordBatch,
        partition_filter: &PartitionFilterContext,
    ) -> Result<BooleanBuffer> {
        let columnar_value = partition_filter
            .region_partition_physical_expr
            .evaluate(record_batch)
            .context(EvalPartitionFilterSnafu)?;
        let array = columnar_value
            .into_array(record_batch.num_rows())
            .context(EvalPartitionFilterSnafu)?;
        let boolean_array = array
            .as_any()
            .downcast_ref::<BooleanArray>()
            .context(UnexpectedSnafu {
                reason: "Failed to downcast to BooleanArray".to_string(),
            })?;

        // Treats null results as false.
        let mut mask = boolean_array.values().clone();
        if let Some(nulls) = boolean_array.nulls() {
            mask = mask.bitand(nulls.inner());
        }

        Ok(mask)
    }

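    /// Builds a [RecordBatch] that matches the partition pruning schema from a [Batch]
    /// in the primary key format.
    ///
    /// Tag columns are expanded from the decoded primary key values, while the time
    /// index and field columns are taken from the batch.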
    fn build_record_batch_for_pruning(
        &self,
        input: &mut Batch,
        schema: &Arc<Schema>,
    ) -> Result<RecordBatch> {
        let arrow_schema = schema.arrow_schema();
        let mut columns = Vec::with_capacity(arrow_schema.fields().len());

        // Ensures the primary key values are decoded.
        if input.pk_values().is_none() {
            input.set_pk_values(
                self.codec
                    .decode(input.primary_key())
                    .context(DecodeSnafu)?,
            );
        }

        for field in arrow_schema.fields() {
            let metadata = self.read_format.metadata();
            let column_id = metadata.column_by_name(field.name()).map(|c| c.column_id);

            let Some(column_id) = column_id else {
                return UnexpectedSnafu {
                    reason: format!(
                        "Partition pruning schema expects column '{}' but it is missing in \
                         region metadata",
                        field.name()
                    ),
                }
                .fail();
            };

            if let Some(pk_index) = metadata.primary_key_index(column_id) {
                // Expands the tag value from the primary key into an array.
                let pk_values = input.pk_values().unwrap();
                let value = match pk_values {
                    CompositeValues::Dense(v) => &v[pk_index].1,
                    CompositeValues::Sparse(v) => v.get_or_null(column_id),
                };
                let concrete_type = ConcreteDataType::from_arrow_type(field.data_type());
                let arrow_scalar = value
                    .try_to_scalar_value(&concrete_type)
                    .context(DataTypeMismatchSnafu)?;
                let array = arrow_scalar
                    .to_array_of_size(input.num_rows())
                    .context(EvalPartitionFilterSnafu)?;
                columns.push(array);
            } else if metadata.time_index_column().column_id == column_id {
                columns.push(input.timestamps().to_arrow_array());
            } else if let Some(field_index) = self
                .read_format
                .as_primary_key()
                .and_then(|f| f.field_index_by_id(column_id))
            {
                columns.push(input.fields()[field_index].data.to_arrow_array());
            } else {
                return UnexpectedSnafu {
                    reason: format!(
                        "Partition pruning schema expects column '{}' (id {}) but it is not \
                         present in input batch",
                        field.name(),
                        column_id
                    ),
                }
                .fail();
            }
        }

        RecordBatch::try_new(arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
    }

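    /// Builds a [RecordBatch] that matches the partition pruning schema from a flat
    /// format [RecordBatch], decoding tag columns from the primary key when they are
    /// not projected.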
    fn project_record_batch_for_pruning_flat(
        &self,
        input: &RecordBatch,
        schema: &Arc<Schema>,
        tag_decode_state: &mut TagDecodeState,
    ) -> Result<RecordBatch> {
        let arrow_schema = schema.arrow_schema();
        let mut columns = Vec::with_capacity(arrow_schema.fields().len());

        let flat_format = self
            .read_format
            .as_flat()
            .context(crate::error::UnexpectedSnafu {
                reason: "Expected flat format for precise_filter_flat",
            })?;
        let metadata = flat_format.metadata();

        for field in arrow_schema.fields() {
            let column_id = metadata.column_by_name(field.name()).map(|c| c.column_id);

            let Some(column_id) = column_id else {
                return UnexpectedSnafu {
                    reason: format!(
                        "Partition pruning schema expects column '{}' but it is missing in \
                         region metadata",
                        field.name()
                    ),
                }
                .fail();
            };

            if let Some(idx) = flat_format.projected_index_by_id(column_id) {
                columns.push(input.column(idx).clone());
                continue;
            }

            if metadata.time_index_column().column_id == column_id {
                let time_index_pos = time_index_column_index(input.num_columns());
                columns.push(input.column(time_index_pos).clone());
                continue;
            }

            if let Some(tag_column) =
                self.maybe_decode_tag_column(metadata, column_id, input, tag_decode_state)?
            {
                columns.push(tag_column);
                continue;
            }

            return UnexpectedSnafu {
                reason: format!(
                    "Partition pruning schema expects column '{}' (id {}) but it is not \
                     present in projected record batch",
                    field.name(),
                    column_id
                ),
            }
            .fail();
        }

        RecordBatch::try_new(arrow_schema.clone(), columns).context(NewRecordBatchSnafu)
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datafusion_expr::{col, lit};

    use super::*;
    use crate::sst::parquet::format::ReadFormat;
    use crate::test_util::sst_util::{new_record_batch_with_custom_sequence, sst_region_metadata};

    fn new_test_range_base(filters: Vec<SimpleFilterContext>) -> RangeBase {
        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
        let read_format = ReadFormat::new_flat(
            metadata.clone(),
            metadata.column_metadatas.iter().map(|c| c.column_id),
            None,
            "test",
            true,
        )
        .unwrap();

        RangeBase {
            filters,
            dyn_filters: vec![],
            read_format,
            expected_metadata: None,
            prune_schema: metadata.schema.clone(),
            codec: mito_codec::row_converter::build_primary_key_codec(metadata.as_ref()),
            compat_batch: None,
            compaction_projection_mapper: None,
            pre_filter_mode: PreFilterMode::All,
            partition_filter: None,
        }
    }

    #[test]
    fn test_compute_filter_mask_flat_skips_prefiltered_pk_filters() {
        let metadata: RegionMetadataRef = Arc::new(sst_region_metadata());
        let filters = vec![
            SimpleFilterContext::new_opt(&metadata, None, &col("tag_0").eq(lit("a"))).unwrap(),
            SimpleFilterContext::new_opt(&metadata, None, &col("field_0").gt(lit(1_u64))).unwrap(),
        ];
        let base = new_test_range_base(filters);
        let batch = new_record_batch_with_custom_sequence(&["b", "x"], 0, 4, 1);

        let mask_without_skip = base
            .compute_filter_mask_flat(&batch, false, false, &mut TagDecodeState::new())
            .unwrap()
            .unwrap();
        assert_eq!(mask_without_skip.count_set_bits(), 0);

        let mask_with_skip = base
            .compute_filter_mask_flat(&batch, false, true, &mut TagDecodeState::new())
            .unwrap()
            .unwrap();
        assert_eq!(mask_with_skip.count_set_bits(), 2);
    }
}