Skip to main content

mito2/read/
scan_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Scans a region according to the scan request.
16
17use std::collections::{HashMap, HashSet};
18use std::fmt;
19use std::num::NonZeroU64;
20use std::sync::Arc;
21use std::time::Instant;
22
23use api::v1::SemanticType;
24use common_error::ext::BoxedError;
25use common_recordbatch::SendableRecordBatchStream;
26use common_recordbatch::adapter::RegionQueryStatCounters;
27use common_recordbatch::filter::SimpleFilterEvaluator;
28use common_telemetry::tracing::Instrument;
29use common_telemetry::{debug, error, tracing, warn};
30use common_time::range::TimestampRange;
31use datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr;
32use datafusion_common::pruning::PruningStatistics;
33use datafusion_common::{Column, ScalarValue};
34use datafusion_expr::Expr;
35use datafusion_expr::utils::expr_to_columns;
36use datatypes::arrow::array::{ArrayRef, BooleanArray, UInt64Array};
37use datatypes::extension::json::is_structured_json_field;
38use datatypes::types::json_type::JsonNativeType;
39use datatypes::value::timestamp_to_scalar_value;
40use futures::StreamExt;
41use itertools::Itertools;
42use partition::expr::PartitionExpr;
43use smallvec::SmallVec;
44use snafu::ResultExt;
45use store_api::metadata::{RegionMetadata, RegionMetadataRef};
46use store_api::region_engine::{PartitionRange, RegionScannerRef};
47use store_api::storage::{
48    NestedPath, RegionId, ScanRequest, SequenceNumber, SequenceRange, TimeSeriesDistribution,
49    TimeSeriesRowSelector,
50};
51use table::predicate::{Predicate, build_time_range_predicate, extract_time_range_from_expr};
52use tokio::sync::{Semaphore, mpsc};
53use tokio_stream::wrappers::ReceiverStream;
54
55use crate::access_layer::AccessLayerRef;
56use crate::cache::CacheStrategy;
57use crate::config::DEFAULT_MAX_CONCURRENT_SCAN_FILES;
58use crate::error::{InvalidPartitionExprSnafu, Result};
59#[cfg(feature = "enterprise")]
60use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
61use crate::memtable::{MemtableRange, RangesOptions};
62use crate::metrics::READ_SST_COUNT;
63use crate::read::compat::{self, FlatCompatBatch};
64use crate::read::flat_projection::FlatProjectionMapper;
65use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
66use crate::read::range_cache::{ScanRequestFingerprint, implied_time_range_from_exprs};
67use crate::read::read_columns::{
68    ReadColumns, merge, merge_nested_paths, read_columns_from_predicate,
69    read_columns_from_projection,
70};
71use crate::read::seq_scan::SeqScan;
72use crate::read::series_scan::SeriesScan;
73use crate::read::stream::ScanBatchStream;
74use crate::read::unordered_scan::UnorderedScan;
75use crate::read::{BoxedRecordBatchStream, RecordBatch};
76use crate::region::options::MergeMode;
77use crate::region::version::VersionRef;
78use crate::sst::file::FileHandle;
79use crate::sst::index::bloom_filter::applier::{
80    BloomFilterIndexApplierBuilder, BloomFilterIndexApplierRef,
81};
82use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
83use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
84use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
85use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
86#[cfg(feature = "vector_index")]
87use crate::sst::index::vector_index::applier::{VectorIndexApplier, VectorIndexApplierRef};
88use crate::sst::parquet::file_range::PreFilterMode;
89use crate::sst::parquet::reader::ReaderMetrics;
90
/// Factor applied to the `k` of a vector search when the scan request also carries
/// filters; without filters the requested `k` is used unchanged.
// NOTE(review): presumably over-fetches so enough candidates survive the filters — confirm.
#[cfg(feature = "vector_index")]
const VECTOR_INDEX_OVERFETCH_MULTIPLIER: usize = 2;
93
/// A scanner scans a region and returns a [SendableRecordBatchStream].
///
/// Each variant wraps one concrete scan implementation.
pub(crate) enum Scanner {
    /// Sequential scan, backed by [SeqScan].
    Seq(SeqScan),
    /// Unordered scan, backed by [UnorderedScan].
    Unordered(UnorderedScan),
    /// Per-series scan, backed by [SeriesScan].
    Series(SeriesScan),
}
103
104impl Scanner {
105    /// Returns a [SendableRecordBatchStream] to retrieve scan results from all partitions.
106    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
107    pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream, BoxedError> {
108        match self {
109            Scanner::Seq(seq_scan) => seq_scan.build_stream(),
110            Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
111            Scanner::Series(series_scan) => series_scan.build_stream().await,
112        }
113    }
114
115    /// Create a stream of [`Batch`] by this scanner.
116    pub(crate) fn scan_batch(&self) -> Result<ScanBatchStream> {
117        match self {
118            Scanner::Seq(x) => x.scan_all_partitions(),
119            Scanner::Unordered(x) => x.scan_all_partitions(),
120            Scanner::Series(x) => x.scan_all_partitions(),
121        }
122    }
123}
124
#[cfg(test)]
impl Scanner {
    /// Returns the number of SST files the scan input is going to read.
    pub(crate) fn num_files(&self) -> usize {
        match self {
            Self::Seq(scan) => scan.input().num_files(),
            Self::Unordered(scan) => scan.input().num_files(),
            Self::Series(scan) => scan.input().num_files(),
        }
    }

    /// Returns the number of memtables the scan input is going to read.
    pub(crate) fn num_memtables(&self) -> usize {
        match self {
            Self::Seq(scan) => scan.input().num_memtables(),
            Self::Unordered(scan) => scan.input().num_memtables(),
            Self::Series(scan) => scan.input().num_memtables(),
        }
    }

