1use std::ops::BitAnd;
16use std::sync::Arc;
17
18use common_recordbatch::filter::SimpleFilterEvaluator;
19use common_time::Timestamp;
20use datatypes::arrow::array::BooleanArray;
21use datatypes::arrow::buffer::BooleanBuffer;
22use datatypes::arrow::record_batch::RecordBatch;
23use snafu::ResultExt;
24
25use crate::error::{RecordBatchSnafu, Result};
26use crate::memtable::BoxedBatchIterator;
27use crate::read::last_row::RowGroupLastRowCachedReader;
28use crate::read::{Batch, BatchReader};
29use crate::sst::file::FileTimeRange;
30use crate::sst::parquet::file_range::FileRangeContextRef;
31use crate::sst::parquet::reader::{FlatRowGroupReader, ReaderMetrics, RowGroupReader};
32
33pub enum Source {
34 RowGroup(RowGroupReader),
35 LastRow(RowGroupLastRowCachedReader),
36}
37
38impl Source {
39 async fn next_batch(&mut self) -> Result<Option<Batch>> {
40 match self {
41 Source::RowGroup(r) => r.next_batch().await,
42 Source::LastRow(r) => r.next_batch().await,
43 }
44 }
45}
46
47pub struct PruneReader {
48 context: FileRangeContextRef,
50 source: Source,
51 metrics: ReaderMetrics,
52 skip_fields: bool,
54}
55
56impl PruneReader {
57 pub(crate) fn new_with_row_group_reader(
58 ctx: FileRangeContextRef,
59 reader: RowGroupReader,
60 skip_fields: bool,
61 ) -> Self {
62 Self {
63 context: ctx,
64 source: Source::RowGroup(reader),
65 metrics: Default::default(),
66 skip_fields,
67 }
68 }
69
70 pub(crate) fn new_with_last_row_reader(
71 ctx: FileRangeContextRef,
72 reader: RowGroupLastRowCachedReader,
73 skip_fields: bool,
74 ) -> Self {
75 Self {
76 context: ctx,
77 source: Source::LastRow(reader),
78 metrics: Default::default(),
79 skip_fields,
80 }
81 }
82
83 pub(crate) fn reset_source(&mut self, source: Source, skip_fields: bool) {
84 self.source = source;
85 self.skip_fields = skip_fields;
86 }
87
88 pub(crate) fn metrics(&self) -> ReaderMetrics {
90 let mut metrics = self.metrics.clone();
91 match &self.source {
92 Source::RowGroup(r) => {
93 metrics.merge_from(r.metrics());
94 }
95 Source::LastRow(r) => {
96 if let Some(inner_metrics) = r.metrics() {
97 metrics.merge_from(inner_metrics);
98 }
99 }
100 }
101
102 metrics
103 }
104
105 pub(crate) async fn next_batch(&mut self) -> Result<Option<Batch>> {
106 while let Some(b) = self.source.next_batch().await? {
107 match self.prune(b)? {
108 Some(b) => {
109 return Ok(Some(b));
110 }
111 None => {
112 continue;
113 }
114 }
115 }
116 Ok(None)
117 }
118
119 fn prune(&mut self, batch: Batch) -> Result<Option<Batch>> {
121 if self.context.filters().is_empty() {
123 return Ok(Some(batch));
124 }
125
126 let num_rows_before_filter = batch.num_rows();
127 let Some(batch_filtered) = self.context.precise_filter(batch, self.skip_fields)? else {
128 self.metrics.filter_metrics.rows_precise_filtered += num_rows_before_filter;
130 return Ok(None);
131 };
132
133 let filtered_rows = num_rows_before_filter - batch_filtered.num_rows();
135 self.metrics.filter_metrics.rows_precise_filtered += filtered_rows;
136
137 if !batch_filtered.is_empty() {
138 Ok(Some(batch_filtered))
139 } else {
140 Ok(None)
141 }
142 }
143}
144
145pub(crate) struct PruneTimeIterator {
147 iter: BoxedBatchIterator,
148 time_range: FileTimeRange,
149 time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
151}
152
153impl PruneTimeIterator {
154 pub(crate) fn new(
156 iter: BoxedBatchIterator,
157 time_range: FileTimeRange,
158 time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
159 ) -> Self {
160 Self {
161 iter,
162 time_range,
163 time_filters,
164 }
165 }
166
167 fn prune(&self, batch: Batch) -> Result<Batch> {
169 if batch.is_empty() {
170 return Ok(batch);
171 }
172
173 if self.time_range.0 <= batch.first_timestamp().unwrap()
176 && batch.last_timestamp().unwrap() <= self.time_range.1
177 {
178 return self.prune_by_time_filters(batch, Vec::new());
179 }
180
181 let unit = batch
185 .timestamps()
186 .data_type()
187 .as_timestamp()
188 .unwrap()
189 .unit();
190 let mut mask = Vec::with_capacity(batch.timestamps().len());
191 let timestamps = batch.timestamps_native().unwrap();
192 for ts in timestamps {
193 let ts = Timestamp::new(*ts, unit);
194 if self.time_range.0 <= ts && ts <= self.time_range.1 {
195 mask.push(true);
196 } else {
197 mask.push(false);
198 }
199 }
200
201 self.prune_by_time_filters(batch, mask)
202 }
203
204 fn prune_by_time_filters(&self, mut batch: Batch, existing_mask: Vec<bool>) -> Result<Batch> {
207 if let Some(filters) = &self.time_filters {
208 let mut mask = BooleanBuffer::new_set(batch.num_rows());
209 for filter in filters.iter() {
210 let result = filter
211 .evaluate_vector(batch.timestamps())
212 .context(RecordBatchSnafu)?;
213 mask = mask.bitand(&result);
214 }
215
216 if !existing_mask.is_empty() {
217 mask = mask.bitand(&BooleanBuffer::from(existing_mask));
218 }
219
220 batch.filter(&BooleanArray::from(mask).into())?;
221 } else if !existing_mask.is_empty() {
222 batch.filter(&BooleanArray::from(existing_mask).into())?;
223 }
224
225 Ok(batch)
226 }
227
228 fn next_non_empty_batch(&mut self) -> Result<Option<Batch>> {
230 while let Some(batch) = self.iter.next() {
231 let batch = batch?;
232 let pruned_batch = self.prune(batch)?;
233 if !pruned_batch.is_empty() {
234 return Ok(Some(pruned_batch));
235 }
236 }
237 Ok(None)
238 }
239}
240
241impl Iterator for PruneTimeIterator {
242 type Item = Result<Batch>;
243
244 fn next(&mut self) -> Option<Self::Item> {
245 self.next_non_empty_batch().transpose()
246 }
247}
248
249pub enum FlatSource {
250 RowGroup(FlatRowGroupReader),
251}
252
253impl FlatSource {
254 fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
255 match self {
256 FlatSource::RowGroup(r) => r.next_batch(),
257 }
258 }
259}
260
261pub struct FlatPruneReader {
263 context: FileRangeContextRef,
265 source: FlatSource,
266 metrics: ReaderMetrics,
267 skip_fields: bool,
269}
270
271impl FlatPruneReader {
272 pub(crate) fn new_with_row_group_reader(
273 ctx: FileRangeContextRef,
274 reader: FlatRowGroupReader,
275 skip_fields: bool,
276 ) -> Self {
277 Self {
278 context: ctx,
279 source: FlatSource::RowGroup(reader),
280 metrics: Default::default(),
281 skip_fields,
282 }
283 }
284
285 pub(crate) fn metrics(&self) -> ReaderMetrics {
287 self.metrics.clone()
289 }
290
291 pub(crate) fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
292 while let Some(record_batch) = {
293 let start = std::time::Instant::now();
294 let batch = self.source.next_batch()?;
295 self.metrics.scan_cost += start.elapsed();
296 batch
297 } {
298 self.metrics.num_rows += record_batch.num_rows();
300 self.metrics.num_batches += 1;
301
302 match self.prune_flat(record_batch)? {
303 Some(filtered_batch) => {
304 return Ok(Some(filtered_batch));
305 }
306 None => {
307 continue;
308 }
309 }
310 }
311
312 Ok(None)
313 }
314
315 fn prune_flat(&mut self, record_batch: RecordBatch) -> Result<Option<RecordBatch>> {
317 if self.context.filters().is_empty() {
319 return Ok(Some(record_batch));
320 }
321
322 let num_rows_before_filter = record_batch.num_rows();
323 let Some(filtered_batch) = self
324 .context
325 .precise_filter_flat(record_batch, self.skip_fields)?
326 else {
327 self.metrics.filter_metrics.rows_precise_filtered += num_rows_before_filter;
329 return Ok(None);
330 };
331
332 let filtered_rows = num_rows_before_filter - filtered_batch.num_rows();
334 self.metrics.filter_metrics.rows_precise_filtered += filtered_rows;
335
336 if filtered_batch.num_rows() > 0 {
337 Ok(Some(filtered_batch))
338 } else {
339 Ok(None)
340 }
341 }
342}
343
#[cfg(test)]
mod tests {
    use api::v1::OpType;
    use datafusion_common::ScalarValue;
    use datafusion_expr::{Expr, col, lit};

