1use std::ops::BitAnd;
16use std::sync::Arc;
17
18use common_recordbatch::filter::SimpleFilterEvaluator;
19use common_time::Timestamp;
20use datatypes::arrow::array::BooleanArray;
21use datatypes::arrow::buffer::BooleanBuffer;
22use datatypes::arrow::record_batch::RecordBatch;
23use snafu::ResultExt;
24
25use crate::error::{RecordBatchSnafu, Result};
26use crate::memtable::BoxedBatchIterator;
27use crate::read::last_row::RowGroupLastRowCachedReader;
28use crate::read::{Batch, BatchReader};
29use crate::sst::file::FileTimeRange;
30use crate::sst::parquet::file_range::FileRangeContextRef;
31use crate::sst::parquet::reader::{FlatRowGroupReader, ReaderMetrics, RowGroupReader};
32
33pub enum Source {
34 RowGroup(RowGroupReader),
35 LastRow(RowGroupLastRowCachedReader),
36}
37
38impl Source {
39 async fn next_batch(&mut self) -> Result<Option<Batch>> {
40 match self {
41 Source::RowGroup(r) => r.next_batch().await,
42 Source::LastRow(r) => r.next_batch().await,
43 }
44 }
45}
46
47pub struct PruneReader {
48 context: FileRangeContextRef,
50 source: Source,
51 metrics: ReaderMetrics,
52}
53
54impl PruneReader {
55 pub(crate) fn new_with_row_group_reader(
56 ctx: FileRangeContextRef,
57 reader: RowGroupReader,
58 ) -> Self {
59 Self {
60 context: ctx,
61 source: Source::RowGroup(reader),
62 metrics: Default::default(),
63 }
64 }
65
66 pub(crate) fn new_with_last_row_reader(
67 ctx: FileRangeContextRef,
68 reader: RowGroupLastRowCachedReader,
69 ) -> Self {
70 Self {
71 context: ctx,
72 source: Source::LastRow(reader),
73 metrics: Default::default(),
74 }
75 }
76
77 pub(crate) fn reset_source(&mut self, source: Source) {
78 self.source = source;
79 }
80
81 pub(crate) fn metrics(&self) -> ReaderMetrics {
83 let mut metrics = self.metrics.clone();
84 match &self.source {
85 Source::RowGroup(r) => {
86 metrics.merge_from(r.metrics());
87 }
88 Source::LastRow(r) => {
89 if let Some(inner_metrics) = r.metrics() {
90 metrics.merge_from(inner_metrics);
91 }
92 }
93 }
94
95 metrics
96 }
97
98 pub(crate) async fn next_batch(&mut self) -> Result<Option<Batch>> {
99 while let Some(b) = self.source.next_batch().await? {
100 match self.prune(b)? {
101 Some(b) => {
102 return Ok(Some(b));
103 }
104 None => {
105 continue;
106 }
107 }
108 }
109 Ok(None)
110 }
111
112 fn prune(&mut self, batch: Batch) -> Result<Option<Batch>> {
114 if self.context.filters().is_empty() {
116 return Ok(Some(batch));
117 }
118
119 let num_rows_before_filter = batch.num_rows();
120 let Some(batch_filtered) = self.context.precise_filter(batch)? else {
121 self.metrics.filter_metrics.rows_precise_filtered += num_rows_before_filter;
123 return Ok(None);
124 };
125
126 let filtered_rows = num_rows_before_filter - batch_filtered.num_rows();
128 self.metrics.filter_metrics.rows_precise_filtered += filtered_rows;
129
130 if !batch_filtered.is_empty() {
131 Ok(Some(batch_filtered))
132 } else {
133 Ok(None)
134 }
135 }
136}
137
138pub(crate) struct PruneTimeIterator {
140 iter: BoxedBatchIterator,
141 time_range: FileTimeRange,
142 time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
144}
145
146impl PruneTimeIterator {
147 pub(crate) fn new(
149 iter: BoxedBatchIterator,
150 time_range: FileTimeRange,
151 time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
152 ) -> Self {
153 Self {
154 iter,
155 time_range,
156 time_filters,
157 }
158 }
159
160 fn prune(&self, batch: Batch) -> Result<Batch> {
162 if batch.is_empty() {
163 return Ok(batch);
164 }
165
166 if self.time_range.0 <= batch.first_timestamp().unwrap()
169 && batch.last_timestamp().unwrap() <= self.time_range.1
170 {
171 return self.prune_by_time_filters(batch, Vec::new());
172 }
173
174 let unit = batch
178 .timestamps()
179 .data_type()
180 .as_timestamp()
181 .unwrap()
182 .unit();
183 let mut mask = Vec::with_capacity(batch.timestamps().len());
184 let timestamps = batch.timestamps_native().unwrap();
185 for ts in timestamps {
186 let ts = Timestamp::new(*ts, unit);
187 if self.time_range.0 <= ts && ts <= self.time_range.1 {
188 mask.push(true);
189 } else {
190 mask.push(false);
191 }
192 }
193
194 self.prune_by_time_filters(batch, mask)
195 }
196
197 fn prune_by_time_filters(&self, mut batch: Batch, existing_mask: Vec<bool>) -> Result<Batch> {
200 if let Some(filters) = &self.time_filters {
201 let mut mask = BooleanBuffer::new_set(batch.num_rows());
202 for filter in filters.iter() {
203 let result = filter
204 .evaluate_vector(batch.timestamps())
205 .context(RecordBatchSnafu)?;
206 mask = mask.bitand(&result);
207 }
208
209 if !existing_mask.is_empty() {
210 mask = mask.bitand(&BooleanBuffer::from(existing_mask));
211 }
212
213 batch.filter(&BooleanArray::from(mask).into())?;
214 } else if !existing_mask.is_empty() {
215 batch.filter(&BooleanArray::from(existing_mask).into())?;
216 }
217
218 Ok(batch)
219 }
220
221 fn next_non_empty_batch(&mut self) -> Result<Option<Batch>> {
223 while let Some(batch) = self.iter.next() {
224 let batch = batch?;
225 let pruned_batch = self.prune(batch)?;
226 if !pruned_batch.is_empty() {
227 return Ok(Some(pruned_batch));
228 }
229 }
230 Ok(None)
231 }
232}
233
234impl Iterator for PruneTimeIterator {
235 type Item = Result<Batch>;
236
237 fn next(&mut self) -> Option<Self::Item> {
238 self.next_non_empty_batch().transpose()
239 }
240}
241
242pub enum FlatSource {
243 RowGroup(FlatRowGroupReader),
244}
245
246impl FlatSource {
247 fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
248 match self {
249 FlatSource::RowGroup(r) => r.next_batch(),
250 }
251 }
252}
253
254pub struct FlatPruneReader {
256 context: FileRangeContextRef,
258 source: FlatSource,
259 metrics: ReaderMetrics,
260}
261
262impl FlatPruneReader {
263 pub(crate) fn new_with_row_group_reader(
264 ctx: FileRangeContextRef,
265 reader: FlatRowGroupReader,
266 ) -> Self {
267 Self {
268 context: ctx,
269 source: FlatSource::RowGroup(reader),
270 metrics: Default::default(),
271 }
272 }
273
274 pub(crate) fn metrics(&self) -> ReaderMetrics {
276 self.metrics.clone()
278 }
279
280 pub(crate) fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
281 while let Some(record_batch) = {
282 let start = std::time::Instant::now();
283 let batch = self.source.next_batch()?;
284 self.metrics.scan_cost += start.elapsed();
285 batch
286 } {
287 self.metrics.num_rows += record_batch.num_rows();
289 self.metrics.num_batches += 1;
290
291 match self.prune_flat(record_batch)? {
292 Some(filtered_batch) => {
293 return Ok(Some(filtered_batch));
294 }
295 None => {
296 continue;
297 }
298 }
299 }
300
301 Ok(None)
302 }
303
304 fn prune_flat(&mut self, record_batch: RecordBatch) -> Result<Option<RecordBatch>> {
306 if self.context.filters().is_empty() {
308 return Ok(Some(record_batch));
309 }
310
311 let num_rows_before_filter = record_batch.num_rows();
312 let Some(filtered_batch) = self.context.precise_filter_flat(record_batch)? else {
313 self.metrics.filter_metrics.rows_precise_filtered += num_rows_before_filter;
315 return Ok(None);
316 };
317
318 let filtered_rows = num_rows_before_filter - filtered_batch.num_rows();
320 self.metrics.filter_metrics.rows_precise_filtered += filtered_rows;
321
322 if filtered_batch.num_rows() > 0 {
323 Ok(Some(filtered_batch))
324 } else {
325 Ok(None)
326 }
327 }
328}
329
#[cfg(test)]
mod tests {
    use api::v1::OpType;
    use datafusion_common::ScalarValue;
    use datafusion_expr::{col, lit, Expr};