    /// Returns the ids of the SST files to scan.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        match self {
            Self::Seq(scan) => scan.input().file_ids(),
            Self::Unordered(scan) => scan.input().file_ids(),
            Self::Series(scan) => scan.input().file_ids(),
        }
    }

    /// Returns the index ids reported by the scan input.
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        match self {
            Self::Seq(scan) => scan.input().index_ids(),
            Self::Unordered(scan) => scan.input().index_ids(),
            Self::Series(scan) => scan.input().index_ids(),
        }
    }

    /// Returns the `snapshot_sequence` stored in the scan input, if any.
    pub(crate) fn snapshot_sequence(&self) -> Option<SequenceNumber> {
        match self {
            Self::Seq(scan) => scan.input().snapshot_sequence,
            Self::Unordered(scan) => scan.input().snapshot_sequence,
            Self::Series(scan) => scan.input().snapshot_sequence,
        }
    }

    /// Sets the target partitions of the scanner, which controls its parallelism.
    ///
    /// # Panics
    ///
    /// Panics if the wrapped scanner fails to handle the prepare request.
    pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
        use store_api::region_engine::{PrepareRequest, RegionScanner};

        let request = PrepareRequest::default().with_target_partitions(target_partitions);
        let prepared = match self {
            Self::Seq(scan) => scan.prepare(request),
            Self::Unordered(scan) => scan.prepare(request),
            Self::Series(scan) => scan.prepare(request),
        };
        prepared.unwrap();
    }
}
182
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Helper to scan a region by [ScanRequest].
///
/// [ScanRegion] collects SSTs and memtables to scan without actually reading them. It
/// creates a [Scanner] to actually scan these targets in [Scanner::scan()].
///
/// ```mermaid
/// classDiagram
/// class ScanRegion {
///     -VersionRef version
///     -ScanRequest request
///     ~scanner() Scanner
///     ~seq_scan() SeqScan
/// }
/// class Scanner {
///     <<enumeration>>
///     SeqScan
///     UnorderedScan
///     SeriesScan
///     +scan() SendableRecordBatchStream
/// }
/// class SeqScan {
///     -ScanInput input
///     +build() SendableRecordBatchStream
/// }
/// class UnorderedScan {
///     -ScanInput input
///     +build() SendableRecordBatchStream
/// }
/// class SeriesScan {
///     -ScanInput input
///     +build() SendableRecordBatchStream
/// }
/// class ScanInput {
///     -ProjectionMapper mapper
///     -Option~TimeRange~ time_range
///     -Option~Predicate~ predicate
///     -Vec~MemtableRef~ memtables
///     -Vec~FileHandle~ files
/// }
/// class ProjectionMapper {
///     ~output_schema() SchemaRef
///     ~convert(Batch) RecordBatch
/// }
/// ScanRegion -- Scanner
/// ScanRegion o-- ScanRequest
/// Scanner o-- SeqScan
/// Scanner o-- UnorderedScan
/// Scanner o-- SeriesScan
/// SeqScan o-- ScanInput
/// UnorderedScan o-- ScanInput
/// SeriesScan o-- ScanInput
/// Scanner -- SendableRecordBatchStream
/// ScanInput o-- ProjectionMapper
/// SeqScan -- SendableRecordBatchStream
/// UnorderedScan -- SendableRecordBatchStream
/// SeriesScan -- SendableRecordBatchStream
/// ```
pub(crate) struct ScanRegion {
    /// Version of the region at scan.
    version: VersionRef,
    /// Access layer of the region.
    access_layer: AccessLayerRef,
    /// Scan request.
    request: ScanRequest,
    /// Cache strategy handed to the scan input and used to look up index caches.
    cache_strategy: CacheStrategy,
    /// Maximum number of SST files to scan concurrently.
    max_concurrent_scan_files: usize,
    /// Whether to ignore inverted index.
    ignore_inverted_index: bool,
    /// Whether to ignore fulltext index.
    ignore_fulltext_index: bool,
    /// Whether to ignore bloom filter.
    ignore_bloom_filter: bool,
    /// Start time of the scan task.
    start_time: Option<Instant>,
    /// Whether to filter out the deleted rows.
    /// Usually true for normal read, and false for scan for compaction.
    filter_deleted: bool,
    /// Counters that should receive query-load metrics.
    query_stat_counters: Option<RegionQueryStatCounters>,
    /// Optional provider of extension ranges; only present with the `enterprise` feature.
    #[cfg(feature = "enterprise")]
    extension_range_provider: Option<BoxedExtensionRangeProvider>,
}
260
261impl ScanRegion {
262    /// Creates a [ScanRegion].
263    pub(crate) fn new(
264        version: VersionRef,
265        access_layer: AccessLayerRef,
266        request: ScanRequest,
267        cache_strategy: CacheStrategy,
268    ) -> ScanRegion {
269        ScanRegion {
270            version,
271            access_layer,
272            request,
273            cache_strategy,
274            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
275            ignore_inverted_index: false,
276            ignore_fulltext_index: false,
277            ignore_bloom_filter: false,
278            start_time: None,
279            filter_deleted: true,
280            query_stat_counters: None,
281            #[cfg(feature = "enterprise")]
282            extension_range_provider: None,
283        }
284    }
285
286    /// Sets counters that should receive query-load metrics.
287    #[must_use]
288    pub(crate) fn with_query_stat_counters(mut self, counters: RegionQueryStatCounters) -> Self {
289        self.query_stat_counters = Some(counters);
290        self
291    }
292
293    /// Sets maximum number of SST files to scan concurrently.
294    #[must_use]
295    pub(crate) fn with_max_concurrent_scan_files(
296        mut self,
297        max_concurrent_scan_files: usize,
298    ) -> Self {
299        self.max_concurrent_scan_files = max_concurrent_scan_files;
300        self
301    }
302
303    /// Sets whether to ignore inverted index.
304    #[must_use]
305    pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self {
306        self.ignore_inverted_index = ignore;
307        self
308    }
309
310    /// Sets whether to ignore fulltext index.
311    #[must_use]
312    pub(crate) fn with_ignore_fulltext_index(mut self, ignore: bool) -> Self {
313        self.ignore_fulltext_index = ignore;
314        self
315    }
316
317    /// Sets whether to ignore bloom filter.
318    #[must_use]
319    pub(crate) fn with_ignore_bloom_filter(mut self, ignore: bool) -> Self {
320        self.ignore_bloom_filter = ignore;
321        self
322    }
323
324    #[must_use]
325    pub(crate) fn with_start_time(mut self, now: Instant) -> Self {
326        self.start_time = Some(now);
327        self
328    }
329
330    pub(crate) fn set_filter_deleted(&mut self, filter_deleted: bool) {
331        self.filter_deleted = filter_deleted;
332    }
333
334    #[cfg(feature = "enterprise")]
335    pub(crate) fn set_extension_range_provider(
336        &mut self,
337        extension_range_provider: BoxedExtensionRangeProvider,
338    ) {
339        self.extension_range_provider = Some(extension_range_provider);
340    }
341
342    /// Returns a [Scanner] to scan the region.
343    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
344    pub(crate) async fn scanner(self) -> Result<Scanner> {
345        if self.use_series_scan() {
346            self.series_scan().await.map(Scanner::Series)
347        } else if self.use_unordered_scan() {
348            // If table is append only and there is no series row selector, we use unordered scan in query.
349            // We still use seq scan in compaction.
350            self.unordered_scan().await.map(Scanner::Unordered)
351        } else {
352            self.seq_scan().await.map(Scanner::Seq)
353        }
354    }
355
356    /// Returns a [RegionScanner] to scan the region.
357    #[tracing::instrument(
358        level = tracing::Level::DEBUG,
359        skip_all,
360        fields(region_id = %self.region_id())
361    )]
362    pub(crate) async fn region_scanner(self) -> Result<RegionScannerRef> {
363        if self.use_series_scan() {
364            self.series_scan()
365                .await
366                .map(|scanner| Box::new(scanner) as _)
367        } else if self.use_unordered_scan() {
368            self.unordered_scan()
369                .await
370                .map(|scanner| Box::new(scanner) as _)
371        } else {
372            self.seq_scan().await.map(|scanner| Box::new(scanner) as _)
373        }
374    }
375
376    /// Scan sequentially.
377    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
378    pub(crate) async fn seq_scan(self) -> Result<SeqScan> {
379        let input = self.scan_input().await?.with_compaction(false);
380        Ok(SeqScan::new(input))
381    }
382
383    /// Unordered scan.
384    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
385    pub(crate) async fn unordered_scan(self) -> Result<UnorderedScan> {
386        let input = self.scan_input().await?;
387        Ok(UnorderedScan::new(input))
388    }
389
390    /// Scans by series.
391    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
392    pub(crate) async fn series_scan(self) -> Result<SeriesScan> {
393        let input = self.scan_input().await?;
394        Ok(SeriesScan::new(input))
395    }
396
397    /// Returns true if the region can use unordered scan for current request.
398    fn use_unordered_scan(&self) -> bool {
399        // We use unordered scan when:
400        // 1. The region is in append mode.
401        // 2. There is no series row selector.
402        // 3. The required distribution is None or TimeSeriesDistribution::TimeWindowed.
403        //
404        // We still use seq scan in compaction.
405        self.version.options.append_mode
406            && self.request.series_row_selector.is_none()
407            && (self.request.distribution.is_none()
408                || self.request.distribution == Some(TimeSeriesDistribution::TimeWindowed))
409    }
410
411    /// Returns true if the region can use series scan for current request.
412    fn use_series_scan(&self) -> bool {
413        self.request.distribution == Some(TimeSeriesDistribution::PerSeries)
414    }
415
    /// Creates a scan input.
    ///
    /// Consumes the [ScanRegion] and, without reading any data, gathers everything a
    /// scanner needs:
    /// - the columns to read and the projection mapper,
    /// - the SST files whose time range intersects the request (unless SSTs are skipped),
    /// - the ranges of non-empty memtables whose time range intersects the request,
    /// - the index appliers, built separately for non-field and field filters.
    ///
    /// # Errors
    ///
    /// Returns an error if building the predicate group, resolving the read columns,
    /// creating the projection mapper or listing memtable ranges fails. With the
    /// `enterprise` feature, a failure to find extension ranges is returned as well.
    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
    async fn scan_input(self) -> Result<ScanInput> {
        // `NonZeroU64::new` maps a zero `sst_min_sequence` to `None`, so zero behaves like unset.
        let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new);
        let time_range = self.build_time_range_predicate();
        let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;

        let mut read_cols = match &self.request.projection_input {
            Some(p) => {
                // Read columns include the pushed-down projection and columns
                // resolved from the predicate.
                let metadata = &self.version.metadata;
                let from_projection = read_columns_from_projection(p.clone(), metadata)?;
                let from_predicate = read_columns_from_predicate(&predicate, metadata);
                merge(from_projection, from_predicate)
            }
            None => {
                // Without a projection every column of the region is read.
                let read_col_ids = self
                    .version
                    .metadata
                    .column_metadatas
                    .iter()
                    .map(|col| col.column_id);
                ReadColumns::from_deduped_column_ids(read_col_ids)
            }
        };
        // Only narrow read columns and pass JSON type hints for structured JSON (JSON2)
        // columns. Legacy JSONB columns have JSON extension metadata but their physical
        // Arrow type is Binary, not Struct, so they must not enter structured JSON paths.
        let has_structured_json = self
            .version
            .metadata
            .schema
            .arrow_schema()
            .fields()
            .iter()
            .any(is_structured_json_field);
        if has_structured_json {
            narrow_read_columns_by_json_type_hint(
                &mut read_cols,
                &self.request.json_type_hint,
                &self.version.metadata,
            );
        }
        let read_col_ids = read_cols.column_ids();

        // The mapper always computes projected column ids as the schema of SSTs may change.
        let projection = self
            .request
            .projection_indices()
            .map(|x| x.to_vec())
            .unwrap_or_else(|| (0..self.version.metadata.column_metadatas.len()).collect());
        // The hint is only forwarded (and logged) when a structured JSON column exists.
        let json_type_hint = has_structured_json
            .then_some(&self.request.json_type_hint)
            .inspect(|json_type_hint| {
                debug!(
                    "Concretized JSON type: {{{}}}",
                    json_type_hint
                        .iter()
                        .map(|(k, v)| format!("{}: {}", k, v))
                        .join(", ")
                );
            });
        let mapper = FlatProjectionMapper::new_with_read_columns(
            &self.version.metadata,
            projection,
            read_cols,
            json_type_hint,
        )?;

        let ssts = &self.version.ssts;
        let mut files = Vec::new();
        if !self.request.skip_sst_files {
            for level in ssts.levels() {
                for file in level.files.values() {
                    let exceed_min_sequence = match (sst_min_sequence, file.meta_ref().sequence) {
                        (Some(min_sequence), Some(file_sequence)) => file_sequence > min_sequence,
                        // If the file's sequence is None (or actually is zero), it could mean the file
                        // is generated and added to the region "directly". In this case, its data should
                        // be considered as fresh as the memtable. So its sequence is treated greater than
                        // the min_sequence, whatever the value of min_sequence is. Hence the default
                        // "true" in this arm.
                        (Some(_), None) => true,
                        (None, _) => true,
                    };

                    // Finds SST files in range.
                    if exceed_min_sequence && file_in_range(file, &time_range) {
                        files.push(file.clone());
                    }
                    // There is no need to check and prune for file's sequence here as the sequence number is usually very new,
                    // unless the timing is too good, or the sequence number wouldn't be in file.
                    // and the batch will be filtered out by tree reader anyway.
                }
            }
        }

        let memtables = self.version.memtables.list_memtables();
        // Skip empty memtables and memtables out of time range.
        let mut mem_range_builders = Vec::new();
        let filter_mode = pre_filter_mode(
            self.version.options.append_mode,
            self.version.options.merge_mode(),
        );

        for m in memtables {
            // check if memtable is empty by reading stats.
            let Some((start, end)) = m.stats().time_range() else {
                continue;
            };
            // The time range of the memtable is inclusive.
            let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
            if !memtable_range.intersects(&time_range) {
                continue;
            }
            let ranges_in_memtable = m.ranges(
                Some(&read_col_ids),
                RangesOptions::default()
                    .with_predicate(predicate.clone())
                    .with_sequence(SequenceRange::new(
                        self.request.memtable_min_sequence,
                        self.request.memtable_max_sequence,
                    ))
                    .with_pre_filter_mode(filter_mode),
            )?;
            // Each memtable range becomes one builder that carries a copy of the range stats.
            mem_range_builders.extend(ranges_in_memtable.ranges.into_values().map(|v| {
                let stats = v.stats().clone();
                MemRangeBuilder::new(v, stats)
            }));
        }

        let region_id = self.region_id();
        debug!(
            "Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}",
            region_id,
            self.request,
            time_range,
            mem_range_builders.len(),
            files.len(),
            self.version.options.append_mode,
        );

        // Every index applier is built twice: index 0 for non-field filters, index 1 for
        // field filters.
        let (non_field_filters, field_filters) = self.partition_by_field_filters();
        let inverted_index_appliers = [
            self.build_invereted_index_applier(&non_field_filters),
            self.build_invereted_index_applier(&field_filters),
        ];
        let bloom_filter_appliers = [
            self.build_bloom_filter_applier(&non_field_filters),
            self.build_bloom_filter_applier(&field_filters),
        ];
        let fulltext_index_appliers = [
            self.build_fulltext_index_applier(&non_field_filters),
            self.build_fulltext_index_applier(&field_filters),
        ];
        #[cfg(feature = "vector_index")]
        let vector_index_applier = self.build_vector_index_applier();
        // With filters present, `k` is scaled by `VECTOR_INDEX_OVERFETCH_MULTIPLIER`
        // (saturating); without filters the requested `k` is used as is.
        #[cfg(feature = "vector_index")]
        let vector_index_k = self.request.vector_search.as_ref().map(|search| {
            if self.request.filters.is_empty() {
                search.k
            } else {
                search.k.saturating_mul(VECTOR_INDEX_OVERFETCH_MULTIPLIER)
            }
        });

        let input = ScanInput::new(self.access_layer, mapper)
            .with_time_range(Some(time_range))
            .with_predicate(predicate)
            .with_memtables(mem_range_builders)
            .with_files(files)
            .with_cache(self.cache_strategy)
            .with_inverted_index_appliers(inverted_index_appliers)
            .with_bloom_filter_index_appliers(bloom_filter_appliers)
            .with_fulltext_index_appliers(fulltext_index_appliers)
            .with_max_concurrent_scan_files(self.max_concurrent_scan_files)
            .with_start_time(self.start_time)
            .with_append_mode(self.version.options.append_mode)
            .with_filter_deleted(self.filter_deleted)
            .with_merge_mode(self.version.options.merge_mode())
            .with_series_row_selector(self.request.series_row_selector)
            .with_distribution(self.request.distribution)
            .with_explain_flat_format(
                self.version.options.sst_format == Some(crate::sst::FormatType::Flat),
            )
            // The snapshot sequence is only set when the request asks for a snapshot on
            // scan; it is then the memtable max sequence (which may itself be `None`).
            .with_snapshot_sequence(
                self.request
                    .snapshot_on_scan
                    .then_some(self.request.memtable_max_sequence)
                    .flatten(),
            )
            .with_query_stat_counters(self.query_stat_counters);
        #[cfg(feature = "vector_index")]
        let input = input
            .with_vector_index_applier(vector_index_applier)
            .with_vector_index_k(vector_index_k);

        // Extension ranges are only looked up when SST files are not skipped and a
        // provider has been installed.
        #[cfg(feature = "enterprise")]
        let input = if !self.request.skip_sst_files
            && let Some(provider) = self.extension_range_provider
        {
            let ranges = provider
                .find_extension_ranges(self.version.flushed_sequence, time_range, &self.request)
                .await?;
            debug!("Find extension ranges: {ranges:?}");
            input.with_extension_ranges(ranges)
        } else {
            input
        };
        Ok(input)
    }
