// mito2/read/scan_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Scans a region according to the scan request.
16
17use std::collections::HashSet;
18use std::fmt;
19use std::num::NonZeroU64;
20use std::sync::Arc;
21use std::time::Instant;
22
23use api::v1::SemanticType;
24use common_error::ext::BoxedError;
25use common_recordbatch::SendableRecordBatchStream;
26use common_recordbatch::filter::SimpleFilterEvaluator;
27use common_telemetry::tracing::Instrument;
28use common_telemetry::{debug, error, tracing, warn};
29use common_time::range::TimestampRange;
30use datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr;
31use datafusion_common::Column;
32use datafusion_expr::Expr;
33use datafusion_expr::utils::expr_to_columns;
34use futures::StreamExt;
35use partition::expr::PartitionExpr;
36use smallvec::SmallVec;
37use snafu::{OptionExt as _, ResultExt};
38use store_api::metadata::{RegionMetadata, RegionMetadataRef};
39use store_api::region_engine::{PartitionRange, RegionScannerRef};
40use store_api::storage::{
41    ColumnId, RegionId, ScanRequest, SequenceRange, TimeSeriesDistribution, TimeSeriesRowSelector,
42};
43use table::predicate::{Predicate, build_time_range_predicate, extract_time_range_from_expr};
44use tokio::sync::{Semaphore, mpsc};
45use tokio_stream::wrappers::ReceiverStream;
46
47use crate::access_layer::AccessLayerRef;
48use crate::cache::CacheStrategy;
49use crate::config::DEFAULT_MAX_CONCURRENT_SCAN_FILES;
50use crate::error::{InvalidPartitionExprSnafu, InvalidRequestSnafu, Result};
51#[cfg(feature = "enterprise")]
52use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
53use crate::memtable::{MemtableRange, RangesOptions};
54use crate::metrics::READ_SST_COUNT;
55use crate::read::compat::{self, CompatBatch, FlatCompatBatch, PrimaryKeyCompatBatch};
56use crate::read::projection::ProjectionMapper;
57use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
58use crate::read::range_cache::ScanRequestFingerprint;
59use crate::read::seq_scan::SeqScan;
60use crate::read::series_scan::SeriesScan;
61use crate::read::stream::ScanBatchStream;
62use crate::read::unordered_scan::UnorderedScan;
63use crate::read::{BoxedRecordBatchStream, RecordBatch};
64use crate::region::options::MergeMode;
65use crate::region::version::VersionRef;
66use crate::sst::file::FileHandle;
67use crate::sst::index::bloom_filter::applier::{
68    BloomFilterIndexApplierBuilder, BloomFilterIndexApplierRef,
69};
70use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
71use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
72use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
73use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
74#[cfg(feature = "vector_index")]
75use crate::sst::index::vector_index::applier::{VectorIndexApplier, VectorIndexApplierRef};
76use crate::sst::parquet::file_range::PreFilterMode;
77use crate::sst::parquet::reader::ReaderMetrics;
78
#[cfg(feature = "vector_index")]
/// Over-fetch factor for vector (ANN) search when the request also carries filters:
/// `k` is multiplied by this so that rows later dropped by the filters still leave
/// enough candidates to fill the top-k result.
const VECTOR_INDEX_OVERFETCH_MULTIPLIER: usize = 2;
81
/// A scanner scans a region and returns a [SendableRecordBatchStream].
pub(crate) enum Scanner {
    /// Sequential scan. The default scanner; also the one used for compaction.
    Seq(SeqScan),
    /// Unordered scan. Chosen for append-mode regions without a series row selector.
    Unordered(UnorderedScan),
    /// Per-series scan. Chosen when the request asks for
    /// [TimeSeriesDistribution::PerSeries].
    Series(SeriesScan),
}
91
92impl Scanner {
93    /// Returns a [SendableRecordBatchStream] to retrieve scan results from all partitions.
94    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
95    pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream, BoxedError> {
96        match self {
97            Scanner::Seq(seq_scan) => seq_scan.build_stream(),
98            Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
99            Scanner::Series(series_scan) => series_scan.build_stream().await,
100        }
101    }
102
103    /// Create a stream of [`Batch`] by this scanner.
104    pub(crate) fn scan_batch(&self) -> Result<ScanBatchStream> {
105        match self {
106            Scanner::Seq(x) => x.scan_all_partitions(),
107            Scanner::Unordered(x) => x.scan_all_partitions(),
108            Scanner::Series(x) => x.scan_all_partitions(),
109        }
110    }
111}
112
#[cfg(test)]
impl Scanner {
    /// Number of SST files this scanner will read.
    pub(crate) fn num_files(&self) -> usize {
        match self {
            Scanner::Seq(s) => s.input().num_files(),
            Scanner::Unordered(s) => s.input().num_files(),
            Scanner::Series(s) => s.input().num_files(),
        }
    }

    /// Number of memtables this scanner will read.
    pub(crate) fn num_memtables(&self) -> usize {
        match self {
            Scanner::Seq(s) => s.input().num_memtables(),
            Scanner::Unordered(s) => s.input().num_memtables(),
            Scanner::Series(s) => s.input().num_memtables(),
        }
    }

    /// Ids of the SST files this scanner will read.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        match self {
            Scanner::Seq(s) => s.input().file_ids(),
            Scanner::Unordered(s) => s.input().file_ids(),
            Scanner::Series(s) => s.input().file_ids(),
        }
    }

    /// Ids of the indexes this scanner will read.
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        match self {
            Scanner::Seq(s) => s.input().index_ids(),
            Scanner::Unordered(s) => s.input().index_ids(),
            Scanner::Series(s) => s.input().index_ids(),
        }
    }

    /// Sets the target partitions, which controls the parallelism of the scanner.
    pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
        use store_api::region_engine::{PrepareRequest, RegionScanner};

        let request = PrepareRequest::default().with_target_partitions(target_partitions);
        match self {
            Scanner::Seq(s) => s.prepare(request).unwrap(),
            Scanner::Unordered(s) => s.prepare(request).unwrap(),
            Scanner::Series(s) => s.prepare(request).unwrap(),
        }
    }
}
162
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Helper to scan a region by [ScanRequest].
///
/// [ScanRegion] collects SSTs and memtables to scan without actually reading them. It
/// creates a [Scanner] to actually scan these targets in [Scanner::scan()].
///
/// ```mermaid
/// classDiagram
/// class ScanRegion {
///     -VersionRef version
///     -ScanRequest request
///     ~scanner() Scanner
///     ~seq_scan() SeqScan
/// }
/// class Scanner {
///     <<enumeration>>
///     SeqScan
///     UnorderedScan
///     +scan() SendableRecordBatchStream
/// }
/// class SeqScan {
///     -ScanInput input
///     +build() SendableRecordBatchStream
/// }
/// class UnorderedScan {
///     -ScanInput input
///     +build() SendableRecordBatchStream
/// }
/// class ScanInput {
///     -ProjectionMapper mapper
///     -Option~TimeRange~ time_range
///     -Option~Predicate~ predicate
///     -Vec~MemtableRef~ memtables
///     -Vec~FileHandle~ files
/// }
/// class ProjectionMapper {
///     ~output_schema() SchemaRef
///     ~convert(Batch) RecordBatch
/// }
/// ScanRegion -- Scanner
/// ScanRegion o-- ScanRequest
/// Scanner o-- SeqScan
/// Scanner o-- UnorderedScan
/// SeqScan o-- ScanInput
/// UnorderedScan o-- ScanInput
/// Scanner -- SendableRecordBatchStream
/// ScanInput o-- ProjectionMapper
/// SeqScan -- SendableRecordBatchStream
/// UnorderedScan -- SendableRecordBatchStream
/// ```
pub(crate) struct ScanRegion {
    /// Version of the region at scan.
    version: VersionRef,
    /// Access layer of the region.
    access_layer: AccessLayerRef,
    /// Scan request.
    request: ScanRequest,
    /// Cache.
    cache_strategy: CacheStrategy,
    /// Maximum number of SST files to scan concurrently.
    max_concurrent_scan_files: usize,
    /// Whether to ignore inverted index.
    ignore_inverted_index: bool,
    /// Whether to ignore fulltext index.
    ignore_fulltext_index: bool,
    /// Whether to ignore bloom filter.
    ignore_bloom_filter: bool,
    /// Start time of the scan task.
    start_time: Option<Instant>,
    /// Whether to filter out the deleted rows.
    /// Usually true for normal read, and false for scan for compaction.
    filter_deleted: bool,
    /// Optional provider of additional (extension) ranges to scan; enterprise only.
    #[cfg(feature = "enterprise")]
    extension_range_provider: Option<BoxedExtensionRangeProvider>,
}
238
impl ScanRegion {
    /// Creates a [ScanRegion].
    ///
    /// Index usage defaults to enabled, `filter_deleted` defaults to true (normal
    /// read path) and the concurrent-file limit defaults to
    /// [DEFAULT_MAX_CONCURRENT_SCAN_FILES].
    pub(crate) fn new(
        version: VersionRef,
        access_layer: AccessLayerRef,
        request: ScanRequest,
        cache_strategy: CacheStrategy,
    ) -> ScanRegion {
        ScanRegion {
            version,
            access_layer,
            request,
            cache_strategy,
            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
            ignore_inverted_index: false,
            ignore_fulltext_index: false,
            ignore_bloom_filter: false,
            start_time: None,
            filter_deleted: true,
            #[cfg(feature = "enterprise")]
            extension_range_provider: None,
        }
    }