    use super::*;
    use crate::test_util::new_batch;

    /// Returns the three single-key batches shared by the tests below.
    fn sample_input() -> [Batch; 3] {
        [
            new_batch(
                b"k1",
                &[10, 11],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[110, 111],
            ),
            new_batch(
                b"k1",
                &[15, 16],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[115, 116],
            ),
            new_batch(
                b"k1",
                &[17, 18],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[117, 118],
            ),
        ]
    }

    /// Runs `input` through a [`PruneTimeIterator`] with the inclusive
    /// millisecond range `[start_ms, end_ms]` and collects every output batch.
    fn prune_all(
        input: &[Batch],
        start_ms: i64,
        end_ms: i64,
        time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
    ) -> Vec<Batch> {
        let source = input.to_vec().into_iter().map(Ok);
        PruneTimeIterator::new(
            Box::new(source),
            (
                Timestamp::new_millisecond(start_ms),
                Timestamp::new_millisecond(end_ms),
            ),
            time_filters,
        )
        .map(|batch| batch.unwrap())
        .collect()
    }

    /// Builds filter evaluators for `expr`, panicking if any is unsupported.
    fn create_time_filters(expr: &[Expr]) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
        let filters = expr
            .iter()
            .map(|expr| SimpleFilterEvaluator::try_new(expr).unwrap())
            .collect();
        Some(Arc::new(filters))
    }

    /// Time filters equivalent to `ts >= 10 AND ts < 16` (milliseconds).
    fn ts_from_10_until_16() -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
        create_time_filters(&[
            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(10), None))),
            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(16), None))),
        ])
    }

    #[test]
    fn test_prune_time_iter_empty() {
        let actual = prune_all(&[], 0, 1000, None);
        assert!(actual.is_empty());
    }

    #[test]
    fn test_prune_time_iter_filter() {
        let input = sample_input();

        // The range ends inside the second batch; the third batch is dropped.
        assert_eq!(
            prune_all(&input, 10, 15, None),
            [
                input[0].clone(),
                new_batch(b"k1", &[15], &[20], &[OpType::Put], &[115]),
            ]
        );

        // The range starts inside the first batch.
        assert_eq!(
            prune_all(&input, 11, 20, None),
            [
                new_batch(b"k1", &[11], &[20], &[OpType::Put], &[111]),
                input[1].clone(),
                input[2].clone(),
            ]
        );

        // The range covers every row exactly.
        assert_eq!(prune_all(&input, 10, 18, None), input);
    }

    #[test]
    fn test_prune_time_iter_with_time_filters() {
        let input = sample_input();
        let expected = [
            input[0].clone(),
            new_batch(b"k1", &[15], &[20], &[OpType::Put], &[115]),
        ];

        assert_eq!(
            prune_all(&input, 10, 20, ts_from_10_until_16()),
            expected
        );
    }

    #[test]
    fn test_prune_time_iter_in_range_with_time_filters() {
        let input = sample_input();
        let expected = [
            input[0].clone(),
            new_batch(b"k1", &[15], &[20], &[OpType::Put], &[115]),
        ];

        // Every batch is inside the range, so only the time filters prune rows.
        assert_eq!(
            prune_all(&input, 5, 18, ts_from_10_until_16()),
            expected
        );
    }
}