// mito2/read/prune.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use std::ops::BitAnd;
use std::sync::Arc;

use common_recordbatch::filter::SimpleFilterEvaluator;
use common_time::Timestamp;
use datatypes::arrow::array::BooleanArray;
use datatypes::arrow::buffer::BooleanBuffer;
use datatypes::arrow::record_batch::RecordBatch;
use snafu::ResultExt;

use crate::error::{RecordBatchSnafu, Result};
use crate::memtable::BoxedBatchIterator;
use crate::read::last_row::{FlatRowGroupLastRowCachedReader, RowGroupLastRowCachedReader};
use crate::read::{Batch, BatchReader};
use crate::sst::file::FileTimeRange;
use crate::sst::parquet::file_range::FileRangeContextRef;
use crate::sst::parquet::reader::{FlatRowGroupReader, ReaderMetrics, RowGroupReader};

/// Source of [`Batch`]es for [`PruneReader`].
#[allow(dead_code)]
pub enum Source {
    /// Wraps a [`RowGroupReader`].
    RowGroup(RowGroupReader),
    /// Wraps a [`RowGroupLastRowCachedReader`] backed by the last-row cache.
    LastRow(RowGroupLastRowCachedReader),
}
38
39#[allow(dead_code)]
40impl Source {
41    async fn next_batch(&mut self) -> Result<Option<Batch>> {
42        match self {
43            Source::RowGroup(r) => r.next_batch().await,
44            Source::LastRow(r) => r.next_batch().await,
45        }
46    }
47}
48
/// A reader that prunes batches from its [`Source`] by the pushed down predicate.
#[allow(dead_code)]
pub struct PruneReader {
    /// Context for file ranges.
    context: FileRangeContextRef,
    /// Inner source of batches.
    source: Source,
    /// Local metrics (merged with the inner reader's metrics in `metrics()`).
    metrics: ReaderMetrics,
    /// Whether to skip field filters for this row group.
    skip_fields: bool,
}
58
59#[allow(dead_code)]
60impl PruneReader {
61    pub(crate) fn new_with_row_group_reader(
62        ctx: FileRangeContextRef,
63        reader: RowGroupReader,
64        skip_fields: bool,
65    ) -> Self {
66        Self {
67            context: ctx,
68            source: Source::RowGroup(reader),
69            metrics: Default::default(),
70            skip_fields,
71        }
72    }
73
74    pub(crate) fn new_with_last_row_reader(
75        ctx: FileRangeContextRef,
76        reader: RowGroupLastRowCachedReader,
77        skip_fields: bool,
78    ) -> Self {
79        Self {
80            context: ctx,
81            source: Source::LastRow(reader),
82            metrics: Default::default(),
83            skip_fields,
84        }
85    }
86
87    /// Merge metrics with the inner reader and return the merged metrics.
88    pub(crate) fn metrics(&self) -> ReaderMetrics {
89        let mut metrics = self.metrics.clone();
90        match &self.source {
91            Source::RowGroup(r) => {
92                metrics.merge_from(r.metrics());
93            }
94            Source::LastRow(r) => {
95                if let Some(inner_metrics) = r.metrics() {
96                    metrics.merge_from(inner_metrics);
97                }
98            }
99        }
100
101        metrics
102    }
103
104    pub(crate) async fn next_batch(&mut self) -> Result<Option<Batch>> {
105        while let Some(b) = self.source.next_batch().await? {
106            match self.prune(b)? {
107                Some(b) => {
108                    return Ok(Some(b));
109                }
110                None => {
111                    continue;
112                }
113            }
114        }
115        Ok(None)
116    }
117
118    /// Prunes batches by the pushed down predicate.
119    fn prune(&mut self, batch: Batch) -> Result<Option<Batch>> {
120        // fast path
121        if self.context.filters().is_empty() && !self.context.has_partition_filter() {
122            return Ok(Some(batch));
123        }
124
125        let num_rows_before_filter = batch.num_rows();
126        let Some(batch_filtered) = self.context.precise_filter(batch, self.skip_fields)? else {
127            // the entire batch is filtered out
128            self.metrics.filter_metrics.rows_precise_filtered += num_rows_before_filter;
129            return Ok(None);
130        };
131
132        // update metric
133        let filtered_rows = num_rows_before_filter - batch_filtered.num_rows();
134        self.metrics.filter_metrics.rows_precise_filtered += filtered_rows;
135
136        if !batch_filtered.is_empty() {
137            Ok(Some(batch_filtered))
138        } else {
139            Ok(None)
140        }
141    }
142}
143
/// An iterator that prunes batches by time range.
pub(crate) struct PruneTimeIterator {
    /// Inner iterator producing the batches to prune.
    iter: BoxedBatchIterator,
    /// Inclusive (min, max) time range used for row-level pruning.
    time_range: FileTimeRange,
    /// Precise time filters.
    time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
}
151
152impl PruneTimeIterator {
153    /// Creates a new `PruneTimeIterator` with the given iterator and time range.
154    pub(crate) fn new(
155        iter: BoxedBatchIterator,
156        time_range: FileTimeRange,
157        time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
158    ) -> Self {
159        Self {
160            iter,
161            time_range,
162            time_filters,
163        }
164    }
165
166    /// Prune batch by time range.
167    fn prune(&self, batch: Batch) -> Result<Batch> {
168        if batch.is_empty() {
169            return Ok(batch);
170        }
171
172        // fast path, the batch is within the time range.
173        // Note that the time range is inclusive.
174        if self.time_range.0 <= batch.first_timestamp().unwrap()
175            && batch.last_timestamp().unwrap() <= self.time_range.1
176        {
177            return self.prune_by_time_filters(batch, Vec::new());
178        }
179
180        // slow path, prune the batch by time range.
181        // Note that the timestamp precision may be different from the time range.
182        // Safety: We know this is the timestamp type.
183        let unit = batch
184            .timestamps()
185            .data_type()
186            .as_timestamp()
187            .unwrap()
188            .unit();
189        let mut mask = Vec::with_capacity(batch.timestamps().len());
190        let timestamps = batch.timestamps_native().unwrap();
191        for ts in timestamps {
192            let ts = Timestamp::new(*ts, unit);
193            if self.time_range.0 <= ts && ts <= self.time_range.1 {
194                mask.push(true);
195            } else {
196                mask.push(false);
197            }
198        }
199
200        self.prune_by_time_filters(batch, mask)
201    }
202
203    /// Prunes the batch by time filters.
204    /// Also applies existing mask to the batch if the mask is not empty.
205    fn prune_by_time_filters(&self, mut batch: Batch, existing_mask: Vec<bool>) -> Result<Batch> {
206        if let Some(filters) = &self.time_filters {
207            let mut mask = BooleanBuffer::new_set(batch.num_rows());
208            for filter in filters.iter() {
209                let result = filter
210                    .evaluate_vector(batch.timestamps())
211                    .context(RecordBatchSnafu)?;
212                mask = mask.bitand(&result);
213            }
214
215            if !existing_mask.is_empty() {
216                mask = mask.bitand(&BooleanBuffer::from(existing_mask));
217            }
218
219            batch.filter(&BooleanArray::from(mask).into())?;
220        } else if !existing_mask.is_empty() {
221            batch.filter(&BooleanArray::from(existing_mask).into())?;
222        }
223
224        Ok(batch)
225    }
226
227    // Prune and return the next non-empty batch.
228    fn next_non_empty_batch(&mut self) -> Result<Option<Batch>> {
229        while let Some(batch) = self.iter.next() {
230            let batch = batch?;
231            let pruned_batch = self.prune(batch)?;
232            if !pruned_batch.is_empty() {
233                return Ok(Some(pruned_batch));
234            }
235        }
236        Ok(None)
237    }
238}
239
240impl Iterator for PruneTimeIterator {
241    type Item = Result<Batch>;
242
243    fn next(&mut self) -> Option<Self::Item> {
244        self.next_non_empty_batch().transpose()
245    }
246}
247
/// Source of [`RecordBatch`]es for [`FlatPruneReader`].
pub enum FlatSource {
    /// Wraps a [`FlatRowGroupReader`].
    RowGroup(FlatRowGroupReader),
    /// Wraps a [`FlatRowGroupLastRowCachedReader`] backed by the last-row cache.
    LastRow(FlatRowGroupLastRowCachedReader),
}
252
253impl FlatSource {
254    async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
255        match self {
256            FlatSource::RowGroup(r) => r.next_batch().await,
257            FlatSource::LastRow(r) => r.next_batch().await,
258        }
259    }
260}
261
/// A flat format reader that returns RecordBatch instead of Batch.
pub struct FlatPruneReader {
    /// Context for file ranges.
    context: FileRangeContextRef,
    /// Inner source of record batches.
    source: FlatSource,
    /// Local metrics (scan cost, row/batch counts, filtered rows).
    metrics: ReaderMetrics,
    /// Whether to skip field filters for this row group.
    skip_fields: bool,
}
271
272impl FlatPruneReader {
273    pub(crate) fn new_with_row_group_reader(
274        ctx: FileRangeContextRef,
275        reader: FlatRowGroupReader,
276        skip_fields: bool,
277    ) -> Self {
278        Self {
279            context: ctx,
280            source: FlatSource::RowGroup(reader),
281            metrics: Default::default(),
282            skip_fields,
283        }
284    }
285
286    pub(crate) fn new_with_last_row_reader(
287        ctx: FileRangeContextRef,
288        reader: FlatRowGroupLastRowCachedReader,
289        skip_fields: bool,
290    ) -> Self {
291        Self {
292            context: ctx,
293            source: FlatSource::LastRow(reader),
294            metrics: Default::default(),
295            skip_fields,
296        }
297    }
298
299    /// Returns metrics.
300    pub(crate) fn metrics(&self) -> ReaderMetrics {
301        self.metrics.clone()
302    }
303
304    pub(crate) async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
305        loop {
306            let start = std::time::Instant::now();
307            let batch = self.source.next_batch().await?;
308            self.metrics.scan_cost += start.elapsed();
309
310            let Some(record_batch) = batch else {
311                return Ok(None);
312            };
313
314            // Update metrics for the received batch
315            self.metrics.num_rows += record_batch.num_rows();
316            self.metrics.num_batches += 1;
317
318            match self.prune_flat(record_batch)? {
319                Some(filtered_batch) => {
320                    return Ok(Some(filtered_batch));
321                }
322                None => {
323                    continue;
324                }
325            }
326        }
327    }
328
329    /// Prunes batches by the pushed down predicate and returns RecordBatch.
330    fn prune_flat(&mut self, record_batch: RecordBatch) -> Result<Option<RecordBatch>> {
331        // fast path
332        if self.context.filters().is_empty() && !self.context.has_partition_filter() {
333            return Ok(Some(record_batch));
334        }
335
336        let num_rows_before_filter = record_batch.num_rows();
337        let Some(filtered_batch) = self
338            .context
339            .precise_filter_flat(record_batch, self.skip_fields)?
340        else {
341            // the entire batch is filtered out
342            self.metrics.filter_metrics.rows_precise_filtered += num_rows_before_filter;
343            return Ok(None);
344        };
345
346        // update metric
347        let filtered_rows = num_rows_before_filter - filtered_batch.num_rows();
348        self.metrics.filter_metrics.rows_precise_filtered += filtered_rows;
349
350        if filtered_batch.num_rows() > 0 {
351            Ok(Some(filtered_batch))
352        } else {
353            Ok(None)
354        }
355    }
356}
357
#[cfg(test)]
mod tests {
    use api::v1::OpType;
    use datafusion_common::ScalarValue;
    use datafusion_expr::{Expr, col, lit};