    /// Sets maximum number of SST files to scan concurrently.
    #[must_use]
    pub(crate) fn with_max_concurrent_scan_files(
        mut self,
        max_concurrent_scan_files: usize,
    ) -> Self {
        self.max_concurrent_scan_files = max_concurrent_scan_files;
        self
    }

    /// Sets whether to ignore inverted index.
    #[must_use]
    pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self {
        self.ignore_inverted_index = ignore;
        self
    }

    /// Sets whether to ignore fulltext index.
    #[must_use]
    pub(crate) fn with_ignore_fulltext_index(mut self, ignore: bool) -> Self {
        self.ignore_fulltext_index = ignore;
        self
    }

    /// Sets whether to ignore bloom filter.
    #[must_use]
    pub(crate) fn with_ignore_bloom_filter(mut self, ignore: bool) -> Self {
        self.ignore_bloom_filter = ignore;
        self
    }

    /// Sets the start time of the scan task; forwarded to the [ScanInput].
    #[must_use]
    pub(crate) fn with_start_time(mut self, now: Instant) -> Self {
        self.start_time = Some(now);
        self
    }

    /// Sets whether to filter out deleted rows (false for compaction scans).
    pub(crate) fn set_filter_deleted(&mut self, filter_deleted: bool) {
        self.filter_deleted = filter_deleted;
    }

    /// Sets the provider used to discover extension ranges in [Self::scan_input].
    #[cfg(feature = "enterprise")]
    pub(crate) fn set_extension_range_provider(
        &mut self,
        extension_range_provider: BoxedExtensionRangeProvider,
    ) {
        self.extension_range_provider = Some(extension_range_provider);
    }

    /// Returns a [Scanner] to scan the region, picking the variant that
    /// matches the request (per-series, unordered, or sequential).
    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
    pub(crate) async fn scanner(self) -> Result<Scanner> {
        if self.use_series_scan() {
            self.series_scan().await.map(Scanner::Series)
        } else if self.use_unordered_scan() {
            // If table is append only and there is no series row selector, we use unordered scan in query.
            // We still use seq scan in compaction.
            self.unordered_scan().await.map(Scanner::Unordered)
        } else {
            self.seq_scan().await.map(Scanner::Seq)
        }
    }

