Skip to main content

mito2/read/
scan_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Scans a region according to the scan request.
16
17use std::collections::{HashMap, HashSet};
18use std::fmt;
19use std::num::NonZeroU64;
20use std::sync::Arc;
21use std::time::Instant;
22
23use api::v1::SemanticType;
24use common_error::ext::BoxedError;
25use common_recordbatch::SendableRecordBatchStream;
26use common_recordbatch::filter::SimpleFilterEvaluator;
27use common_telemetry::tracing::Instrument;
28use common_telemetry::{debug, error, tracing, warn};
29use common_time::range::TimestampRange;
30use datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr;
31use datafusion_common::pruning::PruningStatistics;
32use datafusion_common::{Column, ScalarValue};
33use datafusion_expr::Expr;
34use datafusion_expr::utils::expr_to_columns;
35use datatypes::arrow::array::{ArrayRef, BooleanArray, UInt64Array};
36use datatypes::extension::json::is_structured_json_field;
37use datatypes::types::json_type::JsonNativeType;
38use datatypes::value::timestamp_to_scalar_value;
39use futures::StreamExt;
40use itertools::Itertools;
41use partition::expr::PartitionExpr;
42use smallvec::SmallVec;
43use snafu::ResultExt;
44use store_api::metadata::{RegionMetadata, RegionMetadataRef};
45use store_api::region_engine::{PartitionRange, RegionScannerRef};
46use store_api::storage::{
47    NestedPath, RegionId, ScanRequest, SequenceNumber, SequenceRange, TimeSeriesDistribution,
48    TimeSeriesRowSelector,
49};
50use table::predicate::{Predicate, build_time_range_predicate, extract_time_range_from_expr};
51use tokio::sync::{Semaphore, mpsc};
52use tokio_stream::wrappers::ReceiverStream;
53
54use crate::access_layer::AccessLayerRef;
55use crate::cache::CacheStrategy;
56use crate::config::DEFAULT_MAX_CONCURRENT_SCAN_FILES;
57use crate::error::{InvalidPartitionExprSnafu, Result};
58#[cfg(feature = "enterprise")]
59use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
60use crate::memtable::{MemtableRange, RangesOptions};
61use crate::metrics::READ_SST_COUNT;
62use crate::read::compat::{self, FlatCompatBatch};
63use crate::read::flat_projection::FlatProjectionMapper;
64use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
65use crate::read::range_cache::{ScanRequestFingerprint, implied_time_range_from_exprs};
66use crate::read::read_columns::{
67    ReadColumns, merge, merge_nested_paths, read_columns_from_predicate,
68    read_columns_from_projection,
69};
70use crate::read::seq_scan::SeqScan;
71use crate::read::series_scan::SeriesScan;
72use crate::read::stream::ScanBatchStream;
73use crate::read::unordered_scan::UnorderedScan;
74use crate::read::{BoxedRecordBatchStream, RecordBatch};
75use crate::region::options::MergeMode;
76use crate::region::version::VersionRef;
77use crate::sst::file::FileHandle;
78use crate::sst::index::bloom_filter::applier::{
79    BloomFilterIndexApplierBuilder, BloomFilterIndexApplierRef,
80};
81use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
82use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
83use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
84use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
85#[cfg(feature = "vector_index")]
86use crate::sst::index::vector_index::applier::{VectorIndexApplier, VectorIndexApplierRef};
87use crate::sst::parquet::file_range::PreFilterMode;
88use crate::sst::parquet::reader::ReaderMetrics;
89
/// Over-fetch factor for vector index searches.
///
/// When a scan request carries both a vector search and filters, the `k` handed to the
/// scan input is the requested `k` multiplied by this factor (saturating on overflow).
/// NOTE(review): presumably this compensates for candidates that the filters discard
/// after the index lookup - confirm.
#[cfg(feature = "vector_index")]
const VECTOR_INDEX_OVERFETCH_MULTIPLIER: usize = 2;
92
/// A scanner scans a region and returns a [SendableRecordBatchStream].
///
/// Each variant wraps one concrete scan strategy. [ScanRegion::scanner()] decides which
/// variant is built for a request.
pub(crate) enum Scanner {
    /// Sequential scan. The fallback when neither the per-series scan nor the unordered
    /// scan is selected.
    Seq(SeqScan),
    /// Unordered scan. Selected for append-mode regions when the request has no series
    /// row selector and its distribution is unset or time-windowed.
    Unordered(UnorderedScan),
    /// Per-series scan. Selected when the request asks for the per-series distribution.
    Series(SeriesScan),
}
102
103impl Scanner {
104    /// Returns a [SendableRecordBatchStream] to retrieve scan results from all partitions.
105    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
106    pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream, BoxedError> {
107        match self {
108            Scanner::Seq(seq_scan) => seq_scan.build_stream(),
109            Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
110            Scanner::Series(series_scan) => series_scan.build_stream().await,
111        }
112    }
113
114    /// Create a stream of [`Batch`] by this scanner.
115    pub(crate) fn scan_batch(&self) -> Result<ScanBatchStream> {
116        match self {
117            Scanner::Seq(x) => x.scan_all_partitions(),
118            Scanner::Unordered(x) => x.scan_all_partitions(),
119            Scanner::Series(x) => x.scan_all_partitions(),
120        }
121    }
122}
123
#[cfg(test)]
impl Scanner {
    /// Returns the number of SST files this scanner is going to read.
    pub(crate) fn num_files(&self) -> usize {
        match self {
            Self::Seq(inner) => inner.input().num_files(),
            Self::Unordered(inner) => inner.input().num_files(),
            Self::Series(inner) => inner.input().num_files(),
        }
    }

    /// Returns the number of memtables this scanner is going to read.
    pub(crate) fn num_memtables(&self) -> usize {
        match self {
            Self::Seq(inner) => inner.input().num_memtables(),
            Self::Unordered(inner) => inner.input().num_memtables(),
            Self::Series(inner) => inner.input().num_memtables(),
        }
    }

