use std::collections::HashMap;
use std::ops::BitAnd;
use std::sync::Arc;

use api::v1::{OpType, SemanticType};
use common_telemetry::error;
use datatypes::arrow::array::{ArrayRef, BooleanArray};
use datatypes::arrow::buffer::BooleanBuffer;
use datatypes::arrow::record_batch::RecordBatch;
use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
use parquet::arrow::arrow_reader::RowSelection;
use parquet::file::metadata::ParquetMetaData;
use snafu::{OptionExt, ResultExt};
use store_api::codec::PrimaryKeyEncoding;
use store_api::storage::{ColumnId, TimeSeriesRowSelector};

use crate::error::{
    ComputeArrowSnafu, DataTypeMismatchSnafu, DecodeSnafu, DecodeStatsSnafu, RecordBatchSnafu,
    Result, StatsNotPresentSnafu,
};
use crate::read::Batch;
use crate::read::compat::CompatBatch;
use crate::read::last_row::RowGroupLastRowCachedReader;
use crate::read::prune::{FlatPruneReader, PruneReader};
use crate::sst::file::FileHandle;
use crate::sst::parquet::flat_format::{DecodedPrimaryKeys, decode_primary_keys};
use crate::sst::parquet::format::ReadFormat;
use crate::sst::parquet::reader::{
    FlatRowGroupReader, MaybeFilter, RowGroupReader, RowGroupReaderBuilder, SimpleFilterContext,
};
/// Checks whether the given row group may contain delete operations by
/// inspecting the min statistic of the `op_type` column.
pub(crate) fn row_group_contains_delete(
    parquet_meta: &ParquetMetaData,
    row_group_index: usize,
    file_path: &str,
) -> Result<bool> {
    let row_group_metadata = &parquet_meta.row_groups()[row_group_index];

    // The `op_type` column is the last column in the SST.
    let column_metadata = &row_group_metadata.columns().last().unwrap();
    let stats = column_metadata
        .statistics()
        .context(StatsNotPresentSnafu { file_path })?;
    stats
        .min_bytes_opt()
        .context(StatsNotPresentSnafu { file_path })?
        .try_into()
        .map(i32::from_le_bytes)
        .map(|min_op_type| min_op_type == OpType::Delete as i32)
        .ok()
        .context(DecodeStatsSnafu { file_path })
}

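// A minimal, self-contained sketch of the decoding above: the `op_type` column
// is stored as INT32, so its parquet min statistic is four little-endian bytes.
// The min equals `OpType::Delete` exactly when the row group holds at least one
// delete row, which is what makes the min check above sufficient. The helper
// below is illustrative only and is not used by the reader.
#[cfg(test)]
mod op_type_stats_sketch {
    use super::*;

    fn min_indicates_delete(min_bytes: &[u8]) -> Option<bool> {
        let bytes: [u8; 4] = min_bytes.try_into().ok()?;
        Some(i32::from_le_bytes(bytes) == OpType::Delete as i32)
    }

    #[test]
    fn decodes_min_op_type() {
        let delete = (OpType::Delete as i32).to_le_bytes();
        assert_eq!(min_indicates_delete(&delete), Some(true));

        let put = (OpType::Put as i32).to_le_bytes();
        assert_eq!(min_indicates_delete(&put), Some(false));

        // A statistic with the wrong length cannot be decoded.
        assert_eq!(min_indicates_delete(&[0u8; 2]), None);
    }
}
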
/// A range of a parquet SST to read. Currently it maps to a row group, so
/// different file ranges can be read in parallel.
#[derive(Clone)]
pub struct FileRange {
    /// Shared context of the file.
    context: FileRangeContextRef,
    /// Index of the row group of the range.
    row_group_idx: usize,
    /// Row selection for the row group. `None` means all rows.
    row_selection: Option<RowSelection>,
}

impl FileRange {
    /// Creates a new [FileRange] to read a row group of the file.
    pub(crate) fn new(
        context: FileRangeContextRef,
        row_group_idx: usize,
        row_selection: Option<RowSelection>,
    ) -> Self {
        Self {
            context,
            row_group_idx,
            row_selection,
        }
    }

    /// Returns true if the [FileRange] selects all rows in the row group.
    fn select_all(&self) -> bool {
        let rows_in_group = self
            .context
            .reader_builder
            .parquet_metadata()
            .row_group(self.row_group_idx)
            .num_rows();

        let Some(row_selection) = &self.row_selection else {
            return true;
        };
        row_selection.row_count() == rows_in_group as usize
    }

    /// Returns a reader to read the [FileRange].
    pub(crate) async fn reader(
        &self,
        selector: Option<TimeSeriesRowSelector>,
    ) -> Result<PruneReader> {
        let parquet_reader = self
            .context
            .reader_builder
            .build(self.row_group_idx, self.row_selection.clone())
            .await?;

        let use_last_row_reader = if selector
            .map(|s| s == TimeSeriesRowSelector::LastRow)
            .unwrap_or(false)
        {
            // Only uses the last row reader if the row group contains no delete
            // operations and all rows are selected.
            let put_only = !self
                .context
                .contains_delete(self.row_group_idx)
                .inspect_err(|e| {
                    error!(e; "Failed to decode min value of op_type, fallback to RowGroupReader");
                })
                .unwrap_or(true);
            put_only && self.select_all()
        } else {
            // No `LastRow` selector, uses the normal row group reader.
            false
        };

        let skip_fields = self.context.should_skip_fields(self.row_group_idx);

        let prune_reader = if use_last_row_reader {
            // Reads and caches the last row of each time series in the row group.
            let reader = RowGroupLastRowCachedReader::new(
                self.file_handle().file_id().file_id(),
                self.row_group_idx,
                self.context.reader_builder.cache_strategy().clone(),
                RowGroupReader::new(self.context.clone(), parquet_reader),
            );
            PruneReader::new_with_last_row_reader(self.context.clone(), reader, skip_fields)
        } else {
            PruneReader::new_with_row_group_reader(
                self.context.clone(),
                RowGroupReader::new(self.context.clone(), parquet_reader),
                skip_fields,
            )
        };

        Ok(prune_reader)
    }

    /// Returns a reader that reads flat [RecordBatch]es from the [FileRange].
    pub(crate) async fn flat_reader(&self) -> Result<FlatPruneReader> {
        let parquet_reader = self
            .context
            .reader_builder
            .build(self.row_group_idx, self.row_selection.clone())
            .await?;

        let skip_fields = self.context.should_skip_fields(self.row_group_idx);

        let flat_row_group_reader = FlatRowGroupReader::new(self.context.clone(), parquet_reader);
        let flat_prune_reader = FlatPruneReader::new_with_row_group_reader(
            self.context.clone(),
            flat_row_group_reader,
            skip_fields,
        );

        Ok(flat_prune_reader)
    }

    /// Returns the helper to compat batches.
    pub(crate) fn compat_batch(&self) -> Option<&CompatBatch> {
        self.context.compat_batch()
    }

    /// Returns the file handle of the file to read.
    pub(crate) fn file_handle(&self) -> &FileHandle {
        self.context.reader_builder.file_handle()
    }
}
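
// A small illustration of the `select_all` check above: a `RowSelection`
// covering every row of the row group is equivalent to having no selection at
// all. The row counts here are made up for the example.
#[cfg(test)]
mod row_selection_sketch {
    use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

    #[test]
    fn full_and_partial_selections() {
        let rows_in_group = 100usize;

        // Selects rows [0, 40) and [60, 100): 80 of the 100 rows.
        let partial = RowSelection::from(vec![
            RowSelector::select(40),
            RowSelector::skip(20),
            RowSelector::select(40),
        ]);
        assert!(partial.row_count() < rows_in_group);

        // Selecting the whole group behaves like `row_selection == None`.
        let full = RowSelection::from(vec![RowSelector::select(rows_in_group)]);
        assert_eq!(full.row_count(), rows_in_group);
    }
}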

/// Context shared by all [FileRange]s of the same parquet SST.
pub(crate) struct FileRangeContext {
    /// Builder of row group readers for the file.
    reader_builder: RowGroupReaderBuilder,
    /// Base of the context.
    base: RangeBase,
}

pub(crate) type FileRangeContextRef = Arc<FileRangeContext>;

impl FileRangeContext {
    /// Creates a new [FileRangeContext].
    pub(crate) fn new(
        reader_builder: RowGroupReaderBuilder,
        filters: Vec<SimpleFilterContext>,
        read_format: ReadFormat,
        codec: Arc<dyn PrimaryKeyCodec>,
        pre_filter_mode: PreFilterMode,
    ) -> Self {
        Self {
            reader_builder,
            base: RangeBase {
                filters,
                read_format,
                codec,
                compat_batch: None,
                pre_filter_mode,
            },
        }
    }

    /// Returns the path of the file to read.
    pub(crate) fn file_path(&self) -> &str {
        self.reader_builder.file_path()
    }

    /// Returns the filters pushed down to the reader.
    pub(crate) fn filters(&self) -> &[SimpleFilterContext] {
        &self.base.filters
    }

    /// Returns the format helper of the file.
    pub(crate) fn read_format(&self) -> &ReadFormat {
        &self.base.read_format
    }

    /// Returns the builder of row group readers.
    pub(crate) fn reader_builder(&self) -> &RowGroupReaderBuilder {
        &self.reader_builder
    }

    /// Returns the helper to compat batches.
    pub(crate) fn compat_batch(&self) -> Option<&CompatBatch> {
        self.base.compat_batch.as_ref()
    }

    /// Sets the helper to compat batches.
    pub(crate) fn set_compat_batch(&mut self, compat: Option<CompatBatch>) {
        self.base.compat_batch = compat;
    }

    /// Tries the best to apply the pushed-down predicates precisely to the input batch.
    /// Returns the filtered batch, or `None` if the whole batch is filtered out.
    pub(crate) fn precise_filter(&self, input: Batch, skip_fields: bool) -> Result<Option<Batch>> {
        self.base.precise_filter(input, skip_fields)
    }

    /// Applies the pushed-down predicates to a [RecordBatch] in flat format.
    pub(crate) fn precise_filter_flat(
        &self,
        input: RecordBatch,
        skip_fields: bool,
    ) -> Result<Option<RecordBatch>> {
        self.base.precise_filter_flat(input, skip_fields)
    }

    /// Returns whether field filters can be skipped for the row group,
    /// according to the [PreFilterMode].
    pub(crate) fn should_skip_fields(&self, row_group_idx: usize) -> bool {
        match self.base.pre_filter_mode {
            PreFilterMode::All => false,
            PreFilterMode::SkipFields => true,
            PreFilterMode::SkipFieldsOnDelete => {
                // Skips field filters if the row group may contain deleted rows.
                self.contains_delete(row_group_idx).unwrap_or(true)
            }
        }
    }

    /// Checks if the given row group contains delete operations.
    pub(crate) fn contains_delete(&self, row_group_index: usize) -> Result<bool> {
        let metadata = self.reader_builder.parquet_metadata();
        row_group_contains_delete(metadata, row_group_index, self.reader_builder.file_path())
    }
}

/// Mode that controls whether field columns are pre-filtered when reading a
/// row group.
#[derive(Debug, Clone, Copy)]
pub enum PreFilterMode {
    /// Applies filters to all columns.
    All,
    /// Skips field filters if the row group may contain deleted rows.
    SkipFieldsOnDelete,
    /// Always skips field filters.
    SkipFields,
}
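
// A condensed view of how `should_skip_fields` maps each mode to a decision.
// `contains_delete` is stubbed out as a plain `Result<bool>` here; the real
// method reads the row group statistics and errs on the side of skipping
// field filters when they cannot be decoded.
#[cfg(test)]
mod pre_filter_mode_sketch {
    use super::*;

    fn should_skip(mode: PreFilterMode, contains_delete: Result<bool>) -> bool {
        match mode {
            PreFilterMode::All => false,
            PreFilterMode::SkipFields => true,
            PreFilterMode::SkipFieldsOnDelete => contains_delete.unwrap_or(true),
        }
    }

    #[test]
    fn mode_decisions() {
        assert!(!should_skip(PreFilterMode::All, Ok(true)));
        assert!(should_skip(PreFilterMode::SkipFields, Ok(false)));
        assert!(!should_skip(PreFilterMode::SkipFieldsOnDelete, Ok(false)));
        assert!(should_skip(PreFilterMode::SkipFieldsOnDelete, Ok(true)));
    }
}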

/// Common fields for a range to read and filter batches.
pub(crate) struct RangeBase {
    /// Filters pushed down.
    pub(crate) filters: Vec<SimpleFilterContext>,
    /// Helper to read the SST.
    pub(crate) read_format: ReadFormat,
    /// Decoder for primary keys.
    pub(crate) codec: Arc<dyn PrimaryKeyCodec>,
    /// Optional helper to compat batches.
    pub(crate) compat_batch: Option<CompatBatch>,
    /// Mode of pre-filtering field columns.
    pub(crate) pre_filter_mode: PreFilterMode,
}

impl RangeBase {
    /// Tries the best to apply the pushed-down predicates precisely to the
    /// input batch. Returns the filtered batch, or `None` if the whole batch
    /// is filtered out.
    ///
    /// Tag filters are evaluated against the decoded primary key values, and
    /// field/timestamp filters against the corresponding vectors. Field
    /// filters are skipped when `skip_fields` is true.
    pub(crate) fn precise_filter(
        &self,
        mut input: Batch,
        skip_fields: bool,
    ) -> Result<Option<Batch>> {
        let mut mask = BooleanBuffer::new_set(input.num_rows());

        // Evaluates the filters one by one and combines their results into the mask.
        for filter_ctx in &self.filters {
            let filter = match filter_ctx.filter() {
                MaybeFilter::Filter(f) => f,
                // The filter matches all rows in the range.
                MaybeFilter::Matched => continue,
                // The filter prunes the whole range.
                MaybeFilter::Pruned => return Ok(None),
            };
            let result = match filter_ctx.semantic_type() {
                SemanticType::Tag => {
                    let pk_values = if let Some(pk_values) = input.pk_values() {
                        pk_values
                    } else {
                        input.set_pk_values(
                            self.codec
                                .decode(input.primary_key())
                                .context(DecodeSnafu)?,
                        );
                        input.pk_values().unwrap()
                    };
                    let pk_value = match pk_values {
                        CompositeValues::Dense(v) => {
                            // Safety: the filter column is a tag, so the metadata
                            // must contain its primary key index.
                            let pk_index = self
                                .read_format
                                .metadata()
                                .primary_key_index(filter_ctx.column_id())
                                .unwrap();
                            v[pk_index]
                                .1
                                .try_to_scalar_value(filter_ctx.data_type())
                                .context(DataTypeMismatchSnafu)?
                        }
                        CompositeValues::Sparse(v) => {
                            let v = v.get_or_null(filter_ctx.column_id());
                            v.try_to_scalar_value(filter_ctx.data_type())
                                .context(DataTypeMismatchSnafu)?
                        }
                    };
                    if filter
                        .evaluate_scalar(&pk_value)
                        .context(RecordBatchSnafu)?
                    {
                        continue;
                    } else {
                        // All rows of a batch share the same primary key, so the
                        // whole batch is filtered out.
                        return Ok(None);
                    }
                }
                SemanticType::Field => {
                    if skip_fields {
                        // The caller chooses to skip field filters for this range.
                        continue;
                    }
                    let Some(field_index) = self
                        .read_format
                        .as_primary_key()
                        .unwrap()
                        .field_index_by_id(filter_ctx.column_id())
                    else {
                        continue;
                    };
                    let field_col = &input.fields()[field_index].data;
                    filter
                        .evaluate_vector(field_col)
                        .context(RecordBatchSnafu)?
                }
                SemanticType::Timestamp => filter
                    .evaluate_vector(input.timestamps())
                    .context(RecordBatchSnafu)?,
            };

            mask = mask.bitand(&result);
        }

        input.filter(&BooleanArray::from(mask).into())?;

        Ok(Some(input))
    }

    /// Applies the pushed-down predicates to a [RecordBatch] in flat format and
    /// returns the filtered batch.
    ///
    /// Returns `None` if the whole batch is filtered out or no rows remain
    /// after filtering.
    pub(crate) fn precise_filter_flat(
        &self,
        input: RecordBatch,
        skip_fields: bool,
    ) -> Result<Option<RecordBatch>> {
        let mask = self.compute_filter_mask_flat(&input, skip_fields)?;

        // Some filter pruned the whole batch.
        let Some(mask) = mask else {
            return Ok(None);
        };

        let filtered_batch =
            datatypes::arrow::compute::filter_record_batch(&input, &BooleanArray::from(mask))
                .context(ComputeArrowSnafu)?;

        if filtered_batch.num_rows() > 0 {
            Ok(Some(filtered_batch))
        } else {
            Ok(None)
        }
    }

    /// Computes the filter mask for a [RecordBatch] in flat format without
    /// materializing the filtered batch.
    ///
    /// Returns `None` if some filter prunes the whole batch, otherwise a
    /// boolean mask that selects the rows passing all filters.
    pub(crate) fn compute_filter_mask_flat(
        &self,
        input: &RecordBatch,
        skip_fields: bool,
    ) -> Result<Option<BooleanBuffer>> {
        let mut mask = BooleanBuffer::new_set(input.num_rows());

        let flat_format = self
            .read_format
            .as_flat()
            .context(crate::error::UnexpectedSnafu {
                reason: "Expected flat format for precise_filter_flat",
            })?;

        // Decodes primary keys lazily and at most once for the batch.
        let mut decoded_pks: Option<DecodedPrimaryKeys> = None;
        // Caches decoded tag columns so each tag is decoded at most once.
        let mut decoded_tag_cache: HashMap<ColumnId, ArrayRef> = HashMap::new();

        for filter_ctx in &self.filters {
            let filter = match filter_ctx.filter() {
                MaybeFilter::Filter(f) => f,
                // The filter matches all rows in the range.
                MaybeFilter::Matched => continue,
                // The filter prunes the whole range.
                MaybeFilter::Pruned => return Ok(None),
            };

            if skip_fields && filter_ctx.semantic_type() == SemanticType::Field {
                continue;
            }

            let column_idx = flat_format.projected_index_by_id(filter_ctx.column_id());
            if let Some(idx) = column_idx {
                // The column is present in the batch, evaluates the filter on it directly.
                let column = &input.columns()[idx];
                let result = filter.evaluate_array(column).context(RecordBatchSnafu)?;
                mask = mask.bitand(&result);
            } else if filter_ctx.semantic_type() == SemanticType::Tag {
                // The tag column is encoded in the primary key, decodes it first.
                if decoded_pks.is_none() {
                    decoded_pks = Some(decode_primary_keys(self.codec.as_ref(), input)?);
                }

                let metadata = flat_format.metadata();
                let column_id = filter_ctx.column_id();

                let tag_column = if let Some(cached_column) = decoded_tag_cache.get(&column_id) {
                    cached_column.clone()
                } else {
                    // The sparse encoding locates values by column id, not by the
                    // position in the primary key.
                    let pk_index = if self.codec.encoding() == PrimaryKeyEncoding::Sparse {
                        None
                    } else {
                        metadata.primary_key_index(column_id)
                    };
                    let column_index = metadata.column_index_by_id(column_id);

                    if let (Some(column_index), Some(decoded)) =
                        (column_index, decoded_pks.as_ref())
                    {
                        let column_metadata = &metadata.column_metadatas[column_index];
                        let tag_column = decoded.get_tag_column(
                            column_id,
                            pk_index,
                            &column_metadata.column_schema.data_type,
                        )?;
                        decoded_tag_cache.insert(column_id, tag_column.clone());
                        tag_column
                    } else {
                        // The column is unknown to the metadata, ignores the filter.
                        continue;
                    }
                };

                let result = filter
                    .evaluate_array(&tag_column)
                    .context(RecordBatchSnafu)?;
                mask = mask.bitand(&result);
            }
        }

        Ok(Some(mask))
    }
}
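
// The filtering loops above share one pattern: start from an all-true mask and
// AND in each predicate's per-row result, so a row survives only if every
// pushed-down filter accepts it. Below is a self-contained sketch of that
// pattern with hypothetical filter results (not part of the reader).
#[cfg(test)]
mod mask_combine_sketch {
    use std::ops::BitAnd;

    use datatypes::arrow::array::BooleanArray;
    use datatypes::arrow::buffer::BooleanBuffer;

    #[test]
    fn combines_filter_results() {
        let mut mask = BooleanBuffer::new_set(4);

        // Pretend two filters produced these per-row results.
        let filter_a = BooleanArray::from(vec![true, true, false, true]);
        let filter_b = BooleanArray::from(vec![true, false, true, true]);

        mask = mask.bitand(filter_a.values());
        mask = mask.bitand(filter_b.values());

        // Only the rows accepted by both filters remain selected.
        let selected = BooleanArray::from(mask);
        assert_eq!(selected.true_count(), 2);
    }
}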