mito2/read/
prune.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::BitAnd;
16use std::sync::Arc;
17
18use common_recordbatch::filter::SimpleFilterEvaluator;
19use common_time::Timestamp;
20use datatypes::arrow::array::BooleanArray;
21use datatypes::arrow::buffer::BooleanBuffer;
22use datatypes::arrow::record_batch::RecordBatch;
23use snafu::ResultExt;
24
25use crate::error::{RecordBatchSnafu, Result};
26use crate::memtable::BoxedBatchIterator;
27use crate::read::last_row::{FlatRowGroupLastRowCachedReader, RowGroupLastRowCachedReader};
28use crate::read::{Batch, BatchReader};
29use crate::sst::file::FileTimeRange;
30use crate::sst::parquet::file_range::FileRangeContextRef;
31use crate::sst::parquet::reader::{FlatRowGroupReader, ReaderMetrics, RowGroupReader};
32
/// Underlying batch source for [PruneReader].
pub enum Source {
    /// Reads batches directly from a row group reader.
    RowGroup(RowGroupReader),
    /// Reads batches through the last-row cache of a row group.
    LastRow(RowGroupLastRowCachedReader),
}
37
impl Source {
    /// Returns the next [Batch] from the inner reader, or `None` when the
    /// source is exhausted. Dispatches to whichever variant is active.
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        match self {
            Source::RowGroup(r) => r.next_batch().await,
            Source::LastRow(r) => r.next_batch().await,
        }
    }
}
46
/// A reader that filters batches from a [Source] by the pushed down predicate.
pub struct PruneReader {
    /// Context for file ranges.
    context: FileRangeContextRef,
    /// Inner source that yields raw batches.
    source: Source,
    /// Metrics local to this reader; merged with the inner reader's metrics
    /// in [PruneReader::metrics].
    metrics: ReaderMetrics,
    /// Whether to skip field filters for this row group.
    skip_fields: bool,
}
55
56impl PruneReader {
57    pub(crate) fn new_with_row_group_reader(
58        ctx: FileRangeContextRef,
59        reader: RowGroupReader,
60        skip_fields: bool,
61    ) -> Self {
62        Self {
63            context: ctx,
64            source: Source::RowGroup(reader),
65            metrics: Default::default(),
66            skip_fields,
67        }
68    }
69
70    pub(crate) fn new_with_last_row_reader(
71        ctx: FileRangeContextRef,
72        reader: RowGroupLastRowCachedReader,
73        skip_fields: bool,
74    ) -> Self {
75        Self {
76            context: ctx,
77            source: Source::LastRow(reader),
78            metrics: Default::default(),
79            skip_fields,
80        }
81    }
82
83    /// Merge metrics with the inner reader and return the merged metrics.
84    pub(crate) fn metrics(&self) -> ReaderMetrics {
85        let mut metrics = self.metrics.clone();
86        match &self.source {
87            Source::RowGroup(r) => {
88                metrics.merge_from(r.metrics());
89            }
90            Source::LastRow(r) => {
91                if let Some(inner_metrics) = r.metrics() {
92                    metrics.merge_from(inner_metrics);
93                }
94            }
95        }
96
97        metrics
98    }
99
100    pub(crate) async fn next_batch(&mut self) -> Result<Option<Batch>> {
101        while let Some(b) = self.source.next_batch().await? {
102            match self.prune(b)? {
103                Some(b) => {
104                    return Ok(Some(b));
105                }
106                None => {
107                    continue;
108                }
109            }
110        }
111        Ok(None)
112    }
113
114    /// Prunes batches by the pushed down predicate.
115    fn prune(&mut self, batch: Batch) -> Result<Option<Batch>> {
116        // fast path
117        if self.context.filters().is_empty() && !self.context.has_partition_filter() {
118            return Ok(Some(batch));
119        }
120
121        let num_rows_before_filter = batch.num_rows();
122        let Some(batch_filtered) = self.context.precise_filter(batch, self.skip_fields)? else {
123            // the entire batch is filtered out
124            self.metrics.filter_metrics.rows_precise_filtered += num_rows_before_filter;
125            return Ok(None);
126        };
127
128        // update metric
129        let filtered_rows = num_rows_before_filter - batch_filtered.num_rows();
130        self.metrics.filter_metrics.rows_precise_filtered += filtered_rows;
131
132        if !batch_filtered.is_empty() {
133            Ok(Some(batch_filtered))
134        } else {
135            Ok(None)
136        }
137    }
138}
139
/// An iterator that prunes batches by time range.
pub(crate) struct PruneTimeIterator {
    /// Inner iterator yielding the batches to prune.
    iter: BoxedBatchIterator,
    /// Inclusive time range; rows with timestamps outside it are dropped.
    time_range: FileTimeRange,
    /// Precise time filters.
    time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
}
147
148impl PruneTimeIterator {
149    /// Creates a new `PruneTimeIterator` with the given iterator and time range.
150    pub(crate) fn new(
151        iter: BoxedBatchIterator,
152        time_range: FileTimeRange,
153        time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
154    ) -> Self {
155        Self {
156            iter,
157            time_range,
158            time_filters,
159        }
160    }
161
162    /// Prune batch by time range.
163    fn prune(&self, batch: Batch) -> Result<Batch> {
164        if batch.is_empty() {
165            return Ok(batch);
166        }
167
168        // fast path, the batch is within the time range.
169        // Note that the time range is inclusive.
170        if self.time_range.0 <= batch.first_timestamp().unwrap()
171            && batch.last_timestamp().unwrap() <= self.time_range.1
172        {
173            return self.prune_by_time_filters(batch, Vec::new());
174        }
175
176        // slow path, prune the batch by time range.
177        // Note that the timestamp precision may be different from the time range.
178        // Safety: We know this is the timestamp type.
179        let unit = batch
180            .timestamps()
181            .data_type()
182            .as_timestamp()
183            .unwrap()
184            .unit();
185        let mut mask = Vec::with_capacity(batch.timestamps().len());
186        let timestamps = batch.timestamps_native().unwrap();
187        for ts in timestamps {
188            let ts = Timestamp::new(*ts, unit);
189            if self.time_range.0 <= ts && ts <= self.time_range.1 {
190                mask.push(true);
191            } else {
192                mask.push(false);
193            }
194        }
195
196        self.prune_by_time_filters(batch, mask)
197    }
198
199    /// Prunes the batch by time filters.
200    /// Also applies existing mask to the batch if the mask is not empty.
201    fn prune_by_time_filters(&self, mut batch: Batch, existing_mask: Vec<bool>) -> Result<Batch> {
202        if let Some(filters) = &self.time_filters {
203            let mut mask = BooleanBuffer::new_set(batch.num_rows());
204            for filter in filters.iter() {
205                let result = filter
206                    .evaluate_vector(batch.timestamps())
207                    .context(RecordBatchSnafu)?;
208                mask = mask.bitand(&result);
209            }
210
211            if !existing_mask.is_empty() {
212                mask = mask.bitand(&BooleanBuffer::from(existing_mask));
213            }
214
215            batch.filter(&BooleanArray::from(mask).into())?;
216        } else if !existing_mask.is_empty() {
217            batch.filter(&BooleanArray::from(existing_mask).into())?;
218        }
219
220        Ok(batch)
221    }
222
223    // Prune and return the next non-empty batch.
224    fn next_non_empty_batch(&mut self) -> Result<Option<Batch>> {
225        while let Some(batch) = self.iter.next() {
226            let batch = batch?;
227            let pruned_batch = self.prune(batch)?;
228            if !pruned_batch.is_empty() {
229                return Ok(Some(pruned_batch));
230            }
231        }
232        Ok(None)
233    }
234}
235
236impl Iterator for PruneTimeIterator {
237    type Item = Result<Batch>;
238
239    fn next(&mut self) -> Option<Self::Item> {
240        self.next_non_empty_batch().transpose()
241    }
242}
243
/// Underlying record batch source for [FlatPruneReader].
pub enum FlatSource {
    /// Reads record batches directly from a flat row group reader.
    RowGroup(FlatRowGroupReader),
    /// Reads record batches through the last-row cache of a row group.
    LastRow(FlatRowGroupLastRowCachedReader),
}
248
impl FlatSource {
    /// Returns the next [RecordBatch] from the inner reader, or `None` when
    /// the source is exhausted. Dispatches to whichever variant is active.
    fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
        match self {
            FlatSource::RowGroup(r) => r.next_batch(),
            FlatSource::LastRow(r) => r.next_batch(),
        }
    }
}
257
/// A flat format reader that returns RecordBatch instead of Batch.
pub struct FlatPruneReader {
    /// Context for file ranges.
    context: FileRangeContextRef,
    /// Inner source that yields raw record batches.
    source: FlatSource,
    /// Metrics for this reader.
    metrics: ReaderMetrics,
    /// Whether to skip field filters for this row group.
    skip_fields: bool,
}
267
268impl FlatPruneReader {
269    pub(crate) fn new_with_row_group_reader(
270        ctx: FileRangeContextRef,
271        reader: FlatRowGroupReader,
272        skip_fields: bool,
273    ) -> Self {
274        Self {
275            context: ctx,
276            source: FlatSource::RowGroup(reader),
277            metrics: Default::default(),
278            skip_fields,
279        }
280    }
281
282    pub(crate) fn new_with_last_row_reader(
283        ctx: FileRangeContextRef,
284        reader: FlatRowGroupLastRowCachedReader,
285        skip_fields: bool,
286    ) -> Self {
287        Self {
288            context: ctx,
289            source: FlatSource::LastRow(reader),
290            metrics: Default::default(),
291            skip_fields,
292        }
293    }
294
295    /// Returns metrics.
296    pub(crate) fn metrics(&self) -> ReaderMetrics {
297        self.metrics.clone()
298    }
299
300    pub(crate) fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
301        while let Some(record_batch) = {
302            let start = std::time::Instant::now();
303            let batch = self.source.next_batch()?;
304            self.metrics.scan_cost += start.elapsed();
305            batch
306        } {
307            // Update metrics for the received batch
308            self.metrics.num_rows += record_batch.num_rows();
309            self.metrics.num_batches += 1;
310
311            match self.prune_flat(record_batch)? {
312                Some(filtered_batch) => {
313                    return Ok(Some(filtered_batch));
314                }
315                None => {
316                    continue;
317                }
318            }
319        }
320
321        Ok(None)
322    }
323
324    /// Prunes batches by the pushed down predicate and returns RecordBatch.
325    fn prune_flat(&mut self, record_batch: RecordBatch) -> Result<Option<RecordBatch>> {
326        // fast path
327        if self.context.filters().is_empty() && !self.context.has_partition_filter() {
328            return Ok(Some(record_batch));
329        }
330
331        let num_rows_before_filter = record_batch.num_rows();
332        let Some(filtered_batch) = self
333            .context
334            .precise_filter_flat(record_batch, self.skip_fields)?
335        else {
336            // the entire batch is filtered out
337            self.metrics.filter_metrics.rows_precise_filtered += num_rows_before_filter;
338            return Ok(None);
339        };
340
341        // update metric
342        let filtered_rows = num_rows_before_filter - filtered_batch.num_rows();
343        self.metrics.filter_metrics.rows_precise_filtered += filtered_rows;
344
345        if filtered_batch.num_rows() > 0 {
346            Ok(Some(filtered_batch))
347        } else {
348            Ok(None)
349        }
350    }
351}
352
#[cfg(test)]
mod tests {
    use api::v1::OpType;
    use datafusion_common::ScalarValue;
    use datafusion_expr::{Expr, col, lit};

