// mito2/sst/parquet/file_range.rs

use std::ops::BitAnd;
19use std::sync::Arc;
20
21use api::v1::{OpType, SemanticType};
22use common_telemetry::error;
23use datatypes::arrow::array::BooleanArray;
24use datatypes::arrow::buffer::BooleanBuffer;
25use mito_codec::row_converter::{CompositeValues, PrimaryKeyCodec};
26use parquet::arrow::arrow_reader::RowSelection;
27use snafu::{OptionExt, ResultExt};
28use store_api::storage::TimeSeriesRowSelector;
29
30use crate::error::{
31 DataTypeMismatchSnafu, DecodeSnafu, DecodeStatsSnafu, RecordBatchSnafu, Result,
32 StatsNotPresentSnafu,
33};
34use crate::read::compat::CompatBatch;
35use crate::read::last_row::RowGroupLastRowCachedReader;
36use crate::read::prune::PruneReader;
37use crate::read::Batch;
38use crate::sst::file::FileHandle;
39use crate::sst::parquet::format::ReadFormat;
40use crate::sst::parquet::reader::{
41 MaybeFilter, RowGroupReader, RowGroupReaderBuilder, SimpleFilterContext,
42};
43
/// A range of a parquet SST to read: one row group, optionally narrowed by a
/// row selection. Cheap to clone (shares the context via `Arc`).
#[derive(Clone)]
pub struct FileRange {
    /// Shared context of the file (reader builder, filters, read format).
    context: FileRangeContextRef,
    /// Index of the row group in the parquet file that this range reads.
    row_group_idx: usize,
    /// Rows to read inside the row group. `None` means all rows.
    row_selection: Option<RowSelection>,
}
55
56impl FileRange {
57 pub(crate) fn new(
59 context: FileRangeContextRef,
60 row_group_idx: usize,
61 row_selection: Option<RowSelection>,
62 ) -> Self {
63 Self {
64 context,
65 row_group_idx,
66 row_selection,
67 }
68 }
69
70 fn select_all(&self) -> bool {
72 let rows_in_group = self
73 .context
74 .reader_builder
75 .parquet_metadata()
76 .row_group(self.row_group_idx)
77 .num_rows();
78
79 let Some(row_selection) = &self.row_selection else {
80 return true;
81 };
82 row_selection.row_count() == rows_in_group as usize
83 }
84
85 pub(crate) async fn reader(
87 &self,
88 selector: Option<TimeSeriesRowSelector>,
89 ) -> Result<PruneReader> {
90 let parquet_reader = self
91 .context
92 .reader_builder
93 .build(self.row_group_idx, self.row_selection.clone())
94 .await?;
95
96 let use_last_row_reader = if selector
97 .map(|s| s == TimeSeriesRowSelector::LastRow)
98 .unwrap_or(false)
99 {
100 let put_only = !self
103 .context
104 .contains_delete(self.row_group_idx)
105 .inspect_err(|e| {
106 error!(e; "Failed to decode min value of op_type, fallback to RowGroupReader");
107 })
108 .unwrap_or(true);
109 put_only && self.select_all()
110 } else {
111 false
113 };
114
115 let prune_reader = if use_last_row_reader {
116 let reader = RowGroupLastRowCachedReader::new(
118 self.file_handle().file_id(),
119 self.row_group_idx,
120 self.context.reader_builder.cache_strategy().clone(),
121 RowGroupReader::new(self.context.clone(), parquet_reader),
122 );
123 PruneReader::new_with_last_row_reader(self.context.clone(), reader)
124 } else {
125 PruneReader::new_with_row_group_reader(
127 self.context.clone(),
128 RowGroupReader::new(self.context.clone(), parquet_reader),
129 )
130 };
131
132 Ok(prune_reader)
133 }
134
135 pub(crate) fn compat_batch(&self) -> Option<&CompatBatch> {
137 self.context.compat_batch()
138 }
139
140 pub(crate) fn file_handle(&self) -> &FileHandle {
142 self.context.reader_builder.file_handle()
143 }
144}
145
/// Context shared by all [FileRange]s of the same parquet SST file.
pub(crate) struct FileRangeContext {
    /// Builder that creates row group readers for the file.
    reader_builder: RowGroupReaderBuilder,
    /// Shared state for filtering and decoding (filters, read format, codec,
    /// optional compat helper).
    base: RangeBase,
}

