1use std::ops::Range;
22use std::sync::Arc;
23
24use api::v1::SemanticType;
25use common_recordbatch::filter::SimpleFilterEvaluator;
26use datatypes::arrow::array::{BinaryArray, BooleanArray, BooleanBufferBuilder};
27use datatypes::arrow::record_batch::RecordBatch;
28use futures::StreamExt;
29use mito_codec::row_converter::{PrimaryKeyCodec, PrimaryKeyFilter};
30use parquet::arrow::ProjectionMask;
31use parquet::arrow::arrow_reader::RowSelection;
32use parquet::schema::types::SchemaDescriptor;
33use snafu::{OptionExt, ResultExt};
34use store_api::metadata::{RegionMetadata, RegionMetadataRef};
35
36use crate::error::{ComputeArrowSnafu, DecodeSnafu, ReadParquetSnafu, Result, UnexpectedSnafu};
37use crate::sst::parquet::flat_format::primary_key_column_index;
38use crate::sst::parquet::format::{PrimaryKeyArray, ReadFormat};
39use crate::sst::parquet::reader::{RowGroupBuildContext, RowGroupReaderBuilder};
40use crate::sst::parquet::row_selection::row_selection_from_row_ranges_exact;
41
42pub(crate) fn matching_row_ranges_by_primary_key(
43 input: &RecordBatch,
44 pk_column_index: usize,
45 pk_filter: &mut dyn PrimaryKeyFilter,
46) -> Result<Vec<Range<usize>>> {
47 let pk_dict_array = input
48 .column(pk_column_index)
49 .as_any()
50 .downcast_ref::<PrimaryKeyArray>()
51 .context(UnexpectedSnafu {
52 reason: "Primary key column is not a dictionary array",
53 })?;
54 let pk_values = pk_dict_array
55 .values()
56 .as_any()
57 .downcast_ref::<BinaryArray>()
58 .context(UnexpectedSnafu {
59 reason: "Primary key values are not binary array",
60 })?;
61 let keys = pk_dict_array.keys();
62 let key_values = keys.values();
63
64 if key_values.is_empty() {
65 return Ok(std::iter::once(0..input.num_rows()).collect());
66 }
67
68 let mut matched_row_ranges: Vec<Range<usize>> = Vec::new();
69 let mut start = 0;
70 while start < key_values.len() {
71 let key = key_values[start];
72 let mut end = start + 1;
73 while end < key_values.len() && key_values[end] == key {
74 end += 1;
75 }
76
77 if pk_filter
78 .matches(pk_values.value(key as usize))
79 .context(DecodeSnafu)?
80 {
81 if let Some(last) = matched_row_ranges.last_mut()
82 && last.end == start
83 {
84 last.end = end;
85 } else {
86 matched_row_ranges.push(start..end);
87 }
88 }
89
90 start = end;
91 }
92
93 Ok(matched_row_ranges)
94}
95
96pub(crate) fn prefilter_flat_batch_by_primary_key(
99 input: RecordBatch,
100 pk_column_index: usize,
101 pk_filter: &mut dyn PrimaryKeyFilter,
102) -> Result<Option<RecordBatch>> {
103 if input.num_rows() == 0 {
104 return Ok(Some(input));
105 }
106
107 let matched_row_ranges =
108 matching_row_ranges_by_primary_key(&input, pk_column_index, pk_filter)?;
109 if matched_row_ranges.is_empty() {
110 return Ok(None);
111 }
112
113 if matched_row_ranges.len() == 1
114 && matched_row_ranges[0].start == 0
115 && matched_row_ranges[0].end == input.num_rows()
116 {
117 return Ok(Some(input));
118 }
119
120 if matched_row_ranges.len() == 1 {
121 let span = &matched_row_ranges[0];
122 return Ok(Some(input.slice(span.start, span.end - span.start)));
123 }
124
125 let mut builder = BooleanBufferBuilder::new(input.num_rows());
126 builder.append_n(input.num_rows(), false);
127 for span in matched_row_ranges {
128 for i in span {
129 builder.set_bit(i, true);
130 }
131 }
132
133 let filtered = datatypes::arrow::compute::filter_record_batch(
134 &input,
135 &BooleanArray::new(builder.finish(), None),
136 )
137 .context(ComputeArrowSnafu)?;
138 if filtered.num_rows() == 0 {
139 Ok(None)
140 } else {
141 Ok(Some(filtered))
142 }
143}
144
145pub(crate) fn is_usable_primary_key_filter(
150 sst_metadata: &RegionMetadataRef,
151 expected_metadata: Option<&RegionMetadata>,
152 filter: &SimpleFilterEvaluator,
153) -> bool {
154 let sst_column = match expected_metadata {
155 Some(expected_metadata) => {
156 let Some(expected_column) = expected_metadata.column_by_name(filter.column_name())
157 else {
158 return false;
159 };
160 let Some(sst_column) = sst_metadata.column_by_id(expected_column.column_id) else {
161 return false;
162 };
163
164 if sst_column.column_schema.name != expected_column.column_schema.name
165 || sst_column.semantic_type != expected_column.semantic_type
166 || sst_column.column_schema.data_type != expected_column.column_schema.data_type
167 {
168 return false;
169 }
170
171 sst_column
172 }
173 None => {
174 let Some(sst_column) = sst_metadata.column_by_name(filter.column_name()) else {
175 return false;
176 };
177 sst_column
178 }
179 };
180
181 sst_column.semantic_type == SemanticType::Tag
182 && sst_metadata
183 .primary_key_index(sst_column.column_id)
184 .is_some()
185}
186
/// A [`PrimaryKeyFilter`] decorator that memoizes the result for the most
/// recently evaluated primary key.
///
/// Batches scanned by this module evaluate the filter once per run of equal
/// keys, and runs with the same key can repeat across batches, so caching the
/// last decision avoids re-evaluating the inner filter for identical bytes.
pub(crate) struct CachedPrimaryKeyFilter {
    // The filter whose results are cached.
    inner: Box<dyn PrimaryKeyFilter>,
    // Encoded bytes of the last primary key passed to `matches`.
    last_primary_key: Vec<u8>,
    // Result for `last_primary_key`; `None` until the first evaluation.
    last_match: Option<bool>,
}
192
193impl CachedPrimaryKeyFilter {
194 pub(crate) fn new(inner: Box<dyn PrimaryKeyFilter>) -> Self {
195 Self {
196 inner,
197 last_primary_key: Vec::new(),
198 last_match: None,
199 }
200 }
201}
202
203impl PrimaryKeyFilter for CachedPrimaryKeyFilter {
204 fn matches(&mut self, pk: &[u8]) -> mito_codec::error::Result<bool> {
205 if let Some(last_match) = self.last_match
206 && self.last_primary_key == pk
207 {
208 return Ok(last_match);
209 }
210
211 let matched = self.inner.matches(pk)?;
212 self.last_primary_key.clear();
213 self.last_primary_key.extend_from_slice(pk);
214 self.last_match = Some(matched);
215 Ok(matched)
216 }
217}
218
/// Per-row-group state for prefiltering parquet rows by primary key before
/// the main projection is read.
pub(crate) struct PrefilterContext {
    // Filter evaluated against the encoded primary key bytes (wrapped in a
    // per-key cache by the builder).
    pk_filter: Box<dyn PrimaryKeyFilter>,
    // Projection that reads only the primary key column from parquet.
    pk_projection: ProjectionMask,
    // Index of the primary key column inside the projected batches.
    pk_column_index: usize,
}
232
/// Captures everything needed to create [`PrefilterContext`] instances
/// (one per `build()` call, each with a fresh filter).
pub(crate) struct PrefilterContextBuilder {
    // Projection reading only the primary key column from parquet.
    pk_projection: ProjectionMask,
    // Index of the primary key column in the projected batch.
    pk_column_index: usize,
    // Codec used to create primary key filters for the region.
    codec: Arc<dyn PrimaryKeyCodec>,
    // Metadata of the region being read.
    metadata: RegionMetadataRef,
    // Simple filters referencing primary key columns.
    pk_filters: Arc<Vec<SimpleFilterEvaluator>>,
}
245
246impl PrefilterContextBuilder {
247 pub(crate) fn new(
254 read_format: &ReadFormat,
255 codec: &Arc<dyn PrimaryKeyCodec>,
256 primary_key_filters: Option<&Arc<Vec<SimpleFilterEvaluator>>>,
257 parquet_schema: &SchemaDescriptor,
258 ) -> Option<Self> {
259 let pk_filters = primary_key_filters?;
260 if pk_filters.is_empty() {
261 return None;
262 }
263
264 let metadata = read_format.metadata();
265 if metadata.primary_key.is_empty() {
266 return None;
267 }
268
269 let flat_format = read_format.as_flat()?;
271 if flat_format.batch_has_raw_pk_columns() {
272 return None;
273 }
274
275 let num_parquet_columns = parquet_schema.num_columns();
277 let pk_index = primary_key_column_index(num_parquet_columns);
278 let pk_projection = ProjectionMask::roots(parquet_schema, [pk_index]);
279
280 let pk_column_index = 0;
282
283 Some(Self {
284 pk_projection,
285 pk_column_index,
286 codec: Arc::clone(codec),
287 metadata: metadata.clone(),
288 pk_filters: Arc::clone(pk_filters),
289 })
290 }
291
292 pub(crate) fn build(&self) -> PrefilterContext {
294 let pk_filter =
297 self.codec
298 .primary_key_filter(&self.metadata, Arc::clone(&self.pk_filters), false);
299 let pk_filter = Box::new(CachedPrimaryKeyFilter::new(pk_filter));
300 PrefilterContext {
301 pk_filter,
302 pk_projection: self.pk_projection.clone(),
303 pk_column_index: self.pk_column_index,
304 }
305 }
306}
307
/// Outcome of running the primary-key prefilter over one row group.
pub(crate) struct PrefilterResult {
    // Row selection narrowed down to primary-key matches.
    pub(crate) refined_selection: RowSelection,
    // Number of scanned rows pruned by the prefilter.
    pub(crate) filtered_rows: usize,
}
315
316pub(crate) async fn execute_prefilter(
321 prefilter_ctx: &mut PrefilterContext,
322 reader_builder: &RowGroupReaderBuilder,
323 build_ctx: &RowGroupBuildContext<'_>,
324) -> Result<PrefilterResult> {
325 let mut pk_stream = reader_builder
327 .build_with_projection(
328 build_ctx.row_group_idx,
329 build_ctx.row_selection.clone(),
330 prefilter_ctx.pk_projection.clone(),
331 build_ctx.fetch_metrics,
332 )
333 .await?;
334
335 let mut matched_row_ranges: Vec<Range<usize>> = Vec::new();
337 let mut row_offset = 0;
338 let mut rows_before_filter = 0usize;
339
340 while let Some(batch_result) = pk_stream.next().await {
341 let batch = batch_result.context(ReadParquetSnafu {
342 path: reader_builder.file_path(),
343 })?;
344 let batch_num_rows = batch.num_rows();
345 if batch_num_rows == 0 {
346 continue;
347 }
348 rows_before_filter += batch_num_rows;
349
350 let ranges = matching_row_ranges_by_primary_key(
351 &batch,
352 prefilter_ctx.pk_column_index,
353 prefilter_ctx.pk_filter.as_mut(),
354 )?;
355 matched_row_ranges.extend(
356 ranges
357 .into_iter()
358 .map(|range| (range.start + row_offset)..(range.end + row_offset)),
359 );
360 row_offset += batch_num_rows;
361 }
362
363 let rows_selected: usize = matched_row_ranges.iter().map(|r| r.end - r.start).sum();
365 let filtered_rows = rows_before_filter.saturating_sub(rows_selected);
366
367 let refined_selection = if rows_selected == 0 {
368 RowSelection::from(vec![])
369 } else {
370 let prefilter_selection =
374 row_selection_from_row_ranges_exact(matched_row_ranges.into_iter(), rows_before_filter);
375
376 match &build_ctx.row_selection {
380 Some(original) => original.and_then(&prefilter_selection),
381 None => prefilter_selection,
382 }
383 };
384
385 Ok(PrefilterResult {
386 refined_selection,
387 filtered_rows,
388 })
389}
390
391#[cfg(test)]
392mod tests {
393 use std::sync::Arc;
394 use std::sync::atomic::{AtomicUsize, Ordering};
395
396 use common_recordbatch::filter::SimpleFilterEvaluator;
397 use datafusion_expr::{col, lit};
398 use datatypes::arrow::array::{
399 ArrayRef, DictionaryArray, TimestampMillisecondArray, UInt8Array, UInt32Array, UInt64Array,
400 };
401 use datatypes::arrow::datatypes::{Schema, UInt32Type};
402 use mito_codec::row_converter::{PrimaryKeyFilter, build_primary_key_codec};
403 use store_api::codec::PrimaryKeyEncoding;
404
405 use super::*;
406 use crate::sst::internal_fields;
407 use crate::sst::parquet::format::ReadFormat;
408 use crate::test_util::sst_util::{
409 new_primary_key, sst_region_metadata, sst_region_metadata_with_encoding,
410 };
411
412 #[test]
413 fn test_is_usable_primary_key_filter_skips_legacy_primary_key_batches() {
414 let metadata = Arc::new(sst_region_metadata_with_encoding(
415 PrimaryKeyEncoding::Sparse,
416 ));
417 let read_format = ReadFormat::new_flat(
418 metadata.clone(),
419 metadata.column_metadatas.iter().map(|c| c.column_id),
420 None,
421 "test",
422 true,
423 )
424 .unwrap();
425 assert!(read_format.as_flat().is_some());
426
427 let filter = SimpleFilterEvaluator::try_new(&col("tag_0").eq(lit("b"))).unwrap();
428 assert!(is_usable_primary_key_filter(&metadata, None, &filter));
429 }
430
431 #[test]
432 fn test_is_usable_primary_key_filter_supports_partition_column_by_default() {
433 let metadata = Arc::new(sst_region_metadata_with_encoding(
434 PrimaryKeyEncoding::Sparse,
435 ));
436 let filter = SimpleFilterEvaluator::try_new(
437 &col(store_api::metric_engine_consts::DATA_SCHEMA_TABLE_ID_COLUMN_NAME).eq(lit(1_u32)),
438 )
439 .unwrap();
440
441 assert!(is_usable_primary_key_filter(&metadata, None, &filter));
442 }
443
    /// Test double that counts `matches` calls and matches one expected key.
    struct CountingPrimaryKeyFilter {
        // Number of times `matches` was invoked.
        hits: Arc<AtomicUsize>,
        // The only primary key treated as a match.
        expected: Vec<u8>,
    }
448
449 impl PrimaryKeyFilter for CountingPrimaryKeyFilter {
450 fn matches(&mut self, pk: &[u8]) -> mito_codec::error::Result<bool> {
451 self.hits.fetch_add(1, Ordering::Relaxed);
452 Ok(pk == self.expected.as_slice())
453 }
454 }
455
456 #[test]
457 fn test_cached_primary_key_filter_reuses_previous_result() {
458 let expected = new_primary_key(&["a", "x"]);
459 let hits = Arc::new(AtomicUsize::new(0));
460 let mut filter = CachedPrimaryKeyFilter::new(Box::new(CountingPrimaryKeyFilter {
461 hits: Arc::clone(&hits),
462 expected: expected.clone(),
463 }));
464
465 assert!(filter.matches(expected.as_slice()).unwrap());
466 assert!(filter.matches(expected.as_slice()).unwrap());
467 assert!(
468 !filter
469 .matches(new_primary_key(&["b", "x"]).as_slice())
470 .unwrap()
471 );
472
473 assert_eq!(hits.load(Ordering::Relaxed), 2);
474 }
475
476 fn new_test_filters(exprs: &[datafusion_expr::Expr]) -> Vec<SimpleFilterEvaluator> {
477 exprs
478 .iter()
479 .filter_map(SimpleFilterEvaluator::try_new)
480 .collect()
481 }
482
    /// Builds a flat-format `RecordBatch` with one row per `primary_keys`
    /// entry, carrying `field_values` in `field_0`.
    ///
    /// Column order is `field_0`, `ts`, then the columns from
    /// `internal_fields` (the encoded primary key dictionary followed by two
    /// constant columns — presumably sequence and op type; order must match
    /// `internal_fields`).
    fn new_raw_batch(primary_keys: &[&[u8]], field_values: &[u64]) -> RecordBatch {
        assert_eq!(primary_keys.len(), field_values.len());

        // Assemble the schema from the region's arrow schema plus the
        // internal fields, mirroring the on-disk flat layout.
        let metadata = Arc::new(sst_region_metadata());
        let arrow_schema = metadata.schema.arrow_schema();
        let field_column = arrow_schema
            .field(arrow_schema.index_of("field_0").unwrap())
            .clone();
        let time_index_column = arrow_schema
            .field(arrow_schema.index_of("ts").unwrap())
            .clone();
        let mut fields = vec![field_column, time_index_column];
        fields.extend(
            internal_fields()
                .into_iter()
                .map(|field| field.as_ref().clone()),
        );
        let schema = Arc::new(Schema::new(fields));

        // Deduplicate the primary keys into dictionary values in
        // first-occurrence order, mapping each row to its dictionary key.
        let mut dict_values = Vec::new();
        let mut keys = Vec::with_capacity(primary_keys.len());
        for pk in primary_keys {
            let key = dict_values
                .iter()
                .position(|existing: &&[u8]| existing == pk)
                // Not seen before: append and use the new index.
                .unwrap_or_else(|| {
                    dict_values.push(*pk);
                    dict_values.len() - 1
                });
            keys.push(key as u32);
        }
        let pk_array: ArrayRef = Arc::new(DictionaryArray::<UInt32Type>::new(
            UInt32Array::from(keys),
            Arc::new(BinaryArray::from_iter_values(dict_values.iter().copied())),
        ));

        RecordBatch::try_new(
            schema,
            vec![
                Arc::new(UInt64Array::from(field_values.to_vec())),
                // Timestamps are simply 0..n milliseconds.
                Arc::new(TimestampMillisecondArray::from_iter_values(
                    0..primary_keys.len() as i64,
                )),
                pk_array,
                // Constant internal columns (all ones).
                Arc::new(UInt64Array::from(vec![1; primary_keys.len()])),
                Arc::new(UInt8Array::from(vec![1; primary_keys.len()])),
            ],
        )
        .unwrap()
    }
533
534 fn field_values(batch: &RecordBatch) -> Vec<u64> {
535 batch
536 .column(0)
537 .as_any()
538 .downcast_ref::<UInt64Array>()
539 .unwrap()
540 .values()
541 .to_vec()
542 }
543
544 #[test]
545 fn test_prefilter_primary_key_drops_single_dictionary_batch() {
546 let metadata = Arc::new(sst_region_metadata());
547 let filters = Arc::new(new_test_filters(&[col("tag_0").eq(lit("b"))]));
548 let mut primary_key_filter = build_primary_key_codec(metadata.as_ref())
549 .primary_key_filter(&metadata, filters, false);
550 let pk_a = new_primary_key(&["a", "x"]);
551 let batch = new_raw_batch(&[pk_a.as_slice(), pk_a.as_slice()], &[10, 11]);
552 let pk_col_idx = primary_key_column_index(batch.num_columns());
553
554 let filtered =
555 prefilter_flat_batch_by_primary_key(batch, pk_col_idx, primary_key_filter.as_mut())
556 .unwrap();
557
558 assert!(filtered.is_none());
559 }
560
561 #[test]
562 fn test_prefilter_primary_key_builds_mask_for_fragmented_matches() {
563 let metadata = Arc::new(sst_region_metadata());
564 let filters = Arc::new(new_test_filters(&[col("tag_0")
565 .eq(lit("a"))
566 .or(col("tag_0").eq(lit("c")))]));
567 let mut primary_key_filter = build_primary_key_codec(metadata.as_ref())
568 .primary_key_filter(&metadata, filters, false);
569 let pk_a = new_primary_key(&["a", "x"]);
570 let pk_b = new_primary_key(&["b", "x"]);
571 let pk_c = new_primary_key(&["c", "x"]);
572 let pk_d = new_primary_key(&["d", "x"]);
573 let batch = new_raw_batch(
574 &[
575 pk_a.as_slice(),
576 pk_a.as_slice(),
577 pk_b.as_slice(),
578 pk_b.as_slice(),
579 pk_c.as_slice(),
580 pk_c.as_slice(),
581 pk_d.as_slice(),
582 pk_d.as_slice(),
583 ],
584 &[10, 11, 12, 13, 14, 15, 16, 17],
585 );
586 let pk_col_idx = primary_key_column_index(batch.num_columns());
587
588 let filtered =
589 prefilter_flat_batch_by_primary_key(batch, pk_col_idx, primary_key_filter.as_mut())
590 .unwrap()
591 .unwrap();
592
593 assert_eq!(filtered.num_rows(), 4);
594 assert_eq!(field_values(&filtered), vec![10, 11, 14, 15]);
595 }
596}