1use std::ops::BitAnd;
16use std::sync::Arc;
17
18use common_recordbatch::filter::SimpleFilterEvaluator;
19use common_time::Timestamp;
20use datatypes::arrow::array::BooleanArray;
21use datatypes::arrow::buffer::BooleanBuffer;
22use datatypes::arrow::record_batch::RecordBatch;
23use snafu::ResultExt;
24
25use crate::error::{RecordBatchSnafu, Result};
26use crate::memtable::BoxedBatchIterator;
27use crate::read::Batch;
28use crate::read::last_row::FlatRowGroupLastRowCachedReader;
29use crate::sst::file::FileTimeRange;
30use crate::sst::parquet::file_range::FileRangeContextRef;
31use crate::sst::parquet::reader::{FlatRowGroupReader, ReaderMetrics};
32
33pub(crate) struct PruneTimeIterator {
35 iter: BoxedBatchIterator,
36 time_range: FileTimeRange,
37 time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
39}
40
41impl PruneTimeIterator {
42 pub(crate) fn new(
44 iter: BoxedBatchIterator,
45 time_range: FileTimeRange,
46 time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
47 ) -> Self {
48 Self {
49 iter,
50 time_range,
51 time_filters,
52 }
53 }
54
55 fn prune(&self, batch: Batch) -> Result<Batch> {
57 if batch.is_empty() {
58 return Ok(batch);
59 }
60
61 if self.time_range.0 <= batch.first_timestamp().unwrap()
64 && batch.last_timestamp().unwrap() <= self.time_range.1
65 {
66 return self.prune_by_time_filters(batch, Vec::new());
67 }
68
69 let unit = batch
73 .timestamps()
74 .data_type()
75 .as_timestamp()
76 .unwrap()
77 .unit();
78 let mut mask = Vec::with_capacity(batch.timestamps().len());
79 let timestamps = batch.timestamps_native().unwrap();
80 for ts in timestamps {
81 let ts = Timestamp::new(*ts, unit);
82 if self.time_range.0 <= ts && ts <= self.time_range.1 {
83 mask.push(true);
84 } else {
85 mask.push(false);
86 }
87 }
88
89 self.prune_by_time_filters(batch, mask)
90 }
91
92 fn prune_by_time_filters(&self, mut batch: Batch, existing_mask: Vec<bool>) -> Result<Batch> {
95 if let Some(filters) = &self.time_filters {
96 let mut mask = BooleanBuffer::new_set(batch.num_rows());
97 for filter in filters.iter() {
98 let result = filter
99 .evaluate_vector(batch.timestamps())
100 .context(RecordBatchSnafu)?;
101 mask = mask.bitand(&result);
102 }
103
104 if !existing_mask.is_empty() {
105 mask = mask.bitand(&BooleanBuffer::from(existing_mask));
106 }
107
108 batch.filter(&BooleanArray::from(mask).into())?;
109 } else if !existing_mask.is_empty() {
110 batch.filter(&BooleanArray::from(existing_mask).into())?;
111 }
112
113 Ok(batch)
114 }
115
116 fn next_non_empty_batch(&mut self) -> Result<Option<Batch>> {
118 while let Some(batch) = self.iter.next() {
119 let batch = batch?;
120 let pruned_batch = self.prune(batch)?;
121 if !pruned_batch.is_empty() {
122 return Ok(Some(pruned_batch));
123 }
124 }
125 Ok(None)
126 }
127}
128
129impl Iterator for PruneTimeIterator {
130 type Item = Result<Batch>;
131
132 fn next(&mut self) -> Option<Self::Item> {
133 self.next_non_empty_batch().transpose()
134 }
135}
136
137pub enum FlatSource {
138 RowGroup(FlatRowGroupReader),
139 LastRow(FlatRowGroupLastRowCachedReader),
140}
141
142impl FlatSource {
143 async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
144 match self {
145 FlatSource::RowGroup(r) => r.next_batch().await,
146 FlatSource::LastRow(r) => r.next_batch().await,
147 }
148 }
149}
150
151pub struct FlatPruneReader {
153 context: FileRangeContextRef,
155 source: FlatSource,
156 metrics: ReaderMetrics,
157 skip_fields: bool,
159}
160
161impl FlatPruneReader {
162 pub(crate) fn new_with_row_group_reader(
163 ctx: FileRangeContextRef,
164 reader: FlatRowGroupReader,
165 skip_fields: bool,
166 ) -> Self {
167 Self {
168 context: ctx,
169 source: FlatSource::RowGroup(reader),
170 metrics: Default::default(),
171 skip_fields,
172 }
173 }
174
175 pub(crate) fn new_with_last_row_reader(
176 ctx: FileRangeContextRef,
177 reader: FlatRowGroupLastRowCachedReader,
178 skip_fields: bool,
179 ) -> Self {
180 Self {
181 context: ctx,
182 source: FlatSource::LastRow(reader),
183 metrics: Default::default(),
184 skip_fields,
185 }
186 }
187
188 pub(crate) fn metrics(&self) -> ReaderMetrics {
190 self.metrics.clone()
191 }
192
193 pub(crate) async fn next_batch(&mut self) -> Result<Option<RecordBatch>> {
194 loop {
195 let start = std::time::Instant::now();
196 let batch = self.source.next_batch().await?;
197 self.metrics.scan_cost += start.elapsed();
198
199 let Some(record_batch) = batch else {
200 return Ok(None);
201 };
202
203 self.metrics.num_rows += record_batch.num_rows();
205 self.metrics.num_batches += 1;
206
207 match self.prune_flat(record_batch)? {
208 Some(filtered_batch) => {
209 return Ok(Some(filtered_batch));
210 }
211 None => {
212 continue;
213 }
214 }
215 }
216 }
217
218 fn prune_flat(&mut self, record_batch: RecordBatch) -> Result<Option<RecordBatch>> {
220 if self.context.filters().is_empty() && !self.context.has_partition_filter() {
222 return Ok(Some(record_batch));
223 }
224
225 let num_rows_before_filter = record_batch.num_rows();
226 let Some(filtered_batch) = self
227 .context
228 .precise_filter_flat(record_batch, self.skip_fields)?
229 else {
230 self.metrics.filter_metrics.rows_precise_filtered += num_rows_before_filter;
232 return Ok(None);
233 };
234
235 let filtered_rows = num_rows_before_filter - filtered_batch.num_rows();
237 self.metrics.filter_metrics.rows_precise_filtered += filtered_rows;
238
239 if filtered_batch.num_rows() > 0 {
240 Ok(Some(filtered_batch))
241 } else {
242 Ok(None)
243 }
244 }
245}
246
#[cfg(test)]
mod tests {
    use api::v1::OpType;
    use datafusion_common::ScalarValue;
    use datafusion_expr::{Expr, col, lit};