627
628    fn region_id(&self) -> RegionId {
629        self.version.metadata.region_id
630    }
631
632    /// Build time range predicate from filters.
633    fn build_time_range_predicate(&self) -> TimestampRange {
634        let time_index = self.version.metadata.time_index_column();
635        let unit = time_index
636            .column_schema
637            .data_type
638            .as_timestamp()
639            .expect("Time index must have timestamp-compatible type")
640            .unit();
641        build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters)
642    }
643
644    /// Partitions filters into two groups: non-field filters and field filters.
645    /// Returns `(non_field_filters, field_filters)`.
646    fn partition_by_field_filters(&self) -> (Vec<Expr>, Vec<Expr>) {
647        let field_columns = self
648            .version
649            .metadata
650            .field_columns()
651            .map(|col| &col.column_schema.name)
652            .collect::<HashSet<_>>();
653
654        let mut columns = HashSet::new();
655
656        self.request.filters.iter().cloned().partition(|expr| {
657            columns.clear();
658            // `expr_to_columns` won't return error.
659            if expr_to_columns(expr, &mut columns).is_err() {
660                // If we can't extract columns, treat it as non-field filter
661                return true;
662            }
663            // Return true for non-field filters (partition puts true cases in first vec)
664            !columns
665                .iter()
666                .any(|column| field_columns.contains(&column.name))
667        })
668    }
669
670    /// Use the latest schema to build the inverted index applier.
671    fn build_invereted_index_applier(&self, filters: &[Expr]) -> Option<InvertedIndexApplierRef> {
672        if self.ignore_inverted_index {
673            return None;
674        }
675
676        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
677        let inverted_index_cache = self.cache_strategy.inverted_index_cache().cloned();
678
679        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
680
681        InvertedIndexApplierBuilder::new(
682            self.access_layer.table_dir().to_string(),
683            self.access_layer.path_type(),
684            self.access_layer.object_store().clone(),
685            self.version.metadata.as_ref(),
686            self.version.metadata.inverted_indexed_column_ids(
687                self.version
688                    .options
689                    .index_options
690                    .inverted_index
691                    .ignore_column_ids
692                    .iter(),
693            ),
694            self.access_layer.puffin_manager_factory().clone(),
695        )
696        .with_file_cache(file_cache)
697        .with_inverted_index_cache(inverted_index_cache)
698        .with_puffin_metadata_cache(puffin_metadata_cache)
699        .build(filters)
700        .inspect_err(|err| warn!(err; "Failed to build invereted index applier"))
701        .ok()
702        .flatten()
703        .map(Arc::new)
704    }
705
706    /// Use the latest schema to build the bloom filter index applier.
707    fn build_bloom_filter_applier(&self, filters: &[Expr]) -> Option<BloomFilterIndexApplierRef> {
708        if self.ignore_bloom_filter {
709            return None;
710        }
711
712        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
713        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
714        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
715
716        BloomFilterIndexApplierBuilder::new(
717            self.access_layer.table_dir().to_string(),
718            self.access_layer.path_type(),
719            self.access_layer.object_store().clone(),
720            self.version.metadata.as_ref(),
721            self.access_layer.puffin_manager_factory().clone(),
722        )
723        .with_file_cache(file_cache)
724        .with_bloom_filter_index_cache(bloom_filter_index_cache)
725        .with_puffin_metadata_cache(puffin_metadata_cache)
726        .build(filters)
727        .inspect_err(|err| warn!(err; "Failed to build bloom filter index applier"))
728        .ok()
729        .flatten()
730        .map(Arc::new)
731    }
732
733    /// Use the latest schema to build the fulltext index applier.
734    fn build_fulltext_index_applier(&self, filters: &[Expr]) -> Option<FulltextIndexApplierRef> {
735        if self.ignore_fulltext_index {
736            return None;
737        }
738
739        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
740        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
741        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
742        FulltextIndexApplierBuilder::new(
743            self.access_layer.table_dir().to_string(),
744            self.access_layer.path_type(),
745            self.access_layer.object_store().clone(),
746            self.access_layer.puffin_manager_factory().clone(),
747            self.version.metadata.as_ref(),
748        )
749        .with_file_cache(file_cache)
750        .with_puffin_metadata_cache(puffin_metadata_cache)
751        .with_bloom_filter_cache(bloom_filter_index_cache)
752        .build(filters)
753        .inspect_err(|err| warn!(err; "Failed to build fulltext index applier"))
754        .ok()
755        .flatten()
756        .map(Arc::new)
757    }
758
759    /// Build the vector index applier from vector search request.
760    #[cfg(feature = "vector_index")]
761    fn build_vector_index_applier(&self) -> Option<VectorIndexApplierRef> {
762        let vector_search = self.request.vector_search.as_ref()?;
763
764        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
765        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
766        let vector_index_cache = self.cache_strategy.vector_index_cache().cloned();
767
768        let applier = VectorIndexApplier::new(
769            self.access_layer.table_dir().to_string(),
770            self.access_layer.path_type(),
771            self.access_layer.object_store().clone(),
772            self.access_layer.puffin_manager_factory().clone(),
773            vector_search.column_id,
774            vector_search.query_vector.clone(),
775            vector_search.metric,
776        )
777        .with_file_cache(file_cache)
778        .with_puffin_metadata_cache(puffin_metadata_cache)
779        .with_vector_index_cache(vector_index_cache);
780
781        Some(Arc::new(applier))
782    }
783}
784
785/// Returns true if the time range of a SST `file` matches the `predicate`.
786fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
787    if predicate == &TimestampRange::min_to_max() {
788        return true;
789    }
790    // end timestamp of a SST is inclusive.
791    let (start, end) = file.time_range();
792    let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end));
793    file_ts_range.intersects(predicate)
794}
795
/// Common input for different scanners.
///
/// Built with [`ScanInput::new`] and then configured through the `with_*`
/// builder methods.
pub struct ScanInput {
    /// Region SST access layer.
    access_layer: AccessLayerRef,
    /// Maps projected Batches to RecordBatches.
    pub(crate) mapper: Arc<FlatProjectionMapper>,
    /// The columns to read from memtables and SSTs.
    /// Notice this is different from the columns in `mapper`, which are the projected columns:
    /// the read columns might also include non-projected columns needed for filtering.
    pub(crate) read_cols: ReadColumns,
    /// Time range filter for time index.
    pub(crate) time_range: Option<TimestampRange>,
    /// Predicate to push down.
    pub(crate) predicate: PredicateGroup,
    /// Region partition expr applied at read time.
    /// Refreshed from the predicate group whenever `with_predicate` is called.
    region_partition_expr: Option<PartitionExpr>,
    /// Memtable range builders for memtables in the time range.
    pub(crate) memtables: Vec<MemRangeBuilder>,
    /// Handles to SST files to scan.
    pub(crate) files: Vec<FileHandle>,
    /// Cache.
    pub(crate) cache_strategy: CacheStrategy,
    /// Ignores file not found error.
    ignore_file_not_found: bool,
    /// Maximum number of SST files to scan concurrently.
    pub(crate) max_concurrent_scan_files: usize,
    /// Inverted index appliers.
    // NOTE(review): the meaning of the two slots is not visible here — confirm with the
    // code that builds these arrays.
    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
    /// Bloom filter index appliers (same two-slot layout as the inverted index appliers).
    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    /// Fulltext index appliers (same two-slot layout as the inverted index appliers).
    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
    /// Vector index applier for KNN search.
    #[cfg(feature = "vector_index")]
    pub(crate) vector_index_applier: Option<VectorIndexApplierRef>,
    /// Over-fetched k for vector index scan.
    #[cfg(feature = "vector_index")]
    pub(crate) vector_index_k: Option<usize>,
    /// Start time of the query.
    pub(crate) query_start: Option<Instant>,
    /// The region is using append mode.
    pub(crate) append_mode: bool,
    /// Whether to remove deletion markers.
    pub(crate) filter_deleted: bool,
    /// Mode to merge duplicate rows.
    pub(crate) merge_mode: MergeMode,
    /// Hint to select rows from time series.
    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
    /// Hint for the required distribution of the scanner.
    pub(crate) distribution: Option<TimeSeriesDistribution>,
    /// Whether the region's configured SST format is flat.
    explain_flat_format: bool,
    /// Snapshot upper bound fixed at scan open and propagated back to the caller.
    pub(crate) snapshot_sequence: Option<SequenceNumber>,
    /// Whether this scan is for compaction.
    pub(crate) compaction: bool,
    /// Counters that should receive query-load metrics.
    pub(crate) query_stat_counters: Option<RegionQueryStatCounters>,
    /// Extension ranges to scan; they are indexed after the memtables and the files.
    #[cfg(feature = "enterprise")]
    extension_ranges: Vec<BoxedExtensionRange>,
}
855
856impl ScanInput {
857    /// Creates a new [ScanInput].
858    #[must_use]
859    pub(crate) fn new(access_layer: AccessLayerRef, mapper: FlatProjectionMapper) -> ScanInput {
860        ScanInput {
861            access_layer,
862            read_cols: mapper.read_columns().clone(),
863            mapper: Arc::new(mapper),
864            time_range: None,
865            predicate: PredicateGroup::default(),
866            region_partition_expr: None,
867            memtables: Vec::new(),
868            files: Vec::new(),
869            cache_strategy: CacheStrategy::Disabled,
870            ignore_file_not_found: false,
871            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
872            inverted_index_appliers: [None, None],
873            bloom_filter_index_appliers: [None, None],
874            fulltext_index_appliers: [None, None],
875            #[cfg(feature = "vector_index")]
876            vector_index_applier: None,
877            #[cfg(feature = "vector_index")]
878            vector_index_k: None,
879            query_start: None,
880            append_mode: false,
881            filter_deleted: true,
882            merge_mode: MergeMode::default(),
883            series_row_selector: None,
884            distribution: None,
885            explain_flat_format: false,
886            snapshot_sequence: None,
887            compaction: false,
888            query_stat_counters: None,
889            #[cfg(feature = "enterprise")]
890            extension_ranges: Vec::new(),
891        }
892    }
893
894    /// Sets time range filter for time index.
895    #[must_use]
896    pub(crate) fn with_time_range(mut self, time_range: Option<TimestampRange>) -> Self {
897        self.time_range = time_range;
898        self
899    }
900
901    /// Sets predicate to push down.
902    #[must_use]
903    pub(crate) fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
904        self.region_partition_expr = predicate.region_partition_expr().cloned();
905        self.predicate = predicate;
906        self
907    }
908
909    /// Sets memtable range builders.
910    #[must_use]
911    pub(crate) fn with_memtables(mut self, memtables: Vec<MemRangeBuilder>) -> Self {
912        self.memtables = memtables;
913        self
914    }
915
916    /// Sets files to read.
917    #[must_use]
918    pub(crate) fn with_files(mut self, files: Vec<FileHandle>) -> Self {
919        self.files = files;
920        self
921    }
922
923    /// Sets cache for this query.
924    #[must_use]
925    pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self {
926        self.cache_strategy = cache;
927        self
928    }
929
930    /// Ignores file not found error.
931    #[must_use]
932    pub(crate) fn with_ignore_file_not_found(mut self, ignore: bool) -> Self {
933        self.ignore_file_not_found = ignore;
934        self
935    }
936
937    /// Sets maximum number of SST files to scan concurrently.
938    #[must_use]
939    pub(crate) fn with_max_concurrent_scan_files(
940        mut self,
941        max_concurrent_scan_files: usize,
942    ) -> Self {
943        self.max_concurrent_scan_files = max_concurrent_scan_files;
944        self
945    }
946
947    /// Sets inverted index appliers.
948    #[must_use]
949    pub(crate) fn with_inverted_index_appliers(
950        mut self,
951        appliers: [Option<InvertedIndexApplierRef>; 2],
952    ) -> Self {
953        self.inverted_index_appliers = appliers;
954        self
955    }
956
957    /// Sets bloom filter appliers.
958    #[must_use]
959    pub(crate) fn with_bloom_filter_index_appliers(
960        mut self,
961        appliers: [Option<BloomFilterIndexApplierRef>; 2],
962    ) -> Self {
963        self.bloom_filter_index_appliers = appliers;
964        self
965    }
966
967    /// Sets fulltext index appliers.
968    #[must_use]
969    pub(crate) fn with_fulltext_index_appliers(
970        mut self,
971        appliers: [Option<FulltextIndexApplierRef>; 2],
972    ) -> Self {
973        self.fulltext_index_appliers = appliers;
974        self
975    }
976
977    /// Sets vector index applier for KNN search.
978    #[cfg(feature = "vector_index")]
979    #[must_use]
980    pub(crate) fn with_vector_index_applier(
981        mut self,
982        applier: Option<VectorIndexApplierRef>,
983    ) -> Self {
984        self.vector_index_applier = applier;
985        self
986    }
987
988    /// Sets over-fetched k for vector index scan.
989    #[cfg(feature = "vector_index")]
990    #[must_use]
991    pub(crate) fn with_vector_index_k(mut self, k: Option<usize>) -> Self {
992        self.vector_index_k = k;
993        self
994    }
995
996    /// Sets start time of the query.
997    #[must_use]
998    pub(crate) fn with_start_time(mut self, now: Option<Instant>) -> Self {
999        self.query_start = now;
1000        self
1001    }
1002
1003    #[must_use]
1004    pub(crate) fn with_append_mode(mut self, is_append_mode: bool) -> Self {
1005        self.append_mode = is_append_mode;
1006        self
1007    }
1008
1009    pub(crate) fn with_query_stat_counters(
1010        mut self,
1011        counters: Option<RegionQueryStatCounters>,
1012    ) -> Self {
1013        self.query_stat_counters = counters;
1014        self
1015    }
1016
1017    /// Sets whether to remove deletion markers during scan.
1018    #[must_use]
1019    pub(crate) fn with_filter_deleted(mut self, filter_deleted: bool) -> Self {
1020        self.filter_deleted = filter_deleted;
1021        self
1022    }
1023
1024    /// Sets the merge mode.
1025    #[must_use]
1026    pub(crate) fn with_merge_mode(mut self, merge_mode: MergeMode) -> Self {
1027        self.merge_mode = merge_mode;
1028        self
1029    }
1030
1031    /// Sets the distribution hint.
1032    #[must_use]
1033    pub(crate) fn with_distribution(
1034        mut self,
1035        distribution: Option<TimeSeriesDistribution>,
1036    ) -> Self {
1037        self.distribution = distribution;
1038        self
1039    }
1040
1041    /// Sets whether the region's configured SST format is flat for explain output.
1042    #[must_use]
1043    pub(crate) fn with_explain_flat_format(mut self, explain_flat_format: bool) -> Self {
1044        self.explain_flat_format = explain_flat_format;
1045        self
1046    }
1047
1048    /// Sets the time series row selector.
1049    #[must_use]
1050    pub(crate) fn with_series_row_selector(
1051        mut self,
1052        series_row_selector: Option<TimeSeriesRowSelector>,
1053    ) -> Self {
1054        self.series_row_selector = series_row_selector;
1055        self
1056    }
1057
1058    #[must_use]
1059    pub(crate) fn with_snapshot_sequence(
1060        mut self,
1061        snapshot_sequence: Option<SequenceNumber>,
1062    ) -> Self {
1063        self.snapshot_sequence = snapshot_sequence;
1064        self
1065    }
1066
1067    /// Sets whether this scan is for compaction.
1068    #[must_use]
1069    pub(crate) fn with_compaction(mut self, compaction: bool) -> Self {
1070        self.compaction = compaction;
1071        self
1072    }
1073
1074    /// Builds memtable ranges to scan by `index`.
1075    pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> {
1076        let memtable = &self.memtables[index.index];
1077        let mut ranges = SmallVec::new();
1078        memtable.build_ranges(index.row_group_index, &mut ranges);
1079        ranges
1080    }
1081
1082    pub(crate) fn predicate_for_file(&self, file: &FileHandle) -> Option<Predicate> {
1083        if self.should_skip_region_partition(file) {
1084            self.predicate.predicate_without_region().cloned()
1085        } else {
1086            self.predicate.predicate().cloned()
1087        }
1088    }
1089
1090    fn should_skip_region_partition(&self, file: &FileHandle) -> bool {
1091        match (
1092            self.region_partition_expr.as_ref(),
1093            file.meta_ref().partition_expr.as_ref(),
1094        ) {
1095            (Some(region_expr), Some(file_expr)) => region_expr == file_expr,
1096            _ => false,
1097        }
1098    }
1099
1100    /// Tries to build file-level pruning statistics using only the [FileHandle]'s manifest-level
1101    /// time range, without reading any parquet metadata.
1102    ///
1103    /// Returns `None` if timestamp unit conversion overflows (conservative: keep the file).
1104    fn try_file_level_pruning_stats(&self, file: &FileHandle) -> Option<FileLevelPruningStats> {
1105        let (ts_min, ts_max) = file.time_range();
1106        let time_index = self.mapper.metadata().time_index_column();
1107        let time_index_unit = time_index.column_schema.data_type.as_timestamp()?.unit();
1108
1109        // Convert file timestamps to the time index column's unit. Use `convert_to_ceil` for
1110        // the upper bound to avoid accidentally shrinking the manifest range.
1111        let min_ts = ts_min.convert_to(time_index_unit)?;
1112        let max_ts = ts_max.convert_to_ceil(time_index_unit)?;
1113
1114        Some(FileLevelPruningStats {
1115            min_scalar: timestamp_to_scalar_value(time_index_unit, Some(min_ts.value())),
1116            max_scalar: timestamp_to_scalar_value(time_index_unit, Some(max_ts.value())),
1117            time_index_col_name: time_index.column_schema.name.clone(),
1118        })
1119    }
1120
1121    /// Checks whether a file can be definitively pruned using only its manifest-level
1122    /// time range and the current predicate, without reading any parquet metadata.
1123    ///
1124    /// Returns `true` if [PruningStatistics] proves the file cannot contain matching rows.
1125    #[inline]
1126    pub(crate) fn can_manifest_prune_file(&self, file: &FileHandle) -> bool {
1127        let predicate = self.predicate_for_file(file);
1128        self.manifest_prunes_file(file, predicate.as_ref())
1129    }
1130
1131    fn manifest_prunes_file(&self, file: &FileHandle, predicate: Option<&Predicate>) -> bool {
1132        if let Some(pred) = predicate
1133            && !pred.is_empty()
1134            && let Some(file_level_stats) = self.try_file_level_pruning_stats(file)
1135        {
1136            let pruning_results = pred.prune_with_stats(
1137                &file_level_stats,
1138                self.mapper.metadata().schema.arrow_schema(),
1139            );
1140            pruning_results.first() == Some(&false)
1141        } else {
1142            false
1143        }
1144    }
1145
1146    /// Prunes a file to scan and returns the builder to build readers.
1147    ///
1148    /// This is the public entry point used by direct tests and non-pruner callers.
1149    /// It performs its own manifest-level pruning check internally.
1150    #[tracing::instrument(
1151        skip_all,
1152        fields(
1153            region_id = %self.region_metadata().region_id,
1154            file_id = %file.file_id()
1155        )
1156    )]
1157    pub async fn prune_file(
1158        &self,
1159        file: &FileHandle,
1160        pre_filter_mode: PreFilterMode,
1161        reader_metrics: &mut ReaderMetrics,
1162    ) -> Result<FileRangeBuilder> {
1163        let predicate = self.predicate_for_file(file);
1164
1165        // Early file-level pruning using manifest time range before any parquet metadata access.
1166        if self.manifest_prunes_file(file, predicate.as_ref()) {
1167            reader_metrics.filter_metrics.files_time_range_pruned += 1;
1168            return Ok(FileRangeBuilder::default());
1169        }
1170
1171        self.prune_file_after_manifest_check(file, pre_filter_mode, predicate, reader_metrics)
1172            .await
1173    }
1174
1175    /// Second half of `prune_file` — performs the actual parquet metadata /
1176    /// reader setup. Callers that already performed manifest-level pruning
1177    /// (e.g. the `Pruner` via its shared `manifest_pruned_files` cache) should
1178    /// call this directly to avoid a redundant manifest check.
1179    ///
1180    /// `predicate` is the result of `self.predicate_for_file(file)` computed
1181    /// externally so the caller can reuse it if needed.
1182    pub(crate) async fn prune_file_after_manifest_check(
1183        &self,
1184        file: &FileHandle,
1185        pre_filter_mode: PreFilterMode,
1186        predicate: Option<Predicate>,
1187        reader_metrics: &mut ReaderMetrics,
1188    ) -> Result<FileRangeBuilder> {
1189        let may_build_selective_row_selection = predicate.is_some();
1190        let decode_pk_values = !self.compaction
1191            && self
1192                .mapper
1193                .read_columns()
1194                .column_ids_iter()
1195                .any(|column_id| self.mapper.metadata().primary_key.contains(&column_id));
1196        let reader = self
1197            .access_layer
1198            .read_sst(file.clone())
1199            .predicate(predicate)
1200            .projection(Some(self.read_cols.clone()))
1201            .cache(self.cache_strategy.clone())
1202            .inverted_index_appliers(self.inverted_index_appliers.clone())
1203            .bloom_filter_index_appliers(self.bloom_filter_index_appliers.clone())
1204            .fulltext_index_appliers(self.fulltext_index_appliers.clone());
1205        let reader = if !self.compaction && may_build_selective_row_selection {
1206            reader.deferred_optional_page_index()
1207        } else {
1208            reader
1209        };
1210        #[cfg(feature = "vector_index")]
1211        let reader = {
1212            let mut reader = reader;
1213            reader =
1214                reader.vector_index_applier(self.vector_index_applier.clone(), self.vector_index_k);
1215            reader
1216        };
1217        let res = reader
1218            .expected_metadata(Some(self.mapper.metadata().clone()))
1219            .compaction(self.compaction)
1220            .pre_filter_mode(pre_filter_mode)
1221            .decode_primary_key_values(decode_pk_values)
1222            .build_reader_input(reader_metrics)
1223            .await;
1224        let read_input = match res {
1225            Ok(x) => x,
1226            Err(e) => {
1227                if e.is_object_not_found() && self.ignore_file_not_found {
1228                    error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
1229                    return Ok(FileRangeBuilder::default());
1230                } else {
1231                    return Err(e);
1232                }
1233            }
1234        };
1235
1236        let Some((mut file_range_ctx, selection)) = read_input else {
1237            return Ok(FileRangeBuilder::default());
1238        };
1239
1240        let need_compat = !compat::has_same_columns_and_pk_encoding(
1241            &self.mapper,
1242            file_range_ctx.read_format(),
1243            self.compaction,
1244        );
1245        if need_compat {
1246            // They have different schema. We need to adapt the batch first so the
1247            // mapper can convert it.
1248            let compat = FlatCompatBatch::try_new(
1249                &self.mapper,
1250                file_range_ctx.read_format(),
1251                self.compaction,
1252            )?;
1253            file_range_ctx.set_compat_batch(compat);
1254        }
1255        Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), selection))
1256    }
1257
1258    /// Scans flat sources (RecordBatch streams) in parallel.
1259    ///
1260    /// # Panics if the input doesn't allow parallel scan.
1261    #[tracing::instrument(
1262        skip(self, sources, semaphore),
1263        fields(
1264            region_id = %self.region_metadata().region_id,
1265            source_count = sources.len()
1266        )
1267    )]
1268    pub(crate) fn create_parallel_flat_sources(
1269        &self,
1270        sources: Vec<BoxedRecordBatchStream>,
1271        semaphore: Arc<Semaphore>,
1272        channel_size: usize,
1273    ) -> Result<Vec<BoxedRecordBatchStream>> {
1274        if sources.len() <= 1 {
1275            return Ok(sources);
1276        }
1277
1278        // Spawn a task for each source.
1279        let sources = sources
1280            .into_iter()
1281            .map(|source| {
1282                let (sender, receiver) = mpsc::channel(channel_size);
1283                self.spawn_flat_scan_task(source, semaphore.clone(), sender);
1284                let stream = Box::pin(ReceiverStream::new(receiver));
1285                Box::pin(stream) as _
1286            })
1287            .collect();
1288        Ok(sources)
1289    }
1290
1291    /// Spawns a task to scan a flat source (RecordBatch stream) asynchronously.
1292    #[tracing::instrument(
1293        skip(self, input, semaphore, sender),
1294        fields(region_id = %self.region_metadata().region_id)
1295    )]
1296    pub(crate) fn spawn_flat_scan_task(
1297        &self,
1298        mut input: BoxedRecordBatchStream,
1299        semaphore: Arc<Semaphore>,
1300        sender: mpsc::Sender<Result<RecordBatch>>,
1301    ) {
1302        let region_id = self.region_metadata().region_id;
1303        let span = tracing::info_span!(
1304            "ScanInput::parallel_scan_task",
1305            region_id = %region_id,
1306            stream_kind = "flat"
1307        );
1308        common_runtime::spawn_query(
1309            async move {
1310                loop {
1311                    // We release the permit before sending result to avoid the task waiting on
1312                    // the channel with the permit held.
1313                    let maybe_batch = {
1314                        // Safety: We never close the semaphore.
1315                        let _permit = semaphore.acquire().await.unwrap();
1316                        input.next().await
1317                    };
1318                    match maybe_batch {
1319                        Some(Ok(batch)) => {
1320                            let _ = sender.send(Ok(batch)).await;
1321                        }
1322                        Some(Err(e)) => {
1323                            let _ = sender.send(Err(e)).await;
1324                            break;
1325                        }
1326                        None => break,
1327                    }
1328                }
1329            }
1330            .instrument(span),
1331        );
1332    }
1333
1334    pub(crate) fn total_rows(&self) -> usize {
1335        let rows_in_files: usize = self.files.iter().map(|f| f.num_rows()).sum();
1336        let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();
1337
1338        let rows = rows_in_files + rows_in_memtables;
1339        #[cfg(feature = "enterprise")]
1340        let rows = rows
1341            + self
1342                .extension_ranges
1343                .iter()
1344                .map(|x| x.num_rows())
1345                .sum::<u64>() as usize;
1346        rows
1347    }
1348
1349    pub(crate) fn predicate_group(&self) -> &PredicateGroup {
1350        &self.predicate
1351    }
1352
1353    /// Returns number of memtables to scan.
1354    pub(crate) fn num_memtables(&self) -> usize {
1355        self.memtables.len()
1356    }
1357
1358    /// Returns number of SST files to scan.
1359    pub(crate) fn num_files(&self) -> usize {
1360        self.files.len()
1361    }
1362
1363    /// Gets the file handle from a row group index.
1364    pub(crate) fn file_from_index(&self, index: RowGroupIndex) -> &FileHandle {
1365        let file_index = index.index - self.num_memtables();
1366        &self.files[file_index]
1367    }
1368
1369    pub fn region_metadata(&self) -> &RegionMetadataRef {
1370        self.mapper.metadata()
1371    }
1372
1373    fn range_pre_filter_mode(&self, source_count: usize) -> PreFilterMode {
1374        if source_count <= 1 {
1375            // Duplicated rows in the same source is not a normal case and we don't provide
1376            // strict dedup semantic (last_row/last_non_null) for it. We expect the duplicated rows
1377            // are exactly identical in the same source so we use PreFilterMode::All for
1378            // performance reason.
1379            return PreFilterMode::All;
1380        }
1381
1382        pre_filter_mode(self.append_mode, self.merge_mode)
1383    }
1384}
1385
// The whole impl is gated on the `enterprise` feature, so the methods inside need no
// `#[cfg]` attributes of their own.
#[cfg(feature = "enterprise")]
impl ScanInput {
    /// Sets the extension ranges to scan.
    #[must_use]
    pub(crate) fn with_extension_ranges(self, extension_ranges: Vec<BoxedExtensionRange>) -> Self {
        Self {
            extension_ranges,
            ..self
        }
    }

