1use std::ops::BitAnd;
16use std::sync::Arc;
17
18use common_recordbatch::filter::SimpleFilterEvaluator;
19use common_time::Timestamp;
20use datatypes::arrow::array::BooleanArray;
21use datatypes::arrow::buffer::BooleanBuffer;
22use datatypes::arrow::record_batch::RecordBatch;
23use snafu::ResultExt;
24
25use crate::error::{RecordBatchSnafu, Result};
26use crate::memtable::BoxedBatchIterator;
27use crate::read::last_row::{FlatRowGroupLastRowCachedReader, RowGroupLastRowCachedReader};
28use crate::read::{Batch, BatchReader};
29use crate::sst::file::FileTimeRange;
30use crate::sst::parquet::file_range::FileRangeContextRef;
31use crate::sst::parquet::reader::{FlatRowGroupReader, ReaderMetrics, RowGroupReader};
32
/// Batch source wrapped by [`PruneReader`].
///
/// Each variant owns one concrete reader; `Source::next_batch` forwards to it.
pub enum Source {
    /// Batches come from a [`RowGroupReader`].
    RowGroup(RowGroupReader),
    /// Batches come from a [`RowGroupLastRowCachedReader`].
    LastRow(RowGroupLastRowCachedReader),
}
37
38impl Source {
39 async fn next_batch(&mut self) -> Result<Option<Batch>> {
40 match self {
41 Source::RowGroup(r) => r.next_batch().await,
42 Source::LastRow(r) => r.next_batch().await,
43 }
44 }
45}
46
/// Reader that pulls batches from a [`Source`] and drops the rows rejected by
/// the precise filter of its file range context.
pub struct PruneReader {
    /// Context consulted for `filters()`, `has_partition_filter()` and
    /// `precise_filter()` while pruning.
    context: FileRangeContextRef,
    /// Underlying reader the batches are pulled from.
    source: Source,
    /// Metrics owned by this reader; `rows_precise_filtered` is updated here.
    metrics: ReaderMetrics,
    /// Flag forwarded as the second argument of `precise_filter()`.
    skip_fields: bool,
}
55
56impl PruneReader {
57 pub(crate) fn new_with_row_group_reader(
58 ctx: FileRangeContextRef,
59 reader: RowGroupReader,
60 skip_fields: bool,
61 ) -> Self {
62 Self {
63 context: ctx,
64 source: Source::RowGroup(reader),
65 metrics: Default::default(),
66 skip_fields,
67 }
68 }
69
70 pub(crate) fn new_with_last_row_reader(
71 ctx: FileRangeContextRef,
72 reader: RowGroupLastRowCachedReader,
73 skip_fields: bool,
74 ) -> Self {
75 Self {
76 context: ctx,
77 source: Source::LastRow(reader),
78 metrics: Default::default(),
79 skip_fields,
80 }
81 }
82
83 pub(crate) fn metrics(&self) -> ReaderMetrics {
85 let mut metrics = self.metrics.clone();
86 match &self.source {
87 Source::RowGroup(r) => {
88 metrics.merge_from(r.metrics());
89 }
90 Source::LastRow(r) => {
91 if let Some(inner_metrics) = r.metrics() {
92 metrics.merge_from(inner_metrics);
93 }
94 }
95 }
96
97 metrics
98 }
99
100 pub(crate) async fn next_batch(&mut self) -> Result<Option<Batch>> {
101 while let Some(b) = self.source.next_batch().await? {
102 match self.prune(b)? {
103 Some(b) => {
104 return Ok(Some(b));
105 }
106 None => {
107 continue;
108 }
109 }
110 }
111 Ok(None)
112 }
113
114 fn prune(&mut self, batch: Batch) -> Result<Option<Batch>> {
116 if self.context.filters().is_empty() && !self.context.has_partition_filter() {
118 return Ok(Some(batch));
119 }
120
121 let num_rows_before_filter = batch.num_rows();
122 let Some(batch_filtered) = self.context.precise_filter(batch, self.skip_fields)? else {
123 self.metrics.filter_metrics.rows_precise_filtered += num_rows_before_filter;
125 return Ok(None);
126 };
127
128 let filtered_rows = num_rows_before_filter - batch_filtered.num_rows();
130 self.metrics.filter_metrics.rows_precise_filtered += filtered_rows;
131
132 if !batch_filtered.is_empty() {
133 Ok(Some(batch_filtered))
134 } else {
135 Ok(None)
136 }
137 }
138}
139
/// Iterator adapter that prunes the batches of an inner iterator by a time
/// range and, optionally, by time filters.
pub(crate) struct PruneTimeIterator {
    /// Inner iterator the batches are pulled from.
    iter: BoxedBatchIterator,
    /// Time range to keep; both bounds are compared with `<=`, i.e. inclusive.
    time_range: FileTimeRange,
    /// Optional filters evaluated against the batch's timestamp column.
    time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
}
147
148impl PruneTimeIterator {
149 pub(crate) fn new(
151 iter: BoxedBatchIterator,
152 time_range: FileTimeRange,
153 time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
154 ) -> Self {
155 Self {
156 iter,
157 time_range,
158 time_filters,
159 }
160 }
161
162 fn prune(&self, batch: Batch) -> Result<Batch> {
164 if batch.is_empty() {
165 return Ok(batch);
166 }
167
168 if self.time_range.0 <= batch.first_timestamp().unwrap()
171 && batch.last_timestamp().unwrap() <= self.time_range.1
172 {
173 return self.prune_by_time_filters(batch, Vec::new());
174 }
175
176 let unit = batch
180 .timestamps()
181 .data_type()
182 .as_timestamp()
183 .unwrap()
184 .unit();
185 let mut mask = Vec::with_capacity(batch.timestamps().len());
186 let timestamps = batch.timestamps_native().unwrap();
187 for ts in timestamps {
188 let ts = Timestamp::new(*ts, unit);
189 if self.time_range.0 <= ts && ts <= self.time_range.1 {
190 mask.push(true);
191 } else {
192 mask.push(false);
193 }
194 }
195
196 self.prune_by_time_filters(batch, mask)
197 }
198
199 fn prune_by_time_filters(&self, mut batch: Batch, existing_mask: Vec<bool>) -> Result<Batch> {
202 if let Some(filters) = &self.time_filters {
203 let mut mask = BooleanBuffer::new_set(batch.num_rows());
204 for filter in filters.iter() {
205 let result = filter
206 .evaluate_vector(batch.timestamps())
207 .context(RecordBatchSnafu)?;
208 mask = mask.bitand(&result);
209 }
210
211 if !existing_mask.is_empty() {
212 mask = mask.bitand(&BooleanBuffer::from(existing_mask));
213 }
214
215 batch.filter(&BooleanArray::from(mask).into())?;
216 } else if !existing_mask.is_empty() {
217 batch.filter(&BooleanArray::from(existing_mask).into())?;
218 }
219
220 Ok(batch)
221 }
222
223 fn next_non_empty_batch(&mut self) -> Result<Option<Batch>> {
225 while let Some(batch) = self.iter.next() {
226 let batch = batch?;
227 let pruned_batch = self.prune(batch)?;
228 if !pruned_batch.is_empty() {
229 return Ok(Some(pruned_batch));
230 }
231 }
232 Ok(None)
233 }
234}
235
236impl Iterator for PruneTimeIterator {
237 type Item = Result<Batch>;
238
239 fn next(&mut self) -> Option<Self::Item> {
240 self.next_non_empty_batch().transpose()
241 }
242}
243
/// Record batch source wrapped by [`FlatPruneReader`].
///
/// Each variant owns one concrete reader; `FlatSource::next_batch` forwards
/// to it.
pub enum FlatSource {
    /// Record batches come from a [`FlatRowGroupReader`].
    RowGroup(FlatRowGroupReader),
    /// Record batches come from a [`FlatRowGroupLastRowCachedReader`].
    LastRow(FlatRowGroupLastRowCachedReader),
}
248
249impl FlatSource {
250 fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
251 match self {
252 FlatSource::RowGroup(r) => r.next_batch(),
253 FlatSource::LastRow(r) => r.next_batch(),
254 }
255 }
256}
257
/// Reader that pulls record batches from a [`FlatSource`] and drops the rows
/// rejected by the precise filter of its file range context.
pub struct FlatPruneReader {
    /// Context consulted for `filters()`, `has_partition_filter()` and
    /// `precise_filter_flat()` while pruning.
    context: FileRangeContextRef,
    /// Underlying reader the record batches are pulled from.
    source: FlatSource,
    /// Metrics owned by this reader (scan cost, row/batch counts, filtered rows).
    metrics: ReaderMetrics,
    /// Flag forwarded as the second argument of `precise_filter_flat()`.
    skip_fields: bool,
}
267
268impl FlatPruneReader {
269 pub(crate) fn new_with_row_group_reader(
270 ctx: FileRangeContextRef,
271 reader: FlatRowGroupReader,
272 skip_fields: bool,
273 ) -> Self {
274 Self {
275 context: ctx,
276 source: FlatSource::RowGroup(reader),
277 metrics: Default::default(),
278 skip_fields,
279 }
280 }
281
282 pub(crate) fn new_with_last_row_reader(
283 ctx: FileRangeContextRef,
284 reader: FlatRowGroupLastRowCachedReader,
285 skip_fields: bool,
286 ) -> Self {
287 Self {
288 context: ctx,
289 source: FlatSource::LastRow(reader),
290 metrics: Default::default(),
291 skip_fields,
292 }
293 }
294
295 pub(crate) fn metrics(&self) -> ReaderMetrics {
297 self.metrics.clone()
298 }
299
300 pub(crate) fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
301 while let Some(record_batch) = {
302 let start = std::time::Instant::now();
303 let batch = self.source.next_batch()?;
304 self.metrics.scan_cost += start.elapsed();
305 batch
306 } {
307 self.metrics.num_rows += record_batch.num_rows();
309 self.metrics.num_batches += 1;
310
311 match self.prune_flat(record_batch)? {
312 Some(filtered_batch) => {
313 return Ok(Some(filtered_batch));
314 }
315 None => {
316 continue;
317 }
318 }
319 }
320
321 Ok(None)
322 }
323
324 fn prune_flat(&mut self, record_batch: RecordBatch) -> Result<Option<RecordBatch>> {
326 if self.context.filters().is_empty() && !self.context.has_partition_filter() {
328 return Ok(Some(record_batch));
329 }
330
331 let num_rows_before_filter = record_batch.num_rows();
332 let Some(filtered_batch) = self
333 .context
334 .precise_filter_flat(record_batch, self.skip_fields)?
335 else {
336 self.metrics.filter_metrics.rows_precise_filtered += num_rows_before_filter;
338 return Ok(None);
339 };
340
341 let filtered_rows = num_rows_before_filter - filtered_batch.num_rows();
343 self.metrics.filter_metrics.rows_precise_filtered += filtered_rows;
344
345 if filtered_batch.num_rows() > 0 {
346 Ok(Some(filtered_batch))
347 } else {
348 Ok(None)
349 }
350 }
351}
352
#[cfg(test)]
mod tests {
    use api::v1::OpType;
    use datafusion_common::ScalarValue;
    use datafusion_expr::{Expr, col, lit};

