// mito2/read/range_cache.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Utilities for the partition range scan result cache.
16
17use std::mem;
18use std::sync::Arc;
19
20use async_stream::try_stream;
21use common_telemetry::warn;
22use common_time::range::TimestampRange;
23use datatypes::arrow::compute::concat_batches;
24use datatypes::arrow::record_batch::RecordBatch;
25use datatypes::prelude::ConcreteDataType;
26use futures::TryStreamExt;
27use snafu::ResultExt;
28use store_api::region_engine::PartitionRange;
29use store_api::storage::{ColumnId, FileId, RegionId, TimeSeriesRowSelector};
30use tokio::sync::{mpsc, oneshot};
31
32use crate::cache::CacheStrategy;
33use crate::error::{ComputeArrowSnafu, Result, UnexpectedSnafu};
34use crate::read::BoxedRecordBatchStream;
35use crate::read::scan_region::StreamContext;
36use crate::read::scan_util::PartitionMetrics;
37use crate::region::options::MergeMode;
38use crate::sst::file::FileTimeRange;
39use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
40
/// Once buffered batches exceed this many bytes (or `DEFAULT_READ_BATCH_SIZE`
/// rows), they are handed to the background task for compaction.
const RANGE_CACHE_COMPACT_THRESHOLD_BYTES: usize = 2 * 1024 * 1024;
/// Once the total bytes pushed for one range exceed this limit, caching of the
/// range is abandoned entirely.
const RANGE_CACHE_SKIP_BYTES: usize = 512 * 1024 * 1024;
43
/// Fingerprint of the scan request fields that affect partition range cache reuse.
///
/// It records a normalized view of the projected columns and filters, plus
/// scan options that can change the returned rows. Schema-dependent metadata
/// and the partition expression version are included so cached results are not
/// reused across incompatible schema or partitioning changes.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub(crate) struct ScanRequestFingerprint {
    /// Projection and filters without the time index and partition exprs.
    inner: Arc<SharedScanRequestFingerprint>,
    /// Filters with the time index column.
    /// `None` when there are none, or after they are stripped by
    /// [`Self::without_time_filters`].
    time_filters: Option<Arc<Vec<String>>>,
    /// Optional per-series row selection requested by the scan.
    series_row_selector: Option<TimeSeriesRowSelector>,
    /// Scan option: append mode flag.
    append_mode: bool,
    /// Scan option: whether deleted rows are filtered out.
    filter_deleted: bool,
    /// Scan option: merge mode of the region.
    merge_mode: MergeMode,
    /// We keep the partition expr version to ensure we won't reuse the fingerprint after we change the partition expr.
    /// We store the version instead of the whole partition expr or partition expr filters.
    partition_expr_version: u64,
}
64
/// Mutable collection of fingerprint fields; call
/// [`build`](Self::build) to produce the immutable [`ScanRequestFingerprint`].
#[derive(Debug)]
pub(crate) struct ScanRequestFingerprintBuilder {
    /// Column ids of the projection.
    pub(crate) read_column_ids: Vec<ColumnId>,
    /// Column types of the projection (kept to detect schema changes).
    pub(crate) read_column_types: Vec<Option<ConcreteDataType>>,
    /// Filters without the time index column and region partition exprs.
    pub(crate) filters: Vec<String>,
    /// Filters that reference the time index column.
    pub(crate) time_filters: Vec<String>,
    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
    pub(crate) append_mode: bool,
    pub(crate) filter_deleted: bool,
    pub(crate) merge_mode: MergeMode,
    /// Version of the region partition expression.
    pub(crate) partition_expr_version: u64,
}
77
78impl ScanRequestFingerprintBuilder {
79    pub(crate) fn build(self) -> ScanRequestFingerprint {
80        let Self {
81            read_column_ids,
82            read_column_types,
83            filters,
84            time_filters,
85            series_row_selector,
86            append_mode,
87            filter_deleted,
88            merge_mode,
89            partition_expr_version,
90        } = self;
91
92        ScanRequestFingerprint {
93            inner: Arc::new(SharedScanRequestFingerprint {
94                read_column_ids,
95                read_column_types,
96                filters,
97            }),
98            time_filters: (!time_filters.is_empty()).then(|| Arc::new(time_filters)),
99            series_row_selector,
100            append_mode,
101            filter_deleted,
102            merge_mode,
103            partition_expr_version,
104        }
105    }
106}
107
/// Non-copiable struct of the fingerprint.
///
/// Shared behind an `Arc` by [`ScanRequestFingerprint`] so fingerprint clones
/// don't copy these vectors.
#[derive(Debug, PartialEq, Eq, Hash)]
struct SharedScanRequestFingerprint {
    /// Column ids of the projection.
    read_column_ids: Vec<ColumnId>,
    /// Column types of the projection.
    /// We keep this to ensure we won't reuse the fingerprint after a schema change.
    read_column_types: Vec<Option<ConcreteDataType>>,
    /// Filters without the time index column and region partition exprs.
    filters: Vec<String>,
}
119
120impl ScanRequestFingerprint {
121    #[cfg(test)]
122    pub(crate) fn read_column_ids(&self) -> &[ColumnId] {
123        &self.inner.read_column_ids
124    }
125
126    #[cfg(test)]
127    pub(crate) fn read_column_types(&self) -> &[Option<ConcreteDataType>] {
128        &self.inner.read_column_types
129    }
130
131    #[cfg(test)]
132    pub(crate) fn filters(&self) -> &[String] {
133        &self.inner.filters
134    }
135
136    #[cfg(test)]
137    pub(crate) fn time_filters(&self) -> &[String] {
138        self.time_filters
139            .as_deref()
140            .map(Vec::as_slice)
141            .unwrap_or(&[])
142    }
143
144    pub(crate) fn without_time_filters(&self) -> Self {
145        Self {
146            inner: Arc::clone(&self.inner),
147            time_filters: None,
148            series_row_selector: self.series_row_selector,
149            append_mode: self.append_mode,
150            filter_deleted: self.filter_deleted,
151            merge_mode: self.merge_mode,
152            partition_expr_version: self.partition_expr_version,
153        }
154    }
155
156    pub(crate) fn estimated_size(&self) -> usize {
157        mem::size_of::<SharedScanRequestFingerprint>()
158            + self.inner.read_column_ids.capacity() * mem::size_of::<ColumnId>()
159            + self.inner.read_column_types.capacity() * mem::size_of::<Option<ConcreteDataType>>()
160            + self.inner.filters.capacity() * mem::size_of::<String>()
161            + self
162                .inner
163                .filters
164                .iter()
165                .map(|filter| filter.capacity())
166                .sum::<usize>()
167            + self.time_filters.as_ref().map_or(0, |filters| {
168                mem::size_of::<Vec<String>>()
169                    + filters.capacity() * mem::size_of::<String>()
170                    + filters
171                        .iter()
172                        .map(|filter| filter.capacity())
173                        .sum::<usize>()
174            })
175    }
176}
177
/// Cache key for range scan outputs.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub(crate) struct RangeScanCacheKey {
    pub(crate) region_id: RegionId,
    /// Sorted (file_id, row_group_index) pairs that uniquely identify the data this range covers.
    pub(crate) row_groups: Vec<(FileId, i64)>,
    /// Fingerprint of the scan request that produced the cached rows.
    pub(crate) scan: ScanRequestFingerprint,
}
186
187impl RangeScanCacheKey {
188    pub(crate) fn estimated_size(&self) -> usize {
189        mem::size_of::<Self>()
190            + self.row_groups.capacity() * mem::size_of::<(FileId, i64)>()
191            + self.scan.estimated_size()
192    }
193}
194
/// Cached result for one range scan.
#[derive(Debug)]
pub(crate) struct CachedBatchSlice {
    /// Concatenation of the original output batches.
    batch: RecordBatch,
    /// Row count of each original batch, so replay can restore the
    /// original batch boundaries by slicing `batch`.
    slice_lengths: Vec<usize>,
}
201
202impl CachedBatchSlice {
203    fn metadata_size(&self) -> usize {
204        self.slice_lengths.capacity() * mem::size_of::<usize>()
205    }
206}
207
/// Cached, compacted output of one range scan.
pub(crate) struct RangeScanCacheValue {
    /// Compacted batches together with their original slice boundaries.
    cached_batches: Vec<CachedBatchSlice>,
    /// Precomputed size of all compacted batches.
    estimated_batches_size: usize,
}
213
214impl RangeScanCacheValue {
215    pub(crate) fn new(
216        cached_batches: Vec<CachedBatchSlice>,
217        estimated_batches_size: usize,
218    ) -> Self {
219        Self {
220            cached_batches,
221            estimated_batches_size,
222        }
223    }
224
225    pub(crate) fn estimated_size(&self) -> usize {
226        mem::size_of::<Self>()
227            + self.cached_batches.capacity() * mem::size_of::<CachedBatchSlice>()
228            + self
229                .cached_batches
230                .iter()
231                .map(CachedBatchSlice::metadata_size)
232                .sum::<usize>()
233            + self.estimated_batches_size
234    }
235}
236
/// Row groups and whether all sources are file-only for a partition range.
pub(crate) struct PartitionRangeRowGroups {
    /// Sorted (file_id, row_group_index) pairs.
    pub(crate) row_groups: Vec<(FileId, i64)>,
    /// True only if every source of the range is a file range; non-file
    /// sources make the range ineligible for result caching.
    pub(crate) only_file_sources: bool,
}
243
244/// Collects (file_id, row_group_index) pairs from a partition range's row group indices.
245pub(crate) fn collect_partition_range_row_groups(
246    stream_ctx: &StreamContext,
247    part_range: &PartitionRange,
248) -> PartitionRangeRowGroups {
249    let range_meta = &stream_ctx.ranges[part_range.identifier];
250    let mut row_groups = Vec::new();
251    let mut only_file_sources = true;
252
253    for index in &range_meta.row_group_indices {
254        if stream_ctx.is_file_range_index(*index) {
255            let file_id = stream_ctx.input.file_from_index(*index).file_id().file_id();
256            row_groups.push((file_id, index.row_group_index));
257        } else {
258            only_file_sources = false;
259        }
260    }
261
262    row_groups.sort_unstable_by(|a, b| a.0.as_bytes().cmp(b.0.as_bytes()).then(a.1.cmp(&b.1)));
263
264    PartitionRangeRowGroups {
265        row_groups,
266        only_file_sources,
267    }
268}
269
270/// Builds a cache key for the given partition range if it is eligible for caching.
271pub(crate) fn build_range_cache_key(
272    stream_ctx: &StreamContext,
273    part_range: &PartitionRange,
274) -> Option<RangeScanCacheKey> {
275    if !stream_ctx.input.cache_strategy.has_range_result_cache() {
276        return None;
277    }
278
279    let fingerprint = stream_ctx.scan_fingerprint.as_ref()?;
280
281    // Dyn filters can change at runtime, so we can't cache when they're present.
282    let has_dyn_filters = stream_ctx
283        .input
284        .predicate_group()
285        .predicate_without_region()
286        .is_some_and(|p| !p.dyn_filters().is_empty());
287    if has_dyn_filters {
288        return None;
289    }
290
291    let rg = collect_partition_range_row_groups(stream_ctx, part_range);
292    if !rg.only_file_sources || rg.row_groups.is_empty() {
293        return None;
294    }
295
296    let range_meta = &stream_ctx.ranges[part_range.identifier];
297    let scan = if query_time_range_covers_partition_range(
298        stream_ctx.input.time_range.as_ref(),
299        range_meta.time_range,
300    ) {
301        fingerprint.without_time_filters()
302    } else {
303        fingerprint.clone()
304    };
305
306    Some(RangeScanCacheKey {
307        region_id: stream_ctx.input.region_metadata().region_id,
308        row_groups: rg.row_groups,
309        scan,
310    })
311}
312
313fn query_time_range_covers_partition_range(
314    query_time_range: Option<&TimestampRange>,
315    partition_time_range: FileTimeRange,
316) -> bool {
317    let Some(query_time_range) = query_time_range else {
318        return true;
319    };
320
321    let (part_start, part_end) = partition_time_range;
322    query_time_range.contains(&part_start) && query_time_range.contains(&part_end)
323}
324
325/// Returns a stream that replays cached record batches.
326pub(crate) fn cached_flat_range_stream(value: Arc<RangeScanCacheValue>) -> BoxedRecordBatchStream {
327    Box::pin(try_stream! {
328        for cached_batch in &value.cached_batches {
329            let mut offset = 0;
330            for &len in &cached_batch.slice_lengths {
331                yield cached_batch.batch.slice(offset, len);
332                offset += len;
333            }
334        }
335    })
336}
337
/// Commands handled by the background concat task ([`run_cache_concat_task`]).
enum CacheConcatCommand {
    /// Compact the given batches into one cached slice.
    Compact(Vec<RecordBatch>),
    /// Compact remaining batches, build the cache value and publish it;
    /// handling this command ends the task.
    Finish {
        /// Batches still buffered on the producer side.
        pending: Vec<RecordBatch>,
        /// Cache key to store the result under.
        key: RangeScanCacheKey,
        /// Cache to publish the finished value into.
        cache_strategy: CacheStrategy,
        /// Metrics sink that records the cached size.
        part_metrics: PartitionMetrics,
        /// Optional channel to report the built value (e.g. for tests).
        result_tx: Option<oneshot::Sender<Option<Arc<RangeScanCacheValue>>>>,
    },
}
348
/// Accumulated state of the background concat task.
#[derive(Default)]
struct CacheConcatState {
    /// Compacted slices produced so far.
    cached_batches: Vec<CachedBatchSlice>,
    /// Running total of `get_array_memory_size` over compacted batches.
    estimated_size: usize,
}
354
355impl CacheConcatState {
356    async fn compact(
357        &mut self,
358        batches: Vec<RecordBatch>,
359        limiter: &crate::cache::RangeResultMemoryLimiter,
360    ) -> Result<()> {
361        if batches.is_empty() {
362            return Ok(());
363        }
364
365        let input_size = batches
366            .iter()
367            .map(RecordBatch::get_array_memory_size)
368            .sum::<usize>();
369        let _permit = limiter.acquire(input_size).await.map_err(|_| {
370            UnexpectedSnafu {
371                reason: "range result memory limiter is unexpectedly closed",
372            }
373            .build()
374        })?;
375
376        let compacted = compact_record_batches(batches)?;
377        self.estimated_size += compacted.batch.get_array_memory_size();
378        self.cached_batches.push(compacted);
379        Ok(())
380    }
381
382    fn finish(self) -> RangeScanCacheValue {
383        RangeScanCacheValue::new(self.cached_batches, self.estimated_size)
384    }
385}
386
387fn compact_record_batches(batches: Vec<RecordBatch>) -> Result<CachedBatchSlice> {
388    debug_assert!(!batches.is_empty());
389
390    let slice_lengths = batches.iter().map(RecordBatch::num_rows).collect();
391    build_cached_batch_slice(batches, slice_lengths)
392}
393
394fn build_cached_batch_slice(
395    batches: Vec<RecordBatch>,
396    slice_lengths: Vec<usize>,
397) -> Result<CachedBatchSlice> {
398    let batch = if batches.len() == 1 {
399        batches.into_iter().next().unwrap()
400    } else {
401        let schema = batches[0].schema();
402        concat_batches(&schema, &batches).context(ComputeArrowSnafu)?
403    };
404
405    Ok(CachedBatchSlice {
406        batch,
407        slice_lengths,
408    })
409}
410
/// Background task that compacts record batches off the scan path and
/// publishes the final cache value.
///
/// Exits when the channel closes, when a compaction fails, or after handling
/// a `Finish` command.
async fn run_cache_concat_task(
    mut rx: mpsc::UnboundedReceiver<CacheConcatCommand>,
    limiter: Arc<crate::cache::RangeResultMemoryLimiter>,
) {
    let mut state = CacheConcatState::default();

    while let Some(cmd) = rx.recv().await {
        match cmd {
            CacheConcatCommand::Compact(batches) => {
                if let Err(err) = state.compact(batches, &limiter).await {
                    // Abort: dropping `rx` makes further sends fail, which
                    // disables caching on the producer side.
                    warn!(err; "Failed to compact range cache batches");
                    return;
                }
            }
            CacheConcatCommand::Finish {
                pending,
                key,
                cache_strategy,
                part_metrics,
                result_tx,
            } => {
                // Compact any batches that were still pending, then finalize.
                let result = state
                    .compact(pending, &limiter)
                    .await
                    .map(|()| state.finish());
                if let Err(err) = &result {
                    warn!(err; "Failed to finalize range cache batches");
                }

                // Publish only on success; a failed finalize caches nothing.
                let value = result.ok().map(Arc::new);
                if let Some(value) = &value {
                    part_metrics
                        .inc_range_cache_size(key.estimated_size() + value.estimated_size());
                    cache_strategy.put_range_result(key, value.clone());
                }
                // Report the (possibly absent) value to a waiting caller.
                if let Some(tx) = result_tx {
                    let _ = tx.send(value);
                }
                return;
            }
        }
    }
}
454
/// Producer-side buffer that forwards scanned batches to the background
/// concat task for caching.
struct CacheBatchBuffer {
    /// Batches not yet sent for compaction.
    buffered_batches: Vec<RecordBatch>,
    /// Total rows currently buffered.
    buffered_rows: usize,
    /// Total array memory of currently buffered batches.
    buffered_size: usize,
    /// Total array memory ever pushed; used to abandon caching of
    /// oversized ranges.
    total_weight: usize,
    /// Channel to the concat task; `None` when caching is disabled or
    /// has been abandoned.
    sender: Option<mpsc::UnboundedSender<CacheConcatCommand>>,
}
462
463impl CacheBatchBuffer {
464    fn new(cache_strategy: &CacheStrategy) -> Self {
465        let sender = cache_strategy.range_result_memory_limiter().map(|limiter| {
466            let (tx, rx) = mpsc::unbounded_channel();
467            common_runtime::spawn_global(run_cache_concat_task(rx, limiter.clone()));
468            tx
469        });
470
471        Self {
472            buffered_batches: Vec::new(),
473            buffered_rows: 0,
474            buffered_size: 0,
475            total_weight: 0,
476            sender,
477        }
478    }
479
480    fn push(&mut self, batch: RecordBatch) -> Result<()> {
481        if self.sender.is_none() {
482            return Ok(());
483        }
484
485        let batch_size = batch.get_array_memory_size();
486        self.total_weight += batch_size;
487        if self.total_weight > RANGE_CACHE_SKIP_BYTES {
488            self.buffered_batches.clear();
489            self.buffered_rows = 0;
490            self.buffered_size = 0;
491            self.sender = None;
492            return Ok(());
493        }
494
495        self.buffered_rows += batch.num_rows();
496        self.buffered_size += batch_size;
497        self.buffered_batches.push(batch);
498
499        if self.buffered_rows > DEFAULT_READ_BATCH_SIZE
500            || self.buffered_size > RANGE_CACHE_COMPACT_THRESHOLD_BYTES
501        {
502            self.notify_compact();
503        }
504
505        Ok(())
506    }
507
508    fn notify_compact(&mut self) {
509        if self.buffered_batches.is_empty() || self.sender.is_none() {
510            return;
511        }
512
513        let batches = mem::take(&mut self.buffered_batches);
514        self.buffered_rows = 0;
515        self.buffered_size = 0;
516
517        let Some(sender) = &self.sender else {
518            return;
519        };
520        if sender.send(CacheConcatCommand::Compact(batches)).is_err() {
521            self.sender = None;
522        }
523    }
524
525    fn finish(
526        mut self,
527        key: RangeScanCacheKey,
528        cache_strategy: CacheStrategy,
529        part_metrics: PartitionMetrics,
530        result_tx: Option<oneshot::Sender<Option<Arc<RangeScanCacheValue>>>>,
531    ) {
532        let Some(sender) = self.sender.take() else {
533            return;
534        };
535
536        if sender
537            .send(CacheConcatCommand::Finish {
538                pending: mem::take(&mut self.buffered_batches),
539                key,
540                cache_strategy,
541                part_metrics,
542                result_tx,
543            })
544            .is_err()
545        {
546            self.sender = None;
547        }
548    }
549}
550
551/// Wraps a stream to cache its output for future range cache hits.
552pub(crate) fn cache_flat_range_stream(
553    mut stream: BoxedRecordBatchStream,
554    cache_strategy: CacheStrategy,
555    key: RangeScanCacheKey,
556    part_metrics: PartitionMetrics,
557) -> BoxedRecordBatchStream {
558    Box::pin(try_stream! {
559        let mut buffer = CacheBatchBuffer::new(&cache_strategy);
560        while let Some(batch) = stream.try_next().await? {
561            buffer.push(batch.clone())?;
562            yield batch;
563        }
564
565        buffer.finish(key, cache_strategy, part_metrics, None);
566    })
567}
568
/// Creates a `cache_flat_range_stream` with dummy internals for benchmarking.
///
/// This avoids exposing `RangeScanCacheKey`, `ScanRequestFingerprint`, and
/// `PartitionMetrics` publicly.
#[cfg(feature = "test")]
pub fn bench_cache_flat_range_stream(
    stream: BoxedRecordBatchStream,
    cache_size_bytes: u64,
    region_id: RegionId,
) -> BoxedRecordBatchStream {
    use std::time::Instant;

    use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;

    use crate::region::options::MergeMode;

    // Dedicated cache manager sized for the benchmark run.
    let manager = Arc::new(
        crate::cache::CacheManager::builder()
            .range_result_cache_size(cache_size_bytes)
            .build(),
    );
    let strategy = CacheStrategy::EnableAll(manager);

    // Empty fingerprint: the bench key never has to match a real scan.
    let scan = ScanRequestFingerprintBuilder {
        read_column_ids: vec![],
        read_column_types: vec![],
        filters: vec![],
        time_filters: vec![],
        series_row_selector: None,
        append_mode: false,
        filter_deleted: false,
        merge_mode: MergeMode::LastRow,
        partition_expr_version: 0,
    }
    .build();

    let cache_key = RangeScanCacheKey {
        region_id,
        row_groups: vec![],
        scan,
    };

    let metrics = ExecutionPlanMetricsSet::new();
    let partition_metrics =
        PartitionMetrics::new(region_id, 0, "bench", Instant::now(), false, &metrics);

    cache_flat_range_stream(stream, strategy, cache_key, partition_metrics)
}
617
618#[cfg(test)]
619mod tests {
620    use std::sync::Arc;
621    use std::time::Instant;
622
623    use common_time::Timestamp;
624    use common_time::range::TimestampRange;
625    use common_time::timestamp::TimeUnit;
626    use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
627    use datafusion_common::ScalarValue;
628    use datafusion_expr::{Expr, col, lit};
629    use smallvec::smallvec;
630    use store_api::storage::{FileId, RegionId};
631
632    use super::*;
633    use crate::cache::CacheManager;
634    use crate::read::projection::ProjectionMapper;
635    use crate::read::range::{RangeMeta, RowGroupIndex, SourceIndex};
636    use crate::read::scan_region::{PredicateGroup, ScanInput};
637    use crate::test_util::memtable_util::metadata_with_primary_key;
638    use crate::test_util::scheduler_util::SchedulerEnv;
639    use crate::test_util::sst_util::sst_file_handle_with_file_id;
640
    /// Builds a cache strategy with a small range result cache for tests.
    fn test_cache_strategy() -> CacheStrategy {
        CacheStrategy::EnableAll(Arc::new(
            CacheManager::builder()
                .range_result_cache_size(1024)
                .build(),
        ))
    }
648
    /// Builds an empty cache key and partition metrics for buffer tests,
    /// asserting the key is not yet present in `strategy`.
    fn test_cache_context(strategy: &CacheStrategy) -> (RangeScanCacheKey, PartitionMetrics) {
        let region_id = RegionId::new(1, 1);
        let key = RangeScanCacheKey {
            region_id,
            row_groups: vec![],
            scan: ScanRequestFingerprintBuilder {
                read_column_ids: vec![],
                read_column_types: vec![],
                filters: vec![],
                time_filters: vec![],
                series_row_selector: None,
                append_mode: false,
                filter_deleted: false,
                merge_mode: MergeMode::LastRow,
                partition_expr_version: 0,
            }
            .build(),
        };

        let metrics_set = ExecutionPlanMetricsSet::new();
        let part_metrics =
            PartitionMetrics::new(region_id, 0, "test", Instant::now(), false, &metrics_set);

        // A fresh key must not be cached before the test stores it.
        assert!(strategy.get_range_result(&key).is_none());
        (key, part_metrics)
    }
675
    /// Finishes `buffer` and waits for the background task to report the
    /// built cache value (`None` when finalization failed).
    async fn finish_cache_batch_buffer(
        buffer: CacheBatchBuffer,
        key: RangeScanCacheKey,
        cache_strategy: CacheStrategy,
        part_metrics: PartitionMetrics,
    ) -> Option<Arc<RangeScanCacheValue>> {
        let (tx, rx) = oneshot::channel();
        buffer.finish(key, cache_strategy, part_metrics, Some(tx));
        rx.await.context(crate::error::RecvSnafu).ok().flatten()
    }
686
    /// Builds a `StreamContext` over one SST file with the given filters,
    /// optional query time range, and partition (file) time range.
    async fn new_stream_context(
        filters: Vec<Expr>,
        query_time_range: Option<TimestampRange>,
        partition_time_range: FileTimeRange,
    ) -> (StreamContext, PartitionRange) {
        let env = SchedulerEnv::new().await;
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let mapper = ProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap();
        let predicate = PredicateGroup::new(metadata.as_ref(), &filters).unwrap();
        let file_id = FileId::random();
        // The file's time range matches the partition's time range.
        let file = sst_file_handle_with_file_id(
            file_id,
            partition_time_range.0.value(),
            partition_time_range.1.value(),
        );
        let input = ScanInput::new(env.access_layer.clone(), mapper)
            .with_predicate(predicate)
            .with_time_range(query_time_range)
            .with_files(vec![file])
            .with_cache(test_cache_strategy());
        // One file source with a single row group.
        let range_meta = RangeMeta {
            time_range: partition_time_range,
            indices: smallvec![SourceIndex {
                index: 0,
                num_row_groups: 1,
            }],
            row_group_indices: smallvec![RowGroupIndex {
                index: 0,
                row_group_index: 0,
            }],
            num_rows: 10,
        };
        let partition_range = range_meta.new_partition_range(0);
        let scan_fingerprint = crate::read::scan_region::build_scan_fingerprint(&input);
        let stream_ctx = StreamContext {
            input,
            ranges: vec![range_meta],
            scan_fingerprint,
            query_start: Instant::now(),
        };

        (stream_ctx, partition_range)
    }
730
    /// Helper to create a timestamp millisecond literal expression for filters.
    fn ts_lit(val: i64) -> Expr {
        lit(ScalarValue::TimestampMillisecond(Some(val), None))
    }
735
    /// Time filters are dropped from the cache key when the query time range
    /// fully covers the partition's time range.
    #[tokio::test]
    async fn strips_time_only_filters_when_query_covers_partition_range() {
        let (stream_ctx, part_range) = new_stream_context(
            vec![
                col("ts").gt_eq(ts_lit(1000)),
                col("ts").lt(ts_lit(2001)),
                col("ts").is_not_null(),
                col("k0").eq(lit("foo")),
            ],
            // Query range [1000, 2002) covers the partition range below.
            TimestampRange::with_unit(1000, 2002, TimeUnit::Millisecond),
            (
                Timestamp::new_millisecond(1000),
                Timestamp::new_millisecond(2000),
            ),
        )
        .await;

        let key = build_range_cache_key(&stream_ctx, &part_range).unwrap();

        // Range-reducible time filters should be cleared when query covers partition range.
        assert!(key.scan.time_filters().is_empty());
        // Non-range time predicates stay in filters.
        let mut expected_filters = [
            col("k0").eq(lit("foo")).to_string(),
            col("ts").is_not_null().to_string(),
        ];
        expected_filters.sort_unstable();
        assert_eq!(key.scan.filters(), expected_filters.as_slice());
    }
765
    /// Time filters stay in the cache key when the query time range only
    /// partially overlaps the partition's time range.
    #[tokio::test]
    async fn preserves_time_filters_when_query_does_not_cover_partition_range() {
        let (stream_ctx, part_range) = new_stream_context(
            vec![col("ts").gt_eq(ts_lit(1000)), col("k0").eq(lit("foo"))],
            // Query range [1000, 1500) does not cover partition end 2000.
            TimestampRange::with_unit(1000, 1500, TimeUnit::Millisecond),
            (
                Timestamp::new_millisecond(1000),
                Timestamp::new_millisecond(2000),
            ),
        )
        .await;

        let key = build_range_cache_key(&stream_ctx, &part_range).unwrap();

        // Time filters should be preserved when query does not cover partition range.
        assert_eq!(
            key.scan.time_filters(),
            [col("ts").gt_eq(ts_lit(1000)).to_string()].as_slice()
        );
        assert_eq!(
            key.scan.filters(),
            [col("k0").eq(lit("foo")).to_string()].as_slice()
        );
    }
790
    /// An unbounded query time range also counts as covering the partition
    /// range, so time filters are dropped from the key.
    #[tokio::test]
    async fn strips_time_only_filters_when_query_has_no_time_range_limit() {
        let (stream_ctx, part_range) = new_stream_context(
            vec![
                col("ts").gt_eq(ts_lit(1000)),
                col("ts").is_not_null(),
                col("k0").eq(lit("foo")),
            ],
            None,
            (
                Timestamp::new_millisecond(1000),
                Timestamp::new_millisecond(2000),
            ),
        )
        .await;

        let key = build_range_cache_key(&stream_ctx, &part_range).unwrap();

        // Range-reducible time filters should be cleared when query has no time range limit.
        assert!(key.scan.time_filters().is_empty());
        // Non-range time predicates stay in filters.
        let mut expected_filters = [
            col("k0").eq(lit("foo")).to_string(),
            col("ts").is_not_null().to_string(),
        ];
        expected_filters.sort_unstable();
        assert_eq!(key.scan.filters(), expected_filters.as_slice());
    }
819
    /// `build` normalizes an empty time-filter list to `None`, and
    /// `without_time_filters` clears time filters while preserving every
    /// other fingerprint field.
    #[test]
    fn normalizes_and_clears_time_filters() {
        // Empty time filters are normalized away by the builder.
        let normalized = ScanRequestFingerprintBuilder {
            read_column_ids: vec![1, 2],
            read_column_types: vec![None, None],
            filters: vec!["k0 = 'foo'".to_string()],
            time_filters: vec![],
            series_row_selector: None,
            append_mode: false,
            filter_deleted: true,
            merge_mode: MergeMode::LastRow,
            partition_expr_version: 0,
        }
        .build();

        assert!(normalized.time_filters().is_empty());

        let fingerprint = ScanRequestFingerprintBuilder {
            read_column_ids: vec![1, 2],
            read_column_types: vec![None, None],
            filters: vec!["k0 = 'foo'".to_string()],
            time_filters: vec!["ts >= 1000".to_string()],
            series_row_selector: Some(TimeSeriesRowSelector::LastRow),
            append_mode: false,
            filter_deleted: true,
            merge_mode: MergeMode::LastRow,
            partition_expr_version: 7,
        }
        .build();

        let reset = fingerprint.without_time_filters();

        // Only `time_filters` changes; all other fields must be identical.
        assert_eq!(reset.read_column_ids(), fingerprint.read_column_ids());
        assert_eq!(reset.read_column_types(), fingerprint.read_column_types());
        assert_eq!(reset.filters(), fingerprint.filters());
        assert!(reset.time_filters().is_empty());
        assert_eq!(reset.series_row_selector, fingerprint.series_row_selector);
        assert_eq!(reset.append_mode, fingerprint.append_mode);
        assert_eq!(reset.filter_deleted, fingerprint.filter_deleted);
        assert_eq!(reset.merge_mode, fingerprint.merge_mode);
        assert_eq!(
            reset.partition_expr_version,
            fingerprint.partition_expr_version
        );
    }
865
    /// One-column (`value: Int64`) Arrow schema used by batch helpers.
    fn test_schema() -> Arc<datatypes::arrow::datatypes::Schema> {
        use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema};

        Arc::new(Schema::new(vec![Field::new(
            "value",
            ArrowDataType::Int64,
            false,
        )]))
    }
