1use std::ops::BitAnd;
16use std::sync::Arc;
17
18use common_recordbatch::filter::SimpleFilterEvaluator;
19use common_time::Timestamp;
20use datatypes::arrow::array::BooleanArray;
21use datatypes::arrow::buffer::BooleanBuffer;
22use snafu::ResultExt;
23
24use crate::error::{FilterRecordBatchSnafu, Result};
25use crate::memtable::BoxedBatchIterator;
26use crate::read::last_row::RowGroupLastRowCachedReader;
27use crate::read::{Batch, BatchReader};
28use crate::sst::file::FileTimeRange;
29use crate::sst::parquet::file_range::FileRangeContextRef;
30use crate::sst::parquet::reader::{ReaderMetrics, RowGroupReader};
31
/// The underlying reader that a [`PruneReader`] pulls batches from.
pub enum Source {
    /// Batches are read through a [`RowGroupReader`].
    RowGroup(RowGroupReader),
    /// Batches are read through a [`RowGroupLastRowCachedReader`].
    LastRow(RowGroupLastRowCachedReader),
}
36
37impl Source {
38 async fn next_batch(&mut self) -> Result<Option<Batch>> {
39 match self {
40 Source::RowGroup(r) => r.next_batch().await,
41 Source::LastRow(r) => r.next_batch().await,
42 }
43 }
44}
45
/// A reader that pulls batches from a [`Source`] and passes them through the
/// filters of a file range context before handing them to the caller.
pub struct PruneReader {
    /// Context that exposes the filters and performs the precise filtering.
    context: FileRangeContextRef,
    /// The reader that batches are currently pulled from.
    source: Source,
    /// Metrics recorded by this reader itself, such as the number of rows
    /// removed by the precise filter.
    metrics: ReaderMetrics,
}
52
53impl PruneReader {
54 pub(crate) fn new_with_row_group_reader(
55 ctx: FileRangeContextRef,
56 reader: RowGroupReader,
57 ) -> Self {
58 Self {
59 context: ctx,
60 source: Source::RowGroup(reader),
61 metrics: Default::default(),
62 }
63 }
64
65 pub(crate) fn new_with_last_row_reader(
66 ctx: FileRangeContextRef,
67 reader: RowGroupLastRowCachedReader,
68 ) -> Self {
69 Self {
70 context: ctx,
71 source: Source::LastRow(reader),
72 metrics: Default::default(),
73 }
74 }
75
76 pub(crate) fn reset_source(&mut self, source: Source) {
77 self.source = source;
78 }
79
80 pub(crate) fn metrics(&self) -> ReaderMetrics {
82 let mut metrics = self.metrics.clone();
83 match &self.source {
84 Source::RowGroup(r) => {
85 metrics.merge_from(r.metrics());
86 }
87 Source::LastRow(r) => {
88 if let Some(inner_metrics) = r.metrics() {
89 metrics.merge_from(inner_metrics);
90 }
91 }
92 }
93
94 metrics
95 }
96
97 pub(crate) async fn next_batch(&mut self) -> Result<Option<Batch>> {
98 while let Some(b) = self.source.next_batch().await? {
99 match self.prune(b)? {
100 Some(b) => {
101 return Ok(Some(b));
102 }
103 None => {
104 continue;
105 }
106 }
107 }
108 Ok(None)
109 }
110
111 fn prune(&mut self, batch: Batch) -> Result<Option<Batch>> {
113 if self.context.filters().is_empty() {
115 return Ok(Some(batch));
116 }
117
118 let num_rows_before_filter = batch.num_rows();
119 let Some(batch_filtered) = self.context.precise_filter(batch)? else {
120 self.metrics.filter_metrics.rows_precise_filtered += num_rows_before_filter;
122 return Ok(None);
123 };
124
125 let filtered_rows = num_rows_before_filter - batch_filtered.num_rows();
127 self.metrics.filter_metrics.rows_precise_filtered += filtered_rows;
128
129 if !batch_filtered.is_empty() {
130 Ok(Some(batch_filtered))
131 } else {
132 Ok(None)
133 }
134 }
135}
136
/// An iterator that prunes the batches of an inner iterator by a time range
/// and, optionally, by a list of time filters.
pub(crate) struct PruneTimeIterator {
    /// Inner iterator that yields the batches to prune.
    iter: BoxedBatchIterator,
    /// Time range of the rows to keep. Both bounds are inclusive.
    time_range: FileTimeRange,
    /// Optional filters that are evaluated against the timestamps of each batch.
    time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
}
144
145impl PruneTimeIterator {
146 pub(crate) fn new(
148 iter: BoxedBatchIterator,
149 time_range: FileTimeRange,
150 time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
151 ) -> Self {
152 Self {
153 iter,
154 time_range,
155 time_filters,
156 }
157 }
158
159 fn prune(&self, batch: Batch) -> Result<Batch> {
161 if batch.is_empty() {
162 return Ok(batch);
163 }
164
165 if self.time_range.0 <= batch.first_timestamp().unwrap()
168 && batch.last_timestamp().unwrap() <= self.time_range.1
169 {
170 return self.prune_by_time_filters(batch, Vec::new());
171 }
172
173 let unit = batch
177 .timestamps()
178 .data_type()
179 .as_timestamp()
180 .unwrap()
181 .unit();
182 let mut mask = Vec::with_capacity(batch.timestamps().len());
183 let timestamps = batch.timestamps_native().unwrap();
184 for ts in timestamps {
185 let ts = Timestamp::new(*ts, unit);
186 if self.time_range.0 <= ts && ts <= self.time_range.1 {
187 mask.push(true);
188 } else {
189 mask.push(false);
190 }
191 }
192
193 self.prune_by_time_filters(batch, mask)
194 }
195
196 fn prune_by_time_filters(&self, mut batch: Batch, existing_mask: Vec<bool>) -> Result<Batch> {
199 if let Some(filters) = &self.time_filters {
200 let mut mask = BooleanBuffer::new_set(batch.num_rows());
201 for filter in filters.iter() {
202 let result = filter
203 .evaluate_vector(batch.timestamps())
204 .context(FilterRecordBatchSnafu)?;
205 mask = mask.bitand(&result);
206 }
207
208 if !existing_mask.is_empty() {
209 mask = mask.bitand(&BooleanBuffer::from(existing_mask));
210 }
211
212 batch.filter(&BooleanArray::from(mask).into())?;
213 } else if !existing_mask.is_empty() {
214 batch.filter(&BooleanArray::from(existing_mask).into())?;
215 }
216
217 Ok(batch)
218 }
219
220 fn next_non_empty_batch(&mut self) -> Result<Option<Batch>> {
222 while let Some(batch) = self.iter.next() {
223 let batch = batch?;
224 let pruned_batch = self.prune(batch)?;
225 if !pruned_batch.is_empty() {
226 return Ok(Some(pruned_batch));
227 }
228 }
229 Ok(None)
230 }
231}
232
233impl Iterator for PruneTimeIterator {
234 type Item = Result<Batch>;
235
236 fn next(&mut self) -> Option<Self::Item> {
237 self.next_non_empty_batch().transpose()
238 }
239}
240
#[cfg(test)]
mod tests {
    use api::v1::OpType;
    use datafusion_common::ScalarValue;
    use datafusion_expr::{col, lit, Expr};

