1use std::mem;
18use std::sync::Arc;
19
20use async_stream::try_stream;
21use common_telemetry::warn;
22use common_time::range::TimestampRange;
23use datatypes::arrow::compute::concat_batches;
24use datatypes::arrow::record_batch::RecordBatch;
25use datatypes::prelude::ConcreteDataType;
26use futures::TryStreamExt;
27use snafu::ResultExt;
28use store_api::region_engine::PartitionRange;
29use store_api::storage::{ColumnId, FileId, RegionId, TimeSeriesRowSelector};
30use tokio::sync::{mpsc, oneshot};
31
32use crate::cache::CacheStrategy;
33use crate::error::{ComputeArrowSnafu, Result, UnexpectedSnafu};
34use crate::read::BoxedRecordBatchStream;
35use crate::read::scan_region::StreamContext;
36use crate::read::scan_util::PartitionMetrics;
37use crate::region::options::MergeMode;
38use crate::sst::file::FileTimeRange;
39use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
40
/// Once the batches buffered by a `CacheBatchBuffer` exceed this many bytes (as reported by
/// `RecordBatch::get_array_memory_size`), they are handed to the background concat task.
const RANGE_CACHE_COMPACT_THRESHOLD_BYTES: usize = 2 * 1024 * 1024;
/// Once the total size of all batches pushed for one partition range exceeds this many
/// bytes, caching of that range is abandoned.
const RANGE_CACHE_SKIP_BYTES: usize = 512 * 1024 * 1024;
43
/// Normalized description of a scan request, used as the `scan` part of a
/// [`RangeScanCacheKey`].
///
/// The column projection and the non-time filters live behind an [`Arc`], so cloning a
/// fingerprint (done once per partition range) does not copy them.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub(crate) struct ScanRequestFingerprint {
    /// Projection and non-time filters, shared between clones.
    inner: Arc<SharedScanRequestFingerprint>,
    /// Time-range filters, kept apart from `inner.filters` so that they can be dropped from a
    /// key when the query time range covers the whole partition range. `None` when there are
    /// no such filters, or after [`ScanRequestFingerprint::without_time_filters`].
    time_filters: Option<Arc<Vec<String>>>,
    /// Per-series row selector of the request, if any.
    series_row_selector: Option<TimeSeriesRowSelector>,
    // The remaining fields are copied verbatim into the key. Any difference yields a
    // different (non-equal) fingerprint.
    append_mode: bool,
    filter_deleted: bool,
    merge_mode: MergeMode,
    partition_expr_version: u64,
}
64
/// Plain-data input for building a [`ScanRequestFingerprint`].
///
/// All fields are set directly by the caller and then converted by
/// [`ScanRequestFingerprintBuilder::build`].
#[derive(Debug)]
pub(crate) struct ScanRequestFingerprintBuilder {
    /// Ids of the columns read by the scan.
    pub(crate) read_column_ids: Vec<ColumnId>,
    /// Data types of the read columns.
    /// NOTE(review): presumably parallel to `read_column_ids` — confirm with the caller.
    pub(crate) read_column_types: Vec<Option<ConcreteDataType>>,
    /// String form of the non-time filters.
    pub(crate) filters: Vec<String>,
    /// String form of the time-range filters. An empty list becomes `None` in the fingerprint.
    pub(crate) time_filters: Vec<String>,
    /// Per-series row selector of the request, if any.
    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
    pub(crate) append_mode: bool,
    pub(crate) filter_deleted: bool,
    pub(crate) merge_mode: MergeMode,
    pub(crate) partition_expr_version: u64,
}
77
78impl ScanRequestFingerprintBuilder {
79 pub(crate) fn build(self) -> ScanRequestFingerprint {
80 let Self {
81 read_column_ids,
82 read_column_types,
83 filters,
84 time_filters,
85 series_row_selector,
86 append_mode,
87 filter_deleted,
88 merge_mode,
89 partition_expr_version,
90 } = self;
91
92 ScanRequestFingerprint {
93 inner: Arc::new(SharedScanRequestFingerprint {
94 read_column_ids,
95 read_column_types,
96 filters,
97 }),
98 time_filters: (!time_filters.is_empty()).then(|| Arc::new(time_filters)),
99 series_row_selector,
100 append_mode,
101 filter_deleted,
102 merge_mode,
103 partition_expr_version,
104 }
105 }
106}
107
/// The part of a [`ScanRequestFingerprint`] that is shared between clones through an [`Arc`].
#[derive(Debug, PartialEq, Eq, Hash)]
struct SharedScanRequestFingerprint {
    /// Ids of the columns read by the scan.
    read_column_ids: Vec<ColumnId>,
    /// Data types of the read columns.
    read_column_types: Vec<Option<ConcreteDataType>>,
    /// String form of the non-time filters.
    filters: Vec<String>,
}
119
120impl ScanRequestFingerprint {
121 #[cfg(test)]
122 pub(crate) fn read_column_ids(&self) -> &[ColumnId] {
123 &self.inner.read_column_ids
124 }
125
126 #[cfg(test)]
127 pub(crate) fn read_column_types(&self) -> &[Option<ConcreteDataType>] {
128 &self.inner.read_column_types
129 }
130
131 #[cfg(test)]
132 pub(crate) fn filters(&self) -> &[String] {
133 &self.inner.filters
134 }
135
136 #[cfg(test)]
137 pub(crate) fn time_filters(&self) -> &[String] {
138 self.time_filters
139 .as_deref()
140 .map(Vec::as_slice)
141 .unwrap_or(&[])
142 }
143
144 pub(crate) fn without_time_filters(&self) -> Self {
145 Self {
146 inner: Arc::clone(&self.inner),
147 time_filters: None,
148 series_row_selector: self.series_row_selector,
149 append_mode: self.append_mode,
150 filter_deleted: self.filter_deleted,
151 merge_mode: self.merge_mode,
152 partition_expr_version: self.partition_expr_version,
153 }
154 }
155
156 pub(crate) fn estimated_size(&self) -> usize {
157 mem::size_of::<SharedScanRequestFingerprint>()
158 + self.inner.read_column_ids.capacity() * mem::size_of::<ColumnId>()
159 + self.inner.read_column_types.capacity() * mem::size_of::<Option<ConcreteDataType>>()
160 + self.inner.filters.capacity() * mem::size_of::<String>()
161 + self
162 .inner
163 .filters
164 .iter()
165 .map(|filter| filter.capacity())
166 .sum::<usize>()
167 + self.time_filters.as_ref().map_or(0, |filters| {
168 mem::size_of::<Vec<String>>()
169 + filters.capacity() * mem::size_of::<String>()
170 + filters
171 .iter()
172 .map(|filter| filter.capacity())
173 .sum::<usize>()
174 })
175 }
176}
177
/// Key of an entry in the range result cache.
///
/// Identifies the output of one partition range by three things: its region, the exact SST
/// row groups it reads, and the fingerprint of the scan request.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub(crate) struct RangeScanCacheKey {
    /// Region the partition range belongs to.
    pub(crate) region_id: RegionId,
    /// `(file id, row group index)` pairs. When built by `build_range_cache_key`, they are
    /// sorted by file id bytes and then by row group index.
    pub(crate) row_groups: Vec<(FileId, i64)>,
    /// Fingerprint of the scan request. Its time filters may have been stripped.
    pub(crate) scan: ScanRequestFingerprint,
}
186
187impl RangeScanCacheKey {
188 pub(crate) fn estimated_size(&self) -> usize {
189 mem::size_of::<Self>()
190 + self.row_groups.capacity() * mem::size_of::<(FileId, i64)>()
191 + self.scan.estimated_size()
192 }
193}
194
/// A concatenated record batch, together with the row counts of the original batches it was
/// built from.
///
/// The row counts let replay restore the original batch boundaries.
#[derive(Debug)]
pub(crate) struct CachedBatchSlice {
    /// Concatenation of the original batches, in order.
    batch: RecordBatch,
    /// Row count of each original batch, in order.
    slice_lengths: Vec<usize>,
}
201
202impl CachedBatchSlice {
203 fn metadata_size(&self) -> usize {
204 self.slice_lengths.capacity() * mem::size_of::<usize>()
205 }
206}
207
/// Cached output of one partition range.
pub(crate) struct RangeScanCacheValue {
    /// Compacted batches in stream order.
    cached_batches: Vec<CachedBatchSlice>,
    /// Memory size of the cached batch data. When produced by the concat task, this is the
    /// sum of `get_array_memory_size` over the compacted batches.
    estimated_batches_size: usize,
}
213
214impl RangeScanCacheValue {
215 pub(crate) fn new(
216 cached_batches: Vec<CachedBatchSlice>,
217 estimated_batches_size: usize,
218 ) -> Self {
219 Self {
220 cached_batches,
221 estimated_batches_size,
222 }
223 }
224
225 pub(crate) fn estimated_size(&self) -> usize {
226 mem::size_of::<Self>()
227 + self.cached_batches.capacity() * mem::size_of::<CachedBatchSlice>()
228 + self
229 .cached_batches
230 .iter()
231 .map(CachedBatchSlice::metadata_size)
232 .sum::<usize>()
233 + self.estimated_batches_size
234 }
235}
236
/// Row groups read by one partition range, as collected by
/// [`collect_partition_range_row_groups`].
pub(crate) struct PartitionRangeRowGroups {
    /// `(file id, row group index)` pairs of the range's file sources, sorted by file id bytes
    /// and then by row group index.
    pub(crate) row_groups: Vec<(FileId, i64)>,
    /// `false` if the range has any source that is not a file range. Such sources are not
    /// listed in `row_groups`.
    pub(crate) only_file_sources: bool,
}
243
244pub(crate) fn collect_partition_range_row_groups(
246 stream_ctx: &StreamContext,
247 part_range: &PartitionRange,
248) -> PartitionRangeRowGroups {
249 let range_meta = &stream_ctx.ranges[part_range.identifier];
250 let mut row_groups = Vec::new();
251 let mut only_file_sources = true;
252
253 for index in &range_meta.row_group_indices {
254 if stream_ctx.is_file_range_index(*index) {
255 let file_id = stream_ctx.input.file_from_index(*index).file_id().file_id();
256 row_groups.push((file_id, index.row_group_index));
257 } else {
258 only_file_sources = false;
259 }
260 }
261
262 row_groups.sort_unstable_by(|a, b| a.0.as_bytes().cmp(b.0.as_bytes()).then(a.1.cmp(&b.1)));
263
264 PartitionRangeRowGroups {
265 row_groups,
266 only_file_sources,
267 }
268}
269
270pub(crate) fn build_range_cache_key(
272 stream_ctx: &StreamContext,
273 part_range: &PartitionRange,
274) -> Option<RangeScanCacheKey> {
275 if !stream_ctx.input.cache_strategy.has_range_result_cache() {
276 return None;
277 }
278
279 let fingerprint = stream_ctx.scan_fingerprint.as_ref()?;
280
281 let has_dyn_filters = stream_ctx
283 .input
284 .predicate_group()
285 .predicate_without_region()
286 .is_some_and(|p| !p.dyn_filters().is_empty());
287 if has_dyn_filters {
288 return None;
289 }
290
291 let rg = collect_partition_range_row_groups(stream_ctx, part_range);
292 if !rg.only_file_sources || rg.row_groups.is_empty() {
293 return None;
294 }
295
296 let range_meta = &stream_ctx.ranges[part_range.identifier];
297 let scan = if query_time_range_covers_partition_range(
298 stream_ctx.input.time_range.as_ref(),
299 range_meta.time_range,
300 ) {
301 fingerprint.without_time_filters()
302 } else {
303 fingerprint.clone()
304 };
305
306 Some(RangeScanCacheKey {
307 region_id: stream_ctx.input.region_metadata().region_id,
308 row_groups: rg.row_groups,
309 scan,
310 })
311}
312
313fn query_time_range_covers_partition_range(
314 query_time_range: Option<&TimestampRange>,
315 partition_time_range: FileTimeRange,
316) -> bool {
317 let Some(query_time_range) = query_time_range else {
318 return true;
319 };
320
321 let (part_start, part_end) = partition_time_range;
322 query_time_range.contains(&part_start) && query_time_range.contains(&part_end)
323}
324
325pub(crate) fn cached_flat_range_stream(value: Arc<RangeScanCacheValue>) -> BoxedRecordBatchStream {
327 Box::pin(try_stream! {
328 for cached_batch in &value.cached_batches {
329 let mut offset = 0;
330 for &len in &cached_batch.slice_lengths {
331 yield cached_batch.batch.slice(offset, len);
332 offset += len;
333 }
334 }
335 })
336}
337
/// Message sent from a [`CacheBatchBuffer`] to its background concat task.
enum CacheConcatCommand {
    /// Concatenate these batches into one cached slice.
    Compact(Vec<RecordBatch>),
    /// Concatenate the remaining batches, build the cache value, insert it under `key`, and
    /// stop the task.
    Finish {
        /// Batches still buffered when the input stream ended.
        pending: Vec<RecordBatch>,
        /// Key to insert the value under.
        key: RangeScanCacheKey,
        /// Strategy whose range result cache receives the value.
        cache_strategy: CacheStrategy,
        /// Metrics that record the size of the inserted entry.
        part_metrics: PartitionMetrics,
        /// Optional channel that receives the inserted value, or `None` if finalization
        /// failed.
        result_tx: Option<oneshot::Sender<Option<Arc<RangeScanCacheValue>>>>,
    },
}
348
/// Result accumulated so far by the background concat task.
#[derive(Default)]
struct CacheConcatState {
    /// Compacted slices, in arrival order.
    cached_batches: Vec<CachedBatchSlice>,
    /// Sum of `get_array_memory_size` over the batches in `cached_batches`.
    estimated_size: usize,
}
354
355impl CacheConcatState {
356 async fn compact(
357 &mut self,
358 batches: Vec<RecordBatch>,
359 limiter: &crate::cache::RangeResultMemoryLimiter,
360 ) -> Result<()> {
361 if batches.is_empty() {
362 return Ok(());
363 }
364
365 let input_size = batches
366 .iter()
367 .map(RecordBatch::get_array_memory_size)
368 .sum::<usize>();
369 let _permit = limiter.acquire(input_size).await.map_err(|_| {
370 UnexpectedSnafu {
371 reason: "range result memory limiter is unexpectedly closed",
372 }
373 .build()
374 })?;
375
376 let compacted = compact_record_batches(batches)?;
377 self.estimated_size += compacted.batch.get_array_memory_size();
378 self.cached_batches.push(compacted);
379 Ok(())
380 }
381
382 fn finish(self) -> RangeScanCacheValue {
383 RangeScanCacheValue::new(self.cached_batches, self.estimated_size)
384 }
385}
386
387fn compact_record_batches(batches: Vec<RecordBatch>) -> Result<CachedBatchSlice> {
388 debug_assert!(!batches.is_empty());
389
390 let slice_lengths = batches.iter().map(RecordBatch::num_rows).collect();
391 build_cached_batch_slice(batches, slice_lengths)
392}
393
394fn build_cached_batch_slice(
395 batches: Vec<RecordBatch>,
396 slice_lengths: Vec<usize>,
397) -> Result<CachedBatchSlice> {
398 let batch = if batches.len() == 1 {
399 batches.into_iter().next().unwrap()
400 } else {
401 let schema = batches[0].schema();
402 concat_batches(&schema, &batches).context(ComputeArrowSnafu)?
403 };
404
405 Ok(CachedBatchSlice {
406 batch,
407 slice_lengths,
408 })
409}
410
411async fn run_cache_concat_task(
412 mut rx: mpsc::UnboundedReceiver<CacheConcatCommand>,
413 limiter: Arc<crate::cache::RangeResultMemoryLimiter>,
414) {
415 let mut state = CacheConcatState::default();
416
417 while let Some(cmd) = rx.recv().await {
418 match cmd {
419 CacheConcatCommand::Compact(batches) => {
420 if let Err(err) = state.compact(batches, &limiter).await {
421 warn!(err; "Failed to compact range cache batches");
422 return;
423 }
424 }
425 CacheConcatCommand::Finish {
426 pending,
427 key,
428 cache_strategy,
429 part_metrics,
430 result_tx,
431 } => {
432 let result = state
433 .compact(pending, &limiter)
434 .await
435 .map(|()| state.finish());
436 if let Err(err) = &result {
437 warn!(err; "Failed to finalize range cache batches");
438 }
439
440 let value = result.ok().map(Arc::new);
441 if let Some(value) = &value {
442 part_metrics
443 .inc_range_cache_size(key.estimated_size() + value.estimated_size());
444 cache_strategy.put_range_result(key, value.clone());
445 }
446 if let Some(tx) = result_tx {
447 let _ = tx.send(value);
448 }
449 return;
450 }
451 }
452 }
453}
454
/// Collects the batches of a range stream and forwards them, in groups, to a background
/// concat task.
///
/// Caching is disabled whenever `sender` is `None`.
struct CacheBatchBuffer {
    /// Batches not yet sent to the concat task.
    buffered_batches: Vec<RecordBatch>,
    /// Total row count of `buffered_batches`.
    buffered_rows: usize,
    /// Sum of `get_array_memory_size` over `buffered_batches`.
    buffered_size: usize,
    /// Sum of `get_array_memory_size` over every batch pushed while caching was enabled.
    total_weight: usize,
    /// Channel to the concat task. `None` when caching is disabled or has been abandoned.
    sender: Option<mpsc::UnboundedSender<CacheConcatCommand>>,
}
462
463impl CacheBatchBuffer {
464 fn new(cache_strategy: &CacheStrategy) -> Self {
465 let sender = cache_strategy.range_result_memory_limiter().map(|limiter| {
466 let (tx, rx) = mpsc::unbounded_channel();
467 common_runtime::spawn_global(run_cache_concat_task(rx, limiter.clone()));
468 tx
469 });
470
471 Self {
472 buffered_batches: Vec::new(),
473 buffered_rows: 0,
474 buffered_size: 0,
475 total_weight: 0,
476 sender,
477 }
478 }
479
480 fn push(&mut self, batch: RecordBatch) -> Result<()> {
481 if self.sender.is_none() {
482 return Ok(());
483 }
484
485 let batch_size = batch.get_array_memory_size();
486 self.total_weight += batch_size;
487 if self.total_weight > RANGE_CACHE_SKIP_BYTES {
488 self.buffered_batches.clear();
489 self.buffered_rows = 0;
490 self.buffered_size = 0;
491 self.sender = None;
492 return Ok(());
493 }
494
495 self.buffered_rows += batch.num_rows();
496 self.buffered_size += batch_size;
497 self.buffered_batches.push(batch);
498
499 if self.buffered_rows > DEFAULT_READ_BATCH_SIZE
500 || self.buffered_size > RANGE_CACHE_COMPACT_THRESHOLD_BYTES
501 {
502 self.notify_compact();
503 }
504
505 Ok(())
506 }
507
508 fn notify_compact(&mut self) {
509 if self.buffered_batches.is_empty() || self.sender.is_none() {
510 return;
511 }
512
513 let batches = mem::take(&mut self.buffered_batches);
514 self.buffered_rows = 0;
515 self.buffered_size = 0;
516
517 let Some(sender) = &self.sender else {
518 return;
519 };
520 if sender.send(CacheConcatCommand::Compact(batches)).is_err() {
521 self.sender = None;
522 }
523 }
524
525 fn finish(
526 mut self,
527 key: RangeScanCacheKey,
528 cache_strategy: CacheStrategy,
529 part_metrics: PartitionMetrics,
530 result_tx: Option<oneshot::Sender<Option<Arc<RangeScanCacheValue>>>>,
531 ) {
532 let Some(sender) = self.sender.take() else {
533 return;
534 };
535
536 if sender
537 .send(CacheConcatCommand::Finish {
538 pending: mem::take(&mut self.buffered_batches),
539 key,
540 cache_strategy,
541 part_metrics,
542 result_tx,
543 })
544 .is_err()
545 {
546 self.sender = None;
547 }
548 }
549}
550
551pub(crate) fn cache_flat_range_stream(
553 mut stream: BoxedRecordBatchStream,
554 cache_strategy: CacheStrategy,
555 key: RangeScanCacheKey,
556 part_metrics: PartitionMetrics,
557) -> BoxedRecordBatchStream {
558 Box::pin(try_stream! {
559 let mut buffer = CacheBatchBuffer::new(&cache_strategy);
560 while let Some(batch) = stream.try_next().await? {
561 buffer.push(batch.clone())?;
562 yield batch;
563 }
564
565 buffer.finish(key, cache_strategy, part_metrics, None);
566 })
567}
568
/// Benchmark helper that wraps `stream` with [`cache_flat_range_stream`].
///
/// It uses a fresh cache manager whose range result cache size is `cache_size_bytes`. The
/// key is a placeholder for `region_id`: no row groups and an empty fingerprint.
#[cfg(feature = "test")]
pub fn bench_cache_flat_range_stream(
    stream: BoxedRecordBatchStream,
    cache_size_bytes: u64,
    region_id: RegionId,
) -> BoxedRecordBatchStream {
    use std::time::Instant;

    use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;

    let cache_manager = crate::cache::CacheManager::builder()
        .range_result_cache_size(cache_size_bytes)
        .build();
    let cache_strategy = CacheStrategy::EnableAll(Arc::new(cache_manager));

    // `MergeMode` comes from the module-level import.
    let key = RangeScanCacheKey {
        region_id,
        row_groups: Vec::new(),
        scan: ScanRequestFingerprintBuilder {
            read_column_ids: Vec::new(),
            read_column_types: Vec::new(),
            filters: Vec::new(),
            time_filters: Vec::new(),
            series_row_selector: None,
            append_mode: false,
            filter_deleted: false,
            merge_mode: MergeMode::LastRow,
            partition_expr_version: 0,
        }
        .build(),
    };

    let metrics_set = ExecutionPlanMetricsSet::new();
    let part_metrics =
        PartitionMetrics::new(region_id, 0, "bench", Instant::now(), false, &metrics_set);

    cache_flat_range_stream(stream, cache_strategy, key, part_metrics)
}
617
#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Instant;

    use common_time::Timestamp;
    use common_time::range::TimestampRange;
    use common_time::timestamp::TimeUnit;
    use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
    use datafusion_common::ScalarValue;
    use datafusion_expr::{Expr, col, lit};
    use smallvec::smallvec;
    use store_api::storage::{FileId, RegionId};

    use super::*;
    use crate::cache::CacheManager;
    use crate::read::projection::ProjectionMapper;
    use crate::read::range::{RangeMeta, RowGroupIndex, SourceIndex};
    use crate::read::scan_region::{PredicateGroup, ScanInput};
    use crate::test_util::memtable_util::metadata_with_primary_key;
    use crate::test_util::scheduler_util::SchedulerEnv;
    use crate::test_util::sst_util::sst_file_handle_with_file_id;

    /// Cache strategy with a range result cache sized to 1024 bytes.
    fn test_cache_strategy() -> CacheStrategy {
        CacheStrategy::EnableAll(Arc::new(
            CacheManager::builder()
                .range_result_cache_size(1024)
                .build(),
        ))
    }

    /// Returns a placeholder key (empty row groups and fingerprint) and fresh partition
    /// metrics.
    ///
    /// Also asserts that `strategy` holds no entry for that key yet.
    fn test_cache_context(strategy: &CacheStrategy) -> (RangeScanCacheKey, PartitionMetrics) {
        let region_id = RegionId::new(1, 1);
        let key = RangeScanCacheKey {
            region_id,
            row_groups: vec![],
            scan: ScanRequestFingerprintBuilder {
                read_column_ids: vec![],
                read_column_types: vec![],
                filters: vec![],
                time_filters: vec![],
                series_row_selector: None,
                append_mode: false,
                filter_deleted: false,
                merge_mode: MergeMode::LastRow,
                partition_expr_version: 0,
            }
            .build(),
        };

        let metrics_set = ExecutionPlanMetricsSet::new();
        let part_metrics =
            PartitionMetrics::new(region_id, 0, "test", Instant::now(), false, &metrics_set);

        assert!(strategy.get_range_result(&key).is_none());
        (key, part_metrics)
    }

    /// Finishes `buffer` and waits for the background task's result.
    ///
    /// Returns `None` when no value was produced, or when the result channel closed before
    /// a value was sent.
    async fn finish_cache_batch_buffer(
        buffer: CacheBatchBuffer,
        key: RangeScanCacheKey,
        cache_strategy: CacheStrategy,
        part_metrics: PartitionMetrics,
    ) -> Option<Arc<RangeScanCacheValue>> {
        let (tx, rx) = oneshot::channel();
        buffer.finish(key, cache_strategy, part_metrics, Some(tx));
        rx.await.context(crate::error::RecvSnafu).ok().flatten()
    }

    /// Builds a `StreamContext` with a single partition range.
    ///
    /// The range is backed by one SST file (one row group) spanning `partition_time_range`.
    /// It is scanned with `filters` and limited to `query_time_range`.
    async fn new_stream_context(
        filters: Vec<Expr>,
        query_time_range: Option<TimestampRange>,
        partition_time_range: FileTimeRange,
    ) -> (StreamContext, PartitionRange) {
        let env = SchedulerEnv::new().await;
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let mapper = ProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap();
        let predicate = PredicateGroup::new(metadata.as_ref(), &filters).unwrap();
        let file_id = FileId::random();
        let file = sst_file_handle_with_file_id(
            file_id,
            partition_time_range.0.value(),
            partition_time_range.1.value(),
        );
        let input = ScanInput::new(env.access_layer.clone(), mapper)
            .with_predicate(predicate)
            .with_time_range(query_time_range)
            .with_files(vec![file])
            .with_cache(test_cache_strategy());
        let range_meta = RangeMeta {
            time_range: partition_time_range,
            indices: smallvec![SourceIndex {
                index: 0,
                num_row_groups: 1,
            }],
            row_group_indices: smallvec![RowGroupIndex {
                index: 0,
                row_group_index: 0,
            }],
            num_rows: 10,
        };
        let partition_range = range_meta.new_partition_range(0);
        let scan_fingerprint = crate::read::scan_region::build_scan_fingerprint(&input);
        let stream_ctx = StreamContext {
            input,
            ranges: vec![range_meta],
            scan_fingerprint,
            query_start: Instant::now(),
        };

        (stream_ctx, partition_range)
    }

    /// Millisecond timestamp literal without a time zone.
    fn ts_lit(val: i64) -> Expr {
        lit(ScalarValue::TimestampMillisecond(Some(val), None))
    }

    #[tokio::test]
    async fn strips_time_only_filters_when_query_covers_partition_range() {
        // The query range [1000, 2002) contains both ends of the partition range [1000, 2000].
        let (stream_ctx, part_range) = new_stream_context(
            vec![
                col("ts").gt_eq(ts_lit(1000)),
                col("ts").lt(ts_lit(2001)),
                col("ts").is_not_null(),
                col("k0").eq(lit("foo")),
            ],
            TimestampRange::with_unit(1000, 2002, TimeUnit::Millisecond),
            (
                Timestamp::new_millisecond(1000),
                Timestamp::new_millisecond(2000),
            ),
        )
        .await;

        let key = build_range_cache_key(&stream_ctx, &part_range).unwrap();

        assert!(key.scan.time_filters().is_empty());
        // `ts IS NOT NULL` does not bound the time range, so it stays in the regular filters.
        let mut expected_filters = [
            // Sorted below; presumably the fingerprint stores filters in sorted order.
            col("k0").eq(lit("foo")).to_string(),
            col("ts").is_not_null().to_string(),
        ];
        expected_filters.sort_unstable();
        assert_eq!(key.scan.filters(), expected_filters.as_slice());
    }

    #[tokio::test]
    async fn preserves_time_filters_when_query_does_not_cover_partition_range() {
        // The query range [1000, 1500) does not contain the partition end 2000.
        let (stream_ctx, part_range) = new_stream_context(
            vec![col("ts").gt_eq(ts_lit(1000)), col("k0").eq(lit("foo"))],
            TimestampRange::with_unit(1000, 1500, TimeUnit::Millisecond),
            (
                Timestamp::new_millisecond(1000),
                Timestamp::new_millisecond(2000),
            ),
        )
        .await;

        let key = build_range_cache_key(&stream_ctx, &part_range).unwrap();

        assert_eq!(
            // The time filter must remain part of the key.
            key.scan.time_filters(),
            [col("ts").gt_eq(ts_lit(1000)).to_string()].as_slice()
        );
        assert_eq!(
            key.scan.filters(),
            [col("k0").eq(lit("foo")).to_string()].as_slice()
        );
    }

    #[tokio::test]
    async fn strips_time_only_filters_when_query_has_no_time_range_limit() {
        // A missing query time range is treated as covering every partition range.
        let (stream_ctx, part_range) = new_stream_context(
            vec![
                col("ts").gt_eq(ts_lit(1000)),
                col("ts").is_not_null(),
                col("k0").eq(lit("foo")),
            ],
            None,
            (
                Timestamp::new_millisecond(1000),
                Timestamp::new_millisecond(2000),
            ),
        )
        .await;

        let key = build_range_cache_key(&stream_ctx, &part_range).unwrap();

        assert!(key.scan.time_filters().is_empty());
        // Non-time filters are kept unchanged.
        let mut expected_filters = [
            // Sorted below to match the fingerprint's filter order.
            col("k0").eq(lit("foo")).to_string(),
            col("ts").is_not_null().to_string(),
        ];
        expected_filters.sort_unstable();
        assert_eq!(key.scan.filters(), expected_filters.as_slice());
    }

    #[test]
    fn normalizes_and_clears_time_filters() {
        // An empty time filter list is stored as `None` and reported as empty.
        let normalized = ScanRequestFingerprintBuilder {
            read_column_ids: vec![1, 2],
            read_column_types: vec![None, None],
            filters: vec!["k0 = 'foo'".to_string()],
            time_filters: vec![],
            series_row_selector: None,
            append_mode: false,
            filter_deleted: true,
            merge_mode: MergeMode::LastRow,
            partition_expr_version: 0,
        }
        .build();

        assert!(normalized.time_filters().is_empty());

        let fingerprint = ScanRequestFingerprintBuilder {
            read_column_ids: vec![1, 2],
            read_column_types: vec![None, None],
            filters: vec!["k0 = 'foo'".to_string()],
            time_filters: vec!["ts >= 1000".to_string()],
            series_row_selector: Some(TimeSeriesRowSelector::LastRow),
            append_mode: false,
            filter_deleted: true,
            merge_mode: MergeMode::LastRow,
            partition_expr_version: 7,
        }
        .build();

        // `without_time_filters` must drop only the time filters and keep every other field.
        let reset = fingerprint.without_time_filters();

        assert_eq!(reset.read_column_ids(), fingerprint.read_column_ids());
        assert_eq!(reset.read_column_types(), fingerprint.read_column_types());
        assert_eq!(reset.filters(), fingerprint.filters());
        assert!(reset.time_filters().is_empty());
        assert_eq!(reset.series_row_selector, fingerprint.series_row_selector);
        assert_eq!(reset.append_mode, fingerprint.append_mode);
        assert_eq!(reset.filter_deleted, fingerprint.filter_deleted);
        assert_eq!(reset.merge_mode, fingerprint.merge_mode);
        assert_eq!(
            reset.partition_expr_version,
            fingerprint.partition_expr_version
        );
    }

    /// Schema with a single non-nullable `Int64` column named `value`.
    fn test_schema() -> Arc<datatypes::arrow::datatypes::Schema> {
        use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema};

        Arc::new(Schema::new(vec![Field::new(
            "value",
            ArrowDataType::Int64,
            false,
        )]))
    }

    /// Batch with `test_schema()` holding `values`.
    fn make_batch(values: &[i64]) -> RecordBatch {
        use datatypes::arrow::array::Int64Array;

        RecordBatch::try_new(
            test_schema(),
            vec![Arc::new(Int64Array::from(values.to_vec()))],
        )
        .unwrap()
    }

    /// Binary batch with `rows` rows, each a `bytes_per_row`-byte payload.
    ///
    /// Used to exceed size thresholds with few rows.
    fn make_large_binary_batch(rows: usize, bytes_per_row: usize) -> RecordBatch {
        use datatypes::arrow::array::BinaryArray;
        use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema};

        let schema = Arc::new(Schema::new(vec![Field::new(
            "value",
            ArrowDataType::Binary,
            false,
        )]));
        let payload = vec![b'x'; bytes_per_row];
        let values = (0..rows).map(|_| payload.as_slice()).collect::<Vec<_>>();

        RecordBatch::try_new(schema, vec![Arc::new(BinaryArray::from_vec(values))]).unwrap()
    }

    #[test]
    fn compact_record_batches_keeps_original_boundaries() {
        let batches = vec![make_batch(&[1, 2]), make_batch(&[3]), make_batch(&[4, 5])];

        let compacted = compact_record_batches(batches).unwrap();

        // One concatenated batch; the per-input row counts are preserved.
        assert_eq!(compacted.batch.num_rows(), 5);
        assert_eq!(compacted.slice_lengths, vec![2, 1, 2]);
    }

    #[tokio::test]
    async fn cached_flat_range_stream_replays_original_batches() {
        let value = Arc::new(RangeScanCacheValue::new(
            vec![CachedBatchSlice {
                batch: make_batch(&[1, 2, 3]),
                slice_lengths: vec![2, 1],
            }],
            make_batch(&[1, 2, 3]).get_array_memory_size(),
        ));

        let replayed = cached_flat_range_stream(value)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();

        // The 3-row batch is split back into slices of 2 and 1 rows.
        assert_eq!(replayed.len(), 2);
        assert_eq!(replayed[0].num_rows(), 2);
        assert_eq!(replayed[1].num_rows(), 1);
    }

    #[tokio::test]
    async fn cache_batch_buffer_finishes_pending_batches() {
        let strategy = test_cache_strategy();
        let batch = make_batch(&[1, 2, 3]);
        let expected_size = batch.get_array_memory_size();
        let (key, part_metrics) = test_cache_context(&strategy);

        // A single small batch stays buffered until `finish`.
        let mut buffer = CacheBatchBuffer::new(&strategy);
        buffer.push(batch).unwrap();

        let value = finish_cache_batch_buffer(buffer, key.clone(), strategy.clone(), part_metrics)
            .await
            .unwrap();
        assert_eq!(value.cached_batches.len(), 1);
        assert_eq!(value.cached_batches[0].slice_lengths, vec![3]);
        assert_eq!(value.estimated_batches_size, expected_size);
        // The value sent back is the same allocation that was inserted into the cache.
        assert!(Arc::ptr_eq(
            &value,
            &strategy.get_range_result(&key).unwrap()
        ));
    }

    #[tokio::test]
    async fn cache_batch_buffer_compacts_when_rows_exceed_default_batch_size() {
        let strategy = test_cache_strategy();
        // Two of these together exceed DEFAULT_READ_BATCH_SIZE rows.
        let batch = make_batch(&vec![1; DEFAULT_READ_BATCH_SIZE / 2 + 1]);
        let (key, part_metrics) = test_cache_context(&strategy);

        let mut buffer = CacheBatchBuffer::new(&strategy);
        buffer.push(batch.clone()).unwrap();
        buffer.push(batch).unwrap();

        // The second push triggered a `Compact` and emptied the buffer.
        assert_eq!(buffer.buffered_rows, 0);
        assert!(buffer.buffered_batches.is_empty());

        let value = finish_cache_batch_buffer(buffer, key, strategy, part_metrics)
            .await
            .unwrap();
        assert_eq!(value.cached_batches.len(), 1);
        assert_eq!(
            value.cached_batches[0].slice_lengths,
            vec![
                DEFAULT_READ_BATCH_SIZE / 2 + 1,
                DEFAULT_READ_BATCH_SIZE / 2 + 1
            ]
        );
    }

    #[tokio::test]
    async fn cache_batch_buffer_compacts_when_buffered_size_exceeds_threshold() {
        let strategy = test_cache_strategy();
        // Expected to exceed RANGE_CACHE_COMPACT_THRESHOLD_BYTES on its own.
        let large_batch = make_large_binary_batch(DEFAULT_READ_BATCH_SIZE, 4096);
        let (key, part_metrics) = test_cache_context(&strategy);

        let mut buffer = CacheBatchBuffer::new(&strategy);
        buffer.push(large_batch.clone()).unwrap();

        // A single push was enough to trigger a `Compact`.
        assert_eq!(buffer.buffered_rows, 0);
        assert!(buffer.buffered_batches.is_empty());

        let value = finish_cache_batch_buffer(buffer, key, strategy, part_metrics)
            .await
            .unwrap();
        assert_eq!(value.cached_batches.len(), 1);
        assert_eq!(
            value.cached_batches[0].slice_lengths,
            vec![large_batch.num_rows()]
        );
    }

    #[tokio::test]
    async fn cache_batch_buffer_uses_compacted_size_for_weight() {
        let strategy = test_cache_strategy();
        let batch1 = make_batch(&[1, 2]);
        let batch2 = make_batch(&[3, 4]);
        let (key, part_metrics) = test_cache_context(&strategy);
        // The recorded size must be that of the concatenated batch, not the sum of the inputs.
        let expected = concat_batches(&test_schema(), &[batch1.clone(), batch2.clone()])
            .unwrap()
            .get_array_memory_size();

        let mut buffer = CacheBatchBuffer::new(&strategy);
        buffer.push(batch1).unwrap();
        buffer.push(batch2).unwrap();

        let value = finish_cache_batch_buffer(buffer, key, strategy, part_metrics)
            .await
            .unwrap();
        assert_eq!(value.estimated_batches_size, expected);
    }

    #[tokio::test]
    async fn cache_batch_buffer_skips_cache_when_weight_exceeds_limit() {
        let strategy = test_cache_strategy();
        let (key, part_metrics) = test_cache_context(&strategy);
        let mut buffer = CacheBatchBuffer::new(&strategy);
        // Pre-load the weight so that the next push crosses RANGE_CACHE_SKIP_BYTES.
        buffer.total_weight = RANGE_CACHE_SKIP_BYTES;

        buffer.push(make_batch(&[1])).unwrap();

        // Caching is abandoned: no sender remains and `finish` yields no value.
        assert!(buffer.sender.is_none());
        assert!(
            finish_cache_batch_buffer(buffer, key, strategy, part_metrics)
                .await
                .is_none()
        );
    }
}