    /// Returns the ids of the SST files this scanner is going to read.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        match self {
            Self::Seq(inner) => inner.input().file_ids(),
            Self::Unordered(inner) => inner.input().file_ids(),
            Self::Series(inner) => inner.input().file_ids(),
        }
    }

    /// Returns the index ids reported by the scan input.
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        match self {
            Self::Seq(inner) => inner.input().index_ids(),
            Self::Unordered(inner) => inner.input().index_ids(),
            Self::Series(inner) => inner.input().index_ids(),
        }
    }

    /// Returns the snapshot sequence recorded in the scan input, if any.
    pub(crate) fn snapshot_sequence(&self) -> Option<SequenceNumber> {
        match self {
            Self::Seq(inner) => inner.input().snapshot_sequence,
            Self::Unordered(inner) => inner.input().snapshot_sequence,
            Self::Series(inner) => inner.input().snapshot_sequence,
        }
    }

    /// Sets the target partitions for the scanner, which controls its parallelism.
    ///
    /// # Panics
    ///
    /// Panics if the underlying scanner fails to handle the prepare request.
    pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
        use store_api::region_engine::{PrepareRequest, RegionScanner};

        let request = PrepareRequest::default().with_target_partitions(target_partitions);
        let prepared = match self {
            Self::Seq(inner) => inner.prepare(request),
            Self::Unordered(inner) => inner.prepare(request),
            Self::Series(inner) => inner.prepare(request),
        };
        prepared.unwrap();
    }
}
181
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Helper to scan a region by [ScanRequest].
///
/// [ScanRegion] collects SSTs and memtables to scan without actually reading them. It
/// creates a [Scanner] to actually scan these targets in [Scanner::scan()].
///
/// ```mermaid
/// classDiagram
/// class ScanRegion {
///     -VersionRef version
///     -ScanRequest request
///     ~scanner() Scanner
///     ~seq_scan() SeqScan
/// }
/// class Scanner {
///     <<enumeration>>
///     SeqScan
///     UnorderedScan
///     SeriesScan
///     +scan() SendableRecordBatchStream
/// }
/// class SeqScan {
///     -ScanInput input
///     +build() SendableRecordBatchStream
/// }
/// class UnorderedScan {
///     -ScanInput input
///     +build() SendableRecordBatchStream
/// }
/// class ScanInput {
///     -ProjectionMapper mapper
///     -Option~TimeRange~ time_range
///     -Option~Predicate~ predicate
///     -Vec~MemtableRef~ memtables
///     -Vec~FileHandle~ files
/// }
/// class ProjectionMapper {
///     ~output_schema() SchemaRef
///     ~convert(Batch) RecordBatch
/// }
/// ScanRegion -- Scanner
/// ScanRegion o-- ScanRequest
/// Scanner o-- SeqScan
/// Scanner o-- UnorderedScan
/// Scanner o-- SeriesScan
/// SeqScan o-- ScanInput
/// UnorderedScan o-- ScanInput
/// Scanner -- SendableRecordBatchStream
/// ScanInput o-- ProjectionMapper
/// SeqScan -- SendableRecordBatchStream
/// UnorderedScan -- SendableRecordBatchStream
/// ```
pub(crate) struct ScanRegion {
    /// Version of the region at scan.
    version: VersionRef,
    /// Access layer of the region.
    access_layer: AccessLayerRef,
    /// Scan request.
    request: ScanRequest,
    /// Cache.
    cache_strategy: CacheStrategy,
    /// Maximum number of SST files to scan concurrently.
    max_concurrent_scan_files: usize,
    /// Whether to ignore inverted index.
    ignore_inverted_index: bool,
    /// Whether to ignore fulltext index.
    ignore_fulltext_index: bool,
    /// Whether to ignore bloom filter.
    ignore_bloom_filter: bool,
    /// Start time of the scan task.
    start_time: Option<Instant>,
    /// Whether to filter out the deleted rows.
    /// Usually true for normal read, and false for scan for compaction.
    filter_deleted: bool,
    /// Optional provider of extension ranges. It is only consulted when the request
    /// does not skip SST files.
    #[cfg(feature = "enterprise")]
    extension_range_provider: Option<BoxedExtensionRangeProvider>,
}
257
258impl ScanRegion {
259    /// Creates a [ScanRegion].
260    pub(crate) fn new(
261        version: VersionRef,
262        access_layer: AccessLayerRef,
263        request: ScanRequest,
264        cache_strategy: CacheStrategy,
265    ) -> ScanRegion {
266        ScanRegion {
267            version,
268            access_layer,
269            request,
270            cache_strategy,
271            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
272            ignore_inverted_index: false,
273            ignore_fulltext_index: false,
274            ignore_bloom_filter: false,
275            start_time: None,
276            filter_deleted: true,
277            #[cfg(feature = "enterprise")]
278            extension_range_provider: None,
279        }
280    }
281
282    /// Sets maximum number of SST files to scan concurrently.
283    #[must_use]
284    pub(crate) fn with_max_concurrent_scan_files(
285        mut self,
286        max_concurrent_scan_files: usize,
287    ) -> Self {
288        self.max_concurrent_scan_files = max_concurrent_scan_files;
289        self
290    }
291
292    /// Sets whether to ignore inverted index.
293    #[must_use]
294    pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self {
295        self.ignore_inverted_index = ignore;
296        self
297    }
298
299    /// Sets whether to ignore fulltext index.
300    #[must_use]
301    pub(crate) fn with_ignore_fulltext_index(mut self, ignore: bool) -> Self {
302        self.ignore_fulltext_index = ignore;
303        self
304    }
305
306    /// Sets whether to ignore bloom filter.
307    #[must_use]
308    pub(crate) fn with_ignore_bloom_filter(mut self, ignore: bool) -> Self {
309        self.ignore_bloom_filter = ignore;
310        self
311    }
312
313    #[must_use]
314    pub(crate) fn with_start_time(mut self, now: Instant) -> Self {
315        self.start_time = Some(now);
316        self
317    }
318
    /// Sets whether the scan filters out deleted rows.
    ///
    /// The flag is forwarded to the scan input when it is built.
    pub(crate) fn set_filter_deleted(&mut self, filter_deleted: bool) {
        self.filter_deleted = filter_deleted;
    }
322
    /// Installs the provider that is asked for extension ranges while building the scan
    /// input, replacing any provider set earlier.
    #[cfg(feature = "enterprise")]
    pub(crate) fn set_extension_range_provider(
        &mut self,
        extension_range_provider: BoxedExtensionRangeProvider,
    ) {
        self.extension_range_provider = Some(extension_range_provider);
    }
330
331    /// Returns a [Scanner] to scan the region.
332    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
333    pub(crate) async fn scanner(self) -> Result<Scanner> {
334        if self.use_series_scan() {
335            self.series_scan().await.map(Scanner::Series)
336        } else if self.use_unordered_scan() {
337            // If table is append only and there is no series row selector, we use unordered scan in query.
338            // We still use seq scan in compaction.
339            self.unordered_scan().await.map(Scanner::Unordered)
340        } else {
341            self.seq_scan().await.map(Scanner::Seq)
342        }
343    }
344
345    /// Returns a [RegionScanner] to scan the region.
346    #[tracing::instrument(
347        level = tracing::Level::DEBUG,
348        skip_all,
349        fields(region_id = %self.region_id())
350    )]
351    pub(crate) async fn region_scanner(self) -> Result<RegionScannerRef> {
352        if self.use_series_scan() {
353            self.series_scan()
354                .await
355                .map(|scanner| Box::new(scanner) as _)
356        } else if self.use_unordered_scan() {
357            self.unordered_scan()
358                .await
359                .map(|scanner| Box::new(scanner) as _)
360        } else {
361            self.seq_scan().await.map(|scanner| Box::new(scanner) as _)
362        }
363    }
364
365    /// Scan sequentially.
366    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
367    pub(crate) async fn seq_scan(self) -> Result<SeqScan> {
368        let input = self.scan_input().await?.with_compaction(false);
369        Ok(SeqScan::new(input))
370    }
371
372    /// Unordered scan.
373    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
374    pub(crate) async fn unordered_scan(self) -> Result<UnorderedScan> {
375        let input = self.scan_input().await?;
376        Ok(UnorderedScan::new(input))
377    }
378
379    /// Scans by series.
380    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
381    pub(crate) async fn series_scan(self) -> Result<SeriesScan> {
382        let input = self.scan_input().await?;
383        Ok(SeriesScan::new(input))
384    }
385
386    /// Returns true if the region can use unordered scan for current request.
387    fn use_unordered_scan(&self) -> bool {
388        // We use unordered scan when:
389        // 1. The region is in append mode.
390        // 2. There is no series row selector.
391        // 3. The required distribution is None or TimeSeriesDistribution::TimeWindowed.
392        //
393        // We still use seq scan in compaction.
394        self.version.options.append_mode
395            && self.request.series_row_selector.is_none()
396            && (self.request.distribution.is_none()
397                || self.request.distribution == Some(TimeSeriesDistribution::TimeWindowed))
398    }
399
400    /// Returns true if the region can use series scan for current request.
401    fn use_series_scan(&self) -> bool {
402        self.request.distribution == Some(TimeSeriesDistribution::PerSeries)
403    }
404
    /// Creates a scan input.
    ///
    /// The steps are:
    /// 1. Derive the time range and the predicate group from the request filters.
    /// 2. Resolve the columns to read and build the projection mapper.
    /// 3. Collect the SST files and memtable ranges that may contain matching rows.
    /// 4. Build the index appliers, separately for non-field and field filters.
    /// 5. Assemble the [ScanInput] from the above and the region options.
    ///
    /// # Errors
    ///
    /// Propagates errors from building the predicate group, resolving the projection,
    /// creating the projection mapper, listing memtable ranges and, with the `enterprise`
    /// feature, from finding extension ranges.
    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
    async fn scan_input(self) -> Result<ScanInput> {
        // A zero `sst_min_sequence` becomes `None`, which disables the sequence check on SSTs.
        let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new);
        let time_range = self.build_time_range_predicate();
        let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;

        let mut read_cols = match &self.request.projection_input {
            Some(p) => {
                // Read columns include the pushed-down projection and columns
                // resolved from the predicate.
                let metadata = &self.version.metadata;
                let from_projection = read_columns_from_projection(p.clone(), metadata)?;
                let from_predicate = read_columns_from_predicate(&predicate, metadata);
                merge(from_projection, from_predicate)
            }
            None => {
                // Without a projection, every column of the region is read.
                let read_col_ids = self
                    .version
                    .metadata
                    .column_metadatas
                    .iter()
                    .map(|col| col.column_id);
                ReadColumns::from_deduped_column_ids(read_col_ids)
            }
        };
        // Only narrow read columns and pass JSON type hints for structured JSON (JSON2)
        // columns. Legacy JSONB columns have JSON extension metadata but their physical
        // Arrow type is Binary, not Struct, so they must not enter structured JSON paths.
        let has_structured_json = self
            .version
            .metadata
            .schema
            .arrow_schema()
            .fields()
            .iter()
            .any(is_structured_json_field);
        if has_structured_json {
            narrow_read_columns_by_json_type_hint(
                &mut read_cols,
                &self.request.json_type_hint,
                &self.version.metadata,
            );
        }
        let read_col_ids = read_cols.column_ids();

        // The mapper always computes projected column ids as the schema of SSTs may change.
        let projection = self
            .request
            .projection_indices()
            .map(|x| x.to_vec())
            .unwrap_or_else(|| (0..self.version.metadata.column_metadatas.len()).collect());
        // The JSON type hint is only passed to the mapper when a structured JSON column exists.
        let json_type_hint = has_structured_json
            .then_some(&self.request.json_type_hint)
            .inspect(|json_type_hint| {
                debug!(
                    "Concretized JSON type: {{{}}}",
                    json_type_hint
                        .iter()
                        .map(|(k, v)| format!("{}: {}", k, v))
                        .join(", ")
                );
            });
        let mapper = FlatProjectionMapper::new_with_read_columns(
            &self.version.metadata,
            projection,
            read_cols,
            json_type_hint,
        )?;

        // Collects the SST files to scan unless the request skips SST files.
        let ssts = &self.version.ssts;
        let mut files = Vec::new();
        if !self.request.skip_sst_files {
            for level in ssts.levels() {
                for file in level.files.values() {
                    let exceed_min_sequence = match (sst_min_sequence, file.meta_ref().sequence) {
                        (Some(min_sequence), Some(file_sequence)) => file_sequence > min_sequence,
                        // If the file's sequence is None (or actually is zero), it could mean the file
                        // is generated and added to the region "directly". In this case, its data should
                        // be considered as fresh as the memtable. So its sequence is treated greater than
                        // the min_sequence, whatever the value of min_sequence is. Hence the default
                        // "true" in this arm.
                        (Some(_), None) => true,
                        (None, _) => true,
                    };

                    // Finds SST files in range.
                    if exceed_min_sequence && file_in_range(file, &time_range) {
                        files.push(file.clone());
                    }
                    // There is no need to check and prune for file's sequence here as the sequence number is usually very new,
                    // unless the timing is too good, or the sequence number wouldn't be in file.
                    // and the batch will be filtered out by tree reader anyway.
                }
            }
        }

        let memtables = self.version.memtables.list_memtables();
        // Skip empty memtables and memtables out of time range.
        let mut mem_range_builders = Vec::new();
        let filter_mode = pre_filter_mode(
            self.version.options.append_mode,
            self.version.options.merge_mode(),
        );

        for m in memtables {
            // check if memtable is empty by reading stats.
            let Some((start, end)) = m.stats().time_range() else {
                continue;
            };
            // The time range of the memtable is inclusive.
            let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
            if !memtable_range.intersects(&time_range) {
                continue;
            }
            // Requests the ranges of this memtable, restricted to the read columns, the
            // predicate and the memtable sequence bounds of the request.
            let ranges_in_memtable = m.ranges(
                Some(&read_col_ids),
                RangesOptions::default()
                    .with_predicate(predicate.clone())
                    .with_sequence(SequenceRange::new(
                        self.request.memtable_min_sequence,
                        self.request.memtable_max_sequence,
                    ))
                    .with_pre_filter_mode(filter_mode),
            )?;
            mem_range_builders.extend(ranges_in_memtable.ranges.into_values().map(|v| {
                let stats = v.stats().clone();
                MemRangeBuilder::new(v, stats)
            }));
        }

        let region_id = self.region_id();
        debug!(
            "Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}",
            region_id,
            self.request,
            time_range,
            mem_range_builders.len(),
            files.len(),
            self.version.options.append_mode,
        );

        // Each applier array holds `[applier for non-field filters, applier for field filters]`.
        let (non_field_filters, field_filters) = self.partition_by_field_filters();
        let inverted_index_appliers = [
            self.build_invereted_index_applier(&non_field_filters),
            self.build_invereted_index_applier(&field_filters),
        ];
        let bloom_filter_appliers = [
            self.build_bloom_filter_applier(&non_field_filters),
            self.build_bloom_filter_applier(&field_filters),
        ];
        let fulltext_index_appliers = [
            self.build_fulltext_index_applier(&non_field_filters),
            self.build_fulltext_index_applier(&field_filters),
        ];
        #[cfg(feature = "vector_index")]
        let vector_index_applier = self.build_vector_index_applier();
        // Over-fetches from the vector index when the request also has filters.
        #[cfg(feature = "vector_index")]
        let vector_index_k = self.request.vector_search.as_ref().map(|search| {
            if self.request.filters.is_empty() {
                search.k
            } else {
                search.k.saturating_mul(VECTOR_INDEX_OVERFETCH_MULTIPLIER)
            }
        });

        let input = ScanInput::new(self.access_layer, mapper)
            .with_time_range(Some(time_range))
            .with_predicate(predicate)
            .with_memtables(mem_range_builders)
            .with_files(files)
            .with_cache(self.cache_strategy)
            .with_inverted_index_appliers(inverted_index_appliers)
            .with_bloom_filter_index_appliers(bloom_filter_appliers)
            .with_fulltext_index_appliers(fulltext_index_appliers)
            .with_max_concurrent_scan_files(self.max_concurrent_scan_files)
            .with_start_time(self.start_time)
            .with_append_mode(self.version.options.append_mode)
            .with_filter_deleted(self.filter_deleted)
            .with_merge_mode(self.version.options.merge_mode())
            .with_series_row_selector(self.request.series_row_selector)
            .with_distribution(self.request.distribution)
            .with_explain_flat_format(
                self.version.options.sst_format == Some(crate::sst::FormatType::Flat),
            )
            // The snapshot sequence is only set when the request enables `snapshot_on_scan`;
            // it is then the memtable max sequence of the request.
            .with_snapshot_sequence(
                self.request
                    .snapshot_on_scan
                    .then_some(self.request.memtable_max_sequence)
                    .flatten(),
            );
        #[cfg(feature = "vector_index")]
        let input = input
            .with_vector_index_applier(vector_index_applier)
            .with_vector_index_k(vector_index_k);

        // Extension ranges are only looked up when SST files are not skipped and a provider
        // has been set.
        #[cfg(feature = "enterprise")]
        let input = if !self.request.skip_sst_files
            && let Some(provider) = self.extension_range_provider
        {
            let ranges = provider
                .find_extension_ranges(self.version.flushed_sequence, time_range, &self.request)
                .await?;
            debug!("Find extension ranges: {ranges:?}");
            input.with_extension_ranges(ranges)
        } else {
            input
        };
        Ok(input)
    }