    /// Returns a [RegionScanner] to scan the region.
    ///
    /// Same selection logic as [Self::scanner], but boxed as a trait object.
    #[tracing::instrument(
        level = tracing::Level::DEBUG,
        skip_all,
        fields(region_id = %self.region_id())
    )]
    pub(crate) async fn region_scanner(self) -> Result<RegionScannerRef> {
        if self.use_series_scan() {
            self.series_scan()
                .await
                .map(|scanner| Box::new(scanner) as _)
        } else if self.use_unordered_scan() {
            self.unordered_scan()
                .await
                .map(|scanner| Box::new(scanner) as _)
        } else {
            self.seq_scan().await.map(|scanner| Box::new(scanner) as _)
        }
    }

    /// Scan sequentially.
    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
    pub(crate) async fn seq_scan(self) -> Result<SeqScan> {
        let input = self.scan_input().await?.with_compaction(false);
        Ok(SeqScan::new(input))
    }

    /// Unordered scan.
    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
    pub(crate) async fn unordered_scan(self) -> Result<UnorderedScan> {
        let input = self.scan_input().await?;
        Ok(UnorderedScan::new(input))
    }

    /// Scans by series.
    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
    pub(crate) async fn series_scan(self) -> Result<SeriesScan> {
        let input = self.scan_input().await?;
        Ok(SeriesScan::new(input))
    }

    /// Returns true if the region can use unordered scan for current request.
    fn use_unordered_scan(&self) -> bool {
        // We use unordered scan when:
        // 1. The region is in append mode.
        // 2. There is no series row selector.
        // 3. The required distribution is None or TimeSeriesDistribution::TimeWindowed.
        //
        // We still use seq scan in compaction.
        self.version.options.append_mode
            && self.request.series_row_selector.is_none()
            && (self.request.distribution.is_none()
                || self.request.distribution == Some(TimeSeriesDistribution::TimeWindowed))
    }

    /// Returns true if the region can use series scan for current request.
    fn use_series_scan(&self) -> bool {
        self.request.distribution == Some(TimeSeriesDistribution::PerSeries)
    }

    /// Creates a scan input.
    ///
    /// Collects the SST files and memtable ranges overlapping the request's
    /// time range, builds index appliers, and assembles everything into a
    /// [ScanInput]. Consumes `self`.
    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
    async fn scan_input(self) -> Result<ScanInput> {
        let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new);
        let time_range = self.build_time_range_predicate();
        let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;

        // Columns to read: either derived from the projection plus filter columns,
        // or every column of the region when there is no projection.
        let read_column_ids = match self.request.projection_indices() {
            Some(p) => self.build_read_column_ids(p, &predicate)?,
            None => self
                .version
                .metadata
                .column_metadatas
                .iter()
                .map(|col| col.column_id)
                .collect(),
        };

        // The mapper always computes projected column ids as the schema of SSTs may change.
        let mapper = match self.request.projection_indices() {
            Some(p) => ProjectionMapper::new_with_read_columns(
                &self.version.metadata,
                p.iter().copied(),
                read_column_ids.clone(),
            )?,
            None => ProjectionMapper::all(&self.version.metadata)?,
        };

        // Collect SST files that pass the min-sequence and time-range pruning.
        let ssts = &self.version.ssts;
        let mut files = Vec::new();
        for level in ssts.levels() {
            for file in level.files.values() {
                let exceed_min_sequence = match (sst_min_sequence, file.meta_ref().sequence) {
                    (Some(min_sequence), Some(file_sequence)) => file_sequence > min_sequence,
                    // If the file's sequence is None (or actually is zero), it could mean the file
                    // is generated and added to the region "directly". In this case, its data should
                    // be considered as fresh as the memtable. So its sequence is treated greater than
                    // the min_sequence, whatever the value of min_sequence is. Hence the default
                    // "true" in this arm.
                    (Some(_), None) => true,
                    (None, _) => true,
                };

                // Finds SST files in range.
                if exceed_min_sequence && file_in_range(file, &time_range) {
                    files.push(file.clone());
                }
                // There is no need to check and prune for file's sequence here as the sequence number is usually very new,
                // unless the timing is too good, or the sequence number wouldn't be in file.
                // and the batch will be filtered out by tree reader anyway.
            }
        }

        let memtables = self.version.memtables.list_memtables();
        // Skip empty memtables and memtables out of time range.
        let mut mem_range_builders = Vec::new();
        let filter_mode = pre_filter_mode(
            self.version.options.append_mode,
            self.version.options.merge_mode(),
        );

        for m in memtables {
            // check if memtable is empty by reading stats.
            let Some((start, end)) = m.stats().time_range() else {
                continue;
            };
            // The time range of the memtable is inclusive.
            let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
            if !memtable_range.intersects(&time_range) {
                continue;
            }
            let ranges_in_memtable = m.ranges(
                Some(read_column_ids.as_slice()),
                RangesOptions::default()
                    .with_predicate(predicate.clone())
                    .with_sequence(SequenceRange::new(
                        self.request.memtable_min_sequence,
                        self.request.memtable_max_sequence,
                    ))
                    .with_pre_filter_mode(filter_mode),
            )?;
            mem_range_builders.extend(ranges_in_memtable.ranges.into_values().map(|v| {
                let stats = v.stats().clone();
                MemRangeBuilder::new(v, stats)
            }));
        }

        let region_id = self.region_id();
        debug!(
            "Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}",
            region_id,
            self.request,
            time_range,
            mem_range_builders.len(),
            files.len(),
            self.version.options.append_mode,
        );

        // Each index applier is built twice: once over the non-field filters and
        // once over the field filters.
        let (non_field_filters, field_filters) = self.partition_by_field_filters();
        let inverted_index_appliers = [
            self.build_invereted_index_applier(&non_field_filters),
            self.build_invereted_index_applier(&field_filters),
        ];
        let bloom_filter_appliers = [
            self.build_bloom_filter_applier(&non_field_filters),
            self.build_bloom_filter_applier(&field_filters),
        ];
        let fulltext_index_appliers = [
            self.build_fulltext_index_applier(&non_field_filters),
            self.build_fulltext_index_applier(&field_filters),
        ];
        #[cfg(feature = "vector_index")]
        let vector_index_applier = self.build_vector_index_applier();
        // Over-fetch vector-search candidates when additional filters may drop rows.
        #[cfg(feature = "vector_index")]
        let vector_index_k = self.request.vector_search.as_ref().map(|search| {
            if self.request.filters.is_empty() {
                search.k
            } else {
                search.k.saturating_mul(VECTOR_INDEX_OVERFETCH_MULTIPLIER)
            }
        });

        let input = ScanInput::new(self.access_layer, mapper)
            .with_time_range(Some(time_range))
            .with_predicate(predicate)
            .with_memtables(mem_range_builders)
            .with_files(files)
            .with_cache(self.cache_strategy)
            .with_inverted_index_appliers(inverted_index_appliers)
            .with_bloom_filter_index_appliers(bloom_filter_appliers)
            .with_fulltext_index_appliers(fulltext_index_appliers)
            .with_max_concurrent_scan_files(self.max_concurrent_scan_files)
            .with_start_time(self.start_time)
            .with_append_mode(self.version.options.append_mode)
            .with_filter_deleted(self.filter_deleted)
            .with_merge_mode(self.version.options.merge_mode())
            .with_series_row_selector(self.request.series_row_selector)
            .with_distribution(self.request.distribution)
            .with_explain_flat_format(
                self.version.options.sst_format == Some(crate::sst::FormatType::Flat),
            );
        #[cfg(feature = "vector_index")]
        let input = input
            .with_vector_index_applier(vector_index_applier)
            .with_vector_index_k(vector_index_k);

        #[cfg(feature = "enterprise")]
        let input = if let Some(provider) = self.extension_range_provider {
            let ranges = provider
                .find_extension_ranges(self.version.flushed_sequence, time_range, &self.request)
                .await?;
            debug!("Find extension ranges: {ranges:?}");
            input.with_extension_ranges(ranges)
        } else {
            input
        };
        Ok(input)
    }

    /// Id of the region being scanned.
    fn region_id(&self) -> RegionId {
        self.version.metadata.region_id
    }

    /// Build time range predicate from filters.
    fn build_time_range_predicate(&self) -> TimestampRange {
        let time_index = self.version.metadata.time_index_column();
        let unit = time_index
            .column_schema
            .data_type
            .as_timestamp()
            .expect("Time index must have timestamp-compatible type")
            .unit();
        build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters)
    }

    /// Return all columns id to read according to the projection and filters.
    ///
    /// The projection order is preserved; columns referenced only by filters or
    /// by the region partition expression are appended after the projected ones.
    fn build_read_column_ids(
        &self,
        projection: &[usize],
        predicate: &PredicateGroup,
    ) -> Result<Vec<ColumnId>> {
        let metadata = &self.version.metadata;
        // use Vec for read_column_ids to keep the order of columns.
        let mut read_column_ids = Vec::new();
        let mut seen = HashSet::new();

        for idx in projection {
            let column =
                metadata
                    .column_metadatas
                    .get(*idx)
                    .with_context(|| InvalidRequestSnafu {
                        region_id: metadata.region_id,
                        reason: format!("projection index {} is out of bound", idx),
                    })?;
            seen.insert(column.column_id);
            // keep the projection order
            read_column_ids.push(column.column_id);
        }

        // An empty projection still needs the time index column.
        if projection.is_empty() {
            let time_index = metadata.time_index_column().column_id;
            if seen.insert(time_index) {
                read_column_ids.push(time_index);
            }
        }

        // Names of columns referenced by filters / partition expr but not projected.
        let mut extra_names = HashSet::new();
        let mut columns = HashSet::new();

        for expr in &self.request.filters {
            columns.clear();
            if expr_to_columns(expr, &mut columns).is_err() {
                continue;
            }
            extra_names.extend(columns.iter().map(|column| column.name.clone()));
        }

        if let Some(expr) = predicate.region_partition_expr() {
            expr.collect_column_names(&mut extra_names);
        }

        if !extra_names.is_empty() {
            for column in &metadata.column_metadatas {
                if extra_names.contains(column.column_schema.name.as_str())
                    && !seen.contains(&column.column_id)
                {
                    read_column_ids.push(column.column_id);
                }
                extra_names.remove(column.column_schema.name.as_str());
            }
            // Anything left over names a column the region does not have.
            if !extra_names.is_empty() {
                warn!(
                    "Some columns in filters are not found in region {}: {:?}",
                    metadata.region_id, extra_names
                );
            }
        }
        Ok(read_column_ids)
    }

    /// Partitions filters into two groups: non-field filters and field filters.
    /// Returns `(non_field_filters, field_filters)`.
    fn partition_by_field_filters(&self) -> (Vec<Expr>, Vec<Expr>) {
        let field_columns = self
            .version
            .metadata
            .field_columns()
            .map(|col| &col.column_schema.name)
            .collect::<HashSet<_>>();

        let mut columns = HashSet::new();

        self.request.filters.iter().cloned().partition(|expr| {
            columns.clear();
            // `expr_to_columns` won't return error.
            if expr_to_columns(expr, &mut columns).is_err() {
                // If we can't extract columns, treat it as non-field filter
                return true;
            }
            // Return true for non-field filters (partition puts true cases in first vec)
            !columns
                .iter()
                .any(|column| field_columns.contains(&column.name))
        })
    }

    /// Use the latest schema to build the inverted index applier.
    ///
    /// Returns `None` when the index is ignored, the build fails (logged as a
    /// warning, degrading to a plain scan), or no filter is applicable.
    // NOTE(review): "invereted" in the method name and in the warn! message is a typo
    // for "inverted". The method is private to this impl, so renaming it (keeping the
    // log text change deliberate) is a safe follow-up.
    fn build_invereted_index_applier(&self, filters: &[Expr]) -> Option<InvertedIndexApplierRef> {
        if self.ignore_inverted_index {
            return None;
        }

        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
        let inverted_index_cache = self.cache_strategy.inverted_index_cache().cloned();

        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();

        InvertedIndexApplierBuilder::new(
            self.access_layer.table_dir().to_string(),
            self.access_layer.path_type(),
            self.access_layer.object_store().clone(),
            self.version.metadata.as_ref(),
            self.version.metadata.inverted_indexed_column_ids(
                self.version
                    .options
                    .index_options
                    .inverted_index
                    .ignore_column_ids
                    .iter(),
            ),
            self.access_layer.puffin_manager_factory().clone(),
        )
        .with_file_cache(file_cache)
        .with_inverted_index_cache(inverted_index_cache)
        .with_puffin_metadata_cache(puffin_metadata_cache)
        .build(filters)
        .inspect_err(|err| warn!(err; "Failed to build invereted index applier"))
        .ok()
        .flatten()
        .map(Arc::new)
    }

    /// Use the latest schema to build the bloom filter index applier.
    ///
    /// Returns `None` when the index is ignored or the build fails (logged,
    /// degrading to a plain scan).
    fn build_bloom_filter_applier(&self, filters: &[Expr]) -> Option<BloomFilterIndexApplierRef> {
        if self.ignore_bloom_filter {
            return None;
        }

        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();

        BloomFilterIndexApplierBuilder::new(
            self.access_layer.table_dir().to_string(),
            self.access_layer.path_type(),
            self.access_layer.object_store().clone(),
            self.version.metadata.as_ref(),
            self.access_layer.puffin_manager_factory().clone(),
        )
        .with_file_cache(file_cache)
        .with_bloom_filter_index_cache(bloom_filter_index_cache)
        .with_puffin_metadata_cache(puffin_metadata_cache)
        .build(filters)
        .inspect_err(|err| warn!(err; "Failed to build bloom filter index applier"))
        .ok()
        .flatten()
        .map(Arc::new)
    }

    /// Use the latest schema to build the fulltext index applier.
    ///
    /// Returns `None` when the index is ignored or the build fails (logged,
    /// degrading to a plain scan).
    fn build_fulltext_index_applier(&self, filters: &[Expr]) -> Option<FulltextIndexApplierRef> {
        if self.ignore_fulltext_index {
            return None;
        }

        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
        FulltextIndexApplierBuilder::new(
            self.access_layer.table_dir().to_string(),
            self.access_layer.path_type(),
            self.access_layer.object_store().clone(),
            self.access_layer.puffin_manager_factory().clone(),
            self.version.metadata.as_ref(),
        )
        .with_file_cache(file_cache)
        .with_puffin_metadata_cache(puffin_metadata_cache)
        .with_bloom_filter_cache(bloom_filter_index_cache)
        .build(filters)
        .inspect_err(|err| warn!(err; "Failed to build fulltext index applier"))
        .ok()
        .flatten()
        .map(Arc::new)
    }

    /// Build the vector index applier from vector search request.
    ///
    /// Returns `None` when the request carries no vector search.
    #[cfg(feature = "vector_index")]
    fn build_vector_index_applier(&self) -> Option<VectorIndexApplierRef> {
        let vector_search = self.request.vector_search.as_ref()?;

        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
        let vector_index_cache = self.cache_strategy.vector_index_cache().cloned();

        let applier = VectorIndexApplier::new(
            self.access_layer.table_dir().to_string(),
            self.access_layer.path_type(),
            self.access_layer.object_store().clone(),
            self.access_layer.puffin_manager_factory().clone(),
            vector_search.column_id,
            vector_search.query_vector.clone(),
            vector_search.metric,
        )
        .with_file_cache(file_cache)
        .with_puffin_metadata_cache(puffin_metadata_cache)
        .with_vector_index_cache(vector_index_cache);

        Some(Arc::new(applier))
    }
}
767
768/// Returns true if the time range of a SST `file` matches the `predicate`.
769fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
770    if predicate == &TimestampRange::min_to_max() {
771        return true;
772    }
773    // end timestamp of a SST is inclusive.
774    let (start, end) = file.time_range();
775    let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end));
776    file_ts_range.intersects(predicate)
777}
778
/// Common input for different scanners.
///
/// Bundles everything a scanner needs to read a region: the sources
/// (memtables and SST files), pushed-down predicates, caches, index
/// appliers, and behavior flags.
pub struct ScanInput {
    /// Region SST access layer.
    access_layer: AccessLayerRef,
    /// Maps projected Batches to RecordBatches.
    pub(crate) mapper: Arc<ProjectionMapper>,
    /// Column ids to read from memtables and SSTs.
    /// Notice this is different from the columns in `mapper` which are projected columns.
    /// But these read columns might also include non-projected columns needed for filtering.
    pub(crate) read_column_ids: Vec<ColumnId>,
    /// Time range filter for time index.
    pub(crate) time_range: Option<TimestampRange>,
    /// Predicate to push down.
    pub(crate) predicate: PredicateGroup,
    /// Region partition expr applied at read time.
    region_partition_expr: Option<PartitionExpr>,
    /// Memtable range builders for memtables in the time range.
    pub(crate) memtables: Vec<MemRangeBuilder>,
    /// Handles to SST files to scan.
    pub(crate) files: Vec<FileHandle>,
    /// Cache.
    pub(crate) cache_strategy: CacheStrategy,
    /// Ignores file not found error.
    ignore_file_not_found: bool,
    /// Maximum number of SST files to scan concurrently.
    pub(crate) max_concurrent_scan_files: usize,
    /// Index appliers.
    // NOTE(review): each array holds two optional appliers; the meaning of the
    // two slots is not visible from this file — confirm against the reader side.
    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
    /// Vector index applier for KNN search.
    #[cfg(feature = "vector_index")]
    pub(crate) vector_index_applier: Option<VectorIndexApplierRef>,
    /// Over-fetched k for vector index scan.
    #[cfg(feature = "vector_index")]
    pub(crate) vector_index_k: Option<usize>,
    /// Start time of the query.
    pub(crate) query_start: Option<Instant>,
    /// The region is using append mode.
    pub(crate) append_mode: bool,
    /// Whether to remove deletion markers.
    pub(crate) filter_deleted: bool,
    /// Mode to merge duplicate rows.
    pub(crate) merge_mode: MergeMode,
    /// Hint to select rows from time series.
    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
    /// Hint for the required distribution of the scanner.
    pub(crate) distribution: Option<TimeSeriesDistribution>,
    /// Whether the region's configured SST format is flat.
    explain_flat_format: bool,
    /// Whether this scan is for compaction.
    pub(crate) compaction: bool,
    /// Extra ranges provided by enterprise extensions.
    #[cfg(feature = "enterprise")]
    extension_ranges: Vec<BoxedExtensionRange>,
}
834
835impl ScanInput {
    /// Creates a new [ScanInput].
    ///
    /// Defaults: no time range, empty predicate, no sources, caching
    /// disabled, deletion markers filtered out, not a compaction scan.
    #[must_use]
    pub(crate) fn new(access_layer: AccessLayerRef, mapper: ProjectionMapper) -> ScanInput {
        ScanInput {
            access_layer,
            // Read columns default to the mapper's projected columns.
            read_column_ids: mapper.column_ids().to_vec(),
            mapper: Arc::new(mapper),
            time_range: None,
            predicate: PredicateGroup::default(),
            region_partition_expr: None,
            memtables: Vec::new(),
            files: Vec::new(),
            cache_strategy: CacheStrategy::Disabled,
            ignore_file_not_found: false,
            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
            inverted_index_appliers: [None, None],
            bloom_filter_index_appliers: [None, None],
            fulltext_index_appliers: [None, None],
            #[cfg(feature = "vector_index")]
            vector_index_applier: None,
            #[cfg(feature = "vector_index")]
            vector_index_k: None,
            query_start: None,
            append_mode: false,
            filter_deleted: true,
            merge_mode: MergeMode::default(),
            series_row_selector: None,
            distribution: None,
            explain_flat_format: false,
            compaction: false,
            #[cfg(feature = "enterprise")]
            extension_ranges: Vec::new(),
        }
    }
870
    /// Sets time range filter for time index.
    #[must_use]
    pub(crate) fn with_time_range(mut self, time_range: Option<TimestampRange>) -> Self {
        self.time_range = time_range;
        self
    }

    /// Sets predicate to push down.
    #[must_use]
    pub(crate) fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
        // Also remember the region partition expr so per-file predicates can
        // omit it when a file was written under the same expr.
        self.region_partition_expr = predicate.region_partition_expr().cloned();
        self.predicate = predicate;
        self
    }

    /// Sets memtable range builders.
    #[must_use]
    pub(crate) fn with_memtables(mut self, memtables: Vec<MemRangeBuilder>) -> Self {
        self.memtables = memtables;
        self
    }

    /// Sets files to read.
    #[must_use]
    pub(crate) fn with_files(mut self, files: Vec<FileHandle>) -> Self {
        self.files = files;
        self
    }

    /// Sets cache for this query.
    #[must_use]
    pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self {
        self.cache_strategy = cache;
        self
    }

    /// Ignores file not found error.
    #[must_use]
    pub(crate) fn with_ignore_file_not_found(mut self, ignore: bool) -> Self {
        self.ignore_file_not_found = ignore;
        self
    }

    /// Sets maximum number of SST files to scan concurrently.
    #[must_use]
    pub(crate) fn with_max_concurrent_scan_files(
        mut self,
        max_concurrent_scan_files: usize,
    ) -> Self {
        self.max_concurrent_scan_files = max_concurrent_scan_files;
        self
    }

    /// Sets inverted index appliers.
    #[must_use]
    pub(crate) fn with_inverted_index_appliers(
        mut self,
        appliers: [Option<InvertedIndexApplierRef>; 2],
    ) -> Self {
        self.inverted_index_appliers = appliers;
        self
    }

    /// Sets bloom filter appliers.
    #[must_use]
    pub(crate) fn with_bloom_filter_index_appliers(
        mut self,
        appliers: [Option<BloomFilterIndexApplierRef>; 2],
    ) -> Self {
        self.bloom_filter_index_appliers = appliers;
        self
    }

    /// Sets fulltext index appliers.
    #[must_use]
    pub(crate) fn with_fulltext_index_appliers(
        mut self,
        appliers: [Option<FulltextIndexApplierRef>; 2],
    ) -> Self {
        self.fulltext_index_appliers = appliers;
        self
    }

    /// Sets vector index applier for KNN search.
    #[cfg(feature = "vector_index")]
    #[must_use]
    pub(crate) fn with_vector_index_applier(
        mut self,
        applier: Option<VectorIndexApplierRef>,
    ) -> Self {
        self.vector_index_applier = applier;
        self
    }

    /// Sets over-fetched k for vector index scan.
    #[cfg(feature = "vector_index")]
    #[must_use]
    pub(crate) fn with_vector_index_k(mut self, k: Option<usize>) -> Self {
        self.vector_index_k = k;
        self
    }

    /// Sets start time of the query.
    #[must_use]
    pub(crate) fn with_start_time(mut self, now: Option<Instant>) -> Self {
        self.query_start = now;
        self
    }

    /// Sets whether the region is using append mode.
    #[must_use]
    pub(crate) fn with_append_mode(mut self, is_append_mode: bool) -> Self {
        self.append_mode = is_append_mode;
        self
    }

    /// Sets whether to remove deletion markers during scan.
    #[must_use]
    pub(crate) fn with_filter_deleted(mut self, filter_deleted: bool) -> Self {
        self.filter_deleted = filter_deleted;
        self
    }

    /// Sets the merge mode.
    #[must_use]
    pub(crate) fn with_merge_mode(mut self, merge_mode: MergeMode) -> Self {
        self.merge_mode = merge_mode;
        self
    }

    /// Sets the distribution hint.
    #[must_use]
    pub(crate) fn with_distribution(
        mut self,
        distribution: Option<TimeSeriesDistribution>,
    ) -> Self {
        self.distribution = distribution;
        self
    }

    /// Sets whether the region's configured SST format is flat for explain output.
    #[must_use]
    pub(crate) fn with_explain_flat_format(mut self, explain_flat_format: bool) -> Self {
        self.explain_flat_format = explain_flat_format;
        self
    }

    /// Sets the time series row selector.
    #[must_use]
    pub(crate) fn with_series_row_selector(
        mut self,
        series_row_selector: Option<TimeSeriesRowSelector>,
    ) -> Self {
        self.series_row_selector = series_row_selector;
        self
    }

    /// Sets whether this scan is for compaction.
    #[must_use]
    pub(crate) fn with_compaction(mut self, compaction: bool) -> Self {
        self.compaction = compaction;
        self
    }
