mito2/read/prune.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

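//! Utilities for pruning batches with pushed down predicates and time ranges.
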
use std::ops::BitAnd;
use std::sync::Arc;

use common_recordbatch::filter::SimpleFilterEvaluator;
use common_time::Timestamp;
use datatypes::arrow::array::BooleanArray;
use datatypes::arrow::buffer::BooleanBuffer;
use datatypes::arrow::record_batch::RecordBatch;
use snafu::ResultExt;

use crate::error::{RecordBatchSnafu, Result};
use crate::memtable::BoxedBatchIterator;
use crate::read::last_row::RowGroupLastRowCachedReader;
use crate::read::{Batch, BatchReader};
use crate::sst::file::FileTimeRange;
use crate::sst::parquet::file_range::FileRangeContextRef;
use crate::sst::parquet::reader::{FlatRowGroupReader, ReaderMetrics, RowGroupReader};

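/// Source of batches for the prune reader.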
pub enum Source {
    RowGroup(RowGroupReader),
    LastRow(RowGroupLastRowCachedReader),
}

impl Source {
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        match self {
            Source::RowGroup(r) => r.next_batch().await,
            Source::LastRow(r) => r.next_batch().await,
        }
    }
}

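/// A reader that prunes batches from its source by the pushed down predicates.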
pub struct PruneReader {
    /// Context for file ranges.
    context: FileRangeContextRef,
    /// Inner source to read batches from.
    source: Source,
    /// Local metrics of this reader.
    metrics: ReaderMetrics,
    /// Whether to skip field filters for this row group.
    skip_fields: bool,
}

impl PruneReader {
    /// Creates a reader that prunes batches from a row group reader.
    pub(crate) fn new_with_row_group_reader(
        ctx: FileRangeContextRef,
        reader: RowGroupReader,
        skip_fields: bool,
    ) -> Self {
        Self {
            context: ctx,
            source: Source::RowGroup(reader),
            metrics: Default::default(),
            skip_fields,
        }
    }

    /// Creates a reader that prunes batches from a last row cached reader.
    pub(crate) fn new_with_last_row_reader(
        ctx: FileRangeContextRef,
        reader: RowGroupLastRowCachedReader,
        skip_fields: bool,
    ) -> Self {
        Self {
            context: ctx,
            source: Source::LastRow(reader),
            metrics: Default::default(),
            skip_fields,
        }
    }

    /// Resets the source and whether to skip field filters.
    pub(crate) fn reset_source(&mut self, source: Source, skip_fields: bool) {
        self.source = source;
        self.skip_fields = skip_fields;
    }

    /// Merges the local metrics with the inner reader's metrics and returns the result.
    pub(crate) fn metrics(&self) -> ReaderMetrics {
        let mut metrics = self.metrics.clone();
        match &self.source {
            Source::RowGroup(r) => {
                metrics.merge_from(r.metrics());
            }
            Source::LastRow(r) => {
                if let Some(inner_metrics) = r.metrics() {
                    metrics.merge_from(inner_metrics);
                }
            }
        }

        metrics
    }

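    /// Returns the next batch that passes the filters, or `None` when the source is exhausted.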
    pub(crate) async fn next_batch(&mut self) -> Result<Option<Batch>> {
        while let Some(b) = self.source.next_batch().await? {
            match self.prune(b)? {
                Some(b) => {
                    return Ok(Some(b));
                }
                None => {
                    continue;
                }
            }
        }
        Ok(None)
    }

    /// Prunes batches by the pushed down predicate.
    fn prune(&mut self, batch: Batch) -> Result<Option<Batch>> {
        // fast path
        if self.context.filters().is_empty() {
            return Ok(Some(batch));
        }

        let num_rows_before_filter = batch.num_rows();
        let Some(batch_filtered) = self.context.precise_filter(batch, self.skip_fields)? else {
            // the entire batch is filtered out
            self.metrics.filter_metrics.rows_precise_filtered += num_rows_before_filter;
            return Ok(None);
        };

        // update metric
        let filtered_rows = num_rows_before_filter - batch_filtered.num_rows();
        self.metrics.filter_metrics.rows_precise_filtered += filtered_rows;

        if !batch_filtered.is_empty() {
            Ok(Some(batch_filtered))
        } else {
            Ok(None)
        }
    }
}

/// An iterator that prunes batches by time range.
pub(crate) struct PruneTimeIterator {
    /// Inner iterator to read batches from.
    iter: BoxedBatchIterator,
    /// Time range to keep. Both ends are inclusive.
    time_range: FileTimeRange,
    /// Precise time filters.
    time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
}

impl PruneTimeIterator {
    /// Creates a new `PruneTimeIterator` with the given iterator, time range and optional
    /// time filters.
    pub(crate) fn new(
        iter: BoxedBatchIterator,
        time_range: FileTimeRange,
        time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
    ) -> Self {
        Self {
            iter,
            time_range,
            time_filters,
        }
    }

    /// Prunes a batch by the time range and time filters.
    fn prune(&self, batch: Batch) -> Result<Batch> {
        if batch.is_empty() {
            return Ok(batch);
        }

        // fast path, the batch is within the time range.
        // Note that the time range is inclusive.
        if self.time_range.0 <= batch.first_timestamp().unwrap()
            && batch.last_timestamp().unwrap() <= self.time_range.1
        {
            return self.prune_by_time_filters(batch, Vec::new());
        }

        // slow path, prune the batch by time range.
        // Note that the timestamp precision may be different from the time range.
        // Safety: We know this is the timestamp type.
        let unit = batch
            .timestamps()
            .data_type()
            .as_timestamp()
            .unwrap()
            .unit();
        let mut mask = Vec::with_capacity(batch.timestamps().len());
        let timestamps = batch.timestamps_native().unwrap();
        for ts in timestamps {
            let ts = Timestamp::new(*ts, unit);
            if self.time_range.0 <= ts && ts <= self.time_range.1 {
                mask.push(true);
            } else {
                mask.push(false);
            }
        }

        self.prune_by_time_filters(batch, mask)
    }

    /// Prunes the batch by the time filters.
    /// Also applies the existing mask to the batch if the mask is not empty.
    fn prune_by_time_filters(&self, mut batch: Batch, existing_mask: Vec<bool>) -> Result<Batch> {
        if let Some(filters) = &self.time_filters {
            let mut mask = BooleanBuffer::new_set(batch.num_rows());
            for filter in filters.iter() {
                let result = filter
                    .evaluate_vector(batch.timestamps())
                    .context(RecordBatchSnafu)?;
                mask = mask.bitand(&result);
            }

            if !existing_mask.is_empty() {
                mask = mask.bitand(&BooleanBuffer::from(existing_mask));
            }

            batch.filter(&BooleanArray::from(mask).into())?;
        } else if !existing_mask.is_empty() {
            batch.filter(&BooleanArray::from(existing_mask).into())?;
        }

        Ok(batch)
    }

    /// Prunes and returns the next non-empty batch.
    fn next_non_empty_batch(&mut self) -> Result<Option<Batch>> {
        while let Some(batch) = self.iter.next() {
            let batch = batch?;
            let pruned_batch = self.prune(batch)?;
            if !pruned_batch.is_empty() {
                return Ok(Some(pruned_batch));
            }
        }
        Ok(None)
    }
}

impl Iterator for PruneTimeIterator {
    type Item = Result<Batch>;

    fn next(&mut self) -> Option<Self::Item> {
        self.next_non_empty_batch().transpose()
    }
}

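/// Source of record batches for the flat prune reader.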
pub enum FlatSource {
    RowGroup(FlatRowGroupReader),
}

impl FlatSource {
    fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
        match self {
            FlatSource::RowGroup(r) => r.next_batch(),
        }
    }
}

/// A flat format reader that returns `RecordBatch` instead of `Batch`.
pub struct FlatPruneReader {
    /// Context for file ranges.
    context: FileRangeContextRef,
    /// Inner source to read record batches from.
    source: FlatSource,
    /// Local metrics of this reader.
    metrics: ReaderMetrics,
    /// Whether to skip field filters for this row group.
    skip_fields: bool,
}

impl FlatPruneReader {
    /// Creates a reader that prunes record batches from a flat row group reader.
    pub(crate) fn new_with_row_group_reader(
        ctx: FileRangeContextRef,
        reader: FlatRowGroupReader,
        skip_fields: bool,
    ) -> Self {
        Self {
            context: ctx,
            source: FlatSource::RowGroup(reader),
            metrics: Default::default(),
            skip_fields,
        }
    }

    /// Returns the metrics of this reader.
    pub(crate) fn metrics(&self) -> ReaderMetrics {
        // FlatRowGroupReader doesn't collect metrics, so just return our own.
        self.metrics.clone()
    }

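    /// Returns the next record batch that passes the filters, or `None` when exhausted.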
    pub(crate) fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
        while let Some(record_batch) = {
            let start = std::time::Instant::now();
            let batch = self.source.next_batch()?;
            self.metrics.scan_cost += start.elapsed();
            batch
        } {
            // Update metrics for the received batch
            self.metrics.num_rows += record_batch.num_rows();
            self.metrics.num_batches += 1;

            match self.prune_flat(record_batch)? {
                Some(filtered_batch) => {
                    return Ok(Some(filtered_batch));
                }
                None => {
                    continue;
                }
            }
        }

        Ok(None)
    }

    /// Prunes batches by the pushed down predicate and returns RecordBatch.
    fn prune_flat(&mut self, record_batch: RecordBatch) -> Result<Option<RecordBatch>> {
        // fast path
        if self.context.filters().is_empty() {
            return Ok(Some(record_batch));
        }

        let num_rows_before_filter = record_batch.num_rows();
        let Some(filtered_batch) = self
            .context
            .precise_filter_flat(record_batch, self.skip_fields)?
        else {
            // the entire batch is filtered out
            self.metrics.filter_metrics.rows_precise_filtered += num_rows_before_filter;
            return Ok(None);
        };

        // update metric
        let filtered_rows = num_rows_before_filter - filtered_batch.num_rows();
        self.metrics.filter_metrics.rows_precise_filtered += filtered_rows;

        if filtered_batch.num_rows() > 0 {
            Ok(Some(filtered_batch))
        } else {
            Ok(None)
        }
    }
}

#[cfg(test)]
mod tests {
    use api::v1::OpType;
    use datafusion_common::ScalarValue;
    use datafusion_expr::{Expr, col, lit};

    use super::*;
    use crate::test_util::new_batch;

    #[test]
    fn test_prune_time_iter_empty() {
        let input = [];
        let iter = input.into_iter().map(Ok);
        let iter = PruneTimeIterator::new(
            Box::new(iter),
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(1000),
            ),
            None,
        );
        let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
        assert!(actual.is_empty());
    }

    #[test]
    fn test_prune_time_iter_filter() {
        let input = [
            new_batch(
                b"k1",
                &[10, 11],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[110, 111],
            ),
            new_batch(
                b"k1",
                &[15, 16],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[115, 116],
            ),
            new_batch(
                b"k1",
                &[17, 18],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[117, 118],
            ),
        ];

        let iter = input.clone().into_iter().map(Ok);
        let iter = PruneTimeIterator::new(
            Box::new(iter),
            (
                Timestamp::new_millisecond(10),
                Timestamp::new_millisecond(15),
            ),
            None,
        );
        let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
        assert_eq!(
            actual,
            [
                new_batch(
                    b"k1",
                    &[10, 11],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[110, 111],
                ),
                new_batch(b"k1", &[15], &[20], &[OpType::Put], &[115],),
            ]
        );

        let iter = input.clone().into_iter().map(Ok);
        let iter = PruneTimeIterator::new(
            Box::new(iter),
            (
                Timestamp::new_millisecond(11),
                Timestamp::new_millisecond(20),
            ),
            None,
        );
        let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
        assert_eq!(
            actual,
            [
                new_batch(b"k1", &[11], &[20], &[OpType::Put], &[111],),
                new_batch(
                    b"k1",
                    &[15, 16],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[115, 116],
                ),
                new_batch(
                    b"k1",
                    &[17, 18],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[117, 118],
                ),
            ]
        );

        let iter = input.into_iter().map(Ok);
        let iter = PruneTimeIterator::new(
            Box::new(iter),
            (
                Timestamp::new_millisecond(10),
                Timestamp::new_millisecond(18),
            ),
            None,
        );
        let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
        assert_eq!(
            actual,
            [
                new_batch(
                    b"k1",
                    &[10, 11],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[110, 111],
                ),
                new_batch(
                    b"k1",
                    &[15, 16],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[115, 116],
                ),
                new_batch(
                    b"k1",
                    &[17, 18],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[117, 118],
                ),
            ]
        );
    }
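
    // Additional sketch of a test: a batch entirely outside the time range should be
    // pruned away. Assumes `new_batch` builds millisecond timestamps, as in the tests above.
    #[test]
    fn test_prune_time_iter_out_of_range() {
        let input = [new_batch(
            b"k1",
            &[10, 11],
            &[20, 20],
            &[OpType::Put, OpType::Put],
            &[110, 111],
        )];
        let iter = input.into_iter().map(Ok);
        let iter = PruneTimeIterator::new(
            Box::new(iter),
            (
                Timestamp::new_millisecond(100),
                Timestamp::new_millisecond(200),
            ),
            None,
        );
        let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
        assert!(actual.is_empty());
    }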

    fn create_time_filters(expr: &[Expr]) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
        let filters = expr
            .iter()
            .map(|expr| SimpleFilterEvaluator::try_new(expr).unwrap())
            .collect();
        Some(Arc::new(filters))
    }

    #[test]
    fn test_prune_time_iter_with_time_filters() {
        let input = [
            new_batch(
                b"k1",
                &[10, 11],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[110, 111],
            ),
            new_batch(
                b"k1",
                &[15, 16],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[115, 116],
            ),
            new_batch(
                b"k1",
                &[17, 18],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[117, 118],
            ),
        ];

        let iter = input.clone().into_iter().map(Ok);
        // We won't use the column name.
        let time_filters = create_time_filters(&[
            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(10), None))),
            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(16), None))),
        ]);
        let iter = PruneTimeIterator::new(
            Box::new(iter),
            (
                Timestamp::new_millisecond(10),
                Timestamp::new_millisecond(20),
            ),
            time_filters,
        );
        let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
        assert_eq!(
            actual,
            [
                new_batch(
                    b"k1",
                    &[10, 11],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[110, 111],
                ),
                new_batch(b"k1", &[15], &[20], &[OpType::Put], &[115],),
            ]
        );
    }

    #[test]
    fn test_prune_time_iter_in_range_with_time_filters() {
        let input = [
            new_batch(
                b"k1",
                &[10, 11],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[110, 111],
            ),
            new_batch(
                b"k1",
                &[15, 16],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[115, 116],
            ),
            new_batch(
                b"k1",
                &[17, 18],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[117, 118],
            ),
        ];

        let iter = input.clone().into_iter().map(Ok);
        // We won't use the column name.
        let time_filters = create_time_filters(&[
            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(10), None))),
            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(16), None))),
        ]);
        let iter = PruneTimeIterator::new(
            Box::new(iter),
            (
                Timestamp::new_millisecond(5),
                Timestamp::new_millisecond(18),
            ),
            time_filters,
        );
        let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
        assert_eq!(
            actual,
            [
                new_batch(
                    b"k1",
                    &[10, 11],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[110, 111],
                ),
                new_batch(b"k1", &[15], &[20], &[OpType::Put], &[115],),
            ]
        );
    }
}