1use std::ops::Range;
22use std::sync::Arc;
23
24use api::v1::SemanticType;
25use common_recordbatch::filter::SimpleFilterEvaluator;
26use datatypes::arrow::array::BinaryArray;
27use datatypes::arrow::record_batch::RecordBatch;
28use futures::StreamExt;
29use mito_codec::row_converter::{PrimaryKeyCodec, PrimaryKeyFilter};
30use parquet::arrow::ProjectionMask;
31use parquet::arrow::arrow_reader::RowSelection;
32use parquet::schema::types::SchemaDescriptor;
33use snafu::{OptionExt, ResultExt};
34use store_api::metadata::{RegionMetadata, RegionMetadataRef};
35
36use crate::error::{DecodeSnafu, ReadParquetSnafu, Result, UnexpectedSnafu};
37use crate::sst::parquet::flat_format::primary_key_column_index;
38use crate::sst::parquet::format::{PrimaryKeyArray, ReadFormat};
39use crate::sst::parquet::reader::{RowGroupBuildContext, RowGroupReaderBuilder};
40use crate::sst::parquet::row_selection::row_selection_from_row_ranges_exact;
41
42pub(crate) fn matching_row_ranges_by_primary_key(
43 input: &RecordBatch,
44 pk_column_index: usize,
45 pk_filter: &mut dyn PrimaryKeyFilter,
46) -> Result<Vec<Range<usize>>> {
47 let pk_dict_array = input
48 .column(pk_column_index)
49 .as_any()
50 .downcast_ref::<PrimaryKeyArray>()
51 .context(UnexpectedSnafu {
52 reason: "Primary key column is not a dictionary array",
53 })?;
54 let pk_values = pk_dict_array
55 .values()
56 .as_any()
57 .downcast_ref::<BinaryArray>()
58 .context(UnexpectedSnafu {
59 reason: "Primary key values are not binary array",
60 })?;
61 let keys = pk_dict_array.keys();
62 let key_values = keys.values();
63
64 if key_values.is_empty() {
65 return Ok(std::iter::once(0..input.num_rows()).collect());
66 }
67
68 let mut matched_row_ranges: Vec<Range<usize>> = Vec::new();
69 let mut start = 0;
70 while start < key_values.len() {
71 let key = key_values[start];
72 let mut end = start + 1;
73 while end < key_values.len() && key_values[end] == key {
74 end += 1;
75 }
76
77 if pk_filter
78 .matches(pk_values.value(key as usize))
79 .context(DecodeSnafu)?
80 {
81 if let Some(last) = matched_row_ranges.last_mut()
82 && last.end == start
83 {
84 last.end = end;
85 } else {
86 matched_row_ranges.push(start..end);
87 }
88 }
89
90 start = end;
91 }
92
93 Ok(matched_row_ranges)
94}
95
96pub(crate) fn is_usable_primary_key_filter(
101 sst_metadata: &RegionMetadataRef,
102 expected_metadata: Option<&RegionMetadata>,
103 filter: &SimpleFilterEvaluator,
104) -> bool {
105 let sst_column = match expected_metadata {
106 Some(expected_metadata) => {
107 let Some(expected_column) = expected_metadata.column_by_name(filter.column_name())
108 else {
109 return false;
110 };
111 let Some(sst_column) = sst_metadata.column_by_id(expected_column.column_id) else {
112 return false;
113 };
114
115 if sst_column.column_schema.name != expected_column.column_schema.name
116 || sst_column.semantic_type != expected_column.semantic_type
117 || sst_column.column_schema.data_type != expected_column.column_schema.data_type
118 {
119 return false;
120 }
121
122 sst_column
123 }
124 None => {
125 let Some(sst_column) = sst_metadata.column_by_name(filter.column_name()) else {
126 return false;
127 };
128 sst_column
129 }
130 };
131
132 sst_column.semantic_type == SemanticType::Tag
133 && sst_metadata
134 .primary_key_index(sst_column.column_id)
135 .is_some()
136}
137
/// A [`PrimaryKeyFilter`] decorator that memoizes the result for the most
/// recently evaluated primary key.
///
/// Dictionary-encoded batches evaluate one key per run, and the same key can
/// reappear across batches; caching the last (key, result) pair avoids
/// re-running the inner filter in that case.
pub(crate) struct CachedPrimaryKeyFilter {
    /// The filter that performs the actual matching.
    inner: Box<dyn PrimaryKeyFilter>,
    /// Encoded bytes of the last primary key passed to `matches`.
    last_primary_key: Vec<u8>,
    /// Cached result for `last_primary_key`; `None` until the first call.
    last_match: Option<bool>,
}
143
144impl CachedPrimaryKeyFilter {
145 pub(crate) fn new(inner: Box<dyn PrimaryKeyFilter>) -> Self {
146 Self {
147 inner,
148 last_primary_key: Vec::new(),
149 last_match: None,
150 }
151 }
152}
153
154impl PrimaryKeyFilter for CachedPrimaryKeyFilter {
155 fn matches(&mut self, pk: &[u8]) -> mito_codec::error::Result<bool> {
156 if let Some(last_match) = self.last_match
157 && self.last_primary_key == pk
158 {
159 return Ok(last_match);
160 }
161
162 let matched = self.inner.matches(pk)?;
163 self.last_primary_key.clear();
164 self.last_primary_key.extend_from_slice(pk);
165 self.last_match = Some(matched);
166 Ok(matched)
167 }
168}
169
/// Context for pre-filtering a row group by primary key before decoding the
/// full projection.
pub(crate) struct PrefilterContext {
    /// Filter evaluated against each distinct primary key; wrapped in a cache
    /// by the builder so repeated keys are evaluated once.
    pk_filter: Box<dyn PrimaryKeyFilter>,
    /// Parquet projection that reads only the primary key column.
    pk_projection: ProjectionMask,
    /// Index of the primary key column within the projected record batch.
    pk_column_index: usize,
}
183
/// Builds [`PrefilterContext`]s for row groups of one SST file.
///
/// Construction validates once (via [`PrefilterContextBuilder::new`]) that
/// primary key prefiltering is applicable; `build` then creates a fresh
/// context (with a fresh filter cache) per use.
pub(crate) struct PrefilterContextBuilder {
    /// Projection mask selecting only the primary key column.
    pk_projection: ProjectionMask,
    /// Index of the primary key column in the projected batch.
    pk_column_index: usize,
    /// Codec used to create primary key filters from the predicates.
    codec: Arc<dyn PrimaryKeyCodec>,
    /// Metadata of the region the SST belongs to.
    metadata: RegionMetadataRef,
    /// Predicates to evaluate against primary keys; non-empty by construction.
    pk_filters: Arc<Vec<SimpleFilterEvaluator>>,
}
196
197impl PrefilterContextBuilder {
198 pub(crate) fn new(
205 read_format: &ReadFormat,
206 codec: &Arc<dyn PrimaryKeyCodec>,
207 primary_key_filters: Option<&Arc<Vec<SimpleFilterEvaluator>>>,
208 parquet_schema: &SchemaDescriptor,
209 ) -> Option<Self> {
210 let pk_filters = primary_key_filters?;
211 if pk_filters.is_empty() {
212 return None;
213 }
214
215 let metadata = read_format.metadata();
216 if metadata.primary_key.is_empty() {
217 return None;
218 }
219
220 let flat_format = read_format.as_flat()?;
222 if !flat_format.raw_batch_has_primary_key_dictionary() {
223 return None;
224 }
225
226 let num_parquet_columns = parquet_schema.num_columns();
228 let pk_index = primary_key_column_index(num_parquet_columns);
229 let pk_projection = ProjectionMask::roots(parquet_schema, [pk_index]);
230
231 let pk_column_index = 0;
233
234 Some(Self {
235 pk_projection,
236 pk_column_index,
237 codec: Arc::clone(codec),
238 metadata: metadata.clone(),
239 pk_filters: Arc::clone(pk_filters),
240 })
241 }
242
243 pub(crate) fn build(&self) -> PrefilterContext {
245 let pk_filter =
248 self.codec
249 .primary_key_filter(&self.metadata, Arc::clone(&self.pk_filters), false);
250 let pk_filter = Box::new(CachedPrimaryKeyFilter::new(pk_filter));
251 PrefilterContext {
252 pk_filter,
253 pk_projection: self.pk_projection.clone(),
254 pk_column_index: self.pk_column_index,
255 }
256 }
257}
258
/// Outcome of [`execute_prefilter`] for one row group.
pub(crate) struct PrefilterResult {
    /// Row selection narrowed to rows whose primary key matched, intersected
    /// with the original selection of the build context (if any).
    pub(crate) refined_selection: RowSelection,
    /// Number of previously selected rows removed by the primary key filter.
    pub(crate) filtered_rows: usize,
}
266
/// Reads only the primary key column of a row group and computes a refined
/// [`RowSelection`] containing just the rows whose primary key matches the
/// filter in `prefilter_ctx`.
///
/// Returns the refined selection together with the number of rows it
/// filtered out. Errors are propagated from the parquet reader and from
/// primary key decoding.
pub(crate) async fn execute_prefilter(
    prefilter_ctx: &mut PrefilterContext,
    reader_builder: &RowGroupReaderBuilder,
    build_ctx: &RowGroupBuildContext<'_>,
) -> Result<PrefilterResult> {
    // Build a stream over the primary key column only, honoring any row
    // selection earlier pruning already produced.
    let mut pk_stream = reader_builder
        .build_with_projection(
            build_ctx.row_group_idx,
            build_ctx.row_selection.clone(),
            prefilter_ctx.pk_projection.clone(),
            build_ctx.fetch_metrics,
        )
        .await?;

    // Matched ranges expressed as offsets into the *selected* rows (stream
    // positions), not raw row-group row ids.
    let mut matched_row_ranges: Vec<Range<usize>> = Vec::new();
    let mut row_offset = 0;
    let mut rows_before_filter = 0usize;

    while let Some(batch_result) = pk_stream.next().await {
        let batch = batch_result.context(ReadParquetSnafu {
            path: reader_builder.file_path(),
        })?;
        let batch_num_rows = batch.num_rows();
        if batch_num_rows == 0 {
            continue;
        }
        rows_before_filter += batch_num_rows;

        let ranges = matching_row_ranges_by_primary_key(
            &batch,
            prefilter_ctx.pk_column_index,
            prefilter_ctx.pk_filter.as_mut(),
        )?;
        // Shift batch-local ranges into stream coordinates.
        matched_row_ranges.extend(
            ranges
                .into_iter()
                .map(|range| (range.start + row_offset)..(range.end + row_offset)),
        );
        row_offset += batch_num_rows;
    }

    let rows_selected: usize = matched_row_ranges.iter().map(|r| r.end - r.start).sum();
    let filtered_rows = rows_before_filter.saturating_sub(rows_selected);

    let refined_selection = if rows_selected == 0 {
        // Nothing matched: an empty selection skips the row group entirely.
        RowSelection::from(vec![])
    } else {
        // Selection over exactly the rows the pk stream yielded.
        let prefilter_selection =
            row_selection_from_row_ranges_exact(matched_row_ranges.into_iter(), rows_before_filter);

        // Compose with the pre-existing selection: `and_then` applies the
        // prefilter selection (defined relative to already-selected rows)
        // back onto the original row-group coordinates.
        match &build_ctx.row_selection {
            Some(original) => original.and_then(&prefilter_selection),
            None => prefilter_selection,
        }
    };

    Ok(PrefilterResult {
        refined_selection,
        filtered_rows,
    })
}
341
342#[cfg(test)]
343mod tests {
344 use std::sync::Arc;
345 use std::sync::atomic::{AtomicUsize, Ordering};
346
347 use common_recordbatch::filter::SimpleFilterEvaluator;
348 use datafusion_expr::{col, lit};
349 use mito_codec::row_converter::PrimaryKeyFilter;
350 use store_api::codec::PrimaryKeyEncoding;
351
352 use super::*;
353 use crate::sst::parquet::format::ReadFormat;
354 use crate::test_util::sst_util::{new_primary_key, sst_region_metadata_with_encoding};
355
356 #[test]
357 fn test_is_usable_primary_key_filter_skips_legacy_primary_key_batches() {
358 let metadata = Arc::new(sst_region_metadata_with_encoding(
359 PrimaryKeyEncoding::Sparse,
360 ));
361 let read_format = ReadFormat::new_flat(
362 metadata.clone(),
363 metadata.column_metadatas.iter().map(|c| c.column_id),
364 None,
365 "test",
366 true,
367 )
368 .unwrap();
369 assert!(read_format.as_flat().is_some());
370
371 let filter = SimpleFilterEvaluator::try_new(&col("tag_0").eq(lit("b"))).unwrap();
372 assert!(is_usable_primary_key_filter(&metadata, None, &filter));
373 }
374
375 #[test]
376 fn test_is_usable_primary_key_filter_supports_partition_column_by_default() {
377 let metadata = Arc::new(sst_region_metadata_with_encoding(
378 PrimaryKeyEncoding::Sparse,
379 ));
380 let filter = SimpleFilterEvaluator::try_new(
381 &col(store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME).eq(lit(1_u32)),
382 )
383 .unwrap();
384
385 assert!(is_usable_primary_key_filter(&metadata, None, &filter));
386 }
387
    /// Test-only filter that counts invocations and matches one fixed key.
    struct CountingPrimaryKeyFilter {
        /// Number of times `matches` has been called.
        hits: Arc<AtomicUsize>,
        /// The only primary key considered a match.
        expected: Vec<u8>,
    }
392
393 impl PrimaryKeyFilter for CountingPrimaryKeyFilter {
394 fn matches(&mut self, pk: &[u8]) -> mito_codec::error::Result<bool> {
395 self.hits.fetch_add(1, Ordering::Relaxed);
396 Ok(pk == self.expected.as_slice())
397 }
398 }
399
400 #[test]
401 fn test_cached_primary_key_filter_reuses_previous_result() {
402 let expected = new_primary_key(&["a", "x"]);
403 let hits = Arc::new(AtomicUsize::new(0));
404 let mut filter = CachedPrimaryKeyFilter::new(Box::new(CountingPrimaryKeyFilter {
405 hits: Arc::clone(&hits),
406 expected: expected.clone(),
407 }));
408
409 assert!(filter.matches(expected.as_slice()).unwrap());
410 assert!(filter.matches(expected.as_slice()).unwrap());
411 assert!(
412 !filter
413 .matches(new_primary_key(&["b", "x"]).as_slice())
414 .unwrap()
415 );
416
417 assert_eq!(hits.load(Ordering::Relaxed), 2);
418 }
419}