1033
    /// Builds memtable ranges to scan by `index`.
    ///
    /// `index.index` addresses a memtable in `self.memtables`;
    /// `index.row_group_index` selects the ranges within that memtable.
    pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> {
        let memtable = &self.memtables[index.index];
        let mut ranges = SmallVec::new();
        memtable.build_ranges(index.row_group_index, &mut ranges);
        ranges
    }
1041
1042    fn predicate_for_file(&self, file: &FileHandle) -> Option<Predicate> {
1043        if self.should_skip_region_partition(file) {
1044            self.predicate.predicate_without_region().cloned()
1045        } else {
1046            self.predicate.predicate().cloned()
1047        }
1048    }
1049
1050    fn should_skip_region_partition(&self, file: &FileHandle) -> bool {
1051        match (
1052            self.region_partition_expr.as_ref(),
1053            file.meta_ref().partition_expr.as_ref(),
1054        ) {
1055            (Some(region_expr), Some(file_expr)) => region_expr == file_expr,
1056            _ => false,
1057        }
1058    }
1059
    /// Prunes a file to scan and returns the builder to build readers.
    ///
    /// Pushes the predicate, projection, caches and index appliers into the
    /// SST reader. Returns a default (empty) [FileRangeBuilder] when the file
    /// is entirely pruned out, or when the file is missing and
    /// `ignore_file_not_found` is set.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.region_metadata().region_id,
            file_id = %file.file_id()
        )
    )]
    pub async fn prune_file(
        &self,
        file: &FileHandle,
        reader_metrics: &mut ReaderMetrics,
    ) -> Result<FileRangeBuilder> {
        // Uses the predicate without the region partition expr when the file's
        // partition expr matches the region's.
        let predicate = self.predicate_for_file(file);
        let filter_mode = pre_filter_mode(self.append_mode, self.merge_mode);
        // Primary key values only need decoding when the query outputs tags;
        // compaction works on encoded keys.
        let decode_pk_values = !self.compaction && self.mapper.has_tags();
        let reader = self
            .access_layer
            .read_sst(file.clone())
            .predicate(predicate)
            .projection(Some(self.read_column_ids.clone()))
            .cache(self.cache_strategy.clone())
            .inverted_index_appliers(self.inverted_index_appliers.clone())
            .bloom_filter_index_appliers(self.bloom_filter_index_appliers.clone())
            .fulltext_index_appliers(self.fulltext_index_appliers.clone());
        #[cfg(feature = "vector_index")]
        let reader = {
            let mut reader = reader;
            reader =
                reader.vector_index_applier(self.vector_index_applier.clone(), self.vector_index_k);
            reader
        };
        let res = reader
            .expected_metadata(Some(self.mapper.metadata().clone()))
            .compaction(self.compaction)
            .pre_filter_mode(filter_mode)
            .decode_primary_key_values(decode_pk_values)
            .build_reader_input(reader_metrics)
            .await;
        let read_input = match res {
            Ok(x) => x,
            Err(e) => {
                // A missing file is tolerated only when explicitly requested;
                // all other errors propagate.
                if e.is_object_not_found() && self.ignore_file_not_found {
                    error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
                    return Ok(FileRangeBuilder::default());
                } else {
                    return Err(e);
                }
            }
        };

        // `None` means the reader pruned the whole file.
        let Some((mut file_range_ctx, selection)) = read_input else {
            return Ok(FileRangeBuilder::default());
        };

        let need_compat = !compat::has_same_columns_and_pk_encoding(
            self.mapper.metadata(),
            file_range_ctx.read_format().metadata(),
        );
        if need_compat {
            // They have different schema. We need to adapt the batch first so the
            // mapper can convert it.
            let compat = if let Some(flat_format) = file_range_ctx.read_format().as_flat() {
                let mapper = self.mapper.as_flat().unwrap();
                FlatCompatBatch::try_new(
                    mapper,
                    flat_format.metadata(),
                    flat_format.format_projection(),
                    self.compaction,
                )?
                .map(CompatBatch::Flat)
            } else {
                let compact_batch = PrimaryKeyCompatBatch::new(
                    &self.mapper,
                    file_range_ctx.read_format().metadata().clone(),
                )?;
                Some(CompatBatch::PrimaryKey(compact_batch))
            };
            file_range_ctx.set_compat_batch(compat);
        }
        Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), selection))
    }