    use super::*;
    use crate::test_util::new_batch;

    /// Three batches of key `k1` with timestamps `[10, 11]`, `[15, 16]` and `[17, 18]`.
    fn sample_input() -> Vec<Batch> {
        vec![
            new_batch(
                b"k1",
                &[10, 11],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[110, 111],
            ),
            new_batch(
                b"k1",
                &[15, 16],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[115, 116],
            ),
            new_batch(
                b"k1",
                &[17, 18],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[117, 118],
            ),
        ]
    }

    /// Feeds `input` through a [`PruneTimeIterator`] bounded by the inclusive
    /// millisecond range `[start, end]` and collects every batch it yields.
    fn prune_all(
        input: Vec<Batch>,
        start: i64,
        end: i64,
        time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
    ) -> Vec<Batch> {
        PruneTimeIterator::new(
            Box::new(input.into_iter().map(Ok)),
            (
                Timestamp::new_millisecond(start),
                Timestamp::new_millisecond(end),
            ),
            time_filters,
        )
        .map(|batch| batch.unwrap())
        .collect()
    }

    fn create_time_filters(expr: &[Expr]) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
        let evaluators = expr
            .iter()
            .map(|e| SimpleFilterEvaluator::try_new(e).unwrap())
            .collect::<Vec<_>>();
        Some(Arc::new(evaluators))
    }

    /// Time filters selecting `10 <= ts < 16` (milliseconds).
    fn ts_from_10_until_16() -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
        create_time_filters(&[
            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(10), None))),
            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(16), None))),
        ])
    }

    /// Expected output of selecting `10 <= ts <= 15` from [`sample_input`].
    fn expected_up_to_15() -> Vec<Batch> {
        vec![
            new_batch(
                b"k1",
                &[10, 11],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[110, 111],
            ),
            new_batch(b"k1", &[15], &[20], &[OpType::Put], &[115]),
        ]
    }

    #[test]
    fn test_prune_time_iter_empty() {
        let pruned = prune_all(Vec::new(), 0, 1000, None);
        assert!(pruned.is_empty());
    }

    #[test]
    fn test_prune_time_iter_filter() {
        // Range [10, 15]: first batch kept whole, only ts=15 from the second.
        assert_eq!(prune_all(sample_input(), 10, 15, None), expected_up_to_15());

        // Range [11, 20]: ts=10 dropped from the first batch, the rest kept whole.
        let mut expected = sample_input();
        expected[0] = new_batch(b"k1", &[11], &[20], &[OpType::Put], &[111]);
        assert_eq!(prune_all(sample_input(), 11, 20, None), expected);

        // Range [10, 18]: covers every row, so the input comes back unchanged.
        assert_eq!(prune_all(sample_input(), 10, 18, None), sample_input());
    }

    #[test]
    fn test_prune_time_iter_with_time_filters() {
        let pruned = prune_all(sample_input(), 10, 20, ts_from_10_until_16());
        assert_eq!(pruned, expected_up_to_15());
    }

    #[test]
    fn test_prune_time_iter_in_range_with_time_filters() {
        let pruned = prune_all(sample_input(), 5, 18, ts_from_10_until_16());
        assert_eq!(pruned, expected_up_to_15());
    }
}