    /// Returns all extension ranges of this scan.
    pub(crate) fn extension_ranges(&self) -> &[BoxedExtensionRange] {
        &self.extension_ranges
    }

    /// Get a boxed [ExtensionRange] by the index in all ranges.
    ///
    /// Extension ranges are indexed after the memtables and the files, so `i` is reduced
    /// by `num_memtables()` and `num_files()` before the lookup.
    ///
    /// # Panics
    ///
    /// Panics if `i` does not address an extension range of this scan.
    pub(crate) fn extension_range(&self, i: usize) -> &BoxedExtensionRange {
        &self.extension_ranges[i - self.num_memtables() - self.num_files()]
    }
}
1407
/// Lightweight [PruningStatistics] that only uses the file-level time range from manifest
/// metadata, avoiding any parquet metadata reads. Used for early file-level pruning before
/// accessing row-group-level statistics.
///
/// The statistics describe exactly one container (the file) and only cover the time index
/// column; every other column reports no statistics.
pub(crate) struct FileLevelPruningStats {
    /// Scalar value for the file's minimum timestamp in the time index column's unit.
    pub(crate) min_scalar: ScalarValue,
    /// Scalar value for the file's maximum timestamp in the time index column's unit.
    pub(crate) max_scalar: ScalarValue,
    /// Name of the time index column; statistics are only reported for this column.
    pub(crate) time_index_col_name: String,
}
1419
1420impl PruningStatistics for FileLevelPruningStats {
1421    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
1422        if column.name == self.time_index_col_name {
1423            ScalarValue::iter_to_array(std::iter::once(self.min_scalar.clone())).ok()
1424        } else {
1425            None
1426        }
1427    }
1428
1429    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
1430        if column.name == self.time_index_col_name {
1431            ScalarValue::iter_to_array(std::iter::once(self.max_scalar.clone())).ok()
1432        } else {
1433            None
1434        }
1435    }
1436
1437    fn num_containers(&self) -> usize {
1438        1
1439    }
1440
1441    fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
1442        if column.name == self.time_index_col_name {
1443            // The time index column is NOT NULL.
1444            Some(Arc::new(UInt64Array::from(vec![0u64])))
1445        } else {
1446            None
1447        }
1448    }
1449
1450    fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
1451        None
1452    }
1453
1454    fn contained(&self, _column: &Column, _values: &HashSet<ScalarValue>) -> Option<BooleanArray> {
1455        None
1456    }
1457}
1458
#[cfg(test)]
impl ScanInput {
    /// Returns the ids of the SST files to scan, in the order of `self.files`.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        let mut ids = Vec::with_capacity(self.files.len());
        for file in &self.files {
            ids.push(file.file_id());
        }
        ids
    }

    /// Returns the index ids of the SST files to scan, in the order of `self.files`.
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        let mut ids = Vec::with_capacity(self.files.len());
        for file in &self.files {
            ids.push(file.index_id());
        }
        ids
    }
}
1470
1471fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode {
1472    if append_mode {
1473        return PreFilterMode::All;
1474    }
1475
1476    match merge_mode {
1477        MergeMode::LastRow => PreFilterMode::SkipFields,
1478        MergeMode::LastNonNull => PreFilterMode::SkipFields,
1479    }
1480}
1481
1482fn narrow_read_columns_by_json_type_hint(
1483    read_columns: &mut ReadColumns,
1484    json_type_hint: &HashMap<String, JsonNativeType>,
1485    metadata: &RegionMetadata,
1486) {
1487    if json_type_hint.is_empty() {
1488        return;
1489    }
1490
1491    for read_column in &mut read_columns.cols {
1492        let Some(column) = metadata.column_by_id(read_column.column_id) else {
1493            continue;
1494        };
1495        let column_name = &column.column_schema.name;
1496        let Some(json_type) = json_type_hint.get(column_name) else {
1497            continue;
1498        };
1499
1500        let mut paths = Vec::new();
1501        let mut current = vec![column_name.clone()];
1502        collect_json_nested_paths(json_type, &mut current, &mut paths);
1503        merge_nested_paths(&mut read_column.nested_paths, paths)
1504    }
1505}
1506
1507fn collect_json_nested_paths(
1508    json_type: &JsonNativeType,
1509    current: &mut NestedPath,
1510    paths: &mut Vec<NestedPath>,
1511) {
1512    match json_type {
1513        JsonNativeType::Object(fields) if !fields.is_empty() => {
1514            for (field, child) in fields {
1515                current.push(field.clone());
1516                collect_json_nested_paths(child, current, paths);
1517                current.pop();
1518            }
1519        }
1520        _ => paths.push(current.clone()),
1521    }
1522}
1523
/// Output of [build_scan_fingerprint]: the cache fingerprint plus the derived
/// implied time range used to decide whether the cache key can drop the time
/// predicates for a given partition (see `build_range_cache_key`).
pub(crate) struct ScanFingerprintBundle {
    /// Fingerprint of the scan request.
    pub(crate) fingerprint: ScanRequestFingerprint,
    /// `Some(r)` = all time-only predicates are guaranteed true on `r` (in the
    /// column's `TimeUnit`).
    /// `None`    = at least one time-only predicate could not be proven (e.g.
    /// `OR`), so the cache-key optimization is disabled for this scan.
    pub(crate) implied_time_range: Option<TimestampRange>,
}
1535
1536/// Builds a [ScanFingerprintBundle] from a [ScanInput] if the scan is eligible
1537/// for partition range caching.
1538pub(crate) fn build_scan_fingerprint(input: &ScanInput) -> Option<ScanFingerprintBundle> {
1539    let eligible = !input.compaction
1540        && !input.files.is_empty()
1541        && matches!(input.cache_strategy, CacheStrategy::EnableAll(_));
1542
1543    if !eligible {
1544        return None;
1545    }
1546
1547    let metadata = input.region_metadata();
1548    let tag_names: HashSet<&str> = metadata
1549        .column_metadatas
1550        .iter()
1551        .filter(|col| col.semantic_type == SemanticType::Tag)
1552        .map(|col| col.column_schema.name.as_str())
1553        .collect();
1554
1555    let time_index = metadata.time_index_column();
1556    let time_index_name = time_index.column_schema.name.clone();
1557    let ts_col_unit = time_index
1558        .column_schema
1559        .data_type
1560        .as_timestamp()
1561        .expect("Time index must have timestamp-compatible type")
1562        .unit();
1563
1564    let exprs = input
1565        .predicate_group()
1566        .predicate_without_region()
1567        .map(|predicate| predicate.exprs())
1568        .unwrap_or_default();
1569
1570    let mut filters = Vec::new();
1571    let mut time_only_exprs: Vec<&Expr> = Vec::new();
1572    let mut has_tag_filter = false;
1573    let mut columns = HashSet::new();
1574
1575    for expr in exprs {
1576        columns.clear();
1577        let is_time_only = match expr_to_columns(expr, &mut columns) {
1578            Ok(()) if !columns.is_empty() => {
1579                has_tag_filter |= columns
1580                    .iter()
1581                    .any(|col| tag_names.contains(col.name.as_str()));
1582                columns.iter().all(|col| col.name == time_index_name)
1583            }
1584            _ => false,
1585        };
1586
1587        // Route time-only exprs that the legacy extractor recognizes into
1588        // `time_only_exprs` so the implication walker
1589        // (`implied_time_range_from_exprs`, called below) can attempt to drop
1590        // them from the cache key when the partition's `FileTimeRange` is fully
1591        // covered, then stringify them into the fingerprint's `time_filters`
1592        // bucket. Time-only exprs that the extractor doesn't recognize stay in
1593        // `filters` and never get stripped — conservatively correct.
1594        if is_time_only
1595            && extract_time_range_from_expr(&time_index_name, ts_col_unit, expr).is_some()
1596        {
1597            time_only_exprs.push(expr);
1598        } else {
1599            filters.push(expr.to_string());
1600        }
1601    }
1602
1603    if !has_tag_filter {
1604        // We only cache requests that have tag filters to avoid caching all series.
1605        return None;
1606    }
1607
1608    let implied_time_range =
1609        implied_time_range_from_exprs(&time_index_name, ts_col_unit, &time_only_exprs);
1610    let mut time_filters: Vec<String> = time_only_exprs.iter().map(|e| e.to_string()).collect();
1611
1612    // Ensure the filters are sorted for consistent fingerprinting.
1613    filters.sort_unstable();
1614    time_filters.sort_unstable();
1615    let read_columns = input.read_cols.clone();
1616    let fingerprint = crate::read::range_cache::ScanRequestFingerprintBuilder {
1617        read_column_types: read_columns
1618            .column_ids_iter()
1619            .map(|id| {
1620                metadata
1621                    .column_by_id(id)
1622                    .map(|col| col.column_schema.data_type.clone())
1623            })
1624            .collect(),
1625        read_columns,
1626        filters,
1627        time_filters,
1628        series_row_selector: input.series_row_selector,
1629        append_mode: input.append_mode,
1630        filter_deleted: input.filter_deleted,
1631        merge_mode: input.merge_mode,
1632        partition_expr_version: metadata.partition_expr_version,
1633    }
1634    .build();
1635
1636    Some(ScanFingerprintBundle {
1637        fingerprint,
1638        implied_time_range,
1639    })
1640}
1641
/// Context shared by different streams from a scanner.
/// It contains the input and ranges to scan.
pub struct StreamContext {
    /// Input memtables and files.
    pub input: ScanInput,
    /// Metadata for partition ranges.
    /// A `PartitionRange::identifier` is used as an index into this vector.
    pub(crate) ranges: Vec<RangeMeta>,
    /// Precomputed scan fingerprint for partition range caching.
    /// `None` when the scan is not eligible for caching.
    #[allow(dead_code)]
    pub(crate) scan_fingerprint: Option<ScanRequestFingerprint>,
    /// Implied range of every time-only predicate, in the time index column's
    /// `TimeUnit`. Used by `build_range_cache_key` to decide whether the
    /// partition's `FileTimeRange` is fully covered (allowing `time_filters`
    /// to be stripped from the cache key). `None` when caching is ineligible
    /// or when the implication walker bailed on an unsupported shape (e.g.
    /// `OR`).
    pub(crate) scan_implied_time_range: Option<TimestampRange>,

