Skip to main content

mito2/read/
scan_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Scans a region according to the scan request.
16
17use std::collections::HashSet;
18use std::fmt;
19use std::num::NonZeroU64;
20use std::sync::Arc;
21use std::time::Instant;
22
23use api::v1::SemanticType;
24use common_error::ext::BoxedError;
25use common_recordbatch::SendableRecordBatchStream;
26use common_recordbatch::filter::SimpleFilterEvaluator;
27use common_telemetry::tracing::Instrument;
28use common_telemetry::{debug, error, tracing, warn};
29use common_time::range::TimestampRange;
30use datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr;
31use datafusion_common::Column;
32use datafusion_expr::Expr;
33use datafusion_expr::utils::expr_to_columns;
34use datatypes::schema::ext::ArrowSchemaExt;
35use futures::StreamExt;
36use itertools::Itertools;
37use partition::expr::PartitionExpr;
38use smallvec::SmallVec;
39use snafu::ResultExt;
40use store_api::metadata::{RegionMetadata, RegionMetadataRef};
41use store_api::region_engine::{PartitionRange, RegionScannerRef};
42use store_api::storage::{
43    RegionId, ScanRequest, SequenceNumber, SequenceRange, TimeSeriesDistribution,
44    TimeSeriesRowSelector,
45};
46use table::predicate::{Predicate, build_time_range_predicate, extract_time_range_from_expr};
47use tokio::sync::{Semaphore, mpsc};
48use tokio_stream::wrappers::ReceiverStream;
49
50use crate::access_layer::AccessLayerRef;
51use crate::cache::CacheStrategy;
52use crate::config::DEFAULT_MAX_CONCURRENT_SCAN_FILES;
53use crate::error::{InvalidPartitionExprSnafu, Result};
54#[cfg(feature = "enterprise")]
55use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
56use crate::memtable::{MemtableRange, RangesOptions};
57use crate::metrics::READ_SST_COUNT;
58use crate::read::compat::{self, FlatCompatBatch};
59use crate::read::flat_projection::FlatProjectionMapper;
60use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
61use crate::read::range_cache::{ScanRequestFingerprint, implied_time_range_from_exprs};
62use crate::read::read_columns::{
63    ReadColumns, merge, read_columns_from_predicate, read_columns_from_projection,
64};
65use crate::read::seq_scan::SeqScan;
66use crate::read::series_scan::SeriesScan;
67use crate::read::stream::ScanBatchStream;
68use crate::read::unordered_scan::UnorderedScan;
69use crate::read::{BoxedRecordBatchStream, RecordBatch};
70use crate::region::options::MergeMode;
71use crate::region::version::VersionRef;
72use crate::sst::file::FileHandle;
73use crate::sst::index::bloom_filter::applier::{
74    BloomFilterIndexApplierBuilder, BloomFilterIndexApplierRef,
75};
76use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
77use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
78use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
79use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
80#[cfg(feature = "vector_index")]
81use crate::sst::index::vector_index::applier::{VectorIndexApplier, VectorIndexApplierRef};
82use crate::sst::parquet::file_range::PreFilterMode;
83use crate::sst::parquet::reader::ReaderMetrics;
84
85#[cfg(feature = "vector_index")]
86const VECTOR_INDEX_OVERFETCH_MULTIPLIER: usize = 2;
87
88/// A scanner scans a region and returns a [SendableRecordBatchStream].
89pub(crate) enum Scanner {
90    /// Sequential scan.
91    Seq(SeqScan),
92    /// Unordered scan.
93    Unordered(UnorderedScan),
94    /// Per-series scan.
95    Series(SeriesScan),
96}
97
98impl Scanner {
99    /// Returns a [SendableRecordBatchStream] to retrieve scan results from all partitions.
100    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
101    pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream, BoxedError> {
102        match self {
103            Scanner::Seq(seq_scan) => seq_scan.build_stream(),
104            Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
105            Scanner::Series(series_scan) => series_scan.build_stream().await,
106        }
107    }
108
109    /// Create a stream of [`Batch`] by this scanner.
110    pub(crate) fn scan_batch(&self) -> Result<ScanBatchStream> {
111        match self {
112            Scanner::Seq(x) => x.scan_all_partitions(),
113            Scanner::Unordered(x) => x.scan_all_partitions(),
114            Scanner::Series(x) => x.scan_all_partitions(),
115        }
116    }
117}
118
#[cfg(test)]
impl Scanner {
    /// Borrows the [ScanInput] of the wrapped scanner, whatever its variant.
    fn input_for_test(&self) -> &ScanInput {
        match self {
            Self::Seq(inner) => inner.input(),
            Self::Unordered(inner) => inner.input(),
            Self::Series(inner) => inner.input(),
        }
    }

    /// Returns number of files to scan.
    pub(crate) fn num_files(&self) -> usize {
        self.input_for_test().num_files()
    }

    /// Returns number of memtables to scan.
    pub(crate) fn num_memtables(&self) -> usize {
        self.input_for_test().num_memtables()
    }