/// Reference-counted [FileRangeContext].
pub(crate) type FileRangeContextRef = Arc<FileRangeContext>;
155
156impl FileRangeContext {
157 pub(crate) fn new(
159 reader_builder: RowGroupReaderBuilder,
160 filters: Vec<SimpleFilterContext>,
161 read_format: ReadFormat,
162 codec: Arc<dyn PrimaryKeyCodec>,
163 ) -> Self {
164 Self {
165 reader_builder,
166 base: RangeBase {
167 filters,
168 read_format,
169 codec,
170 compat_batch: None,
171 },
172 }
173 }
174
175 pub(crate) fn file_path(&self) -> &str {
177 self.reader_builder.file_path()
178 }
179
180 pub(crate) fn filters(&self) -> &[SimpleFilterContext] {
182 &self.base.filters
183 }
184
185 pub(crate) fn read_format(&self) -> &ReadFormat {
187 &self.base.read_format
188 }
189
190 pub(crate) fn reader_builder(&self) -> &RowGroupReaderBuilder {
192 &self.reader_builder
193 }
194
195 pub(crate) fn compat_batch(&self) -> Option<&CompatBatch> {
197 self.base.compat_batch.as_ref()
198 }
199
200 pub(crate) fn set_compat_batch(&mut self, compat: Option<CompatBatch>) {
202 self.base.compat_batch = compat;
203 }
204
205 pub(crate) fn precise_filter(&self, input: Batch) -> Result<Option<Batch>> {
208 self.base.precise_filter(input)
209 }
210
211 pub(crate) fn contains_delete(&self, row_group_index: usize) -> Result<bool> {
213 let metadata = self.reader_builder.parquet_metadata();
214 let row_group_metadata = &metadata.row_groups()[row_group_index];
215
216 let column_metadata = &row_group_metadata.columns().last().unwrap();
218 let stats = column_metadata.statistics().context(StatsNotPresentSnafu {
219 file_path: self.reader_builder.file_path(),
220 })?;
221 stats
222 .min_bytes_opt()
223 .context(StatsNotPresentSnafu {
224 file_path: self.reader_builder.file_path(),
225 })?
226 .try_into()
227 .map(i32::from_le_bytes)
228 .map(|min_op_type| min_op_type == OpType::Delete as i32)
229 .ok()
230 .context(DecodeStatsSnafu {
231 file_path: self.reader_builder.file_path(),
232 })
233 }
234}
235
/// Common state shared by readers that apply precise (row-level) filtering.
pub(crate) struct RangeBase {
    /// Filters pushed down to evaluate against each batch.
    pub(crate) filters: Vec<SimpleFilterContext>,
    /// Helper to interpret the SST's on-disk format.
    pub(crate) read_format: ReadFormat,
    /// Codec to decode primary key bytes into values.
    pub(crate) codec: Arc<dyn PrimaryKeyCodec>,
    /// Optional helper to adapt batches to the expected schema
    /// (presumably set when the SST's schema lags the region's — TODO confirm).
    pub(crate) compat_batch: Option<CompatBatch>,
}
247
impl RangeBase {
    /// Evaluates every pushed-down filter against `input` and removes the rows
    /// that fail, returning `None` when the whole batch is filtered out.
    ///
    /// Tag filters are evaluated once per batch against the decoded primary
    /// key (a batch shares a single primary key); field and timestamp filters
    /// are evaluated per row and combined into a boolean mask.
    pub(crate) fn precise_filter(&self, mut input: Batch) -> Result<Option<Batch>> {
        // Start with an all-true mask; each vector filter ANDs into it.
        let mut mask = BooleanBuffer::new_set(input.num_rows());

        for filter_ctx in &self.filters {
            let filter = match filter_ctx.filter() {
                MaybeFilter::Filter(f) => f,
                // Already known to match every row: nothing to evaluate.
                MaybeFilter::Matched => continue,
                // Already known to match no row: drop the whole batch.
                MaybeFilter::Pruned => return Ok(None),
            };
            let result = match filter_ctx.semantic_type() {
                SemanticType::Tag => {
                    // Decode the primary key lazily and cache it on the batch
                    // so later tag filters reuse the decoded values.
                    let pk_values = if let Some(pk_values) = input.pk_values() {
                        pk_values
                    } else {
                        input.set_pk_values(
                            self.codec
                                .decode(input.primary_key())
                                .context(DecodeSnafu)?,
                        );
                        input.pk_values().unwrap()
                    };
                    let pk_value = match pk_values {
                        CompositeValues::Dense(v) => {
                            // Dense layout: locate the value by the key's
                            // position in the primary key.
                            let pk_index = self
                                .read_format
                                .metadata()
                                .primary_key_index(filter_ctx.column_id())
                                .unwrap();
                            v[pk_index]
                                .1
                                .try_to_scalar_value(filter_ctx.data_type())
                                .context(DataTypeMismatchSnafu)?
                        }
                        CompositeValues::Sparse(v) => {
                            // Sparse layout: look up by column id; a missing
                            // key yields null.
                            let v = v.get_or_null(filter_ctx.column_id());
                            v.try_to_scalar_value(filter_ctx.data_type())
                                .context(DataTypeMismatchSnafu)?
                        }
                    };
                    // One primary key per batch: either every row passes this
                    // filter or none does.
                    if filter
                        .evaluate_scalar(&pk_value)
                        .context(RecordBatchSnafu)?
                    {
                        continue;
                    } else {
                        return Ok(None);
                    }
                }
                SemanticType::Field => {
                    // Skip filters on fields this SST does not contain.
                    let Some(field_index) =
                        self.read_format.field_index_by_id(filter_ctx.column_id())
                    else {
                        continue;
                    };
                    let field_col = &input.fields()[field_index].data;
                    filter
                        .evaluate_vector(field_col)
                        .context(RecordBatchSnafu)?
                }
                SemanticType::Timestamp => filter
                    .evaluate_vector(input.timestamps())
                    .context(RecordBatchSnafu)?,
            };

            mask = mask.bitand(&result);
        }

        // Apply the combined mask to keep only the surviving rows.
        input.filter(&BooleanArray::from(mask).into())?;

        Ok(Some(input))
    }
}
333}