1142
1143    /// Scans flat sources (RecordBatch streams) in parallel.
1144    ///
1145    /// # Panics if the input doesn't allow parallel scan.
1146    #[tracing::instrument(
1147        skip(self, sources, semaphore),
1148        fields(
1149            region_id = %self.region_metadata().region_id,
1150            source_count = sources.len()
1151        )
1152    )]
1153    pub(crate) fn create_parallel_flat_sources(
1154        &self,
1155        sources: Vec<BoxedRecordBatchStream>,
1156        semaphore: Arc<Semaphore>,
1157        channel_size: usize,
1158    ) -> Result<Vec<BoxedRecordBatchStream>> {
1159        if sources.len() <= 1 {
1160            return Ok(sources);
1161        }
1162
1163        // Spawn a task for each source.
1164        let sources = sources
1165            .into_iter()
1166            .map(|source| {
1167                let (sender, receiver) = mpsc::channel(channel_size);
1168                self.spawn_flat_scan_task(source, semaphore.clone(), sender);
1169                let stream = Box::pin(ReceiverStream::new(receiver));
1170                Box::pin(stream) as _
1171            })
1172            .collect();
1173        Ok(sources)
1174    }
1175
    /// Spawns a task to scan a flat source (RecordBatch stream) asynchronously.
    ///
    /// The task forwards batches through `sender` and stops on the first error
    /// from the source or when the source is exhausted. A semaphore permit is
    /// held only while polling the source, not while sending.
    #[tracing::instrument(
        skip(self, input, semaphore, sender),
        fields(region_id = %self.region_metadata().region_id)
    )]
    pub(crate) fn spawn_flat_scan_task(
        &self,
        mut input: BoxedRecordBatchStream,
        semaphore: Arc<Semaphore>,
        sender: mpsc::Sender<Result<RecordBatch>>,
    ) {
        let region_id = self.region_metadata().region_id;
        let span = tracing::info_span!(
            "ScanInput::parallel_scan_task",
            region_id = %region_id,
            stream_kind = "flat"
        );
        common_runtime::spawn_global(
            async move {
                loop {
                    // We release the permit before sending result to avoid the task waiting on
                    // the channel with the permit held.
                    let maybe_batch = {
                        // Safety: We never close the semaphore.
                        let _permit = semaphore.acquire().await.unwrap();
                        input.next().await
                    };
                    match maybe_batch {
                        Some(Ok(batch)) => {
                            // NOTE(review): a failed send (receiver dropped) is ignored and the
                            // loop keeps polling `input` until it ends — confirm this is intended.
                            let _ = sender.send(Ok(batch)).await;
                        }
                        Some(Err(e)) => {
                            // Forward the error (best effort) and stop scanning.
                            let _ = sender.send(Err(e)).await;
                            break;
                        }
                        None => break,
                    }
                }
            }
            .instrument(span),
        );
    }
1218
    /// Returns the total number of rows across all sources
    /// (SST files, memtables and, when enabled, extension ranges).
    pub(crate) fn total_rows(&self) -> usize {
        let rows_in_files: usize = self.files.iter().map(|f| f.num_rows()).sum();
        let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();

        let rows = rows_in_files + rows_in_memtables;
        #[cfg(feature = "enterprise")]
        let rows = rows
            + self
                .extension_ranges
                .iter()
                .map(|x| x.num_rows())
                .sum::<u64>() as usize;
        rows
    }

    /// Returns the predicate group of this scan.
    pub(crate) fn predicate_group(&self) -> &PredicateGroup {
        &self.predicate
    }

    /// Returns number of memtables to scan.
    pub(crate) fn num_memtables(&self) -> usize {
        self.memtables.len()
    }

    /// Returns number of SST files to scan.
    pub(crate) fn num_files(&self) -> usize {
        self.files.len()
    }

    /// Gets the file handle from a row group index.
    ///
    /// The shared index space places memtables first, so the file position is
    /// `index.index - num_memtables()`.
    pub(crate) fn file_from_index(&self, index: RowGroupIndex) -> &FileHandle {
        let file_index = index.index - self.num_memtables();
        &self.files[file_index]
    }

    /// Returns the metadata of the region (taken from the projection mapper).
    pub fn region_metadata(&self) -> &RegionMetadataRef {
        self.mapper.metadata()
    }
1257}
1258
#[cfg(feature = "enterprise")]
impl ScanInput {
    /// Sets the extension ranges to scan.
    #[must_use]
    pub(crate) fn with_extension_ranges(self, extension_ranges: Vec<BoxedExtensionRange>) -> Self {
        Self {
            extension_ranges,
            ..self
        }
    }

    /// Returns all extension ranges.
    pub(crate) fn extension_ranges(&self) -> &[BoxedExtensionRange] {
        &self.extension_ranges
    }