    /// Returns SST file ids to scan.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        self.input_for_test().file_ids()
    }

    /// Returns the index ids reported by the scan input.
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        self.input_for_test().index_ids()
    }

    /// Returns the snapshot sequence stored in the scan input, if any.
    pub(crate) fn snapshot_sequence(&self) -> Option<SequenceNumber> {
        self.input_for_test().snapshot_sequence
    }

    /// Sets the target partitions of the scanner, which controls its parallelism.
    ///
    /// Panics if the underlying scanner rejects the prepare request.
    pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
        use store_api::region_engine::{PrepareRequest, RegionScanner};

        let prepare_request = PrepareRequest::default().with_target_partitions(target_partitions);
        match self {
            Self::Seq(inner) => inner.prepare(prepare_request).unwrap(),
            Self::Unordered(inner) => inner.prepare(prepare_request).unwrap(),
            Self::Series(inner) => inner.prepare(prepare_request).unwrap(),
        }
    }
}
176
177#[cfg_attr(doc, aquamarine::aquamarine)]
178/// Helper to scans a region by [ScanRequest].
179///
180/// [ScanRegion] collects SSTs and memtables to scan without actually reading them. It
181/// creates a [Scanner] to actually scan these targets in [Scanner::scan()].
182///
183/// ```mermaid
184/// classDiagram
185/// class ScanRegion {
186///     -VersionRef version
187///     -ScanRequest request
188///     ~scanner() Scanner
189///     ~seq_scan() SeqScan
190/// }
191/// class Scanner {
192///     <<enumeration>>
193///     SeqScan
194///     UnorderedScan
195///     +scan() SendableRecordBatchStream
196/// }
197/// class SeqScan {
198///     -ScanInput input
199///     +build() SendableRecordBatchStream
200/// }
201/// class UnorderedScan {
202///     -ScanInput input
203///     +build() SendableRecordBatchStream
204/// }
205/// class ScanInput {
206///     -ProjectionMapper mapper
207///     -Option~TimeRange~ time_range
208///     -Option~Predicate~ predicate
209///     -Vec~MemtableRef~ memtables
210///     -Vec~FileHandle~ files
211/// }
212/// class ProjectionMapper {
213///     ~output_schema() SchemaRef
214///     ~convert(Batch) RecordBatch
215/// }
216/// ScanRegion -- Scanner
217/// ScanRegion o-- ScanRequest
218/// Scanner o-- SeqScan
219/// Scanner o-- UnorderedScan
220/// SeqScan o-- ScanInput
221/// UnorderedScan o-- ScanInput
222/// Scanner -- SendableRecordBatchStream
223/// ScanInput o-- ProjectionMapper
224/// SeqScan -- SendableRecordBatchStream
225/// UnorderedScan -- SendableRecordBatchStream
226/// ```
227pub(crate) struct ScanRegion {
228    /// Version of the region at scan.
229    version: VersionRef,
230    /// Access layer of the region.
231    access_layer: AccessLayerRef,
232    /// Scan request.
233    request: ScanRequest,
234    /// Cache.
235    cache_strategy: CacheStrategy,
236    /// Maximum number of SST files to scan concurrently.
237    max_concurrent_scan_files: usize,
238    /// Whether to ignore inverted index.
239    ignore_inverted_index: bool,
240    /// Whether to ignore fulltext index.
241    ignore_fulltext_index: bool,
242    /// Whether to ignore bloom filter.
243    ignore_bloom_filter: bool,
244    /// Start time of the scan task.
245    start_time: Option<Instant>,
246    /// Whether to filter out the deleted rows.
247    /// Usually true for normal read, and false for scan for compaction.
248    filter_deleted: bool,
249    #[cfg(feature = "enterprise")]
250    extension_range_provider: Option<BoxedExtensionRangeProvider>,
251}
252
253impl ScanRegion {
254    /// Creates a [ScanRegion].
255    pub(crate) fn new(
256        version: VersionRef,
257        access_layer: AccessLayerRef,
258        request: ScanRequest,
259        cache_strategy: CacheStrategy,
260    ) -> ScanRegion {
261        ScanRegion {
262            version,
263            access_layer,
264            request,
265            cache_strategy,
266            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
267            ignore_inverted_index: false,
268            ignore_fulltext_index: false,
269            ignore_bloom_filter: false,
270            start_time: None,
271            filter_deleted: true,
272            #[cfg(feature = "enterprise")]
273            extension_range_provider: None,
274        }
275    }
276
277    /// Sets maximum number of SST files to scan concurrently.
278    #[must_use]
279    pub(crate) fn with_max_concurrent_scan_files(
280        mut self,
281        max_concurrent_scan_files: usize,
282    ) -> Self {
283        self.max_concurrent_scan_files = max_concurrent_scan_files;
284        self
285    }
286
287    /// Sets whether to ignore inverted index.
288    #[must_use]
289    pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self {
290        self.ignore_inverted_index = ignore;
291        self
292    }
293
294    /// Sets whether to ignore fulltext index.
295    #[must_use]
296    pub(crate) fn with_ignore_fulltext_index(mut self, ignore: bool) -> Self {
297        self.ignore_fulltext_index = ignore;
298        self
299    }
300
301    /// Sets whether to ignore bloom filter.
302    #[must_use]
303    pub(crate) fn with_ignore_bloom_filter(mut self, ignore: bool) -> Self {
304        self.ignore_bloom_filter = ignore;
305        self
306    }
307
308    #[must_use]
309    pub(crate) fn with_start_time(mut self, now: Instant) -> Self {
310        self.start_time = Some(now);
311        self
312    }
313
314    pub(crate) fn set_filter_deleted(&mut self, filter_deleted: bool) {
315        self.filter_deleted = filter_deleted;
316    }
317
318    #[cfg(feature = "enterprise")]
319    pub(crate) fn set_extension_range_provider(
320        &mut self,
321        extension_range_provider: BoxedExtensionRangeProvider,
322    ) {
323        self.extension_range_provider = Some(extension_range_provider);
324    }
325
326    /// Returns a [Scanner] to scan the region.
327    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
328    pub(crate) async fn scanner(self) -> Result<Scanner> {
329        if self.use_series_scan() {
330            self.series_scan().await.map(Scanner::Series)
331        } else if self.use_unordered_scan() {
332            // If table is append only and there is no series row selector, we use unordered scan in query.
333            // We still use seq scan in compaction.
334            self.unordered_scan().await.map(Scanner::Unordered)
335        } else {
336            self.seq_scan().await.map(Scanner::Seq)
337        }
338    }
339
340    /// Returns a [RegionScanner] to scan the region.
341    #[tracing::instrument(
342        level = tracing::Level::DEBUG,
343        skip_all,
344        fields(region_id = %self.region_id())
345    )]
346    pub(crate) async fn region_scanner(self) -> Result<RegionScannerRef> {
347        if self.use_series_scan() {
348            self.series_scan()
349                .await
350                .map(|scanner| Box::new(scanner) as _)
351        } else if self.use_unordered_scan() {
352            self.unordered_scan()
353                .await
354                .map(|scanner| Box::new(scanner) as _)
355        } else {
356            self.seq_scan().await.map(|scanner| Box::new(scanner) as _)
357        }
358    }
359
360    /// Scan sequentially.
361    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
362    pub(crate) async fn seq_scan(self) -> Result<SeqScan> {
363        let input = self.scan_input().await?.with_compaction(false);
364        Ok(SeqScan::new(input))
365    }
366
367    /// Unordered scan.
368    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
369    pub(crate) async fn unordered_scan(self) -> Result<UnorderedScan> {
370        let input = self.scan_input().await?;
371        Ok(UnorderedScan::new(input))
372    }
373
374    /// Scans by series.
375    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
376    pub(crate) async fn series_scan(self) -> Result<SeriesScan> {
377        let input = self.scan_input().await?;
378        Ok(SeriesScan::new(input))
379    }
380
381    /// Returns true if the region can use unordered scan for current request.
382    fn use_unordered_scan(&self) -> bool {
383        // We use unordered scan when:
384        // 1. The region is in append mode.
385        // 2. There is no series row selector.
386        // 3. The required distribution is None or TimeSeriesDistribution::TimeWindowed.
387        //
388        // We still use seq scan in compaction.
389        self.version.options.append_mode
390            && self.request.series_row_selector.is_none()
391            && (self.request.distribution.is_none()
392                || self.request.distribution == Some(TimeSeriesDistribution::TimeWindowed))
393    }
394
395    /// Returns true if the region can use series scan for current request.
396    fn use_series_scan(&self) -> bool {
397        self.request.distribution == Some(TimeSeriesDistribution::PerSeries)
398    }
399
400    /// Creates a scan input.
401    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
402    async fn scan_input(self) -> Result<ScanInput> {
403        let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new);
404        let time_range = self.build_time_range_predicate();
405        let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;
406
407        let read_cols = match &self.request.projection_input {
408            Some(p) => {
409                // Read columns include the pushed-down projection and columns
410                // resolved from the predicate.
411                let metadata = &self.version.metadata;
412                let from_projection = read_columns_from_projection(p.clone(), metadata)?;
413                let from_predicate = read_columns_from_predicate(&predicate, metadata);
414                merge(from_projection, from_predicate)
415            }
416            None => {
417                let read_col_ids = self
418                    .version
419                    .metadata
420                    .column_metadatas
421                    .iter()
422                    .map(|col| col.column_id);
423                ReadColumns::from_deduped_column_ids(read_col_ids)
424            }
425        };
426        let read_col_ids = read_cols.column_ids();
427
428        // The mapper always computes projected column ids as the schema of SSTs may change.
429        let projection = self
430            .request
431            .projection_indices()
432            .map(|x| x.to_vec())
433            .unwrap_or_else(|| (0..self.version.metadata.column_metadatas.len()).collect());
434        let json_type_hint = self
435            .version
436            .metadata
437            .schema
438            .arrow_schema()
439            .has_json_extension_field()
440            .then_some(&self.request.json_type_hint)
441            .inspect(|json_type_hint| {
442                debug!(
443                    "Concretized JSON type: {{{}}}",
444                    json_type_hint
445                        .iter()
446                        .map(|(k, v)| format!("{}: {}", k, v))
447                        .join(", ")
448                );
449            });
450        let mapper = FlatProjectionMapper::new_with_read_columns(
451            &self.version.metadata,
452            projection,
453            read_cols,
454            json_type_hint,
455        )?;
456
457        let ssts = &self.version.ssts;
458        let mut files = Vec::new();
459        if !self.request.skip_sst_files {
460            for level in ssts.levels() {
461                for file in level.files.values() {
462                    let exceed_min_sequence = match (sst_min_sequence, file.meta_ref().sequence) {
463                        (Some(min_sequence), Some(file_sequence)) => file_sequence > min_sequence,
464                        // If the file's sequence is None (or actually is zero), it could mean the file
465                        // is generated and added to the region "directly". In this case, its data should
466                        // be considered as fresh as the memtable. So its sequence is treated greater than
467                        // the min_sequence, whatever the value of min_sequence is. Hence the default
468                        // "true" in this arm.
469                        (Some(_), None) => true,
470                        (None, _) => true,
471                    };
472
473                    // Finds SST files in range.
474                    if exceed_min_sequence && file_in_range(file, &time_range) {
475                        files.push(file.clone());
476                    }
477                    // There is no need to check and prune for file's sequence here as the sequence number is usually very new,
478                    // unless the timing is too good, or the sequence number wouldn't be in file.
479                    // and the batch will be filtered out by tree reader anyway.
480                }
481            }
482        }
483
484        let memtables = self.version.memtables.list_memtables();
485        // Skip empty memtables and memtables out of time range.
486        let mut mem_range_builders = Vec::new();
487        let filter_mode = pre_filter_mode(
488            self.version.options.append_mode,
489            self.version.options.merge_mode(),
490        );
491
492        for m in memtables {
493            // check if memtable is empty by reading stats.
494            let Some((start, end)) = m.stats().time_range() else {
495                continue;
496            };
497            // The time range of the memtable is inclusive.
498            let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
499            if !memtable_range.intersects(&time_range) {
500                continue;
501            }
502            let ranges_in_memtable = m.ranges(
503                Some(&read_col_ids),
504                RangesOptions::default()
505                    .with_predicate(predicate.clone())
506                    .with_sequence(SequenceRange::new(
507                        self.request.memtable_min_sequence,
508                        self.request.memtable_max_sequence,
509                    ))
510                    .with_pre_filter_mode(filter_mode),
511            )?;
512            mem_range_builders.extend(ranges_in_memtable.ranges.into_values().map(|v| {
513                let stats = v.stats().clone();
514                MemRangeBuilder::new(v, stats)
515            }));
516        }
517
518        let region_id = self.region_id();
519        debug!(
520            "Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}",
521            region_id,
522            self.request,
523            time_range,
524            mem_range_builders.len(),
525            files.len(),
526            self.version.options.append_mode,
527        );
528
529        let (non_field_filters, field_filters) = self.partition_by_field_filters();
530        let inverted_index_appliers = [
531            self.build_invereted_index_applier(&non_field_filters),
532            self.build_invereted_index_applier(&field_filters),
533        ];
534        let bloom_filter_appliers = [
535            self.build_bloom_filter_applier(&non_field_filters),
536            self.build_bloom_filter_applier(&field_filters),
537        ];
538        let fulltext_index_appliers = [
539            self.build_fulltext_index_applier(&non_field_filters),
540            self.build_fulltext_index_applier(&field_filters),
541        ];
542        #[cfg(feature = "vector_index")]
543        let vector_index_applier = self.build_vector_index_applier();
544        #[cfg(feature = "vector_index")]
545        let vector_index_k = self.request.vector_search.as_ref().map(|search| {
546            if self.request.filters.is_empty() {
547                search.k
548            } else {
549                search.k.saturating_mul(VECTOR_INDEX_OVERFETCH_MULTIPLIER)
550            }
551        });
552
553        let input = ScanInput::new(self.access_layer, mapper)
554            .with_time_range(Some(time_range))
555            .with_predicate(predicate)
556            .with_memtables(mem_range_builders)
557            .with_files(files)
558            .with_cache(self.cache_strategy)
559            .with_inverted_index_appliers(inverted_index_appliers)
560            .with_bloom_filter_index_appliers(bloom_filter_appliers)
561            .with_fulltext_index_appliers(fulltext_index_appliers)
562            .with_max_concurrent_scan_files(self.max_concurrent_scan_files)
563            .with_start_time(self.start_time)
564            .with_append_mode(self.version.options.append_mode)
565            .with_filter_deleted(self.filter_deleted)
566            .with_merge_mode(self.version.options.merge_mode())
567            .with_series_row_selector(self.request.series_row_selector)
568            .with_distribution(self.request.distribution)
569            .with_explain_flat_format(
570                self.version.options.sst_format == Some(crate::sst::FormatType::Flat),
571            )
572            .with_snapshot_sequence(
573                self.request
574                    .snapshot_on_scan
575                    .then_some(self.request.memtable_max_sequence)
576                    .flatten(),
577            );
578        #[cfg(feature = "vector_index")]
579        let input = input
580            .with_vector_index_applier(vector_index_applier)
581            .with_vector_index_k(vector_index_k);
582
583        #[cfg(feature = "enterprise")]
584        let input = if !self.request.skip_sst_files
585            && let Some(provider) = self.extension_range_provider
586        {
587            let ranges = provider
588                .find_extension_ranges(self.version.flushed_sequence, time_range, &self.request)
589                .await?;
590            debug!("Find extension ranges: {ranges:?}");
591            input.with_extension_ranges(ranges)
592        } else {
593            input
594        };
595        Ok(input)
596    }
597
598    fn region_id(&self) -> RegionId {
599        self.version.metadata.region_id
600    }
601
602    /// Build time range predicate from filters.
603    fn build_time_range_predicate(&self) -> TimestampRange {
604        let time_index = self.version.metadata.time_index_column();
605        let unit = time_index
606            .column_schema
607            .data_type
608            .as_timestamp()
609            .expect("Time index must have timestamp-compatible type")
610            .unit();
611        build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters)
612    }
613
614    /// Partitions filters into two groups: non-field filters and field filters.
615    /// Returns `(non_field_filters, field_filters)`.
616    fn partition_by_field_filters(&self) -> (Vec<Expr>, Vec<Expr>) {
617        let field_columns = self
618            .version
619            .metadata
620            .field_columns()
621            .map(|col| &col.column_schema.name)
622            .collect::<HashSet<_>>();
623
624        let mut columns = HashSet::new();
625
626        self.request.filters.iter().cloned().partition(|expr| {
627            columns.clear();
628            // `expr_to_columns` won't return error.
629            if expr_to_columns(expr, &mut columns).is_err() {
630                // If we can't extract columns, treat it as non-field filter
631                return true;
632            }
633            // Return true for non-field filters (partition puts true cases in first vec)
634            !columns
635                .iter()
636                .any(|column| field_columns.contains(&column.name))
637        })
638    }
639
640    /// Use the latest schema to build the inverted index applier.
641    fn build_invereted_index_applier(&self, filters: &[Expr]) -> Option<InvertedIndexApplierRef> {
642        if self.ignore_inverted_index {
643            return None;
644        }
645
646        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
647        let inverted_index_cache = self.cache_strategy.inverted_index_cache().cloned();
648
649        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
650
651        InvertedIndexApplierBuilder::new(
652            self.access_layer.table_dir().to_string(),
653            self.access_layer.path_type(),
654            self.access_layer.object_store().clone(),
655            self.version.metadata.as_ref(),
656            self.version.metadata.inverted_indexed_column_ids(
657                self.version
658                    .options
659                    .index_options
660                    .inverted_index
661                    .ignore_column_ids
662                    .iter(),
663            ),
664            self.access_layer.puffin_manager_factory().clone(),
665        )
666        .with_file_cache(file_cache)
667        .with_inverted_index_cache(inverted_index_cache)
668        .with_puffin_metadata_cache(puffin_metadata_cache)
669        .build(filters)
670        .inspect_err(|err| warn!(err; "Failed to build invereted index applier"))
671        .ok()
672        .flatten()
673        .map(Arc::new)
674    }
675
676    /// Use the latest schema to build the bloom filter index applier.
677    fn build_bloom_filter_applier(&self, filters: &[Expr]) -> Option<BloomFilterIndexApplierRef> {
678        if self.ignore_bloom_filter {
679            return None;
680        }
681
682        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
683        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
684        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
685
686        BloomFilterIndexApplierBuilder::new(
687            self.access_layer.table_dir().to_string(),
688            self.access_layer.path_type(),
689            self.access_layer.object_store().clone(),
690            self.version.metadata.as_ref(),
691            self.access_layer.puffin_manager_factory().clone(),
692        )
693        .with_file_cache(file_cache)
694        .with_bloom_filter_index_cache(bloom_filter_index_cache)
695        .with_puffin_metadata_cache(puffin_metadata_cache)
696        .build(filters)
697        .inspect_err(|err| warn!(err; "Failed to build bloom filter index applier"))
698        .ok()
699        .flatten()
700        .map(Arc::new)
701    }
702
703    /// Use the latest schema to build the fulltext index applier.
704    fn build_fulltext_index_applier(&self, filters: &[Expr]) -> Option<FulltextIndexApplierRef> {
705        if self.ignore_fulltext_index {
706            return None;
707        }
708
709        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
710        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
711        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
712        FulltextIndexApplierBuilder::new(
713            self.access_layer.table_dir().to_string(),
714            self.access_layer.path_type(),
715            self.access_layer.object_store().clone(),
716            self.access_layer.puffin_manager_factory().clone(),
717            self.version.metadata.as_ref(),
718        )
719        .with_file_cache(file_cache)
720        .with_puffin_metadata_cache(puffin_metadata_cache)
721        .with_bloom_filter_cache(bloom_filter_index_cache)
722        .build(filters)
723        .inspect_err(|err| warn!(err; "Failed to build fulltext index applier"))
724        .ok()
725        .flatten()
726        .map(Arc::new)
727    }
728
729    /// Build the vector index applier from vector search request.
730    #[cfg(feature = "vector_index")]
731    fn build_vector_index_applier(&self) -> Option<VectorIndexApplierRef> {
732        let vector_search = self.request.vector_search.as_ref()?;
733
734        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
735        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
736        let vector_index_cache = self.cache_strategy.vector_index_cache().cloned();
737
738        let applier = VectorIndexApplier::new(
739            self.access_layer.table_dir().to_string(),
740            self.access_layer.path_type(),
741            self.access_layer.object_store().clone(),
742            self.access_layer.puffin_manager_factory().clone(),
743            vector_search.column_id,
744            vector_search.query_vector.clone(),
745            vector_search.metric,
746        )
747        .with_file_cache(file_cache)
748        .with_puffin_metadata_cache(puffin_metadata_cache)
749        .with_vector_index_cache(vector_index_cache);
750
751        Some(Arc::new(applier))
752    }
753}
754
755/// Returns true if the time range of a SST `file` matches the `predicate`.
756fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
757    if predicate == &TimestampRange::min_to_max() {
758        return true;
759    }
760    // end timestamp of a SST is inclusive.
761    let (start, end) = file.time_range();
762    let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end));
763    file_ts_range.intersects(predicate)
764}
765
766/// Common input for different scanners.
767pub struct ScanInput {
768    /// Region SST access layer.
769    access_layer: AccessLayerRef,
770    /// Maps projected Batches to RecordBatches.
771    pub(crate) mapper: Arc<FlatProjectionMapper>,
772    /// The columns to read from memtables and SSTs.
773    /// Notice this is different from the columns in `mapper` which are projected columns.
774    /// But this read columns might also include non-projected columns needed for filtering.
775    pub(crate) read_cols: ReadColumns,
776    /// Time range filter for time index.
777    pub(crate) time_range: Option<TimestampRange>,
778    /// Predicate to push down.
779    pub(crate) predicate: PredicateGroup,
780    /// Region partition expr applied at read time.
781    region_partition_expr: Option<PartitionExpr>,
782    /// Memtable range builders for memtables in the time range..
783    pub(crate) memtables: Vec<MemRangeBuilder>,
784    /// Handles to SST files to scan.
785    pub(crate) files: Vec<FileHandle>,
786    /// Cache.
787    pub(crate) cache_strategy: CacheStrategy,
788    /// Ignores file not found error.
789    ignore_file_not_found: bool,
790    /// Maximum number of SST files to scan concurrently.
791    pub(crate) max_concurrent_scan_files: usize,
792    /// Index appliers.
793    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
794    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
795    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
796    /// Vector index applier for KNN search.
797    #[cfg(feature = "vector_index")]
798    pub(crate) vector_index_applier: Option<VectorIndexApplierRef>,
799    /// Over-fetched k for vector index scan.
800    #[cfg(feature = "vector_index")]
801    pub(crate) vector_index_k: Option<usize>,
802    /// Start time of the query.
803    pub(crate) query_start: Option<Instant>,
804    /// The region is using append mode.
805    pub(crate) append_mode: bool,
806    /// Whether to remove deletion markers.
807    pub(crate) filter_deleted: bool,
808    /// Mode to merge duplicate rows.
809    pub(crate) merge_mode: MergeMode,
810    /// Hint to select rows from time series.
811    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
812    /// Hint for the required distribution of the scanner.
813    pub(crate) distribution: Option<TimeSeriesDistribution>,
814    /// Whether the region's configured SST format is flat.
815    explain_flat_format: bool,
816    /// Snapshot upper bound bound at scan open and propagated back to the caller.
817    pub(crate) snapshot_sequence: Option<SequenceNumber>,
818    /// Whether this scan is for compaction.
819    pub(crate) compaction: bool,
820    #[cfg(feature = "enterprise")]
821    extension_ranges: Vec<BoxedExtensionRange>,
822}
823
824impl ScanInput {
825    /// Creates a new [ScanInput].
826    #[must_use]
827    pub(crate) fn new(access_layer: AccessLayerRef, mapper: FlatProjectionMapper) -> ScanInput {
828        ScanInput {
829            access_layer,
830            read_cols: mapper.read_columns().clone(),
831            mapper: Arc::new(mapper),
832            time_range: None,
833            predicate: PredicateGroup::default(),
834            region_partition_expr: None,
835            memtables: Vec::new(),
836            files: Vec::new(),
837            cache_strategy: CacheStrategy::Disabled,
838            ignore_file_not_found: false,
839            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
840            inverted_index_appliers: [None, None],
841            bloom_filter_index_appliers: [None, None],
842            fulltext_index_appliers: [None, None],
843            #[cfg(feature = "vector_index")]
844            vector_index_applier: None,
845            #[cfg(feature = "vector_index")]
846            vector_index_k: None,
847            query_start: None,
848            append_mode: false,
849            filter_deleted: true,
850            merge_mode: MergeMode::default(),
851            series_row_selector: None,
852            distribution: None,
853            explain_flat_format: false,
854            snapshot_sequence: None,
855            compaction: false,
856            #[cfg(feature = "enterprise")]
857            extension_ranges: Vec::new(),
858        }
859    }
860
861    /// Sets time range filter for time index.
862    #[must_use]
863    pub(crate) fn with_time_range(mut self, time_range: Option<TimestampRange>) -> Self {
864        self.time_range = time_range;
865        self
866    }
867
868    /// Sets predicate to push down.
869    #[must_use]
870    pub(crate) fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
871        self.region_partition_expr = predicate.region_partition_expr().cloned();
872        self.predicate = predicate;
873        self
874    }
875
876    /// Sets memtable range builders.
877    #[must_use]
878    pub(crate) fn with_memtables(mut self, memtables: Vec<MemRangeBuilder>) -> Self {
879        self.memtables = memtables;
880        self
881    }
882
883    /// Sets files to read.
884    #[must_use]
885    pub(crate) fn with_files(mut self, files: Vec<FileHandle>) -> Self {
886        self.files = files;
887        self
888    }
889
890    /// Sets cache for this query.
891    #[must_use]
892    pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self {
893        self.cache_strategy = cache;
894        self
895    }
896
897    /// Ignores file not found error.
898    #[must_use]
899    pub(crate) fn with_ignore_file_not_found(mut self, ignore: bool) -> Self {
900        self.ignore_file_not_found = ignore;
901        self
902    }
903
904    /// Sets maximum number of SST files to scan concurrently.
905    #[must_use]
906    pub(crate) fn with_max_concurrent_scan_files(
907        mut self,
908        max_concurrent_scan_files: usize,
909    ) -> Self {
910        self.max_concurrent_scan_files = max_concurrent_scan_files;
911        self
912    }
913
914    /// Sets inverted index appliers.
915    #[must_use]
916    pub(crate) fn with_inverted_index_appliers(
917        mut self,
918        appliers: [Option<InvertedIndexApplierRef>; 2],
919    ) -> Self {
920        self.inverted_index_appliers = appliers;
921        self
922    }
923
924    /// Sets bloom filter appliers.
925    #[must_use]
926    pub(crate) fn with_bloom_filter_index_appliers(
927        mut self,
928        appliers: [Option<BloomFilterIndexApplierRef>; 2],
929    ) -> Self {
930        self.bloom_filter_index_appliers = appliers;
931        self
932    }
933
934    /// Sets fulltext index appliers.
935    #[must_use]
936    pub(crate) fn with_fulltext_index_appliers(
937        mut self,
938        appliers: [Option<FulltextIndexApplierRef>; 2],
939    ) -> Self {
940        self.fulltext_index_appliers = appliers;
941        self
942    }
943
944    /// Sets vector index applier for KNN search.
945    #[cfg(feature = "vector_index")]
946    #[must_use]
947    pub(crate) fn with_vector_index_applier(
948        mut self,
949        applier: Option<VectorIndexApplierRef>,
950    ) -> Self {
951        self.vector_index_applier = applier;
952        self
953    }
954
955    /// Sets over-fetched k for vector index scan.
956    #[cfg(feature = "vector_index")]
957    #[must_use]
958    pub(crate) fn with_vector_index_k(mut self, k: Option<usize>) -> Self {
959        self.vector_index_k = k;
960        self
961    }
962
963    /// Sets start time of the query.
964    #[must_use]
965    pub(crate) fn with_start_time(mut self, now: Option<Instant>) -> Self {
966        self.query_start = now;
967        self
968    }
969
970    #[must_use]
971    pub(crate) fn with_append_mode(mut self, is_append_mode: bool) -> Self {
972        self.append_mode = is_append_mode;
973        self
974    }
975
976    /// Sets whether to remove deletion markers during scan.
977    #[must_use]
978    pub(crate) fn with_filter_deleted(mut self, filter_deleted: bool) -> Self {
979        self.filter_deleted = filter_deleted;
980        self
981    }
982
983    /// Sets the merge mode.
984    #[must_use]
985    pub(crate) fn with_merge_mode(mut self, merge_mode: MergeMode) -> Self {
986        self.merge_mode = merge_mode;
987        self
988    }
989
990    /// Sets the distribution hint.
991    #[must_use]
992    pub(crate) fn with_distribution(
993        mut self,
994        distribution: Option<TimeSeriesDistribution>,
995    ) -> Self {
996        self.distribution = distribution;
997        self
998    }
999
1000    /// Sets whether the region's configured SST format is flat for explain output.
1001    #[must_use]
1002    pub(crate) fn with_explain_flat_format(mut self, explain_flat_format: bool) -> Self {
1003        self.explain_flat_format = explain_flat_format;
1004        self
1005    }
1006
1007    /// Sets the time series row selector.
1008    #[must_use]
1009    pub(crate) fn with_series_row_selector(
1010        mut self,
1011        series_row_selector: Option<TimeSeriesRowSelector>,
1012    ) -> Self {
1013        self.series_row_selector = series_row_selector;
1014        self
1015    }
1016
1017    #[must_use]
1018    pub(crate) fn with_snapshot_sequence(
1019        mut self,
1020        snapshot_sequence: Option<SequenceNumber>,
1021    ) -> Self {
1022        self.snapshot_sequence = snapshot_sequence;
1023        self
1024    }
1025
1026    /// Sets whether this scan is for compaction.
1027    #[must_use]
1028    pub(crate) fn with_compaction(mut self, compaction: bool) -> Self {
1029        self.compaction = compaction;
1030        self
1031    }
1032
1033    /// Builds memtable ranges to scan by `index`.
1034    pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> {
1035        let memtable = &self.memtables[index.index];
1036        let mut ranges = SmallVec::new();
1037        memtable.build_ranges(index.row_group_index, &mut ranges);
1038        ranges
1039    }
1040
1041    fn predicate_for_file(&self, file: &FileHandle) -> Option<Predicate> {
1042        if self.should_skip_region_partition(file) {
1043            self.predicate.predicate_without_region().cloned()
1044        } else {
1045            self.predicate.predicate().cloned()
1046        }
1047    }
1048
1049    fn should_skip_region_partition(&self, file: &FileHandle) -> bool {
1050        match (
1051            self.region_partition_expr.as_ref(),
1052            file.meta_ref().partition_expr.as_ref(),
1053        ) {
1054            (Some(region_expr), Some(file_expr)) => region_expr == file_expr,
1055            _ => false,
1056        }
1057    }
1058
1059    /// Prunes a file to scan and returns the builder to build readers.
1060    #[tracing::instrument(
1061        skip_all,
1062        fields(
1063            region_id = %self.region_metadata().region_id,
1064            file_id = %file.file_id()
1065        )
1066    )]
1067    pub async fn prune_file(
1068        &self,
1069        file: &FileHandle,
1070        pre_filter_mode: PreFilterMode,
1071        reader_metrics: &mut ReaderMetrics,
1072    ) -> Result<FileRangeBuilder> {
1073        let predicate = self.predicate_for_file(file);
1074        let may_build_selective_row_selection = predicate.is_some();
1075        let decode_pk_values = !self.compaction
1076            && self
1077                .mapper
1078                .read_columns()
1079                .column_ids_iter()
1080                .any(|column_id| self.mapper.metadata().primary_key.contains(&column_id));
1081        let reader = self
1082            .access_layer
1083            .read_sst(file.clone())
1084            .predicate(predicate)
1085            .projection(Some(self.read_cols.clone()))
1086            .cache(self.cache_strategy.clone())
1087            .inverted_index_appliers(self.inverted_index_appliers.clone())
1088            .bloom_filter_index_appliers(self.bloom_filter_index_appliers.clone())
1089            .fulltext_index_appliers(self.fulltext_index_appliers.clone());
1090        let reader = if !self.compaction && may_build_selective_row_selection {
1091            reader.deferred_optional_page_index()
1092        } else {
1093            reader
1094        };
1095        #[cfg(feature = "vector_index")]
1096        let reader = {
1097            let mut reader = reader;
1098            reader =
1099                reader.vector_index_applier(self.vector_index_applier.clone(), self.vector_index_k);
1100            reader
1101        };
1102        let res = reader
1103            .expected_metadata(Some(self.mapper.metadata().clone()))
1104            .compaction(self.compaction)
1105            .pre_filter_mode(pre_filter_mode)
1106            .decode_primary_key_values(decode_pk_values)
1107            .build_reader_input(reader_metrics)
1108            .await;
1109        let read_input = match res {
1110            Ok(x) => x,
1111            Err(e) => {
1112                if e.is_object_not_found() && self.ignore_file_not_found {
1113                    error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
1114                    return Ok(FileRangeBuilder::default());
1115                } else {
1116                    return Err(e);
1117                }
1118            }
1119        };
1120
1121        let Some((mut file_range_ctx, selection)) = read_input else {
1122            return Ok(FileRangeBuilder::default());
1123        };
1124
1125        let need_compat = !compat::has_same_columns_and_pk_encoding(
1126            &self.mapper,
1127            file_range_ctx.read_format(),
1128            self.compaction,
1129        );
1130        if need_compat {
1131            // They have different schema. We need to adapt the batch first so the
1132            // mapper can convert it.
1133            let compat = FlatCompatBatch::try_new(
1134                &self.mapper,
1135                file_range_ctx.read_format(),
1136                self.compaction,
1137            )?;
1138            file_range_ctx.set_compat_batch(compat);
1139        }
1140        Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), selection))
1141    }
1142
1143    /// Scans flat sources (RecordBatch streams) in parallel.
1144    ///
1145    /// # Panics if the input doesn't allow parallel scan.
1146    #[tracing::instrument(
1147        skip(self, sources, semaphore),
1148        fields(
1149            region_id = %self.region_metadata().region_id,
1150            source_count = sources.len()
1151        )
1152    )]
1153    pub(crate) fn create_parallel_flat_sources(
1154        &self,
1155        sources: Vec<BoxedRecordBatchStream>,
1156        semaphore: Arc<Semaphore>,
1157        channel_size: usize,
1158    ) -> Result<Vec<BoxedRecordBatchStream>> {
1159        if sources.len() <= 1 {
1160            return Ok(sources);
1161        }
1162
1163        // Spawn a task for each source.
1164        let sources = sources
1165            .into_iter()
1166            .map(|source| {
1167                let (sender, receiver) = mpsc::channel(channel_size);
1168                self.spawn_flat_scan_task(source, semaphore.clone(), sender);
1169                let stream = Box::pin(ReceiverStream::new(receiver));
1170                Box::pin(stream) as _
1171            })
1172            .collect();
1173        Ok(sources)
1174    }
1175
1176    /// Spawns a task to scan a flat source (RecordBatch stream) asynchronously.
1177    #[tracing::instrument(
1178        skip(self, input, semaphore, sender),
1179        fields(region_id = %self.region_metadata().region_id)
1180    )]
1181    pub(crate) fn spawn_flat_scan_task(
1182        &self,
1183        mut input: BoxedRecordBatchStream,
1184        semaphore: Arc<Semaphore>,
1185        sender: mpsc::Sender<Result<RecordBatch>>,
1186    ) {
1187        let region_id = self.region_metadata().region_id;
1188        let span = tracing::info_span!(
1189            "ScanInput::parallel_scan_task",
1190            region_id = %region_id,
1191            stream_kind = "flat"
1192        );
1193        common_runtime::spawn_global(
1194            async move {
1195                loop {
1196                    // We release the permit before sending result to avoid the task waiting on
1197                    // the channel with the permit held.
1198                    let maybe_batch = {
1199                        // Safety: We never close the semaphore.
1200                        let _permit = semaphore.acquire().await.unwrap();
1201                        input.next().await
1202                    };
1203                    match maybe_batch {
1204                        Some(Ok(batch)) => {
1205                            let _ = sender.send(Ok(batch)).await;
1206                        }
1207                        Some(Err(e)) => {
1208                            let _ = sender.send(Err(e)).await;
1209                            break;
1210                        }
1211                        None => break,
1212                    }
1213                }
1214            }
1215            .instrument(span),
1216        );
1217    }
1218
1219    pub(crate) fn total_rows(&self) -> usize {
1220        let rows_in_files: usize = self.files.iter().map(|f| f.num_rows()).sum();
1221        let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();
1222
1223        let rows = rows_in_files + rows_in_memtables;
1224        #[cfg(feature = "enterprise")]
1225        let rows = rows
1226            + self
1227                .extension_ranges
1228                .iter()
1229                .map(|x| x.num_rows())
1230                .sum::<u64>() as usize;
1231        rows
1232    }
1233
1234    pub(crate) fn predicate_group(&self) -> &PredicateGroup {
1235        &self.predicate
1236    }
1237
1238    /// Returns number of memtables to scan.
1239    pub(crate) fn num_memtables(&self) -> usize {
1240        self.memtables.len()
1241    }
1242
1243    /// Returns number of SST files to scan.
1244    pub(crate) fn num_files(&self) -> usize {
1245        self.files.len()
1246    }
1247
1248    /// Gets the file handle from a row group index.
1249    pub(crate) fn file_from_index(&self, index: RowGroupIndex) -> &FileHandle {
1250        let file_index = index.index - self.num_memtables();
1251        &self.files[file_index]
1252    }
1253
1254    pub fn region_metadata(&self) -> &RegionMetadataRef {
1255        self.mapper.metadata()
1256    }
1257
1258    fn range_pre_filter_mode(&self, source_count: usize) -> PreFilterMode {
1259        if source_count <= 1 {
1260            // Duplicated rows in the same source is not a normal case and we don't provide
1261            // strict dedup semantic (last_row/last_non_null) for it. We expect the duplicated rows
1262            // are exactly identical in the same source so we use PreFilterMode::All for
1263            // performance reason.
1264            return PreFilterMode::All;
1265        }
1266
1267        pre_filter_mode(self.append_mode, self.merge_mode)
1268    }
1269}
1270
#[cfg(feature = "enterprise")]
impl ScanInput {
    /// Sets the extension ranges to scan in addition to memtables and SST files.
    #[must_use]
    pub(crate) fn with_extension_ranges(self, extension_ranges: Vec<BoxedExtensionRange>) -> Self {
        Self {
            extension_ranges,
            ..self
        }
    }

