Skip to main content

mito2/read/
scan_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Scans a region according to the scan request.
16
17use std::collections::HashSet;
18use std::fmt;
19use std::num::NonZeroU64;
20use std::sync::Arc;
21use std::time::Instant;
22
23use api::v1::SemanticType;
24use common_error::ext::BoxedError;
25use common_recordbatch::SendableRecordBatchStream;
26use common_recordbatch::filter::SimpleFilterEvaluator;
27use common_telemetry::tracing::Instrument;
28use common_telemetry::{debug, error, tracing, warn};
29use common_time::range::TimestampRange;
30use datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr;
31use datafusion_common::Column;
32use datafusion_expr::Expr;
33use datafusion_expr::utils::expr_to_columns;
34use datatypes::schema::ext::ArrowSchemaExt;
35use futures::StreamExt;
36use partition::expr::PartitionExpr;
37use smallvec::SmallVec;
38use snafu::ResultExt;
39use store_api::metadata::{RegionMetadata, RegionMetadataRef};
40use store_api::region_engine::{PartitionRange, RegionScannerRef};
41use store_api::storage::{
42    RegionId, ScanRequest, SequenceNumber, SequenceRange, TimeSeriesDistribution,
43    TimeSeriesRowSelector,
44};
45use table::predicate::{Predicate, build_time_range_predicate, extract_time_range_from_expr};
46use tokio::sync::{Semaphore, mpsc};
47use tokio_stream::wrappers::ReceiverStream;
48
49use crate::access_layer::AccessLayerRef;
50use crate::cache::CacheStrategy;
51use crate::config::DEFAULT_MAX_CONCURRENT_SCAN_FILES;
52use crate::error::{InvalidPartitionExprSnafu, Result};
53#[cfg(feature = "enterprise")]
54use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
55use crate::memtable::{MemtableRange, RangesOptions};
56use crate::metrics::READ_SST_COUNT;
57use crate::read::compat::{self, FlatCompatBatch};
58use crate::read::flat_projection::FlatProjectionMapper;
59use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
60use crate::read::range_cache::ScanRequestFingerprint;
61use crate::read::read_columns::{
62    ReadColumns, merge, read_columns_from_predicate, read_columns_from_projection,
63};
64use crate::read::seq_scan::SeqScan;
65use crate::read::series_scan::SeriesScan;
66use crate::read::stream::ScanBatchStream;
67use crate::read::unordered_scan::UnorderedScan;
68use crate::read::{BoxedRecordBatchStream, RecordBatch};
69use crate::region::options::MergeMode;
70use crate::region::version::VersionRef;
71use crate::sst::file::FileHandle;
72use crate::sst::index::bloom_filter::applier::{
73    BloomFilterIndexApplierBuilder, BloomFilterIndexApplierRef,
74};
75use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
76use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
77use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
78use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
79#[cfg(feature = "vector_index")]
80use crate::sst::index::vector_index::applier::{VectorIndexApplier, VectorIndexApplierRef};
81use crate::sst::parquet::file_range::PreFilterMode;
82use crate::sst::parquet::reader::ReaderMetrics;
83
/// Multiplier applied to the vector search `k` when the scan request also carries
/// filters, so the vector index is asked for more candidates than the query's `k`.
#[cfg(feature = "vector_index")]
const VECTOR_INDEX_OVERFETCH_MULTIPLIER: usize = 2;
86
87/// A scanner scans a region and returns a [SendableRecordBatchStream].
88pub(crate) enum Scanner {
89    /// Sequential scan.
90    Seq(SeqScan),
91    /// Unordered scan.
92    Unordered(UnorderedScan),
93    /// Per-series scan.
94    Series(SeriesScan),
95}
96
97impl Scanner {
98    /// Returns a [SendableRecordBatchStream] to retrieve scan results from all partitions.
99    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
100    pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream, BoxedError> {
101        match self {
102            Scanner::Seq(seq_scan) => seq_scan.build_stream(),
103            Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
104            Scanner::Series(series_scan) => series_scan.build_stream().await,
105        }
106    }
107
108    /// Create a stream of [`Batch`] by this scanner.
109    pub(crate) fn scan_batch(&self) -> Result<ScanBatchStream> {
110        match self {
111            Scanner::Seq(x) => x.scan_all_partitions(),
112            Scanner::Unordered(x) => x.scan_all_partitions(),
113            Scanner::Series(x) => x.scan_all_partitions(),
114        }
115    }
116}
117
118#[cfg(test)]
119impl Scanner {
120    /// Returns number of files to scan.
121    pub(crate) fn num_files(&self) -> usize {
122        match self {
123            Scanner::Seq(seq_scan) => seq_scan.input().num_files(),
124            Scanner::Unordered(unordered_scan) => unordered_scan.input().num_files(),
125            Scanner::Series(series_scan) => series_scan.input().num_files(),
126        }
127    }
128
129    /// Returns number of memtables to scan.
130    pub(crate) fn num_memtables(&self) -> usize {
131        match self {
132            Scanner::Seq(seq_scan) => seq_scan.input().num_memtables(),
133            Scanner::Unordered(unordered_scan) => unordered_scan.input().num_memtables(),
134            Scanner::Series(series_scan) => series_scan.input().num_memtables(),
135        }
136    }
137
138    /// Returns SST file ids to scan.
139    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
140        match self {
141            Scanner::Seq(seq_scan) => seq_scan.input().file_ids(),
142            Scanner::Unordered(unordered_scan) => unordered_scan.input().file_ids(),
143            Scanner::Series(series_scan) => series_scan.input().file_ids(),
144        }
145    }
146
147    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
148        match self {
149            Scanner::Seq(seq_scan) => seq_scan.input().index_ids(),
150            Scanner::Unordered(unordered_scan) => unordered_scan.input().index_ids(),
151            Scanner::Series(series_scan) => series_scan.input().index_ids(),
152        }
153    }
154
155    pub(crate) fn snapshot_sequence(&self) -> Option<SequenceNumber> {
156        match self {
157            Scanner::Seq(seq_scan) => seq_scan.input().snapshot_sequence,
158            Scanner::Unordered(unordered_scan) => unordered_scan.input().snapshot_sequence,
159            Scanner::Series(series_scan) => series_scan.input().snapshot_sequence,
160        }
161    }
162
163    /// Sets the target partitions for the scanner. It can controls the parallelism of the scanner.
164    pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
165        use store_api::region_engine::{PrepareRequest, RegionScanner};
166
167        let request = PrepareRequest::default().with_target_partitions(target_partitions);
168        match self {
169            Scanner::Seq(seq_scan) => seq_scan.prepare(request).unwrap(),
170            Scanner::Unordered(unordered_scan) => unordered_scan.prepare(request).unwrap(),
171            Scanner::Series(series_scan) => series_scan.prepare(request).unwrap(),
172        }
173    }
174}
175
176#[cfg_attr(doc, aquamarine::aquamarine)]
177/// Helper to scans a region by [ScanRequest].
178///
179/// [ScanRegion] collects SSTs and memtables to scan without actually reading them. It
180/// creates a [Scanner] to actually scan these targets in [Scanner::scan()].
181///
182/// ```mermaid
183/// classDiagram
184/// class ScanRegion {
185///     -VersionRef version
186///     -ScanRequest request
187///     ~scanner() Scanner
188///     ~seq_scan() SeqScan
189/// }
190/// class Scanner {
191///     <<enumeration>>
192///     SeqScan
193///     UnorderedScan
194///     +scan() SendableRecordBatchStream
195/// }
196/// class SeqScan {
197///     -ScanInput input
198///     +build() SendableRecordBatchStream
199/// }
200/// class UnorderedScan {
201///     -ScanInput input
202///     +build() SendableRecordBatchStream
203/// }
204/// class ScanInput {
205///     -ProjectionMapper mapper
206///     -Option~TimeRange~ time_range
207///     -Option~Predicate~ predicate
208///     -Vec~MemtableRef~ memtables
209///     -Vec~FileHandle~ files
210/// }
211/// class ProjectionMapper {
212///     ~output_schema() SchemaRef
213///     ~convert(Batch) RecordBatch
214/// }
215/// ScanRegion -- Scanner
216/// ScanRegion o-- ScanRequest
217/// Scanner o-- SeqScan
218/// Scanner o-- UnorderedScan
219/// SeqScan o-- ScanInput
220/// UnorderedScan o-- ScanInput
221/// Scanner -- SendableRecordBatchStream
222/// ScanInput o-- ProjectionMapper
223/// SeqScan -- SendableRecordBatchStream
224/// UnorderedScan -- SendableRecordBatchStream
225/// ```
226pub(crate) struct ScanRegion {
227    /// Version of the region at scan.
228    version: VersionRef,
229    /// Access layer of the region.
230    access_layer: AccessLayerRef,
231    /// Scan request.
232    request: ScanRequest,
233    /// Cache.
234    cache_strategy: CacheStrategy,
235    /// Maximum number of SST files to scan concurrently.
236    max_concurrent_scan_files: usize,
237    /// Whether to ignore inverted index.
238    ignore_inverted_index: bool,
239    /// Whether to ignore fulltext index.
240    ignore_fulltext_index: bool,
241    /// Whether to ignore bloom filter.
242    ignore_bloom_filter: bool,
243    /// Start time of the scan task.
244    start_time: Option<Instant>,
245    /// Whether to filter out the deleted rows.
246    /// Usually true for normal read, and false for scan for compaction.
247    filter_deleted: bool,
248    #[cfg(feature = "enterprise")]
249    extension_range_provider: Option<BoxedExtensionRangeProvider>,
250}
251
252impl ScanRegion {
253    /// Creates a [ScanRegion].
254    pub(crate) fn new(
255        version: VersionRef,
256        access_layer: AccessLayerRef,
257        request: ScanRequest,
258        cache_strategy: CacheStrategy,
259    ) -> ScanRegion {
260        ScanRegion {
261            version,
262            access_layer,
263            request,
264            cache_strategy,
265            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
266            ignore_inverted_index: false,
267            ignore_fulltext_index: false,
268            ignore_bloom_filter: false,
269            start_time: None,
270            filter_deleted: true,
271            #[cfg(feature = "enterprise")]
272            extension_range_provider: None,
273        }
274    }
275
276    /// Sets maximum number of SST files to scan concurrently.
277    #[must_use]
278    pub(crate) fn with_max_concurrent_scan_files(
279        mut self,
280        max_concurrent_scan_files: usize,
281    ) -> Self {
282        self.max_concurrent_scan_files = max_concurrent_scan_files;
283        self
284    }
285
286    /// Sets whether to ignore inverted index.
287    #[must_use]
288    pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self {
289        self.ignore_inverted_index = ignore;
290        self
291    }
292
293    /// Sets whether to ignore fulltext index.
294    #[must_use]
295    pub(crate) fn with_ignore_fulltext_index(mut self, ignore: bool) -> Self {
296        self.ignore_fulltext_index = ignore;
297        self
298    }
299
300    /// Sets whether to ignore bloom filter.
301    #[must_use]
302    pub(crate) fn with_ignore_bloom_filter(mut self, ignore: bool) -> Self {
303        self.ignore_bloom_filter = ignore;
304        self
305    }
306
307    #[must_use]
308    pub(crate) fn with_start_time(mut self, now: Instant) -> Self {
309        self.start_time = Some(now);
310        self
311    }
312
313    pub(crate) fn set_filter_deleted(&mut self, filter_deleted: bool) {
314        self.filter_deleted = filter_deleted;
315    }
316
317    #[cfg(feature = "enterprise")]
318    pub(crate) fn set_extension_range_provider(
319        &mut self,
320        extension_range_provider: BoxedExtensionRangeProvider,
321    ) {
322        self.extension_range_provider = Some(extension_range_provider);
323    }
324
325    /// Returns a [Scanner] to scan the region.
326    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
327    pub(crate) async fn scanner(self) -> Result<Scanner> {
328        if self.use_series_scan() {
329            self.series_scan().await.map(Scanner::Series)
330        } else if self.use_unordered_scan() {
331            // If table is append only and there is no series row selector, we use unordered scan in query.
332            // We still use seq scan in compaction.
333            self.unordered_scan().await.map(Scanner::Unordered)
334        } else {
335            self.seq_scan().await.map(Scanner::Seq)
336        }
337    }
338
339    /// Returns a [RegionScanner] to scan the region.
340    #[tracing::instrument(
341        level = tracing::Level::DEBUG,
342        skip_all,
343        fields(region_id = %self.region_id())
344    )]
345    pub(crate) async fn region_scanner(self) -> Result<RegionScannerRef> {
346        if self.use_series_scan() {
347            self.series_scan()
348                .await
349                .map(|scanner| Box::new(scanner) as _)
350        } else if self.use_unordered_scan() {
351            self.unordered_scan()
352                .await
353                .map(|scanner| Box::new(scanner) as _)
354        } else {
355            self.seq_scan().await.map(|scanner| Box::new(scanner) as _)
356        }
357    }
358
359    /// Scan sequentially.
360    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
361    pub(crate) async fn seq_scan(self) -> Result<SeqScan> {
362        let input = self.scan_input().await?.with_compaction(false);
363        Ok(SeqScan::new(input))
364    }
365
366    /// Unordered scan.
367    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
368    pub(crate) async fn unordered_scan(self) -> Result<UnorderedScan> {
369        let input = self.scan_input().await?;
370        Ok(UnorderedScan::new(input))
371    }
372
373    /// Scans by series.
374    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
375    pub(crate) async fn series_scan(self) -> Result<SeriesScan> {
376        let input = self.scan_input().await?;
377        Ok(SeriesScan::new(input))
378    }
379
380    /// Returns true if the region can use unordered scan for current request.
381    fn use_unordered_scan(&self) -> bool {
382        // We use unordered scan when:
383        // 1. The region is in append mode.
384        // 2. There is no series row selector.
385        // 3. The required distribution is None or TimeSeriesDistribution::TimeWindowed.
386        //
387        // We still use seq scan in compaction.
388        self.version.options.append_mode
389            && self.request.series_row_selector.is_none()
390            && (self.request.distribution.is_none()
391                || self.request.distribution == Some(TimeSeriesDistribution::TimeWindowed))
392    }
393
394    /// Returns true if the region can use series scan for current request.
395    fn use_series_scan(&self) -> bool {
396        self.request.distribution == Some(TimeSeriesDistribution::PerSeries)
397    }
398
399    /// Creates a scan input.
400    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
401    async fn scan_input(self) -> Result<ScanInput> {
402        let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new);
403        let time_range = self.build_time_range_predicate();
404        let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;
405
406        let read_cols = match &self.request.projection_input {
407            Some(p) => {
408                // Read columns include the pushed-down projection and columns
409                // resolved from the predicate.
410                let metadata = &self.version.metadata;
411                let from_projection = read_columns_from_projection(p.clone(), metadata)?;
412                let from_predicate = read_columns_from_predicate(&predicate, metadata);
413                merge(from_projection, from_predicate)
414            }
415            None => {
416                let read_col_ids = self
417                    .version
418                    .metadata
419                    .column_metadatas
420                    .iter()
421                    .map(|col| col.column_id);
422                ReadColumns::from_deduped_column_ids(read_col_ids)
423            }
424        };
425        let read_col_ids = read_cols.column_ids();
426
427        // The mapper always computes projected column ids as the schema of SSTs may change.
428        let projection = self
429            .request
430            .projection_indices()
431            .map(|x| x.to_vec())
432            .unwrap_or_else(|| (0..self.version.metadata.column_metadatas.len()).collect());
433        let json_type_hint = self
434            .version
435            .metadata
436            .schema
437            .arrow_schema()
438            .has_json_extension_field()
439            .then_some(&self.request.json_type_hint);
440        let mapper = FlatProjectionMapper::new_with_read_columns(
441            &self.version.metadata,
442            projection,
443            read_cols,
444            json_type_hint,
445        )?;
446
447        let ssts = &self.version.ssts;
448        let mut files = Vec::new();
449        for level in ssts.levels() {
450            for file in level.files.values() {
451                let exceed_min_sequence = match (sst_min_sequence, file.meta_ref().sequence) {
452                    (Some(min_sequence), Some(file_sequence)) => file_sequence > min_sequence,
453                    // If the file's sequence is None (or actually is zero), it could mean the file
454                    // is generated and added to the region "directly". In this case, its data should
455                    // be considered as fresh as the memtable. So its sequence is treated greater than
456                    // the min_sequence, whatever the value of min_sequence is. Hence the default
457                    // "true" in this arm.
458                    (Some(_), None) => true,
459                    (None, _) => true,
460                };
461
462                // Finds SST files in range.
463                if exceed_min_sequence && file_in_range(file, &time_range) {
464                    files.push(file.clone());
465                }
466                // There is no need to check and prune for file's sequence here as the sequence number is usually very new,
467                // unless the timing is too good, or the sequence number wouldn't be in file.
468                // and the batch will be filtered out by tree reader anyway.
469            }
470        }
471
472        let memtables = self.version.memtables.list_memtables();
473        // Skip empty memtables and memtables out of time range.
474        let mut mem_range_builders = Vec::new();
475        let filter_mode = pre_filter_mode(
476            self.version.options.append_mode,
477            self.version.options.merge_mode(),
478        );
479
480        for m in memtables {
481            // check if memtable is empty by reading stats.
482            let Some((start, end)) = m.stats().time_range() else {
483                continue;
484            };
485            // The time range of the memtable is inclusive.
486            let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
487            if !memtable_range.intersects(&time_range) {
488                continue;
489            }
490            let ranges_in_memtable = m.ranges(
491                Some(&read_col_ids),
492                RangesOptions::default()
493                    .with_predicate(predicate.clone())
494                    .with_sequence(SequenceRange::new(
495                        self.request.memtable_min_sequence,
496                        self.request.memtable_max_sequence,
497                    ))
498                    .with_pre_filter_mode(filter_mode),
499            )?;
500            mem_range_builders.extend(ranges_in_memtable.ranges.into_values().map(|v| {
501                let stats = v.stats().clone();
502                MemRangeBuilder::new(v, stats)
503            }));
504        }
505
506        let region_id = self.region_id();
507        debug!(
508            "Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}",
509            region_id,
510            self.request,
511            time_range,
512            mem_range_builders.len(),
513            files.len(),
514            self.version.options.append_mode,
515        );
516
517        let (non_field_filters, field_filters) = self.partition_by_field_filters();
518        let inverted_index_appliers = [
519            self.build_invereted_index_applier(&non_field_filters),
520            self.build_invereted_index_applier(&field_filters),
521        ];
522        let bloom_filter_appliers = [
523            self.build_bloom_filter_applier(&non_field_filters),
524            self.build_bloom_filter_applier(&field_filters),
525        ];
526        let fulltext_index_appliers = [
527            self.build_fulltext_index_applier(&non_field_filters),
528            self.build_fulltext_index_applier(&field_filters),
529        ];
530        #[cfg(feature = "vector_index")]
531        let vector_index_applier = self.build_vector_index_applier();
532        #[cfg(feature = "vector_index")]
533        let vector_index_k = self.request.vector_search.as_ref().map(|search| {
534            if self.request.filters.is_empty() {
535                search.k
536            } else {
537                search.k.saturating_mul(VECTOR_INDEX_OVERFETCH_MULTIPLIER)
538            }
539        });
540
541        let input = ScanInput::new(self.access_layer, mapper)
542            .with_time_range(Some(time_range))
543            .with_predicate(predicate)
544            .with_memtables(mem_range_builders)
545            .with_files(files)
546            .with_cache(self.cache_strategy)
547            .with_inverted_index_appliers(inverted_index_appliers)
548            .with_bloom_filter_index_appliers(bloom_filter_appliers)
549            .with_fulltext_index_appliers(fulltext_index_appliers)
550            .with_max_concurrent_scan_files(self.max_concurrent_scan_files)
551            .with_start_time(self.start_time)
552            .with_append_mode(self.version.options.append_mode)
553            .with_filter_deleted(self.filter_deleted)
554            .with_merge_mode(self.version.options.merge_mode())
555            .with_series_row_selector(self.request.series_row_selector)
556            .with_distribution(self.request.distribution)
557            .with_explain_flat_format(
558                self.version.options.sst_format == Some(crate::sst::FormatType::Flat),
559            )
560            .with_snapshot_sequence(
561                self.request
562                    .snapshot_on_scan
563                    .then_some(self.request.memtable_max_sequence)
564                    .flatten(),
565            );
566        #[cfg(feature = "vector_index")]
567        let input = input
568            .with_vector_index_applier(vector_index_applier)
569            .with_vector_index_k(vector_index_k);
570
571        #[cfg(feature = "enterprise")]
572        let input = if let Some(provider) = self.extension_range_provider {
573            let ranges = provider
574                .find_extension_ranges(self.version.flushed_sequence, time_range, &self.request)
575                .await?;
576            debug!("Find extension ranges: {ranges:?}");
577            input.with_extension_ranges(ranges)
578        } else {
579            input
580        };
581        Ok(input)
582    }
583
584    fn region_id(&self) -> RegionId {
585        self.version.metadata.region_id
586    }
587
588    /// Build time range predicate from filters.
589    fn build_time_range_predicate(&self) -> TimestampRange {
590        let time_index = self.version.metadata.time_index_column();
591        let unit = time_index
592            .column_schema
593            .data_type
594            .as_timestamp()
595            .expect("Time index must have timestamp-compatible type")
596            .unit();
597        build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters)
598    }
599
600    /// Partitions filters into two groups: non-field filters and field filters.
601    /// Returns `(non_field_filters, field_filters)`.
602    fn partition_by_field_filters(&self) -> (Vec<Expr>, Vec<Expr>) {
603        let field_columns = self
604            .version
605            .metadata
606            .field_columns()
607            .map(|col| &col.column_schema.name)
608            .collect::<HashSet<_>>();
609
610        let mut columns = HashSet::new();
611
612        self.request.filters.iter().cloned().partition(|expr| {
613            columns.clear();
614            // `expr_to_columns` won't return error.
615            if expr_to_columns(expr, &mut columns).is_err() {
616                // If we can't extract columns, treat it as non-field filter
617                return true;
618            }
619            // Return true for non-field filters (partition puts true cases in first vec)
620            !columns
621                .iter()
622                .any(|column| field_columns.contains(&column.name))
623        })
624    }
625
626    /// Use the latest schema to build the inverted index applier.
627    fn build_invereted_index_applier(&self, filters: &[Expr]) -> Option<InvertedIndexApplierRef> {
628        if self.ignore_inverted_index {
629            return None;
630        }
631
632        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
633        let inverted_index_cache = self.cache_strategy.inverted_index_cache().cloned();
634
635        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
636
637        InvertedIndexApplierBuilder::new(
638            self.access_layer.table_dir().to_string(),
639            self.access_layer.path_type(),
640            self.access_layer.object_store().clone(),
641            self.version.metadata.as_ref(),
642            self.version.metadata.inverted_indexed_column_ids(
643                self.version
644                    .options
645                    .index_options
646                    .inverted_index
647                    .ignore_column_ids
648                    .iter(),
649            ),
650            self.access_layer.puffin_manager_factory().clone(),
651        )
652        .with_file_cache(file_cache)
653        .with_inverted_index_cache(inverted_index_cache)
654        .with_puffin_metadata_cache(puffin_metadata_cache)
655        .build(filters)
656        .inspect_err(|err| warn!(err; "Failed to build invereted index applier"))
657        .ok()
658        .flatten()
659        .map(Arc::new)
660    }
661
662    /// Use the latest schema to build the bloom filter index applier.
663    fn build_bloom_filter_applier(&self, filters: &[Expr]) -> Option<BloomFilterIndexApplierRef> {
664        if self.ignore_bloom_filter {
665            return None;
666        }
667
668        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
669        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
670        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
671
672        BloomFilterIndexApplierBuilder::new(
673            self.access_layer.table_dir().to_string(),
674            self.access_layer.path_type(),
675            self.access_layer.object_store().clone(),
676            self.version.metadata.as_ref(),
677            self.access_layer.puffin_manager_factory().clone(),
678        )
679        .with_file_cache(file_cache)
680        .with_bloom_filter_index_cache(bloom_filter_index_cache)
681        .with_puffin_metadata_cache(puffin_metadata_cache)
682        .build(filters)
683        .inspect_err(|err| warn!(err; "Failed to build bloom filter index applier"))
684        .ok()
685        .flatten()
686        .map(Arc::new)
687    }
688
689    /// Use the latest schema to build the fulltext index applier.
690    fn build_fulltext_index_applier(&self, filters: &[Expr]) -> Option<FulltextIndexApplierRef> {
691        if self.ignore_fulltext_index {
692            return None;
693        }
694
695        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
696        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
697        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
698        FulltextIndexApplierBuilder::new(
699            self.access_layer.table_dir().to_string(),
700            self.access_layer.path_type(),
701            self.access_layer.object_store().clone(),
702            self.access_layer.puffin_manager_factory().clone(),
703            self.version.metadata.as_ref(),
704        )
705        .with_file_cache(file_cache)
706        .with_puffin_metadata_cache(puffin_metadata_cache)
707        .with_bloom_filter_cache(bloom_filter_index_cache)
708        .build(filters)
709        .inspect_err(|err| warn!(err; "Failed to build fulltext index applier"))
710        .ok()
711        .flatten()
712        .map(Arc::new)
713    }
714
715    /// Build the vector index applier from vector search request.
716    #[cfg(feature = "vector_index")]
717    fn build_vector_index_applier(&self) -> Option<VectorIndexApplierRef> {
718        let vector_search = self.request.vector_search.as_ref()?;
719
720        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
721        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
722        let vector_index_cache = self.cache_strategy.vector_index_cache().cloned();
723
724        let applier = VectorIndexApplier::new(
725            self.access_layer.table_dir().to_string(),
726            self.access_layer.path_type(),
727            self.access_layer.object_store().clone(),
728            self.access_layer.puffin_manager_factory().clone(),
729            vector_search.column_id,
730            vector_search.query_vector.clone(),
731            vector_search.metric,
732        )
733        .with_file_cache(file_cache)
734        .with_puffin_metadata_cache(puffin_metadata_cache)
735        .with_vector_index_cache(vector_index_cache);
736
737        Some(Arc::new(applier))
738    }
739}
740
741/// Returns true if the time range of a SST `file` matches the `predicate`.
742fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
743    if predicate == &TimestampRange::min_to_max() {
744        return true;
745    }
746    // end timestamp of a SST is inclusive.
747    let (start, end) = file.time_range();
748    let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end));
749    file_ts_range.intersects(predicate)
750}
751
752/// Common input for different scanners.
753pub struct ScanInput {
754    /// Region SST access layer.
755    access_layer: AccessLayerRef,
756    /// Maps projected Batches to RecordBatches.
757    pub(crate) mapper: Arc<FlatProjectionMapper>,
758    /// The columns to read from memtables and SSTs.
759    /// Notice this is different from the columns in `mapper` which are projected columns.
760    /// But this read columns might also include non-projected columns needed for filtering.
761    pub(crate) read_cols: ReadColumns,
762    /// Time range filter for time index.
763    pub(crate) time_range: Option<TimestampRange>,
764    /// Predicate to push down.
765    pub(crate) predicate: PredicateGroup,
766    /// Region partition expr applied at read time.
767    region_partition_expr: Option<PartitionExpr>,
768    /// Memtable range builders for memtables in the time range..
769    pub(crate) memtables: Vec<MemRangeBuilder>,
770    /// Handles to SST files to scan.
771    pub(crate) files: Vec<FileHandle>,
772    /// Cache.
773    pub(crate) cache_strategy: CacheStrategy,
774    /// Ignores file not found error.
775    ignore_file_not_found: bool,
776    /// Maximum number of SST files to scan concurrently.
777    pub(crate) max_concurrent_scan_files: usize,
778    /// Index appliers.
779    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
780    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
781    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
782    /// Vector index applier for KNN search.
783    #[cfg(feature = "vector_index")]
784    pub(crate) vector_index_applier: Option<VectorIndexApplierRef>,
785    /// Over-fetched k for vector index scan.
786    #[cfg(feature = "vector_index")]
787    pub(crate) vector_index_k: Option<usize>,
788    /// Start time of the query.
789    pub(crate) query_start: Option<Instant>,
790    /// The region is using append mode.
791    pub(crate) append_mode: bool,
792    /// Whether to remove deletion markers.
793    pub(crate) filter_deleted: bool,
794    /// Mode to merge duplicate rows.
795    pub(crate) merge_mode: MergeMode,
796    /// Hint to select rows from time series.
797    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
798    /// Hint for the required distribution of the scanner.
799    pub(crate) distribution: Option<TimeSeriesDistribution>,
800    /// Whether the region's configured SST format is flat.
801    explain_flat_format: bool,
802    /// Snapshot upper bound bound at scan open and propagated back to the caller.
803    pub(crate) snapshot_sequence: Option<SequenceNumber>,
804    /// Whether this scan is for compaction.
805    pub(crate) compaction: bool,
806    #[cfg(feature = "enterprise")]
807    extension_ranges: Vec<BoxedExtensionRange>,
808}
809
810impl ScanInput {
811    /// Creates a new [ScanInput].
812    #[must_use]
813    pub(crate) fn new(access_layer: AccessLayerRef, mapper: FlatProjectionMapper) -> ScanInput {
814        ScanInput {
815            access_layer,
816            read_cols: mapper.read_columns().clone(),
817            mapper: Arc::new(mapper),
818            time_range: None,
819            predicate: PredicateGroup::default(),
820            region_partition_expr: None,
821            memtables: Vec::new(),
822            files: Vec::new(),
823            cache_strategy: CacheStrategy::Disabled,
824            ignore_file_not_found: false,
825            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
826            inverted_index_appliers: [None, None],
827            bloom_filter_index_appliers: [None, None],
828            fulltext_index_appliers: [None, None],
829            #[cfg(feature = "vector_index")]
830            vector_index_applier: None,
831            #[cfg(feature = "vector_index")]
832            vector_index_k: None,
833            query_start: None,
834            append_mode: false,
835            filter_deleted: true,
836            merge_mode: MergeMode::default(),
837            series_row_selector: None,
838            distribution: None,
839            explain_flat_format: false,
840            snapshot_sequence: None,
841            compaction: false,
842            #[cfg(feature = "enterprise")]
843            extension_ranges: Vec::new(),
844        }
845    }
846
847    /// Sets time range filter for time index.
848    #[must_use]
849    pub(crate) fn with_time_range(mut self, time_range: Option<TimestampRange>) -> Self {
850        self.time_range = time_range;
851        self
852    }
853
854    /// Sets predicate to push down.
855    #[must_use]
856    pub(crate) fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
857        self.region_partition_expr = predicate.region_partition_expr().cloned();
858        self.predicate = predicate;
859        self
860    }
861
862    /// Sets memtable range builders.
863    #[must_use]
864    pub(crate) fn with_memtables(mut self, memtables: Vec<MemRangeBuilder>) -> Self {
865        self.memtables = memtables;
866        self
867    }
868
869    /// Sets files to read.
870    #[must_use]
871    pub(crate) fn with_files(mut self, files: Vec<FileHandle>) -> Self {
872        self.files = files;
873        self
874    }
875
876    /// Sets cache for this query.
877    #[must_use]
878    pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self {
879        self.cache_strategy = cache;
880        self
881    }
882
883    /// Ignores file not found error.
884    #[must_use]
885    pub(crate) fn with_ignore_file_not_found(mut self, ignore: bool) -> Self {
886        self.ignore_file_not_found = ignore;
887        self
888    }
889
890    /// Sets maximum number of SST files to scan concurrently.
891    #[must_use]
892    pub(crate) fn with_max_concurrent_scan_files(
893        mut self,
894        max_concurrent_scan_files: usize,
895    ) -> Self {
896        self.max_concurrent_scan_files = max_concurrent_scan_files;
897        self
898    }
899
900    /// Sets inverted index appliers.
901    #[must_use]
902    pub(crate) fn with_inverted_index_appliers(
903        mut self,
904        appliers: [Option<InvertedIndexApplierRef>; 2],
905    ) -> Self {
906        self.inverted_index_appliers = appliers;
907        self
908    }
909
910    /// Sets bloom filter appliers.
911    #[must_use]
912    pub(crate) fn with_bloom_filter_index_appliers(
913        mut self,
914        appliers: [Option<BloomFilterIndexApplierRef>; 2],
915    ) -> Self {
916        self.bloom_filter_index_appliers = appliers;
917        self
918    }
919
920    /// Sets fulltext index appliers.
921    #[must_use]
922    pub(crate) fn with_fulltext_index_appliers(
923        mut self,
924        appliers: [Option<FulltextIndexApplierRef>; 2],
925    ) -> Self {
926        self.fulltext_index_appliers = appliers;
927        self
928    }
929
930    /// Sets vector index applier for KNN search.
931    #[cfg(feature = "vector_index")]
932    #[must_use]
933    pub(crate) fn with_vector_index_applier(
934        mut self,
935        applier: Option<VectorIndexApplierRef>,
936    ) -> Self {
937        self.vector_index_applier = applier;
938        self
939    }
940
941    /// Sets over-fetched k for vector index scan.
942    #[cfg(feature = "vector_index")]
943    #[must_use]
944    pub(crate) fn with_vector_index_k(mut self, k: Option<usize>) -> Self {
945        self.vector_index_k = k;
946        self
947    }
948
949    /// Sets start time of the query.
950    #[must_use]
951    pub(crate) fn with_start_time(mut self, now: Option<Instant>) -> Self {
952        self.query_start = now;
953        self
954    }
955
956    #[must_use]
957    pub(crate) fn with_append_mode(mut self, is_append_mode: bool) -> Self {
958        self.append_mode = is_append_mode;
959        self
960    }
961
962    /// Sets whether to remove deletion markers during scan.
963    #[must_use]
964    pub(crate) fn with_filter_deleted(mut self, filter_deleted: bool) -> Self {
965        self.filter_deleted = filter_deleted;
966        self
967    }
968
969    /// Sets the merge mode.
970    #[must_use]
971    pub(crate) fn with_merge_mode(mut self, merge_mode: MergeMode) -> Self {
972        self.merge_mode = merge_mode;
973        self
974    }
975
976    /// Sets the distribution hint.
977    #[must_use]
978    pub(crate) fn with_distribution(
979        mut self,
980        distribution: Option<TimeSeriesDistribution>,
981    ) -> Self {
982        self.distribution = distribution;
983        self
984    }
985
986    /// Sets whether the region's configured SST format is flat for explain output.
987    #[must_use]
988    pub(crate) fn with_explain_flat_format(mut self, explain_flat_format: bool) -> Self {
989        self.explain_flat_format = explain_flat_format;
990        self
991    }
992
993    /// Sets the time series row selector.
994    #[must_use]
995    pub(crate) fn with_series_row_selector(
996        mut self,
997        series_row_selector: Option<TimeSeriesRowSelector>,
998    ) -> Self {
999        self.series_row_selector = series_row_selector;
1000        self
1001    }
1002
1003    #[must_use]
1004    pub(crate) fn with_snapshot_sequence(
1005        mut self,
1006        snapshot_sequence: Option<SequenceNumber>,
1007    ) -> Self {
1008        self.snapshot_sequence = snapshot_sequence;
1009        self
1010    }
1011
1012    /// Sets whether this scan is for compaction.
1013    #[must_use]
1014    pub(crate) fn with_compaction(mut self, compaction: bool) -> Self {
1015        self.compaction = compaction;
1016        self
1017    }
1018
1019    /// Builds memtable ranges to scan by `index`.
1020    pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> {
1021        let memtable = &self.memtables[index.index];
1022        let mut ranges = SmallVec::new();
1023        memtable.build_ranges(index.row_group_index, &mut ranges);
1024        ranges
1025    }
1026
1027    fn predicate_for_file(&self, file: &FileHandle) -> Option<Predicate> {
1028        if self.should_skip_region_partition(file) {
1029            self.predicate.predicate_without_region().cloned()
1030        } else {
1031            self.predicate.predicate().cloned()
1032        }
1033    }
1034
1035    fn should_skip_region_partition(&self, file: &FileHandle) -> bool {
1036        match (
1037            self.region_partition_expr.as_ref(),
1038            file.meta_ref().partition_expr.as_ref(),
1039        ) {
1040            (Some(region_expr), Some(file_expr)) => region_expr == file_expr,
1041            _ => false,
1042        }
1043    }
1044
1045    /// Prunes a file to scan and returns the builder to build readers.
1046    #[tracing::instrument(
1047        skip_all,
1048        fields(
1049            region_id = %self.region_metadata().region_id,
1050            file_id = %file.file_id()
1051        )
1052    )]
1053    pub async fn prune_file(
1054        &self,
1055        file: &FileHandle,
1056        pre_filter_mode: PreFilterMode,
1057        reader_metrics: &mut ReaderMetrics,
1058    ) -> Result<FileRangeBuilder> {
1059        let predicate = self.predicate_for_file(file);
1060        let may_build_selective_row_selection = predicate.is_some();
1061        let decode_pk_values = !self.compaction
1062            && self
1063                .mapper
1064                .read_columns()
1065                .column_ids_iter()
1066                .any(|column_id| self.mapper.metadata().primary_key.contains(&column_id));
1067        let reader = self
1068            .access_layer
1069            .read_sst(file.clone())
1070            .predicate(predicate)
1071            .projection(Some(self.read_cols.clone()))
1072            .cache(self.cache_strategy.clone())
1073            .inverted_index_appliers(self.inverted_index_appliers.clone())
1074            .bloom_filter_index_appliers(self.bloom_filter_index_appliers.clone())
1075            .fulltext_index_appliers(self.fulltext_index_appliers.clone());
1076        let reader = if !self.compaction && may_build_selective_row_selection {
1077            reader.deferred_optional_page_index()
1078        } else {
1079            reader
1080        };
1081        #[cfg(feature = "vector_index")]
1082        let reader = {
1083            let mut reader = reader;
1084            reader =
1085                reader.vector_index_applier(self.vector_index_applier.clone(), self.vector_index_k);
1086            reader
1087        };
1088        let res = reader
1089            .expected_metadata(Some(self.mapper.metadata().clone()))
1090            .compaction(self.compaction)
1091            .pre_filter_mode(pre_filter_mode)
1092            .decode_primary_key_values(decode_pk_values)
1093            .build_reader_input(reader_metrics)
1094            .await;
1095        let read_input = match res {
1096            Ok(x) => x,
1097            Err(e) => {
1098                if e.is_object_not_found() && self.ignore_file_not_found {
1099                    error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
1100                    return Ok(FileRangeBuilder::default());
1101                } else {
1102                    return Err(e);
1103                }
1104            }
1105        };
1106
1107        let Some((mut file_range_ctx, selection)) = read_input else {
1108            return Ok(FileRangeBuilder::default());
1109        };
1110
1111        let need_compat = !compat::has_same_columns_and_pk_encoding(
1112            &self.mapper,
1113            file_range_ctx.read_format(),
1114            self.compaction,
1115        );
1116        if need_compat {
1117            // They have different schema. We need to adapt the batch first so the
1118            // mapper can convert it.
1119            let compat = FlatCompatBatch::try_new(
1120                &self.mapper,
1121                file_range_ctx.read_format(),
1122                self.compaction,
1123            )?;
1124            file_range_ctx.set_compat_batch(compat);
1125        }
1126        Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), selection))
1127    }
1128
1129    /// Scans flat sources (RecordBatch streams) in parallel.
1130    ///
1131    /// # Panics if the input doesn't allow parallel scan.
1132    #[tracing::instrument(
1133        skip(self, sources, semaphore),
1134        fields(
1135            region_id = %self.region_metadata().region_id,
1136            source_count = sources.len()
1137        )
1138    )]
1139    pub(crate) fn create_parallel_flat_sources(
1140        &self,
1141        sources: Vec<BoxedRecordBatchStream>,
1142        semaphore: Arc<Semaphore>,
1143        channel_size: usize,
1144    ) -> Result<Vec<BoxedRecordBatchStream>> {
1145        if sources.len() <= 1 {
1146            return Ok(sources);
1147        }
1148
1149        // Spawn a task for each source.
1150        let sources = sources
1151            .into_iter()
1152            .map(|source| {
1153                let (sender, receiver) = mpsc::channel(channel_size);
1154                self.spawn_flat_scan_task(source, semaphore.clone(), sender);
1155                let stream = Box::pin(ReceiverStream::new(receiver));
1156                Box::pin(stream) as _
1157            })
1158            .collect();
1159        Ok(sources)
1160    }
1161
1162    /// Spawns a task to scan a flat source (RecordBatch stream) asynchronously.
1163    #[tracing::instrument(
1164        skip(self, input, semaphore, sender),
1165        fields(region_id = %self.region_metadata().region_id)
1166    )]
1167    pub(crate) fn spawn_flat_scan_task(
1168        &self,
1169        mut input: BoxedRecordBatchStream,
1170        semaphore: Arc<Semaphore>,
1171        sender: mpsc::Sender<Result<RecordBatch>>,
1172    ) {
1173        let region_id = self.region_metadata().region_id;
1174        let span = tracing::info_span!(
1175            "ScanInput::parallel_scan_task",
1176            region_id = %region_id,
1177            stream_kind = "flat"
1178        );
1179        common_runtime::spawn_global(
1180            async move {
1181                loop {
1182                    // We release the permit before sending result to avoid the task waiting on
1183                    // the channel with the permit held.
1184                    let maybe_batch = {
1185                        // Safety: We never close the semaphore.
1186                        let _permit = semaphore.acquire().await.unwrap();
1187                        input.next().await
1188                    };
1189                    match maybe_batch {
1190                        Some(Ok(batch)) => {
1191                            let _ = sender.send(Ok(batch)).await;
1192                        }
1193                        Some(Err(e)) => {
1194                            let _ = sender.send(Err(e)).await;
1195                            break;
1196                        }
1197                        None => break,
1198                    }
1199                }
1200            }
1201            .instrument(span),
1202        );
1203    }
1204
1205    pub(crate) fn total_rows(&self) -> usize {
1206        let rows_in_files: usize = self.files.iter().map(|f| f.num_rows()).sum();
1207        let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();
1208
1209        let rows = rows_in_files + rows_in_memtables;
1210        #[cfg(feature = "enterprise")]
1211        let rows = rows
1212            + self
1213                .extension_ranges
1214                .iter()
1215                .map(|x| x.num_rows())
1216                .sum::<u64>() as usize;
1217        rows
1218    }
1219
1220    pub(crate) fn predicate_group(&self) -> &PredicateGroup {
1221        &self.predicate
1222    }
1223
1224    /// Returns number of memtables to scan.
1225    pub(crate) fn num_memtables(&self) -> usize {
1226        self.memtables.len()
1227    }
1228
1229    /// Returns number of SST files to scan.
1230    pub(crate) fn num_files(&self) -> usize {
1231        self.files.len()
1232    }
1233
1234    /// Gets the file handle from a row group index.
1235    pub(crate) fn file_from_index(&self, index: RowGroupIndex) -> &FileHandle {
1236        let file_index = index.index - self.num_memtables();
1237        &self.files[file_index]
1238    }
1239
1240    pub fn region_metadata(&self) -> &RegionMetadataRef {
1241        self.mapper.metadata()
1242    }
1243
1244    fn range_pre_filter_mode(&self, source_count: usize) -> PreFilterMode {
1245        if source_count <= 1 {
1246            // Duplicated rows in the same source is not a normal case and we don't provide
1247            // strict dedup semantic (last_row/last_non_null) for it. We expect the duplicated rows
1248            // are exactly identical in the same source so we use PreFilterMode::All for
1249            // performance reason.
1250            return PreFilterMode::All;
1251        }
1252
1253        pre_filter_mode(self.append_mode, self.merge_mode)
1254    }
1255}
1256
#[cfg(feature = "enterprise")]
impl ScanInput {
    /// Replaces the extension ranges to scan in addition to memtables and files.
    #[must_use]
    pub(crate) fn with_extension_ranges(
        mut self,
        extension_ranges: Vec<BoxedExtensionRange>,
    ) -> Self {
        self.extension_ranges = extension_ranges;
        self
    }

