mito2/read/
prune.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::BitAnd;
16use std::sync::Arc;
17
18use common_recordbatch::filter::SimpleFilterEvaluator;
19use common_time::Timestamp;
20use datatypes::arrow::array::BooleanArray;
21use datatypes::arrow::buffer::BooleanBuffer;
22use datatypes::arrow::record_batch::RecordBatch;
23use snafu::ResultExt;
24
25use crate::error::{RecordBatchSnafu, Result};
26use crate::memtable::BoxedBatchIterator;
27use crate::read::last_row::RowGroupLastRowCachedReader;
28use crate::read::{Batch, BatchReader};
29use crate::sst::file::FileTimeRange;
30use crate::sst::parquet::file_range::FileRangeContextRef;
31use crate::sst::parquet::reader::{FlatRowGroupReader, ReaderMetrics, RowGroupReader};
32
/// The underlying reader that a [`PruneReader`] pulls batches from.
pub enum Source {
    /// Batches are read through a [`RowGroupReader`].
    RowGroup(RowGroupReader),
    /// Batches are read through a [`RowGroupLastRowCachedReader`].
    LastRow(RowGroupLastRowCachedReader),
}
37
38impl Source {
39    async fn next_batch(&mut self) -> Result<Option<Batch>> {
40        match self {
41            Source::RowGroup(r) => r.next_batch().await,
42            Source::LastRow(r) => r.next_batch().await,
43        }
44    }
45}
46
/// A reader that pulls batches from a [`Source`] and prunes them with the
/// filters held by the file range context.
pub struct PruneReader {
    /// Context for file ranges.
    context: FileRangeContextRef,
    /// The reader that supplies the batches to prune.
    source: Source,
    /// Metrics recorded by this reader itself, e.g. the number of rows removed
    /// by precise filtering.
    metrics: ReaderMetrics,
}
53
54impl PruneReader {
55    pub(crate) fn new_with_row_group_reader(
56        ctx: FileRangeContextRef,
57        reader: RowGroupReader,
58    ) -> Self {
59        Self {
60            context: ctx,
61            source: Source::RowGroup(reader),
62            metrics: Default::default(),
63        }
64    }
65
66    pub(crate) fn new_with_last_row_reader(
67        ctx: FileRangeContextRef,
68        reader: RowGroupLastRowCachedReader,
69    ) -> Self {
70        Self {
71            context: ctx,
72            source: Source::LastRow(reader),
73            metrics: Default::default(),
74        }
75    }
76
77    pub(crate) fn reset_source(&mut self, source: Source) {
78        self.source = source;
79    }
80
81    /// Merge metrics with the inner reader and return the merged metrics.
82    pub(crate) fn metrics(&self) -> ReaderMetrics {
83        let mut metrics = self.metrics.clone();
84        match &self.source {
85            Source::RowGroup(r) => {
86                metrics.merge_from(r.metrics());
87            }
88            Source::LastRow(r) => {
89                if let Some(inner_metrics) = r.metrics() {
90                    metrics.merge_from(inner_metrics);
91                }
92            }
93        }
94
95        metrics
96    }
97
98    pub(crate) async fn next_batch(&mut self) -> Result<Option<Batch>> {
99        while let Some(b) = self.source.next_batch().await? {
100            match self.prune(b)? {
101                Some(b) => {
102                    return Ok(Some(b));
103                }
104                None => {
105                    continue;
106                }
107            }
108        }
109        Ok(None)
110    }
111
112    /// Prunes batches by the pushed down predicate.
113    fn prune(&mut self, batch: Batch) -> Result<Option<Batch>> {
114        // fast path
115        if self.context.filters().is_empty() {
116            return Ok(Some(batch));
117        }
118
119        let num_rows_before_filter = batch.num_rows();
120        let Some(batch_filtered) = self.context.precise_filter(batch)? else {
121            // the entire batch is filtered out
122            self.metrics.filter_metrics.rows_precise_filtered += num_rows_before_filter;
123            return Ok(None);
124        };
125
126        // update metric
127        let filtered_rows = num_rows_before_filter - batch_filtered.num_rows();
128        self.metrics.filter_metrics.rows_precise_filtered += filtered_rows;
129
130        if !batch_filtered.is_empty() {
131            Ok(Some(batch_filtered))
132        } else {
133            Ok(None)
134        }
135    }
136}
137
/// An iterator that prunes batches by time range.
pub(crate) struct PruneTimeIterator {
    /// The inner iterator that supplies the batches to prune.
    iter: BoxedBatchIterator,
    /// The time range rows must fall into; both bounds are inclusive.
    time_range: FileTimeRange,
    /// Precise time filters.
    time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
}
145
146impl PruneTimeIterator {
147    /// Creates a new `PruneTimeIterator` with the given iterator and time range.
148    pub(crate) fn new(
149        iter: BoxedBatchIterator,
150        time_range: FileTimeRange,
151        time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
152    ) -> Self {
153        Self {
154            iter,
155            time_range,
156            time_filters,
157        }
158    }
159
160    /// Prune batch by time range.
161    fn prune(&self, batch: Batch) -> Result<Batch> {
162        if batch.is_empty() {
163            return Ok(batch);
164        }
165
166        // fast path, the batch is within the time range.
167        // Note that the time range is inclusive.
168        if self.time_range.0 <= batch.first_timestamp().unwrap()
169            && batch.last_timestamp().unwrap() <= self.time_range.1
170        {
171            return self.prune_by_time_filters(batch, Vec::new());
172        }
173
174        // slow path, prune the batch by time range.
175        // Note that the timestamp precision may be different from the time range.
176        // Safety: We know this is the timestamp type.
177        let unit = batch
178            .timestamps()
179            .data_type()
180            .as_timestamp()
181            .unwrap()
182            .unit();
183        let mut mask = Vec::with_capacity(batch.timestamps().len());
184        let timestamps = batch.timestamps_native().unwrap();
185        for ts in timestamps {
186            let ts = Timestamp::new(*ts, unit);
187            if self.time_range.0 <= ts && ts <= self.time_range.1 {
188                mask.push(true);
189            } else {
190                mask.push(false);
191            }
192        }
193
194        self.prune_by_time_filters(batch, mask)
195    }
196
197    /// Prunes the batch by time filters.
198    /// Also applies existing mask to the batch if the mask is not empty.
199    fn prune_by_time_filters(&self, mut batch: Batch, existing_mask: Vec<bool>) -> Result<Batch> {
200        if let Some(filters) = &self.time_filters {
201            let mut mask = BooleanBuffer::new_set(batch.num_rows());
202            for filter in filters.iter() {
203                let result = filter
204                    .evaluate_vector(batch.timestamps())
205                    .context(RecordBatchSnafu)?;
206                mask = mask.bitand(&result);
207            }
208
209            if !existing_mask.is_empty() {
210                mask = mask.bitand(&BooleanBuffer::from(existing_mask));
211            }
212
213            batch.filter(&BooleanArray::from(mask).into())?;
214        } else if !existing_mask.is_empty() {
215            batch.filter(&BooleanArray::from(existing_mask).into())?;
216        }
217
218        Ok(batch)
219    }
220
221    // Prune and return the next non-empty batch.
222    fn next_non_empty_batch(&mut self) -> Result<Option<Batch>> {
223        while let Some(batch) = self.iter.next() {
224            let batch = batch?;
225            let pruned_batch = self.prune(batch)?;
226            if !pruned_batch.is_empty() {
227                return Ok(Some(pruned_batch));
228            }
229        }
230        Ok(None)
231    }
232}
233
234impl Iterator for PruneTimeIterator {
235    type Item = Result<Batch>;
236
237    fn next(&mut self) -> Option<Self::Item> {
238        self.next_non_empty_batch().transpose()
239    }
240}
241
/// The underlying reader that a [`FlatPruneReader`] pulls record batches from.
pub enum FlatSource {
    /// Record batches are read through a [`FlatRowGroupReader`].
    RowGroup(FlatRowGroupReader),
}
245
246impl FlatSource {
247    fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
248        match self {
249            FlatSource::RowGroup(r) => r.next_batch(),
250        }
251    }
252}
253
/// A flat format reader that returns RecordBatch instead of Batch.
pub struct FlatPruneReader {
    /// Context for file ranges.
    context: FileRangeContextRef,
    /// The reader that supplies the record batches to prune.
    source: FlatSource,
    /// Metrics recorded by this reader: scan cost, rows and batches read, and
    /// rows removed by precise filtering.
    metrics: ReaderMetrics,
}
261
262impl FlatPruneReader {
263    pub(crate) fn new_with_row_group_reader(
264        ctx: FileRangeContextRef,
265        reader: FlatRowGroupReader,
266    ) -> Self {
267        Self {
268            context: ctx,
269            source: FlatSource::RowGroup(reader),
270            metrics: Default::default(),
271        }
272    }
273
274    /// Merge metrics with the inner reader and return the merged metrics.
275    pub(crate) fn metrics(&self) -> ReaderMetrics {
276        // FlatRowGroupReader doesn't collect metrics, so just return our own
277        self.metrics.clone()
278    }
279
280    pub(crate) fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
281        while let Some(record_batch) = {
282            let start = std::time::Instant::now();
283            let batch = self.source.next_batch()?;
284            self.metrics.scan_cost += start.elapsed();
285            batch
286        } {
287            // Update metrics for the received batch
288            self.metrics.num_rows += record_batch.num_rows();
289            self.metrics.num_batches += 1;
290
291            match self.prune_flat(record_batch)? {
292                Some(filtered_batch) => {
293                    return Ok(Some(filtered_batch));
294                }
295                None => {
296                    continue;
297                }
298            }
299        }
300
301        Ok(None)
302    }
303
304    /// Prunes batches by the pushed down predicate and returns RecordBatch.
305    fn prune_flat(&mut self, record_batch: RecordBatch) -> Result<Option<RecordBatch>> {
306        // fast path
307        if self.context.filters().is_empty() {
308            return Ok(Some(record_batch));
309        }
310
311        let num_rows_before_filter = record_batch.num_rows();
312        let Some(filtered_batch) = self.context.precise_filter_flat(record_batch)? else {
313            // the entire batch is filtered out
314            self.metrics.filter_metrics.rows_precise_filtered += num_rows_before_filter;
315            return Ok(None);
316        };
317
318        // update metric
319        let filtered_rows = num_rows_before_filter - filtered_batch.num_rows();
320        self.metrics.filter_metrics.rows_precise_filtered += filtered_rows;
321
322        if filtered_batch.num_rows() > 0 {
323            Ok(Some(filtered_batch))
324        } else {
325            Ok(None)
326        }
327    }
328}
329
#[cfg(test)]
mod tests {
    use api::v1::OpType;
    use datafusion_common::ScalarValue;
    use datafusion_expr::{col, lit, Expr};