    /// Get a boxed [ExtensionRange] by the index in all ranges.
    ///
    /// The shared index space places memtables first, then files, then
    /// extension ranges.
    // The per-method `#[cfg(feature = "enterprise")]` attributes were removed:
    // the whole impl block is already gated by the same feature.
    pub(crate) fn extension_range(&self, i: usize) -> &BoxedExtensionRange {
        &self.extension_ranges[i - self.num_memtables() - self.num_files()]
    }
}
1280
#[cfg(test)]
impl ScanInput {
    /// Returns SST file ids to scan.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        self.files.iter().map(|file| file.file_id()).collect()
    }

    /// Returns index ids of the SST files to scan.
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        self.files.iter().map(|file| file.index_id()).collect()
    }
}
1292
1293fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode {
1294    if append_mode {
1295        return PreFilterMode::All;
1296    }
1297
1298    match merge_mode {
1299        MergeMode::LastRow => PreFilterMode::SkipFields,
1300        MergeMode::LastNonNull => PreFilterMode::SkipFields,
1301    }
1302}
1303
/// Builds a [ScanRequestFingerprint] from a [ScanInput] if the scan is eligible
/// for partition range caching.
///
/// Eligibility: not a compaction scan, at least one SST file, all caches
/// enabled, and at least one filter referencing a tag column. Returns `None`
/// otherwise.
pub(crate) fn build_scan_fingerprint(input: &ScanInput) -> Option<ScanRequestFingerprint> {
    let eligible = !input.compaction
        && !input.files.is_empty()
        && matches!(input.cache_strategy, CacheStrategy::EnableAll(_));

    if !eligible {
        return None;
    }

    let metadata = input.region_metadata();
    // Tag column names, used below to detect whether any filter touches a tag.
    let tag_names: HashSet<&str> = metadata
        .column_metadatas
        .iter()
        .filter(|col| col.semantic_type == SemanticType::Tag)
        .map(|col| col.column_schema.name.as_str())
        .collect();

    let time_index = metadata.time_index_column();
    let time_index_name = time_index.column_schema.name.clone();
    let ts_col_unit = time_index
        .column_schema
        .data_type
        .as_timestamp()
        .expect("Time index must have timestamp-compatible type")
        .unit();

    // Fingerprint only the request filters, excluding the region partition expr.
    let exprs = input
        .predicate_group()
        .predicate_without_region()
        .map(|predicate| predicate.exprs())
        .unwrap_or_default();

    let mut filters = Vec::new();
    let mut time_filters = Vec::new();
    let mut has_tag_filter = false;
    let mut columns = HashSet::new();

    for expr in exprs {
        columns.clear();
        // An expr is "time only" when every column it references is the time
        // index column.
        let is_time_only = match expr_to_columns(expr, &mut columns) {
            Ok(()) if !columns.is_empty() => {
                has_tag_filter |= columns
                    .iter()
                    .any(|col| tag_names.contains(col.name.as_str()));
                columns.iter().all(|col| col.name == time_index_name)
            }
            _ => false,
        };

        if is_time_only
            && extract_time_range_from_expr(&time_index_name, ts_col_unit, expr).is_some()
        {
            // Range-reducible time predicates can be safely dropped from the
            // cache key when the query time range covers the partition range.
            time_filters.push(expr.to_string());
        } else {
            // Non-time filters and non-range time predicates (those that
            // extract_time_range_from_expr cannot convert to a TimestampRange)
            // always stay in the cache key.
            filters.push(expr.to_string());
        }
    }

    if !has_tag_filter {
        // We only cache requests that have tag filters to avoid caching all series.
        return None;
    }

    // Ensure the filters are sorted for consistent fingerprinting.
    filters.sort_unstable();
    time_filters.sort_unstable();

    Some(
        crate::read::range_cache::ScanRequestFingerprintBuilder {
            read_column_ids: input.read_column_ids.clone(),
            // Column types are part of the key so that type changes alter it.
            read_column_types: input
                .read_column_ids
                .iter()
                .map(|id| {
                    metadata
                        .column_by_id(*id)
                        .map(|col| col.column_schema.data_type.clone())
                })
                .collect(),
            filters,
            time_filters,
            series_row_selector: input.series_row_selector,
            append_mode: input.append_mode,
            filter_deleted: input.filter_deleted,
            merge_mode: input.merge_mode,
            partition_expr_version: metadata.partition_expr_version,
        }
        .build(),
    )
}
1401
/// Context shared by different streams from a scanner.
/// It contains the input and ranges to scan.
pub struct StreamContext {
    /// Input memtables and files.
    pub input: ScanInput,
    /// Metadata for partition ranges.
    pub(crate) ranges: Vec<RangeMeta>,
    /// Precomputed scan fingerprint for partition range caching.
    /// `None` when the scan is not eligible for caching.
    #[allow(dead_code)]
    pub(crate) scan_fingerprint: Option<ScanRequestFingerprint>,

    // Metrics:
    /// The start time of the query.
    pub(crate) query_start: Instant,
}
1418
1419impl StreamContext {
1420    /// Creates a new [StreamContext] for [SeqScan].
1421    pub(crate) fn seq_scan_ctx(input: ScanInput) -> Self {
1422        let query_start = input.query_start.unwrap_or_else(Instant::now);
1423        let ranges = RangeMeta::seq_scan_ranges(&input);
1424        READ_SST_COUNT.observe(input.num_files() as f64);
1425        let scan_fingerprint = build_scan_fingerprint(&input);
1426
1427        Self {
1428            input,
1429            ranges,
1430            scan_fingerprint,
1431            query_start,
1432        }
1433    }
1434
1435    /// Creates a new [StreamContext] for [UnorderedScan].
1436    pub(crate) fn unordered_scan_ctx(input: ScanInput) -> Self {
1437        let query_start = input.query_start.unwrap_or_else(Instant::now);
1438        let ranges = RangeMeta::unordered_scan_ranges(&input);
1439        READ_SST_COUNT.observe(input.num_files() as f64);
1440        let scan_fingerprint = build_scan_fingerprint(&input);
1441
1442        Self {
1443            input,
1444            ranges,
1445            scan_fingerprint,
1446            query_start,
1447        }
1448    }
1449
    /// Returns true if the index refers to a memtable.
    pub(crate) fn is_mem_range_index(&self, index: RowGroupIndex) -> bool {
        self.input.num_memtables() > index.index
    }

    /// Returns true if the index refers to a SST file range: it is past the
    /// memtables but still within the memtable + file index space.
    pub(crate) fn is_file_range_index(&self, index: RowGroupIndex) -> bool {
        !self.is_mem_range_index(index)
            && index.index < self.input.num_files() + self.input.num_memtables()
    }
1459
1460    /// Retrieves the partition ranges.
1461    pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
1462        self.ranges
1463            .iter()
1464            .enumerate()
1465            .map(|(idx, range_meta)| range_meta.new_partition_range(idx))
1466            .collect()
1467    }
1468
    /// Format the context for explain.
    ///
    /// Writes a JSON-like summary of range counts, file counts and scan hints.
    /// When `verbose`, the output is wrapped in `{` and extended with the
    /// detailed content from [Self::format_verbose_content].
    pub(crate) fn format_for_explain(&self, verbose: bool, f: &mut fmt::Formatter) -> fmt::Result {
        // Classify every row group index into memtable / file / other ranges.
        let (mut num_mem_ranges, mut num_file_ranges, mut num_other_ranges) = (0, 0, 0);
        for range_meta in &self.ranges {
            for idx in &range_meta.row_group_indices {
                if self.is_mem_range_index(*idx) {
                    num_mem_ranges += 1;
                } else if self.is_file_range_index(*idx) {
                    num_file_ranges += 1;
                } else {
                    num_other_ranges += 1;
                }
            }
        }
        if verbose {
            write!(f, "{{")?;
        }
        write!(
            f,
            r#""partition_count":{{"count":{}, "mem_ranges":{}, "files":{}, "file_ranges":{}"#,
            self.ranges.len(),
            num_mem_ranges,
            self.input.num_files(),
            num_file_ranges,
        )?;
        // Only print "other_ranges" when there are any (e.g. extension ranges).
        if num_other_ranges > 0 {
            write!(f, r#", "other_ranges":{}"#, num_other_ranges)?;
        }
        write!(f, "}}")?;

        if let Some(selector) = &self.input.series_row_selector {
            write!(f, ", \"selector\":\"{}\"", selector)?;
        }
        if let Some(distribution) = &self.input.distribution {
            write!(f, ", \"distribution\":\"{}\"", distribution)?;
        }

        if verbose {
            self.format_verbose_content(f)?;
        }

        Ok(())
    }
1512
    /// Writes the verbose part of the explain output: projection, filters,
    /// dynamic filters, per-file details and (when enabled) extension ranges.
    fn format_verbose_content(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Wrapper that Debug-formats a single SST file as a JSON-like object.
        struct FileWrapper<'a> {
            file: &'a FileHandle,
        }

        impl fmt::Debug for FileWrapper<'_> {
            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                let (start, end) = self.file.time_range();
                write!(
                    f,
                    r#"{{"file_id":"{}","time_range_start":"{}::{}","time_range_end":"{}::{}","rows":{},"size":{},"index_size":{}}}"#,
                    self.file.file_id(),
                    start.value(),
                    start.unit(),
                    end.value(),
                    end.unit(),
                    self.file.num_rows(),
                    self.file.size(),
                    self.file.index_size()
                )
            }
        }

        // Wrapper that Debug-formats the whole scan input.
        struct InputWrapper<'a> {
            input: &'a ScanInput,
        }

        #[cfg(feature = "enterprise")]
        impl InputWrapper<'_> {
            // Appends the extension ranges section; writes nothing when empty.
            fn format_extension_ranges(&self, f: &mut fmt::Formatter) -> fmt::Result {
                if self.input.extension_ranges.is_empty() {
                    return Ok(());
                }

                let mut delimiter = "";
                write!(f, ", extension_ranges: [")?;
                for range in self.input.extension_ranges() {
                    write!(f, "{}{:?}", delimiter, range)?;
                    delimiter = ", ";
                }
                write!(f, "]")?;
                Ok(())
            }
        }

        impl fmt::Debug for InputWrapper<'_> {
            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                // Projected column names, skipped when the projection is empty.
                let output_schema = self.input.mapper.output_schema();
                if !output_schema.is_empty() {
                    let names: Vec<_> = output_schema
                        .column_schemas()
                        .iter()
                        .map(|col| &col.name)
                        .collect();
                    write!(f, ", \"projection\": {:?}", names)?;
                }
                // Filters and dynamic filters, rendered as strings.
                if let Some(predicate) = &self.input.predicate.predicate() {
                    if !predicate.exprs().is_empty() {
                        let exprs: Vec<_> =
                            predicate.exprs().iter().map(|e| e.to_string()).collect();
                        write!(f, ", \"filters\": {:?}", exprs)?;
                    }
                    if !predicate.dyn_filters().is_empty() {
                        let dyn_filters: Vec<_> = predicate
                            .dyn_filters()
                            .iter()
                            .map(|f| format!("{}", f))
                            .collect();
                        write!(f, ", \"dyn_filters\": {:?}", dyn_filters)?;
                    }
                }
                #[cfg(feature = "vector_index")]
                if let Some(vector_index_k) = self.input.vector_index_k {
                    write!(f, ", \"vector_index_k\": {}", vector_index_k)?;
                }
                if !self.input.files.is_empty() {
                    write!(f, ", \"files\": ")?;
                    f.debug_list()
                        .entries(self.input.files.iter().map(|file| FileWrapper { file }))
                        .finish()?;
                }
                write!(f, ", \"flat_format\": {}", self.input.explain_flat_format)?;
                #[cfg(feature = "enterprise")]
                self.format_extension_ranges(f)?;

                Ok(())
            }
        }

        write!(f, "{:?}", InputWrapper { input: &self.input })
    }
