Skip to main content

mito2/read/
prune.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::BitAnd;
16use std::sync::Arc;
17
18use common_recordbatch::filter::SimpleFilterEvaluator;
19use common_time::Timestamp;
20use datatypes::arrow::array::BooleanArray;
21use datatypes::arrow::buffer::BooleanBuffer;
22use datatypes::arrow::record_batch::RecordBatch;
23use snafu::ResultExt;
24
25use crate::error::{RecordBatchSnafu, Result};
26use crate::memtable::BoxedBatchIterator;
27use crate::read::Batch;
28use crate::read::last_row::FlatRowGroupLastRowCachedReader;
29use crate::sst::file::FileTimeRange;
30use crate::sst::parquet::file_range::FileRangeContextRef;
31use crate::sst::parquet::reader::{FlatRowGroupReader, ReaderMetrics};
32
33/// An iterator that prunes batches by time range.
34pub(crate) struct PruneTimeIterator {
35    iter: BoxedBatchIterator,
36    time_range: FileTimeRange,
37    /// Precise time filters.
38    time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
39}
40
41impl PruneTimeIterator {
42    /// Creates a new `PruneTimeIterator` with the given iterator and time range.
43    pub(crate) fn new(
44        iter: BoxedBatchIterator,
45        time_range: FileTimeRange,
46        time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
47    ) -> Self {
48        Self {
49            iter,
50            time_range,
51            time_filters,
52        }
53    }
54
55    /// Prune batch by time range.
56    fn prune(&self, batch: Batch) -> Result<Batch> {
57        if batch.is_empty() {
58            return Ok(batch);
59        }
60
61        // fast path, the batch is within the time range.
62        // Note that the time range is inclusive.
63        if self.time_range.0 <= batch.first_timestamp().unwrap()
64            && batch.last_timestamp().unwrap() <= self.time_range.1
65        {
66            return self.prune_by_time_filters(batch, Vec::new());
67        }
68
69        // slow path, prune the batch by time range.
70        // Note that the timestamp precision may be different from the time range.
71        // Safety: We know this is the timestamp type.
72        let unit = batch
73            .timestamps()
74            .data_type()
75            .as_timestamp()
76            .unwrap()
77            .unit();
78        let mut mask = Vec::with_capacity(batch.timestamps().len());
79        let timestamps = batch.timestamps_native().unwrap();
80        for ts in timestamps {
81            let ts = Timestamp::new(*ts, unit);
82            if self.time_range.0 <= ts && ts <= self.time_range.1 {
83                mask.push(true);
84            } else {
85                mask.push(false);
86            }
87        }
88
89        self.prune_by_time_filters(batch, mask)
90    }
91
92    /// Prunes the batch by time filters.
93    /// Also applies existing mask to the batch if the mask is not empty.
94    fn prune_by_time_filters(&self, mut batch: Batch, existing_mask: Vec<bool>) -> Result<Batch> {
95        if let Some(filters) = &self.time_filters {
96            let mut mask = BooleanBuffer::new_set(batch.num_rows());
97            for filter in filters.iter() {
98                let result = filter
99                    .evaluate_vector(batch.timestamps())
100                    .context(RecordBatchSnafu)?;
101                mask = mask.bitand(&result);
102            }
103
104            if !existing_mask.is_empty() {
105                mask = mask.bitand(&BooleanBuffer::from(existing_mask));
106            }
107
108            batch.filter(&BooleanArray::from(mask).into())?;
109        } else if !existing_mask.is_empty() {
110            batch.filter(&BooleanArray::from(existing_mask).into())?;
111        }
112
113        Ok(batch)
114    }
115
116    // Prune and return the next non-empty batch.
117    fn next_non_empty_batch(&mut self) -> Result<Option<Batch>> {
118        while let Some(batch) = self.iter.next() {
119            let batch = batch?;
120            let pruned_batch = self.prune(batch)?;
121            if !pruned_batch.is_empty() {
122                return Ok(Some(pruned_batch));
123            }
124        }
125        Ok(None)
126    }
127}
128
129impl Iterator for PruneTimeIterator {
130    type Item = Result<Batch>;
131
132    fn next(&mut self) -> Option<Self::Item> {
133        self.next_non_empty_batch().transpose()
134    }
135}
136
137pub enum FlatSource {
138    RowGroup(FlatRowGroupReader),
139    LastRow(FlatRowGroupLastRowCachedReader),
140}
141
142impl FlatSource {
143    async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
144        match self {
145            FlatSource::RowGroup(r) => r.next_batch().await,
146            FlatSource::LastRow(r) => r.next_batch().await,
147        }
148    }
149}
150
151/// A flat format reader that returns RecordBatch instead of Batch.
152pub struct FlatPruneReader {
153    /// Context for file ranges.
154    context: FileRangeContextRef,
155    source: FlatSource,
156    metrics: ReaderMetrics,
157    /// Whether to skip field filters for this row group.
158    skip_fields: bool,
159}
160
161impl FlatPruneReader {
162    pub(crate) fn new_with_row_group_reader(
163        ctx: FileRangeContextRef,
164        reader: FlatRowGroupReader,
165        skip_fields: bool,
166    ) -> Self {
167        Self {
168            context: ctx,
169            source: FlatSource::RowGroup(reader),
170            metrics: Default::default(),
171            skip_fields,
172        }
173    }
174
175    pub(crate) fn new_with_last_row_reader(
176        ctx: FileRangeContextRef,
177        reader: FlatRowGroupLastRowCachedReader,
178        skip_fields: bool,
179    ) -> Self {
180        Self {
181            context: ctx,
182            source: FlatSource::LastRow(reader),
183            metrics: Default::default(),
184            skip_fields,
185        }
186    }
187
188    /// Returns metrics.
189    pub(crate) fn metrics(&self) -> ReaderMetrics {
190        self.metrics.clone()
191    }
192
193    pub(crate) async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
194        loop {
195            let start = std::time::Instant::now();
196            let batch = self.source.next_batch().await?;
197            self.metrics.scan_cost += start.elapsed();
198
199            let Some(record_batch) = batch else {
200                return Ok(None);
201            };
202
203            // Update metrics for the received batch
204            self.metrics.num_rows += record_batch.num_rows();
205            self.metrics.num_batches += 1;
206
207            match self.prune_flat(record_batch)? {
208                Some(filtered_batch) => {
209                    return Ok(Some(filtered_batch));
210                }
211                None => {
212                    continue;
213                }
214            }
215        }
216    }
217
218    /// Prunes batches by the pushed down predicate and returns RecordBatch.
219    fn prune_flat(&mut self, record_batch: RecordBatch) -> Result<Option<RecordBatch>> {
220        // fast path
221        if self.context.filters().is_empty() && !self.context.has_partition_filter() {
222            return Ok(Some(record_batch));
223        }
224
225        let num_rows_before_filter = record_batch.num_rows();
226        let Some(filtered_batch) = self
227            .context
228            .precise_filter_flat(record_batch, self.skip_fields)?
229        else {
230            // the entire batch is filtered out
231            self.metrics.filter_metrics.rows_precise_filtered += num_rows_before_filter;
232            return Ok(None);
233        };
234
235        // update metric
236        let filtered_rows = num_rows_before_filter - filtered_batch.num_rows();
237        self.metrics.filter_metrics.rows_precise_filtered += filtered_rows;
238
239        if filtered_batch.num_rows() > 0 {
240            Ok(Some(filtered_batch))
241        } else {
242            Ok(None)
243        }
244    }
245}
246
#[cfg(test)]
mod tests {
    use api::v1::OpType;
    use datafusion_common::ScalarValue;
    use datafusion_expr::{Expr, col, lit};