    use super::*;
    use crate::test_util::new_batch;

    /// Three two-row batches for key `k1` with timestamps 10-11, 15-16 and 17-18.
    fn sample_batches() -> Vec<Batch> {
        vec![
            new_batch(
                b"k1",
                &[10, 11],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[110, 111],
            ),
            new_batch(
                b"k1",
                &[15, 16],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[115, 116],
            ),
            new_batch(
                b"k1",
                &[17, 18],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[117, 118],
            ),
        ]
    }

    /// Feeds `input` through a `PruneTimeIterator` bounded by the millisecond
    /// range `[start_ms, end_ms]` and collects every batch it yields.
    fn prune_all(
        input: Vec<Batch>,
        start_ms: i64,
        end_ms: i64,
        time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
    ) -> Vec<Batch> {
        let source = input.into_iter().map(Ok);
        let time_range = (
            Timestamp::new_millisecond(start_ms),
            Timestamp::new_millisecond(end_ms),
        );
        let iter = PruneTimeIterator::new(Box::new(source), time_range, time_filters);
        iter.map(|batch| batch.unwrap()).collect()
    }

    #[test]
    fn test_prune_time_iter_empty() {
        let actual = prune_all(Vec::new(), 0, 1000, None);
        assert!(actual.is_empty());
    }

