// mito2/sst/parquet/file_range.rs

use std::ops::BitAnd;
19use std::sync::Arc;
20
21use api::v1::{OpType, SemanticType};
22use common_telemetry::error;
23use datatypes::arrow::array::BooleanArray;
24use datatypes::arrow::buffer::BooleanBuffer;
25use parquet::arrow::arrow_reader::RowSelection;
26use snafu::{OptionExt, ResultExt};
27use store_api::storage::TimeSeriesRowSelector;
28
29use crate::error::{
30 DecodeStatsSnafu, FieldTypeMismatchSnafu, FilterRecordBatchSnafu, Result, StatsNotPresentSnafu,
31};
32use crate::read::compat::CompatBatch;
33use crate::read::last_row::RowGroupLastRowCachedReader;
34use crate::read::prune::PruneReader;
35use crate::read::Batch;
36use crate::row_converter::{CompositeValues, PrimaryKeyCodec};
37use crate::sst::file::FileHandle;
38use crate::sst::parquet::format::ReadFormat;
39use crate::sst::parquet::reader::{
40 MaybeFilter, RowGroupReader, RowGroupReaderBuilder, SimpleFilterContext,
41};
42
/// A range of rows in a parquet SST file to read.
///
/// Identifies a single row group of the file (optionally narrowed by a
/// [RowSelection]) together with the shared per-file context.
#[derive(Clone)]
pub struct FileRange {
    /// Shared context (reader builder, filters, read format) for the whole file.
    context: FileRangeContextRef,
    /// Index of the row group to read inside the parquet file.
    row_group_idx: usize,
    /// Optional selection of rows within the row group; `None` selects all rows.
    row_selection: Option<RowSelection>,
}
54
55impl FileRange {
56 pub(crate) fn new(
58 context: FileRangeContextRef,
59 row_group_idx: usize,
60 row_selection: Option<RowSelection>,
61 ) -> Self {
62 Self {
63 context,
64 row_group_idx,
65 row_selection,
66 }
67 }
68
69 fn select_all(&self) -> bool {
71 let rows_in_group = self
72 .context
73 .reader_builder
74 .parquet_metadata()
75 .row_group(self.row_group_idx)
76 .num_rows();
77
78 let Some(row_selection) = &self.row_selection else {
79 return true;
80 };
81 row_selection.row_count() == rows_in_group as usize
82 }
83
84 pub(crate) async fn reader(
86 &self,
87 selector: Option<TimeSeriesRowSelector>,
88 ) -> Result<PruneReader> {
89 let parquet_reader = self
90 .context
91 .reader_builder
92 .build(self.row_group_idx, self.row_selection.clone())
93 .await?;
94
95 let use_last_row_reader = if selector
96 .map(|s| s == TimeSeriesRowSelector::LastRow)
97 .unwrap_or(false)
98 {
99 let put_only = !self
102 .context
103 .contains_delete(self.row_group_idx)
104 .inspect_err(|e| {
105 error!(e; "Failed to decode min value of op_type, fallback to RowGroupReader");
106 })
107 .unwrap_or(true);
108 put_only && self.select_all()
109 } else {
110 false
112 };
113
114 let prune_reader = if use_last_row_reader {
115 let reader = RowGroupLastRowCachedReader::new(
117 self.file_handle().file_id(),
118 self.row_group_idx,
119 self.context.reader_builder.cache_strategy().clone(),
120 RowGroupReader::new(self.context.clone(), parquet_reader),
121 );
122 PruneReader::new_with_last_row_reader(self.context.clone(), reader)
123 } else {
124 PruneReader::new_with_row_group_reader(
126 self.context.clone(),
127 RowGroupReader::new(self.context.clone(), parquet_reader),
128 )
129 };
130
131 Ok(prune_reader)
132 }
133
134 pub(crate) fn compat_batch(&self) -> Option<&CompatBatch> {
136 self.context.compat_batch()
137 }
138
139 pub(crate) fn file_handle(&self) -> &FileHandle {
141 self.context.reader_builder.file_handle()
142 }
143}
144
/// Context shared by all [FileRange]s of the same parquet SST file.
pub(crate) struct FileRangeContext {
    /// Builder that creates row-group readers over the file.
    reader_builder: RowGroupReaderBuilder,
    /// Common reader state: filters, read format, primary-key codec.
    base: RangeBase,
}