    use super::*;
    use crate::test_util::new_batch;

    /// Three `k1` put batches (sequence 20) covering timestamps {10, 11}, {15, 16} and {17, 18}.
    fn sample_input() -> [Batch; 3] {
        [
            new_batch(
                b"k1",
                &[10, 11],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[110, 111],
            ),
            new_batch(
                b"k1",
                &[15, 16],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[115, 116],
            ),
            new_batch(
                b"k1",
                &[17, 18],
                &[20, 20],
                &[OpType::Put, OpType::Put],
                &[117, 118],
            ),
        ]
    }

    /// Feeds `input` through a [`PruneTimeIterator`] bounded by the inclusive millisecond
    /// range `[start_ms, end_ms]` and collects every yielded batch, panicking on errors.
    fn collect_pruned(
        input: Vec<Batch>,
        start_ms: i64,
        end_ms: i64,
        time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
    ) -> Vec<Batch> {
        let source = input.into_iter().map(Ok);
        let range = (
            Timestamp::new_millisecond(start_ms),
            Timestamp::new_millisecond(end_ms),
        );
        PruneTimeIterator::new(Box::new(source), range, time_filters)
            .map(Result::unwrap)
            .collect()
    }

    /// Builds evaluators for each expression, panicking if one is not a simple filter.
    fn create_time_filters(expr: &[Expr]) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
        Some(Arc::new(
            expr.iter()
                .map(|e| SimpleFilterEvaluator::try_new(e).unwrap())
                .collect(),
        ))
    }

    /// Filters equivalent to `10ms <= ts < 16ms`.
    fn ts_from_10_until_16() -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
        create_time_filters(&[
            col("ts").gt_eq(lit(ScalarValue::TimestampMillisecond(Some(10), None))),
            col("ts").lt(lit(ScalarValue::TimestampMillisecond(Some(16), None))),
        ])
    }

    #[test]
    fn test_prune_time_iter_empty() {
        let actual = collect_pruned(Vec::new(), 0, 1000, None);
        assert!(actual.is_empty());
    }

    #[test]
    fn test_prune_time_iter_filter() {
        let input = sample_input();

        // [10, 15] keeps the first batch, half of the second and none of the third.
        assert_eq!(
            collect_pruned(input.to_vec(), 10, 15, None),
            [
                input[0].clone(),
                new_batch(b"k1", &[15], &[20], &[OpType::Put], &[115]),
            ]
        );

        // [11, 20] trims the first batch and keeps the rest whole.
        assert_eq!(
            collect_pruned(input.to_vec(), 11, 20, None),
            [
                new_batch(b"k1", &[11], &[20], &[OpType::Put], &[111]),
                input[1].clone(),
                input[2].clone(),
            ]
        );

        // [10, 18] covers every row, so the input comes back unchanged.
        assert_eq!(collect_pruned(input.to_vec(), 10, 18, None), input);
    }

    #[test]
    fn test_prune_time_iter_with_time_filters() {
        let input = sample_input();

        // Every batch lies inside [10, 20]; the filters then drop timestamps 16, 17 and 18.
        assert_eq!(
            collect_pruned(input.to_vec(), 10, 20, ts_from_10_until_16()),
            [
                input[0].clone(),
                new_batch(b"k1", &[15], &[20], &[OpType::Put], &[115]),
            ]
        );
    }

    #[test]
    fn test_prune_time_iter_in_range_with_time_filters() {
        let input = sample_input();

        // Every batch lies inside [5, 18]; only the filters remove rows.
        assert_eq!(
            collect_pruned(input.to_vec(), 5, 18, ts_from_10_until_16()),
            [
                input[0].clone(),
                new_batch(b"k1", &[15], &[20], &[OpType::Put], &[115]),
            ]
        );
    }
}