1use std::ops::BitAnd;
16use std::sync::Arc;
17
18use common_recordbatch::filter::SimpleFilterEvaluator;
19use common_time::Timestamp;
20use datatypes::arrow::array::BooleanArray;
21use datatypes::arrow::buffer::BooleanBuffer;
22use datatypes::arrow::record_batch::RecordBatch;
23use snafu::ResultExt;
24
25use crate::error::{RecordBatchSnafu, Result};
26use crate::memtable::BoxedBatchIterator;
27use crate::read::last_row::{FlatRowGroupLastRowCachedReader, RowGroupLastRowCachedReader};
28use crate::read::{Batch, BatchReader};
29use crate::sst::file::FileTimeRange;
30use crate::sst::parquet::file_range::FileRangeContextRef;
31use crate::sst::parquet::reader::{FlatRowGroupReader, ReaderMetrics, RowGroupReader};
32
/// Source of [Batch]es for [PruneReader].
// Some variants/methods are only used by certain read paths.
#[allow(dead_code)]
pub enum Source {
    /// Reads batches from a parquet row group.
    RowGroup(RowGroupReader),
    /// Reads batches through the last-row cached reader.
    LastRow(RowGroupLastRowCachedReader),
}
38
// Some variants/methods are only used by certain read paths.
#[allow(dead_code)]
impl Source {
    /// Returns the next [Batch] from the underlying reader, or `None` when the
    /// source is exhausted.
    async fn next_batch(&mut self) -> Result<Option<Batch>> {
        match self {
            Source::RowGroup(r) => r.next_batch().await,
            Source::LastRow(r) => r.next_batch().await,
        }
    }
}
48
/// Reader that applies the context's precise (row-level) filters to every
/// [Batch] pulled from its inner [Source].
// Some methods are only used by certain read paths.
#[allow(dead_code)]
pub struct PruneReader {
    /// File range context that provides the filters to evaluate.
    context: FileRangeContextRef,
    /// Inner source of batches.
    source: Source,
    /// Local metrics, merged with the source's metrics in `metrics()`.
    metrics: ReaderMetrics,
    /// Forwarded to `precise_filter` — presumably controls whether field
    /// columns are skipped during filtering; confirm against its definition.
    skip_fields: bool,
}
58
59#[allow(dead_code)]
60impl PruneReader {
61 pub(crate) fn new_with_row_group_reader(
62 ctx: FileRangeContextRef,
63 reader: RowGroupReader,
64 skip_fields: bool,
65 ) -> Self {
66 Self {
67 context: ctx,
68 source: Source::RowGroup(reader),
69 metrics: Default::default(),
70 skip_fields,
71 }
72 }
73
74 pub(crate) fn new_with_last_row_reader(
75 ctx: FileRangeContextRef,
76 reader: RowGroupLastRowCachedReader,
77 skip_fields: bool,
78 ) -> Self {
79 Self {
80 context: ctx,
81 source: Source::LastRow(reader),
82 metrics: Default::default(),
83 skip_fields,
84 }
85 }
86
87 pub(crate) fn metrics(&self) -> ReaderMetrics {
89 let mut metrics = self.metrics.clone();
90 match &self.source {
91 Source::RowGroup(r) => {
92 metrics.merge_from(r.metrics());
93 }
94 Source::LastRow(r) => {
95 if let Some(inner_metrics) = r.metrics() {
96 metrics.merge_from(inner_metrics);
97 }
98 }
99 }
100
101 metrics
102 }
103
104 pub(crate) async fn next_batch(&mut self) -> Result<Option<Batch>> {
105 while let Some(b) = self.source.next_batch().await? {
106 match self.prune(b)? {
107 Some(b) => {
108 return Ok(Some(b));
109 }
110 None => {
111 continue;
112 }
113 }
114 }
115 Ok(None)
116 }
117
118 fn prune(&mut self, batch: Batch) -> Result<Option<Batch>> {
120 if self.context.filters().is_empty() && !self.context.has_partition_filter() {
122 return Ok(Some(batch));
123 }
124
125 let num_rows_before_filter = batch.num_rows();
126 let Some(batch_filtered) = self.context.precise_filter(batch, self.skip_fields)? else {
127 self.metrics.filter_metrics.rows_precise_filtered += num_rows_before_filter;
129 return Ok(None);
130 };
131
132 let filtered_rows = num_rows_before_filter - batch_filtered.num_rows();
134 self.metrics.filter_metrics.rows_precise_filtered += filtered_rows;
135
136 if !batch_filtered.is_empty() {
137 Ok(Some(batch_filtered))
138 } else {
139 Ok(None)
140 }
141 }
142}
143
/// Iterator that prunes rows of each [Batch] by a time range and optional
/// time filters.
pub(crate) struct PruneTimeIterator {
    /// Inner batch iterator.
    iter: BoxedBatchIterator,
    /// Time range to keep rows in (both ends inclusive).
    time_range: FileTimeRange,
    /// Extra filters evaluated against the timestamp column.
    time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
}
151
152impl PruneTimeIterator {
153 pub(crate) fn new(
155 iter: BoxedBatchIterator,
156 time_range: FileTimeRange,
157 time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
158 ) -> Self {
159 Self {
160 iter,
161 time_range,
162 time_filters,
163 }
164 }
165
166 fn prune(&self, batch: Batch) -> Result<Batch> {
168 if batch.is_empty() {
169 return Ok(batch);
170 }
171
172 if self.time_range.0 <= batch.first_timestamp().unwrap()
175 && batch.last_timestamp().unwrap() <= self.time_range.1
176 {
177 return self.prune_by_time_filters(batch, Vec::new());
178 }
179
180 let unit = batch
184 .timestamps()
185 .data_type()
186 .as_timestamp()
187 .unwrap()
188 .unit();
189 let mut mask = Vec::with_capacity(batch.timestamps().len());
190 let timestamps = batch.timestamps_native().unwrap();
191 for ts in timestamps {
192 let ts = Timestamp::new(*ts, unit);
193 if self.time_range.0 <= ts && ts <= self.time_range.1 {
194 mask.push(true);
195 } else {
196 mask.push(false);
197 }
198 }
199
200 self.prune_by_time_filters(batch, mask)
201 }
202
203 fn prune_by_time_filters(&self, mut batch: Batch, existing_mask: Vec<bool>) -> Result<Batch> {
206 if let Some(filters) = &self.time_filters {
207 let mut mask = BooleanBuffer::new_set(batch.num_rows());
208 for filter in filters.iter() {
209 let result = filter
210 .evaluate_vector(batch.timestamps())
211 .context(RecordBatchSnafu)?;
212 mask = mask.bitand(&result);
213 }
214
215 if !existing_mask.is_empty() {
216 mask = mask.bitand(&BooleanBuffer::from(existing_mask));
217 }
218
219 batch.filter(&BooleanArray::from(mask).into())?;
220 } else if !existing_mask.is_empty() {
221 batch.filter(&BooleanArray::from(existing_mask).into())?;
222 }
223
224 Ok(batch)
225 }
226
227 fn next_non_empty_batch(&mut self) -> Result<Option<Batch>> {
229 while let Some(batch) = self.iter.next() {
230 let batch = batch?;
231 let pruned_batch = self.prune(batch)?;
232 if !pruned_batch.is_empty() {
233 return Ok(Some(pruned_batch));
234 }
235 }
236 Ok(None)
237 }
238}
239
impl Iterator for PruneTimeIterator {
    type Item = Result<Batch>;

