1use std::mem;
18use std::sync::Arc;
19
20use async_stream::try_stream;
21use common_telemetry::warn;
22use common_time::range::TimestampRange;
23use datatypes::arrow::compute::concat_batches;
24use datatypes::arrow::record_batch::RecordBatch;
25use datatypes::prelude::ConcreteDataType;
26use futures::TryStreamExt;
27use snafu::ResultExt;
28use store_api::region_engine::PartitionRange;
29use store_api::storage::{FileId, RegionId, TimeSeriesRowSelector};
30use tokio::sync::{mpsc, oneshot};
31
32use crate::cache::CacheStrategy;
33use crate::error::{ComputeArrowSnafu, Result};
34use crate::read::BoxedRecordBatchStream;
35use crate::read::read_columns::ReadColumns;
36use crate::read::scan_region::StreamContext;
37use crate::read::scan_util::PartitionMetrics;
38use crate::region::options::MergeMode;
39use crate::sst::file::FileTimeRange;
40use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
41
/// Buffered batches are sent for compaction once their total estimated size
/// exceeds this threshold (8 MiB), even if the row threshold is not reached.
const RANGE_CACHE_COMPACT_THRESHOLD_BYTES: usize = 8 * 1024 * 1024;
43
/// Fingerprint of a scan request, used as part of the range result cache key.
///
/// Scans with equal fingerprints are expected to produce identical results for
/// the same row groups, so all result-affecting settings participate in
/// `PartialEq`/`Hash`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub(crate) struct ScanRequestFingerprint {
    /// Shared portion of the fingerprint; wrapped in `Arc` so cloning the
    /// fingerprint (e.g. in `without_time_filters`) is cheap.
    inner: Arc<SharedScanRequestFingerprint>,
    /// Normalized time filter expressions; `None` when there are none.
    time_filters: Option<Arc<Vec<String>>>,
    /// Optional per-series row selector.
    series_row_selector: Option<TimeSeriesRowSelector>,
    /// Whether the region runs in append mode.
    append_mode: bool,
    /// Whether deleted rows are filtered out by the scan.
    filter_deleted: bool,
    /// Merge mode of the region.
    merge_mode: MergeMode,
    /// Version of the partition expression; part of the key so results cached
    /// under a different partition expression are not reused.
    partition_expr_version: u64,
}
64
/// Builder collecting the components of a [`ScanRequestFingerprint`].
#[derive(Debug)]
pub(crate) struct ScanRequestFingerprintBuilder {
    /// Columns the scan reads.
    pub(crate) read_columns: ReadColumns,
    /// Concrete types of the read columns, if known.
    pub(crate) read_column_types: Vec<Option<ConcreteDataType>>,
    /// Normalized filter expressions (excluding time filters).
    pub(crate) filters: Vec<String>,
    /// Normalized time filter expressions.
    pub(crate) time_filters: Vec<String>,
    /// Optional per-series row selector.
    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
    /// Whether the region runs in append mode.
    pub(crate) append_mode: bool,
    /// Whether deleted rows are filtered out by the scan.
    pub(crate) filter_deleted: bool,
    /// Merge mode of the region.
    pub(crate) merge_mode: MergeMode,
    /// Version of the partition expression.
    pub(crate) partition_expr_version: u64,
}
77
78impl ScanRequestFingerprintBuilder {
79 pub(crate) fn build(self) -> ScanRequestFingerprint {
80 let Self {
81 read_columns,
82 read_column_types,
83 filters,
84 time_filters,
85 series_row_selector,
86 append_mode,
87 filter_deleted,
88 merge_mode,
89 partition_expr_version,
90 } = self;
91
92 ScanRequestFingerprint {
93 inner: Arc::new(SharedScanRequestFingerprint {
94 read_columns,
95 read_column_types,
96 filters,
97 }),
98 time_filters: (!time_filters.is_empty()).then(|| Arc::new(time_filters)),
99 series_row_selector,
100 append_mode,
101 filter_deleted,
102 merge_mode,
103 partition_expr_version,
104 }
105 }
106}
107
/// Portion of a scan fingerprint shared behind an `Arc` so that
/// [`ScanRequestFingerprint`] clones stay cheap.
#[derive(Debug, PartialEq, Eq, Hash)]
struct SharedScanRequestFingerprint {
    /// Columns the scan reads.
    read_columns: ReadColumns,
    /// Concrete types of the read columns, if known.
    read_column_types: Vec<Option<ConcreteDataType>>,
    /// Normalized filter expressions (time filters are kept separately on
    /// [`ScanRequestFingerprint`]).
    filters: Vec<String>,
}
119
120impl ScanRequestFingerprint {
121 #[cfg(test)]
122 pub(crate) fn read_columns(&self) -> &ReadColumns {
123 &self.inner.read_columns
124 }
125
126 #[cfg(test)]
127 pub(crate) fn read_column_types(&self) -> &[Option<ConcreteDataType>] {
128 &self.inner.read_column_types
129 }
130
131 #[cfg(test)]
132 pub(crate) fn filters(&self) -> &[String] {
133 &self.inner.filters
134 }
135
136 #[cfg(test)]
137 pub(crate) fn time_filters(&self) -> &[String] {
138 self.time_filters
139 .as_deref()
140 .map(Vec::as_slice)
141 .unwrap_or(&[])
142 }
143
144 pub(crate) fn without_time_filters(&self) -> Self {
145 Self {
146 inner: Arc::clone(&self.inner),
147 time_filters: None,
148 series_row_selector: self.series_row_selector,
149 append_mode: self.append_mode,
150 filter_deleted: self.filter_deleted,
151 merge_mode: self.merge_mode,
152 partition_expr_version: self.partition_expr_version,
153 }
154 }
155
156 pub(crate) fn estimated_size(&self) -> usize {
157 mem::size_of::<SharedScanRequestFingerprint>()
158 + self.inner.read_columns.estimated_size()
159 + self.inner.read_column_types.capacity() * mem::size_of::<Option<ConcreteDataType>>()
160 + self.inner.filters.capacity() * mem::size_of::<String>()
161 + self
162 .inner
163 .filters
164 .iter()
165 .map(|filter| filter.capacity())
166 .sum::<usize>()
167 + self.time_filters.as_ref().map_or(0, |filters| {
168 mem::size_of::<Vec<String>>()
169 + filters.capacity() * mem::size_of::<String>()
170 + filters
171 .iter()
172 .map(|filter| filter.capacity())
173 .sum::<usize>()
174 })
175 }
176}
177
/// Cache key identifying the result of scanning one partition range.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub(crate) struct RangeScanCacheKey {
    /// Region the partition range belongs to.
    pub(crate) region_id: RegionId,
    /// Sorted `(file id, row group index)` pairs covered by the range.
    pub(crate) row_groups: Vec<(FileId, i64)>,
    /// Fingerprint of the scan request.
    pub(crate) scan: ScanRequestFingerprint,
}
186
187impl RangeScanCacheKey {
188 pub(crate) fn estimated_size(&self) -> usize {
189 mem::size_of::<Self>()
190 + self.row_groups.capacity() * mem::size_of::<(FileId, i64)>()
191 + self.scan.estimated_size()
192 }
193}
194
/// A concatenated record batch together with the row counts of the original
/// batches, so the original batch boundaries can be replayed on read.
#[derive(Debug)]
pub(crate) struct CachedBatchSlice {
    /// Concatenation of the original batches.
    batch: RecordBatch,
    /// Row counts of the original batches, in order.
    slice_lengths: Vec<usize>,
}
201
202impl CachedBatchSlice {
203 fn metadata_size(&self) -> usize {
204 self.slice_lengths.capacity() * mem::size_of::<usize>()
205 }
206}
207
/// Cached result batches for one partition range.
pub(crate) struct RangeScanCacheValue {
    /// Compacted batches, each remembering its original slice boundaries.
    cached_batches: Vec<CachedBatchSlice>,
    /// Estimated memory usage of the batches in bytes.
    estimated_batches_size: usize,
}
213
214impl RangeScanCacheValue {
215 pub(crate) fn new(
216 cached_batches: Vec<CachedBatchSlice>,
217 estimated_batches_size: usize,
218 ) -> Self {
219 Self {
220 cached_batches,
221 estimated_batches_size,
222 }
223 }
224
225 pub(crate) fn estimated_size(&self) -> usize {
226 mem::size_of::<Self>()
227 + self.cached_batches.capacity() * mem::size_of::<CachedBatchSlice>()
228 + self
229 .cached_batches
230 .iter()
231 .map(CachedBatchSlice::metadata_size)
232 .sum::<usize>()
233 + self.estimated_batches_size
234 }
235}
236
/// Row groups referenced by one partition range.
pub(crate) struct PartitionRangeRowGroups {
    /// Sorted `(file id, row group index)` pairs of file sources in the range.
    pub(crate) row_groups: Vec<(FileId, i64)>,
    /// Whether every source in the range is a file source; ranges containing
    /// non-file sources are not cached (see `build_range_cache_key`).
    pub(crate) only_file_sources: bool,
}
243
244pub(crate) fn collect_partition_range_row_groups(
246 stream_ctx: &StreamContext,
247 part_range: &PartitionRange,
248) -> PartitionRangeRowGroups {
249 let range_meta = &stream_ctx.ranges[part_range.identifier];
250 let mut row_groups = Vec::new();
251 let mut only_file_sources = true;
252
253 for index in &range_meta.row_group_indices {
254 if stream_ctx.is_file_range_index(*index) {
255 let file_id = stream_ctx.input.file_from_index(*index).file_id().file_id();
256 row_groups.push((file_id, index.row_group_index));
257 } else {
258 only_file_sources = false;
259 }
260 }
261
262 row_groups.sort_unstable_by(|a, b| a.0.as_bytes().cmp(b.0.as_bytes()).then(a.1.cmp(&b.1)));
263
264 PartitionRangeRowGroups {
265 row_groups,
266 only_file_sources,
267 }
268}
269
270pub(crate) fn build_range_cache_key(
272 stream_ctx: &StreamContext,
273 part_range: &PartitionRange,
274) -> Option<RangeScanCacheKey> {
275 if !stream_ctx.input.cache_strategy.has_range_result_cache() {
276 return None;
277 }
278
279 let fingerprint = stream_ctx.scan_fingerprint.as_ref()?;
280
281 let has_dyn_filters = stream_ctx
283 .input
284 .predicate_group()
285 .predicate_without_region()
286 .is_some_and(|p| !p.dyn_filters().is_empty());
287 if has_dyn_filters {
288 return None;
289 }
290
291 let rg = collect_partition_range_row_groups(stream_ctx, part_range);
292 if !rg.only_file_sources || rg.row_groups.is_empty() {
293 return None;
294 }
295
296 let range_meta = &stream_ctx.ranges[part_range.identifier];
297 let scan = if query_time_range_covers_partition_range(
298 stream_ctx.input.time_range.as_ref(),
299 range_meta.time_range,
300 ) {
301 fingerprint.without_time_filters()
302 } else {
303 fingerprint.clone()
304 };
305
306 Some(RangeScanCacheKey {
307 region_id: stream_ctx.input.region_metadata().region_id,
308 row_groups: rg.row_groups,
309 scan,
310 })
311}
312
313fn query_time_range_covers_partition_range(
314 query_time_range: Option<&TimestampRange>,
315 partition_time_range: FileTimeRange,
316) -> bool {
317 let Some(query_time_range) = query_time_range else {
318 return true;
319 };
320
321 let (part_start, part_end) = partition_time_range;
322 query_time_range.contains(&part_start) && query_time_range.contains(&part_end)
323}
324
325pub(crate) fn cached_flat_range_stream(value: Arc<RangeScanCacheValue>) -> BoxedRecordBatchStream {
327 Box::pin(try_stream! {
328 for cached_batch in &value.cached_batches {
329 let mut offset = 0;
330 for &len in &cached_batch.slice_lengths {
331 yield cached_batch.batch.slice(offset, len);
332 offset += len;
333 }
334 }
335 })
336}
337
/// Commands for the background task that builds a range cache entry.
enum CacheConcatCommand {
    /// Concatenate the given batches into one cached slice.
    Compact(Vec<RecordBatch>),
    /// Compact the remaining batches, build the final value and put it into
    /// the cache.
    Finish {
        /// Batches still buffered when the stream ended.
        pending: Vec<RecordBatch>,
        /// Cache key to insert under.
        key: RangeScanCacheKey,
        /// Cache to insert the value into.
        cache_strategy: CacheStrategy,
        /// Metrics sink recording the cached size.
        part_metrics: PartitionMetrics,
        /// Optional channel for reporting the final result; production code
        /// passes `None`, tests use it to await completion.
        result_tx: Option<oneshot::Sender<Result<Arc<RangeScanCacheValue>>>>,
    },
}
348
/// Accumulated state of the background concat task.
#[derive(Default)]
struct CacheConcatState {
    /// Slices compacted so far.
    cached_batches: Vec<CachedBatchSlice>,
    /// Total estimated size of the compacted batches in bytes.
    estimated_size: usize,
}
354
355impl CacheConcatState {
356 async fn compact(
357 &mut self,
358 batches: Vec<RecordBatch>,
359 limiter: &crate::cache::RangeResultMemoryLimiter,
360 ) -> Result<()> {
361 if batches.is_empty() {
362 return Ok(());
363 }
364
365 let input_size = batches
366 .iter()
367 .map(RecordBatch::get_array_memory_size)
368 .sum::<usize>();
369 let _permit = limiter.acquire(input_size).await?;
370
371 let compacted = compact_record_batches(batches)?;
372 self.estimated_size += compacted.batch.get_array_memory_size();
373 self.cached_batches.push(compacted);
374 Ok(())
375 }
376
377 fn finish(self) -> RangeScanCacheValue {
378 RangeScanCacheValue::new(self.cached_batches, self.estimated_size)
379 }
380}
381
382fn compact_record_batches(batches: Vec<RecordBatch>) -> Result<CachedBatchSlice> {
383 debug_assert!(!batches.is_empty());
384
385 let slice_lengths = batches.iter().map(RecordBatch::num_rows).collect();
386 build_cached_batch_slice(batches, slice_lengths)
387}
388
389fn build_cached_batch_slice(
390 batches: Vec<RecordBatch>,
391 slice_lengths: Vec<usize>,
392) -> Result<CachedBatchSlice> {
393 let batch = if batches.len() == 1 {
394 batches.into_iter().next().unwrap()
395 } else {
396 let schema = batches[0].schema();
397 concat_batches(&schema, &batches).context(ComputeArrowSnafu)?
398 };
399
400 Ok(CachedBatchSlice {
401 batch,
402 slice_lengths,
403 })
404}
405
/// Background task that concatenates buffered record batches and, on
/// [`CacheConcatCommand::Finish`], inserts the resulting value into the cache.
///
/// The task exits early — dropping `rx`, so senders observe a closed channel
/// and stop caching — when compaction fails or when the compacted size
/// exceeds `skip_threshold_bytes` (the configured cache capacity).
async fn run_cache_concat_task(
    mut rx: mpsc::UnboundedReceiver<CacheConcatCommand>,
    limiter: Arc<crate::cache::RangeResultMemoryLimiter>,
    skip_threshold_bytes: usize,
) {
    let mut state = CacheConcatState::default();

    while let Some(cmd) = rx.recv().await {
        match cmd {
            CacheConcatCommand::Compact(batches) => {
                if let Err(err) = state.compact(batches, &limiter).await {
                    warn!(err; "Failed to compact range cache batches");
                    return;
                }
                // The compacted result already exceeds the whole cache
                // budget; give up caching this range.
                if state.estimated_size > skip_threshold_bytes {
                    return;
                }
            }
            CacheConcatCommand::Finish {
                pending,
                key,
                cache_strategy,
                part_metrics,
                result_tx,
            } => {
                // Compact the remaining batches, then consume the state to
                // build the final value.
                let compact_result = state
                    .compact(pending, &limiter)
                    .await
                    .map(|()| state.finish());
                let result = match compact_result {
                    Ok(v) => {
                        let value = Arc::new(v);
                        part_metrics
                            .inc_range_cache_size(key.estimated_size() + value.estimated_size());
                        cache_strategy.put_range_result(key, value.clone());

                        Ok(value)
                    }
                    Err(e) => {
                        warn!(e; "Failed to finalize range cache batches");

                        Err(e)
                    }
                };

                if let Some(tx) = result_tx {
                    // The receiver may already be dropped; ignore send errors.
                    let _ = tx.send(result);
                }

                // Finish is terminal: stop processing further commands.
                break;
            }
        }
    }
}
462
/// Buffers batches flowing through a scan stream and forwards them to a
/// background task that builds the range cache entry.
struct CacheBatchBuffer {
    /// Batches waiting to be sent for compaction.
    buffered_batches: Vec<RecordBatch>,
    /// Total number of rows currently buffered.
    buffered_rows: usize,
    /// Total estimated bytes currently buffered.
    buffered_size: usize,
    /// Channel to the concat task; `None` when caching is disabled or has
    /// been abandoned (the task exited).
    sender: Option<mpsc::UnboundedSender<CacheConcatCommand>>,
}
469
470impl CacheBatchBuffer {
471 fn new(cache_strategy: &CacheStrategy) -> Self {
472 let sender = cache_strategy.range_result_memory_limiter().map(|limiter| {
473 let skip_threshold_bytes = cache_strategy.range_result_cache_size().unwrap_or(0);
474 let (tx, rx) = mpsc::unbounded_channel();
475 common_runtime::spawn_global(run_cache_concat_task(
476 rx,
477 limiter.clone(),
478 skip_threshold_bytes,
479 ));
480 tx
481 });
482
483 Self {
484 buffered_batches: Vec::new(),
485 buffered_rows: 0,
486 buffered_size: 0,
487 sender,
488 }
489 }
490
491 fn push(&mut self, batch: RecordBatch) -> Result<()> {
492 if self.sender.is_none() {
493 return Ok(());
494 }
495
496 self.buffered_rows += batch.num_rows();
497 self.buffered_size += batch.get_array_memory_size();
498 self.buffered_batches.push(batch);
499
500 if self.buffered_batches.len() > 1
501 && (self.buffered_rows > DEFAULT_READ_BATCH_SIZE
502 || self.buffered_size > RANGE_CACHE_COMPACT_THRESHOLD_BYTES)
503 {
504 self.notify_compact();
505 }
506
507 Ok(())
508 }
509
510 fn notify_compact(&mut self) {
511 if self.buffered_batches.is_empty() || self.sender.is_none() {
512 return;
513 }
514
515 let batches = mem::take(&mut self.buffered_batches);
516 self.buffered_rows = 0;
517 self.buffered_size = 0;
518
519 let Some(sender) = &self.sender else {
520 return;
521 };
522 if sender.send(CacheConcatCommand::Compact(batches)).is_err() {
523 self.sender = None;
524 }
525 }
526
527 fn finish(
528 mut self,
529 key: RangeScanCacheKey,
530 cache_strategy: CacheStrategy,
531 part_metrics: PartitionMetrics,
532 result_tx: Option<oneshot::Sender<Result<Arc<RangeScanCacheValue>>>>,
533 ) {
534 let Some(sender) = self.sender.take() else {
535 return;
536 };
537
538 if sender
539 .send(CacheConcatCommand::Finish {
540 pending: mem::take(&mut self.buffered_batches),
541 key,
542 cache_strategy,
543 part_metrics,
544 result_tx,
545 })
546 .is_err()
547 {
548 self.sender = None;
549 }
550 }
551}
552
553pub(crate) fn cache_flat_range_stream(
555 mut stream: BoxedRecordBatchStream,
556 cache_strategy: CacheStrategy,
557 key: RangeScanCacheKey,
558 part_metrics: PartitionMetrics,
559) -> BoxedRecordBatchStream {
560 Box::pin(try_stream! {
561 let mut buffer = CacheBatchBuffer::new(&cache_strategy);
562 while let Some(batch) = stream.try_next().await? {
563 buffer.push(batch.clone())?;
564 yield batch;
565 }
566
567 buffer.finish(key, cache_strategy, part_metrics, None);
568 })
569}
570
/// Wraps `stream` with range result caching backed by a freshly built cache
/// manager; intended for benchmarks.
#[cfg(feature = "test")]
pub fn bench_cache_flat_range_stream(
    stream: BoxedRecordBatchStream,
    cache_size_bytes: u64,
    region_id: RegionId,
) -> BoxedRecordBatchStream {
    use std::time::Instant;

    use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;

    use crate::region::options::MergeMode;

    let cache_strategy = CacheStrategy::EnableAll(Arc::new(
        crate::cache::CacheManager::builder()
            .range_result_cache_size(cache_size_bytes)
            .build(),
    ));

    // An empty fingerprint is sufficient to exercise the caching path.
    let scan = ScanRequestFingerprintBuilder {
        read_columns: ReadColumns::from_deduped_column_ids(std::iter::empty()),
        read_column_types: vec![],
        filters: vec![],
        time_filters: vec![],
        series_row_selector: None,
        append_mode: false,
        filter_deleted: false,
        merge_mode: MergeMode::LastRow,
        partition_expr_version: 0,
    }
    .build();

    let key = RangeScanCacheKey {
        region_id,
        row_groups: vec![],
        scan,
    };

    let metrics_set = ExecutionPlanMetricsSet::new();
    let part_metrics =
        PartitionMetrics::new(region_id, 0, "bench", Instant::now(), false, &metrics_set);

    cache_flat_range_stream(stream, cache_strategy, key, part_metrics)
}
619
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Instant;

    use common_time::Timestamp;
    use common_time::range::TimestampRange;
    use common_time::timestamp::TimeUnit;
    use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
    use datafusion_common::ScalarValue;
    use datafusion_expr::{Expr, col, lit};
    use smallvec::smallvec;
    use store_api::storage::{FileId, RegionId};

    use super::*;
    use crate::cache::CacheManager;
    use crate::read::flat_projection::FlatProjectionMapper;
    use crate::read::range::{RangeMeta, RowGroupIndex, SourceIndex};
    use crate::read::scan_region::{PredicateGroup, ScanInput};
    use crate::test_util::memtable_util::metadata_with_primary_key;
    use crate::test_util::scheduler_util::SchedulerEnv;
    use crate::test_util::sst_util::sst_file_handle_with_file_id;

    /// Cache strategy with a 1 MiB range result cache enabled.
    fn test_cache_strategy() -> CacheStrategy {
        CacheStrategy::EnableAll(Arc::new(
            CacheManager::builder()
                .range_result_cache_size(1024 * 1024)
                .build(),
        ))
    }

    /// Builds a fingerprint reading columns 1 and 2 with the given knobs.
    fn test_scan_fingerprint(
        filters: Vec<String>,
        time_filters: Vec<String>,
        series_row_selector: Option<TimeSeriesRowSelector>,
        filter_deleted: bool,
        partition_expr_version: u64,
    ) -> ScanRequestFingerprint {
        let read_columns = ReadColumns::from_deduped_column_ids([1, 2]);
        ScanRequestFingerprintBuilder {
            read_columns,
            read_column_types: vec![None, None],
            filters,
            time_filters,
            series_row_selector,
            append_mode: false,
            filter_deleted,
            merge_mode: MergeMode::LastRow,
            partition_expr_version,
        }
        .build()
    }

    /// Returns a fresh cache key and partition metrics, asserting the key is
    /// not cached yet.
    fn test_cache_context(strategy: &CacheStrategy) -> (RangeScanCacheKey, PartitionMetrics) {
        let region_id = RegionId::new(1, 1);
        let key = RangeScanCacheKey {
            region_id,
            row_groups: vec![],
            scan: test_scan_fingerprint(vec![], vec![], None, false, 0),
        };

        let metrics_set = ExecutionPlanMetricsSet::new();
        let part_metrics =
            PartitionMetrics::new(region_id, 0, "test", Instant::now(), false, &metrics_set);

        assert!(strategy.get_range_result(&key).is_none());
        (key, part_metrics)
    }

    /// Finishes the buffer and awaits the concat task's final result.
    async fn finish_cache_batch_buffer(
        buffer: CacheBatchBuffer,
        key: RangeScanCacheKey,
        cache_strategy: CacheStrategy,
        part_metrics: PartitionMetrics,
    ) -> Result<Arc<RangeScanCacheValue>> {
        let (tx, rx) = oneshot::channel();
        buffer.finish(key, cache_strategy, part_metrics, Some(tx));
        rx.await.context(crate::error::RecvSnafu)?
    }

    /// Builds a stream context with one file whose time range is
    /// `partition_time_range`.
    async fn new_stream_context(
        filters: Vec<Expr>,
        query_time_range: Option<TimestampRange>,
        partition_time_range: FileTimeRange,
    ) -> (StreamContext, PartitionRange) {
        let env = SchedulerEnv::new().await;
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3]).unwrap();
        let predicate = PredicateGroup::new(metadata.as_ref(), &filters).unwrap();
        let file_id = FileId::random();
        let file = sst_file_handle_with_file_id(
            file_id,
            partition_time_range.0.value(),
            partition_time_range.1.value(),
        );
        let input = ScanInput::new(env.access_layer.clone(), mapper)
            .with_predicate(predicate)
            .with_time_range(query_time_range)
            .with_files(vec![file])
            .with_cache(test_cache_strategy());
        let range_meta = RangeMeta {
            time_range: partition_time_range,
            indices: smallvec![SourceIndex {
                index: 0,
                num_row_groups: 1,
            }],
            row_group_indices: smallvec![RowGroupIndex {
                index: 0,
                row_group_index: 0,
            }],
            num_rows: 10,
        };
        let partition_range = range_meta.new_partition_range(0);
        let scan_fingerprint = crate::read::scan_region::build_scan_fingerprint(&input);
        let stream_ctx = StreamContext {
            input,
            ranges: vec![range_meta],
            scan_fingerprint,
            query_start: Instant::now(),
        };

        (stream_ctx, partition_range)
    }

    /// Millisecond timestamp literal.
    fn ts_lit(val: i64) -> Expr {
        lit(ScalarValue::TimestampMillisecond(Some(val), None))
    }

    /// Renders expressions to strings and sorts them so comparisons are
    /// order-insensitive.
    fn normalized_exprs(exprs: impl IntoIterator<Item = Expr>) -> Vec<String> {
        let mut exprs = exprs
            .into_iter()
            .map(|expr| expr.to_string())
            .collect::<Vec<_>>();
        exprs.sort_unstable();
        exprs
    }

    /// Asserts the cache key built from `filters` splits into the expected
    /// plain filters and time filters.
    async fn assert_range_cache_filters(
        filters: Vec<Expr>,
        query_time_range: Option<TimestampRange>,
        partition_time_range: FileTimeRange,
        expected_filters: Vec<Expr>,
        expected_time_filters: Vec<Expr>,
    ) {
        let (stream_ctx, part_range) =
            new_stream_context(filters, query_time_range, partition_time_range).await;

        let key = build_range_cache_key(&stream_ctx, &part_range).unwrap();

        assert_eq!(
            key.scan.filters(),
            normalized_exprs(expected_filters).as_slice()
        );
        assert_eq!(
            key.scan.time_filters(),
            normalized_exprs(expected_time_filters).as_slice()
        );
    }

    #[tokio::test]
    async fn strips_time_only_filters_when_query_covers_partition_range() {
        assert_range_cache_filters(
            vec![
                col("ts").gt_eq(ts_lit(1000)),
                col("ts").lt(ts_lit(2001)),
                col("ts").is_not_null(),
                col("k0").eq(lit("foo")),
            ],
            TimestampRange::with_unit(1000, 2002, TimeUnit::Millisecond),
            (
                Timestamp::new_millisecond(1000),
                Timestamp::new_millisecond(2000),
            ),
            vec![col("k0").eq(lit("foo")), col("ts").is_not_null()],
            vec![],
        )
        .await;
    }

    #[tokio::test]
    async fn preserves_time_filters_when_query_does_not_cover_partition_range() {
        assert_range_cache_filters(
            vec![col("ts").gt_eq(ts_lit(1000)), col("k0").eq(lit("foo"))],
            TimestampRange::with_unit(1000, 1500, TimeUnit::Millisecond),
            (
                Timestamp::new_millisecond(1000),
                Timestamp::new_millisecond(2000),
            ),
            vec![col("k0").eq(lit("foo"))],
            vec![col("ts").gt_eq(ts_lit(1000))],
        )
        .await;
    }

    #[tokio::test]
    async fn strips_time_only_filters_when_query_has_no_time_range_limit() {
        assert_range_cache_filters(
            vec![
                col("ts").gt_eq(ts_lit(1000)),
                col("ts").is_not_null(),
                col("k0").eq(lit("foo")),
            ],
            None,
            (
                Timestamp::new_millisecond(1000),
                Timestamp::new_millisecond(2000),
            ),
            vec![col("k0").eq(lit("foo")), col("ts").is_not_null()],
            vec![],
        )
        .await;
    }

    #[test]
    fn normalizes_and_clears_time_filters() {
        let normalized =
            test_scan_fingerprint(vec!["k0 = 'foo'".to_string()], vec![], None, true, 0);

        assert!(normalized.time_filters().is_empty());

        let fingerprint = test_scan_fingerprint(
            vec!["k0 = 'foo'".to_string()],
            vec!["ts >= 1000".to_string()],
            Some(TimeSeriesRowSelector::LastRow),
            true,
            7,
        );

        let reset = fingerprint.without_time_filters();

        // Everything except the time filters must be preserved.
        assert_eq!(reset.read_columns(), fingerprint.read_columns());
        assert_eq!(reset.read_column_types(), fingerprint.read_column_types());
        assert_eq!(reset.filters(), fingerprint.filters());
        assert!(reset.time_filters().is_empty());
        assert_eq!(reset.series_row_selector, fingerprint.series_row_selector);
        assert_eq!(reset.append_mode, fingerprint.append_mode);
        assert_eq!(reset.filter_deleted, fingerprint.filter_deleted);
        assert_eq!(reset.merge_mode, fingerprint.merge_mode);
        assert_eq!(
            reset.partition_expr_version,
            fingerprint.partition_expr_version
        );
    }

    /// Single-column int64 schema for batch tests.
    fn test_schema() -> Arc<datatypes::arrow::datatypes::Schema> {
        use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema};

        Arc::new(Schema::new(vec![Field::new(
            "value",
            ArrowDataType::Int64,
            false,
        )]))
    }

    fn make_batch(values: &[i64]) -> RecordBatch {
        use datatypes::arrow::array::Int64Array;

        RecordBatch::try_new(
            test_schema(),
            vec![Arc::new(Int64Array::from(values.to_vec()))],
        )
        .unwrap()
    }

    /// Batch with `rows` binary values of `bytes_per_row` bytes each.
    fn make_large_binary_batch(rows: usize, bytes_per_row: usize) -> RecordBatch {
        use datatypes::arrow::array::BinaryArray;
        use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema};

        let schema = Arc::new(Schema::new(vec![Field::new(
            "value",
            ArrowDataType::Binary,
            false,
        )]));
        let payload = vec![b'x'; bytes_per_row];
        let values = (0..rows).map(|_| payload.as_slice()).collect::<Vec<_>>();

        RecordBatch::try_new(schema, vec![Arc::new(BinaryArray::from_vec(values))]).unwrap()
    }

    #[test]
    fn compact_record_batches_keeps_original_boundaries() {
        let batches = vec![make_batch(&[1, 2]), make_batch(&[3]), make_batch(&[4, 5])];

        let compacted = compact_record_batches(batches).unwrap();

        assert_eq!(compacted.batch.num_rows(), 5);
        assert_eq!(compacted.slice_lengths, vec![2, 1, 2]);
    }

    #[tokio::test]
    async fn cached_flat_range_stream_replays_original_batches() {
        let value = Arc::new(RangeScanCacheValue::new(
            vec![CachedBatchSlice {
                batch: make_batch(&[1, 2, 3]),
                slice_lengths: vec![2, 1],
            }],
            make_batch(&[1, 2, 3]).get_array_memory_size(),
        ));

        let replayed = cached_flat_range_stream(value)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();

        assert_eq!(replayed.len(), 2);
        assert_eq!(replayed[0].num_rows(), 2);
        assert_eq!(replayed[1].num_rows(), 1);
    }

    #[tokio::test]
    async fn cache_batch_buffer_finishes_pending_batches() {
        let strategy = test_cache_strategy();
        let batch = make_batch(&[1, 2, 3]);
        let expected_size = batch.get_array_memory_size();
        let (key, part_metrics) = test_cache_context(&strategy);

        let mut buffer = CacheBatchBuffer::new(&strategy);
        buffer.push(batch).unwrap();

        let value = finish_cache_batch_buffer(buffer, key.clone(), strategy.clone(), part_metrics)
            .await
            .unwrap();
        assert_eq!(value.cached_batches.len(), 1);
        assert_eq!(value.cached_batches[0].slice_lengths, vec![3]);
        assert_eq!(value.estimated_batches_size, expected_size);
        assert!(Arc::ptr_eq(
            &value,
            &strategy.get_range_result(&key).unwrap()
        ));
    }

    #[tokio::test]
    async fn cache_batch_buffer_compacts_when_rows_exceed_default_batch_size() {
        let strategy = test_cache_strategy();
        let batch = make_batch(&vec![1; DEFAULT_READ_BATCH_SIZE / 2 + 1]);
        let (key, part_metrics) = test_cache_context(&strategy);

        let mut buffer = CacheBatchBuffer::new(&strategy);
        buffer.push(batch.clone()).unwrap();
        buffer.push(batch).unwrap();

        // The second push crosses the row threshold and triggers compaction.
        assert_eq!(buffer.buffered_rows, 0);
        assert!(buffer.buffered_batches.is_empty());

        let value = finish_cache_batch_buffer(buffer, key, strategy, part_metrics)
            .await
            .unwrap();
        assert_eq!(value.cached_batches.len(), 1);
        assert_eq!(
            value.cached_batches[0].slice_lengths,
            vec![
                DEFAULT_READ_BATCH_SIZE / 2 + 1,
                DEFAULT_READ_BATCH_SIZE / 2 + 1
            ]
        );
    }

    #[tokio::test]
    async fn cache_batch_buffer_compacts_when_buffered_size_exceeds_threshold() {
        let large_batch = make_large_binary_batch(DEFAULT_READ_BATCH_SIZE, 4096);
        let strategy = CacheStrategy::EnableAll(Arc::new(
            CacheManager::builder()
                .range_result_cache_size((large_batch.get_array_memory_size() * 3) as u64)
                .build(),
        ));
        let (key, part_metrics) = test_cache_context(&strategy);

        let mut buffer = CacheBatchBuffer::new(&strategy);
        buffer.push(large_batch.clone()).unwrap();

        // One batch stays buffered even above the size threshold.
        assert_eq!(buffer.buffered_rows, large_batch.num_rows());
        assert_eq!(buffer.buffered_batches.len(), 1);

        buffer.push(large_batch.clone()).unwrap();

        // The second push crosses the byte threshold and triggers compaction.
        assert_eq!(buffer.buffered_rows, 0);
        assert!(buffer.buffered_batches.is_empty());

        let value = finish_cache_batch_buffer(buffer, key, strategy, part_metrics)
            .await
            .unwrap();
        assert_eq!(value.cached_batches.len(), 1);
        assert_eq!(
            value.cached_batches[0].slice_lengths,
            vec![large_batch.num_rows(), large_batch.num_rows()]
        );
    }

    #[tokio::test]
    async fn cache_batch_buffer_skips_cache_when_compacted_size_exceeds_limit() {
        let large_batch = make_large_binary_batch(DEFAULT_READ_BATCH_SIZE / 2 + 1, 4096);
        // A budget smaller than four batches forces the concat task to abort.
        let budget = (large_batch.get_array_memory_size() as u64) * 2 + 1;
        let strategy = CacheStrategy::EnableAll(Arc::new(
            CacheManager::builder()
                .range_result_cache_size(budget)
                .build(),
        ));
        let (key, part_metrics) = test_cache_context(&strategy);

        let mut buffer = CacheBatchBuffer::new(&strategy);
        for _ in 0..4 {
            buffer.push(large_batch.clone()).unwrap();
        }
        assert!(
            finish_cache_batch_buffer(buffer, key.clone(), strategy.clone(), part_metrics)
                .await
                .is_err()
        );
        assert!(strategy.get_range_result(&key).is_none());
    }
}
1034}