// mito2/sst/parquet/file_range.rs
use std::ops::BitAnd;
19use std::sync::Arc;
20
21use api::v1::{OpType, SemanticType};
22use common_telemetry::error;
23use datatypes::arrow::array::BooleanArray;
24use datatypes::arrow::buffer::BooleanBuffer;
25use datatypes::arrow::record_batch::RecordBatch;
26use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
27use parquet::arrow::arrow_reader::RowSelection;
28use snafu::{OptionExt, ResultExt};
29use store_api::storage::TimeSeriesRowSelector;
30
31use crate::error::{
32 ComputeArrowSnafu, DataTypeMismatchSnafu, DecodeSnafu, DecodeStatsSnafu, RecordBatchSnafu,
33 Result, StatsNotPresentSnafu,
34};
35use crate::read::compat::CompatBatch;
36use crate::read::last_row::RowGroupLastRowCachedReader;
37use crate::read::prune::{FlatPruneReader, PruneReader};
38use crate::read::Batch;
39use crate::sst::file::FileHandle;
40use crate::sst::parquet::format::ReadFormat;
41use crate::sst::parquet::reader::{
42 FlatRowGroupReader, MaybeFilter, RowGroupReader, RowGroupReaderBuilder, SimpleFilterContext,
43};
44
/// A scan range over one parquet SST file: a single row group plus an
/// optional row selection inside it.
#[derive(Clone)]
pub struct FileRange {
    /// Shared context for all ranges of the same file (reader builder,
    /// filters, read format, codec).
    context: FileRangeContextRef,
    /// Index of the row group this range reads.
    row_group_idx: usize,
    /// Rows to read within the row group; `None` means all rows.
    row_selection: Option<RowSelection>,
}
56
57impl FileRange {
58 pub(crate) fn new(
60 context: FileRangeContextRef,
61 row_group_idx: usize,
62 row_selection: Option<RowSelection>,
63 ) -> Self {
64 Self {
65 context,
66 row_group_idx,
67 row_selection,
68 }
69 }
70
71 fn select_all(&self) -> bool {
73 let rows_in_group = self
74 .context
75 .reader_builder
76 .parquet_metadata()
77 .row_group(self.row_group_idx)
78 .num_rows();
79
80 let Some(row_selection) = &self.row_selection else {
81 return true;
82 };
83 row_selection.row_count() == rows_in_group as usize
84 }
85
86 pub(crate) async fn reader(
88 &self,
89 selector: Option<TimeSeriesRowSelector>,
90 ) -> Result<PruneReader> {
91 let parquet_reader = self
92 .context
93 .reader_builder
94 .build(self.row_group_idx, self.row_selection.clone())
95 .await?;
96
97 let use_last_row_reader = if selector
98 .map(|s| s == TimeSeriesRowSelector::LastRow)
99 .unwrap_or(false)
100 {
101 let put_only = !self
104 .context
105 .contains_delete(self.row_group_idx)
106 .inspect_err(|e| {
107 error!(e; "Failed to decode min value of op_type, fallback to RowGroupReader");
108 })
109 .unwrap_or(true);
110 put_only && self.select_all()
111 } else {
112 false
114 };
115
116 let prune_reader = if use_last_row_reader {
117 let reader = RowGroupLastRowCachedReader::new(
119 self.file_handle().file_id().file_id(),
120 self.row_group_idx,
121 self.context.reader_builder.cache_strategy().clone(),
122 RowGroupReader::new(self.context.clone(), parquet_reader),
123 );
124 PruneReader::new_with_last_row_reader(self.context.clone(), reader)
125 } else {
126 PruneReader::new_with_row_group_reader(
128 self.context.clone(),
129 RowGroupReader::new(self.context.clone(), parquet_reader),
130 )
131 };
132
133 Ok(prune_reader)
134 }
135
136 pub(crate) async fn flat_reader(&self) -> Result<FlatPruneReader> {
138 let parquet_reader = self
139 .context
140 .reader_builder
141 .build(self.row_group_idx, self.row_selection.clone())
142 .await?;
143
144 let flat_row_group_reader = FlatRowGroupReader::new(self.context.clone(), parquet_reader);
145 let flat_prune_reader =
146 FlatPruneReader::new_with_row_group_reader(self.context.clone(), flat_row_group_reader);
147
148 Ok(flat_prune_reader)
149 }
150
151 pub(crate) fn compat_batch(&self) -> Option<&CompatBatch> {
153 self.context.compat_batch()
154 }
155
156 pub(crate) fn file_handle(&self) -> &FileHandle {
158 self.context.reader_builder.file_handle()
159 }
160}
161
/// Context shared by every [FileRange] created for one parquet file.
pub(crate) struct FileRangeContext {
    /// Builds readers for individual row groups of the file.
    reader_builder: RowGroupReaderBuilder,
    /// Filters, read format, codec and optional compat info used when pruning.
    base: RangeBase,
}