    fn next(&mut self) -> Option<Self::Item> {
        // `transpose` maps Result<Option<Batch>> to Option<Result<Batch>>,
        // ending iteration on `Ok(None)` and yielding errors as items.
        self.next_non_empty_batch().transpose()
    }
}
247
/// Source of [RecordBatch]es for [FlatPruneReader] (flat format).
pub enum FlatSource {
    /// Reads record batches from a parquet row group.
    RowGroup(FlatRowGroupReader),
    /// Reads record batches through the flat last-row cached reader.
    LastRow(FlatRowGroupLastRowCachedReader),
}
252
impl FlatSource {
    /// Returns the next [RecordBatch] from the underlying reader, or `None`
    /// when the source is exhausted.
    async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
        match self {
            FlatSource::RowGroup(r) => r.next_batch().await,
            FlatSource::LastRow(r) => r.next_batch().await,
        }
    }
}
261
/// Reader that applies the context's precise (row-level) filters to every
/// [RecordBatch] pulled from its inner [FlatSource].
pub struct FlatPruneReader {
    /// File range context that provides the filters to evaluate.
    context: FileRangeContextRef,
    /// Inner source of record batches.
    source: FlatSource,
    /// Metrics collected by this reader (scan cost, row/batch counts, filter stats).
    metrics: ReaderMetrics,
    /// Forwarded to `precise_filter_flat` — presumably controls whether field
    /// columns are skipped during filtering; confirm against its definition.
    skip_fields: bool,
}
271
272impl FlatPruneReader {
273 pub(crate) fn new_with_row_group_reader(
274 ctx: FileRangeContextRef,
275 reader: FlatRowGroupReader,
276 skip_fields: bool,
277 ) -> Self {
278 Self {
279 context: ctx,
280 source: FlatSource::RowGroup(reader),
281 metrics: Default::default(),
282 skip_fields,
283 }
284 }
285
286 pub(crate) fn new_with_last_row_reader(
287 ctx: FileRangeContextRef,
288 reader: FlatRowGroupLastRowCachedReader,
289 skip_fields: bool,
290 ) -> Self {
291 Self {
292 context: ctx,
293 source: FlatSource::LastRow(reader),
294 metrics: Default::default(),
295 skip_fields,
296 }
297 }
298
299 pub(crate) fn metrics(&self) -> ReaderMetrics {
301 self.metrics.clone()
302 }
303
304 pub(crate) async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
305 loop {
306 let start = std::time::Instant::now();
307 let batch = self.source.next_batch().await?;
308 self.metrics.scan_cost += start.elapsed();
309
310 let Some(record_batch) = batch else {
311 return Ok(None);
312 };
313
314 self.metrics.num_rows += record_batch.num_rows();
316 self.metrics.num_batches += 1;
317
318 match self.prune_flat(record_batch)? {
319 Some(filtered_batch) => {
320 return Ok(Some(filtered_batch));
321 }
322 None => {
323 continue;
324 }
325 }
326 }
327 }
328
329 fn prune_flat(&mut self, record_batch: RecordBatch) -> Result<Option<RecordBatch>> {
331 if self.context.filters().is_empty() && !self.context.has_partition_filter() {
333 return Ok(Some(record_batch));
334 }
335
336 let num_rows_before_filter = record_batch.num_rows();
337 let Some(filtered_batch) = self
338 .context
339 .precise_filter_flat(record_batch, self.skip_fields)?
340 else {
341 self.metrics.filter_metrics.rows_precise_filtered += num_rows_before_filter;
343 return Ok(None);
344 };
345
346 let filtered_rows = num_rows_before_filter - filtered_batch.num_rows();
348 self.metrics.filter_metrics.rows_precise_filtered += filtered_rows;
349
350 if filtered_batch.num_rows() > 0 {
351 Ok(Some(filtered_batch))
352 } else {
353 Ok(None)
354 }
355 }
356}
357
#[cfg(test)]
mod tests {
    use api::v1::OpType;
    use datafusion_common::ScalarValue;
    use datafusion_expr::{Expr, col, lit};