615
616    fn region_id(&self) -> RegionId {
617        self.version.metadata.region_id
618    }
619
620    /// Build time range predicate from filters.
621    fn build_time_range_predicate(&self) -> TimestampRange {
622        let time_index = self.version.metadata.time_index_column();
623        let unit = time_index
624            .column_schema
625            .data_type
626            .as_timestamp()
627            .expect("Time index must have timestamp-compatible type")
628            .unit();
629        build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters)
630    }
631
632    /// Partitions filters into two groups: non-field filters and field filters.
633    /// Returns `(non_field_filters, field_filters)`.
634    fn partition_by_field_filters(&self) -> (Vec<Expr>, Vec<Expr>) {
635        let field_columns = self
636            .version
637            .metadata
638            .field_columns()
639            .map(|col| &col.column_schema.name)
640            .collect::<HashSet<_>>();
641
642        let mut columns = HashSet::new();
643
644        self.request.filters.iter().cloned().partition(|expr| {
645            columns.clear();
646            // `expr_to_columns` won't return error.
647            if expr_to_columns(expr, &mut columns).is_err() {
648                // If we can't extract columns, treat it as non-field filter
649                return true;
650            }
651            // Return true for non-field filters (partition puts true cases in first vec)
652            !columns
653                .iter()
654                .any(|column| field_columns.contains(&column.name))
655        })
656    }
657
658    /// Use the latest schema to build the inverted index applier.
659    fn build_invereted_index_applier(&self, filters: &[Expr]) -> Option<InvertedIndexApplierRef> {
660        if self.ignore_inverted_index {
661            return None;
662        }
663
664        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
665        let inverted_index_cache = self.cache_strategy.inverted_index_cache().cloned();
666
667        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
668
669        InvertedIndexApplierBuilder::new(
670            self.access_layer.table_dir().to_string(),
671            self.access_layer.path_type(),
672            self.access_layer.object_store().clone(),
673            self.version.metadata.as_ref(),
674            self.version.metadata.inverted_indexed_column_ids(
675                self.version
676                    .options
677                    .index_options
678                    .inverted_index
679                    .ignore_column_ids
680                    .iter(),
681            ),
682            self.access_layer.puffin_manager_factory().clone(),
683        )
684        .with_file_cache(file_cache)
685        .with_inverted_index_cache(inverted_index_cache)
686        .with_puffin_metadata_cache(puffin_metadata_cache)
687        .build(filters)
688        .inspect_err(|err| warn!(err; "Failed to build invereted index applier"))
689        .ok()
690        .flatten()
691        .map(Arc::new)
692    }
693
694    /// Use the latest schema to build the bloom filter index applier.
695    fn build_bloom_filter_applier(&self, filters: &[Expr]) -> Option<BloomFilterIndexApplierRef> {
696        if self.ignore_bloom_filter {
697            return None;
698        }
699
700        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
701        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
702        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
703
704        BloomFilterIndexApplierBuilder::new(
705            self.access_layer.table_dir().to_string(),
706            self.access_layer.path_type(),
707            self.access_layer.object_store().clone(),
708            self.version.metadata.as_ref(),
709            self.access_layer.puffin_manager_factory().clone(),
710        )
711        .with_file_cache(file_cache)
712        .with_bloom_filter_index_cache(bloom_filter_index_cache)
713        .with_puffin_metadata_cache(puffin_metadata_cache)
714        .build(filters)
715        .inspect_err(|err| warn!(err; "Failed to build bloom filter index applier"))
716        .ok()
717        .flatten()
718        .map(Arc::new)
719    }
720
721    /// Use the latest schema to build the fulltext index applier.
722    fn build_fulltext_index_applier(&self, filters: &[Expr]) -> Option<FulltextIndexApplierRef> {
723        if self.ignore_fulltext_index {
724            return None;
725        }
726
727        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
728        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
729        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
730        FulltextIndexApplierBuilder::new(
731            self.access_layer.table_dir().to_string(),
732            self.access_layer.path_type(),
733            self.access_layer.object_store().clone(),
734            self.access_layer.puffin_manager_factory().clone(),
735            self.version.metadata.as_ref(),
736        )
737        .with_file_cache(file_cache)
738        .with_puffin_metadata_cache(puffin_metadata_cache)
739        .with_bloom_filter_cache(bloom_filter_index_cache)
740        .build(filters)
741        .inspect_err(|err| warn!(err; "Failed to build fulltext index applier"))
742        .ok()
743        .flatten()
744        .map(Arc::new)
745    }
746
747    /// Build the vector index applier from vector search request.
748    #[cfg(feature = "vector_index")]
749    fn build_vector_index_applier(&self) -> Option<VectorIndexApplierRef> {
750        let vector_search = self.request.vector_search.as_ref()?;
751
752        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
753        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
754        let vector_index_cache = self.cache_strategy.vector_index_cache().cloned();
755
756        let applier = VectorIndexApplier::new(
757            self.access_layer.table_dir().to_string(),
758            self.access_layer.path_type(),
759            self.access_layer.object_store().clone(),
760            self.access_layer.puffin_manager_factory().clone(),
761            vector_search.column_id,
762            vector_search.query_vector.clone(),
763            vector_search.metric,
764        )
765        .with_file_cache(file_cache)
766        .with_puffin_metadata_cache(puffin_metadata_cache)
767        .with_vector_index_cache(vector_index_cache);
768
769        Some(Arc::new(applier))
770    }
771}
772
773/// Returns true if the time range of a SST `file` matches the `predicate`.
774fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
775    if predicate == &TimestampRange::min_to_max() {
776        return true;
777    }
778    // end timestamp of a SST is inclusive.
779    let (start, end) = file.time_range();
780    let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end));
781    file_ts_range.intersects(predicate)
782}
783
/// Common input for different scanners.
///
/// Values are assembled with [ScanInput::new] followed by the `with_*` builder methods.
pub struct ScanInput {
    /// Region SST access layer.
    access_layer: AccessLayerRef,
    /// Maps projected Batches to RecordBatches.
    pub(crate) mapper: Arc<FlatProjectionMapper>,
    /// The columns to read from memtables and SSTs.
    /// Note that these differ from the columns in `mapper`, which are the projected columns:
    /// the read columns may also include non-projected columns needed for filtering.
    pub(crate) read_cols: ReadColumns,
    /// Time range filter for time index.
    pub(crate) time_range: Option<TimestampRange>,
    /// Predicate to push down.
    pub(crate) predicate: PredicateGroup,
    /// Region partition expr applied at read time.
    /// Copied from `predicate` whenever the predicate is set.
    region_partition_expr: Option<PartitionExpr>,
    /// Memtable range builders for memtables in the time range.
    pub(crate) memtables: Vec<MemRangeBuilder>,
    /// Handles to SST files to scan.
    pub(crate) files: Vec<FileHandle>,
    /// Cache.
    pub(crate) cache_strategy: CacheStrategy,
    /// Ignores file not found error.
    ignore_file_not_found: bool,
    /// Maximum number of SST files to scan concurrently.
    pub(crate) max_concurrent_scan_files: usize,
    /// Index appliers.
    // NOTE(review): each kind has two slots; what the two positions stand for is not
    // visible in this part of the file — confirm against where the appliers are built.
    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
    /// Vector index applier for KNN search.
    #[cfg(feature = "vector_index")]
    pub(crate) vector_index_applier: Option<VectorIndexApplierRef>,
    /// Over-fetched k for vector index scan.
    #[cfg(feature = "vector_index")]
    pub(crate) vector_index_k: Option<usize>,
    /// Start time of the query.
    pub(crate) query_start: Option<Instant>,
    /// The region is using append mode.
    pub(crate) append_mode: bool,
    /// Whether to remove deletion markers.
    pub(crate) filter_deleted: bool,
    /// Mode to merge duplicate rows.
    pub(crate) merge_mode: MergeMode,
    /// Hint to select rows from time series.
    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
    /// Hint for the required distribution of the scanner.
    pub(crate) distribution: Option<TimeSeriesDistribution>,
    /// Whether the region's configured SST format is flat.
    explain_flat_format: bool,
    /// Upper bound of the snapshot, fixed when the scan is opened and propagated back to
    /// the caller.
    pub(crate) snapshot_sequence: Option<SequenceNumber>,
    /// Whether this scan is for compaction.
    pub(crate) compaction: bool,
    /// Extension ranges to scan in addition to memtables and files.
    #[cfg(feature = "enterprise")]
    extension_ranges: Vec<BoxedExtensionRange>,
}
841
842impl ScanInput {
843    /// Creates a new [ScanInput].
844    #[must_use]
845    pub(crate) fn new(access_layer: AccessLayerRef, mapper: FlatProjectionMapper) -> ScanInput {
846        ScanInput {
847            access_layer,
848            read_cols: mapper.read_columns().clone(),
849            mapper: Arc::new(mapper),
850            time_range: None,
851            predicate: PredicateGroup::default(),
852            region_partition_expr: None,
853            memtables: Vec::new(),
854            files: Vec::new(),
855            cache_strategy: CacheStrategy::Disabled,
856            ignore_file_not_found: false,
857            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
858            inverted_index_appliers: [None, None],
859            bloom_filter_index_appliers: [None, None],
860            fulltext_index_appliers: [None, None],
861            #[cfg(feature = "vector_index")]
862            vector_index_applier: None,
863            #[cfg(feature = "vector_index")]
864            vector_index_k: None,
865            query_start: None,
866            append_mode: false,
867            filter_deleted: true,
868            merge_mode: MergeMode::default(),
869            series_row_selector: None,
870            distribution: None,
871            explain_flat_format: false,
872            snapshot_sequence: None,
873            compaction: false,
874            #[cfg(feature = "enterprise")]
875            extension_ranges: Vec::new(),
876        }
877    }
878
879    /// Sets time range filter for time index.
880    #[must_use]
881    pub(crate) fn with_time_range(mut self, time_range: Option<TimestampRange>) -> Self {
882        self.time_range = time_range;
883        self
884    }
885
886    /// Sets predicate to push down.
887    #[must_use]
888    pub(crate) fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
889        self.region_partition_expr = predicate.region_partition_expr().cloned();
890        self.predicate = predicate;
891        self
892    }
893
894    /// Sets memtable range builders.
895    #[must_use]
896    pub(crate) fn with_memtables(mut self, memtables: Vec<MemRangeBuilder>) -> Self {
897        self.memtables = memtables;
898        self
899    }
900
901    /// Sets files to read.
902    #[must_use]
903    pub(crate) fn with_files(mut self, files: Vec<FileHandle>) -> Self {
904        self.files = files;
905        self
906    }
907
908    /// Sets cache for this query.
909    #[must_use]
910    pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self {
911        self.cache_strategy = cache;
912        self
913    }
914
915    /// Ignores file not found error.
916    #[must_use]
917    pub(crate) fn with_ignore_file_not_found(mut self, ignore: bool) -> Self {
918        self.ignore_file_not_found = ignore;
919        self
920    }
921
922    /// Sets maximum number of SST files to scan concurrently.
923    #[must_use]
924    pub(crate) fn with_max_concurrent_scan_files(
925        mut self,
926        max_concurrent_scan_files: usize,
927    ) -> Self {
928        self.max_concurrent_scan_files = max_concurrent_scan_files;
929        self
930    }
931
932    /// Sets inverted index appliers.
933    #[must_use]
934    pub(crate) fn with_inverted_index_appliers(
935        mut self,
936        appliers: [Option<InvertedIndexApplierRef>; 2],
937    ) -> Self {
938        self.inverted_index_appliers = appliers;
939        self
940    }
941
942    /// Sets bloom filter appliers.
943    #[must_use]
944    pub(crate) fn with_bloom_filter_index_appliers(
945        mut self,
946        appliers: [Option<BloomFilterIndexApplierRef>; 2],
947    ) -> Self {
948        self.bloom_filter_index_appliers = appliers;
949        self
950    }
951
952    /// Sets fulltext index appliers.
953    #[must_use]
954    pub(crate) fn with_fulltext_index_appliers(
955        mut self,
956        appliers: [Option<FulltextIndexApplierRef>; 2],
957    ) -> Self {
958        self.fulltext_index_appliers = appliers;
959        self
960    }
961
962    /// Sets vector index applier for KNN search.
963    #[cfg(feature = "vector_index")]
964    #[must_use]
965    pub(crate) fn with_vector_index_applier(
966        mut self,
967        applier: Option<VectorIndexApplierRef>,
968    ) -> Self {
969        self.vector_index_applier = applier;
970        self
971    }
972
973    /// Sets over-fetched k for vector index scan.
974    #[cfg(feature = "vector_index")]
975    #[must_use]
976    pub(crate) fn with_vector_index_k(mut self, k: Option<usize>) -> Self {
977        self.vector_index_k = k;
978        self
979    }
980
981    /// Sets start time of the query.
982    #[must_use]
983    pub(crate) fn with_start_time(mut self, now: Option<Instant>) -> Self {
984        self.query_start = now;
985        self
986    }
987
988    #[must_use]
989    pub(crate) fn with_append_mode(mut self, is_append_mode: bool) -> Self {
990        self.append_mode = is_append_mode;
991        self
992    }
993
994    /// Sets whether to remove deletion markers during scan.
995    #[must_use]
996    pub(crate) fn with_filter_deleted(mut self, filter_deleted: bool) -> Self {
997        self.filter_deleted = filter_deleted;
998        self
999    }
1000
1001    /// Sets the merge mode.
1002    #[must_use]
1003    pub(crate) fn with_merge_mode(mut self, merge_mode: MergeMode) -> Self {
1004        self.merge_mode = merge_mode;
1005        self
1006    }
1007
1008    /// Sets the distribution hint.
1009    #[must_use]
1010    pub(crate) fn with_distribution(
1011        mut self,
1012        distribution: Option<TimeSeriesDistribution>,
1013    ) -> Self {
1014        self.distribution = distribution;
1015        self
1016    }
1017
1018    /// Sets whether the region's configured SST format is flat for explain output.
1019    #[must_use]
1020    pub(crate) fn with_explain_flat_format(mut self, explain_flat_format: bool) -> Self {
1021        self.explain_flat_format = explain_flat_format;
1022        self
1023    }
1024
1025    /// Sets the time series row selector.
1026    #[must_use]
1027    pub(crate) fn with_series_row_selector(
1028        mut self,
1029        series_row_selector: Option<TimeSeriesRowSelector>,
1030    ) -> Self {
1031        self.series_row_selector = series_row_selector;
1032        self
1033    }
1034
1035    #[must_use]
1036    pub(crate) fn with_snapshot_sequence(
1037        mut self,
1038        snapshot_sequence: Option<SequenceNumber>,
1039    ) -> Self {
1040        self.snapshot_sequence = snapshot_sequence;
1041        self
1042    }
1043
1044    /// Sets whether this scan is for compaction.
1045    #[must_use]
1046    pub(crate) fn with_compaction(mut self, compaction: bool) -> Self {
1047        self.compaction = compaction;
1048        self
1049    }
1050
1051    /// Builds memtable ranges to scan by `index`.
1052    pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> {
1053        let memtable = &self.memtables[index.index];
1054        let mut ranges = SmallVec::new();
1055        memtable.build_ranges(index.row_group_index, &mut ranges);
1056        ranges
1057    }
1058
1059    fn predicate_for_file(&self, file: &FileHandle) -> Option<Predicate> {
1060        if self.should_skip_region_partition(file) {
1061            self.predicate.predicate_without_region().cloned()
1062        } else {
1063            self.predicate.predicate().cloned()
1064        }
1065    }
1066
1067    fn should_skip_region_partition(&self, file: &FileHandle) -> bool {
1068        match (
1069            self.region_partition_expr.as_ref(),
1070            file.meta_ref().partition_expr.as_ref(),
1071        ) {
1072            (Some(region_expr), Some(file_expr)) => region_expr == file_expr,
1073            _ => false,
1074        }
1075    }
1076
1077    /// Tries to build file-level pruning statistics using only the [FileHandle]'s manifest-level
1078    /// time range, without reading any parquet metadata.
1079    ///
1080    /// Returns `None` if timestamp unit conversion overflows (conservative: keep the file).
1081    fn try_file_level_pruning_stats(&self, file: &FileHandle) -> Option<FileLevelPruningStats> {
1082        let (ts_min, ts_max) = file.time_range();
1083        let time_index = self.mapper.metadata().time_index_column();
1084        let time_index_unit = time_index.column_schema.data_type.as_timestamp()?.unit();
1085
1086        // Convert file timestamps to the time index column's unit. Use `convert_to_ceil` for
1087        // the upper bound to avoid accidentally shrinking the manifest range.
1088        let min_ts = ts_min.convert_to(time_index_unit)?;
1089        let max_ts = ts_max.convert_to_ceil(time_index_unit)?;
1090
1091        Some(FileLevelPruningStats {
1092            min_scalar: timestamp_to_scalar_value(time_index_unit, Some(min_ts.value())),
1093            max_scalar: timestamp_to_scalar_value(time_index_unit, Some(max_ts.value())),
1094            time_index_col_name: time_index.column_schema.name.clone(),
1095        })
1096    }
1097
1098    /// Prunes a file to scan and returns the builder to build readers.
1099    #[tracing::instrument(
1100        skip_all,
1101        fields(
1102            region_id = %self.region_metadata().region_id,
1103            file_id = %file.file_id()
1104        )
1105    )]
1106    pub async fn prune_file(
1107        &self,
1108        file: &FileHandle,
1109        pre_filter_mode: PreFilterMode,
1110        reader_metrics: &mut ReaderMetrics,
1111    ) -> Result<FileRangeBuilder> {
1112        let predicate = self.predicate_for_file(file);
1113
1114        // Early file-level pruning using manifest time range before any parquet metadata access.
1115        // This avoids I/O for files that definitely can't match the current predicate snapshot,
1116        // especially after TopK dynamic filters have established a timestamp threshold.
1117        if let Some(ref pred) = predicate
1118            && !pred.is_empty()
1119            && let Some(file_level_stats) = self.try_file_level_pruning_stats(file)
1120        {
1121            let pruning_results = pred.prune_with_stats(
1122                &file_level_stats,
1123                self.mapper.metadata().schema.arrow_schema(),
1124            );
1125            if pruning_results.first() == Some(&false) {
1126                reader_metrics.filter_metrics.files_time_range_pruned += 1;
1127                return Ok(FileRangeBuilder::default());
1128            }
1129        }
1130
1131        let may_build_selective_row_selection = predicate.is_some();
1132        let decode_pk_values = !self.compaction
1133            && self
1134                .mapper
1135                .read_columns()
1136                .column_ids_iter()
1137                .any(|column_id| self.mapper.metadata().primary_key.contains(&column_id));
1138        let reader = self
1139            .access_layer
1140            .read_sst(file.clone())
1141            .predicate(predicate)
1142            .projection(Some(self.read_cols.clone()))
1143            .cache(self.cache_strategy.clone())
1144            .inverted_index_appliers(self.inverted_index_appliers.clone())
1145            .bloom_filter_index_appliers(self.bloom_filter_index_appliers.clone())
1146            .fulltext_index_appliers(self.fulltext_index_appliers.clone());
1147        let reader = if !self.compaction && may_build_selective_row_selection {
1148            reader.deferred_optional_page_index()
1149        } else {
1150            reader
1151        };
1152        #[cfg(feature = "vector_index")]
1153        let reader = {
1154            let mut reader = reader;
1155            reader =
1156                reader.vector_index_applier(self.vector_index_applier.clone(), self.vector_index_k);
1157            reader
1158        };
1159        let res = reader
1160            .expected_metadata(Some(self.mapper.metadata().clone()))
1161            .compaction(self.compaction)
1162            .pre_filter_mode(pre_filter_mode)
1163            .decode_primary_key_values(decode_pk_values)
1164            .build_reader_input(reader_metrics)
1165            .await;
1166        let read_input = match res {
1167            Ok(x) => x,
1168            Err(e) => {
1169                if e.is_object_not_found() && self.ignore_file_not_found {
1170                    error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
1171                    return Ok(FileRangeBuilder::default());
1172                } else {
1173                    return Err(e);
1174                }
1175            }
1176        };
1177
1178        let Some((mut file_range_ctx, selection)) = read_input else {
1179            return Ok(FileRangeBuilder::default());
1180        };
1181
1182        let need_compat = !compat::has_same_columns_and_pk_encoding(
1183            &self.mapper,
1184            file_range_ctx.read_format(),
1185            self.compaction,
1186        );
1187        if need_compat {
1188            // They have different schema. We need to adapt the batch first so the
1189            // mapper can convert it.
1190            let compat = FlatCompatBatch::try_new(
1191                &self.mapper,
1192                file_range_ctx.read_format(),
1193                self.compaction,
1194            )?;
1195            file_range_ctx.set_compat_batch(compat);
1196        }
1197        Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), selection))
1198    }
1199
1200    /// Scans flat sources (RecordBatch streams) in parallel.
1201    ///
1202    /// # Panics if the input doesn't allow parallel scan.
1203    #[tracing::instrument(
1204        skip(self, sources, semaphore),
1205        fields(
1206            region_id = %self.region_metadata().region_id,
1207            source_count = sources.len()
1208        )
1209    )]
1210    pub(crate) fn create_parallel_flat_sources(
1211        &self,
1212        sources: Vec<BoxedRecordBatchStream>,
1213        semaphore: Arc<Semaphore>,
1214        channel_size: usize,
1215    ) -> Result<Vec<BoxedRecordBatchStream>> {
1216        if sources.len() <= 1 {
1217            return Ok(sources);
1218        }
1219
1220        // Spawn a task for each source.
1221        let sources = sources
1222            .into_iter()
1223            .map(|source| {
1224                let (sender, receiver) = mpsc::channel(channel_size);
1225                self.spawn_flat_scan_task(source, semaphore.clone(), sender);
1226                let stream = Box::pin(ReceiverStream::new(receiver));
1227                Box::pin(stream) as _
1228            })
1229            .collect();
1230        Ok(sources)
1231    }
1232
1233    /// Spawns a task to scan a flat source (RecordBatch stream) asynchronously.
1234    #[tracing::instrument(
1235        skip(self, input, semaphore, sender),
1236        fields(region_id = %self.region_metadata().region_id)
1237    )]
1238    pub(crate) fn spawn_flat_scan_task(
1239        &self,
1240        mut input: BoxedRecordBatchStream,
1241        semaphore: Arc<Semaphore>,
1242        sender: mpsc::Sender<Result<RecordBatch>>,
1243    ) {
1244        let region_id = self.region_metadata().region_id;
1245        let span = tracing::info_span!(
1246            "ScanInput::parallel_scan_task",
1247            region_id = %region_id,
1248            stream_kind = "flat"
1249        );
1250        common_runtime::spawn_query(
1251            async move {
1252                loop {
1253                    // We release the permit before sending result to avoid the task waiting on
1254                    // the channel with the permit held.
1255                    let maybe_batch = {
1256                        // Safety: We never close the semaphore.
1257                        let _permit = semaphore.acquire().await.unwrap();
1258                        input.next().await
1259                    };
1260                    match maybe_batch {
1261                        Some(Ok(batch)) => {
1262                            let _ = sender.send(Ok(batch)).await;
1263                        }
1264                        Some(Err(e)) => {
1265                            let _ = sender.send(Err(e)).await;
1266                            break;
1267                        }
1268                        None => break,
1269                    }
1270                }
1271            }
1272            .instrument(span),
1273        );
1274    }
1275
1276    pub(crate) fn total_rows(&self) -> usize {
1277        let rows_in_files: usize = self.files.iter().map(|f| f.num_rows()).sum();
1278        let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();
1279
1280        let rows = rows_in_files + rows_in_memtables;
1281        #[cfg(feature = "enterprise")]
1282        let rows = rows
1283            + self
1284                .extension_ranges
1285                .iter()
1286                .map(|x| x.num_rows())
1287                .sum::<u64>() as usize;
1288        rows
1289    }
1290
    /// Returns the predicate group pushed down to this scan.
    pub(crate) fn predicate_group(&self) -> &PredicateGroup {
        &self.predicate
    }
