mito2/read/
prune.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::BitAnd;
16use std::sync::Arc;
17
18use common_recordbatch::filter::SimpleFilterEvaluator;
19use common_time::Timestamp;
20use datatypes::arrow::array::BooleanArray;
21use datatypes::arrow::buffer::BooleanBuffer;
22use snafu::ResultExt;
23
24use crate::error::{FilterRecordBatchSnafu, Result};
25use crate::memtable::BoxedBatchIterator;
26use crate::read::last_row::RowGroupLastRowCachedReader;
27use crate::read::{Batch, BatchReader};
28use crate::sst::file::FileTimeRange;
29use crate::sst::parquet::file_range::FileRangeContextRef;
30use crate::sst::parquet::reader::{ReaderMetrics, RowGroupReader};
31
32pub enum Source {
33    RowGroup(RowGroupReader),
34    LastRow(RowGroupLastRowCachedReader),
35}
36
37impl Source {
38    async fn next_batch(&mut self) -> Result<Option<Batch>> {
39        match self {
40            Source::RowGroup(r) => r.next_batch().await,
41            Source::LastRow(r) => r.next_batch().await,
42        }
43    }
44}
45
46pub struct PruneReader {
47    /// Context for file ranges.
48    context: FileRangeContextRef,
49    source: Source,
50    metrics: ReaderMetrics,
51}
52
53impl PruneReader {
54    pub(crate) fn new_with_row_group_reader(
55        ctx: FileRangeContextRef,
56        reader: RowGroupReader,
57    ) -> Self {
58        Self {
59            context: ctx,
60            source: Source::RowGroup(reader),
61            metrics: Default::default(),
62        }
63    }
64
65    pub(crate) fn new_with_last_row_reader(
66        ctx: FileRangeContextRef,
67        reader: RowGroupLastRowCachedReader,
68    ) -> Self {
69        Self {
70            context: ctx,
71            source: Source::LastRow(reader),
72            metrics: Default::default(),
73        }
74    }
75
76    pub(crate) fn reset_source(&mut self, source: Source) {
77        self.source = source;
78    }
79
80    /// Merge metrics with the inner reader and return the merged metrics.
81    pub(crate) fn metrics(&self) -> ReaderMetrics {
82        let mut metrics = self.metrics.clone();
83        match &self.source {
84            Source::RowGroup(r) => {
85                metrics.merge_from(r.metrics());
86            }
87            Source::LastRow(r) => {
88                if let Some(inner_metrics) = r.metrics() {
89                    metrics.merge_from(inner_metrics);
90                }
91            }
92        }
93
94        metrics
95    }
96
97    pub(crate) async fn next_batch(&mut self) -> Result<Option<Batch>> {
98        while let Some(b) = self.source.next_batch().await? {
99            match self.prune(b)? {
100                Some(b) => {
101                    return Ok(Some(b));
102                }
103                None => {
104                    continue;
105                }
106            }
107        }
108        Ok(None)
109    }
110
111    /// Prunes batches by the pushed down predicate.
112    fn prune(&mut self, batch: Batch) -> Result<Option<Batch>> {
113        // fast path
114        if self.context.filters().is_empty() {
115            return Ok(Some(batch));
116        }
117
118        let num_rows_before_filter = batch.num_rows();
119        let Some(batch_filtered) = self.context.precise_filter(batch)? else {
120            // the entire batch is filtered out
121            self.metrics.filter_metrics.rows_precise_filtered += num_rows_before_filter;
122            return Ok(None);
123        };
124
125        // update metric
126        let filtered_rows = num_rows_before_filter - batch_filtered.num_rows();
127        self.metrics.filter_metrics.rows_precise_filtered += filtered_rows;
128
129        if !batch_filtered.is_empty() {
130            Ok(Some(batch_filtered))
131        } else {
132            Ok(None)
133        }
134    }
135}
136
137/// An iterator that prunes batches by time range.
138pub(crate) struct PruneTimeIterator {
139    iter: BoxedBatchIterator,
140    time_range: FileTimeRange,
141    /// Precise time filters.
142    time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
143}
144
145impl PruneTimeIterator {
146    /// Creates a new `PruneTimeIterator` with the given iterator and time range.
147    pub(crate) fn new(
148        iter: BoxedBatchIterator,
149        time_range: FileTimeRange,
150        time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
151    ) -> Self {
152        Self {
153            iter,
154            time_range,
155            time_filters,
156        }
157    }
158
159    /// Prune batch by time range.
160    fn prune(&self, batch: Batch) -> Result<Batch> {
161        if batch.is_empty() {
162            return Ok(batch);
163        }
164
165        // fast path, the batch is within the time range.
166        // Note that the time range is inclusive.
167        if self.time_range.0 <= batch.first_timestamp().unwrap()
168            && batch.last_timestamp().unwrap() <= self.time_range.1
169        {
170            return self.prune_by_time_filters(batch, Vec::new());
171        }
172
173        // slow path, prune the batch by time range.
174        // Note that the timestamp precision may be different from the time range.
175        // Safety: We know this is the timestamp type.
176        let unit = batch
177            .timestamps()
178            .data_type()
179            .as_timestamp()
180            .unwrap()
181            .unit();
182        let mut mask = Vec::with_capacity(batch.timestamps().len());
183        let timestamps = batch.timestamps_native().unwrap();
184        for ts in timestamps {
185            let ts = Timestamp::new(*ts, unit);
186            if self.time_range.0 <= ts && ts <= self.time_range.1 {
187                mask.push(true);
188            } else {
189                mask.push(false);
190            }
191        }
192
193        self.prune_by_time_filters(batch, mask)
194    }
195
196    /// Prunes the batch by time filters.
197    /// Also applies existing mask to the batch if the mask is not empty.
198    fn prune_by_time_filters(&self, mut batch: Batch, existing_mask: Vec<bool>) -> Result<Batch> {
199        if let Some(filters) = &self.time_filters {
200            let mut mask = BooleanBuffer::new_set(batch.num_rows());
201            for filter in filters.iter() {
202                let result = filter
203                    .evaluate_vector(batch.timestamps())
204                    .context(FilterRecordBatchSnafu)?;
205                mask = mask.bitand(&result);
206            }
207
208            if !existing_mask.is_empty() {
209                mask = mask.bitand(&BooleanBuffer::from(existing_mask));
210            }
211
212            batch.filter(&BooleanArray::from(mask).into())?;
213        } else if !existing_mask.is_empty() {
214            batch.filter(&BooleanArray::from(existing_mask).into())?;
215        }
216
217        Ok(batch)
218    }
219
220    // Prune and return the next non-empty batch.
221    fn next_non_empty_batch(&mut self) -> Result<Option<Batch>> {
222        while let Some(batch) = self.iter.next() {
223            let batch = batch?;
224            let pruned_batch = self.prune(batch)?;
225            if !pruned_batch.is_empty() {
226                return Ok(Some(pruned_batch));
227            }
228        }
229        Ok(None)
230    }
231}
232
233impl Iterator for PruneTimeIterator {
234    type Item = Result<Batch>;
235
236    fn next(&mut self) -> Option<Self::Item> {
237        self.next_non_empty_batch().transpose()
238    }
239}
240
#[cfg(test)]
mod tests {
    use api::v1::OpType;
    use datafusion_common::ScalarValue;
    use datafusion_expr::{col, lit, Expr};