875
    /// Builds a single-column Int64 record batch from `values`.
    fn make_batch(values: &[i64]) -> RecordBatch {
        use datatypes::arrow::array::Int64Array;

        RecordBatch::try_new(
            test_schema(),
            vec![Arc::new(Int64Array::from(values.to_vec()))],
        )
        .unwrap()
    }
885
    /// Builds a binary batch of `rows` rows, each `bytes_per_row` bytes, to
    /// exercise size-based thresholds.
    fn make_large_binary_batch(rows: usize, bytes_per_row: usize) -> RecordBatch {
        use datatypes::arrow::array::BinaryArray;
        use datatypes::arrow::datatypes::{DataType as ArrowDataType, Field, Schema};

        let schema = Arc::new(Schema::new(vec![Field::new(
            "value",
            ArrowDataType::Binary,
            false,
        )]));
        let payload = vec![b'x'; bytes_per_row];
        let values = (0..rows).map(|_| payload.as_slice()).collect::<Vec<_>>();

        RecordBatch::try_new(schema, vec![Arc::new(BinaryArray::from_vec(values))]).unwrap()
    }
900
901    #[test]
902    fn compact_record_batches_keeps_original_boundaries() {
903        let batches = vec![make_batch(&[1, 2]), make_batch(&[3]), make_batch(&[4, 5])];
904
905        let compacted = compact_record_batches(batches).unwrap();
906
907        assert_eq!(compacted.batch.num_rows(), 5);
908        assert_eq!(compacted.slice_lengths, vec![2, 1, 2]);
909    }
910
911    #[tokio::test]
912    async fn cached_flat_range_stream_replays_original_batches() {
913        let value = Arc::new(RangeScanCacheValue::new(
914            vec![CachedBatchSlice {
915                batch: make_batch(&[1, 2, 3]),
916                slice_lengths: vec![2, 1],
917            }],
918            make_batch(&[1, 2, 3]).get_array_memory_size(),
919        ));
920
921        let replayed = cached_flat_range_stream(value)
922            .try_collect::<Vec<_>>()
923            .await
924            .unwrap();
925
926        assert_eq!(replayed.len(), 2);
927        assert_eq!(replayed[0].num_rows(), 2);
928        assert_eq!(replayed[1].num_rows(), 1);
929    }
930
931    #[tokio::test]
932    async fn cache_batch_buffer_finishes_pending_batches() {
933        let strategy = test_cache_strategy();
934        let batch = make_batch(&[1, 2, 3]);
935        let expected_size = batch.get_array_memory_size();
936        let (key, part_metrics) = test_cache_context(&strategy);
937
938        let mut buffer = CacheBatchBuffer::new(&strategy);
939        buffer.push(batch).unwrap();
940
941        let value = finish_cache_batch_buffer(buffer, key.clone(), strategy.clone(), part_metrics)
942            .await
943            .unwrap();
944        assert_eq!(value.cached_batches.len(), 1);
945        assert_eq!(value.cached_batches[0].slice_lengths, vec![3]);
946        assert_eq!(value.estimated_batches_size, expected_size);
947        assert!(Arc::ptr_eq(
948            &value,
949            &strategy.get_range_result(&key).unwrap()
950        ));
951    }
952
953    #[tokio::test]
954    async fn cache_batch_buffer_compacts_when_rows_exceed_default_batch_size() {
955        let strategy = test_cache_strategy();
956        let batch = make_batch(&vec![1; DEFAULT_READ_BATCH_SIZE / 2 + 1]);
957        let (key, part_metrics) = test_cache_context(&strategy);
958
959        let mut buffer = CacheBatchBuffer::new(&strategy);
960        buffer.push(batch.clone()).unwrap();
961        buffer.push(batch).unwrap();
962
963        assert_eq!(buffer.buffered_rows, 0);
964        assert!(buffer.buffered_batches.is_empty());
965
966        let value = finish_cache_batch_buffer(buffer, key, strategy, part_metrics)
967            .await
968            .unwrap();
969        assert_eq!(value.cached_batches.len(), 1);
970        assert_eq!(
971            value.cached_batches[0].slice_lengths,
972            vec![
973                DEFAULT_READ_BATCH_SIZE / 2 + 1,
974                DEFAULT_READ_BATCH_SIZE / 2 + 1
975            ]
976        );
977    }
978
979    #[tokio::test]
980    async fn cache_batch_buffer_compacts_when_buffered_size_exceeds_threshold() {
981        let strategy = test_cache_strategy();
982        let large_batch = make_large_binary_batch(DEFAULT_READ_BATCH_SIZE, 4096);
983        let (key, part_metrics) = test_cache_context(&strategy);
984
985        let mut buffer = CacheBatchBuffer::new(&strategy);
986        buffer.push(large_batch.clone()).unwrap();
987
988        assert_eq!(buffer.buffered_rows, 0);
989        assert!(buffer.buffered_batches.is_empty());
990
991        let value = finish_cache_batch_buffer(buffer, key, strategy, part_metrics)
992            .await
993            .unwrap();
994        assert_eq!(value.cached_batches.len(), 1);
995        assert_eq!(
996            value.cached_batches[0].slice_lengths,
997            vec![large_batch.num_rows()]
998        );
999    }
1000
1001    #[tokio::test]
1002    async fn cache_batch_buffer_uses_compacted_size_for_weight() {
1003        let strategy = test_cache_strategy();
1004        let batch1 = make_batch(&[1, 2]);
1005        let batch2 = make_batch(&[3, 4]);
1006        let (key, part_metrics) = test_cache_context(&strategy);
1007        let expected = concat_batches(&test_schema(), &[batch1.clone(), batch2.clone()])
1008            .unwrap()
1009            .get_array_memory_size();
1010
1011        let mut buffer = CacheBatchBuffer::new(&strategy);
1012        buffer.push(batch1).unwrap();
1013        buffer.push(batch2).unwrap();
1014
1015        let value = finish_cache_batch_buffer(buffer, key, strategy, part_metrics)
1016            .await
1017            .unwrap();
1018        assert_eq!(value.estimated_batches_size, expected);
1019    }
1020
1021    #[tokio::test]
1022    async fn cache_batch_buffer_skips_cache_when_weight_exceeds_limit() {
1023        let strategy = test_cache_strategy();
1024        let (key, part_metrics) = test_cache_context(&strategy);
1025        let mut buffer = CacheBatchBuffer::new(&strategy);
1026        buffer.total_weight = RANGE_CACHE_SKIP_BYTES;
1027
1028        buffer.push(make_batch(&[1])).unwrap();
1029
1030        assert!(buffer.sender.is_none());
1031        assert!(
1032            finish_cache_batch_buffer(buffer, key, strategy, part_metrics)
1033                .await
1034                .is_none()
1035        );
1036    }
1037}