1294
    /// Returns the number of memtable range builders to scan.
    pub(crate) fn num_memtables(&self) -> usize {
        self.memtables.len()
    }
1299
    /// Returns the number of SST file handles to scan.
    pub(crate) fn num_files(&self) -> usize {
        self.files.len()
    }
1304
1305    /// Gets the file handle from a row group index.
1306    pub(crate) fn file_from_index(&self, index: RowGroupIndex) -> &FileHandle {
1307        let file_index = index.index - self.num_memtables();
1308        &self.files[file_index]
1309    }
1310
    /// Returns the region metadata held by the projection mapper.
    pub fn region_metadata(&self) -> &RegionMetadataRef {
        self.mapper.metadata()
    }
1314
1315    fn range_pre_filter_mode(&self, source_count: usize) -> PreFilterMode {
1316        if source_count <= 1 {
1317            // Duplicated rows in the same source is not a normal case and we don't provide
1318            // strict dedup semantic (last_row/last_non_null) for it. We expect the duplicated rows
1319            // are exactly identical in the same source so we use PreFilterMode::All for
1320            // performance reason.
1321            return PreFilterMode::All;
1322        }
1323
1324        pre_filter_mode(self.append_mode, self.merge_mode)
1325    }
1326}
1327
#[cfg(feature = "enterprise")]
impl ScanInput {
    /// Replaces the extension ranges to scan.
    #[must_use]
    pub(crate) fn with_extension_ranges(
        mut self,
        extension_ranges: Vec<BoxedExtensionRange>,
    ) -> Self {
        self.extension_ranges = extension_ranges;
        self
    }