    use super::*;
    use crate::test_util::new_batch;

    /// Three `k1` batches with timestamps `[10, 11]`, `[15, 16]` and `[17, 18]`.
    fn sample_batches() -> Vec<Batch> {
        vec![
            new_batch(
                b"k1",
                &[10, 11],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[110, 111],
            ),
            new_batch(
                b"k1",
                &[15, 16],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[115, 116],
            ),
            new_batch(
                b"k1",
                &[17, 18],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[117, 118],
            ),
        ]
    }

    /// Runs `input` through a `PruneTimeIterator` and collects every batch it yields.
    fn prune_all(
        input: Vec<Batch>,
        time_range: FileTimeRange,
        time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
    ) -> Vec<Batch> {
        let source = input.into_iter().map(Ok);
        let pruner = PruneTimeIterator::new(Box::new(source), time_range, time_filters);
        pruner.map(|batch| batch.unwrap()).collect()
    }

    /// Converts the expressions to evaluators wrapped the way the iterator expects.
    fn create_time_filters(expr: &[Expr]) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
        let filters = expr
            .iter()
            .map(|expr| SimpleFilterEvaluator::try_new(expr).unwrap())
            .collect();
        Some(Arc::new(filters))
    }

    /// Time filters for `10 <= ts < 16` in milliseconds.
    fn ts_ge_10_lt_16() -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
        // We won't use the column name.
        create_time_filters(&[
            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(10), None))),
            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(16), None))),
        ])
    }

    #[test]
    fn test_prune_time_iter_empty() {
        let actual = prune_all(
            Vec::new(),
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(1000),
            ),
            None,
        );
        assert!(actual.is_empty());
    }

    #[test]
    fn test_prune_time_iter_filter() {
        // The range ends inside the second batch.
        let actual = prune_all(
            sample_batches(),
            (
                Timestamp::new_millisecond(10),
                Timestamp::new_millisecond(15),
            ),
            None,
        );
        assert_eq!(
            actual,
            [
                new_batch(
                    b"k1",
                    &[10, 11],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[110, 111],
                ),
                new_batch(b"k1", &[15], &[20], &[OpType::Put], &[115]),
            ]
        );

        // The range starts inside the first batch.
        let actual = prune_all(
            sample_batches(),
            (
                Timestamp::new_millisecond(11),
                Timestamp::new_millisecond(20),
            ),
            None,
        );
        assert_eq!(
            actual,
            [
                new_batch(b"k1", &[11], &[20], &[OpType::Put], &[111]),
                new_batch(
                    b"k1",
                    &[15, 16],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[115, 116],
                ),
                new_batch(
                    b"k1",
                    &[17, 18],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[117, 118],
                ),
            ]
        );

        // The range covers every row.
        let actual = prune_all(
            sample_batches(),
            (
                Timestamp::new_millisecond(10),
                Timestamp::new_millisecond(18),
            ),
            None,
        );
        assert_eq!(actual, sample_batches());
    }

    #[test]
    fn test_prune_time_iter_with_time_filters() {
        let actual = prune_all(
            sample_batches(),
            (
                Timestamp::new_millisecond(10),
                Timestamp::new_millisecond(20),
            ),
            ts_ge_10_lt_16(),
        );
        assert_eq!(
            actual,
            [
                new_batch(
                    b"k1",
                    &[10, 11],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[110, 111],
                ),
                new_batch(b"k1", &[15], &[20], &[OpType::Put], &[115]),
            ]
        );
    }

    #[test]
    fn test_prune_time_iter_in_range_with_time_filters() {
        // Every batch lies within the time range, so only the filters prune rows.
        let actual = prune_all(
            sample_batches(),
            (
                Timestamp::new_millisecond(5),
                Timestamp::new_millisecond(18),
            ),
            ts_ge_10_lt_16(),
        );
        assert_eq!(
            actual,
            [
                new_batch(
                    b"k1",
                    &[10, 11],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[110, 111],
                ),
                new_batch(b"k1", &[15], &[20], &[OpType::Put], &[115]),
            ]
        );
    }
}