    /// Returns every extension range to scan.
    pub(crate) fn extension_ranges(&self) -> &[BoxedExtensionRange] {
        self.extension_ranges.as_slice()
    }

    /// Get a boxed [ExtensionRange] by the index in all ranges.
    ///
    /// Extension ranges are numbered after every memtable and file, so `i` must be at least
    /// `num_memtables() + num_files()`.
    pub(crate) fn extension_range(&self, i: usize) -> &BoxedExtensionRange {
        let first_extension_slot = self.num_memtables() + self.num_files();
        &self.extension_ranges[i - first_extension_slot]
    }
}
1278
#[cfg(test)]
impl ScanInput {
    /// Returns the ids of the SST files to scan, in the order of `files`.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        let mut ids = Vec::with_capacity(self.files.len());
        ids.extend(self.files.iter().map(|handle| handle.file_id()));
        ids
    }

    /// Returns the index ids of the SST files to scan, in the order of `files`.
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        let mut ids = Vec::with_capacity(self.files.len());
        ids.extend(self.files.iter().map(|handle| handle.index_id()));
        ids
    }
}
1290
1291fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode {
1292    if append_mode {
1293        return PreFilterMode::All;
1294    }
1295
1296    match merge_mode {
1297        MergeMode::LastRow => PreFilterMode::SkipFields,
1298        MergeMode::LastNonNull => PreFilterMode::SkipFields,
1299    }
1300}
1301
1302/// Builds a [ScanRequestFingerprint] from a [ScanInput] if the scan is eligible
1303/// for partition range caching.
1304pub(crate) fn build_scan_fingerprint(input: &ScanInput) -> Option<ScanRequestFingerprint> {
1305    let eligible = !input.compaction
1306        && !input.files.is_empty()
1307        && matches!(input.cache_strategy, CacheStrategy::EnableAll(_));
1308
1309    if !eligible {
1310        return None;
1311    }
1312
1313    let metadata = input.region_metadata();
1314    let tag_names: HashSet<&str> = metadata
1315        .column_metadatas
1316        .iter()
1317        .filter(|col| col.semantic_type == SemanticType::Tag)
1318        .map(|col| col.column_schema.name.as_str())
1319        .collect();
1320
1321    let time_index = metadata.time_index_column();
1322    let time_index_name = time_index.column_schema.name.clone();
1323    let ts_col_unit = time_index
1324        .column_schema
1325        .data_type
1326        .as_timestamp()
1327        .expect("Time index must have timestamp-compatible type")
1328        .unit();
1329
1330    let exprs = input
1331        .predicate_group()
1332        .predicate_without_region()
1333        .map(|predicate| predicate.exprs())
1334        .unwrap_or_default();
1335
1336    let mut filters = Vec::new();
1337    let mut time_filters = Vec::new();
1338    let mut has_tag_filter = false;
1339    let mut columns = HashSet::new();
1340
1341    for expr in exprs {
1342        columns.clear();
1343        let is_time_only = match expr_to_columns(expr, &mut columns) {
1344            Ok(()) if !columns.is_empty() => {
1345                has_tag_filter |= columns
1346                    .iter()
1347                    .any(|col| tag_names.contains(col.name.as_str()));
1348                columns.iter().all(|col| col.name == time_index_name)
1349            }
1350            _ => false,
1351        };
1352
1353        // TODO(yingwen): The split between `time_filters` and `filters` is currently inert
1354        // because `build_range_cache_key()` always keeps both in the cache key. We used to
1355        // strip `time_filters` when the query's `TimestampRange` covered the partition's
1356        // `FileTimeRange`, but `extract_time_range_from_expr` is not precise enough to prove
1357        // a time predicate is implied by that range (it can return a wider range than the
1358        // predicate, and it does not analyze AND/OR shapes), which let the cache reuse rows
1359        // that should have been filtered. Reviving the optimization needs a per-predicate
1360        // implication check that walks each time-only `Expr` (recursing through AND/OR/NOT)
1361        // and proves the predicate holds for every timestamp inside the partition's
1362        // `FileTimeRange`; until then both buckets land in the fingerprint.
1363        if is_time_only
1364            && extract_time_range_from_expr(&time_index_name, ts_col_unit, expr).is_some()
1365        {
1366            time_filters.push(expr.to_string());
1367        } else {
1368            filters.push(expr.to_string());
1369        }
1370    }
1371
1372    if !has_tag_filter {
1373        // We only cache requests that have tag filters to avoid caching all series.
1374        return None;
1375    }
1376
1377    // Ensure the filters are sorted for consistent fingerprinting.
1378    filters.sort_unstable();
1379    time_filters.sort_unstable();
1380    let read_columns = input.read_cols.clone();
1381    Some(
1382        crate::read::range_cache::ScanRequestFingerprintBuilder {
1383            read_column_types: read_columns
1384                .column_ids_iter()
1385                .map(|id| {
1386                    metadata
1387                        .column_by_id(id)
1388                        .map(|col| col.column_schema.data_type.clone())
1389                })
1390                .collect(),
1391            read_columns,
1392            filters,
1393            time_filters,
1394            series_row_selector: input.series_row_selector,
1395            append_mode: input.append_mode,
1396            filter_deleted: input.filter_deleted,
1397            merge_mode: input.merge_mode,
1398            partition_expr_version: metadata.partition_expr_version,
1399        }
1400        .build(),
1401    )
1402}
1403
/// Context shared by different streams from a scanner.
/// It contains the input and ranges to scan.
///
/// Built by `seq_scan_ctx` or `unordered_scan_ctx`, which compute `ranges` and
/// `scan_fingerprint` from the input.
pub struct StreamContext {
    /// Input memtables and files.
    pub input: ScanInput,
    /// Metadata for partition ranges.
    ///
    /// `PartitionRange::identifier` is used as the index into this vector.
    pub(crate) ranges: Vec<RangeMeta>,
    /// Precomputed scan fingerprint for partition range caching.
    /// `None` when the scan is not eligible for caching.
    // NOTE(review): the reason for `dead_code` is not recorded. Presumably no code path
    // reads this field yet. Confirm, and drop the allow once the range cache consumes it.
    #[allow(dead_code)]
    pub(crate) scan_fingerprint: Option<ScanRequestFingerprint>,