    // Metrics:
    /// The start time of the query.
    /// Taken from the input's `query_start` when it is set; otherwise it is the
    /// instant at which the context was created.
    pub(crate) query_start: Instant,
}
1665
1666impl StreamContext {
1667    /// Creates a new [StreamContext] for [SeqScan].
1668    pub(crate) fn seq_scan_ctx(input: ScanInput) -> Self {
1669        let query_start = input.query_start.unwrap_or_else(Instant::now);
1670        let ranges = RangeMeta::seq_scan_ranges(&input);
1671        READ_SST_COUNT.observe(input.num_files() as f64);
1672        let (scan_fingerprint, scan_implied_time_range) = match build_scan_fingerprint(&input) {
1673            Some(b) => (Some(b.fingerprint), b.implied_time_range),
1674            None => (None, None),
1675        };
1676
1677        Self {
1678            input,
1679            ranges,
1680            scan_fingerprint,
1681            scan_implied_time_range,
1682            query_start,
1683        }
1684    }
1685
1686    /// Creates a new [StreamContext] for [UnorderedScan].
1687    pub(crate) fn unordered_scan_ctx(input: ScanInput) -> Self {
1688        let query_start = input.query_start.unwrap_or_else(Instant::now);
1689        let ranges = RangeMeta::unordered_scan_ranges(&input);
1690        READ_SST_COUNT.observe(input.num_files() as f64);
1691        let (scan_fingerprint, scan_implied_time_range) = match build_scan_fingerprint(&input) {
1692            Some(b) => (Some(b.fingerprint), b.implied_time_range),
1693            None => (None, None),
1694        };
1695
1696        Self {
1697            input,
1698            ranges,
1699            scan_fingerprint,
1700            scan_implied_time_range,
1701            query_start,
1702        }
1703    }
1704
1705    /// Returns true if the index refers to a memtable.
1706    pub(crate) fn is_mem_range_index(&self, index: RowGroupIndex) -> bool {
1707        self.input.num_memtables() > index.index
1708    }
1709
1710    pub(crate) fn is_file_range_index(&self, index: RowGroupIndex) -> bool {
1711        !self.is_mem_range_index(index)
1712            && index.index < self.input.num_files() + self.input.num_memtables()
1713    }
1714
1715    pub(crate) fn range_pre_filter_mode(&self, part_range: &PartitionRange) -> PreFilterMode {
1716        let range_meta = &self.ranges[part_range.identifier];
1717        let source_count = range_meta.indices.len();
1718
1719        self.input.range_pre_filter_mode(source_count)
1720    }
1721
1722    /// Retrieves the partition ranges.
1723    pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
1724        self.ranges
1725            .iter()
1726            .enumerate()
1727            .map(|(idx, range_meta)| range_meta.new_partition_range(idx))
1728            .collect()
1729    }
1730
1731    /// Format the context for explain.
1732    pub(crate) fn format_for_explain(&self, verbose: bool, f: &mut fmt::Formatter) -> fmt::Result {
1733        let (mut num_mem_ranges, mut num_file_ranges, mut num_other_ranges) = (0, 0, 0);
1734        for range_meta in &self.ranges {
1735            for idx in &range_meta.row_group_indices {
1736                if self.is_mem_range_index(*idx) {
1737                    num_mem_ranges += 1;
1738                } else if self.is_file_range_index(*idx) {
1739                    num_file_ranges += 1;
1740                } else {
1741                    num_other_ranges += 1;
1742                }
1743            }
1744        }
1745        if verbose {
1746            write!(f, "{{")?;
1747        }
1748        write!(
1749            f,
1750            r#""partition_count":{{"count":{}, "mem_ranges":{}, "files":{}, "file_ranges":{}"#,
1751            self.ranges.len(),
1752            num_mem_ranges,
1753            self.input.num_files(),
1754            num_file_ranges,
1755        )?;
1756        if num_other_ranges > 0 {
1757            write!(f, r#", "other_ranges":{}"#, num_other_ranges)?;
1758        }
1759        write!(f, "}}")?;
1760
1761        if let Some(selector) = &self.input.series_row_selector {
1762            write!(f, ", \"selector\":\"{}\"", selector)?;
1763        }
1764        if let Some(distribution) = &self.input.distribution {
1765            write!(f, ", \"distribution\":\"{}\"", distribution)?;
1766        }
1767
1768        if verbose {
1769            self.format_verbose_content(f)?;
1770        }
1771
1772        Ok(())
1773    }
1774
    /// Writes the verbose part of the explain output to `f`.
    ///
    /// The output is produced through the `Debug` impl of a local wrapper
    /// around the input. It contains, each only when present or non-empty: the
    /// projection, the filters, the dynamic filters and the files. The
    /// `flat_format` flag is always written. `vector_index_k` and the extension
    /// ranges are only compiled in with the `vector_index` and `enterprise`
    /// features respectively.
    fn format_verbose_content(&self, f: &mut fmt::Formatter) -> fmt::Result {
        /// Wrapper that formats a file handle as a JSON-like object.
        struct FileWrapper<'a> {
            file: &'a FileHandle,
        }

        impl fmt::Debug for FileWrapper<'_> {
            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                let (start, end) = self.file.time_range();
                // Timestamps are written as `value::unit`.
                write!(
                    f,
                    r#"{{"file_id":"{}","time_range_start":"{}::{}","time_range_end":"{}::{}","rows":{},"size":{},"index_size":{}}}"#,
                    self.file.file_id(),
                    start.value(),
                    start.unit(),
                    end.value(),
                    end.unit(),
                    self.file.num_rows(),
                    self.file.size(),
                    self.file.index_size()
                )
            }
        }

        /// Wrapper that formats the verbose details of a scan input.
        struct InputWrapper<'a> {
            input: &'a ScanInput,
        }

        #[cfg(feature = "enterprise")]
        impl InputWrapper<'_> {
            /// Writes the extension ranges as a comma separated list; writes
            /// nothing when there are none.
            fn format_extension_ranges(&self, f: &mut fmt::Formatter) -> fmt::Result {
                if self.input.extension_ranges.is_empty() {
                    return Ok(());
                }

                // Empty for the first element, ", " for the following ones.
                let mut delimiter = "";
                write!(f, ", extension_ranges: [")?;
                for range in self.input.extension_ranges() {
                    write!(f, "{}{:?}", delimiter, range)?;
                    delimiter = ", ";
                }
                write!(f, "]")?;
                Ok(())
            }
        }

        impl fmt::Debug for InputWrapper<'_> {
            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                // Every section starts with ", " because it is appended to
                // output that has already been written.
                let output_schema = self.input.mapper.output_schema();
                if !output_schema.is_empty() {
                    let names: Vec<_> = output_schema
                        .column_schemas()
                        .iter()
                        .map(|col| &col.name)
                        .collect();
                    write!(f, ", \"projection\": {:?}", names)?;
                }
                // This is the predicate that includes the region partition
                // expr, if there is one.
                if let Some(predicate) = &self.input.predicate.predicate() {
                    if !predicate.exprs().is_empty() {
                        let exprs: Vec<_> =
                            predicate.exprs().iter().map(|e| e.to_string()).collect();
                        write!(f, ", \"filters\": {:?}", exprs)?;
                    }
                    if !predicate.dyn_filters().is_empty() {
                        // Note: the closure parameter `f` shadows the formatter
                        // inside the closure only.
                        let dyn_filters: Vec<_> = predicate
                            .dyn_filters()
                            .iter()
                            .map(|f| format!("{}", f))
                            .collect();
                        write!(f, ", \"dyn_filters\": {:?}", dyn_filters)?;
                    }
                }
                #[cfg(feature = "vector_index")]
                if let Some(vector_index_k) = self.input.vector_index_k {
                    write!(f, ", \"vector_index_k\": {}", vector_index_k)?;
                }
                if !self.input.files.is_empty() {
                    write!(f, ", \"files\": ")?;
                    // Each file is rendered by the `Debug` impl of `FileWrapper`.
                    f.debug_list()
                        .entries(self.input.files.iter().map(|file| FileWrapper { file }))
                        .finish()?;
                }
                write!(f, ", \"flat_format\": {}", self.input.explain_flat_format)?;
                #[cfg(feature = "enterprise")]
                self.format_extension_ranges(f)?;

                Ok(())
            }
        }

        write!(f, "{:?}", InputWrapper { input: &self.input })
    }