    use super::*;
    use crate::test_util::new_batch;

    // An empty input iterator yields no batches.
    #[test]
    fn test_prune_time_iter_empty() {
        let input = [];
        let iter = input.into_iter().map(Ok);
        let iter = PruneTimeIterator::new(
            Box::new(iter),
            (
                Timestamp::new_millisecond(0),
                Timestamp::new_millisecond(1000),
            ),
            None,
        );
        let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
        assert!(actual.is_empty());
    }

    // Time-range pruning without filters: rows outside the (inclusive) range
    // are dropped; batches that become empty disappear entirely.
    #[test]
    fn test_prune_time_iter_filter() {
        let input = [
            new_batch(
                b"k1",
                &[10, 11],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[110, 111],
            ),
            new_batch(
                b"k1",
                &[15, 16],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[115, 116],
            ),
            new_batch(
                b"k1",
                &[17, 18],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[117, 118],
            ),
        ];

        // Range [10, 15]: trims the second batch, removes the third.
        let iter = input.clone().into_iter().map(Ok);
        let iter = PruneTimeIterator::new(
            Box::new(iter),
            (
                Timestamp::new_millisecond(10),
                Timestamp::new_millisecond(15),
            ),
            None,
        );
        let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
        assert_eq!(
            actual,
            [
                new_batch(
                    b"k1",
                    &[10, 11],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[110, 111],
                ),
                new_batch(b"k1", &[15], &[20], &[OpType::Put], &[115],),
            ]
        );

        // Range [11, 20]: trims only the first batch.
        let iter = input.clone().into_iter().map(Ok);
        let iter = PruneTimeIterator::new(
            Box::new(iter),
            (
                Timestamp::new_millisecond(11),
                Timestamp::new_millisecond(20),
            ),
            None,
        );
        let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
        assert_eq!(
            actual,
            [
                new_batch(b"k1", &[11], &[20], &[OpType::Put], &[111],),
                new_batch(
                    b"k1",
                    &[15, 16],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[115, 116],
                ),
                new_batch(
                    b"k1",
                    &[17, 18],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[117, 118],
                ),
            ]
        );

        // Range [10, 18] covers everything: all batches pass unchanged.
        let iter = input.into_iter().map(Ok);
        let iter = PruneTimeIterator::new(
            Box::new(iter),
            (
                Timestamp::new_millisecond(10),
                Timestamp::new_millisecond(18),
            ),
            None,
        );
        let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
        assert_eq!(
            actual,
            [
                new_batch(
                    b"k1",
                    &[10, 11],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[110, 111],
                ),
                new_batch(
                    b"k1",
                    &[15, 16],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[115, 116],
                ),
                new_batch(
                    b"k1",
                    &[17, 18],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[117, 118],
                ),
            ]
        );
    }

