use std::collections::HashMap;
use std::ops::BitAnd;
use std::sync::Arc;

use api::v1::{OpType, SemanticType};
use common_telemetry::error;
use datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr;
use datatypes::arrow::array::{ArrayRef, BooleanArray};
use datatypes::arrow::buffer::BooleanBuffer;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::schema::Schema;
use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
use parquet::arrow::arrow_reader::RowSelection;
use parquet::file::metadata::ParquetMetaData;
use snafu::{OptionExt, ResultExt};
use store_api::codec::PrimaryKeyEncoding;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::{ColumnId, TimeSeriesRowSelector};
use table::predicate::Predicate;

use crate::error::{
    ComputeArrowSnafu, DataTypeMismatchSnafu, DecodeSnafu, DecodeStatsSnafu, RecordBatchSnafu,
    Result, StatsNotPresentSnafu,
};
use crate::read::Batch;
use crate::read::compat::CompatBatch;
use crate::read::last_row::RowGroupLastRowCachedReader;
use crate::read::prune::{FlatPruneReader, PruneReader};
use crate::sst::file::FileHandle;
use crate::sst::parquet::flat_format::{
    DecodedPrimaryKeys, decode_primary_keys, time_index_column_index,
};
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::reader::{
    FlatRowGroupReader, MaybeFilter, RowGroupReader, RowGroupReaderBuilder, SimpleFilterContext,
};
use crate::sst::parquet::row_group::ParquetFetchMetrics;
use crate::sst::parquet::stats::RowGroupPruningStats;

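/// Returns whether the row group at `row_group_index` may contain `Delete` rows,
/// judging by the min value of the `op_type` column statistics.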
pub(crate) fn row_group_contains_delete(
    parquet_meta: &ParquetMetaData,
    row_group_index: usize,
    file_path: &str,
) -> Result<bool> {
    let row_group_metadata = &parquet_meta.row_groups()[row_group_index];

    // The `op_type` column is stored as the last column of the SST.
    let column_metadata = row_group_metadata.columns().last().unwrap();
    let stats = column_metadata
        .statistics()
        .context(StatsNotPresentSnafu { file_path })?;
    // The min statistic of the `op_type` column is a little-endian i32.
    stats
        .min_bytes_opt()
        .context(StatsNotPresentSnafu { file_path })?
        .try_into()
        .map(i32::from_le_bytes)
        .map(|min_op_type| min_op_type == OpType::Delete as i32)
        .ok()
        .context(DecodeStatsSnafu { file_path })
}

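/// A range of rows to read in a parquet SST: one row group plus an optional
/// row selection inside it.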
#[derive(Clone)]
pub struct FileRange {
    /// Shared context for all ranges of the same file.
    context: FileRangeContextRef,
    /// Index of the row group to read.
    row_group_idx: usize,
    /// Rows to read in the row group. `None` reads the whole row group.
    row_selection: Option<RowSelection>,
}

impl FileRange {
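    /// Creates a new [FileRange] to read the given row group of the file.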
    pub(crate) fn new(
        context: FileRangeContextRef,
        row_group_idx: usize,
        row_selection: Option<RowSelection>,
    ) -> Self {
        Self {
            context,
            row_group_idx,
            row_selection,
        }
    }

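    /// Returns true if the range selects all rows in the row group.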
    fn select_all(&self) -> bool {
        let rows_in_group = self
            .context
            .reader_builder
            .parquet_metadata()
            .row_group(self.row_group_idx)
            .num_rows();

        let Some(row_selection) = &self.row_selection else {
            return true;
        };
        row_selection.row_count() == rows_in_group as usize
    }

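    /// Returns true if the row group may contain rows matching the dynamic
    /// filters, judging by the row group statistics. Always returns true when
    /// there are no dynamic filters.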
    fn in_dynamic_filter_range(&self) -> bool {
        if self.context.base.dyn_filters.is_empty() {
            return true;
        }
        let curr_row_group = self
            .context
            .reader_builder
            .parquet_metadata()
            .row_group(self.row_group_idx);
        let read_format = self.context.read_format();
        let prune_schema = &self.context.base.prune_schema;
        let stats = RowGroupPruningStats::new(
            std::slice::from_ref(curr_row_group),
            read_format,
            self.context.base.expected_metadata.clone(),
            self.compute_skip_fields(),
        );

        let pred = Predicate::new(vec![]).with_dyn_filters(self.context.base.dyn_filters.clone());

        // Prunes a single row group, so the result has at most one element.
        pred.prune_with_stats(&stats, prune_schema.arrow_schema())
            .first()
            .cloned()
            .unwrap_or(true)
    }

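    /// Computes whether to skip field columns when pruning this row group,
    /// according to the configured [PreFilterMode].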
    fn compute_skip_fields(&self) -> bool {
        match self.context.base.pre_filter_mode {
            PreFilterMode::All => false,
            PreFilterMode::SkipFields => true,
            PreFilterMode::SkipFieldsOnDelete => {
                // Conservatively assumes the row group contains deletes if the
                // stats are missing or undecodable.
                row_group_contains_delete(
                    self.context.reader_builder.parquet_metadata(),
                    self.row_group_idx,
                    self.context.reader_builder.file_path(),
                )
                .unwrap_or(true)
            }
        }
    }

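    /// Builds a [PruneReader] to read this range asynchronously. Returns
    /// `None` if the row group is pruned by the dynamic filters.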
    pub(crate) async fn reader(
        &self,
        selector: Option<TimeSeriesRowSelector>,
        fetch_metrics: Option<&ParquetFetchMetrics>,
    ) -> Result<Option<PruneReader>> {
        if !self.in_dynamic_filter_range() {
            return Ok(None);
        }
        let parquet_reader = self
            .context
            .reader_builder
            .build(
                self.row_group_idx,
                self.row_selection.clone(),
                fetch_metrics,
            )
            .await?;

        let use_last_row_reader = if selector
            .map(|s| s == TimeSeriesRowSelector::LastRow)
            .unwrap_or(false)
        {
            // Only uses the last-row reader if the row group contains no
            // delete ops and the range selects all of its rows.
            let put_only = !self
                .context
                .contains_delete(self.row_group_idx)
                .inspect_err(|e| {
                    error!(e; "Failed to decode min value of op_type, fallback to RowGroupReader");
                })
                .unwrap_or(true);
            put_only && self.select_all()
        } else {
            false
        };

        let skip_fields = self.context.should_skip_fields(self.row_group_idx);

        let prune_reader = if use_last_row_reader {
            // Reads and caches the last row of each time series in the row group.
            let reader = RowGroupLastRowCachedReader::new(
                self.file_handle().file_id().file_id(),
                self.row_group_idx,
                self.context.reader_builder.cache_strategy().clone(),
                RowGroupReader::new(self.context.clone(), parquet_reader),
            );
            PruneReader::new_with_last_row_reader(self.context.clone(), reader, skip_fields)
        } else {
            // Reads the row group as-is.
            PruneReader::new_with_row_group_reader(
                self.context.clone(),
                RowGroupReader::new(self.context.clone(), parquet_reader),
                skip_fields,
            )
        };

        Ok(Some(prune_reader))
    }

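    /// Builds a [FlatPruneReader] that yields [RecordBatch]es in the flat
    /// format. Returns `None` if the row group is pruned by the dynamic
    /// filters.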
    pub(crate) async fn flat_reader(
        &self,
        fetch_metrics: Option<&ParquetFetchMetrics>,
    ) -> Result<Option<FlatPruneReader>> {
        if !self.in_dynamic_filter_range() {
            return Ok(None);
        }
        let parquet_reader = self
            .context
            .reader_builder
            .build(
                self.row_group_idx,
                self.row_selection.clone(),
                fetch_metrics,
            )
            .await?;

        let skip_fields = self.context.should_skip_fields(self.row_group_idx);

        let flat_row_group_reader = FlatRowGroupReader::new(self.context.clone(), parquet_reader);
        let flat_prune_reader = FlatPruneReader::new_with_row_group_reader(
            self.context.clone(),
            flat_row_group_reader,
            skip_fields,
        );

        Ok(Some(flat_prune_reader))
    }

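    /// Returns the helper to make batches compatible with the expected
    /// schema, if any.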
    pub(crate) fn compat_batch(&self) -> Option<&CompatBatch> {
        self.context.compat_batch()
    }

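    /// Returns the handle of the file to read.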
    pub(crate) fn file_handle(&self) -> &FileHandle {
        self.context.reader_builder.file_handle()
    }
}

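/// Context shared by all [FileRange]s of the same SST file.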
pub(crate) struct FileRangeContext {
    /// Builder to build row group readers for the file.
    reader_builder: RowGroupReaderBuilder,
    /// Base of the context shared by readers of the file.
    base: RangeBase,
}

pub(crate) type FileRangeContextRef = Arc<FileRangeContext>;

impl FileRangeContext {
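    /// Creates a new [FileRangeContext].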
    pub(crate) fn new(reader_builder: RowGroupReaderBuilder, base: RangeBase) -> Self {
        Self {
            reader_builder,
            base,
        }
    }

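    /// Returns the path of the file to read.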
    pub(crate) fn file_path(&self) -> &str {
        self.reader_builder.file_path()
    }

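    /// Returns the filters pushed down to this range.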
    pub(crate) fn filters(&self) -> &[SimpleFilterContext] {
        &self.base.filters
    }

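    /// Returns the format helper to read the SST.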
    pub(crate) fn read_format(&self) -> &ReadFormat {
        &self.base.read_format
    }

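    /// Returns the builder that builds row group readers for the file.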
    pub(crate) fn reader_builder(&self) -> &RowGroupReaderBuilder {
        &self.reader_builder
    }

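    /// Returns the helper to make batches compatible, if any.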
    pub(crate) fn compat_batch(&self) -> Option<&CompatBatch> {
        self.base.compat_batch.as_ref()
    }

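    /// Sets the helper to make batches compatible.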
    pub(crate) fn set_compat_batch(&mut self, compat: Option<CompatBatch>) {
        self.base.compat_batch = compat;
    }

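    /// Applies the pushed-down predicate to the input batch as precisely as
    /// possible. Returns `None` if every row is filtered out.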
    pub(crate) fn precise_filter(&self, input: Batch, skip_fields: bool) -> Result<Option<Batch>> {
        self.base.precise_filter(input, skip_fields)
    }

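    /// Same as [Self::precise_filter] but operates on a [RecordBatch] in the
    /// flat format.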
    pub(crate) fn precise_filter_flat(
        &self,
        input: RecordBatch,
        skip_fields: bool,
    ) -> Result<Option<RecordBatch>> {
        self.base.precise_filter_flat(input, skip_fields)
    }

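    /// Returns whether field column filters should be skipped for the row
    /// group, according to the configured [PreFilterMode].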
    pub(crate) fn should_skip_fields(&self, row_group_idx: usize) -> bool {
        match self.base.pre_filter_mode {
            PreFilterMode::All => false,
            PreFilterMode::SkipFields => true,
            PreFilterMode::SkipFieldsOnDelete => {
                // Conservatively assumes the row group contains deletes if the
                // stats are unavailable.
                self.contains_delete(row_group_idx).unwrap_or(true)
            }
        }
    }

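    /// Returns whether the given row group may contain delete ops, based on
    /// the min value of the `op_type` column statistics.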
    pub(crate) fn contains_delete(&self, row_group_index: usize) -> Result<bool> {
        let metadata = self.reader_builder.parquet_metadata();
        row_group_contains_delete(metadata, row_group_index, self.reader_builder.file_path())
    }
}

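/// Controls how to apply pushed-down filters while reading row groups.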
#[derive(Debug, Clone, Copy)]
pub enum PreFilterMode {
    /// Applies all filters.
    All,
    /// Skips filters on field columns if the row group contains delete ops.
    SkipFieldsOnDelete,
    /// Always skips filters on field columns.
    SkipFields,
}

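/// Common fields shared by the ranges of the same SST file.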
pub(crate) struct RangeBase {
    /// Filters pushed down from the query.
    pub(crate) filters: Vec<SimpleFilterContext>,
    /// Dynamic filters evaluated against row group statistics at read time.
    pub(crate) dyn_filters: Arc<Vec<DynamicFilterPhysicalExpr>>,
    /// Helper to read the SST.
    pub(crate) read_format: ReadFormat,
    /// Expected metadata of the region to read.
    pub(crate) expected_metadata: Option<RegionMetadataRef>,
    /// Schema to prune row groups with statistics.
    pub(crate) prune_schema: Arc<Schema>,
    /// Codec to decode primary keys.
    pub(crate) codec: Arc<dyn PrimaryKeyCodec>,
    /// Optional helper to make batches compatible with the expected schema.
    pub(crate) compat_batch: Option<CompatBatch>,
    /// Mode that controls which filters to apply.
    pub(crate) pre_filter_mode: PreFilterMode,
}

impl RangeBase {
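    /// Tries its best to apply the pushed-down predicate precisely on the
    /// input batch. Returns the filtered batch, or `None` if every row is
    /// filtered out.
    ///
    /// When a filter references a primary key column, this method decodes the
    /// primary key and stores the decoded values in the batch.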
    pub(crate) fn precise_filter(
        &self,
        mut input: Batch,
        skip_fields: bool,
    ) -> Result<Option<Batch>> {
        let mut mask = BooleanBuffer::new_set(input.num_rows());

        // Runs the filters one by one and combines their results.
        for filter_ctx in &self.filters {
            let filter = match filter_ctx.filter() {
                MaybeFilter::Filter(f) => f,
                // The filter matches the whole batch.
                MaybeFilter::Matched => continue,
                // The filter prunes the whole batch.
                MaybeFilter::Pruned => return Ok(None),
            };
            let result = match filter_ctx.semantic_type() {
                SemanticType::Tag => {
                    let pk_values = if let Some(pk_values) = input.pk_values() {
                        pk_values
                    } else {
                        // Decodes the primary key and caches it in the batch.
                        input.set_pk_values(
                            self.codec
                                .decode(input.primary_key())
                                .context(DecodeSnafu)?,
                        );
                        input.pk_values().unwrap()
                    };
                    let pk_value = match pk_values {
                        CompositeValues::Dense(v) => {
                            // Safety: this is a tag column, so it must have a
                            // primary key index.
                            let pk_index = self
                                .read_format
                                .metadata()
                                .primary_key_index(filter_ctx.column_id())
                                .unwrap();
                            v[pk_index]
                                .1
                                .try_to_scalar_value(filter_ctx.data_type())
                                .context(DataTypeMismatchSnafu)?
                        }
                        CompositeValues::Sparse(v) => {
                            let v = v.get_or_null(filter_ctx.column_id());
                            v.try_to_scalar_value(filter_ctx.data_type())
                                .context(DataTypeMismatchSnafu)?
                        }
                    };
                    if filter
                        .evaluate_scalar(&pk_value)
                        .context(RecordBatchSnafu)?
                    {
                        continue;
                    } else {
                        // All rows in the batch share the same primary key, so
                        // the whole batch is filtered out.
                        return Ok(None);
                    }
                }
                SemanticType::Field => {
                    if skip_fields {
                        continue;
                    }
                    let Some(field_index) = self
                        .read_format
                        .as_primary_key()
                        .unwrap()
                        .field_index_by_id(filter_ctx.column_id())
                    else {
                        continue;
                    };
                    let field_col = &input.fields()[field_index].data;
                    filter
                        .evaluate_vector(field_col)
                        .context(RecordBatchSnafu)?
                }
                SemanticType::Timestamp => filter
                    .evaluate_vector(input.timestamps())
                    .context(RecordBatchSnafu)?,
            };

            mask = mask.bitand(&result);
        }

        input.filter(&BooleanArray::from(mask).into())?;

        Ok(Some(input))
    }

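    /// Applies the pushed-down predicate to a [RecordBatch] in the flat format
    /// and materializes the result. Returns the filtered batch, or `None` if
    /// every row is filtered out.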
    pub(crate) fn precise_filter_flat(
        &self,
        input: RecordBatch,
        skip_fields: bool,
    ) -> Result<Option<RecordBatch>> {
        let mask = self.compute_filter_mask_flat(&input, skip_fields)?;

        // The whole batch is pruned.
        let Some(mask) = mask else {
            return Ok(None);
        };

        let filtered_batch =
            datatypes::arrow::compute::filter_record_batch(&input, &BooleanArray::from(mask))
                .context(ComputeArrowSnafu)?;

        if filtered_batch.num_rows() > 0 {
            Ok(Some(filtered_batch))
        } else {
            Ok(None)
        }
    }

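    /// Computes the filter mask for a [RecordBatch] in the flat format without
    /// materializing the filtered batch. Returns `None` if the whole batch is
    /// pruned. Tag filters whose columns only exist in the encoded primary key
    /// are evaluated by decoding the primary key column.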
    pub(crate) fn compute_filter_mask_flat(
        &self,
        input: &RecordBatch,
        skip_fields: bool,
    ) -> Result<Option<BooleanBuffer>> {
        let mut mask = BooleanBuffer::new_set(input.num_rows());

        let flat_format = self
            .read_format
            .as_flat()
            .context(crate::error::UnexpectedSnafu {
                reason: "Expected flat format for precise_filter_flat",
            })?;

        // Lazily decoded primary key values, shared by all tag filters.
        let mut decoded_pks: Option<DecodedPrimaryKeys> = None;
        // Caches tag columns decoded from the primary key.
        let mut decoded_tag_cache: HashMap<ColumnId, ArrayRef> = HashMap::new();

        // Runs the filters one by one and combines their results.
        for filter_ctx in &self.filters {
            let filter = match filter_ctx.filter() {
                MaybeFilter::Filter(f) => f,
                // The filter matches the whole batch.
                MaybeFilter::Matched => continue,
                // The filter prunes the whole batch.
                MaybeFilter::Pruned => return Ok(None),
            };

            if skip_fields && filter_ctx.semantic_type() == SemanticType::Field {
                continue;
            }

            let column_idx = flat_format.projected_index_by_id(filter_ctx.column_id());
            if let Some(idx) = column_idx {
                // The column is present in the batch, evaluates the filter on
                // it directly.
                let column = &input.columns()[idx];
                let result = filter.evaluate_array(column).context(RecordBatchSnafu)?;
                mask = mask.bitand(&result);
            } else if filter_ctx.semantic_type() == SemanticType::Tag {
                // The tag column is encoded in the primary key, decodes it first.
                if decoded_pks.is_none() {
                    decoded_pks = Some(decode_primary_keys(self.codec.as_ref(), input)?);
                }

                let metadata = flat_format.metadata();
                let column_id = filter_ctx.column_id();

                let tag_column = if let Some(cached_column) = decoded_tag_cache.get(&column_id) {
                    cached_column.clone()
                } else {
                    // The sparse encoding looks up values by column id, so it
                    // doesn't need the primary key index.
                    let pk_index = if self.codec.encoding() == PrimaryKeyEncoding::Sparse {
                        None
                    } else {
                        metadata.primary_key_index(column_id)
                    };
                    let column_index = metadata.column_index_by_id(column_id);

                    if let (Some(column_index), Some(decoded)) =
                        (column_index, decoded_pks.as_ref())
                    {
                        let column_metadata = &metadata.column_metadatas[column_index];
                        let tag_column = decoded.get_tag_column(
                            column_id,
                            pk_index,
                            &column_metadata.column_schema.data_type,
                        )?;
                        decoded_tag_cache.insert(column_id, tag_column.clone());
                        tag_column
                    } else {
                        continue;
                    }
                };

                let result = filter
                    .evaluate_array(&tag_column)
                    .context(RecordBatchSnafu)?;
                mask = mask.bitand(&result);
            } else if filter_ctx.semantic_type() == SemanticType::Timestamp {
                // The time index column has a fixed position in the flat format.
                let time_index_pos = time_index_column_index(input.num_columns());
                let column = &input.columns()[time_index_pos];
                let result = filter.evaluate_array(column).context(RecordBatchSnafu)?;
                mask = mask.bitand(&result);
            }
        }

        Ok(Some(mask))
    }
}