    // Metrics:
    /// The start time of the query.
    pub(crate) query_start: Instant,
}
1420
1421impl StreamContext {
1422    /// Creates a new [StreamContext] for [SeqScan].
1423    pub(crate) fn seq_scan_ctx(input: ScanInput) -> Self {
1424        let query_start = input.query_start.unwrap_or_else(Instant::now);
1425        let ranges = RangeMeta::seq_scan_ranges(&input);
1426        READ_SST_COUNT.observe(input.num_files() as f64);
1427        let scan_fingerprint = build_scan_fingerprint(&input);
1428
1429        Self {
1430            input,
1431            ranges,
1432            scan_fingerprint,
1433            query_start,
1434        }
1435    }
1436
1437    /// Creates a new [StreamContext] for [UnorderedScan].
1438    pub(crate) fn unordered_scan_ctx(input: ScanInput) -> Self {
1439        let query_start = input.query_start.unwrap_or_else(Instant::now);
1440        let ranges = RangeMeta::unordered_scan_ranges(&input);
1441        READ_SST_COUNT.observe(input.num_files() as f64);
1442        let scan_fingerprint = build_scan_fingerprint(&input);
1443
1444        Self {
1445            input,
1446            ranges,
1447            scan_fingerprint,
1448            query_start,
1449        }
1450    }
1451
1452    /// Returns true if the index refers to a memtable.
1453    pub(crate) fn is_mem_range_index(&self, index: RowGroupIndex) -> bool {
1454        self.input.num_memtables() > index.index
1455    }
1456
1457    pub(crate) fn is_file_range_index(&self, index: RowGroupIndex) -> bool {
1458        !self.is_mem_range_index(index)
1459            && index.index < self.input.num_files() + self.input.num_memtables()
1460    }
1461
1462    pub(crate) fn range_pre_filter_mode(&self, part_range: &PartitionRange) -> PreFilterMode {
1463        let range_meta = &self.ranges[part_range.identifier];
1464        let source_count = range_meta.indices.len();
1465
1466        self.input.range_pre_filter_mode(source_count)
1467    }
1468
1469    /// Retrieves the partition ranges.
1470    pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
1471        self.ranges
1472            .iter()
1473            .enumerate()
1474            .map(|(idx, range_meta)| range_meta.new_partition_range(idx))
1475            .collect()
1476    }
1477
1478    /// Format the context for explain.
1479    pub(crate) fn format_for_explain(&self, verbose: bool, f: &mut fmt::Formatter) -> fmt::Result {
1480        let (mut num_mem_ranges, mut num_file_ranges, mut num_other_ranges) = (0, 0, 0);
1481        for range_meta in &self.ranges {
1482            for idx in &range_meta.row_group_indices {
1483                if self.is_mem_range_index(*idx) {
1484                    num_mem_ranges += 1;
1485                } else if self.is_file_range_index(*idx) {
1486                    num_file_ranges += 1;
1487                } else {
1488                    num_other_ranges += 1;
1489                }
1490            }
1491        }
1492        if verbose {
1493            write!(f, "{{")?;
1494        }
1495        write!(
1496            f,
1497            r#""partition_count":{{"count":{}, "mem_ranges":{}, "files":{}, "file_ranges":{}"#,
1498            self.ranges.len(),
1499            num_mem_ranges,
1500            self.input.num_files(),
1501            num_file_ranges,
1502        )?;
1503        if num_other_ranges > 0 {
1504            write!(f, r#", "other_ranges":{}"#, num_other_ranges)?;
1505        }
1506        write!(f, "}}")?;
1507
1508        if let Some(selector) = &self.input.series_row_selector {
1509            write!(f, ", \"selector\":\"{}\"", selector)?;
1510        }
1511        if let Some(distribution) = &self.input.distribution {
1512            write!(f, ", \"distribution\":\"{}\"", distribution)?;
1513        }
1514
1515        if verbose {
1516            self.format_verbose_content(f)?;
1517        }
1518
1519        Ok(())
1520    }
1521
    /// Writes the verbose explain details of the scan input.
    ///
    /// Every field is emitted as a `, "key": value` fragment, so the output continues an
    /// object opened by the caller. The fields, in order:
    ///
    /// - projected column names;
    /// - pushed-down filters and dynamic filters from the full predicate (including the
    ///   region partition filter);
    /// - the vector index `k` (`vector_index` feature);
    /// - per-file details;
    /// - the flat-format flag;
    /// - extension ranges (`enterprise` feature).
    fn format_verbose_content(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Debug adapter that renders one SST file as a JSON-like object.
        struct FileWrapper<'a> {
            file: &'a FileHandle,
        }

        impl fmt::Debug for FileWrapper<'_> {
            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                // Timestamps are rendered as `<value>::<unit>`.
                let (start, end) = self.file.time_range();
                write!(
                    f,
                    r#"{{"file_id":"{}","time_range_start":"{}::{}","time_range_end":"{}::{}","rows":{},"size":{},"index_size":{}}}"#,
                    self.file.file_id(),
                    start.value(),
                    start.unit(),
                    end.value(),
                    end.unit(),
                    self.file.num_rows(),
                    self.file.size(),
                    self.file.index_size()
                )
            }
        }

        // Debug adapter whose output is the sequence of `, "key": value` fragments.
        struct InputWrapper<'a> {
            input: &'a ScanInput,
        }

        #[cfg(feature = "enterprise")]
        impl InputWrapper<'_> {
            /// Appends the extension ranges, if any, using their `Debug` output.
            // NOTE(review): unlike the other keys, `extension_ranges` is written unquoted.
            fn format_extension_ranges(&self, f: &mut fmt::Formatter) -> fmt::Result {
                if self.input.extension_ranges.is_empty() {
                    return Ok(());
                }

                let mut delimiter = "";
                write!(f, ", extension_ranges: [")?;
                for range in self.input.extension_ranges() {
                    write!(f, "{}{:?}", delimiter, range)?;
                    delimiter = ", ";
                }
                write!(f, "]")?;
                Ok(())
            }
        }

        impl fmt::Debug for InputWrapper<'_> {
            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                // Projected output column names; omitted when the output schema is empty.
                let output_schema = self.input.mapper.output_schema();
                if !output_schema.is_empty() {
                    let names: Vec<_> = output_schema
                        .column_schemas()
                        .iter()
                        .map(|col| &col.name)
                        .collect();
                    write!(f, ", \"projection\": {:?}", names)?;
                }
                // Static and dynamic filters of the full predicate; each list only when non-empty.
                if let Some(predicate) = &self.input.predicate.predicate() {
                    if !predicate.exprs().is_empty() {
                        let exprs: Vec<_> =
                            predicate.exprs().iter().map(|e| e.to_string()).collect();
                        write!(f, ", \"filters\": {:?}", exprs)?;
                    }
                    if !predicate.dyn_filters().is_empty() {
                        // The closure parameter `f` shadows the formatter; it is a dynamic filter.
                        let dyn_filters: Vec<_> = predicate
                            .dyn_filters()
                            .iter()
                            .map(|f| format!("{}", f))
                            .collect();
                        write!(f, ", \"dyn_filters\": {:?}", dyn_filters)?;
                    }
                }
                #[cfg(feature = "vector_index")]
                if let Some(vector_index_k) = self.input.vector_index_k {
                    write!(f, ", \"vector_index_k\": {}", vector_index_k)?;
                }
                // Per-file details as a debug list of `FileWrapper` objects.
                if !self.input.files.is_empty() {
                    write!(f, ", \"files\": ")?;
                    f.debug_list()
                        .entries(self.input.files.iter().map(|file| FileWrapper { file }))
                        .finish()?;
                }
                write!(f, ", \"flat_format\": {}", self.input.explain_flat_format)?;
                #[cfg(feature = "enterprise")]
                self.format_extension_ranges(f)?;

                Ok(())
            }
        }

        write!(f, "{:?}", InputWrapper { input: &self.input })
    }