    /// Returns the extension ranges to scan.
    pub(crate) fn extension_ranges(&self) -> &[BoxedExtensionRange] {
        &self.extension_ranges
    }

    /// Get a boxed [ExtensionRange] by the index in all ranges.
    ///
    /// Range indices list memtables first, then SST files, then extension
    /// ranges, so `i` is offset by the number of memtables and files.
    ///
    /// # Panics
    /// Panics if `i` does not refer to an extension range.
    pub(crate) fn extension_range(&self, i: usize) -> &BoxedExtensionRange {
        &self.extension_ranges[i - self.num_memtables() - self.num_files()]
    }
}
1292
#[cfg(test)]
impl ScanInput {
    /// Collects the ids of the SST files this scan will read, in scan order.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        let mut ids = Vec::with_capacity(self.files.len());
        for handle in &self.files {
            ids.push(handle.file_id());
        }
        ids
    }

    /// Collects the index ids of the SST files this scan will read, in scan order.
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        let mut ids = Vec::with_capacity(self.files.len());
        for handle in &self.files {
            ids.push(handle.index_id());
        }
        ids
    }
}
1304
1305fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode {
1306    if append_mode {
1307        return PreFilterMode::All;
1308    }
1309
1310    match merge_mode {
1311        MergeMode::LastRow => PreFilterMode::SkipFields,
1312        MergeMode::LastNonNull => PreFilterMode::SkipFields,
1313    }
1314}
1315
1316/// Output of [build_scan_fingerprint]: the cache fingerprint plus the derived
1317/// implied time range used to decide whether the cache key can drop the time
1318/// predicates for a given partition (see `build_range_cache_key`).
1319pub(crate) struct ScanFingerprintBundle {
1320    pub(crate) fingerprint: ScanRequestFingerprint,
1321    /// `Some(r)` = all time-only predicates are guaranteed true on `r` (in the
1322    /// column's `TimeUnit`).
1323    /// `None`    = at least one time-only predicate could not be proven (e.g.
1324    /// `OR`), so the cache-key optimization is disabled for this scan.
1325    pub(crate) implied_time_range: Option<TimestampRange>,
1326}
1327
1328/// Builds a [ScanFingerprintBundle] from a [ScanInput] if the scan is eligible
1329/// for partition range caching.
1330pub(crate) fn build_scan_fingerprint(input: &ScanInput) -> Option<ScanFingerprintBundle> {
1331    let eligible = !input.compaction
1332        && !input.files.is_empty()
1333        && matches!(input.cache_strategy, CacheStrategy::EnableAll(_));
1334
1335    if !eligible {
1336        return None;
1337    }
1338
1339    let metadata = input.region_metadata();
1340    let tag_names: HashSet<&str> = metadata
1341        .column_metadatas
1342        .iter()
1343        .filter(|col| col.semantic_type == SemanticType::Tag)
1344        .map(|col| col.column_schema.name.as_str())
1345        .collect();
1346
1347    let time_index = metadata.time_index_column();
1348    let time_index_name = time_index.column_schema.name.clone();
1349    let ts_col_unit = time_index
1350        .column_schema
1351        .data_type
1352        .as_timestamp()
1353        .expect("Time index must have timestamp-compatible type")
1354        .unit();
1355
1356    let exprs = input
1357        .predicate_group()
1358        .predicate_without_region()
1359        .map(|predicate| predicate.exprs())
1360        .unwrap_or_default();
1361
1362    let mut filters = Vec::new();
1363    let mut time_only_exprs: Vec<&Expr> = Vec::new();
1364    let mut has_tag_filter = false;
1365    let mut columns = HashSet::new();
1366
1367    for expr in exprs {
1368        columns.clear();
1369        let is_time_only = match expr_to_columns(expr, &mut columns) {
1370            Ok(()) if !columns.is_empty() => {
1371                has_tag_filter |= columns
1372                    .iter()
1373                    .any(|col| tag_names.contains(col.name.as_str()));
1374                columns.iter().all(|col| col.name == time_index_name)
1375            }
1376            _ => false,
1377        };
1378
1379        // Route time-only exprs that the legacy extractor recognizes into
1380        // `time_only_exprs` so the implication walker
1381        // (`implied_time_range_from_exprs`, called below) can attempt to drop
1382        // them from the cache key when the partition's `FileTimeRange` is fully
1383        // covered, then stringify them into the fingerprint's `time_filters`
1384        // bucket. Time-only exprs that the extractor doesn't recognize stay in
1385        // `filters` and never get stripped — conservatively correct.
1386        if is_time_only
1387            && extract_time_range_from_expr(&time_index_name, ts_col_unit, expr).is_some()
1388        {
1389            time_only_exprs.push(expr);
1390        } else {
1391            filters.push(expr.to_string());
1392        }
1393    }
1394
1395    if !has_tag_filter {
1396        // We only cache requests that have tag filters to avoid caching all series.
1397        return None;
1398    }
1399
1400    let implied_time_range =
1401        implied_time_range_from_exprs(&time_index_name, ts_col_unit, &time_only_exprs);
1402    let mut time_filters: Vec<String> = time_only_exprs.iter().map(|e| e.to_string()).collect();
1403
1404    // Ensure the filters are sorted for consistent fingerprinting.
1405    filters.sort_unstable();
1406    time_filters.sort_unstable();
1407    let read_columns = input.read_cols.clone();
1408    let fingerprint = crate::read::range_cache::ScanRequestFingerprintBuilder {
1409        read_column_types: read_columns
1410            .column_ids_iter()
1411            .map(|id| {
1412                metadata
1413                    .column_by_id(id)
1414                    .map(|col| col.column_schema.data_type.clone())
1415            })
1416            .collect(),
1417        read_columns,
1418        filters,
1419        time_filters,
1420        series_row_selector: input.series_row_selector,
1421        append_mode: input.append_mode,
1422        filter_deleted: input.filter_deleted,
1423        merge_mode: input.merge_mode,
1424        partition_expr_version: metadata.partition_expr_version,
1425    }
1426    .build();
1427
1428    Some(ScanFingerprintBundle {
1429        fingerprint,
1430        implied_time_range,
1431    })
1432}
1433
1434/// Context shared by different streams from a scanner.
1435/// It contains the input and ranges to scan.
1436pub struct StreamContext {
1437    /// Input memtables and files.
1438    pub input: ScanInput,
1439    /// Metadata for partition ranges.
1440    pub(crate) ranges: Vec<RangeMeta>,
1441    /// Precomputed scan fingerprint for partition range caching.
1442    /// `None` when the scan is not eligible for caching.
1443    #[allow(dead_code)]
1444    pub(crate) scan_fingerprint: Option<ScanRequestFingerprint>,
1445    /// Implied range of every time-only predicate, in the time index column's
1446    /// `TimeUnit`. Used by `build_range_cache_key` to decide whether the
1447    /// partition's `FileTimeRange` is fully covered (allowing `time_filters`
1448    /// to be stripped from the cache key). `None` when caching is ineligible
1449    /// or when the implication walker bailed on an unsupported shape (e.g.
1450    /// `OR`).
1451    pub(crate) scan_implied_time_range: Option<TimestampRange>,
1452
1453    // Metrics:
1454    /// The start time of the query.
1455    pub(crate) query_start: Instant,
1456}
1457
1458impl StreamContext {
1459    /// Creates a new [StreamContext] for [SeqScan].
1460    pub(crate) fn seq_scan_ctx(input: ScanInput) -> Self {
1461        let query_start = input.query_start.unwrap_or_else(Instant::now);
1462        let ranges = RangeMeta::seq_scan_ranges(&input);
1463        READ_SST_COUNT.observe(input.num_files() as f64);
1464        let (scan_fingerprint, scan_implied_time_range) = match build_scan_fingerprint(&input) {
1465            Some(b) => (Some(b.fingerprint), b.implied_time_range),
1466            None => (None, None),
1467        };
1468
1469        Self {
1470            input,
1471            ranges,
1472            scan_fingerprint,
1473            scan_implied_time_range,
1474            query_start,
1475        }
1476    }
1477
1478    /// Creates a new [StreamContext] for [UnorderedScan].
1479    pub(crate) fn unordered_scan_ctx(input: ScanInput) -> Self {
1480        let query_start = input.query_start.unwrap_or_else(Instant::now);
1481        let ranges = RangeMeta::unordered_scan_ranges(&input);
1482        READ_SST_COUNT.observe(input.num_files() as f64);
1483        let (scan_fingerprint, scan_implied_time_range) = match build_scan_fingerprint(&input) {
1484            Some(b) => (Some(b.fingerprint), b.implied_time_range),
1485            None => (None, None),
1486        };
1487
1488        Self {
1489            input,
1490            ranges,
1491            scan_fingerprint,
1492            scan_implied_time_range,
1493            query_start,
1494        }
1495    }
1496
1497    /// Returns true if the index refers to a memtable.
1498    pub(crate) fn is_mem_range_index(&self, index: RowGroupIndex) -> bool {
1499        self.input.num_memtables() > index.index
1500    }
1501
1502    pub(crate) fn is_file_range_index(&self, index: RowGroupIndex) -> bool {
1503        !self.is_mem_range_index(index)
1504            && index.index < self.input.num_files() + self.input.num_memtables()
1505    }
1506
1507    pub(crate) fn range_pre_filter_mode(&self, part_range: &PartitionRange) -> PreFilterMode {
1508        let range_meta = &self.ranges[part_range.identifier];
1509        let source_count = range_meta.indices.len();
1510
1511        self.input.range_pre_filter_mode(source_count)
1512    }
1513
1514    /// Retrieves the partition ranges.
1515    pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
1516        self.ranges
1517            .iter()
1518            .enumerate()
1519            .map(|(idx, range_meta)| range_meta.new_partition_range(idx))
1520            .collect()
1521    }
1522
1523    /// Format the context for explain.
1524    pub(crate) fn format_for_explain(&self, verbose: bool, f: &mut fmt::Formatter) -> fmt::Result {
1525        let (mut num_mem_ranges, mut num_file_ranges, mut num_other_ranges) = (0, 0, 0);
1526        for range_meta in &self.ranges {
1527            for idx in &range_meta.row_group_indices {
1528                if self.is_mem_range_index(*idx) {
1529                    num_mem_ranges += 1;
1530                } else if self.is_file_range_index(*idx) {
1531                    num_file_ranges += 1;
1532                } else {
1533                    num_other_ranges += 1;
1534                }
1535            }
1536        }
1537        if verbose {
1538            write!(f, "{{")?;
1539        }
1540        write!(
1541            f,
1542            r#""partition_count":{{"count":{}, "mem_ranges":{}, "files":{}, "file_ranges":{}"#,
1543            self.ranges.len(),
1544            num_mem_ranges,
1545            self.input.num_files(),
1546            num_file_ranges,
1547        )?;
1548        if num_other_ranges > 0 {
1549            write!(f, r#", "other_ranges":{}"#, num_other_ranges)?;
1550        }
1551        write!(f, "}}")?;
1552
1553        if let Some(selector) = &self.input.series_row_selector {
1554            write!(f, ", \"selector\":\"{}\"", selector)?;
1555        }
1556        if let Some(distribution) = &self.input.distribution {
1557            write!(f, ", \"distribution\":\"{}\"", distribution)?;
1558        }
1559
1560        if verbose {
1561            self.format_verbose_content(f)?;
1562        }
1563
1564        Ok(())
1565    }
1566
1567    fn format_verbose_content(&self, f: &mut fmt::Formatter) -> fmt::Result {
1568        struct FileWrapper<'a> {
1569            file: &'a FileHandle,
1570        }
1571
1572        impl fmt::Debug for FileWrapper<'_> {
1573            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1574                let (start, end) = self.file.time_range();
1575                write!(
1576                    f,
1577                    r#"{{"file_id":"{}","time_range_start":"{}::{}","time_range_end":"{}::{}","rows":{},"size":{},"index_size":{}}}"#,
1578                    self.file.file_id(),
1579                    start.value(),
1580                    start.unit(),
1581                    end.value(),
1582                    end.unit(),
1583                    self.file.num_rows(),
1584                    self.file.size(),
1585                    self.file.index_size()
1586                )
1587            }
1588        }
1589
1590        struct InputWrapper<'a> {
1591            input: &'a ScanInput,
1592        }
1593
1594        #[cfg(feature = "enterprise")]
1595        impl InputWrapper<'_> {
1596            fn format_extension_ranges(&self, f: &mut fmt::Formatter) -> fmt::Result {
1597                if self.input.extension_ranges.is_empty() {
1598                    return Ok(());
1599                }
1600
1601                let mut delimiter = "";
1602                write!(f, ", extension_ranges: [")?;
1603                for range in self.input.extension_ranges() {
1604                    write!(f, "{}{:?}", delimiter, range)?;
1605                    delimiter = ", ";
1606                }
1607                write!(f, "]")?;
1608                Ok(())
1609            }
1610        }
1611
1612        impl fmt::Debug for InputWrapper<'_> {
1613            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1614                let output_schema = self.input.mapper.output_schema();
1615                if !output_schema.is_empty() {
1616                    let names: Vec<_> = output_schema
1617                        .column_schemas()
1618                        .iter()
1619                        .map(|col| &col.name)
1620                        .collect();
1621                    write!(f, ", \"projection\": {:?}", names)?;
1622                }
1623                if let Some(predicate) = &self.input.predicate.predicate() {
1624                    if !predicate.exprs().is_empty() {
1625                        let exprs: Vec<_> =
1626                            predicate.exprs().iter().map(|e| e.to_string()).collect();
1627                        write!(f, ", \"filters\": {:?}", exprs)?;
1628                    }
1629                    if !predicate.dyn_filters().is_empty() {
1630                        let dyn_filters: Vec<_> = predicate
1631                            .dyn_filters()
1632                            .iter()
1633                            .map(|f| format!("{}", f))
1634                            .collect();
1635                        write!(f, ", \"dyn_filters\": {:?}", dyn_filters)?;
1636                    }
1637                }
1638                #[cfg(feature = "vector_index")]
1639                if let Some(vector_index_k) = self.input.vector_index_k {
1640                    write!(f, ", \"vector_index_k\": {}", vector_index_k)?;
1641                }
1642                if !self.input.files.is_empty() {
1643                    write!(f, ", \"files\": ")?;
1644                    f.debug_list()
1645                        .entries(self.input.files.iter().map(|file| FileWrapper { file }))
1646                        .finish()?;
1647                }
1648                write!(f, ", \"flat_format\": {}", self.input.explain_flat_format)?;
1649                #[cfg(feature = "enterprise")]
1650                self.format_extension_ranges(f)?;
1651
1652                Ok(())
1653            }
1654        }
1655
1656        write!(f, "{:?}", InputWrapper { input: &self.input })
1657    }
1658
1659    /// Add new dynamic filters to the predicates.
1660    /// Safe after stream creation; in-flight reads may still observe an older snapshot.
1661    pub(crate) fn add_dyn_filter_to_predicate(
1662        self: &Arc<Self>,
1663        filter_exprs: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
1664    ) -> Vec<bool> {
1665        let mut supported = Vec::with_capacity(filter_exprs.len());
1666        let filter_expr = filter_exprs
1667            .into_iter()
1668            .filter_map(|expr| {
1669                if let Ok(dyn_filter) = (expr as Arc<dyn std::any::Any + Send + Sync + 'static>)
1670                .downcast::<datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr>()
1671            {
1672                supported.push(true);
1673                Some(dyn_filter)
1674            } else {
1675                supported.push(false);
1676                None
1677            }
1678            })
1679            .collect();
1680        self.input.predicate.add_dyn_filters(filter_expr);
1681        supported
1682    }
1683}
1684
/// Predicates to evaluate while scanning a region.
///
/// `time_filters` only keeps the filters that reference a single `Timestamp` column
/// and that [SimpleFilterEvaluator] supports. The two [Predicate]s keep every filter
/// expression: one with the region partition expr and one without it.
#[derive(Clone, Default)]
pub struct PredicateGroup {
    /// Filters on a single column of `Timestamp` semantic type that
    /// [SimpleFilterEvaluator] can evaluate; `None` when there are no such filters.
    time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
    /// Predicate that includes request filters and region partition expr (if any).
    predicate_all: Predicate,
    /// Predicate that only includes request filters.
    predicate_without_region: Predicate,
    /// Region partition expression restored from metadata.
    region_partition_expr: Option<PartitionExpr>,
}
1697
1698impl PredicateGroup {
1699    /// Creates a new `PredicateGroup` from exprs according to the metadata.
1700    pub fn new(metadata: &RegionMetadata, exprs: &[Expr]) -> Result<Self> {
1701        let mut combined_exprs = exprs.to_vec();
1702        let mut region_partition_expr = None;
1703
1704        if let Some(expr_json) = metadata.partition_expr.as_ref()
1705            && !expr_json.is_empty()
1706            && let Some(expr) = PartitionExpr::from_json_str(expr_json)
1707                .context(InvalidPartitionExprSnafu { expr: expr_json })?
1708        {
1709            let logical_expr = expr
1710                .try_as_logical_expr()
1711                .context(InvalidPartitionExprSnafu {
1712                    expr: expr_json.clone(),
1713                })?;
1714
1715            combined_exprs.push(logical_expr);
1716            region_partition_expr = Some(expr);
1717        }
1718
1719        let mut time_filters = Vec::with_capacity(combined_exprs.len());
1720        // Columns in the expr.
1721        let mut columns = HashSet::new();
1722        for expr in &combined_exprs {
1723            columns.clear();
1724            let Some(filter) = Self::expr_to_filter(expr, metadata, &mut columns) else {
1725                continue;
1726            };
1727            time_filters.push(filter);
1728        }
1729        let time_filters = if time_filters.is_empty() {
1730            None
1731        } else {
1732            Some(Arc::new(time_filters))
1733        };
1734
1735        let predicate_all = Predicate::new(combined_exprs);
1736        let predicate_without_region = Predicate::new(exprs.to_vec());
1737
1738        Ok(Self {
1739            time_filters,
1740            predicate_all,
1741            predicate_without_region,
1742            region_partition_expr,
1743        })
1744    }
1745
1746    /// Returns time filters.
1747    pub(crate) fn time_filters(&self) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
1748        self.time_filters.clone()
1749    }
1750
1751    /// Returns predicate of all exprs (including region partition expr if present).
1752    pub(crate) fn predicate(&self) -> Option<&Predicate> {
1753        if self.predicate_all.is_empty() {
1754            None
1755        } else {
1756            Some(&self.predicate_all)
1757        }
1758    }
1759
1760    /// Returns predicate that excludes region partition expr.
1761    pub(crate) fn predicate_without_region(&self) -> Option<&Predicate> {
1762        if self.predicate_without_region.is_empty() {
1763            None
1764        } else {
1765            Some(&self.predicate_without_region)
1766        }
1767    }
1768
1769    /// Add dynamic filters in the predicates.
1770    pub(crate) fn add_dyn_filters(&self, dyn_filters: Vec<Arc<DynamicFilterPhysicalExpr>>) {
1771        self.predicate_all.add_dyn_filters(dyn_filters.clone());
1772        self.predicate_without_region.add_dyn_filters(dyn_filters);
1773    }
1774
1775    /// Returns the region partition expr from metadata, if any.
1776    pub(crate) fn region_partition_expr(&self) -> Option<&PartitionExpr> {
1777        self.region_partition_expr.as_ref()
1778    }
1779
1780    fn expr_to_filter(
1781        expr: &Expr,
1782        metadata: &RegionMetadata,
1783        columns: &mut HashSet<Column>,
1784    ) -> Option<SimpleFilterEvaluator> {
1785        columns.clear();
1786        // `expr_to_columns` won't return error.
1787        // We still ignore these expressions for safety.
1788        expr_to_columns(expr, columns).ok()?;
1789        if columns.len() > 1 {
1790            // Simple filter doesn't support multiple columns.
1791            return None;
1792        }
1793        let column = columns.iter().next()?;
1794        let column_meta = metadata.column_by_name(&column.name)?;
1795        if column_meta.semantic_type == SemanticType::Timestamp {
1796            SimpleFilterEvaluator::try_new(expr)
1797        } else {
1798            None
1799        }
1800    }
1801}
1802
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datafusion::physical_plan::expressions::lit as physical_lit;
    use datafusion_common::ScalarValue;
    use datafusion_expr::{col, lit};
    use datatypes::value::Value;
    use partition::expr::col as partition_col;
    use store_api::metadata::RegionMetadataBuilder;
    use store_api::storage::{TimeSeriesDistribution, TimeSeriesRowSelector};

    use super::*;
    use crate::cache::CacheManager;
    use crate::read::range_cache::ScanRequestFingerprintBuilder;
    use crate::test_util::memtable_util::metadata_with_primary_key;
    use crate::test_util::scheduler_util::SchedulerEnv;

    /// Builds a [ScanInput] that projects the columns at indices 0, 2 and 3.
    ///
    /// The input has the given `filters`, a cache manager with a range result cache
    /// (size 1024), and a single default file.
    // NOTE(review): the `SchedulerEnv` is dropped when this returns and only its access
    // layer is kept. This looks fine for tests that never read files; confirm before
    // reusing this helper for real reads.
    async fn new_scan_input(metadata: RegionMetadataRef, filters: Vec<Expr>) -> ScanInput {
        let env = SchedulerEnv::new().await;
        let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3]).unwrap();
        let predicate = PredicateGroup::new(metadata.as_ref(), &filters).unwrap();
        let file = FileHandle::new(
            crate::sst::file::FileMeta::default(),
            Arc::new(crate::sst::file_purger::NoopFilePurger),
        );

        ScanInput::new(env.access_layer.clone(), mapper)
            .with_predicate(predicate)
            .with_cache(CacheStrategy::EnableAll(Arc::new(
                CacheManager::builder()
                    .range_result_cache_size(1024)
                    .build(),
            )))
            .with_files(vec![file])
    }

    /// Helper to create a timestamp millisecond literal.
    fn ts_lit(val: i64) -> datafusion_expr::Expr {
        lit(ScalarValue::TimestampMillisecond(Some(val), None))
    }

    /// An eligible scan yields a fingerprint where the `ts` filter is recorded as a
    /// time filter and the other filters (`k0`, `v0`) as regular filters. The
    /// selector, merge mode and `filter_deleted` are carried over from the input.
    #[tokio::test]
    async fn test_build_scan_fingerprint_for_eligible_scan() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let input = new_scan_input(
            metadata.clone(),
            vec![
                col("ts").gt_eq(ts_lit(1000)),
                col("k0").eq(lit("foo")),
                col("v0").gt(lit(1)),
            ],
        )
        .await
        .with_distribution(Some(TimeSeriesDistribution::PerSeries))
        .with_series_row_selector(Some(TimeSeriesRowSelector::LastRow))
        .with_merge_mode(MergeMode::LastNonNull)
        .with_filter_deleted(false);

        let fingerprint = build_scan_fingerprint(&input).unwrap();

        let expected = ScanRequestFingerprintBuilder {
            read_columns: input.read_cols,
            read_column_types: vec![
                metadata
                    .column_by_id(0)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(2)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(3)
                    .map(|col| col.column_schema.data_type.clone()),
            ],
            filters: vec![
                col("k0").eq(lit("foo")).to_string(),
                col("v0").gt(lit(1)).to_string(),
            ],
            time_filters: vec![col("ts").gt_eq(ts_lit(1000)).to_string()],
            series_row_selector: Some(TimeSeriesRowSelector::LastRow),
            append_mode: false,
            filter_deleted: false,
            merge_mode: MergeMode::LastNonNull,
            partition_expr_version: 0,
        }
        .build();
        assert_eq!(expected, fingerprint.fingerprint);
    }

    /// Without a filter on the `k0` column (only `ts` and `v0` filters), no
    /// fingerprint is built.
    #[tokio::test]
    async fn test_build_scan_fingerprint_requires_tag_filter() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let input = new_scan_input(
            metadata,
            vec![col("ts").gt_eq(lit(1000)), col("v0").gt(lit(1))],
        )
        .await;

        assert!(build_scan_fingerprint(&input).is_none());
    }

    /// No fingerprint is built when the input has no cache configured, when it is a
    /// compaction scan, or when there are no files to read.
    #[tokio::test]
    async fn test_build_scan_fingerprint_respects_scan_eligibility() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let filters = vec![col("k0").eq(lit("foo"))];

        // Built without `with_cache`, unlike `new_scan_input`.
        let disabled = ScanInput::new(
            SchedulerEnv::new().await.access_layer.clone(),
            FlatProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap(),
        )
        .with_predicate(PredicateGroup::new(metadata.as_ref(), &filters).unwrap());
        assert!(build_scan_fingerprint(&disabled).is_none());

        let compaction = new_scan_input(metadata.clone(), filters.clone())
            .await
            .with_compaction(true);
        assert!(build_scan_fingerprint(&compaction).is_none());

        // No files to read.
        let no_files = new_scan_input(metadata, filters).await.with_files(vec![]);
        assert!(build_scan_fingerprint(&no_files).is_none());
    }

    /// The fingerprint records the read column types and the metadata's
    /// `partition_expr_version`, which is non-zero once a partition expr is set.
    #[tokio::test]
    async fn test_build_scan_fingerprint_tracks_schema_and_partition_expr_changes() {
        let base = metadata_with_primary_key(vec![0, 1], false);
        let mut builder = RegionMetadataBuilder::from_existing(base);
        let partition_expr = partition_col("k0")
            .gt_eq(Value::String("foo".into()))
            .as_json_str()
            .unwrap();
        builder.partition_expr_json(Some(partition_expr));
        let metadata = Arc::new(builder.build_without_validation().unwrap());

        let input = new_scan_input(metadata.clone(), vec![col("k0").eq(lit("foo"))]).await;
        let fingerprint = build_scan_fingerprint(&input).unwrap();

        let expected = ScanRequestFingerprintBuilder {
            read_columns: input.read_cols,
            read_column_types: vec![
                metadata
                    .column_by_id(0)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(2)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(3)
                    .map(|col| col.column_schema.data_type.clone()),
            ],
            filters: vec![col("k0").eq(lit("foo")).to_string()],
            time_filters: vec![],
            series_row_selector: None,
            append_mode: false,
            filter_deleted: true,
            merge_mode: MergeMode::LastRow,
            partition_expr_version: metadata.partition_expr_version,
        }
        .build();
        assert_eq!(expected, fingerprint.fingerprint);
        assert_ne!(0, metadata.partition_expr_version);
    }

    /// Adding a dynamic filter to a group built from no exprs makes both predicates
    /// non-`None`. Each then holds no static exprs and exactly one dynamic filter.
    #[test]
    fn test_update_dyn_filters_with_empty_base_predicates() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let predicate_group = PredicateGroup::new(metadata.as_ref(), &[]).unwrap();
        assert!(predicate_group.predicate().is_none());
        assert!(predicate_group.predicate_without_region().is_none());

        let dyn_filter = Arc::new(DynamicFilterPhysicalExpr::new(vec![], physical_lit(false)));
        predicate_group.add_dyn_filters(vec![dyn_filter]);

        let predicate_all = predicate_group.predicate().unwrap();
        assert!(predicate_all.exprs().is_empty());
        assert_eq!(1, predicate_all.dyn_filters().len());

        let predicate_without_region = predicate_group.predicate_without_region().unwrap();
        assert!(predicate_without_region.exprs().is_empty());
        assert_eq!(1, predicate_without_region.dyn_filters().len());
    }

    /// Table-driven check of `range_pre_filter_mode`.
    ///
    /// Each case is `(append_mode, merge_mode, source_count, expected_mode)`. Only the
    /// non-append `LastRow` case with 2 sources expects `SkipFields`.
    #[tokio::test]
    async fn test_range_pre_filter_mode() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let cases = [
            (true, MergeMode::LastRow, 1, PreFilterMode::All),
            (false, MergeMode::LastNonNull, 1, PreFilterMode::All),
            (false, MergeMode::LastRow, 2, PreFilterMode::SkipFields),
            (true, MergeMode::LastRow, 2, PreFilterMode::All),
        ];

        for (append_mode, merge_mode, source_count, expected_mode) in cases {
            let input = new_scan_input(metadata.clone(), vec![])
                .await
                .with_append_mode(append_mode)
                .with_merge_mode(merge_mode);

            assert_eq!(expected_mode, input.range_pre_filter_mode(source_count));
        }
    }
}