    #[test]
    fn test_prune_time_iter_filter() {
        // The range ends inside the second batch and excludes the third.
        let actual = prune_all(sample_batches(), 10, 15, None);
        assert_eq!(
            actual,
            [
                new_batch(
                    b"k1",
                    &[10, 11],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[110, 111],
                ),
                new_batch(b"k1", &[15], &[20], &[OpType::Put], &[115],),
            ]
        );

        // The range starts inside the first batch.
        let actual = prune_all(sample_batches(), 11, 20, None);
        assert_eq!(
            actual,
            [
                new_batch(b"k1", &[11], &[20], &[OpType::Put], &[111],),
                new_batch(
                    b"k1",
                    &[15, 16],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[115, 116],
                ),
                new_batch(
                    b"k1",
                    &[17, 18],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[117, 118],
                ),
            ]
        );

        // The range covers every row.
        let actual = prune_all(sample_batches(), 10, 18, None);
        assert_eq!(actual, sample_batches());
    }

    fn create_time_filters(expr: &[Expr]) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
        let mut evaluators = Vec::with_capacity(expr.len());
        for e in expr {
            evaluators.push(SimpleFilterEvaluator::try_new(e).unwrap());
        }
        Some(Arc::new(evaluators))
    }

    /// Filters equivalent to `10 <= ts < 16` in milliseconds.
    fn ts_between_10_and_16() -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
        // We won't use the column name.
        create_time_filters(&[
            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(10), None))),
            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(16), None))),
        ])
    }

    #[test]
    fn test_prune_time_iter_with_time_filters() {
        let actual = prune_all(sample_batches(), 10, 20, ts_between_10_and_16());
        assert_eq!(
            actual,
            [
                new_batch(
                    b"k1",
                    &[10, 11],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[110, 111],
                ),
                new_batch(b"k1", &[15], &[20], &[OpType::Put], &[115],),
            ]
        );
    }

    #[test]
    fn test_prune_time_iter_in_range_with_time_filters() {
        let actual = prune_all(sample_batches(), 5, 18, ts_between_10_and_16());
        assert_eq!(
            actual,
            [
                new_batch(
                    b"k1",
                    &[10, 11],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[110, 111],
                ),
                new_batch(b"k1", &[15], &[20], &[OpType::Put], &[115],),
            ]
        );
    }
}