1613
1614    /// Add new dynamic filters to the predicates.
1615    /// Safe after stream creation; in-flight reads may still observe an older snapshot.
1616    pub(crate) fn add_dyn_filter_to_predicate(
1617        self: &Arc<Self>,
1618        filter_exprs: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
1619    ) -> Vec<bool> {
1620        let mut supported = Vec::with_capacity(filter_exprs.len());
1621        let filter_expr = filter_exprs
1622            .into_iter()
1623            .filter_map(|expr| {
1624                if let Ok(dyn_filter) = (expr as Arc<dyn std::any::Any + Send + Sync + 'static>)
1625                .downcast::<datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr>()
1626            {
1627                supported.push(true);
1628                Some(dyn_filter)
1629            } else {
1630                supported.push(false);
1631                None
1632            }
1633            })
1634            .collect();
1635        self.input.predicate.add_dyn_filters(filter_expr);
1636        supported
1637    }
1638}
1639
/// Predicates to evaluate when scanning a region.
///
/// Keeps the request filters in two forms (with and without the region partition
/// expression), plus the subset of filters that [SimpleFilterEvaluator] can evaluate
/// on the column whose semantic type is `Timestamp`.
#[derive(Clone, Default)]
pub struct PredicateGroup {
    /// Simple filters that each reference only the timestamp column; `None` if there are none.
    time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
    /// Predicate that includes request filters and region partition expr (if any).
    predicate_all: Predicate,
    /// Predicate that only includes request filters.
    predicate_without_region: Predicate,
    /// Region partition expression restored from metadata.
    region_partition_expr: Option<PartitionExpr>,
}
1652
1653impl PredicateGroup {
1654    /// Creates a new `PredicateGroup` from exprs according to the metadata.
1655    pub fn new(metadata: &RegionMetadata, exprs: &[Expr]) -> Result<Self> {
1656        let mut combined_exprs = exprs.to_vec();
1657        let mut region_partition_expr = None;
1658
1659        if let Some(expr_json) = metadata.partition_expr.as_ref()
1660            && !expr_json.is_empty()
1661            && let Some(expr) = PartitionExpr::from_json_str(expr_json)
1662                .context(InvalidPartitionExprSnafu { expr: expr_json })?
1663        {
1664            let logical_expr = expr
1665                .try_as_logical_expr()
1666                .context(InvalidPartitionExprSnafu {
1667                    expr: expr_json.clone(),
1668                })?;
1669
1670            combined_exprs.push(logical_expr);
1671            region_partition_expr = Some(expr);
1672        }
1673
1674        let mut time_filters = Vec::with_capacity(combined_exprs.len());
1675        // Columns in the expr.
1676        let mut columns = HashSet::new();
1677        for expr in &combined_exprs {
1678            columns.clear();
1679            let Some(filter) = Self::expr_to_filter(expr, metadata, &mut columns) else {
1680                continue;
1681            };
1682            time_filters.push(filter);
1683        }
1684        let time_filters = if time_filters.is_empty() {
1685            None
1686        } else {
1687            Some(Arc::new(time_filters))
1688        };
1689
1690        let predicate_all = Predicate::new(combined_exprs);
1691        let predicate_without_region = Predicate::new(exprs.to_vec());
1692
1693        Ok(Self {
1694            time_filters,
1695            predicate_all,
1696            predicate_without_region,
1697            region_partition_expr,
1698        })
1699    }
1700
1701    /// Returns time filters.
1702    pub(crate) fn time_filters(&self) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
1703        self.time_filters.clone()
1704    }
1705
1706    /// Returns predicate of all exprs (including region partition expr if present).
1707    pub(crate) fn predicate(&self) -> Option<&Predicate> {
1708        if self.predicate_all.is_empty() {
1709            None
1710        } else {
1711            Some(&self.predicate_all)
1712        }
1713    }
1714
1715    /// Returns predicate that excludes region partition expr.
1716    pub(crate) fn predicate_without_region(&self) -> Option<&Predicate> {
1717        if self.predicate_without_region.is_empty() {
1718            None
1719        } else {
1720            Some(&self.predicate_without_region)
1721        }
1722    }
1723
1724    /// Add dynamic filters in the predicates.
1725    pub(crate) fn add_dyn_filters(&self, dyn_filters: Vec<Arc<DynamicFilterPhysicalExpr>>) {
1726        self.predicate_all.add_dyn_filters(dyn_filters.clone());
1727        self.predicate_without_region.add_dyn_filters(dyn_filters);
1728    }
1729
1730    /// Returns the region partition expr from metadata, if any.
1731    pub(crate) fn region_partition_expr(&self) -> Option<&PartitionExpr> {
1732        self.region_partition_expr.as_ref()
1733    }
1734
1735    fn expr_to_filter(
1736        expr: &Expr,
1737        metadata: &RegionMetadata,
1738        columns: &mut HashSet<Column>,
1739    ) -> Option<SimpleFilterEvaluator> {
1740        columns.clear();
1741        // `expr_to_columns` won't return error.
1742        // We still ignore these expressions for safety.
1743        expr_to_columns(expr, columns).ok()?;
1744        if columns.len() > 1 {
1745            // Simple filter doesn't support multiple columns.
1746            return None;
1747        }
1748        let column = columns.iter().next()?;
1749        let column_meta = metadata.column_by_name(&column.name)?;
1750        if column_meta.semantic_type == SemanticType::Timestamp {
1751            SimpleFilterEvaluator::try_new(expr)
1752        } else {
1753            None
1754        }
1755    }
1756}
1757
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datafusion::physical_plan::expressions::lit as physical_lit;
    use datafusion_common::ScalarValue;
    use datafusion_expr::{col, lit};
    use datatypes::value::Value;
    use partition::expr::col as partition_col;
    use store_api::metadata::RegionMetadataBuilder;
    use store_api::storage::{TimeSeriesDistribution, TimeSeriesRowSelector};

    use super::*;
    use crate::cache::CacheManager;
    use crate::read::range_cache::ScanRequestFingerprintBuilder;
    use crate::test_util::memtable_util::metadata_with_primary_key;
    use crate::test_util::scheduler_util::SchedulerEnv;

    /// Creates a placeholder SST file handle so a scan input has at least one file.
    fn new_test_file() -> FileHandle {
        FileHandle::new(
            crate::sst::file::FileMeta::default(),
            Arc::new(crate::sst::file_purger::NoopFilePurger),
        )
    }

    /// Builds a scan input that projects columns `[0, 2, 3]`, enables all caches
    /// (including a range result cache) and reads one placeholder file.
    async fn new_scan_input(metadata: RegionMetadataRef, filters: Vec<Expr>) -> ScanInput {
        let env = SchedulerEnv::new().await;
        let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3]).unwrap();
        let predicate = PredicateGroup::new(metadata.as_ref(), &filters).unwrap();

        ScanInput::new(env.access_layer.clone(), mapper)
            .with_predicate(predicate)
            .with_cache(CacheStrategy::EnableAll(Arc::new(
                CacheManager::builder()
                    .range_result_cache_size(1024)
                    .build(),
            )))
            .with_files(vec![new_test_file()])
    }

    /// Helper to create a timestamp millisecond literal.
    fn ts_lit(val: i64) -> datafusion_expr::Expr {
        lit(ScalarValue::TimestampMillisecond(Some(val), None))
    }

    #[tokio::test]
    async fn test_build_scan_fingerprint_for_eligible_scan() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let input = new_scan_input(
            metadata.clone(),
            vec![
                col("ts").gt_eq(ts_lit(1000)),
                col("k0").eq(lit("foo")),
                col("v0").gt(lit(1)),
            ],
        )
        .await
        .with_distribution(Some(TimeSeriesDistribution::PerSeries))
        .with_series_row_selector(Some(TimeSeriesRowSelector::LastRow))
        .with_merge_mode(MergeMode::LastNonNull)
        .with_filter_deleted(false);

        let fingerprint = build_scan_fingerprint(&input).unwrap();

        let expected = ScanRequestFingerprintBuilder {
            read_columns: input.read_cols,
            read_column_types: vec![
                metadata
                    .column_by_id(0)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(2)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(3)
                    .map(|col| col.column_schema.data_type.clone()),
            ],
            filters: vec![
                col("k0").eq(lit("foo")).to_string(),
                col("v0").gt(lit(1)).to_string(),
            ],
            time_filters: vec![col("ts").gt_eq(ts_lit(1000)).to_string()],
            series_row_selector: Some(TimeSeriesRowSelector::LastRow),
            append_mode: false,
            filter_deleted: false,
            merge_mode: MergeMode::LastNonNull,
            partition_expr_version: 0,
        }
        .build();
        assert_eq!(expected, fingerprint);
    }

    #[tokio::test]
    async fn test_build_scan_fingerprint_requires_tag_filter() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        // Use a proper timestamp literal so the only thing missing is a tag filter.
        let input = new_scan_input(
            metadata,
            vec![col("ts").gt_eq(ts_lit(1000)), col("v0").gt(lit(1))],
        )
        .await;

        assert!(build_scan_fingerprint(&input).is_none());
    }

    #[tokio::test]
    async fn test_build_scan_fingerprint_respects_scan_eligibility() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let filters = vec![col("k0").eq(lit("foo"))];

        // No cache configured. The input still has a file, so the missing file list
        // cannot be what makes it ineligible.
        let disabled = ScanInput::new(
            SchedulerEnv::new().await.access_layer.clone(),
            FlatProjectionMapper::new(&metadata, [0, 2, 3]).unwrap(),
        )
        .with_predicate(PredicateGroup::new(metadata.as_ref(), &filters).unwrap())
        .with_files(vec![new_test_file()]);
        assert!(build_scan_fingerprint(&disabled).is_none());

        let compaction = new_scan_input(metadata.clone(), filters.clone())
            .await
            .with_compaction(true);
        assert!(build_scan_fingerprint(&compaction).is_none());

        // No files to read.
        let no_files = new_scan_input(metadata, filters).await.with_files(vec![]);
        assert!(build_scan_fingerprint(&no_files).is_none());
    }

    #[tokio::test]
    async fn test_build_scan_fingerprint_tracks_schema_and_partition_expr_changes() {
        let base = metadata_with_primary_key(vec![0, 1], false);
        let mut builder = RegionMetadataBuilder::from_existing(base);
        let partition_expr = partition_col("k0")
            .gt_eq(Value::String("foo".into()))
            .as_json_str()
            .unwrap();
        builder.partition_expr_json(Some(partition_expr));
        let metadata = Arc::new(builder.build_without_validation().unwrap());

        let input = new_scan_input(metadata.clone(), vec![col("k0").eq(lit("foo"))]).await;
        let fingerprint = build_scan_fingerprint(&input).unwrap();

        let expected = ScanRequestFingerprintBuilder {
            read_columns: input.read_cols,
            read_column_types: vec![
                metadata
                    .column_by_id(0)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(2)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(3)
                    .map(|col| col.column_schema.data_type.clone()),
            ],
            filters: vec![col("k0").eq(lit("foo")).to_string()],
            time_filters: vec![],
            series_row_selector: None,
            append_mode: false,
            filter_deleted: true,
            merge_mode: MergeMode::LastRow,
            partition_expr_version: metadata.partition_expr_version,
        }
        .build();
        assert_eq!(expected, fingerprint);
        assert_ne!(0, metadata.partition_expr_version);
    }

    #[test]
    fn test_update_dyn_filters_with_empty_base_predicates() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let predicate_group = PredicateGroup::new(metadata.as_ref(), &[]).unwrap();
        assert!(predicate_group.predicate().is_none());
        assert!(predicate_group.predicate_without_region().is_none());

        let dyn_filter = Arc::new(DynamicFilterPhysicalExpr::new(vec![], physical_lit(false)));
        predicate_group.add_dyn_filters(vec![dyn_filter]);

        let predicate_all = predicate_group.predicate().unwrap();
        assert!(predicate_all.exprs().is_empty());
        assert_eq!(1, predicate_all.dyn_filters().len());

        let predicate_without_region = predicate_group.predicate_without_region().unwrap();
        assert!(predicate_without_region.exprs().is_empty());
        assert_eq!(1, predicate_without_region.dyn_filters().len());
    }

    #[tokio::test]
    async fn test_range_pre_filter_mode() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let cases = [
            (true, MergeMode::LastRow, 1, PreFilterMode::All),
            (false, MergeMode::LastNonNull, 1, PreFilterMode::All),
            (false, MergeMode::LastRow, 2, PreFilterMode::SkipFields),
            (true, MergeMode::LastRow, 2, PreFilterMode::All),
        ];

        for (append_mode, merge_mode, source_count, expected_mode) in cases {
            let input = new_scan_input(metadata.clone(), vec![])
                .await
                .with_append_mode(append_mode)
                .with_merge_mode(merge_mode);

            assert_eq!(expected_mode, input.range_pre_filter_mode(source_count));
        }
    }
}