    /// Returns the extension ranges of this input.
    pub(crate) fn extension_ranges(&self) -> &[BoxedExtensionRange] {
        self.extension_ranges.as_slice()
    }

    /// Gets a boxed [ExtensionRange] by its index among all ranges.
    ///
    /// Extension ranges come after the memtables and the files, so both counts are
    /// subtracted from `i`.
    ///
    /// # Panics
    ///
    /// Panics if `i` does not address an extension range of this input.
    pub(crate) fn extension_range(&self, i: usize) -> &BoxedExtensionRange {
        &self.extension_ranges[i - self.memtables.len() - self.files.len()]
    }
}
1349
/// Lightweight [PruningStatistics] that only uses the file-level time range from manifest
/// metadata, avoiding any parquet metadata reads. Used for early file-level pruning before
/// accessing row-group-level statistics.
///
/// It describes a single container (the file) and reports statistics for the time index
/// column only.
pub(crate) struct FileLevelPruningStats {
    /// Scalar value for the file's minimum timestamp in the time index column's unit.
    pub(crate) min_scalar: ScalarValue,
    /// Scalar value for the file's maximum timestamp in the time index column's unit.
    pub(crate) max_scalar: ScalarValue,
    /// Name of the time index column; statistics are only reported for a column with
    /// exactly this name.
    pub(crate) time_index_col_name: String,
}
1361
1362impl PruningStatistics for FileLevelPruningStats {
1363    fn min_values(&self, column: &Column) -> Option<ArrayRef> {
1364        if column.name == self.time_index_col_name {
1365            ScalarValue::iter_to_array(std::iter::once(self.min_scalar.clone())).ok()
1366        } else {
1367            None
1368        }
1369    }
1370
1371    fn max_values(&self, column: &Column) -> Option<ArrayRef> {
1372        if column.name == self.time_index_col_name {
1373            ScalarValue::iter_to_array(std::iter::once(self.max_scalar.clone())).ok()
1374        } else {
1375            None
1376        }
1377    }
1378
1379    fn num_containers(&self) -> usize {
1380        1
1381    }
1382
1383    fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
1384        if column.name == self.time_index_col_name {
1385            // The time index column is NOT NULL.
1386            Some(Arc::new(UInt64Array::from(vec![0u64])))
1387        } else {
1388            None
1389        }
1390    }
1391
1392    fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
1393        None
1394    }
1395
1396    fn contained(&self, _column: &Column, _values: &HashSet<ScalarValue>) -> Option<BooleanArray> {
1397        None
1398    }
1399}
1400
#[cfg(test)]
impl ScanInput {
    /// Returns the ids of the SST files to scan, in file order.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        Vec::from_iter(self.files.iter().map(|handle| handle.file_id()))
    }

    /// Returns the index ids of the SST files to scan, in file order.
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        Vec::from_iter(self.files.iter().map(|handle| handle.index_id()))
    }
}
1412
1413fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode {
1414    if append_mode {
1415        return PreFilterMode::All;
1416    }
1417
1418    match merge_mode {
1419        MergeMode::LastRow => PreFilterMode::SkipFields,
1420        MergeMode::LastNonNull => PreFilterMode::SkipFields,
1421    }
1422}
1423
1424fn narrow_read_columns_by_json_type_hint(
1425    read_columns: &mut ReadColumns,
1426    json_type_hint: &HashMap<String, JsonNativeType>,
1427    metadata: &RegionMetadata,
1428) {
1429    if json_type_hint.is_empty() {
1430        return;
1431    }
1432
1433    for read_column in &mut read_columns.cols {
1434        let Some(column) = metadata.column_by_id(read_column.column_id) else {
1435            continue;
1436        };
1437        let column_name = &column.column_schema.name;
1438        let Some(json_type) = json_type_hint.get(column_name) else {
1439            continue;
1440        };
1441
1442        let mut paths = Vec::new();
1443        let mut current = vec![column_name.clone()];
1444        collect_json_nested_paths(json_type, &mut current, &mut paths);
1445        merge_nested_paths(&mut read_column.nested_paths, paths)
1446    }
1447}
1448
1449fn collect_json_nested_paths(
1450    json_type: &JsonNativeType,
1451    current: &mut NestedPath,
1452    paths: &mut Vec<NestedPath>,
1453) {
1454    match json_type {
1455        JsonNativeType::Object(fields) if !fields.is_empty() => {
1456            for (field, child) in fields {
1457                current.push(field.clone());
1458                collect_json_nested_paths(child, current, paths);
1459                current.pop();
1460            }
1461        }
1462        _ => paths.push(current.clone()),
1463    }
1464}
1465
/// Output of [build_scan_fingerprint]: the cache fingerprint plus the derived
/// implied time range used to decide whether the cache key can drop the time
/// predicates for a given partition (see `build_range_cache_key`).
pub(crate) struct ScanFingerprintBundle {
    /// Fingerprint of the scan request used for partition range caching.
    pub(crate) fingerprint: ScanRequestFingerprint,
    /// `Some(r)` = all time-only predicates are guaranteed true on `r` (in the
    /// column's `TimeUnit`).
    /// `None`    = at least one time-only predicate could not be proven (e.g.
    /// `OR`), so the cache-key optimization is disabled for this scan.
    pub(crate) implied_time_range: Option<TimestampRange>,
}
1477
1478/// Builds a [ScanFingerprintBundle] from a [ScanInput] if the scan is eligible
1479/// for partition range caching.
1480pub(crate) fn build_scan_fingerprint(input: &ScanInput) -> Option<ScanFingerprintBundle> {
1481    let eligible = !input.compaction
1482        && !input.files.is_empty()
1483        && matches!(input.cache_strategy, CacheStrategy::EnableAll(_));
1484
1485    if !eligible {
1486        return None;
1487    }
1488
1489    let metadata = input.region_metadata();
1490    let tag_names: HashSet<&str> = metadata
1491        .column_metadatas
1492        .iter()
1493        .filter(|col| col.semantic_type == SemanticType::Tag)
1494        .map(|col| col.column_schema.name.as_str())
1495        .collect();
1496
1497    let time_index = metadata.time_index_column();
1498    let time_index_name = time_index.column_schema.name.clone();
1499    let ts_col_unit = time_index
1500        .column_schema
1501        .data_type
1502        .as_timestamp()
1503        .expect("Time index must have timestamp-compatible type")
1504        .unit();
1505
1506    let exprs = input
1507        .predicate_group()
1508        .predicate_without_region()
1509        .map(|predicate| predicate.exprs())
1510        .unwrap_or_default();
1511
1512    let mut filters = Vec::new();
1513    let mut time_only_exprs: Vec<&Expr> = Vec::new();
1514    let mut has_tag_filter = false;
1515    let mut columns = HashSet::new();
1516
1517    for expr in exprs {
1518        columns.clear();
1519        let is_time_only = match expr_to_columns(expr, &mut columns) {
1520            Ok(()) if !columns.is_empty() => {
1521                has_tag_filter |= columns
1522                    .iter()
1523                    .any(|col| tag_names.contains(col.name.as_str()));
1524                columns.iter().all(|col| col.name == time_index_name)
1525            }
1526            _ => false,
1527        };
1528
1529        // Route time-only exprs that the legacy extractor recognizes into
1530        // `time_only_exprs` so the implication walker
1531        // (`implied_time_range_from_exprs`, called below) can attempt to drop
1532        // them from the cache key when the partition's `FileTimeRange` is fully
1533        // covered, then stringify them into the fingerprint's `time_filters`
1534        // bucket. Time-only exprs that the extractor doesn't recognize stay in
1535        // `filters` and never get stripped — conservatively correct.
1536        if is_time_only
1537            && extract_time_range_from_expr(&time_index_name, ts_col_unit, expr).is_some()
1538        {
1539            time_only_exprs.push(expr);
1540        } else {
1541            filters.push(expr.to_string());
1542        }
1543    }
1544
1545    if !has_tag_filter {
1546        // We only cache requests that have tag filters to avoid caching all series.
1547        return None;
1548    }
1549
1550    let implied_time_range =
1551        implied_time_range_from_exprs(&time_index_name, ts_col_unit, &time_only_exprs);
1552    let mut time_filters: Vec<String> = time_only_exprs.iter().map(|e| e.to_string()).collect();
1553
1554    // Ensure the filters are sorted for consistent fingerprinting.
1555    filters.sort_unstable();
1556    time_filters.sort_unstable();
1557    let read_columns = input.read_cols.clone();
1558    let fingerprint = crate::read::range_cache::ScanRequestFingerprintBuilder {
1559        read_column_types: read_columns
1560            .column_ids_iter()
1561            .map(|id| {
1562                metadata
1563                    .column_by_id(id)
1564                    .map(|col| col.column_schema.data_type.clone())
1565            })
1566            .collect(),
1567        read_columns,
1568        filters,
1569        time_filters,
1570        series_row_selector: input.series_row_selector,
1571        append_mode: input.append_mode,
1572        filter_deleted: input.filter_deleted,
1573        merge_mode: input.merge_mode,
1574        partition_expr_version: metadata.partition_expr_version,
1575    }
1576    .build();
1577
1578    Some(ScanFingerprintBundle {
1579        fingerprint,
1580        implied_time_range,
1581    })
1582}
1583
/// Context shared by different streams from a scanner.
/// It contains the input and ranges to scan.
///
/// Instances are built by [`StreamContext::seq_scan_ctx`] or
/// [`StreamContext::unordered_scan_ctx`], which also precompute the
/// cache-related fields from the input.
pub struct StreamContext {
    /// Input memtables and files.
    pub input: ScanInput,
    /// Metadata for partition ranges.
    /// The position of an entry is the `identifier` of the [`PartitionRange`]
    /// created from it.
    pub(crate) ranges: Vec<RangeMeta>,
    /// Precomputed scan fingerprint for partition range caching.
    /// `None` when the scan is not eligible for caching.
    // NOTE(review): no reader of this field is visible in this part of the file,
    // which is presumably why the lint is silenced — confirm it is still needed.
    #[allow(dead_code)]
    pub(crate) scan_fingerprint: Option<ScanRequestFingerprint>,
    /// Implied range of every time-only predicate, in the time index column's
    /// `TimeUnit`. Used by `build_range_cache_key` to decide whether the
    /// partition's `FileTimeRange` is fully covered (allowing `time_filters`
    /// to be stripped from the cache key). `None` when caching is ineligible
    /// or when the implication walker bailed on an unsupported shape (e.g.
    /// `OR`).
    pub(crate) scan_implied_time_range: Option<TimestampRange>,