1604
1605    /// Add new dynamic filters to the predicates.
1606    /// Safe after stream creation; in-flight reads may still observe an older snapshot.
1607    pub(crate) fn add_dyn_filter_to_predicate(
1608        self: &Arc<Self>,
1609        filter_exprs: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
1610    ) -> Vec<bool> {
1611        let mut supported = Vec::with_capacity(filter_exprs.len());
1612        let filter_expr = filter_exprs
1613            .into_iter()
1614            .filter_map(|expr| {
1615                if let Ok(dyn_filter) = (expr as Arc<dyn std::any::Any + Send + Sync + 'static>)
1616                .downcast::<datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr>()
1617            {
1618                supported.push(true);
1619                Some(dyn_filter)
1620            } else {
1621                supported.push(false);
1622                None
1623            }
1624            })
1625            .collect();
1626        self.input.predicate.add_dyn_filters(filter_expr);
1627        supported
1628    }
1629}
1630
/// Predicates to evaluate.
/// It only keeps filters that [SimpleFilterEvaluator] supports.
#[derive(Clone, Default)]
pub struct PredicateGroup {
    /// Simple filters on the time index column, built from the request exprs
    /// (and the region partition expr, if any). `None` when no expr qualifies.
    time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
    /// Predicate that includes request filters and region partition expr (if any).
    predicate_all: Predicate,
    /// Predicate that only includes request filters.
    predicate_without_region: Predicate,
    /// Region partition expression restored from metadata.
    region_partition_expr: Option<PartitionExpr>,
}
1643
1644impl PredicateGroup {
1645    /// Creates a new `PredicateGroup` from exprs according to the metadata.
1646    pub fn new(metadata: &RegionMetadata, exprs: &[Expr]) -> Result<Self> {
1647        let mut combined_exprs = exprs.to_vec();
1648        let mut region_partition_expr = None;
1649
1650        if let Some(expr_json) = metadata.partition_expr.as_ref()
1651            && !expr_json.is_empty()
1652            && let Some(expr) = PartitionExpr::from_json_str(expr_json)
1653                .context(InvalidPartitionExprSnafu { expr: expr_json })?
1654        {
1655            let logical_expr = expr
1656                .try_as_logical_expr()
1657                .context(InvalidPartitionExprSnafu {
1658                    expr: expr_json.clone(),
1659                })?;
1660
1661            combined_exprs.push(logical_expr);
1662            region_partition_expr = Some(expr);
1663        }
1664
1665        let mut time_filters = Vec::with_capacity(combined_exprs.len());
1666        // Columns in the expr.
1667        let mut columns = HashSet::new();
1668        for expr in &combined_exprs {
1669            columns.clear();
1670            let Some(filter) = Self::expr_to_filter(expr, metadata, &mut columns) else {
1671                continue;
1672            };
1673            time_filters.push(filter);
1674        }
1675        let time_filters = if time_filters.is_empty() {
1676            None
1677        } else {
1678            Some(Arc::new(time_filters))
1679        };
1680
1681        let predicate_all = Predicate::new(combined_exprs);
1682        let predicate_without_region = Predicate::new(exprs.to_vec());
1683
1684        Ok(Self {
1685            time_filters,
1686            predicate_all,
1687            predicate_without_region,
1688            region_partition_expr,
1689        })
1690    }
1691
1692    /// Returns time filters.
1693    pub(crate) fn time_filters(&self) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
1694        self.time_filters.clone()
1695    }
1696
1697    /// Returns predicate of all exprs (including region partition expr if present).
1698    pub(crate) fn predicate(&self) -> Option<&Predicate> {
1699        if self.predicate_all.is_empty() {
1700            None
1701        } else {
1702            Some(&self.predicate_all)
1703        }
1704    }
1705
1706    /// Returns predicate that excludes region partition expr.
1707    pub(crate) fn predicate_without_region(&self) -> Option<&Predicate> {
1708        if self.predicate_without_region.is_empty() {
1709            None
1710        } else {
1711            Some(&self.predicate_without_region)
1712        }
1713    }
1714
1715    /// Add dynamic filters in the predicates.
1716    pub(crate) fn add_dyn_filters(&self, dyn_filters: Vec<Arc<DynamicFilterPhysicalExpr>>) {
1717        self.predicate_all.add_dyn_filters(dyn_filters.clone());
1718        self.predicate_without_region.add_dyn_filters(dyn_filters);
1719    }
1720
1721    /// Returns the region partition expr from metadata, if any.
1722    pub(crate) fn region_partition_expr(&self) -> Option<&PartitionExpr> {
1723        self.region_partition_expr.as_ref()
1724    }
1725
1726    fn expr_to_filter(
1727        expr: &Expr,
1728        metadata: &RegionMetadata,
1729        columns: &mut HashSet<Column>,
1730    ) -> Option<SimpleFilterEvaluator> {
1731        columns.clear();
1732        // `expr_to_columns` won't return error.
1733        // We still ignore these expressions for safety.
1734        expr_to_columns(expr, columns).ok()?;
1735        if columns.len() > 1 {
1736            // Simple filter doesn't support multiple columns.
1737            return None;
1738        }
1739        let column = columns.iter().next()?;
1740        let column_meta = metadata.column_by_name(&column.name)?;
1741        if column_meta.semantic_type == SemanticType::Timestamp {
1742            SimpleFilterEvaluator::try_new(expr)
1743        } else {
1744            None
1745        }
1746    }
1747}
1748
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datafusion::physical_plan::expressions::lit as physical_lit;
    use datafusion_common::ScalarValue;
    use datafusion_expr::{col, lit};
    use datatypes::value::Value;
    use partition::expr::col as partition_col;
    use store_api::metadata::RegionMetadataBuilder;
    use store_api::storage::{
        ProjectionInput, ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector,
    };

    use super::*;
    use crate::cache::CacheManager;
    use crate::memtable::time_partition::TimePartitions;
    use crate::read::range_cache::ScanRequestFingerprintBuilder;
    use crate::region::version::VersionBuilder;
    use crate::test_util::memtable_util::{EmptyMemtableBuilder, metadata_with_primary_key};
    use crate::test_util::scheduler_util::SchedulerEnv;

    /// Builds a region version with a single empty mutable memtable.
    fn new_version(metadata: RegionMetadataRef) -> VersionRef {
        let mutable = Arc::new(TimePartitions::new(
            metadata.clone(),
            Arc::new(EmptyMemtableBuilder::default()),
            0,
            None,
        ));
        Arc::new(VersionBuilder::new(metadata, mutable).build())
    }

    /// Builds a `ScanInput` over one default file handle, projecting columns
    /// [0, 2, 3] and applying the given filters (range-result cache enabled).
    async fn new_scan_input(metadata: RegionMetadataRef, filters: Vec<Expr>) -> ScanInput {
        let env = SchedulerEnv::new().await;
        let mapper = ProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap();
        let predicate = PredicateGroup::new(metadata.as_ref(), &filters).unwrap();
        let file = FileHandle::new(
            crate::sst::file::FileMeta::default(),
            Arc::new(crate::sst::file_purger::NoopFilePurger),
        );

        ScanInput::new(env.access_layer.clone(), mapper)
            .with_predicate(predicate)
            .with_cache(CacheStrategy::EnableAll(Arc::new(
                CacheManager::builder()
                    .range_result_cache_size(1024)
                    .build(),
            )))
            .with_files(vec![file])
    }

    #[tokio::test]
    async fn test_build_read_column_ids_includes_filters() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let version = new_version(metadata.clone());
        let env = SchedulerEnv::new().await;
        let request = ScanRequest {
            projection_input: Some(vec![4].into()),
            filters: vec![
                col("v0").gt(lit(1)),
                col("ts").gt(lit(0)),
                col("k0").eq(lit("foo")),
            ],
            ..Default::default()
        };
        let scan_region = ScanRegion::new(
            version,
            env.access_layer.clone(),
            request,
            CacheStrategy::Disabled,
        );
        let predicate =
            PredicateGroup::new(metadata.as_ref(), &scan_region.request.filters).unwrap();
        let projection = &scan_region.request.projection_indices().unwrap();
        let read_ids = scan_region
            .build_read_column_ids(projection, &predicate)
            .unwrap();
        // Projected column first, then filter-referenced columns in schema order.
        assert_eq!(vec![4, 0, 2, 3], read_ids);
    }

    #[tokio::test]
    async fn test_build_read_column_ids_empty_projection() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let version = new_version(metadata.clone());
        let env = SchedulerEnv::new().await;
        let request = ScanRequest {
            projection_input: Some(ProjectionInput::default()),
            ..Default::default()
        };
        let scan_region = ScanRegion::new(
            version,
            env.access_layer.clone(),
            request,
            CacheStrategy::Disabled,
        );
        let predicate =
            PredicateGroup::new(metadata.as_ref(), &scan_region.request.filters).unwrap();
        let projection = &scan_region.request.projection_indices().unwrap();
        let read_ids = scan_region
            .build_read_column_ids(projection, &predicate)
            .unwrap();
        // Empty projection should still read the time index column (id 2 in this test schema).
        assert_eq!(vec![2], read_ids);
    }

    #[tokio::test]
    async fn test_build_read_column_ids_keeps_projection_order() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let version = new_version(metadata.clone());
        let env = SchedulerEnv::new().await;
        let request = ScanRequest {
            projection_input: Some(vec![4, 1].into()),
            filters: vec![col("v0").gt(lit(1))],
            ..Default::default()
        };
        let scan_region = ScanRegion::new(
            version,
            env.access_layer.clone(),
            request,
            CacheStrategy::Disabled,
        );
        let predicate =
            PredicateGroup::new(metadata.as_ref(), &scan_region.request.filters).unwrap();
        let projection = &scan_region.request.projection_indices().unwrap();
        let read_ids = scan_region
            .build_read_column_ids(projection, &predicate)
            .unwrap();
        // Projection order preserved, extra columns appended in schema order.
        assert_eq!(vec![4, 1, 3], read_ids);
    }

    /// Helper to create a timestamp millisecond literal.
    fn ts_lit(val: i64) -> datafusion_expr::Expr {
        lit(ScalarValue::TimestampMillisecond(Some(val), None))
    }

    #[tokio::test]
    async fn test_build_scan_fingerprint_for_eligible_scan() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let input = new_scan_input(
            metadata.clone(),
            vec![
                col("ts").gt_eq(ts_lit(1000)),
                col("k0").eq(lit("foo")),
                col("v0").gt(lit(1)),
            ],
        )
        .await
        .with_distribution(Some(TimeSeriesDistribution::PerSeries))
        .with_series_row_selector(Some(TimeSeriesRowSelector::LastRow))
        .with_merge_mode(MergeMode::LastNonNull)
        .with_filter_deleted(false);

        let fingerprint = build_scan_fingerprint(&input).unwrap();

        // The fingerprint separates time filters from the remaining filters
        // and records the read column ids/types plus merge-related options.
        let expected = ScanRequestFingerprintBuilder {
            read_column_ids: input.read_column_ids.clone(),
            read_column_types: vec![
                metadata
                    .column_by_id(0)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(2)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(3)
                    .map(|col| col.column_schema.data_type.clone()),
            ],
            filters: vec![
                col("k0").eq(lit("foo")).to_string(),
                col("v0").gt(lit(1)).to_string(),
            ],
            time_filters: vec![col("ts").gt_eq(ts_lit(1000)).to_string()],
            series_row_selector: Some(TimeSeriesRowSelector::LastRow),
            append_mode: false,
            filter_deleted: false,
            merge_mode: MergeMode::LastNonNull,
            partition_expr_version: 0,
        }
        .build();
        assert_eq!(expected, fingerprint);
    }

    #[tokio::test]
    async fn test_build_scan_fingerprint_requires_tag_filter() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        // Only time/field filters: no tag filter, so no fingerprint is built.
        let input = new_scan_input(
            metadata,
            vec![col("ts").gt_eq(lit(1000)), col("v0").gt(lit(1))],
        )
        .await;

        assert!(build_scan_fingerprint(&input).is_none());
    }

    #[tokio::test]
    async fn test_build_scan_fingerprint_respects_scan_eligibility() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let filters = vec![col("k0").eq(lit("foo"))];

        // No cache configured on the scan input.
        let disabled = ScanInput::new(
            SchedulerEnv::new().await.access_layer.clone(),
            ProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap(),
        )
        .with_predicate(PredicateGroup::new(metadata.as_ref(), &filters).unwrap());
        assert!(build_scan_fingerprint(&disabled).is_none());

        // Compaction scans are never fingerprinted.
        let compaction = new_scan_input(metadata.clone(), filters.clone())
            .await
            .with_compaction(true);
        assert!(build_scan_fingerprint(&compaction).is_none());

        // No files to read.
        let no_files = new_scan_input(metadata, filters).await.with_files(vec![]);
        assert!(build_scan_fingerprint(&no_files).is_none());
    }

    #[tokio::test]
    async fn test_build_scan_fingerprint_tracks_schema_and_partition_expr_changes() {
        let base = metadata_with_primary_key(vec![0, 1], false);
        let mut builder = RegionMetadataBuilder::from_existing(base);
        let partition_expr = partition_col("k0")
            .gt_eq(Value::String("foo".into()))
            .as_json_str()
            .unwrap();
        builder.partition_expr_json(Some(partition_expr));
        let metadata = Arc::new(builder.build_without_validation().unwrap());

        let input = new_scan_input(metadata.clone(), vec![col("k0").eq(lit("foo"))]).await;
        let fingerprint = build_scan_fingerprint(&input).unwrap();

        // Setting a partition expr bumps `partition_expr_version`, which must
        // flow into the fingerprint so cached results are invalidated.
        let expected = ScanRequestFingerprintBuilder {
            read_column_ids: input.read_column_ids.clone(),
            read_column_types: vec![
                metadata
                    .column_by_id(0)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(2)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(3)
                    .map(|col| col.column_schema.data_type.clone()),
            ],
            filters: vec![col("k0").eq(lit("foo")).to_string()],
            time_filters: vec![],
            series_row_selector: None,
            append_mode: false,
            filter_deleted: true,
            merge_mode: MergeMode::LastRow,
            partition_expr_version: metadata.partition_expr_version,
        }
        .build();
        assert_eq!(expected, fingerprint);
        assert_ne!(0, metadata.partition_expr_version);
    }

    #[test]
    fn test_update_dyn_filters_with_empty_base_predicates() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let predicate_group = PredicateGroup::new(metadata.as_ref(), &[]).unwrap();
        // With no exprs both predicates report as empty (None).
        assert!(predicate_group.predicate().is_none());
        assert!(predicate_group.predicate_without_region().is_none());

        let dyn_filter = Arc::new(DynamicFilterPhysicalExpr::new(vec![], physical_lit(false)));
        predicate_group.add_dyn_filters(vec![dyn_filter]);

        // After adding a dynamic filter, both predicates become non-empty and
        // carry exactly that one dynamic filter (still no static exprs).
        let predicate_all = predicate_group.predicate().unwrap();
        assert!(predicate_all.exprs().is_empty());
        assert_eq!(1, predicate_all.dyn_filters().len());

        let predicate_without_region = predicate_group.predicate_without_region().unwrap();
        assert!(predicate_without_region.exprs().is_empty());
        assert_eq!(1, predicate_without_region.dyn_filters().len());
    }
}