/// Shared, reference-counted [FileRangeContext].
pub(crate) type FileRangeContextRef = Arc<FileRangeContext>;
154
155impl FileRangeContext {
156 pub(crate) fn new(
158 reader_builder: RowGroupReaderBuilder,
159 filters: Vec<SimpleFilterContext>,
160 read_format: ReadFormat,
161 codec: Arc<dyn PrimaryKeyCodec>,
162 ) -> Self {
163 Self {
164 reader_builder,
165 base: RangeBase {
166 filters,
167 read_format,
168 codec,
169 compat_batch: None,
170 },
171 }
172 }
173
174 pub(crate) fn file_path(&self) -> &str {
176 self.reader_builder.file_path()
177 }
178
179 pub(crate) fn filters(&self) -> &[SimpleFilterContext] {
181 &self.base.filters
182 }
183
184 pub(crate) fn read_format(&self) -> &ReadFormat {
186 &self.base.read_format
187 }
188
189 pub(crate) fn reader_builder(&self) -> &RowGroupReaderBuilder {
191 &self.reader_builder
192 }
193
194 pub(crate) fn compat_batch(&self) -> Option<&CompatBatch> {
196 self.base.compat_batch.as_ref()
197 }
198
199 pub(crate) fn set_compat_batch(&mut self, compat: Option<CompatBatch>) {
201 self.base.compat_batch = compat;
202 }
203
204 pub(crate) fn precise_filter(&self, input: Batch) -> Result<Option<Batch>> {
207 self.base.precise_filter(input)
208 }
209
210 pub(crate) fn contains_delete(&self, row_group_index: usize) -> Result<bool> {
212 let metadata = self.reader_builder.parquet_metadata();
213 let row_group_metadata = &metadata.row_groups()[row_group_index];
214
215 let column_metadata = &row_group_metadata.columns().last().unwrap();
217 let stats = column_metadata.statistics().context(StatsNotPresentSnafu {
218 file_path: self.reader_builder.file_path(),
219 })?;
220 stats
221 .min_bytes_opt()
222 .context(StatsNotPresentSnafu {
223 file_path: self.reader_builder.file_path(),
224 })?
225 .try_into()
226 .map(i32::from_le_bytes)
227 .map(|min_op_type| min_op_type == OpType::Delete as i32)
228 .ok()
229 .context(DecodeStatsSnafu {
230 file_path: self.reader_builder.file_path(),
231 })
232 }
233}
234
/// Common state shared by readers that prune and precisely filter batches.
pub(crate) struct RangeBase {
    /// Filters pushed down for row-level filtering.
    pub(crate) filters: Vec<SimpleFilterContext>,
    /// Helper to read the SST's storage format.
    pub(crate) read_format: ReadFormat,
    /// Codec that decodes encoded primary keys into tag values.
    pub(crate) codec: Arc<dyn PrimaryKeyCodec>,
    /// Optional helper to make batches compatible with the expected schema.
    pub(crate) compat_batch: Option<CompatBatch>,
}
246
impl RangeBase {
    /// Filters `input` by the pushed-down predicates; returns the filtered
    /// batch, or `None` when every row is filtered out.
    ///
    /// Tag filters are evaluated once against the batch's decoded primary-key
    /// values (a batch carries one primary key via `input.primary_key()`, so a
    /// tag filter keeps or drops the batch as a whole). Field and timestamp
    /// filters are evaluated per row and ANDed into a boolean mask.
    pub(crate) fn precise_filter(&self, mut input: Batch) -> Result<Option<Batch>> {
        // Start with all rows selected; each vector filter ANDs into this mask.
        let mut mask = BooleanBuffer::new_set(input.num_rows());

        for filter_ctx in &self.filters {
            let filter = match filter_ctx.filter() {
                MaybeFilter::Filter(f) => f,
                // Already satisfied by coarser pruning — nothing to evaluate.
                MaybeFilter::Matched => continue,
                // The whole range is pruned out by this filter.
                MaybeFilter::Pruned => return Ok(None),
            };
            let result = match filter_ctx.semantic_type() {
                SemanticType::Tag => {
                    // Decode primary-key values lazily and cache them on the
                    // batch; re-borrow after `set_pk_values` to satisfy the
                    // borrow checker.
                    let pk_values = if let Some(pk_values) = input.pk_values() {
                        pk_values
                    } else {
                        input.set_pk_values(self.codec.decode(input.primary_key())?);
                        input.pk_values().unwrap()
                    };
                    let pk_value = match pk_values {
                        CompositeValues::Dense(v) => {
                            // unwrap: a tag filter's column is expected to be
                            // part of the primary key — TODO confirm.
                            let pk_index = self
                                .read_format
                                .metadata()
                                .primary_key_index(filter_ctx.column_id())
                                .unwrap();
                            v[pk_index]
                                .1
                                .try_to_scalar_value(filter_ctx.data_type())
                                .context(FieldTypeMismatchSnafu)?
                        }
                        CompositeValues::Sparse(v) => {
                            let v = v.get_or_null(filter_ctx.column_id());
                            v.try_to_scalar_value(filter_ctx.data_type())
                                .context(FieldTypeMismatchSnafu)?
                        }
                    };
                    if filter
                        .evaluate_scalar(&pk_value)
                        .context(FilterRecordBatchSnafu)?
                    {
                        // Tag matches: the whole batch passes this filter.
                        continue;
                    } else {
                        // Tag mismatch: every row is filtered out.
                        return Ok(None);
                    }
                }
                SemanticType::Field => {
                    // The field may be absent from this file's format; then
                    // this filter cannot apply here.
                    let Some(field_index) =
                        self.read_format.field_index_by_id(filter_ctx.column_id())
                    else {
                        continue;
                    };
                    let field_col = &input.fields()[field_index].data;
                    filter
                        .evaluate_vector(field_col)
                        .context(FilterRecordBatchSnafu)?
                }
                SemanticType::Timestamp => filter
                    .evaluate_vector(input.timestamps())
                    .context(FilterRecordBatchSnafu)?,
            };

            mask = mask.bitand(&result);
        }

        // Apply the combined mask to the batch in place.
        input.filter(&BooleanArray::from(mask).into())?;

        Ok(Some(input))
    }
}