    use super::*;
    use crate::test_util::new_batch;

    /// Input shared by the tests: three two-row `Put` batches for key `k1`
    /// with timestamps `[10, 11]`, `[15, 16]` and `[17, 18]`.
    fn sample_batches() -> Vec<Batch> {
        vec![
            new_batch(
                b"k1",
                &[10, 11],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[110, 111],
            ),
            new_batch(
                b"k1",
                &[15, 16],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[115, 116],
            ),
            new_batch(
                b"k1",
                &[17, 18],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[117, 118],
            ),
        ]
    }

    /// Runs `batches` through a `PruneTimeIterator` with the millisecond range
    /// `[start_ms, end_ms]` and collects every batch it yields.
    fn collect_pruned(
        batches: Vec<Batch>,
        start_ms: i64,
        end_ms: i64,
        time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
    ) -> Vec<Batch> {
        let source = batches.into_iter().map(Ok);
        let pruner = PruneTimeIterator::new(
            Box::new(source),
            (
                Timestamp::new_millisecond(start_ms),
                Timestamp::new_millisecond(end_ms),
            ),
            time_filters,
        );
        pruner.map(|batch| batch.unwrap()).collect()
    }

    fn create_time_filters(expr: &[Expr]) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
        let evaluators: Vec<_> = expr
            .iter()
            .map(|expr| SimpleFilterEvaluator::try_new(expr).unwrap())
            .collect();
        Some(Arc::new(evaluators))
    }

    /// Filters equivalent to `10 <= ts < 16` in milliseconds.
    fn filters_ge_10_lt_16() -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
        // We won't use the column name.
        create_time_filters(&[
            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(10), None))),
            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(16), None))),
        ])
    }

    #[test]
    fn test_prune_time_iter_empty() {
        let actual = collect_pruned(Vec::new(), 0, 1000, None);
        assert!(actual.is_empty());
    }

    #[test]
    fn test_prune_time_iter_filter() {
        // Upper bound cuts the second batch and drops the third.
        let actual = collect_pruned(sample_batches(), 10, 15, None);
        assert_eq!(
            actual,
            [
                new_batch(
                    b"k1",
                    &[10, 11],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[110, 111],
                ),
                new_batch(b"k1", &[15], &[20], &[OpType::Put], &[115],),
            ]
        );

        // Lower bound cuts the first batch.
        let actual = collect_pruned(sample_batches(), 11, 20, None);
        assert_eq!(
            actual,
            [
                new_batch(b"k1", &[11], &[20], &[OpType::Put], &[111],),
                new_batch(
                    b"k1",
                    &[15, 16],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[115, 116],
                ),
                new_batch(
                    b"k1",
                    &[17, 18],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[117, 118],
                ),
            ]
        );

        // Range covers every row; nothing is removed.
        let actual = collect_pruned(sample_batches(), 10, 18, None);
        assert_eq!(
            actual,
            [
                new_batch(
                    b"k1",
                    &[10, 11],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[110, 111],
                ),
                new_batch(
                    b"k1",
                    &[15, 16],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[115, 116],
                ),
                new_batch(
                    b"k1",
                    &[17, 18],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[117, 118],
                ),
            ]
        );
    }

    #[test]
    fn test_prune_time_iter_with_time_filters() {
        let actual = collect_pruned(sample_batches(), 10, 20, filters_ge_10_lt_16());
        assert_eq!(
            actual,
            [
                new_batch(
                    b"k1",
                    &[10, 11],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[110, 111],
                ),
                new_batch(b"k1", &[15], &[20], &[OpType::Put], &[115],),
            ]
        );
    }

    #[test]
    fn test_prune_time_iter_in_range_with_time_filters() {
        let actual = collect_pruned(sample_batches(), 5, 18, filters_ge_10_lt_16());
        assert_eq!(
            actual,
            [
                new_batch(
                    b"k1",
                    &[10, 11],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[110, 111],
                ),
                new_batch(b"k1", &[15], &[20], &[OpType::Put], &[115],),
            ]
        );
    }
}