    // Builds SimpleFilterEvaluators from datafusion expressions for tests.
    fn create_time_filters(expr: &[Expr]) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
        let filters = expr
            .iter()
            .map(|expr| SimpleFilterEvaluator::try_new(expr).unwrap())
            .collect();
        Some(Arc::new(filters))
    }

    // Time filters combined with a range that also prunes: the filter
    // `10 <= ts < 16` intersects the mask from range [10, 20].
    #[test]
    fn test_prune_time_iter_with_time_filters() {
        let input = [
            new_batch(
                b"k1",
                &[10, 11],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[110, 111],
            ),
            new_batch(
                b"k1",
                &[15, 16],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[115, 116],
            ),
            new_batch(
                b"k1",
                &[17, 18],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[117, 118],
            ),
        ];

        let iter = input.clone().into_iter().map(Ok);
        let time_filters = create_time_filters(&[
            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(10), None))),
            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(16), None))),
        ]);
        let iter = PruneTimeIterator::new(
            Box::new(iter),
            (
                Timestamp::new_millisecond(10),
                Timestamp::new_millisecond(20),
            ),
            time_filters,
        );
        let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
        assert_eq!(
            actual,
            [
                new_batch(
                    b"k1",
                    &[10, 11],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[110, 111],
                ),
                new_batch(b"k1", &[15], &[20], &[OpType::Put], &[115],),
            ]
        );
    }

    // Time filters when batches are fully inside the range: exercises the
    // fast path (empty range mask) with filters still applied.
    #[test]
    fn test_prune_time_iter_in_range_with_time_filters() {
        let input = [
            new_batch(
                b"k1",
                &[10, 11],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[110, 111],
            ),
            new_batch(
                b"k1",
                &[15, 16],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[115, 116],
            ),
            new_batch(
                b"k1",
                &[17, 18],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[117, 118],
            ),
        ];

        let iter = input.clone().into_iter().map(Ok);
        let time_filters = create_time_filters(&[
            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(10), None))),
            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(16), None))),
        ]);
        let iter = PruneTimeIterator::new(
            Box::new(iter),
            (
                Timestamp::new_millisecond(5),
                Timestamp::new_millisecond(18),
            ),
            time_filters,
        );
        let actual: Vec<_> = iter.map(|batch| batch.unwrap()).collect();
        assert_eq!(
            actual,
            [
                new_batch(
                    b"k1",
                    &[10, 11],
                    &[20, 20],
                    &[OpType::Put, OpType::Put],
                    &[110, 111],
                ),
                new_batch(b"k1", &[15], &[20], &[OpType::Put], &[115],),
            ]
        );
    }
}