    use super::*;
    use crate::test_util::new_batch;

    // An empty input iterator yields no batches.
    #[test]
    fn test_prune_time_iter_empty() {
        let input = [];
        let iter = input.into_iter().map(Ok);
        let iter = PruneTimeIterator::new(
            Box::new(iter),
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(1000),
            ),
            None,
        );
        let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
        assert!(actual.is_empty());
    }

    // Pruning by time range only (no time filters): batches are truncated or
    // dropped depending on how they overlap the inclusive range.
    #[test]
    fn test_prune_time_iter_filter() {
        let input = [
            new_batch(
                b"k1",
                &[10, 11],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[110, 111],
            ),
            new_batch(
                b"k1",
                &[15, 16],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[115, 116],
            ),
            new_batch(
                b"k1",
                &[17, 18],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[117, 118],
            ),
        ];

        // Range [10, 15]: second batch is truncated to ts 15, third is dropped.
        let iter = input.clone().into_iter().map(Ok);
        let iter = PruneTimeIterator::new(
            Box::new(iter),
            (
                Timestamp::new_millisecond(10),
                Timestamp::new_millisecond(15),
            ),
            None,
        );
        let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
        assert_eq!(
            actual,
            [
                new_batch(
                    b"k1",
                    &[10, 11],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[110, 111],
                ),
                new_batch(b"k1", &[15], &[20], &[OpType::Put], &[115],),
            ]
        );

        // Range [11, 20]: only the first batch is truncated (ts 10 removed).
        let iter = input.clone().into_iter().map(Ok);
        let iter = PruneTimeIterator::new(
            Box::new(iter),
            (
                Timestamp::new_millisecond(11),
                Timestamp::new_millisecond(20),
            ),
            None,
        );
        let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
        assert_eq!(
            actual,
            [
                new_batch(b"k1", &[11], &[20], &[OpType::Put], &[111],),
                new_batch(
                    b"k1",
                    &[15, 16],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[115, 116],
                ),
                new_batch(
                    b"k1",
                    &[17, 18],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[117, 118],
                ),
            ]
        );

        // Range [10, 18] covers everything: all batches pass unchanged.
        let iter = input.into_iter().map(Ok);
        let iter = PruneTimeIterator::new(
            Box::new(iter),
            (
                Timestamp::new_millisecond(10),
                Timestamp::new_millisecond(18),
            ),
            None,
        );
        let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
        assert_eq!(
            actual,
            [
                new_batch(
                    b"k1",
                    &[10, 11],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[110, 111],
                ),
                new_batch(
                    b"k1",
                    &[15, 16],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[115, 116],
                ),
                new_batch(
                    b"k1",
                    &[17, 18],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[117, 118],
                ),
            ]
        );
    }

    // Builds `SimpleFilterEvaluator`s from datafusion expressions for the
    // time-filter tests below.
    fn create_time_filters(expr: &[Expr]) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
        let filters = expr
            .iter()
            .map(|expr| SimpleFilterEvaluator::try_new(expr).unwrap())
            .collect();
        Some(Arc::new(filters))
    }

    // Time filters (ts >= 10 AND ts < 16) are ANDed with the range mask.
    #[test]
    fn test_prune_time_iter_with_time_filters() {
        let input = [
            new_batch(
                b"k1",
                &[10, 11],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[110, 111],
            ),
            new_batch(
                b"k1",
                &[15, 16],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[115, 116],
            ),
            new_batch(
                b"k1",
                &[17, 18],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[117, 118],
            ),
        ];

        let iter = input.clone().into_iter().map(Ok);
        // We won't use the column name.
        let time_filters = create_time_filters(&[
            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(10), None))),
            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(16), None))),
        ]);
        let iter = PruneTimeIterator::new(
            Box::new(iter),
            (
                Timestamp::new_millisecond(10),
                Timestamp::new_millisecond(20),
            ),
            time_filters,
        );
        let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
        assert_eq!(
            actual,
            [
                new_batch(
                    b"k1",
                    &[10, 11],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[110, 111],
                ),
                new_batch(b"k1", &[15], &[20], &[OpType::Put], &[115],),
            ]
        );
    }

    // Same filters, but the range [5, 18] forces the slow path (range mask)
    // in addition to the time filters; the result is identical.
    #[test]
    fn test_prune_time_iter_in_range_with_time_filters() {
        let input = [
            new_batch(
                b"k1",
                &[10, 11],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[110, 111],
            ),
            new_batch(
                b"k1",
                &[15, 16],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[115, 116],
            ),
            new_batch(
                b"k1",
                &[17, 18],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[117, 118],
            ),
        ];

        let iter = input.clone().into_iter().map(Ok);
        // We won't use the column name.
        let time_filters = create_time_filters(&[
            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(10), None))),
            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(16), None))),
        ]);
        let iter = PruneTimeIterator::new(
            Box::new(iter),
            (
                Timestamp::new_millisecond(5),
                Timestamp::new_millisecond(18),
            ),
            time_filters,
        );
        let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
        assert_eq!(
            actual,
            [
                new_batch(
                    b"k1",
                    &[10, 11],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[110, 111],
                ),
                new_batch(b"k1", &[15], &[20], &[OpType::Put], &[115],),
            ]
        );
    }
}