//! Structs and functions for reading ranges from a parquet file. A file range
//! is usually a row group in a parquet file.

use std::collections::HashMap;
use std::ops::BitAnd;
use std::sync::Arc;

use api::v1::{OpType, SemanticType};
use common_telemetry::error;
use datatypes::arrow::array::{ArrayRef, BooleanArray};
use datatypes::arrow::buffer::BooleanBuffer;
use datatypes::arrow::record_batch::RecordBatch;
use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
use parquet::arrow::arrow_reader::RowSelection;
use parquet::file::metadata::ParquetMetaData;
use snafu::{OptionExt, ResultExt};
use store_api::codec::PrimaryKeyEncoding;
use store_api::storage::{ColumnId, TimeSeriesRowSelector};

use crate::error::{
    ComputeArrowSnafu, DataTypeMismatchSnafu, DecodeSnafu, DecodeStatsSnafu, RecordBatchSnafu,
    Result, StatsNotPresentSnafu,
};
use crate::read::Batch;
use crate::read::compat::CompatBatch;
use crate::read::last_row::RowGroupLastRowCachedReader;
use crate::read::prune::{FlatPruneReader, PruneReader};
use crate::sst::file::FileHandle;
use crate::sst::parquet::flat_format::{DecodedPrimaryKeys, decode_primary_keys};
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::reader::{
    FlatRowGroupReader, MaybeFilter, RowGroupReader, RowGroupReaderBuilder, SimpleFilterContext,
};
use crate::sst::parquet::row_group::ParquetFetchMetrics;

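/// Returns whether the given row group contains delete operations, by decoding
/// the min value of the `op_type` column from the row group statistics.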
pub(crate) fn row_group_contains_delete(
    parquet_meta: &ParquetMetaData,
    row_group_index: usize,
    file_path: &str,
) -> Result<bool> {
    let row_group_metadata = &parquet_meta.row_groups()[row_group_index];

    // The op_type column is the last column in the SST.
    let column_metadata = &row_group_metadata.columns().last().unwrap();
    let stats = column_metadata
        .statistics()
        .context(StatsNotPresentSnafu { file_path })?;
    // The row group contains delete ops iff the min op_type equals `Delete`,
    // since `Delete` is the smallest op type.
    stats
        .min_bytes_opt()
        .context(StatsNotPresentSnafu { file_path })?
        .try_into()
        .map(i32::from_le_bytes)
        .map(|min_op_type| min_op_type == OpType::Delete as i32)
        .ok()
        .context(DecodeStatsSnafu { file_path })
}

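/// A range of a parquet SST. Now a range is a row group.
/// We can read different file ranges in parallel.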
#[derive(Clone)]
pub struct FileRange {
    /// Shared context.
    context: FileRangeContextRef,
    /// Index of the row group of the range.
    row_group_idx: usize,
    /// Row selection for the row group. `None` means all rows.
    row_selection: Option<RowSelection>,
}

impl FileRange {
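    /// Creates a new [FileRange].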
    pub(crate) fn new(
        context: FileRangeContextRef,
        row_group_idx: usize,
        row_selection: Option<RowSelection>,
    ) -> Self {
        Self {
            context,
            row_group_idx,
            row_selection,
        }
    }

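    /// Returns whether the range selects all rows in the row group.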
    fn select_all(&self) -> bool {
        let rows_in_group = self
            .context
            .reader_builder
            .parquet_metadata()
            .row_group(self.row_group_idx)
            .num_rows();

        let Some(row_selection) = &self.row_selection else {
            return true;
        };
        row_selection.row_count() == rows_in_group as usize
    }

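    /// Returns a reader to read the [FileRange].
    ///
    /// With a `LastRow` selector, uses a last-row cached reader if the row
    /// group is put-only and fully selected.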
    pub(crate) async fn reader(
        &self,
        selector: Option<TimeSeriesRowSelector>,
        fetch_metrics: Option<&ParquetFetchMetrics>,
    ) -> Result<PruneReader> {
        let parquet_reader = self
            .context
            .reader_builder
            .build(
                self.row_group_idx,
                self.row_selection.clone(),
                fetch_metrics,
            )
            .await?;

        let use_last_row_reader = if selector
            .map(|s| s == TimeSeriesRowSelector::LastRow)
            .unwrap_or(false)
        {
            // Only uses the last-row reader if the row group contains no
            // delete ops. On a stats decode error, assumes it may contain
            // deletes and falls back to the normal reader.
            let put_only = !self
                .context
                .contains_delete(self.row_group_idx)
                .inspect_err(|e| {
                    error!(e; "Failed to decode min value of op_type, fallback to RowGroupReader");
                })
                .unwrap_or(true);
            put_only && self.select_all()
        } else {
            // No selector or the selector is not `LastRow`.
            false
        };

        let skip_fields = self.context.should_skip_fields(self.row_group_idx);

        let prune_reader = if use_last_row_reader {
            // Wraps the row group reader with a last-row cache.
            let reader = RowGroupLastRowCachedReader::new(
                self.file_handle().file_id().file_id(),
                self.row_group_idx,
                self.context.reader_builder.cache_strategy().clone(),
                RowGroupReader::new(self.context.clone(), parquet_reader),
            );
            PruneReader::new_with_last_row_reader(self.context.clone(), reader, skip_fields)
        } else {
            PruneReader::new_with_row_group_reader(
                self.context.clone(),
                RowGroupReader::new(self.context.clone(), parquet_reader),
                skip_fields,
            )
        };

        Ok(prune_reader)
    }

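    /// Returns a reader that reads flat [RecordBatch]es from the [FileRange].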
    pub(crate) async fn flat_reader(
        &self,
        fetch_metrics: Option<&ParquetFetchMetrics>,
    ) -> Result<FlatPruneReader> {
        let parquet_reader = self
            .context
            .reader_builder
            .build(
                self.row_group_idx,
                self.row_selection.clone(),
                fetch_metrics,
            )
            .await?;

        let skip_fields = self.context.should_skip_fields(self.row_group_idx);

        let flat_row_group_reader = FlatRowGroupReader::new(self.context.clone(), parquet_reader);
        let flat_prune_reader = FlatPruneReader::new_with_row_group_reader(
            self.context.clone(),
            flat_row_group_reader,
            skip_fields,
        );

        Ok(flat_prune_reader)
    }

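    /// Returns the helper to make batches compatible with the expected schema.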
    pub(crate) fn compat_batch(&self) -> Option<&CompatBatch> {
        self.context.compat_batch()
    }

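    /// Returns the handle of the file to read.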
    pub(crate) fn file_handle(&self) -> &FileHandle {
        self.context.reader_builder.file_handle()
    }
}

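/// Context shared by ranges of the same parquet SST file.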
pub(crate) struct FileRangeContext {
    /// Builder of readers for row groups in the file.
    reader_builder: RowGroupReaderBuilder,
    /// Base of the context.
    base: RangeBase,
}

pub(crate) type FileRangeContextRef = Arc<FileRangeContext>;

impl FileRangeContext {
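    /// Creates a new [FileRangeContext].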
    pub(crate) fn new(
        reader_builder: RowGroupReaderBuilder,
        filters: Vec<SimpleFilterContext>,
        read_format: ReadFormat,
        codec: Arc<dyn PrimaryKeyCodec>,
        pre_filter_mode: PreFilterMode,
    ) -> Self {
        Self {
            reader_builder,
            base: RangeBase {
                filters,
                read_format,
                codec,
                compat_batch: None,
                pre_filter_mode,
            },
        }
    }

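    /// Returns the path of the file to read.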
    pub(crate) fn file_path(&self) -> &str {
        self.reader_builder.file_path()
    }

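    /// Returns the filters pushed down to the range.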
    pub(crate) fn filters(&self) -> &[SimpleFilterContext] {
        &self.base.filters
    }

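    /// Returns the format helper of the SST.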
    pub(crate) fn read_format(&self) -> &ReadFormat {
        &self.base.read_format
    }

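    /// Returns the builder of row group readers.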
    pub(crate) fn reader_builder(&self) -> &RowGroupReaderBuilder {
        &self.reader_builder
    }

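    /// Returns the helper to make batches compatible with the expected schema.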
    pub(crate) fn compat_batch(&self) -> Option<&CompatBatch> {
        self.base.compat_batch.as_ref()
    }

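    /// Sets the [CompatBatch] of the context.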
    pub(crate) fn set_compat_batch(&mut self, compat: Option<CompatBatch>) {
        self.base.compat_batch = compat;
    }

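    /// Tries its best to apply pushed-down filters precisely on the input
    /// batch. Returns the filtered batch, or `None` if the entire batch is
    /// filtered out.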
    pub(crate) fn precise_filter(&self, input: Batch, skip_fields: bool) -> Result<Option<Batch>> {
        self.base.precise_filter(input, skip_fields)
    }

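    /// Same as [FileRangeContext::precise_filter()], but operates on a flat
    /// [RecordBatch].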
    pub(crate) fn precise_filter_flat(
        &self,
        input: RecordBatch,
        skip_fields: bool,
    ) -> Result<Option<RecordBatch>> {
        self.base.precise_filter_flat(input, skip_fields)
    }

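    /// Returns whether readers should skip filters on field columns for the
    /// given row group, according to the pre-filter mode.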
    pub(crate) fn should_skip_fields(&self, row_group_idx: usize) -> bool {
        match self.base.pre_filter_mode {
            PreFilterMode::All => false,
            PreFilterMode::SkipFields => true,
            PreFilterMode::SkipFieldsOnDelete => {
                // Assumes the row group contains delete ops if the check fails.
                self.contains_delete(row_group_idx).unwrap_or(true)
            }
        }
    }

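    /// Returns whether the given row group contains delete operations.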
    pub(crate) fn contains_delete(&self, row_group_index: usize) -> Result<bool> {
        let metadata = self.reader_builder.parquet_metadata();
        row_group_contains_delete(metadata, row_group_index, self.reader_builder.file_path())
    }
}

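/// Mode for applying pushed-down filters while reading a range.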
#[derive(Debug, Clone, Copy)]
pub enum PreFilterMode {
    /// Applies filters on all columns.
    All,
    /// Skips filters on field columns if the row group contains delete ops.
    SkipFieldsOnDelete,
    /// Always skips filters on field columns.
    SkipFields,
}

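/// Common fields shared by file range contexts.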
pub(crate) struct RangeBase {
    /// Filters pushed down.
    pub(crate) filters: Vec<SimpleFilterContext>,
    /// Helper to read the SST.
    pub(crate) read_format: ReadFormat,
    /// Decoder for primary keys.
    pub(crate) codec: Arc<dyn PrimaryKeyCodec>,
    /// Optional helper to make batches compatible with the expected schema.
    pub(crate) compat_batch: Option<CompatBatch>,
    /// Mode for applying pushed-down filters.
    pub(crate) pre_filter_mode: PreFilterMode,
}

impl RangeBase {
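    /// Tries its best to apply pushed-down filters precisely on the input
    /// batch. Returns the filtered batch, or `None` if the entire batch is
    /// filtered out.
    ///
    /// When a filter references a primary key column, this method decodes the
    /// primary key and stores the decoded values in the batch.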
    pub(crate) fn precise_filter(
        &self,
        mut input: Batch,
        skip_fields: bool,
    ) -> Result<Option<Batch>> {
        let mut mask = BooleanBuffer::new_set(input.num_rows());

        // Evaluates the filters one by one and combines the results into the mask.
        for filter_ctx in &self.filters {
            let filter = match filter_ctx.filter() {
                MaybeFilter::Filter(f) => f,
                // The filter matches the whole range, nothing to evaluate.
                MaybeFilter::Matched => continue,
                // The filter prunes the whole range.
                MaybeFilter::Pruned => return Ok(None),
            };
            let result = match filter_ctx.semantic_type() {
                SemanticType::Tag => {
                    let pk_values = if let Some(pk_values) = input.pk_values() {
                        pk_values
                    } else {
                        input.set_pk_values(
                            self.codec
                                .decode(input.primary_key())
                                .context(DecodeSnafu)?,
                        );
                        input.pk_values().unwrap()
                    };
                    let pk_value = match pk_values {
                        CompositeValues::Dense(v) => {
                            // Safety: the filter column is a tag, so it has a
                            // primary key index.
                            let pk_index = self
                                .read_format
                                .metadata()
                                .primary_key_index(filter_ctx.column_id())
                                .unwrap();
                            v[pk_index]
                                .1
                                .try_to_scalar_value(filter_ctx.data_type())
                                .context(DataTypeMismatchSnafu)?
                        }
                        CompositeValues::Sparse(v) => {
                            let v = v.get_or_null(filter_ctx.column_id());
                            v.try_to_scalar_value(filter_ctx.data_type())
                                .context(DataTypeMismatchSnafu)?
                        }
                    };
                    if filter
                        .evaluate_scalar(&pk_value)
                        .context(RecordBatchSnafu)?
                    {
                        continue;
                    } else {
                        // All rows of a batch share the same primary key, so a
                        // mismatch filters out the entire batch.
                        return Ok(None);
                    }
                }
                SemanticType::Field => {
                    if skip_fields {
                        // Skips filters on field columns when requested.
                        continue;
                    }
                    // Skips the filter if the field column is not projected.
                    let Some(field_index) = self
                        .read_format
                        .as_primary_key()
                        .unwrap()
                        .field_index_by_id(filter_ctx.column_id())
                    else {
                        continue;
                    };
                    let field_col = &input.fields()[field_index].data;
                    filter
                        .evaluate_vector(field_col)
                        .context(RecordBatchSnafu)?
                }
                SemanticType::Timestamp => filter
                    .evaluate_vector(input.timestamps())
                    .context(RecordBatchSnafu)?,
            };

            mask = mask.bitand(&result);
        }

        input.filter(&BooleanArray::from(mask).into())?;

        Ok(Some(input))
    }

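    /// Applies pushed-down filters precisely on a flat [RecordBatch].
    /// Returns the filtered batch, or `None` if the entire batch is filtered
    /// out.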
    pub(crate) fn precise_filter_flat(
        &self,
        input: RecordBatch,
        skip_fields: bool,
    ) -> Result<Option<RecordBatch>> {
        let mask = self.compute_filter_mask_flat(&input, skip_fields)?;

        let Some(mask) = mask else {
            // The filters pruned the entire batch.
            return Ok(None);
        };

        let filtered_batch =
            datatypes::arrow::compute::filter_record_batch(&input, &BooleanArray::from(mask))
                .context(ComputeArrowSnafu)?;

        if filtered_batch.num_rows() > 0 {
            Ok(Some(filtered_batch))
        } else {
            Ok(None)
        }
    }

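    /// Computes the filter mask for a flat [RecordBatch] without materializing
    /// the filtered batch. Returns `None` if the entire batch is filtered out.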
    pub(crate) fn compute_filter_mask_flat(
        &self,
        input: &RecordBatch,
        skip_fields: bool,
    ) -> Result<Option<BooleanBuffer>> {
        let mut mask = BooleanBuffer::new_set(input.num_rows());

        let flat_format = self
            .read_format
            .as_flat()
            .context(crate::error::UnexpectedSnafu {
                reason: "Expected flat format for precise_filter_flat",
            })?;

        // Lazily decoded primary key values, shared by all tag filters.
        let mut decoded_pks: Option<DecodedPrimaryKeys> = None;
        // Caches decoded tag arrays by column id.
        let mut decoded_tag_cache: HashMap<ColumnId, ArrayRef> = HashMap::new();

        for filter_ctx in &self.filters {
            let filter = match filter_ctx.filter() {
                MaybeFilter::Filter(f) => f,
                // The filter matches the whole range, nothing to evaluate.
                MaybeFilter::Matched => continue,
                // The filter prunes the whole range.
                MaybeFilter::Pruned => return Ok(None),
            };

            if skip_fields && filter_ctx.semantic_type() == SemanticType::Field {
                // Skips filters on field columns when requested.
                continue;
            }

            let column_idx = flat_format.projected_index_by_id(filter_ctx.column_id());
            if let Some(idx) = column_idx {
                // The column is in the batch, evaluates the filter on it directly.
                let column = &input.columns()[idx];
                let result = filter.evaluate_array(column).context(RecordBatchSnafu)?;
                mask = mask.bitand(&result);
            } else if filter_ctx.semantic_type() == SemanticType::Tag {
                // The tag column is not in the batch, decodes it from the
                // primary key. Decodes the primary keys only once.
                if decoded_pks.is_none() {
                    decoded_pks = Some(decode_primary_keys(self.codec.as_ref(), input)?);
                }

                let metadata = flat_format.metadata();
                let column_id = filter_ctx.column_id();

                let tag_column = if let Some(cached_column) = decoded_tag_cache.get(&column_id) {
                    // Reuses the decoded tag array in the cache.
                    cached_column.clone()
                } else {
                    // The sparse encoding looks up values by column id instead
                    // of the primary key index.
                    let pk_index = if self.codec.encoding() == PrimaryKeyEncoding::Sparse {
                        None
                    } else {
                        metadata.primary_key_index(column_id)
                    };
                    let column_index = metadata.column_index_by_id(column_id);

                    if let (Some(column_index), Some(decoded)) =
                        (column_index, decoded_pks.as_ref())
                    {
                        let column_metadata = &metadata.column_metadatas[column_index];
                        let tag_column = decoded.get_tag_column(
                            column_id,
                            pk_index,
                            &column_metadata.column_schema.data_type,
                        )?;
                        decoded_tag_cache.insert(column_id, tag_column.clone());
                        tag_column
                    } else {
                        // Skips the filter if the column is not in the metadata.
                        continue;
                    }
                };

                let result = filter
                    .evaluate_array(&tag_column)
                    .context(RecordBatchSnafu)?;
                mask = mask.bitand(&result);
            }
        }

        Ok(Some(mask))
    }
}