/// Shared, reference-counted [FileRangeContext].
pub(crate) type FileRangeContextRef = Arc<FileRangeContext>;
171
172impl FileRangeContext {
173 pub(crate) fn new(
175 reader_builder: RowGroupReaderBuilder,
176 filters: Vec<SimpleFilterContext>,
177 read_format: ReadFormat,
178 codec: Arc<dyn PrimaryKeyCodec>,
179 ) -> Self {
180 Self {
181 reader_builder,
182 base: RangeBase {
183 filters,
184 read_format,
185 codec,
186 compat_batch: None,
187 },
188 }
189 }
190
191 pub(crate) fn file_path(&self) -> &str {
193 self.reader_builder.file_path()
194 }
195
196 pub(crate) fn filters(&self) -> &[SimpleFilterContext] {
198 &self.base.filters
199 }
200
201 pub(crate) fn read_format(&self) -> &ReadFormat {
203 &self.base.read_format
204 }
205
206 pub(crate) fn reader_builder(&self) -> &RowGroupReaderBuilder {
208 &self.reader_builder
209 }
210
211 pub(crate) fn compat_batch(&self) -> Option<&CompatBatch> {
213 self.base.compat_batch.as_ref()
214 }
215
216 pub(crate) fn set_compat_batch(&mut self, compat: Option<CompatBatch>) {
218 self.base.compat_batch = compat;
219 }
220
221 pub(crate) fn precise_filter(&self, input: Batch) -> Result<Option<Batch>> {
224 self.base.precise_filter(input)
225 }
226
227 pub(crate) fn precise_filter_flat(&self, input: RecordBatch) -> Result<Option<RecordBatch>> {
229 self.base.precise_filter_flat(input)
230 }
231
232 pub(crate) fn contains_delete(&self, row_group_index: usize) -> Result<bool> {
234 let metadata = self.reader_builder.parquet_metadata();
235 let row_group_metadata = &metadata.row_groups()[row_group_index];
236
237 let column_metadata = &row_group_metadata.columns().last().unwrap();
239 let stats = column_metadata.statistics().context(StatsNotPresentSnafu {
240 file_path: self.reader_builder.file_path(),
241 })?;
242 stats
243 .min_bytes_opt()
244 .context(StatsNotPresentSnafu {
245 file_path: self.reader_builder.file_path(),
246 })?
247 .try_into()
248 .map(i32::from_le_bytes)
249 .map(|min_op_type| min_op_type == OpType::Delete as i32)
250 .ok()
251 .context(DecodeStatsSnafu {
252 file_path: self.reader_builder.file_path(),
253 })
254 }
255}
256
/// Common fields used to prune/filter rows read from a parquet file.
pub(crate) struct RangeBase {
    /// Predicates to evaluate precisely against decoded rows.
    pub(crate) filters: Vec<SimpleFilterContext>,
    /// Format describing how parquet data maps to batches.
    pub(crate) read_format: ReadFormat,
    /// Codec used to decode primary keys.
    pub(crate) codec: Arc<dyn PrimaryKeyCodec>,
    /// Optional adapter to make batches compatible with an expected schema.
    pub(crate) compat_batch: Option<CompatBatch>,
}
268
269impl RangeBase {
270 pub(crate) fn precise_filter(&self, mut input: Batch) -> Result<Option<Batch>> {
278 let mut mask = BooleanBuffer::new_set(input.num_rows());
279
280 for filter_ctx in &self.filters {
283 let filter = match filter_ctx.filter() {
284 MaybeFilter::Filter(f) => f,
285 MaybeFilter::Matched => continue,
287 MaybeFilter::Pruned => return Ok(None),
289 };
290 let result = match filter_ctx.semantic_type() {
291 SemanticType::Tag => {
292 let pk_values = if let Some(pk_values) = input.pk_values() {
293 pk_values
294 } else {
295 input.set_pk_values(
296 self.codec
297 .decode(input.primary_key())
298 .context(DecodeSnafu)?,
299 );
300 input.pk_values().unwrap()
301 };
302 let pk_value = match pk_values {
303 CompositeValues::Dense(v) => {
304 let pk_index = self
306 .read_format
307 .metadata()
308 .primary_key_index(filter_ctx.column_id())
309 .unwrap();
310 v[pk_index]
311 .1
312 .try_to_scalar_value(filter_ctx.data_type())
313 .context(DataTypeMismatchSnafu)?
314 }
315 CompositeValues::Sparse(v) => {
316 let v = v.get_or_null(filter_ctx.column_id());
317 v.try_to_scalar_value(filter_ctx.data_type())
318 .context(DataTypeMismatchSnafu)?
319 }
320 };
321 if filter
322 .evaluate_scalar(&pk_value)
323 .context(RecordBatchSnafu)?
324 {
325 continue;
326 } else {
327 return Ok(None);
329 }
330 }
331 SemanticType::Field => {
332 let Some(field_index) = self
334 .read_format
335 .as_primary_key()
336 .unwrap()
337 .field_index_by_id(filter_ctx.column_id())
338 else {
339 continue;
340 };
341 let field_col = &input.fields()[field_index].data;
342 filter
343 .evaluate_vector(field_col)
344 .context(RecordBatchSnafu)?
345 }
346 SemanticType::Timestamp => filter
347 .evaluate_vector(input.timestamps())
348 .context(RecordBatchSnafu)?,
349 };
350
351 mask = mask.bitand(&result);
352 }
353
354 input.filter(&BooleanArray::from(mask).into())?;
355
356 Ok(Some(input))
357 }
358
359 pub(crate) fn precise_filter_flat(&self, input: RecordBatch) -> Result<Option<RecordBatch>> {
361 let mut mask = BooleanBuffer::new_set(input.num_rows());
362
363 let flat_format = self
364 .read_format
365 .as_flat()
366 .context(crate::error::UnexpectedSnafu {
367 reason: "Expected flat format for precise_filter_flat",
368 })?;
369
370 for filter_ctx in &self.filters {
372 let filter = match filter_ctx.filter() {
373 MaybeFilter::Filter(f) => f,
374 MaybeFilter::Matched => continue,
376 MaybeFilter::Pruned => return Ok(None),
378 };
379
380 let column_idx = flat_format.projected_index_by_id(filter_ctx.column_id());
382 if let Some(idx) = column_idx {
383 let column = &input.columns()[idx];
384 let result = filter.evaluate_array(column).context(RecordBatchSnafu)?;
385 mask = mask.bitand(&result);
386 } else {
387 continue;
389 }
390 }
391
392 let filtered_batch =
393 datatypes::arrow::compute::filter_record_batch(&input, &BooleanArray::from(mask))
394 .context(ComputeArrowSnafu)?;
395
396 if filtered_batch.num_rows() > 0 {
397 Ok(Some(filtered_batch))
398 } else {
399 Ok(None)
400 }
401 }
402}