1866
1867    /// Add new dynamic filters to the predicates.
1868    /// Safe after stream creation; in-flight reads may still observe an older snapshot.
1869    pub(crate) fn add_dyn_filter_to_predicate(
1870        self: &Arc<Self>,
1871        filter_exprs: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
1872    ) -> Vec<bool> {
1873        let mut supported = Vec::with_capacity(filter_exprs.len());
1874        let filter_expr = filter_exprs
1875            .into_iter()
1876            .filter_map(|expr| {
1877                if let Ok(dyn_filter) = (expr as Arc<dyn std::any::Any + Send + Sync + 'static>)
1878                .downcast::<datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr>()
1879            {
1880                supported.push(true);
1881                Some(dyn_filter)
1882            } else {
1883                supported.push(false);
1884                None
1885            }
1886            })
1887            .collect();
1888        self.input.predicate.add_dyn_filters(filter_expr);
1889        supported
1890    }
1891}
1892
/// Predicates to evaluate.
/// Its `time_filters` only keep filters that [SimpleFilterEvaluator] supports.
#[derive(Clone, Default)]
pub struct PredicateGroup {
    /// Simple filters that reference only the time index column. They are
    /// derived from the request filters and the region partition expr (if any).
    /// `None` when no expression qualifies.
    time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
    /// Predicate that includes request filters and region partition expr (if any).
    predicate_all: Predicate,
    /// Predicate that only includes request filters.
    predicate_without_region: Predicate,
    /// Region partition expression restored from metadata.
    region_partition_expr: Option<PartitionExpr>,
}
1905
1906impl PredicateGroup {
1907    /// Creates a new `PredicateGroup` from exprs according to the metadata.
1908    pub fn new(metadata: &RegionMetadata, exprs: &[Expr]) -> Result<Self> {
1909        let mut combined_exprs = exprs.to_vec();
1910        let mut region_partition_expr = None;
1911
1912        if let Some(expr_json) = metadata.partition_expr.as_ref()
1913            && !expr_json.is_empty()
1914            && let Some(expr) = PartitionExpr::from_json_str(expr_json)
1915                .context(InvalidPartitionExprSnafu { expr: expr_json })?
1916        {
1917            let logical_expr = expr
1918                .try_as_logical_expr()
1919                .context(InvalidPartitionExprSnafu {
1920                    expr: expr_json.clone(),
1921                })?;
1922
1923            combined_exprs.push(logical_expr);
1924            region_partition_expr = Some(expr);
1925        }
1926
1927        let mut time_filters = Vec::with_capacity(combined_exprs.len());
1928        // Columns in the expr.
1929        let mut columns = HashSet::new();
1930        for expr in &combined_exprs {
1931            columns.clear();
1932            let Some(filter) = Self::expr_to_filter(expr, metadata, &mut columns) else {
1933                continue;
1934            };
1935            time_filters.push(filter);
1936        }
1937        let time_filters = if time_filters.is_empty() {
1938            None
1939        } else {
1940            Some(Arc::new(time_filters))
1941        };
1942
1943        let predicate_all = Predicate::new(combined_exprs);
1944        let predicate_without_region = Predicate::new(exprs.to_vec());
1945
1946        Ok(Self {
1947            time_filters,
1948            predicate_all,
1949            predicate_without_region,
1950            region_partition_expr,
1951        })
1952    }
1953
1954    /// Returns time filters.
1955    pub(crate) fn time_filters(&self) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
1956        self.time_filters.clone()
1957    }
1958
1959    /// Returns predicate of all exprs (including region partition expr if present).
1960    pub(crate) fn predicate(&self) -> Option<&Predicate> {
1961        if self.predicate_all.is_empty() {
1962            None
1963        } else {
1964            Some(&self.predicate_all)
1965        }
1966    }
1967
1968    /// Returns predicate that excludes region partition expr.
1969    pub(crate) fn predicate_without_region(&self) -> Option<&Predicate> {
1970        if self.predicate_without_region.is_empty() {
1971            None
1972        } else {
1973            Some(&self.predicate_without_region)
1974        }
1975    }
1976
1977    /// Add dynamic filters in the predicates.
1978    pub(crate) fn add_dyn_filters(&self, dyn_filters: Vec<Arc<DynamicFilterPhysicalExpr>>) {
1979        self.predicate_all.add_dyn_filters(dyn_filters.clone());
1980        self.predicate_without_region.add_dyn_filters(dyn_filters);
1981    }
1982
1983    /// Returns the region partition expr from metadata, if any.
1984    pub(crate) fn region_partition_expr(&self) -> Option<&PartitionExpr> {
1985        self.region_partition_expr.as_ref()
1986    }
1987
1988    fn expr_to_filter(
1989        expr: &Expr,
1990        metadata: &RegionMetadata,
1991        columns: &mut HashSet<Column>,
1992    ) -> Option<SimpleFilterEvaluator> {
1993        columns.clear();
1994        // `expr_to_columns` won't return error.
1995        // We still ignore these expressions for safety.
1996        expr_to_columns(expr, columns).ok()?;
1997        if columns.len() > 1 {
1998            // Simple filter doesn't support multiple columns.
1999            return None;
2000        }
2001        let column = columns.iter().next()?;
2002        let column_meta = metadata.column_by_name(&column.name)?;
2003        if column_meta.semantic_type == SemanticType::Timestamp {
2004            SimpleFilterEvaluator::try_new(expr)
2005        } else {
2006            None
2007        }
2008    }
2009}
2010
2011#[cfg(test)]
2012mod tests {
2013    use std::sync::Arc;
2014
2015    use common_time::timestamp::{TimeUnit, Timestamp};
2016    use datafusion::physical_plan::expressions::{
2017        binary as physical_binary, col as physical_col, lit as physical_lit,
2018    };
2019    use datafusion_common::ScalarValue;
2020    use datafusion_expr::{Operator, col, lit};
2021    use datatypes::arrow::datatypes::{
2022        DataType as ArrowDataType, Field, Schema as ArrowSchema, TimeUnit as ArrowTimeUnit,
2023    };
2024    use datatypes::prelude::ConcreteDataType;
2025    use datatypes::schema::ColumnSchema;
2026    use datatypes::types::json_type::JsonObjectType;
2027    use datatypes::value::Value;
2028    use partition::expr::col as partition_col;
2029    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
2030    use store_api::storage::{RegionId, TimeSeriesDistribution, TimeSeriesRowSelector};
2031
2032    use super::*;
2033    use crate::cache::CacheManager;
2034    use crate::error::InvalidMetadataSnafu;
2035    use crate::read::range_cache::ScanRequestFingerprintBuilder;
2036    use crate::read::read_columns::ReadColumn;
2037    use crate::sst::file::FileMeta;
2038    use crate::test_util::memtable_util::metadata_with_primary_key;
2039    use crate::test_util::scheduler_util::SchedulerEnv;
2040
2041    async fn new_scan_input(metadata: RegionMetadataRef, filters: Vec<Expr>) -> ScanInput {
2042        let env = SchedulerEnv::new().await;
2043        let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3]).unwrap();
2044        let predicate = PredicateGroup::new(metadata.as_ref(), &filters).unwrap();
2045        let file = FileHandle::new(
2046            crate::sst::file::FileMeta::default(),
2047            Arc::new(crate::sst::file_purger::NoopFilePurger),
2048        );
2049
2050        ScanInput::new(env.access_layer.clone(), mapper)
2051            .with_predicate(predicate)
2052            .with_cache(CacheStrategy::EnableAll(Arc::new(
2053                CacheManager::builder()
2054                    .range_result_cache_size(1024)
2055                    .build(),
2056            )))
2057            .with_files(vec![file])
2058    }
2059
2060    /// Helper to create a timestamp millisecond literal.
2061    fn ts_lit(val: i64) -> datafusion_expr::Expr {
2062        lit(ScalarValue::TimestampMillisecond(Some(val), None))
2063    }
2064
2065    fn metadata_with_time_index_unit(unit: TimeUnit) -> RegionMetadataRef {
2066        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
2067        builder
2068            .push_column_metadata(ColumnMetadata {
2069                column_schema: ColumnSchema::new(
2070                    "k0".to_string(),
2071                    ConcreteDataType::string_datatype(),
2072                    false,
2073                ),
2074                semantic_type: SemanticType::Tag,
2075                column_id: 0,
2076            })
2077            .push_column_metadata(ColumnMetadata {
2078                column_schema: ColumnSchema::new(
2079                    "k1".to_string(),
2080                    ConcreteDataType::uint32_datatype(),
2081                    false,
2082                ),
2083                semantic_type: SemanticType::Tag,
2084                column_id: 1,
2085            })
2086            .push_column_metadata(ColumnMetadata {
2087                column_schema: ColumnSchema::new(
2088                    "ts".to_string(),
2089                    ConcreteDataType::timestamp_datatype(unit),
2090                    false,
2091                ),
2092                semantic_type: SemanticType::Timestamp,
2093                column_id: 2,
2094            })
2095            .push_column_metadata(ColumnMetadata {
2096                column_schema: ColumnSchema::new(
2097                    "v0".to_string(),
2098                    ConcreteDataType::int64_datatype(),
2099                    true,
2100                ),
2101                semantic_type: SemanticType::Field,
2102                column_id: 3,
2103            })
2104            .primary_key(vec![0, 1]);
2105
2106        Arc::new(builder.build().unwrap())
2107    }
2108
2109    fn file_handle_with_time_range(start: Timestamp, end: Timestamp) -> FileHandle {
2110        FileHandle::new(
2111            FileMeta {
2112                time_range: (start, end),
2113                ..Default::default()
2114            },
2115            Arc::new(crate::sst::file_purger::NoopFilePurger),
2116        )
2117    }
2118
2119    #[test]
2120    fn test_fill_json_nested_paths_from_hint() -> Result<()> {
2121        fn json_projection_test_metadata() -> Result<RegionMetadataRef> {
2122            let mut builder = RegionMetadataBuilder::new(RegionId::new(1024, 0));
2123            builder
2124                .push_column_metadata(ColumnMetadata {
2125                    column_schema: ColumnSchema::new(
2126                        "tag".to_string(),
2127                        ConcreteDataType::string_datatype(),
2128                        true,
2129                    ),
2130                    semantic_type: SemanticType::Tag,
2131                    column_id: 0,
2132                })
2133                .push_column_metadata(ColumnMetadata {
2134                    column_schema: ColumnSchema::new(
2135                        "j".to_string(),
2136                        ConcreteDataType::json2(JsonNativeType::Object(JsonObjectType::new())),
2137                        true,
2138                    ),
2139                    semantic_type: SemanticType::Field,
2140                    column_id: 1,
2141                })
2142                .push_column_metadata(ColumnMetadata {
2143                    column_schema: ColumnSchema::new(
2144                        "ts".to_string(),
2145                        ConcreteDataType::timestamp_millisecond_datatype(),
2146                        false,
2147                    ),
2148                    semantic_type: SemanticType::Timestamp,
2149                    column_id: 2,
2150                });
2151            builder.primary_key(vec![0]);
2152            builder.build().context(InvalidMetadataSnafu).map(Arc::new)
2153        }
2154
2155        let metadata = json_projection_test_metadata()?;
2156        let hint = HashMap::from([(
2157            "j".to_string(),
2158            JsonNativeType::Object(JsonObjectType::from([
2159                ("a".to_string(), JsonNativeType::i64()),
2160                (
2161                    "b".to_string(),
2162                    JsonNativeType::Object(JsonObjectType::from([(
2163                        "c".to_string(),
2164                        JsonNativeType::String,
2165                    )])),
2166                ),
2167            ])),
2168        )]);
2169
2170        fn nested_path(parts: &[&str]) -> NestedPath {
2171            parts.iter().map(|part| part.to_string()).collect()
2172        }
2173
2174        let mut read_columns = ReadColumns {
2175            cols: vec![ReadColumn::new(1, vec![]), ReadColumn::new(0, vec![])],
2176        };
2177        narrow_read_columns_by_json_type_hint(&mut read_columns, &hint, metadata.as_ref());
2178        assert_eq!(
2179            read_columns,
2180            ReadColumns {
2181                cols: vec![
2182                    ReadColumn::new(
2183                        1,
2184                        vec![nested_path(&["j", "a"]), nested_path(&["j", "b", "c"])]
2185                    ),
2186                    ReadColumn::new(0, vec![])
2187                ]
2188            }
2189        );
2190
2191        let mut read_columns = ReadColumns {
2192            cols: vec![ReadColumn::new(0, vec![])],
2193        };
2194        narrow_read_columns_by_json_type_hint(&mut read_columns, &hint, metadata.as_ref());
2195        assert_eq!(
2196            read_columns,
2197            ReadColumns {
2198                cols: vec![ReadColumn::new(0, vec![])]
2199            }
2200        );
2201        Ok(())
2202    }
2203
2204    #[tokio::test]
2205    async fn test_build_scan_fingerprint_for_eligible_scan() {
2206        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
2207        let input = new_scan_input(
2208            metadata.clone(),
2209            vec![
2210                col("ts").gt_eq(ts_lit(1000)),
2211                col("k0").eq(lit("foo")),
2212                col("v0").gt(lit(1)),
2213            ],
2214        )
2215        .await
2216        .with_distribution(Some(TimeSeriesDistribution::PerSeries))
2217        .with_series_row_selector(Some(TimeSeriesRowSelector::LastRow))
2218        .with_merge_mode(MergeMode::LastNonNull)
2219        .with_filter_deleted(false);
2220
2221        let fingerprint = build_scan_fingerprint(&input).unwrap();
2222
2223        let expected = ScanRequestFingerprintBuilder {
2224            read_columns: input.read_cols,
2225            read_column_types: vec![
2226                metadata
2227                    .column_by_id(0)
2228                    .map(|col| col.column_schema.data_type.clone()),
2229                metadata
2230                    .column_by_id(2)
2231                    .map(|col| col.column_schema.data_type.clone()),
2232                metadata
2233                    .column_by_id(3)
2234                    .map(|col| col.column_schema.data_type.clone()),
2235            ],
2236            filters: vec![
2237                col("k0").eq(lit("foo")).to_string(),
2238                col("v0").gt(lit(1)).to_string(),
2239            ],
2240            time_filters: vec![col("ts").gt_eq(ts_lit(1000)).to_string()],
2241            series_row_selector: Some(TimeSeriesRowSelector::LastRow),
2242            append_mode: false,
2243            filter_deleted: false,
2244            merge_mode: MergeMode::LastNonNull,
2245            partition_expr_version: 0,
2246        }
2247        .build();
2248        assert_eq!(expected, fingerprint.fingerprint);
2249    }
2250
2251    #[tokio::test]
2252    async fn test_build_scan_fingerprint_requires_tag_filter() {
2253        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
2254        let input = new_scan_input(
2255            metadata,
2256            vec![col("ts").gt_eq(lit(1000)), col("v0").gt(lit(1))],
2257        )
2258        .await;
2259
2260        assert!(build_scan_fingerprint(&input).is_none());
2261    }
2262
2263    #[tokio::test]
2264    async fn test_build_scan_fingerprint_respects_scan_eligibility() {
2265        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
2266        let filters = vec![col("k0").eq(lit("foo"))];
2267
2268        let disabled = ScanInput::new(
2269            SchedulerEnv::new().await.access_layer.clone(),
2270            FlatProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap(),
2271        )
2272        .with_predicate(PredicateGroup::new(metadata.as_ref(), &filters).unwrap());
2273        assert!(build_scan_fingerprint(&disabled).is_none());
2274
2275        let compaction = new_scan_input(metadata.clone(), filters.clone())
2276            .await
2277            .with_compaction(true);
2278        assert!(build_scan_fingerprint(&compaction).is_none());
2279
2280        // No files to read.
2281        let no_files = new_scan_input(metadata, filters).await.with_files(vec![]);
2282        assert!(build_scan_fingerprint(&no_files).is_none());
2283    }
2284
2285    #[tokio::test]
2286    async fn test_build_scan_fingerprint_tracks_schema_and_partition_expr_changes() {
2287        let base = metadata_with_primary_key(vec![0, 1], false);
2288        let mut builder = RegionMetadataBuilder::from_existing(base);
2289        let partition_expr = partition_col("k0")
2290            .gt_eq(Value::String("foo".into()))
2291            .as_json_str()
2292            .unwrap();
2293        builder.partition_expr_json(Some(partition_expr));
2294        let metadata = Arc::new(builder.build_without_validation().unwrap());
2295
2296        let input = new_scan_input(metadata.clone(), vec![col("k0").eq(lit("foo"))]).await;
2297        let fingerprint = build_scan_fingerprint(&input).unwrap();
2298
2299        let expected = ScanRequestFingerprintBuilder {
2300            read_columns: input.read_cols,
2301            read_column_types: vec![
2302                metadata
2303                    .column_by_id(0)
2304                    .map(|col| col.column_schema.data_type.clone()),
2305                metadata
2306                    .column_by_id(2)
2307                    .map(|col| col.column_schema.data_type.clone()),
2308                metadata
2309                    .column_by_id(3)
2310                    .map(|col| col.column_schema.data_type.clone()),
2311            ],
2312            filters: vec![col("k0").eq(lit("foo")).to_string()],
2313            time_filters: vec![],
2314            series_row_selector: None,
2315            append_mode: false,
2316            filter_deleted: true,
2317            merge_mode: MergeMode::LastRow,
2318            partition_expr_version: metadata.partition_expr_version,
2319        }
2320        .build();
2321        assert_eq!(expected, fingerprint.fingerprint);
2322        assert_ne!(0, metadata.partition_expr_version);
2323    }
2324
2325    #[test]
2326    fn test_update_dyn_filters_with_empty_base_predicates() {
2327        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
2328        let predicate_group = PredicateGroup::new(metadata.as_ref(), &[]).unwrap();
2329        assert!(predicate_group.predicate().is_none());
2330        assert!(predicate_group.predicate_without_region().is_none());
2331
2332        let dyn_filter = Arc::new(DynamicFilterPhysicalExpr::new(vec![], physical_lit(false)));
2333        predicate_group.add_dyn_filters(vec![dyn_filter]);
2334
2335        let predicate_all = predicate_group.predicate().unwrap();
2336        assert!(predicate_all.exprs().is_empty());
2337        assert_eq!(1, predicate_all.dyn_filters().len());
2338
2339        let predicate_without_region = predicate_group.predicate_without_region().unwrap();
2340        assert!(predicate_without_region.exprs().is_empty());
2341        assert_eq!(1, predicate_without_region.dyn_filters().len());
2342    }
2343
2344    #[test]
2345    fn test_file_level_pruning_stats_prunes_old_file() {
2346        let ts_col_name = "ts";
2347        let predicate = Predicate::new(vec![col(ts_col_name).gt(ts_lit(1000))]);
2348        let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
2349            ts_col_name,
2350            ArrowDataType::Timestamp(ArrowTimeUnit::Millisecond, None),
2351            false,
2352        )]));
2353
2354        // File with time range [0ms, 500ms] is completely before `ts > 1000ms`.
2355        let stats = FileLevelPruningStats {
2356            min_scalar: ScalarValue::TimestampMillisecond(Some(0), None),
2357            max_scalar: ScalarValue::TimestampMillisecond(Some(500), None),
2358            time_index_col_name: ts_col_name.to_string(),
2359        };
2360        assert_eq!(
2361            vec![false],
2362            predicate.prune_with_stats(&stats, &arrow_schema)
2363        );
2364
2365        // File with time range [0ms, 2000ms] overlaps `ts > 1000ms`, so keep it.
2366        let stats = FileLevelPruningStats {
2367            min_scalar: ScalarValue::TimestampMillisecond(Some(0), None),
2368            max_scalar: ScalarValue::TimestampMillisecond(Some(2000), None),
2369            time_index_col_name: ts_col_name.to_string(),
2370        };
2371        assert_eq!(
2372            vec![true],
2373            predicate.prune_with_stats(&stats, &arrow_schema)
2374        );
2375    }
2376
2377    #[test]
2378    fn test_file_level_pruning_stats_no_predicate_keeps_all() {
2379        let predicate = Predicate::new(vec![]);
2380        assert!(predicate.is_empty());
2381
2382        let stats = FileLevelPruningStats {
2383            min_scalar: ScalarValue::TimestampMillisecond(Some(0), None),
2384            max_scalar: ScalarValue::TimestampMillisecond(Some(500), None),
2385            time_index_col_name: "ts".to_string(),
2386        };
2387        let arrow_schema = Arc::new(ArrowSchema::new(Vec::<Field>::new()));
2388        assert_eq!(
2389            vec![true],
2390            predicate.prune_with_stats(&stats, &arrow_schema)
2391        );
2392    }
2393
2394    #[tokio::test]
2395    async fn test_file_level_pruning_stats_ceil_max_unit_conversion() {
2396        let metadata = metadata_with_time_index_unit(TimeUnit::Millisecond);
2397        let input = new_scan_input(metadata, vec![]).await;
2398        let file = file_handle_with_time_range(
2399            Timestamp::new(1_000_001, TimeUnit::Nanosecond),
2400            Timestamp::new(1_000_001, TimeUnit::Nanosecond),
2401        );
2402
2403        let stats = input.try_file_level_pruning_stats(&file).unwrap();
2404        assert_eq!(
2405            ScalarValue::TimestampMillisecond(Some(1), None),
2406            stats.min_scalar
2407        );
2408        assert_eq!(
2409            ScalarValue::TimestampMillisecond(Some(2), None),
2410            stats.max_scalar
2411        );
2412
2413        // The actual max timestamp is slightly greater than 1ms. It must be kept for `ts > 1ms`.
2414        let predicate = Predicate::new(vec![col("ts").gt(ts_lit(1))]);
2415        assert_eq!(
2416            vec![true],
2417            predicate.prune_with_stats(&stats, input.mapper.metadata().schema.arrow_schema())
2418        );
2419    }
2420
2421    #[tokio::test]
2422    async fn test_file_level_pruning_stats_overflow_keeps_file() {
2423        let metadata = metadata_with_time_index_unit(TimeUnit::Nanosecond);
2424        let input = new_scan_input(metadata, vec![]).await;
2425        let file = file_handle_with_time_range(
2426            Timestamp::new(0, TimeUnit::Second),
2427            Timestamp::new(i64::MAX, TimeUnit::Second),
2428        );
2429
2430        assert!(input.try_file_level_pruning_stats(&file).is_none());
2431    }
2432
2433    #[test]
2434    fn test_file_level_pruning_stats_keeps_inclusive_boundary() {
2435        let ts_col_name = "ts";
2436        let predicate = Predicate::new(vec![col(ts_col_name).gt_eq(ts_lit(1000))]);
2437        let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
2438            ts_col_name,
2439            ArrowDataType::Timestamp(ArrowTimeUnit::Millisecond, None),
2440            false,
2441        )]));
2442        let stats = FileLevelPruningStats {
2443            min_scalar: ScalarValue::TimestampMillisecond(Some(0), None),
2444            max_scalar: ScalarValue::TimestampMillisecond(Some(1000), None),
2445            time_index_col_name: ts_col_name.to_string(),
2446        };
2447
2448        assert_eq!(
2449            vec![true],
2450            predicate.prune_with_stats(&stats, &arrow_schema)
2451        );
2452    }
2453
2454    #[tokio::test]
2455    async fn test_file_level_pruning_with_dyn_filter_only_predicate() {
2456        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
2457        let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3]).unwrap();
2458        let predicate_group = PredicateGroup::new(metadata.as_ref(), &[]).unwrap();
2459        predicate_group.add_dyn_filters(vec![Arc::new(DynamicFilterPhysicalExpr::new(
2460            vec![],
2461            physical_lit(false),
2462        ))]);
2463        let input = ScanInput::new(SchedulerEnv::new().await.access_layer.clone(), mapper)
2464            .with_predicate(predicate_group);
2465        let file = file_handle_with_time_range(
2466            Timestamp::new_millisecond(0),
2467            Timestamp::new_millisecond(1000),
2468        );
2469        let mut reader_metrics = ReaderMetrics::default();
2470
2471        let builder = input
2472            .prune_file(&file, PreFilterMode::SkipFields, &mut reader_metrics)
2473            .await
2474            .unwrap();
2475
2476        assert_eq!(1, reader_metrics.filter_metrics.files_time_range_pruned);
2477        let mut ranges = SmallVec::new();
2478        builder.build_ranges(-1, &mut ranges);
2479        assert!(ranges.is_empty());
2480    }
2481
2482    #[tokio::test]
2483    async fn test_manifest_pruning_observes_dynamic_filter_update() {
2484        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
2485        let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3]).unwrap();
2486        let predicate_group = PredicateGroup::new(metadata.as_ref(), &[]).unwrap();
2487        let arrow_schema = metadata.schema.arrow_schema();
2488        let ts_expr = physical_col("ts", arrow_schema.as_ref()).unwrap();
2489        let dyn_filter = Arc::new(DynamicFilterPhysicalExpr::new(
2490            vec![ts_expr.clone()],
2491            physical_lit(true),
2492        ));
2493        predicate_group.add_dyn_filters(vec![dyn_filter.clone()]);
2494        let input = ScanInput::new(SchedulerEnv::new().await.access_layer.clone(), mapper)
2495            .with_predicate(predicate_group);
2496        let file = file_handle_with_time_range(
2497            Timestamp::new_millisecond(0),
2498            Timestamp::new_millisecond(1000),
2499        );
2500
2501        assert!(!input.can_manifest_prune_file(&file));
2502
2503        let updated = physical_binary(
2504            ts_expr,
2505            Operator::Gt,
2506            physical_lit(ScalarValue::TimestampMillisecond(Some(1000), None)),
2507            arrow_schema.as_ref(),
2508        )
2509        .unwrap();
2510        dyn_filter.update(updated).unwrap();
2511
2512        assert!(input.can_manifest_prune_file(&file));
2513    }
2514
2515    #[tokio::test]
2516    async fn test_range_pre_filter_mode() {
2517        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
2518        let cases = [
2519            (true, MergeMode::LastRow, 1, PreFilterMode::All),
2520            (false, MergeMode::LastNonNull, 1, PreFilterMode::All),
2521            (false, MergeMode::LastRow, 2, PreFilterMode::SkipFields),
2522            (true, MergeMode::LastRow, 2, PreFilterMode::All),
2523        ];
2524
2525        for (append_mode, merge_mode, source_count, expected_mode) in cases {
2526            let input = new_scan_input(metadata.clone(), vec![])
2527                .await
2528                .with_append_mode(append_mode)
2529                .with_merge_mode(merge_mode);
2530
2531            assert_eq!(expected_mode, input.range_pre_filter_mode(source_count));
2532        }
2533    }
2534}