    // Metrics:
    /// The start time of the query.
    /// Taken from the input when it carries one, otherwise the time at which
    /// the context was created.
    pub(crate) query_start: Instant,
}
1607
1608impl StreamContext {
1609    /// Creates a new [StreamContext] for [SeqScan].
1610    pub(crate) fn seq_scan_ctx(input: ScanInput) -> Self {
1611        let query_start = input.query_start.unwrap_or_else(Instant::now);
1612        let ranges = RangeMeta::seq_scan_ranges(&input);
1613        READ_SST_COUNT.observe(input.num_files() as f64);
1614        let (scan_fingerprint, scan_implied_time_range) = match build_scan_fingerprint(&input) {
1615            Some(b) => (Some(b.fingerprint), b.implied_time_range),
1616            None => (None, None),
1617        };
1618
1619        Self {
1620            input,
1621            ranges,
1622            scan_fingerprint,
1623            scan_implied_time_range,
1624            query_start,
1625        }
1626    }
1627
1628    /// Creates a new [StreamContext] for [UnorderedScan].
1629    pub(crate) fn unordered_scan_ctx(input: ScanInput) -> Self {
1630        let query_start = input.query_start.unwrap_or_else(Instant::now);
1631        let ranges = RangeMeta::unordered_scan_ranges(&input);
1632        READ_SST_COUNT.observe(input.num_files() as f64);
1633        let (scan_fingerprint, scan_implied_time_range) = match build_scan_fingerprint(&input) {
1634            Some(b) => (Some(b.fingerprint), b.implied_time_range),
1635            None => (None, None),
1636        };
1637
1638        Self {
1639            input,
1640            ranges,
1641            scan_fingerprint,
1642            scan_implied_time_range,
1643            query_start,
1644        }
1645    }
1646
1647    /// Returns true if the index refers to a memtable.
1648    pub(crate) fn is_mem_range_index(&self, index: RowGroupIndex) -> bool {
1649        self.input.num_memtables() > index.index
1650    }
1651
1652    pub(crate) fn is_file_range_index(&self, index: RowGroupIndex) -> bool {
1653        !self.is_mem_range_index(index)
1654            && index.index < self.input.num_files() + self.input.num_memtables()
1655    }
1656
1657    pub(crate) fn range_pre_filter_mode(&self, part_range: &PartitionRange) -> PreFilterMode {
1658        let range_meta = &self.ranges[part_range.identifier];
1659        let source_count = range_meta.indices.len();
1660
1661        self.input.range_pre_filter_mode(source_count)
1662    }
1663
1664    /// Retrieves the partition ranges.
1665    pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
1666        self.ranges
1667            .iter()
1668            .enumerate()
1669            .map(|(idx, range_meta)| range_meta.new_partition_range(idx))
1670            .collect()
1671    }
1672
1673    /// Format the context for explain.
1674    pub(crate) fn format_for_explain(&self, verbose: bool, f: &mut fmt::Formatter) -> fmt::Result {
1675        let (mut num_mem_ranges, mut num_file_ranges, mut num_other_ranges) = (0, 0, 0);
1676        for range_meta in &self.ranges {
1677            for idx in &range_meta.row_group_indices {
1678                if self.is_mem_range_index(*idx) {
1679                    num_mem_ranges += 1;
1680                } else if self.is_file_range_index(*idx) {
1681                    num_file_ranges += 1;
1682                } else {
1683                    num_other_ranges += 1;
1684                }
1685            }
1686        }
1687        if verbose {
1688            write!(f, "{{")?;
1689        }
1690        write!(
1691            f,
1692            r#""partition_count":{{"count":{}, "mem_ranges":{}, "files":{}, "file_ranges":{}"#,
1693            self.ranges.len(),
1694            num_mem_ranges,
1695            self.input.num_files(),
1696            num_file_ranges,
1697        )?;
1698        if num_other_ranges > 0 {
1699            write!(f, r#", "other_ranges":{}"#, num_other_ranges)?;
1700        }
1701        write!(f, "}}")?;
1702
1703        if let Some(selector) = &self.input.series_row_selector {
1704            write!(f, ", \"selector\":\"{}\"", selector)?;
1705        }
1706        if let Some(distribution) = &self.input.distribution {
1707            write!(f, ", \"distribution\":\"{}\"", distribution)?;
1708        }
1709
1710        if verbose {
1711            self.format_verbose_content(f)?;
1712        }
1713
1714        Ok(())
1715    }
1716
1717    fn format_verbose_content(&self, f: &mut fmt::Formatter) -> fmt::Result {
1718        struct FileWrapper<'a> {
1719            file: &'a FileHandle,
1720        }
1721
1722        impl fmt::Debug for FileWrapper<'_> {
1723            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1724                let (start, end) = self.file.time_range();
1725                write!(
1726                    f,
1727                    r#"{{"file_id":"{}","time_range_start":"{}::{}","time_range_end":"{}::{}","rows":{},"size":{},"index_size":{}}}"#,
1728                    self.file.file_id(),
1729                    start.value(),
1730                    start.unit(),
1731                    end.value(),
1732                    end.unit(),
1733                    self.file.num_rows(),
1734                    self.file.size(),
1735                    self.file.index_size()
1736                )
1737            }
1738        }
1739
1740        struct InputWrapper<'a> {
1741            input: &'a ScanInput,
1742        }
1743
1744        #[cfg(feature = "enterprise")]
1745        impl InputWrapper<'_> {
1746            fn format_extension_ranges(&self, f: &mut fmt::Formatter) -> fmt::Result {
1747                if self.input.extension_ranges.is_empty() {
1748                    return Ok(());
1749                }
1750
1751                let mut delimiter = "";
1752                write!(f, ", extension_ranges: [")?;
1753                for range in self.input.extension_ranges() {
1754                    write!(f, "{}{:?}", delimiter, range)?;
1755                    delimiter = ", ";
1756                }
1757                write!(f, "]")?;
1758                Ok(())
1759            }
1760        }
1761
1762        impl fmt::Debug for InputWrapper<'_> {
1763            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1764                let output_schema = self.input.mapper.output_schema();
1765                if !output_schema.is_empty() {
1766                    let names: Vec<_> = output_schema
1767                        .column_schemas()
1768                        .iter()
1769                        .map(|col| &col.name)
1770                        .collect();
1771                    write!(f, ", \"projection\": {:?}", names)?;
1772                }
1773                if let Some(predicate) = &self.input.predicate.predicate() {
1774                    if !predicate.exprs().is_empty() {
1775                        let exprs: Vec<_> =
1776                            predicate.exprs().iter().map(|e| e.to_string()).collect();
1777                        write!(f, ", \"filters\": {:?}", exprs)?;
1778                    }
1779                    if !predicate.dyn_filters().is_empty() {
1780                        let dyn_filters: Vec<_> = predicate
1781                            .dyn_filters()
1782                            .iter()
1783                            .map(|f| format!("{}", f))
1784                            .collect();
1785                        write!(f, ", \"dyn_filters\": {:?}", dyn_filters)?;
1786                    }
1787                }
1788                #[cfg(feature = "vector_index")]
1789                if let Some(vector_index_k) = self.input.vector_index_k {
1790                    write!(f, ", \"vector_index_k\": {}", vector_index_k)?;
1791                }
1792                if !self.input.files.is_empty() {
1793                    write!(f, ", \"files\": ")?;
1794                    f.debug_list()
1795                        .entries(self.input.files.iter().map(|file| FileWrapper { file }))
1796                        .finish()?;
1797                }
1798                write!(f, ", \"flat_format\": {}", self.input.explain_flat_format)?;
1799                #[cfg(feature = "enterprise")]
1800                self.format_extension_ranges(f)?;
1801
1802                Ok(())
1803            }
1804        }
1805
1806        write!(f, "{:?}", InputWrapper { input: &self.input })
1807    }
1808
1809    /// Add new dynamic filters to the predicates.
1810    /// Safe after stream creation; in-flight reads may still observe an older snapshot.
1811    pub(crate) fn add_dyn_filter_to_predicate(
1812        self: &Arc<Self>,
1813        filter_exprs: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
1814    ) -> Vec<bool> {
1815        let mut supported = Vec::with_capacity(filter_exprs.len());
1816        let filter_expr = filter_exprs
1817            .into_iter()
1818            .filter_map(|expr| {
1819                if let Ok(dyn_filter) = (expr as Arc<dyn std::any::Any + Send + Sync + 'static>)
1820                .downcast::<datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr>()
1821            {
1822                supported.push(true);
1823                Some(dyn_filter)
1824            } else {
1825                supported.push(false);
1826                None
1827            }
1828            })
1829            .collect();
1830        self.input.predicate.add_dyn_filters(filter_expr);
1831        supported
1832    }
1833}
1834
/// Predicates to evaluate.
/// `time_filters` only keeps filters that [SimpleFilterEvaluator] supports.
///
/// The group holds two views of the request filters: one combined with the
/// region partition expr and one without it.
#[derive(Clone, Default)]
pub struct PredicateGroup {
    /// Simple filters converted from exprs that reference only the time index
    /// column. The exprs considered include the region partition expr when
    /// there is one. `None` when no expr could be converted.
    time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
    /// Predicate that includes request filters and region partition expr (if any).
    predicate_all: Predicate,
    /// Predicate that only includes request filters.
    predicate_without_region: Predicate,
    /// Region partition expression restored from metadata.
    /// `None` when the metadata carries no (or an empty) partition expr.
    region_partition_expr: Option<PartitionExpr>,
}
1847
1848impl PredicateGroup {
1849    /// Creates a new `PredicateGroup` from exprs according to the metadata.
1850    pub fn new(metadata: &RegionMetadata, exprs: &[Expr]) -> Result<Self> {
1851        let mut combined_exprs = exprs.to_vec();
1852        let mut region_partition_expr = None;
1853
1854        if let Some(expr_json) = metadata.partition_expr.as_ref()
1855            && !expr_json.is_empty()
1856            && let Some(expr) = PartitionExpr::from_json_str(expr_json)
1857                .context(InvalidPartitionExprSnafu { expr: expr_json })?
1858        {
1859            let logical_expr = expr
1860                .try_as_logical_expr()
1861                .context(InvalidPartitionExprSnafu {
1862                    expr: expr_json.clone(),
1863                })?;
1864
1865            combined_exprs.push(logical_expr);
1866            region_partition_expr = Some(expr);
1867        }
1868
1869        let mut time_filters = Vec::with_capacity(combined_exprs.len());
1870        // Columns in the expr.
1871        let mut columns = HashSet::new();
1872        for expr in &combined_exprs {
1873            columns.clear();
1874            let Some(filter) = Self::expr_to_filter(expr, metadata, &mut columns) else {
1875                continue;
1876            };
1877            time_filters.push(filter);
1878        }
1879        let time_filters = if time_filters.is_empty() {
1880            None
1881        } else {
1882            Some(Arc::new(time_filters))
1883        };
1884
1885        let predicate_all = Predicate::new(combined_exprs);
1886        let predicate_without_region = Predicate::new(exprs.to_vec());
1887
1888        Ok(Self {
1889            time_filters,
1890            predicate_all,
1891            predicate_without_region,
1892            region_partition_expr,
1893        })
1894    }
1895
1896    /// Returns time filters.
1897    pub(crate) fn time_filters(&self) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
1898        self.time_filters.clone()
1899    }
1900
1901    /// Returns predicate of all exprs (including region partition expr if present).
1902    pub(crate) fn predicate(&self) -> Option<&Predicate> {
1903        if self.predicate_all.is_empty() {
1904            None
1905        } else {
1906            Some(&self.predicate_all)
1907        }
1908    }
1909
1910    /// Returns predicate that excludes region partition expr.
1911    pub(crate) fn predicate_without_region(&self) -> Option<&Predicate> {
1912        if self.predicate_without_region.is_empty() {
1913            None
1914        } else {
1915            Some(&self.predicate_without_region)
1916        }
1917    }
1918
1919    /// Add dynamic filters in the predicates.
1920    pub(crate) fn add_dyn_filters(&self, dyn_filters: Vec<Arc<DynamicFilterPhysicalExpr>>) {
1921        self.predicate_all.add_dyn_filters(dyn_filters.clone());
1922        self.predicate_without_region.add_dyn_filters(dyn_filters);
1923    }
1924
1925    /// Returns the region partition expr from metadata, if any.
1926    pub(crate) fn region_partition_expr(&self) -> Option<&PartitionExpr> {
1927        self.region_partition_expr.as_ref()
1928    }
1929
1930    fn expr_to_filter(
1931        expr: &Expr,
1932        metadata: &RegionMetadata,
1933        columns: &mut HashSet<Column>,
1934    ) -> Option<SimpleFilterEvaluator> {
1935        columns.clear();
1936        // `expr_to_columns` won't return error.
1937        // We still ignore these expressions for safety.
1938        expr_to_columns(expr, columns).ok()?;
1939        if columns.len() > 1 {
1940            // Simple filter doesn't support multiple columns.
1941            return None;
1942        }
1943        let column = columns.iter().next()?;
1944        let column_meta = metadata.column_by_name(&column.name)?;
1945        if column_meta.semantic_type == SemanticType::Timestamp {
1946            SimpleFilterEvaluator::try_new(expr)
1947        } else {
1948            None
1949        }
1950    }
1951}
1952
1953#[cfg(test)]
1954mod tests {
1955    use std::sync::Arc;
1956
1957    use common_time::timestamp::{TimeUnit, Timestamp};
1958    use datafusion::physical_plan::expressions::lit as physical_lit;
1959    use datafusion_common::ScalarValue;
1960    use datafusion_expr::{col, lit};
1961    use datatypes::arrow::datatypes::{
1962        DataType as ArrowDataType, Field, Schema as ArrowSchema, TimeUnit as ArrowTimeUnit,
1963    };
1964    use datatypes::prelude::ConcreteDataType;
1965    use datatypes::schema::ColumnSchema;
1966    use datatypes::types::json_type::JsonObjectType;
1967    use datatypes::value::Value;
1968    use partition::expr::col as partition_col;
1969    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
1970    use store_api::storage::{RegionId, TimeSeriesDistribution, TimeSeriesRowSelector};
1971
1972    use super::*;
1973    use crate::cache::CacheManager;
1974    use crate::error::InvalidMetadataSnafu;
1975    use crate::read::range_cache::ScanRequestFingerprintBuilder;
1976    use crate::read::read_columns::ReadColumn;
1977    use crate::sst::file::FileMeta;
1978    use crate::test_util::memtable_util::metadata_with_primary_key;
1979    use crate::test_util::scheduler_util::SchedulerEnv;
1980
1981    async fn new_scan_input(metadata: RegionMetadataRef, filters: Vec<Expr>) -> ScanInput {
1982        let env = SchedulerEnv::new().await;
1983        let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3]).unwrap();
1984        let predicate = PredicateGroup::new(metadata.as_ref(), &filters).unwrap();
1985        let file = FileHandle::new(
1986            crate::sst::file::FileMeta::default(),
1987            Arc::new(crate::sst::file_purger::NoopFilePurger),
1988        );
1989
1990        ScanInput::new(env.access_layer.clone(), mapper)
1991            .with_predicate(predicate)
1992            .with_cache(CacheStrategy::EnableAll(Arc::new(
1993                CacheManager::builder()
1994                    .range_result_cache_size(1024)
1995                    .build(),
1996            )))
1997            .with_files(vec![file])
1998    }
1999
2000    /// Helper to create a timestamp millisecond literal.
2001    fn ts_lit(val: i64) -> datafusion_expr::Expr {
2002        lit(ScalarValue::TimestampMillisecond(Some(val), None))
2003    }
2004
2005    fn metadata_with_time_index_unit(unit: TimeUnit) -> RegionMetadataRef {
2006        let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
2007        builder
2008            .push_column_metadata(ColumnMetadata {
2009                column_schema: ColumnSchema::new(
2010                    "k0".to_string(),
2011                    ConcreteDataType::string_datatype(),
2012                    false,
2013                ),
2014                semantic_type: SemanticType::Tag,
2015                column_id: 0,
2016            })
2017            .push_column_metadata(ColumnMetadata {
2018                column_schema: ColumnSchema::new(
2019                    "k1".to_string(),
2020                    ConcreteDataType::uint32_datatype(),
2021                    false,
2022                ),
2023                semantic_type: SemanticType::Tag,
2024                column_id: 1,
2025            })
2026            .push_column_metadata(ColumnMetadata {
2027                column_schema: ColumnSchema::new(
2028                    "ts".to_string(),
2029                    ConcreteDataType::timestamp_datatype(unit),
2030                    false,
2031                ),
2032                semantic_type: SemanticType::Timestamp,
2033                column_id: 2,
2034            })
2035            .push_column_metadata(ColumnMetadata {
2036                column_schema: ColumnSchema::new(
2037                    "v0".to_string(),
2038                    ConcreteDataType::int64_datatype(),
2039                    true,
2040                ),
2041                semantic_type: SemanticType::Field,
2042                column_id: 3,
2043            })
2044            .primary_key(vec![0, 1]);
2045
2046        Arc::new(builder.build().unwrap())
2047    }
2048
2049    fn file_handle_with_time_range(start: Timestamp, end: Timestamp) -> FileHandle {
2050        FileHandle::new(
2051            FileMeta {
2052                time_range: (start, end),
2053                ..Default::default()
2054            },
2055            Arc::new(crate::sst::file_purger::NoopFilePurger),
2056        )
2057    }
2058
2059    #[test]
2060    fn test_fill_json_nested_paths_from_hint() -> Result<()> {
2061        fn json_projection_test_metadata() -> Result<RegionMetadataRef> {
2062            let mut builder = RegionMetadataBuilder::new(RegionId::new(1024, 0));
2063            builder
2064                .push_column_metadata(ColumnMetadata {
2065                    column_schema: ColumnSchema::new(
2066                        "tag".to_string(),
2067                        ConcreteDataType::string_datatype(),
2068                        true,
2069                    ),
2070                    semantic_type: SemanticType::Tag,
2071                    column_id: 0,
2072                })
2073                .push_column_metadata(ColumnMetadata {
2074                    column_schema: ColumnSchema::new(
2075                        "j".to_string(),
2076                        ConcreteDataType::json2(JsonNativeType::Object(JsonObjectType::new())),
2077                        true,
2078                    ),
2079                    semantic_type: SemanticType::Field,
2080                    column_id: 1,
2081                })
2082                .push_column_metadata(ColumnMetadata {
2083                    column_schema: ColumnSchema::new(
2084                        "ts".to_string(),
2085                        ConcreteDataType::timestamp_millisecond_datatype(),
2086                        false,
2087                    ),
2088                    semantic_type: SemanticType::Timestamp,
2089                    column_id: 2,
2090                });
2091            builder.primary_key(vec![0]);
2092            builder.build().context(InvalidMetadataSnafu).map(Arc::new)
2093        }
2094
2095        let metadata = json_projection_test_metadata()?;
2096        let hint = HashMap::from([(
2097            "j".to_string(),
2098            JsonNativeType::Object(JsonObjectType::from([
2099                ("a".to_string(), JsonNativeType::i64()),
2100                (
2101                    "b".to_string(),
2102                    JsonNativeType::Object(JsonObjectType::from([(
2103                        "c".to_string(),
2104                        JsonNativeType::String,
2105                    )])),
2106                ),
2107            ])),
2108        )]);
2109
2110        fn nested_path(parts: &[&str]) -> NestedPath {
2111            parts.iter().map(|part| part.to_string()).collect()
2112        }
2113
2114        let mut read_columns = ReadColumns {
2115            cols: vec![ReadColumn::new(1, vec![]), ReadColumn::new(0, vec![])],
2116        };
2117        narrow_read_columns_by_json_type_hint(&mut read_columns, &hint, metadata.as_ref());
2118        assert_eq!(
2119            read_columns,
2120            ReadColumns {
2121                cols: vec![
2122                    ReadColumn::new(
2123                        1,
2124                        vec![nested_path(&["j", "a"]), nested_path(&["j", "b", "c"])]
2125                    ),
2126                    ReadColumn::new(0, vec![])
2127                ]
2128            }
2129        );
2130
2131        let mut read_columns = ReadColumns {
2132            cols: vec![ReadColumn::new(0, vec![])],
2133        };
2134        narrow_read_columns_by_json_type_hint(&mut read_columns, &hint, metadata.as_ref());
2135        assert_eq!(
2136            read_columns,
2137            ReadColumns {
2138                cols: vec![ReadColumn::new(0, vec![])]
2139            }
2140        );
2141        Ok(())
2142    }
2143
2144    #[tokio::test]
2145    async fn test_build_scan_fingerprint_for_eligible_scan() {
2146        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
2147        let input = new_scan_input(
2148            metadata.clone(),
2149            vec![
2150                col("ts").gt_eq(ts_lit(1000)),
2151                col("k0").eq(lit("foo")),
2152                col("v0").gt(lit(1)),
2153            ],
2154        )
2155        .await
2156        .with_distribution(Some(TimeSeriesDistribution::PerSeries))
2157        .with_series_row_selector(Some(TimeSeriesRowSelector::LastRow))
2158        .with_merge_mode(MergeMode::LastNonNull)
2159        .with_filter_deleted(false);
2160
2161        let fingerprint = build_scan_fingerprint(&input).unwrap();
2162
2163        let expected = ScanRequestFingerprintBuilder {
2164            read_columns: input.read_cols,
2165            read_column_types: vec![
2166                metadata
2167                    .column_by_id(0)
2168                    .map(|col| col.column_schema.data_type.clone()),
2169                metadata
2170                    .column_by_id(2)
2171                    .map(|col| col.column_schema.data_type.clone()),
2172                metadata
2173                    .column_by_id(3)
2174                    .map(|col| col.column_schema.data_type.clone()),
2175            ],
2176            filters: vec![
2177                col("k0").eq(lit("foo")).to_string(),
2178                col("v0").gt(lit(1)).to_string(),
2179            ],
2180            time_filters: vec![col("ts").gt_eq(ts_lit(1000)).to_string()],
2181            series_row_selector: Some(TimeSeriesRowSelector::LastRow),
2182            append_mode: false,
2183            filter_deleted: false,
2184            merge_mode: MergeMode::LastNonNull,
2185            partition_expr_version: 0,
2186        }
2187        .build();
2188        assert_eq!(expected, fingerprint.fingerprint);
2189    }
2190
2191    #[tokio::test]
2192    async fn test_build_scan_fingerprint_requires_tag_filter() {
2193        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
2194        let input = new_scan_input(
2195            metadata,
2196            vec![col("ts").gt_eq(lit(1000)), col("v0").gt(lit(1))],
2197        )
2198        .await;
2199
2200        assert!(build_scan_fingerprint(&input).is_none());
2201    }
2202
2203    #[tokio::test]
2204    async fn test_build_scan_fingerprint_respects_scan_eligibility() {
2205        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
2206        let filters = vec![col("k0").eq(lit("foo"))];
2207
2208        let disabled = ScanInput::new(
2209            SchedulerEnv::new().await.access_layer.clone(),
2210            FlatProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap(),
2211        )
2212        .with_predicate(PredicateGroup::new(metadata.as_ref(), &filters).unwrap());
2213        assert!(build_scan_fingerprint(&disabled).is_none());
2214
2215        let compaction = new_scan_input(metadata.clone(), filters.clone())
2216            .await
2217            .with_compaction(true);
2218        assert!(build_scan_fingerprint(&compaction).is_none());
2219
2220        // No files to read.
2221        let no_files = new_scan_input(metadata, filters).await.with_files(vec![]);
2222        assert!(build_scan_fingerprint(&no_files).is_none());
2223    }
2224
2225    #[tokio::test]
2226    async fn test_build_scan_fingerprint_tracks_schema_and_partition_expr_changes() {
2227        let base = metadata_with_primary_key(vec![0, 1], false);
2228        let mut builder = RegionMetadataBuilder::from_existing(base);
2229        let partition_expr = partition_col("k0")
2230            .gt_eq(Value::String("foo".into()))
2231            .as_json_str()
2232            .unwrap();
2233        builder.partition_expr_json(Some(partition_expr));
2234        let metadata = Arc::new(builder.build_without_validation().unwrap());
2235
2236        let input = new_scan_input(metadata.clone(), vec![col("k0").eq(lit("foo"))]).await;
2237        let fingerprint = build_scan_fingerprint(&input).unwrap();
2238
2239        let expected = ScanRequestFingerprintBuilder {
2240            read_columns: input.read_cols,
2241            read_column_types: vec![
2242                metadata
2243                    .column_by_id(0)
2244                    .map(|col| col.column_schema.data_type.clone()),
2245                metadata
2246                    .column_by_id(2)
2247                    .map(|col| col.column_schema.data_type.clone()),
2248                metadata
2249                    .column_by_id(3)
2250                    .map(|col| col.column_schema.data_type.clone()),
2251            ],
2252            filters: vec![col("k0").eq(lit("foo")).to_string()],
2253            time_filters: vec![],
2254            series_row_selector: None,
2255            append_mode: false,
2256            filter_deleted: true,
2257            merge_mode: MergeMode::LastRow,
2258            partition_expr_version: metadata.partition_expr_version,
2259        }
2260        .build();
2261        assert_eq!(expected, fingerprint.fingerprint);
2262        assert_ne!(0, metadata.partition_expr_version);
2263    }
2264
2265    #[test]
2266    fn test_update_dyn_filters_with_empty_base_predicates() {
2267        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
2268        let predicate_group = PredicateGroup::new(metadata.as_ref(), &[]).unwrap();
2269        assert!(predicate_group.predicate().is_none());
2270        assert!(predicate_group.predicate_without_region().is_none());
2271
2272        let dyn_filter = Arc::new(DynamicFilterPhysicalExpr::new(vec![], physical_lit(false)));
2273        predicate_group.add_dyn_filters(vec![dyn_filter]);
2274
2275        let predicate_all = predicate_group.predicate().unwrap();
2276        assert!(predicate_all.exprs().is_empty());
2277        assert_eq!(1, predicate_all.dyn_filters().len());
2278
2279        let predicate_without_region = predicate_group.predicate_without_region().unwrap();
2280        assert!(predicate_without_region.exprs().is_empty());
2281        assert_eq!(1, predicate_without_region.dyn_filters().len());
2282    }
2283
2284    #[test]
2285    fn test_file_level_pruning_stats_prunes_old_file() {
2286        let ts_col_name = "ts";
2287        let predicate = Predicate::new(vec![col(ts_col_name).gt(ts_lit(1000))]);
2288        let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
2289            ts_col_name,
2290            ArrowDataType::Timestamp(ArrowTimeUnit::Millisecond, None),
2291            false,
2292        )]));
2293
2294        // File with time range [0ms, 500ms] is completely before `ts > 1000ms`.
2295        let stats = FileLevelPruningStats {
2296            min_scalar: ScalarValue::TimestampMillisecond(Some(0), None),
2297            max_scalar: ScalarValue::TimestampMillisecond(Some(500), None),
2298            time_index_col_name: ts_col_name.to_string(),
2299        };
2300        assert_eq!(
2301            vec![false],
2302            predicate.prune_with_stats(&stats, &arrow_schema)
2303        );
2304
2305        // File with time range [0ms, 2000ms] overlaps `ts > 1000ms`, so keep it.
2306        let stats = FileLevelPruningStats {
2307            min_scalar: ScalarValue::TimestampMillisecond(Some(0), None),
2308            max_scalar: ScalarValue::TimestampMillisecond(Some(2000), None),
2309            time_index_col_name: ts_col_name.to_string(),
2310        };
2311        assert_eq!(
2312            vec![true],
2313            predicate.prune_with_stats(&stats, &arrow_schema)
2314        );
2315    }
2316
2317    #[test]
2318    fn test_file_level_pruning_stats_no_predicate_keeps_all() {
2319        let predicate = Predicate::new(vec![]);
2320        assert!(predicate.is_empty());
2321
2322        let stats = FileLevelPruningStats {
2323            min_scalar: ScalarValue::TimestampMillisecond(Some(0), None),
2324            max_scalar: ScalarValue::TimestampMillisecond(Some(500), None),
2325            time_index_col_name: "ts".to_string(),
2326        };
2327        let arrow_schema = Arc::new(ArrowSchema::new(Vec::<Field>::new()));
2328        assert_eq!(
2329            vec![true],
2330            predicate.prune_with_stats(&stats, &arrow_schema)
2331        );
2332    }
2333
2334    #[tokio::test]
2335    async fn test_file_level_pruning_stats_ceil_max_unit_conversion() {
2336        let metadata = metadata_with_time_index_unit(TimeUnit::Millisecond);
2337        let input = new_scan_input(metadata, vec![]).await;
2338        let file = file_handle_with_time_range(
2339            Timestamp::new(1_000_001, TimeUnit::Nanosecond),
2340            Timestamp::new(1_000_001, TimeUnit::Nanosecond),
2341        );
2342
2343        let stats = input.try_file_level_pruning_stats(&file).unwrap();
2344        assert_eq!(
2345            ScalarValue::TimestampMillisecond(Some(1), None),
2346            stats.min_scalar
2347        );
2348        assert_eq!(
2349            ScalarValue::TimestampMillisecond(Some(2), None),
2350            stats.max_scalar
2351        );
2352
2353        // The actual max timestamp is slightly greater than 1ms. It must be kept for `ts > 1ms`.
2354        let predicate = Predicate::new(vec![col("ts").gt(ts_lit(1))]);
2355        assert_eq!(
2356            vec![true],
2357            predicate.prune_with_stats(&stats, input.mapper.metadata().schema.arrow_schema())
2358        );
2359    }
2360
2361    #[tokio::test]
2362    async fn test_file_level_pruning_stats_overflow_keeps_file() {
2363        let metadata = metadata_with_time_index_unit(TimeUnit::Nanosecond);
2364        let input = new_scan_input(metadata, vec![]).await;
2365        let file = file_handle_with_time_range(
2366            Timestamp::new(0, TimeUnit::Second),
2367            Timestamp::new(i64::MAX, TimeUnit::Second),
2368        );
2369
2370        assert!(input.try_file_level_pruning_stats(&file).is_none());
2371    }
2372
2373    #[test]
2374    fn test_file_level_pruning_stats_keeps_inclusive_boundary() {
2375        let ts_col_name = "ts";
2376        let predicate = Predicate::new(vec![col(ts_col_name).gt_eq(ts_lit(1000))]);
2377        let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
2378            ts_col_name,
2379            ArrowDataType::Timestamp(ArrowTimeUnit::Millisecond, None),
2380            false,
2381        )]));
2382        let stats = FileLevelPruningStats {
2383            min_scalar: ScalarValue::TimestampMillisecond(Some(0), None),
2384            max_scalar: ScalarValue::TimestampMillisecond(Some(1000), None),
2385            time_index_col_name: ts_col_name.to_string(),
2386        };
2387
2388        assert_eq!(
2389            vec![true],
2390            predicate.prune_with_stats(&stats, &arrow_schema)
2391        );
2392    }
2393
2394    #[tokio::test]
2395    async fn test_file_level_pruning_with_dyn_filter_only_predicate() {
2396        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
2397        let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3]).unwrap();
2398        let predicate_group = PredicateGroup::new(metadata.as_ref(), &[]).unwrap();
2399        predicate_group.add_dyn_filters(vec![Arc::new(DynamicFilterPhysicalExpr::new(
2400            vec![],
2401            physical_lit(false),
2402        ))]);
2403        let input = ScanInput::new(SchedulerEnv::new().await.access_layer.clone(), mapper)
2404            .with_predicate(predicate_group);
2405        let file = file_handle_with_time_range(
2406            Timestamp::new_millisecond(0),
2407            Timestamp::new_millisecond(1000),
2408        );
2409        let mut reader_metrics = ReaderMetrics::default();
2410
2411        let builder = input
2412            .prune_file(&file, PreFilterMode::SkipFields, &mut reader_metrics)
2413            .await
2414            .unwrap();
2415
2416        assert_eq!(1, reader_metrics.filter_metrics.files_time_range_pruned);
2417        let mut ranges = SmallVec::new();
2418        builder.build_ranges(-1, &mut ranges);
2419        assert!(ranges.is_empty());
2420    }
2421
2422    #[tokio::test]
2423    async fn test_range_pre_filter_mode() {
2424        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
2425        let cases = [
2426            (true, MergeMode::LastRow, 1, PreFilterMode::All),
2427            (false, MergeMode::LastNonNull, 1, PreFilterMode::All),
2428            (false, MergeMode::LastRow, 2, PreFilterMode::SkipFields),
2429            (true, MergeMode::LastRow, 2, PreFilterMode::All),
2430        ];
2431
2432        for (append_mode, merge_mode, source_count, expected_mode) in cases {
2433            let input = new_scan_input(metadata.clone(), vec![])
2434                .await
2435                .with_append_mode(append_mode)
2436                .with_merge_mode(merge_mode);
2437
2438            assert_eq!(expected_mode, input.range_pre_filter_mode(source_count));
2439        }
2440    }
2441}