    use super::*;
    use crate::test_util::new_batch;

    /// Three two-row `Put` batches for key `k1` with timestamps
    /// `[10, 11]`, `[15, 16]` and `[17, 18]`.
    fn sample_batches() -> Vec<Batch> {
        vec![
            new_batch(
                b"k1",
                &[10, 11],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[110, 111],
            ),
            new_batch(
                b"k1",
                &[15, 16],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[115, 116],
            ),
            new_batch(
                b"k1",
                &[17, 18],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[117, 118],
            ),
        ]
    }

    /// Builds a millisecond time range from its two bounds.
    fn millis_range(start: i64, end: i64) -> FileTimeRange {
        (
            Timestamp::new_millisecond(start),
            Timestamp::new_millisecond(end),
        )
    }

    /// Runs `input` through a `PruneTimeIterator` and collects every batch it
    /// yields, panicking on the first error.
    fn prune_all(
        input: Vec<Batch>,
        time_range: FileTimeRange,
        time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
    ) -> Vec<Batch> {
        let source = input.into_iter().map(Ok);
        PruneTimeIterator::new(Box::new(source), time_range, time_filters)
            .map(|batch| batch.unwrap())
            .collect()
    }

    /// Turns expressions into the shared filter list the iterator expects.
    fn create_time_filters(expr: &[Expr]) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
        let mut evaluators = Vec::with_capacity(expr.len());
        for e in expr {
            evaluators.push(SimpleFilterEvaluator::try_new(e).unwrap());
        }
        Some(Arc::new(evaluators))
    }

    #[test]
    fn test_prune_time_iter_empty() {
        let actual = prune_all(Vec::new(), millis_range(0, 1000), None);
        assert!(actual.is_empty());
    }

    #[test]
    fn test_prune_time_iter_filter() {
        // Range [10, 15]: the second batch is cut to one row, the third is dropped.
        let actual = prune_all(sample_batches(), millis_range(10, 15), None);
        assert_eq!(
            actual,
            [
                new_batch(
                    b"k1",
                    &[10, 11],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[110, 111],
                ),
                new_batch(b"k1", &[15], &[20], &[OpType::Put], &[115],),
            ]
        );

        // Range [11, 20]: only the first row of the first batch is removed.
        let actual = prune_all(sample_batches(), millis_range(11, 20), None);
        assert_eq!(
            actual,
            [
                new_batch(b"k1", &[11], &[20], &[OpType::Put], &[111],),
                new_batch(
                    b"k1",
                    &[15, 16],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[115, 116],
                ),
                new_batch(
                    b"k1",
                    &[17, 18],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[117, 118],
                ),
            ]
        );

        // Range [10, 18]: every row is inside, nothing is removed.
        let actual = prune_all(sample_batches(), millis_range(10, 18), None);
        assert_eq!(actual, sample_batches());
    }

    #[test]
    fn test_prune_time_iter_with_time_filters() {
        // Filters keep 10 <= ts < 16.
        let time_filters = create_time_filters(&[
            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(10), None))),
            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(16), None))),
        ]);
        let actual = prune_all(sample_batches(), millis_range(10, 20), time_filters);
        assert_eq!(
            actual,
            [
                new_batch(
                    b"k1",
                    &[10, 11],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[110, 111],
                ),
                new_batch(b"k1", &[15], &[20], &[OpType::Put], &[115],),
            ]
        );
    }

    #[test]
    fn test_prune_time_iter_in_range_with_time_filters() {
        // Every batch is inside the range [5, 18], so only the filters
        // (10 <= ts < 16) remove rows.
        let time_filters = create_time_filters(&[
            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(10), None))),
            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(16), None))),
        ]);
        let actual = prune_all(sample_batches(), millis_range(5, 18), time_filters);
        assert_eq!(
            actual,
            [
                new_batch(
                    b"k1",
                    &[10, 11],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[110, 111],
                ),
                new_batch(b"k1", &[15], &[20], &[OpType::Put], &[115],),
            ]
        );
    }
}