mito2/read/range_cache.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Utilities for the partition range scan result cache.

use std::mem;
use std::sync::Arc;

use async_stream::try_stream;
use common_telemetry::warn;
use common_time::range::TimestampRange;
use datatypes::arrow::compute::concat_batches;
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::prelude::ConcreteDataType;
use futures::TryStreamExt;
use snafu::ResultExt;
use store_api::region_engine::PartitionRange;
use store_api::storage::{FileId, RegionId, TimeSeriesRowSelector};
use tokio::sync::{mpsc, oneshot};

use crate::cache::CacheStrategy;
use crate::error::{ComputeArrowSnafu, Result};
use crate::read::BoxedRecordBatchStream;
use crate::read::read_columns::ReadColumns;
use crate::read::scan_region::StreamContext;
use crate::read::scan_util::PartitionMetrics;
use crate::region::options::MergeMode;
use crate::sst::file::FileTimeRange;
use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;

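/// Byte threshold above which buffered batches are handed to the background task for
/// compaction into a single `RecordBatch` (see [`CacheBatchBuffer::push`]).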
const RANGE_CACHE_COMPACT_THRESHOLD_BYTES: usize = 8 * 1024 * 1024;

/// Fingerprint of the scan request fields that affect partition range cache reuse.
///
/// It records a normalized view of the projected columns and filters, plus
/// scan options that can change the returned rows. Schema-dependent metadata
/// and the partition expression version are included so cached results are not
/// reused across incompatible schema or partitioning changes.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub(crate) struct ScanRequestFingerprint {
    /// Projection and filters without the time index and partition exprs.
    inner: Arc<SharedScanRequestFingerprint>,
    /// Filters with the time index column.
    time_filters: Option<Arc<Vec<String>>>,
    series_row_selector: Option<TimeSeriesRowSelector>,
    append_mode: bool,
    filter_deleted: bool,
    merge_mode: MergeMode,
    /// Partition expr version, kept so the fingerprint is not reused after the partition expr changes.
    /// We store the version instead of the whole partition expr or its filters.
    partition_expr_version: u64,
}

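/// Builder that assembles a [`ScanRequestFingerprint`] from the normalized scan request fields.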
#[derive(Debug)]
pub(crate) struct ScanRequestFingerprintBuilder {
    pub(crate) read_columns: ReadColumns,
    pub(crate) read_column_types: Vec<Option<ConcreteDataType>>,
    pub(crate) filters: Vec<String>,
    pub(crate) time_filters: Vec<String>,
    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
    pub(crate) append_mode: bool,
    pub(crate) filter_deleted: bool,
    pub(crate) merge_mode: MergeMode,
    pub(crate) partition_expr_version: u64,
}

impl ScanRequestFingerprintBuilder {
    pub(crate) fn build(self) -> ScanRequestFingerprint {
        let Self {
            read_columns,
            read_column_types,
            filters,
            time_filters,
            series_row_selector,
            append_mode,
            filter_deleted,
            merge_mode,
            partition_expr_version,
        } = self;

        ScanRequestFingerprint {
            inner: Arc::new(SharedScanRequestFingerprint {
                read_columns,
                read_column_types,
                filters,
            }),
            time_filters: (!time_filters.is_empty()).then(|| Arc::new(time_filters)),
            series_row_selector,
            append_mode,
            filter_deleted,
            merge_mode,
            partition_expr_version,
        }
    }
}

/// Shared, non-cloneable part of the fingerprint, stored behind an `Arc` so fingerprint clones stay cheap.
#[derive(Debug, PartialEq, Eq, Hash)]
struct SharedScanRequestFingerprint {
    /// Logical columns of the projection.
    read_columns: ReadColumns,
    /// Column types of the projection.
    /// We keep this to ensure we won't reuse the fingerprint after a schema change.
    read_column_types: Vec<Option<ConcreteDataType>>,
    /// Filters without the time index column and region partition exprs.
    filters: Vec<String>,
}

impl ScanRequestFingerprint {
    #[cfg(test)]
    pub(crate) fn read_columns(&self) -> &ReadColumns {
        &self.inner.read_columns
    }

    #[cfg(test)]
    pub(crate) fn read_column_types(&self) -> &[Option<ConcreteDataType>] {
        &self.inner.read_column_types
    }

    #[cfg(test)]
    pub(crate) fn filters(&self) -> &[String] {
        &self.inner.filters
    }

    #[cfg(test)]
    pub(crate) fn time_filters(&self) -> &[String] {
        self.time_filters
            .as_deref()
            .map(Vec::as_slice)
            .unwrap_or(&[])
    }

    pub(crate) fn without_time_filters(&self) -> Self {
        Self {
            inner: Arc::clone(&self.inner),
            time_filters: None,
            series_row_selector: self.series_row_selector,
            append_mode: self.append_mode,
            filter_deleted: self.filter_deleted,
            merge_mode: self.merge_mode,
            partition_expr_version: self.partition_expr_version,
        }
    }

    pub(crate) fn estimated_size(&self) -> usize {
        mem::size_of::<SharedScanRequestFingerprint>()
            + self.inner.read_columns.estimated_size()
            + self.inner.read_column_types.capacity() * mem::size_of::<Option<ConcreteDataType>>()
            + self.inner.filters.capacity() * mem::size_of::<String>()
            + self
                .inner
                .filters
                .iter()
                .map(|filter| filter.capacity())
                .sum::<usize>()
            + self.time_filters.as_ref().map_or(0, |filters| {
                mem::size_of::<Vec<String>>()
                    + filters.capacity() * mem::size_of::<String>()
                    + filters
                        .iter()
                        .map(|filter| filter.capacity())
                        .sum::<usize>()
            })
    }
}

/// Cache key for range scan outputs.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub(crate) struct RangeScanCacheKey {
    pub(crate) region_id: RegionId,
    /// Sorted (file_id, row_group_index) pairs that uniquely identify the data this range covers.
    pub(crate) row_groups: Vec<(FileId, i64)>,
    pub(crate) scan: ScanRequestFingerprint,
}

impl RangeScanCacheKey {
    pub(crate) fn estimated_size(&self) -> usize {
        mem::size_of::<Self>()
            + self.row_groups.capacity() * mem::size_of::<(FileId, i64)>()
            + self.scan.estimated_size()
    }
}

/// One compacted batch in a cached range scan result, plus the row counts of the original batches it was built from.
#[derive(Debug)]
pub(crate) struct CachedBatchSlice {
    batch: RecordBatch,
    slice_lengths: Vec<usize>,
}

impl CachedBatchSlice {
    fn metadata_size(&self) -> usize {
        self.slice_lengths.capacity() * mem::size_of::<usize>()
    }
}

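/// Cached output of one range scan: the compacted batches and their precomputed total size.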
pub(crate) struct RangeScanCacheValue {
    cached_batches: Vec<CachedBatchSlice>,
    /// Precomputed size of all compacted batches.
    estimated_batches_size: usize,
}

impl RangeScanCacheValue {
    pub(crate) fn new(
        cached_batches: Vec<CachedBatchSlice>,
        estimated_batches_size: usize,
    ) -> Self {
        Self {
            cached_batches,
            estimated_batches_size,
        }
    }

    pub(crate) fn estimated_size(&self) -> usize {
        mem::size_of::<Self>()
            + self.cached_batches.capacity() * mem::size_of::<CachedBatchSlice>()
            + self
                .cached_batches
                .iter()
                .map(CachedBatchSlice::metadata_size)
                .sum::<usize>()
            + self.estimated_batches_size
    }
}

/// Row groups of a partition range, plus whether all of its sources come from files.
pub(crate) struct PartitionRangeRowGroups {
    /// Sorted (file_id, row_group_index) pairs.
    pub(crate) row_groups: Vec<(FileId, i64)>,
    pub(crate) only_file_sources: bool,
}

/// Collects (file_id, row_group_index) pairs from a partition range's row group indices.
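///
/// The pairs are sorted by file id bytes and then by row group index, so equal sets of
/// row groups always produce the same cache key.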
pub(crate) fn collect_partition_range_row_groups(
    stream_ctx: &StreamContext,
    part_range: &PartitionRange,
) -> PartitionRangeRowGroups {
    let range_meta = &stream_ctx.ranges[part_range.identifier];
    let mut row_groups = Vec::new();
    let mut only_file_sources = true;

    for index in &range_meta.row_group_indices {
        if stream_ctx.is_file_range_index(*index) {
            let file_id = stream_ctx.input.file_from_index(*index).file_id().file_id();
            row_groups.push((file_id, index.row_group_index));
        } else {
            only_file_sources = false;
        }
    }

    row_groups.sort_unstable_by(|a, b| a.0.as_bytes().cmp(b.0.as_bytes()).then(a.1.cmp(&b.1)));

    PartitionRangeRowGroups {
        row_groups,
        only_file_sources,
    }
}

/// Builds a cache key for the given partition range if it is eligible for caching.
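///
/// Returns `None` when the range result cache is disabled, the scan fingerprint is
/// missing, the scan carries dynamic filters, or the range is empty or not backed
/// entirely by file row groups.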
pub(crate) fn build_range_cache_key(
    stream_ctx: &StreamContext,
    part_range: &PartitionRange,
) -> Option<RangeScanCacheKey> {
    if !stream_ctx.input.cache_strategy.has_range_result_cache() {
        return None;
    }

    let fingerprint = stream_ctx.scan_fingerprint.as_ref()?;

    // Dyn filters can change at runtime, so we can't cache when they're present.
    let has_dyn_filters = stream_ctx
        .input
        .predicate_group()
        .predicate_without_region()
        .is_some_and(|p| !p.dyn_filters().is_empty());
    if has_dyn_filters {
        return None;
    }

    let rg = collect_partition_range_row_groups(stream_ctx, part_range);
    if !rg.only_file_sources || rg.row_groups.is_empty() {
        return None;
    }

    let range_meta = &stream_ctx.ranges[part_range.identifier];
    let scan = if query_time_range_covers_partition_range(
        stream_ctx.input.time_range.as_ref(),
        range_meta.time_range,
    ) {
        fingerprint.without_time_filters()
    } else {
        fingerprint.clone()
    };

    Some(RangeScanCacheKey {
        region_id: stream_ctx.input.region_metadata().region_id,
        row_groups: rg.row_groups,
        scan,
    })
}

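/// Returns true if the query has no time range, or if its time range contains both
/// endpoints of the partition's time range.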
fn query_time_range_covers_partition_range(
    query_time_range: Option<&TimestampRange>,
    partition_time_range: FileTimeRange,
) -> bool {
    let Some(query_time_range) = query_time_range else {
        return true;
    };

    let (part_start, part_end) = partition_time_range;
    query_time_range.contains(&part_start) && query_time_range.contains(&part_end)
}

/// Returns a stream that replays cached record batches.
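///
/// Each compacted batch is sliced back into the original batch boundaries recorded in
/// `slice_lengths`, so callers observe the same batch sizes as an uncached scan.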
pub(crate) fn cached_flat_range_stream(value: Arc<RangeScanCacheValue>) -> BoxedRecordBatchStream {
    Box::pin(try_stream! {
        for cached_batch in &value.cached_batches {
            let mut offset = 0;
            for &len in &cached_batch.slice_lengths {
                yield cached_batch.batch.slice(offset, len);
                offset += len;
            }
        }
    })
}

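/// Commands handled by the background concat task (see [`run_cache_concat_task`]).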
enum CacheConcatCommand {
    Compact(Vec<RecordBatch>),
    Finish {
        pending: Vec<RecordBatch>,
        key: RangeScanCacheKey,
        cache_strategy: CacheStrategy,
        part_metrics: PartitionMetrics,
        result_tx: Option<oneshot::Sender<Result<Arc<RangeScanCacheValue>>>>,
    },
}

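/// Compacted batches accumulated by the background task, along with their total size.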
#[derive(Default)]
struct CacheConcatState {
    cached_batches: Vec<CachedBatchSlice>,
    estimated_size: usize,
}

impl CacheConcatState {
    async fn compact(
        &mut self,
        batches: Vec<RecordBatch>,
        limiter: &crate::cache::RangeResultMemoryLimiter,
    ) -> Result<()> {
        if batches.is_empty() {
            return Ok(());
        }

        let input_size = batches
            .iter()
            .map(RecordBatch::get_array_memory_size)
            .sum::<usize>();
        let _permit = limiter.acquire(input_size).await?;

        let compacted = compact_record_batches(batches)?;
        self.estimated_size += compacted.batch.get_array_memory_size();
        self.cached_batches.push(compacted);
        Ok(())
    }

    fn finish(self) -> RangeScanCacheValue {
        RangeScanCacheValue::new(self.cached_batches, self.estimated_size)
    }
}

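/// Concatenates `batches` into one `RecordBatch`, recording each input batch's row count
/// so the original boundaries can be restored on replay.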
fn compact_record_batches(batches: Vec<RecordBatch>) -> Result<CachedBatchSlice> {
    debug_assert!(!batches.is_empty());

    let slice_lengths = batches.iter().map(RecordBatch::num_rows).collect();
    build_cached_batch_slice(batches, slice_lengths)
}

fn build_cached_batch_slice(
    batches: Vec<RecordBatch>,
    slice_lengths: Vec<usize>,
) -> Result<CachedBatchSlice> {
    let batch = if batches.len() == 1 {
        batches.into_iter().next().unwrap()
    } else {
        let schema = batches[0].schema();
        concat_batches(&schema, &batches).context(ComputeArrowSnafu)?
    };

    Ok(CachedBatchSlice {
        batch,
        slice_lengths,
    })
}

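/// Background task that compacts buffered batches off the read path.
///
/// It exits early on a compaction error or once the compacted size exceeds
/// `skip_threshold_bytes`; on `Finish` it builds the final [`RangeScanCacheValue`],
/// inserts it into the cache, and reports the result through `result_tx` if present.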
async fn run_cache_concat_task(
    mut rx: mpsc::UnboundedReceiver<CacheConcatCommand>,
    limiter: Arc<crate::cache::RangeResultMemoryLimiter>,
    skip_threshold_bytes: usize,
) {
    let mut state = CacheConcatState::default();

    while let Some(cmd) = rx.recv().await {
        match cmd {
            CacheConcatCommand::Compact(batches) => {
                if let Err(err) = state.compact(batches, &limiter).await {
                    warn!(err; "Failed to compact range cache batches");
                    return;
                }
                // Returning drops the receiver, which closes the channel and stops
                // further work once the compacted size exceeds the cache budget.
                if state.estimated_size > skip_threshold_bytes {
                    return;
                }
            }
            CacheConcatCommand::Finish {
                pending,
                key,
                cache_strategy,
                part_metrics,
                result_tx,
            } => {
                let compact_result = state
                    .compact(pending, &limiter)
                    .await
                    .map(|()| state.finish());
                let result = match compact_result {
                    Ok(v) => {
                        let value = Arc::new(v);
                        part_metrics
                            .inc_range_cache_size(key.estimated_size() + value.estimated_size());
                        cache_strategy.put_range_result(key, value.clone());

                        Ok(value)
                    }
                    Err(e) => {
                        warn!(e; "Failed to finalize range cache batches");

                        Err(e)
                    }
                };

                if let Some(tx) = result_tx {
                    let _ = tx.send(result);
                }

                break;
            }
        }
    }
}

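/// Buffers record batches on the read path and forwards them to the background concat task.
///
/// When no range result cache is configured, `sender` is `None` and pushed batches are
/// simply dropped.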
struct CacheBatchBuffer {
    buffered_batches: Vec<RecordBatch>,
    buffered_rows: usize,
    buffered_size: usize,
    sender: Option<mpsc::UnboundedSender<CacheConcatCommand>>,
}

impl CacheBatchBuffer {
    fn new(cache_strategy: &CacheStrategy) -> Self {
        let sender = cache_strategy.range_result_memory_limiter().map(|limiter| {
            let skip_threshold_bytes = cache_strategy.range_result_cache_size().unwrap_or(0);
            let (tx, rx) = mpsc::unbounded_channel();
            common_runtime::spawn_global(run_cache_concat_task(
                rx,
                limiter.clone(),
                skip_threshold_bytes,
            ));
            tx
        });

        Self {
            buffered_batches: Vec::new(),
            buffered_rows: 0,
            buffered_size: 0,
            sender,
        }
    }

    fn push(&mut self, batch: RecordBatch) -> Result<()> {
        if self.sender.is_none() {
            return Ok(());
        }

        self.buffered_rows += batch.num_rows();
        self.buffered_size += batch.get_array_memory_size();
        self.buffered_batches.push(batch);

        if self.buffered_batches.len() > 1
            && (self.buffered_rows > DEFAULT_READ_BATCH_SIZE
                || self.buffered_size > RANGE_CACHE_COMPACT_THRESHOLD_BYTES)
        {
            self.notify_compact();
        }

        Ok(())
    }

    fn notify_compact(&mut self) {
        if self.buffered_batches.is_empty() || self.sender.is_none() {
            return;
        }

        let batches = mem::take(&mut self.buffered_batches);
        self.buffered_rows = 0;
        self.buffered_size = 0;

        let Some(sender) = &self.sender else {
            return;
        };
        if sender.send(CacheConcatCommand::Compact(batches)).is_err() {
            self.sender = None;
        }
    }

    fn finish(
        mut self,
        key: RangeScanCacheKey,
        cache_strategy: CacheStrategy,
        part_metrics: PartitionMetrics,
        result_tx: Option<oneshot::Sender<Result<Arc<RangeScanCacheValue>>>>,
    ) {
        let Some(sender) = self.sender.take() else {
            return;
        };

        if sender
            .send(CacheConcatCommand::Finish {
                pending: mem::take(&mut self.buffered_batches),
                key,
                cache_strategy,
                part_metrics,
                result_tx,
            })
            .is_err()
        {
            self.sender = None;
        }
    }
}

/// Wraps a stream to cache its output for future range cache hits.
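///
/// The helpers in this module are expected to be wired together roughly as in the sketch
/// below (illustrative only; the real call site lives in the scan code outside this
/// module):
///
/// ```ignore
/// if let Some(key) = build_range_cache_key(&stream_ctx, &part_range) {
///     if let Some(value) = cache_strategy.get_range_result(&key) {
///         return cached_flat_range_stream(value);
///     }
///     return cache_flat_range_stream(stream, cache_strategy, key, part_metrics);
/// }
/// ```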
pub(crate) fn cache_flat_range_stream(
    mut stream: BoxedRecordBatchStream,
    cache_strategy: CacheStrategy,
    key: RangeScanCacheKey,
    part_metrics: PartitionMetrics,
) -> BoxedRecordBatchStream {
    Box::pin(try_stream! {
        let mut buffer = CacheBatchBuffer::new(&cache_strategy);
        while let Some(batch) = stream.try_next().await? {
            buffer.push(batch.clone())?;
            yield batch;
        }

        buffer.finish(key, cache_strategy, part_metrics, None);
    })
}

/// Creates a `cache_flat_range_stream` with dummy internals for benchmarking.
///
/// This avoids exposing `RangeScanCacheKey`, `ScanRequestFingerprint`, and
/// `PartitionMetrics` publicly.
#[cfg(feature = "test")]
pub fn bench_cache_flat_range_stream(
    stream: BoxedRecordBatchStream,
    cache_size_bytes: u64,
    region_id: RegionId,
) -> BoxedRecordBatchStream {
    use std::time::Instant;

    use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;

    use crate::region::options::MergeMode;

    let cache_manager = Arc::new(
        crate::cache::CacheManager::builder()
            .range_result_cache_size(cache_size_bytes)
            .build(),
    );
    let cache_strategy = CacheStrategy::EnableAll(cache_manager);

    let fingerprint = ScanRequestFingerprintBuilder {
        read_columns: ReadColumns::from_deduped_column_ids(std::iter::empty()),
        read_column_types: vec![],
        filters: vec![],
        time_filters: vec![],
        series_row_selector: None,
        append_mode: false,
        filter_deleted: false,
        merge_mode: MergeMode::LastRow,
        partition_expr_version: 0,
    }
    .build();

    let key = RangeScanCacheKey {
        region_id,
        row_groups: vec![],
        scan: fingerprint,
    };

    let metrics_set = ExecutionPlanMetricsSet::new();
    let part_metrics =
        PartitionMetrics::new(region_id, 0, "bench", Instant::now(), false, &metrics_set);

    cache_flat_range_stream(stream, cache_strategy, key, part_metrics)
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;
    use std::time::Instant;

    use common_time::Timestamp;
    use common_time::range::TimestampRange;
    use common_time::timestamp::TimeUnit;
    use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
    use datafusion_common::ScalarValue;
    use datafusion_expr::{Expr, col, lit};
    use smallvec::smallvec;
    use store_api::storage::{FileId, RegionId};

    use super::*;
    use crate::cache::CacheManager;
    use crate::read::flat_projection::FlatProjectionMapper;
    use crate::read::range::{RangeMeta, RowGroupIndex, SourceIndex};
    use crate::read::scan_region::{PredicateGroup, ScanInput};
    use crate::test_util::memtable_util::metadata_with_primary_key;
    use crate::test_util::scheduler_util::SchedulerEnv;
    use crate::test_util::sst_util::sst_file_handle_with_file_id;

    fn test_cache_strategy() -> CacheStrategy {
        CacheStrategy::EnableAll(Arc::new(
            CacheManager::builder()
                .range_result_cache_size(1024 * 1024)
                .build(),
        ))
    }

    fn test_scan_fingerprint(
        filters: Vec<String>,
        time_filters: Vec<String>,
        series_row_selector: Option<TimeSeriesRowSelector>,
        filter_deleted: bool,
        partition_expr_version: u64,
    ) -> ScanRequestFingerprint {
        let read_columns = ReadColumns::from_deduped_column_ids([1, 2]);
        ScanRequestFingerprintBuilder {
            read_columns,
            read_column_types: vec![None, None],
            filters,
            time_filters,
            series_row_selector,
            append_mode: false,
            filter_deleted,
            merge_mode: MergeMode::LastRow,
            partition_expr_version,
        }
        .build()
    }

    fn test_cache_context(strategy: &CacheStrategy) -> (RangeScanCacheKey, PartitionMetrics) {
        let region_id = RegionId::new(1, 1);
        let key = RangeScanCacheKey {
            region_id,
            row_groups: vec![],
            scan: test_scan_fingerprint(vec![], vec![], None, false, 0),
        };

        let metrics_set = ExecutionPlanMetricsSet::new();
        let part_metrics =
            PartitionMetrics::new(region_id, 0, "test", Instant::now(), false, &metrics_set);

        assert!(strategy.get_range_result(&key).is_none());
        (key, part_metrics)
    }

    async fn finish_cache_batch_buffer(
        buffer: CacheBatchBuffer,
        key: RangeScanCacheKey,
        cache_strategy: CacheStrategy,
        part_metrics: PartitionMetrics,
    ) -> Result<Arc<RangeScanCacheValue>> {
        let (tx, rx) = oneshot::channel();
        common_telemetry::info!("finish start");
        buffer.finish(key, cache_strategy, part_metrics, Some(tx));
        common_telemetry::info!("finish end");
        rx.await.context(crate::error::RecvSnafu)?
    }

    async fn new_stream_context(
        filters: Vec<Expr>,
        query_time_range: Option<TimestampRange>,
        partition_time_range: FileTimeRange,
    ) -> (StreamContext, PartitionRange) {
        let env = SchedulerEnv::new().await;
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3]).unwrap();
        let predicate = PredicateGroup::new(metadata.as_ref(), &filters).unwrap();
        let file_id = FileId::random();
        let file = sst_file_handle_with_file_id(
            file_id,
            partition_time_range.0.value(),
            partition_time_range.1.value(),
        );
        let input = ScanInput::new(env.access_layer.clone(), mapper)
            .with_predicate(predicate)
            .with_time_range(query_time_range)
            .with_files(vec![file])
            .with_cache(test_cache_strategy());
        let range_meta = RangeMeta {
            time_range: partition_time_range,
            indices: smallvec![SourceIndex {
                index: 0,
                num_row_groups: 1,
            }],
            row_group_indices: smallvec![RowGroupIndex {
                index: 0,
                row_group_index: 0,
            }],
            num_rows: 10,
        };
        let partition_range = range_meta.new_partition_range(0);
        let scan_fingerprint = crate::read::scan_region::build_scan_fingerprint(&input);
        let stream_ctx = StreamContext {
            input,
            ranges: vec![range_meta],
            scan_fingerprint,
            query_start: Instant::now(),
        };

        (stream_ctx, partition_range)
    }

    /// Helper to create a timestamp millisecond literal.
    fn ts_lit(val: i64) -> Expr {
        lit(ScalarValue::TimestampMillisecond(Some(val), None))
    }

    fn normalized_exprs(exprs: impl IntoIterator<Item = Expr>) -> Vec<String> {
        let mut exprs = exprs
            .into_iter()
            .map(|expr| expr.to_string())
            .collect::<Vec<_>>();
        exprs.sort_unstable();
        exprs
    }

    async fn assert_range_cache_filters(
        filters: Vec<Expr>,
        query_time_range: Option<TimestampRange>,
        partition_time_range: FileTimeRange,
        expected_filters: Vec<Expr>,
        expected_time_filters: Vec<Expr>,
    ) {
        let (stream_ctx, part_range) =
            new_stream_context(filters, query_time_range, partition_time_range).await;

        let key = build_range_cache_key(&stream_ctx, &part_range).unwrap();

        assert_eq!(
            key.scan.filters(),
            normalized_exprs(expected_filters).as_slice()
        );
        assert_eq!(
            key.scan.time_filters(),
            normalized_exprs(expected_time_filters).as_slice()
        );
    }

    #[tokio::test]
    async fn strips_time_only_filters_when_query_covers_partition_range() {
        assert_range_cache_filters(
            vec![
                col("ts").gt_eq(ts_lit(1000)),
                col("ts").lt(ts_lit(2001)),
                col("ts").is_not_null(),
                col("k0").eq(lit("foo")),
            ],
            TimestampRange::with_unit(1000, 2002, TimeUnit::Millisecond),
            (
                Timestamp::new_millisecond(1000),
                Timestamp::new_millisecond(2000),
            ),
            vec![col("k0").eq(lit("foo")), col("ts").is_not_null()],
            vec![],
        )
        .await;
    }

    #[tokio::test]
    async fn preserves_time_filters_when_query_does_not_cover_partition_range() {
        assert_range_cache_filters(
            vec![col("ts").gt_eq(ts_lit(1000)), col("k0").eq(lit("foo"))],
            TimestampRange::with_unit(1000, 1500, TimeUnit::Millisecond),
            (
                Timestamp::new_millisecond(1000),
                Timestamp::new_millisecond(2000),
            ),
            vec![col("k0").eq(lit("foo"))],
            vec![col("ts").gt_eq(ts_lit(1000))],
        )
        .await;
    }

    #[tokio::test]
    async fn strips_time_only_filters_when_query_has_no_time_range_limit() {
        assert_range_cache_filters(
            vec![
                col("ts").gt_eq(ts_lit(1000)),
                col("ts").is_not_null(),
                col("k0").eq(lit("foo")),
            ],
            None,
            (
                Timestamp::new_millisecond(1000),
                Timestamp::new_millisecond(2000),
            ),
            vec![col("k0").eq(lit("foo")), col("ts").is_not_null()],
            vec![],
        )
        .await;
    }

    #[test]
    fn normalizes_and_clears_time_filters() {
        let normalized =
            test_scan_fingerprint(vec!["k0 = 'foo'".to_string()], vec![], None, true, 0);

        assert!(normalized.time_filters().is_empty());

        let fingerprint = test_scan_fingerprint(
            vec!["k0 = 'foo'".to_string()],
            vec!["ts >= 1000".to_string()],
            Some(TimeSeriesRowSelector::LastRow),
            true,
            7,
        );

        let reset = fingerprint.without_time_filters();

        assert_eq!(reset.read_columns(), fingerprint.read_columns());
        assert_eq!(reset.read_column_types(), fingerprint.read_column_types());
        assert_eq!(reset.filters(), fingerprint.filters());
        assert!(reset.time_filters().is_empty());
        assert_eq!(reset.series_row_selector, fingerprint.series_row_selector);
        assert_eq!(reset.append_mode, fingerprint.append_mode);
        assert_eq!(reset.filter_deleted, fingerprint.filter_deleted);
        assert_eq!(reset.merge_mode, fingerprint.merge_mode);
        assert_eq!(
            reset.partition_expr_version,
            fingerprint.partition_expr_version
        );
    }

    fn test_schema() -> Arc<datatypes::arrow::datatypes::Schema> {
        use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema};

        Arc::new(Schema::new(vec![Field::new(
            "value",
            ArrowDataType::Int64,
            false,
        )]))
    }

    fn make_batch(values: &[i64]) -> RecordBatch {
        use datatypes::arrow::array::Int64Array;

        RecordBatch::try_new(
            test_schema(),
            vec![Arc::new(Int64Array::from(values.to_vec()))],
        )
        .unwrap()
    }

    fn make_large_binary_batch(rows: usize, bytes_per_row: usize) -> RecordBatch {
        use datatypes::arrow::array::BinaryArray;
        use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema};

        let schema = Arc::new(Schema::new(vec![Field::new(
            "value",
            ArrowDataType::Binary,
            false,
        )]));
        let payload = vec![b'x'; bytes_per_row];
        let values = (0..rows).map(|_| payload.as_slice()).collect::<Vec<_>>();

        RecordBatch::try_new(schema, vec![Arc::new(BinaryArray::from_vec(values))]).unwrap()
    }

    #[test]
    fn compact_record_batches_keeps_original_boundaries() {
        let batches = vec![make_batch(&[1, 2]), make_batch(&[3]), make_batch(&[4, 5])];

        let compacted = compact_record_batches(batches).unwrap();

        assert_eq!(compacted.batch.num_rows(), 5);
        assert_eq!(compacted.slice_lengths, vec![2, 1, 2]);
    }

    #[tokio::test]
    async fn cached_flat_range_stream_replays_original_batches() {
        let value = Arc::new(RangeScanCacheValue::new(
            vec![CachedBatchSlice {
                batch: make_batch(&[1, 2, 3]),
                slice_lengths: vec![2, 1],
            }],
            make_batch(&[1, 2, 3]).get_array_memory_size(),
        ));

        let replayed = cached_flat_range_stream(value)
            .try_collect::<Vec<_>>()
            .await
            .unwrap();

        assert_eq!(replayed.len(), 2);
        assert_eq!(replayed[0].num_rows(), 2);
        assert_eq!(replayed[1].num_rows(), 1);
    }

    #[tokio::test]
    async fn cache_batch_buffer_finishes_pending_batches() {
        let strategy = test_cache_strategy();
        let batch = make_batch(&[1, 2, 3]);
        let expected_size = batch.get_array_memory_size();
        let (key, part_metrics) = test_cache_context(&strategy);

        let mut buffer = CacheBatchBuffer::new(&strategy);
        buffer.push(batch).unwrap();

        let value = finish_cache_batch_buffer(buffer, key.clone(), strategy.clone(), part_metrics)
            .await
            .unwrap();
        assert_eq!(value.cached_batches.len(), 1);
        assert_eq!(value.cached_batches[0].slice_lengths, vec![3]);
        assert_eq!(value.estimated_batches_size, expected_size);
        assert!(Arc::ptr_eq(
            &value,
            &strategy.get_range_result(&key).unwrap()
        ));
    }

    #[tokio::test]
    async fn cache_batch_buffer_compacts_when_rows_exceed_default_batch_size() {
        let strategy = test_cache_strategy();
        let batch = make_batch(&vec![1; DEFAULT_READ_BATCH_SIZE / 2 + 1]);
        let (key, part_metrics) = test_cache_context(&strategy);

        let mut buffer = CacheBatchBuffer::new(&strategy);
        buffer.push(batch.clone()).unwrap();
        buffer.push(batch).unwrap();

        assert_eq!(buffer.buffered_rows, 0);
        assert!(buffer.buffered_batches.is_empty());

        let value = finish_cache_batch_buffer(buffer, key, strategy, part_metrics)
            .await
            .unwrap();
        assert_eq!(value.cached_batches.len(), 1);
        assert_eq!(
            value.cached_batches[0].slice_lengths,
            vec![
                DEFAULT_READ_BATCH_SIZE / 2 + 1,
                DEFAULT_READ_BATCH_SIZE / 2 + 1
            ]
        );
    }

    #[tokio::test]
    async fn cache_batch_buffer_compacts_when_buffered_size_exceeds_threshold() {
        let large_batch = make_large_binary_batch(DEFAULT_READ_BATCH_SIZE, 4096);
        let strategy = CacheStrategy::EnableAll(Arc::new(
            CacheManager::builder()
                .range_result_cache_size((large_batch.get_array_memory_size() * 3) as u64)
                .build(),
        ));
        let (key, part_metrics) = test_cache_context(&strategy);

        let mut buffer = CacheBatchBuffer::new(&strategy);
        buffer.push(large_batch.clone()).unwrap();

        assert_eq!(buffer.buffered_rows, large_batch.num_rows());
        assert_eq!(buffer.buffered_batches.len(), 1);

        buffer.push(large_batch.clone()).unwrap();

        assert_eq!(buffer.buffered_rows, 0);
        assert!(buffer.buffered_batches.is_empty());

        let value = finish_cache_batch_buffer(buffer, key, strategy, part_metrics)
            .await
            .unwrap();
        assert_eq!(value.cached_batches.len(), 1);
        assert_eq!(
            value.cached_batches[0].slice_lengths,
            vec![large_batch.num_rows(), large_batch.num_rows()]
        );
    }

    #[tokio::test]
    async fn cache_batch_buffer_skips_cache_when_compacted_size_exceeds_limit() {
        let large_batch = make_large_binary_batch(DEFAULT_READ_BATCH_SIZE / 2 + 1, 4096);
        // Budget only fits two large batches.
        let budget = (large_batch.get_array_memory_size() as u64) * 2 + 1;
        let strategy = CacheStrategy::EnableAll(Arc::new(
            CacheManager::builder()
                .range_result_cache_size(budget)
                .build(),
        ));
        let (key, part_metrics) = test_cache_context(&strategy);

        let mut buffer = CacheBatchBuffer::new(&strategy);
        for _ in 0..4 {
            buffer.push(large_batch.clone()).unwrap();
        }
        assert!(
            finish_cache_batch_buffer(buffer, key.clone(), strategy.clone(), part_metrics)
                .await
                .is_err()
        );
        assert!(strategy.get_range_result(&key).is_none());
1033    }
1034}