    use super::*;
    use crate::test_util::new_batch;

    // An empty input iterator yields no batches regardless of the range.
    #[test]
    fn test_prune_time_iter_empty() {
        let input = [];
        let iter = input.into_iter().map(Ok);
        let iter = PruneTimeIterator::new(
            Box::new(iter),
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(1000),
            ),
            None,
        );
        let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
        assert!(actual.is_empty());
    }

    // Pruning by time range only (no time filters): batches are trimmed to
    // rows whose timestamps fall inside the inclusive range, and batches
    // that become empty are dropped entirely.
    #[test]
    fn test_prune_time_iter_filter() {
        // Three batches with timestamps 10..=11, 15..=16 and 17..=18.
        let input = [
            new_batch(
                b"k1",
                &[10, 11],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[110, 111],
            ),
            new_batch(
                b"k1",
                &[15, 16],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[115, 116],
            ),
            new_batch(
                b"k1",
                &[17, 18],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[117, 118],
            ),
        ];

        // Range [10, 15]: first batch kept whole, second trimmed to ts 15,
        // third dropped.
        let iter = input.clone().into_iter().map(Ok);
        let iter = PruneTimeIterator::new(
            Box::new(iter),
            (
                Timestamp::new_millisecond(10),
                Timestamp::new_millisecond(15),
            ),
            None,
        );
        let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
        assert_eq!(
            actual,
            [
                new_batch(
                    b"k1",
                    &[10, 11],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[110, 111],
                ),
                new_batch(b"k1", &[15], &[20], &[OpType::Put], &[115],),
            ]
        );

        // Range [11, 20]: first batch trimmed to ts 11, the rest kept whole.
        let iter = input.clone().into_iter().map(Ok);
        let iter = PruneTimeIterator::new(
            Box::new(iter),
            (
                Timestamp::new_millisecond(11),
                Timestamp::new_millisecond(20),
            ),
            None,
        );
        let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
        assert_eq!(
            actual,
            [
                new_batch(b"k1", &[11], &[20], &[OpType::Put], &[111],),
                new_batch(
                    b"k1",
                    &[15, 16],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[115, 116],
                ),
                new_batch(
                    b"k1",
                    &[17, 18],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[117, 118],
                ),
            ]
        );

        // Range [10, 18] covers everything: all batches pass unchanged.
        let iter = input.into_iter().map(Ok);
        let iter = PruneTimeIterator::new(
            Box::new(iter),
            (
                Timestamp::new_millisecond(10),
                Timestamp::new_millisecond(18),
            ),
            None,
        );
        let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
        assert_eq!(
            actual,
            [
                new_batch(
                    b"k1",
                    &[10, 11],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[110, 111],
                ),
                new_batch(
                    b"k1",
                    &[15, 16],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[115, 116],
                ),
                new_batch(
                    b"k1",
                    &[17, 18],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[117, 118],
                ),
            ]
        );
    }

    // Helper: wraps datafusion exprs into the evaluators `PruneTimeIterator`
    // expects.
    fn create_time_filters(expr: &[Expr]) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
        let filters = expr
            .iter()
            .map(|expr| SimpleFilterEvaluator::try_new(expr).unwrap())
            .collect();
        Some(Arc::new(filters))
    }

    // Time filters (ts >= 10 && ts < 16) apply on top of the time range;
    // here the range [10, 20] does not fully contain all batches, so the
    // slow (masked) path combines with the filters.
    #[test]
    fn test_prune_time_iter_with_time_filters() {
        let input = [
            new_batch(
                b"k1",
                &[10, 11],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[110, 111],
            ),
            new_batch(
                b"k1",
                &[15, 16],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[115, 116],
            ),
            new_batch(
                b"k1",
                &[17, 18],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[117, 118],
            ),
        ];

        let iter = input.clone().into_iter().map(Ok);
        // We won't use the column name.
        let time_filters = create_time_filters(&[
            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(10), None))),
            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(16), None))),
        ]);
        let iter = PruneTimeIterator::new(
            Box::new(iter),
            (
                Timestamp::new_millisecond(10),
                Timestamp::new_millisecond(20),
            ),
            time_filters,
        );
        let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
        assert_eq!(
            actual,
            [
                new_batch(
                    b"k1",
                    &[10, 11],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[110, 111],
                ),
                new_batch(b"k1", &[15], &[20], &[OpType::Put], &[115],),
            ]
        );
    }

    // Same filters, but the range [5, 18] fully contains each batch, so the
    // fast path runs and only the time filters prune rows.
    #[test]
    fn test_prune_time_iter_in_range_with_time_filters() {
        let input = [
            new_batch(
                b"k1",
                &[10, 11],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[110, 111],
            ),
            new_batch(
                b"k1",
                &[15, 16],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[115, 116],
            ),
            new_batch(
                b"k1",
                &[17, 18],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[117, 118],
            ),
        ];

        let iter = input.clone().into_iter().map(Ok);
        // We won't use the column name.
        let time_filters = create_time_filters(&[
            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(10), None))),
            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(16), None))),
        ]);
        let iter = PruneTimeIterator::new(
            Box::new(iter),
            (
                Timestamp::new_millisecond(5),
                Timestamp::new_millisecond(18),
            ),
            time_filters,
        );
        let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
        assert_eq!(
            actual,
            [
                new_batch(
                    b"k1",
                    &[10, 11],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[110, 111],
                ),
                new_batch(b"k1", &[15], &[20], &[OpType::Put], &[115],),
            ]
        );
    }
}