    use super::*;
    use crate::test_util::new_batch;

    /// Builds the three two-row batches of key `k1` that the tests prune.
    /// Their timestamps are `[10, 11]`, `[15, 16]` and `[17, 18]`.
    fn sample_batches() -> Vec<Batch> {
        vec![
            new_batch(
                b"k1",
                &[10, 11],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[110, 111],
            ),
            new_batch(
                b"k1",
                &[15, 16],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[115, 116],
            ),
            new_batch(
                b"k1",
                &[17, 18],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[117, 118],
            ),
        ]
    }

    /// Runs a [`PruneTimeIterator`] over `batches` and collects all batches it
    /// yields, panicking on the first error.
    fn prune_all(
        batches: Vec<Batch>,
        time_range: FileTimeRange,
        time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
    ) -> Vec<Batch> {
        let source = batches.into_iter().map(Ok);
        PruneTimeIterator::new(Box::new(source), time_range, time_filters)
            .map(|batch| batch.unwrap())
            .collect()
    }

    /// Converts the expressions into the filter list accepted by
    /// [`PruneTimeIterator::new`].
    fn create_time_filters(expr: &[Expr]) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
        let mut filters = Vec::with_capacity(expr.len());
        for e in expr {
            filters.push(SimpleFilterEvaluator::try_new(e).unwrap());
        }
        Some(Arc::new(filters))
    }

    /// Builds the filters `ts >= 10ms` and `ts < 16ms`.
    fn ge_10_lt_16_filters() -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
        create_time_filters(&[
            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(10), None))),
            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(16), None))),
        ])
    }

    #[test]
    fn test_prune_time_iter_empty() {
        let actual = prune_all(
            Vec::new(),
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(1000),
            ),
            None,
        );
        assert!(actual.is_empty());
    }

    #[test]
    fn test_prune_time_iter_filter() {
        // The range ends inside the second batch; the third batch is dropped.
        let actual = prune_all(
            sample_batches(),
            (
                Timestamp::new_millisecond(10),
                Timestamp::new_millisecond(15),
            ),
            None,
        );
        assert_eq!(
            actual,
            [
                new_batch(
                    b"k1",
                    &[10, 11],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[110, 111],
                ),
                new_batch(b"k1", &[15], &[20], &[OpType::Put], &[115]),
            ]
        );

        // The range starts inside the first batch.
        let actual = prune_all(
            sample_batches(),
            (
                Timestamp::new_millisecond(11),
                Timestamp::new_millisecond(20),
            ),
            None,
        );
        assert_eq!(
            actual,
            [
                new_batch(b"k1", &[11], &[20], &[OpType::Put], &[111]),
                new_batch(
                    b"k1",
                    &[15, 16],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[115, 116],
                ),
                new_batch(
                    b"k1",
                    &[17, 18],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[117, 118],
                ),
            ]
        );

        // The range covers every row, so all batches pass through unchanged.
        let actual = prune_all(
            sample_batches(),
            (
                Timestamp::new_millisecond(10),
                Timestamp::new_millisecond(18),
            ),
            None,
        );
        assert_eq!(
            actual,
            [
                new_batch(
                    b"k1",
                    &[10, 11],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[110, 111],
                ),
                new_batch(
                    b"k1",
                    &[15, 16],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[115, 116],
                ),
                new_batch(
                    b"k1",
                    &[17, 18],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[117, 118],
                ),
            ]
        );
    }

    #[test]
    fn test_prune_time_iter_with_time_filters() {
        // The time range keeps every row; only the filters remove rows.
        let actual = prune_all(
            sample_batches(),
            (
                Timestamp::new_millisecond(10),
                Timestamp::new_millisecond(20),
            ),
            ge_10_lt_16_filters(),
        );
        assert_eq!(
            actual,
            [
                new_batch(
                    b"k1",
                    &[10, 11],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[110, 111],
                ),
                new_batch(b"k1", &[15], &[20], &[OpType::Put], &[115]),
            ]
        );
    }

    #[test]
    fn test_prune_time_iter_in_range_with_time_filters() {
        // The time range starts before the first row and ends at the last one.
        let actual = prune_all(
            sample_batches(),
            (
                Timestamp::new_millisecond(5),
                Timestamp::new_millisecond(18),
            ),
            ge_10_lt_16_filters(),
        );
        assert_eq!(
            actual,
            [
                new_batch(
                    b"k1",
                    &[10, 11],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[110, 111],
                ),
                new_batch(b"k1", &[15], &[20], &[OpType::Put], &[115]),
            ]
        );
    }
}