Skip to main content

mito2/read/
scan_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Scans a region according to the scan request.
16
17use std::collections::{HashMap, HashSet};
18use std::fmt;
19use std::num::NonZeroU64;
20use std::sync::Arc;
21use std::time::Instant;
22
23use api::v1::SemanticType;
24use common_error::ext::BoxedError;
25use common_recordbatch::SendableRecordBatchStream;
26use common_recordbatch::filter::SimpleFilterEvaluator;
27use common_telemetry::tracing::Instrument;
28use common_telemetry::{debug, error, tracing, warn};
29use common_time::range::TimestampRange;
30use datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr;
31use datafusion_common::Column;
32use datafusion_expr::Expr;
33use datafusion_expr::utils::expr_to_columns;
34use datatypes::extension::json::is_structured_json_field;
35use datatypes::types::json_type::JsonNativeType;
36use futures::StreamExt;
37use itertools::Itertools;
38use partition::expr::PartitionExpr;
39use smallvec::SmallVec;
40use snafu::ResultExt;
41use store_api::metadata::{RegionMetadata, RegionMetadataRef};
42use store_api::region_engine::{PartitionRange, RegionScannerRef};
43use store_api::storage::{
44    NestedPath, RegionId, ScanRequest, SequenceNumber, SequenceRange, TimeSeriesDistribution,
45    TimeSeriesRowSelector,
46};
47use table::predicate::{Predicate, build_time_range_predicate, extract_time_range_from_expr};
48use tokio::sync::{Semaphore, mpsc};
49use tokio_stream::wrappers::ReceiverStream;
50
51use crate::access_layer::AccessLayerRef;
52use crate::cache::CacheStrategy;
53use crate::config::DEFAULT_MAX_CONCURRENT_SCAN_FILES;
54use crate::error::{InvalidPartitionExprSnafu, Result};
55#[cfg(feature = "enterprise")]
56use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
57use crate::memtable::{MemtableRange, RangesOptions};
58use crate::metrics::READ_SST_COUNT;
59use crate::read::compat::{self, FlatCompatBatch};
60use crate::read::flat_projection::FlatProjectionMapper;
61use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
62use crate::read::range_cache::{ScanRequestFingerprint, implied_time_range_from_exprs};
63use crate::read::read_columns::{
64    ReadColumns, merge, merge_nested_paths, read_columns_from_predicate,
65    read_columns_from_projection,
66};
67use crate::read::seq_scan::SeqScan;
68use crate::read::series_scan::SeriesScan;
69use crate::read::stream::ScanBatchStream;
70use crate::read::unordered_scan::UnorderedScan;
71use crate::read::{BoxedRecordBatchStream, RecordBatch};
72use crate::region::options::MergeMode;
73use crate::region::version::VersionRef;
74use crate::sst::file::FileHandle;
75use crate::sst::index::bloom_filter::applier::{
76    BloomFilterIndexApplierBuilder, BloomFilterIndexApplierRef,
77};
78use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
79use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
80use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
81use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
82#[cfg(feature = "vector_index")]
83use crate::sst::index::vector_index::applier::{VectorIndexApplier, VectorIndexApplierRef};
84use crate::sst::parquet::file_range::PreFilterMode;
85use crate::sst::parquet::reader::ReaderMetrics;
86
/// Multiplier applied to the `k` of a vector search when the scan request also
/// carries filters; without filters the requested `k` is used unchanged.
// NOTE(review): presumably over-fetching compensates for candidates that are
// later removed by the filters — confirm against the vector index reader.
#[cfg(feature = "vector_index")]
const VECTOR_INDEX_OVERFETCH_MULTIPLIER: usize = 2;
89
/// A scanner scans a region and returns a [SendableRecordBatchStream].
///
/// Each variant wraps one concrete scanner implementation; [ScanRegion] decides
/// which one to create for a given [ScanRequest].
pub(crate) enum Scanner {
    /// Sequential scan.
    Seq(SeqScan),
    /// Unordered scan.
    Unordered(UnorderedScan),
    /// Per-series scan.
    Series(SeriesScan),
}
99
100impl Scanner {
101    /// Returns a [SendableRecordBatchStream] to retrieve scan results from all partitions.
102    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
103    pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream, BoxedError> {
104        match self {
105            Scanner::Seq(seq_scan) => seq_scan.build_stream(),
106            Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
107            Scanner::Series(series_scan) => series_scan.build_stream().await,
108        }
109    }
110
111    /// Create a stream of [`Batch`] by this scanner.
112    pub(crate) fn scan_batch(&self) -> Result<ScanBatchStream> {
113        match self {
114            Scanner::Seq(x) => x.scan_all_partitions(),
115            Scanner::Unordered(x) => x.scan_all_partitions(),
116            Scanner::Series(x) => x.scan_all_partitions(),
117        }
118    }
119}
120
#[cfg(test)]
impl Scanner {
    /// Returns the number of SST files the scanner is going to read.
    pub(crate) fn num_files(&self) -> usize {
        match self {
            Self::Seq(scanner) => scanner.input().num_files(),
            Self::Unordered(scanner) => scanner.input().num_files(),
            Self::Series(scanner) => scanner.input().num_files(),
        }
    }

    /// Returns the number of memtables the scanner is going to read.
    pub(crate) fn num_memtables(&self) -> usize {
        match self {
            Self::Seq(scanner) => scanner.input().num_memtables(),
            Self::Unordered(scanner) => scanner.input().num_memtables(),
            Self::Series(scanner) => scanner.input().num_memtables(),
        }
    }

    /// Returns the ids of the SST files the scanner is going to read.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        match self {
            Self::Seq(scanner) => scanner.input().file_ids(),
            Self::Unordered(scanner) => scanner.input().file_ids(),
            Self::Series(scanner) => scanner.input().file_ids(),
        }
    }

    /// Returns the index ids reported by the scan input.
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        match self {
            Self::Seq(scanner) => scanner.input().index_ids(),
            Self::Unordered(scanner) => scanner.input().index_ids(),
            Self::Series(scanner) => scanner.input().index_ids(),
        }
    }

    /// Returns the snapshot sequence recorded in the scan input, if any.
    pub(crate) fn snapshot_sequence(&self) -> Option<SequenceNumber> {
        match self {
            Self::Seq(scanner) => scanner.input().snapshot_sequence,
            Self::Unordered(scanner) => scanner.input().snapshot_sequence,
            Self::Series(scanner) => scanner.input().snapshot_sequence,
        }
    }

    /// Sets the target partitions for the scanner. It controls the parallelism of the scanner.
    ///
    /// # Panics
    ///
    /// Panics if the wrapped scanner fails to prepare the request.
    pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
        use store_api::region_engine::{PrepareRequest, RegionScanner};

        let prepare_request = PrepareRequest::default().with_target_partitions(target_partitions);
        match self {
            Self::Seq(scanner) => scanner.prepare(prepare_request).unwrap(),
            Self::Unordered(scanner) => scanner.prepare(prepare_request).unwrap(),
            Self::Series(scanner) => scanner.prepare(prepare_request).unwrap(),
        }
    }
}
178
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Helper to scan a region by [ScanRequest].
///
/// [ScanRegion] collects SSTs and memtables to scan without actually reading them. It
/// creates a [Scanner] to actually scan these targets in [Scanner::scan()].
///
/// ```mermaid
/// classDiagram
/// class ScanRegion {
///     -VersionRef version
///     -ScanRequest request
///     ~scanner() Scanner
///     ~seq_scan() SeqScan
/// }
/// class Scanner {
///     <<enumeration>>
///     SeqScan
///     UnorderedScan
///     +scan() SendableRecordBatchStream
/// }
/// class SeqScan {
///     -ScanInput input
///     +build() SendableRecordBatchStream
/// }
/// class UnorderedScan {
///     -ScanInput input
///     +build() SendableRecordBatchStream
/// }
/// class ScanInput {
///     -ProjectionMapper mapper
///     -Option~TimeRange~ time_range
///     -Option~Predicate~ predicate
///     -Vec~MemtableRef~ memtables
///     -Vec~FileHandle~ files
/// }
/// class ProjectionMapper {
///     ~output_schema() SchemaRef
///     ~convert(Batch) RecordBatch
/// }
/// ScanRegion -- Scanner
/// ScanRegion o-- ScanRequest
/// Scanner o-- SeqScan
/// Scanner o-- UnorderedScan
/// SeqScan o-- ScanInput
/// UnorderedScan o-- ScanInput
/// Scanner -- SendableRecordBatchStream
/// ScanInput o-- ProjectionMapper
/// SeqScan -- SendableRecordBatchStream
/// UnorderedScan -- SendableRecordBatchStream
/// ```
pub(crate) struct ScanRegion {
    /// Version of the region at scan.
    version: VersionRef,
    /// Access layer of the region.
    access_layer: AccessLayerRef,
    /// Scan request.
    request: ScanRequest,
    /// Cache.
    cache_strategy: CacheStrategy,
    /// Maximum number of SST files to scan concurrently.
    max_concurrent_scan_files: usize,
    /// Whether to ignore inverted index.
    ignore_inverted_index: bool,
    /// Whether to ignore fulltext index.
    ignore_fulltext_index: bool,
    /// Whether to ignore bloom filter.
    ignore_bloom_filter: bool,
    /// Start time of the scan task.
    start_time: Option<Instant>,
    /// Whether to filter out the deleted rows.
    /// Usually true for normal read, and false for scan for compaction.
    filter_deleted: bool,
    /// Optional provider that is asked for extension ranges when building the
    /// scan input. Only available with the `enterprise` feature.
    #[cfg(feature = "enterprise")]
    extension_range_provider: Option<BoxedExtensionRangeProvider>,
}
254
255impl ScanRegion {
256    /// Creates a [ScanRegion].
257    pub(crate) fn new(
258        version: VersionRef,
259        access_layer: AccessLayerRef,
260        request: ScanRequest,
261        cache_strategy: CacheStrategy,
262    ) -> ScanRegion {
263        ScanRegion {
264            version,
265            access_layer,
266            request,
267            cache_strategy,
268            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
269            ignore_inverted_index: false,
270            ignore_fulltext_index: false,
271            ignore_bloom_filter: false,
272            start_time: None,
273            filter_deleted: true,
274            #[cfg(feature = "enterprise")]
275            extension_range_provider: None,
276        }
277    }
278
279    /// Sets maximum number of SST files to scan concurrently.
280    #[must_use]
281    pub(crate) fn with_max_concurrent_scan_files(
282        mut self,
283        max_concurrent_scan_files: usize,
284    ) -> Self {
285        self.max_concurrent_scan_files = max_concurrent_scan_files;
286        self
287    }
288
289    /// Sets whether to ignore inverted index.
290    #[must_use]
291    pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self {
292        self.ignore_inverted_index = ignore;
293        self
294    }
295
296    /// Sets whether to ignore fulltext index.
297    #[must_use]
298    pub(crate) fn with_ignore_fulltext_index(mut self, ignore: bool) -> Self {
299        self.ignore_fulltext_index = ignore;
300        self
301    }
302
303    /// Sets whether to ignore bloom filter.
304    #[must_use]
305    pub(crate) fn with_ignore_bloom_filter(mut self, ignore: bool) -> Self {
306        self.ignore_bloom_filter = ignore;
307        self
308    }
309
310    #[must_use]
311    pub(crate) fn with_start_time(mut self, now: Instant) -> Self {
312        self.start_time = Some(now);
313        self
314    }
315
    /// Sets whether to filter out the deleted rows.
    pub(crate) fn set_filter_deleted(&mut self, filter_deleted: bool) {
        self.filter_deleted = filter_deleted;
    }
319
    /// Sets the provider that is asked for extension ranges when the scan input
    /// is built. Replaces any provider set before.
    #[cfg(feature = "enterprise")]
    pub(crate) fn set_extension_range_provider(
        &mut self,
        extension_range_provider: BoxedExtensionRangeProvider,
    ) {
        self.extension_range_provider = Some(extension_range_provider);
    }
327
328    /// Returns a [Scanner] to scan the region.
329    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
330    pub(crate) async fn scanner(self) -> Result<Scanner> {
331        if self.use_series_scan() {
332            self.series_scan().await.map(Scanner::Series)
333        } else if self.use_unordered_scan() {
334            // If table is append only and there is no series row selector, we use unordered scan in query.
335            // We still use seq scan in compaction.
336            self.unordered_scan().await.map(Scanner::Unordered)
337        } else {
338            self.seq_scan().await.map(Scanner::Seq)
339        }
340    }
341
342    /// Returns a [RegionScanner] to scan the region.
343    #[tracing::instrument(
344        level = tracing::Level::DEBUG,
345        skip_all,
346        fields(region_id = %self.region_id())
347    )]
348    pub(crate) async fn region_scanner(self) -> Result<RegionScannerRef> {
349        if self.use_series_scan() {
350            self.series_scan()
351                .await
352                .map(|scanner| Box::new(scanner) as _)
353        } else if self.use_unordered_scan() {
354            self.unordered_scan()
355                .await
356                .map(|scanner| Box::new(scanner) as _)
357        } else {
358            self.seq_scan().await.map(|scanner| Box::new(scanner) as _)
359        }
360    }
361
362    /// Scan sequentially.
363    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
364    pub(crate) async fn seq_scan(self) -> Result<SeqScan> {
365        let input = self.scan_input().await?.with_compaction(false);
366        Ok(SeqScan::new(input))
367    }
368
369    /// Unordered scan.
370    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
371    pub(crate) async fn unordered_scan(self) -> Result<UnorderedScan> {
372        let input = self.scan_input().await?;
373        Ok(UnorderedScan::new(input))
374    }
375
376    /// Scans by series.
377    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
378    pub(crate) async fn series_scan(self) -> Result<SeriesScan> {
379        let input = self.scan_input().await?;
380        Ok(SeriesScan::new(input))
381    }
382
383    /// Returns true if the region can use unordered scan for current request.
384    fn use_unordered_scan(&self) -> bool {
385        // We use unordered scan when:
386        // 1. The region is in append mode.
387        // 2. There is no series row selector.
388        // 3. The required distribution is None or TimeSeriesDistribution::TimeWindowed.
389        //
390        // We still use seq scan in compaction.
391        self.version.options.append_mode
392            && self.request.series_row_selector.is_none()
393            && (self.request.distribution.is_none()
394                || self.request.distribution == Some(TimeSeriesDistribution::TimeWindowed))
395    }
396
397    /// Returns true if the region can use series scan for current request.
398    fn use_series_scan(&self) -> bool {
399        self.request.distribution == Some(TimeSeriesDistribution::PerSeries)
400    }
401
    /// Creates a scan input.
    ///
    /// The steps are, in order:
    /// 1. Derive the time range and the predicate group from the request filters.
    /// 2. Resolve the columns to read and build the projection mapper.
    /// 3. Collect the SST files and memtable ranges that may contain matching rows.
    /// 4. Build the index appliers, separately for non-field and field filters.
    /// 5. Assemble the [ScanInput] and, with the `enterprise` feature, attach
    ///    extension ranges.
    ///
    /// # Errors
    ///
    /// Returns an error if the predicate group, the read columns, the projection
    /// mapper or the memtable ranges cannot be built, or if the extension range
    /// provider fails.
    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
    async fn scan_input(self) -> Result<ScanInput> {
        // `NonZeroU64::new` maps a zero sequence to `None`, so a zero
        // `sst_min_sequence` behaves like an absent one.
        let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new);
        let time_range = self.build_time_range_predicate();
        let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;

        let mut read_cols = match &self.request.projection_input {
            Some(p) => {
                // Read columns include the pushed-down projection and columns
                // resolved from the predicate.
                let metadata = &self.version.metadata;
                let from_projection = read_columns_from_projection(p.clone(), metadata)?;
                let from_predicate = read_columns_from_predicate(&predicate, metadata);
                merge(from_projection, from_predicate)
            }
            None => {
                // Without a projection, read every column of the region.
                let read_col_ids = self
                    .version
                    .metadata
                    .column_metadatas
                    .iter()
                    .map(|col| col.column_id);
                ReadColumns::from_deduped_column_ids(read_col_ids)
            }
        };
        // Only narrow read columns and pass JSON type hints for structured JSON (JSON2)
        // columns. Legacy JSONB columns have JSON extension metadata but their physical
        // Arrow type is Binary, not Struct, so they must not enter structured JSON paths.
        let has_structured_json = self
            .version
            .metadata
            .schema
            .arrow_schema()
            .fields()
            .iter()
            .any(is_structured_json_field);
        if has_structured_json {
            narrow_read_columns_by_json_type_hint(
                &mut read_cols,
                &self.request.json_type_hint,
                &self.version.metadata,
            );
        }
        // Captured before `read_cols` is moved into the mapper; the memtables
        // below are asked for exactly these columns.
        let read_col_ids = read_cols.column_ids();

        // The mapper always computes projected column ids as the schema of SSTs may change.
        // Without projection indices, every column index of the region is projected.
        let projection = self
            .request
            .projection_indices()
            .map(|x| x.to_vec())
            .unwrap_or_else(|| (0..self.version.metadata.column_metadatas.len()).collect());
        // The hint is only forwarded (and logged) when the schema has structured JSON columns.
        let json_type_hint = has_structured_json
            .then_some(&self.request.json_type_hint)
            .inspect(|json_type_hint| {
                debug!(
                    "Concretized JSON type: {{{}}}",
                    json_type_hint
                        .iter()
                        .map(|(k, v)| format!("{}: {}", k, v))
                        .join(", ")
                );
            });
        let mapper = FlatProjectionMapper::new_with_read_columns(
            &self.version.metadata,
            projection,
            read_cols,
            json_type_hint,
        )?;

        let ssts = &self.version.ssts;
        let mut files = Vec::new();
        // The request may ask to skip SST files entirely; `files` stays empty then.
        if !self.request.skip_sst_files {
            for level in ssts.levels() {
                for file in level.files.values() {
                    let exceed_min_sequence = match (sst_min_sequence, file.meta_ref().sequence) {
                        (Some(min_sequence), Some(file_sequence)) => file_sequence > min_sequence,
                        // If the file's sequence is None (or actually is zero), it could mean the file
                        // is generated and added to the region "directly". In this case, its data should
                        // be considered as fresh as the memtable. So its sequence is treated greater than
                        // the min_sequence, whatever the value of min_sequence is. Hence the default
                        // "true" in this arm.
                        (Some(_), None) => true,
                        // No minimum sequence requested: every file qualifies.
                        (None, _) => true,
                    };

                    // Finds SST files in range.
                    if exceed_min_sequence && file_in_range(file, &time_range) {
                        files.push(file.clone());
                    }
                    // There is no need to check and prune for file's sequence here as the sequence number is usually very new,
                    // unless the timing is too good, or the sequence number wouldn't be in file.
                    // and the batch will be filtered out by tree reader anyway.
                }
            }
        }

        let memtables = self.version.memtables.list_memtables();
        // Skip empty memtables and memtables out of time range.
        let mut mem_range_builders = Vec::new();
        let filter_mode = pre_filter_mode(
            self.version.options.append_mode,
            self.version.options.merge_mode(),
        );

        for m in memtables {
            // check if memtable is empty by reading stats.
            let Some((start, end)) = m.stats().time_range() else {
                continue;
            };
            // The time range of the memtable is inclusive.
            let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
            if !memtable_range.intersects(&time_range) {
                continue;
            }
            // Ask the memtable for its ranges, restricted to the read columns,
            // the predicate and the sequence bounds of the request.
            let ranges_in_memtable = m.ranges(
                Some(&read_col_ids),
                RangesOptions::default()
                    .with_predicate(predicate.clone())
                    .with_sequence(SequenceRange::new(
                        self.request.memtable_min_sequence,
                        self.request.memtable_max_sequence,
                    ))
                    .with_pre_filter_mode(filter_mode),
            )?;
            mem_range_builders.extend(ranges_in_memtable.ranges.into_values().map(|v| {
                let stats = v.stats().clone();
                MemRangeBuilder::new(v, stats)
            }));
        }

        let region_id = self.region_id();
        debug!(
            "Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}",
            region_id,
            self.request,
            time_range,
            mem_range_builders.len(),
            files.len(),
            self.version.options.append_mode,
        );

        // Each index gets two appliers: index 0 is built from the non-field
        // filters and index 1 from the field filters.
        let (non_field_filters, field_filters) = self.partition_by_field_filters();
        let inverted_index_appliers = [
            self.build_invereted_index_applier(&non_field_filters),
            self.build_invereted_index_applier(&field_filters),
        ];
        let bloom_filter_appliers = [
            self.build_bloom_filter_applier(&non_field_filters),
            self.build_bloom_filter_applier(&field_filters),
        ];
        let fulltext_index_appliers = [
            self.build_fulltext_index_applier(&non_field_filters),
            self.build_fulltext_index_applier(&field_filters),
        ];
        #[cfg(feature = "vector_index")]
        let vector_index_applier = self.build_vector_index_applier();
        // With filters, fetch `k * VECTOR_INDEX_OVERFETCH_MULTIPLIER` candidates
        // (saturating); without filters, fetch exactly `k`.
        #[cfg(feature = "vector_index")]
        let vector_index_k = self.request.vector_search.as_ref().map(|search| {
            if self.request.filters.is_empty() {
                search.k
            } else {
                search.k.saturating_mul(VECTOR_INDEX_OVERFETCH_MULTIPLIER)
            }
        });

        // From here on fields are moved out of `self` one by one, so only
        // fields that have not been moved yet may be used afterwards.
        let input = ScanInput::new(self.access_layer, mapper)
            .with_time_range(Some(time_range))
            .with_predicate(predicate)
            .with_memtables(mem_range_builders)
            .with_files(files)
            .with_cache(self.cache_strategy)
            .with_inverted_index_appliers(inverted_index_appliers)
            .with_bloom_filter_index_appliers(bloom_filter_appliers)
            .with_fulltext_index_appliers(fulltext_index_appliers)
            .with_max_concurrent_scan_files(self.max_concurrent_scan_files)
            .with_start_time(self.start_time)
            .with_append_mode(self.version.options.append_mode)
            .with_filter_deleted(self.filter_deleted)
            .with_merge_mode(self.version.options.merge_mode())
            .with_series_row_selector(self.request.series_row_selector)
            .with_distribution(self.request.distribution)
            .with_explain_flat_format(
                self.version.options.sst_format == Some(crate::sst::FormatType::Flat),
            )
            // The snapshot sequence is the memtable max sequence, but only when
            // the request asks for a snapshot on scan.
            .with_snapshot_sequence(
                self.request
                    .snapshot_on_scan
                    .then_some(self.request.memtable_max_sequence)
                    .flatten(),
            );
        #[cfg(feature = "vector_index")]
        let input = input
            .with_vector_index_applier(vector_index_applier)
            .with_vector_index_k(vector_index_k);

        // Extension ranges are only looked up when SST files are not skipped
        // and a provider has been set.
        #[cfg(feature = "enterprise")]
        let input = if !self.request.skip_sst_files
            && let Some(provider) = self.extension_range_provider
        {
            let ranges = provider
                .find_extension_ranges(self.version.flushed_sequence, time_range, &self.request)
                .await?;
            debug!("Find extension ranges: {ranges:?}");
            input.with_extension_ranges(ranges)
        } else {
            input
        };
        Ok(input)
    }
612
    /// Returns the id of the region to scan, taken from the metadata of the version.
    fn region_id(&self) -> RegionId {
        self.version.metadata.region_id
    }
616
617    /// Build time range predicate from filters.
618    fn build_time_range_predicate(&self) -> TimestampRange {
619        let time_index = self.version.metadata.time_index_column();
620        let unit = time_index
621            .column_schema
622            .data_type
623            .as_timestamp()
624            .expect("Time index must have timestamp-compatible type")
625            .unit();
626        build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters)
627    }
628
629    /// Partitions filters into two groups: non-field filters and field filters.
630    /// Returns `(non_field_filters, field_filters)`.
631    fn partition_by_field_filters(&self) -> (Vec<Expr>, Vec<Expr>) {
632        let field_columns = self
633            .version
634            .metadata
635            .field_columns()
636            .map(|col| &col.column_schema.name)
637            .collect::<HashSet<_>>();
638
639        let mut columns = HashSet::new();
640
641        self.request.filters.iter().cloned().partition(|expr| {
642            columns.clear();
643            // `expr_to_columns` won't return error.
644            if expr_to_columns(expr, &mut columns).is_err() {
645                // If we can't extract columns, treat it as non-field filter
646                return true;
647            }
648            // Return true for non-field filters (partition puts true cases in first vec)
649            !columns
650                .iter()
651                .any(|column| field_columns.contains(&column.name))
652        })
653    }
654
655    /// Use the latest schema to build the inverted index applier.
656    fn build_invereted_index_applier(&self, filters: &[Expr]) -> Option<InvertedIndexApplierRef> {
657        if self.ignore_inverted_index {
658            return None;
659        }
660
661        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
662        let inverted_index_cache = self.cache_strategy.inverted_index_cache().cloned();
663
664        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
665
666        InvertedIndexApplierBuilder::new(
667            self.access_layer.table_dir().to_string(),
668            self.access_layer.path_type(),
669            self.access_layer.object_store().clone(),
670            self.version.metadata.as_ref(),
671            self.version.metadata.inverted_indexed_column_ids(
672                self.version
673                    .options
674                    .index_options
675                    .inverted_index
676                    .ignore_column_ids
677                    .iter(),
678            ),
679            self.access_layer.puffin_manager_factory().clone(),
680        )
681        .with_file_cache(file_cache)
682        .with_inverted_index_cache(inverted_index_cache)
683        .with_puffin_metadata_cache(puffin_metadata_cache)
684        .build(filters)
685        .inspect_err(|err| warn!(err; "Failed to build invereted index applier"))
686        .ok()
687        .flatten()
688        .map(Arc::new)
689    }
690
691    /// Use the latest schema to build the bloom filter index applier.
692    fn build_bloom_filter_applier(&self, filters: &[Expr]) -> Option<BloomFilterIndexApplierRef> {
693        if self.ignore_bloom_filter {
694            return None;
695        }
696
697        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
698        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
699        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
700
701        BloomFilterIndexApplierBuilder::new(
702            self.access_layer.table_dir().to_string(),
703            self.access_layer.path_type(),
704            self.access_layer.object_store().clone(),
705            self.version.metadata.as_ref(),
706            self.access_layer.puffin_manager_factory().clone(),
707        )
708        .with_file_cache(file_cache)
709        .with_bloom_filter_index_cache(bloom_filter_index_cache)
710        .with_puffin_metadata_cache(puffin_metadata_cache)
711        .build(filters)
712        .inspect_err(|err| warn!(err; "Failed to build bloom filter index applier"))
713        .ok()
714        .flatten()
715        .map(Arc::new)
716    }
717
718    /// Use the latest schema to build the fulltext index applier.
719    fn build_fulltext_index_applier(&self, filters: &[Expr]) -> Option<FulltextIndexApplierRef> {
720        if self.ignore_fulltext_index {
721            return None;
722        }
723
724        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
725        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
726        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
727        FulltextIndexApplierBuilder::new(
728            self.access_layer.table_dir().to_string(),
729            self.access_layer.path_type(),
730            self.access_layer.object_store().clone(),
731            self.access_layer.puffin_manager_factory().clone(),
732            self.version.metadata.as_ref(),
733        )
734        .with_file_cache(file_cache)
735        .with_puffin_metadata_cache(puffin_metadata_cache)
736        .with_bloom_filter_cache(bloom_filter_index_cache)
737        .build(filters)
738        .inspect_err(|err| warn!(err; "Failed to build fulltext index applier"))
739        .ok()
740        .flatten()
741        .map(Arc::new)
742    }
743
744    /// Build the vector index applier from vector search request.
745    #[cfg(feature = "vector_index")]
746    fn build_vector_index_applier(&self) -> Option<VectorIndexApplierRef> {
747        let vector_search = self.request.vector_search.as_ref()?;
748
749        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
750        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
751        let vector_index_cache = self.cache_strategy.vector_index_cache().cloned();
752
753        let applier = VectorIndexApplier::new(
754            self.access_layer.table_dir().to_string(),
755            self.access_layer.path_type(),
756            self.access_layer.object_store().clone(),
757            self.access_layer.puffin_manager_factory().clone(),
758            vector_search.column_id,
759            vector_search.query_vector.clone(),
760            vector_search.metric,
761        )
762        .with_file_cache(file_cache)
763        .with_puffin_metadata_cache(puffin_metadata_cache)
764        .with_vector_index_cache(vector_index_cache);
765
766        Some(Arc::new(applier))
767    }
768}
769
770/// Returns true if the time range of a SST `file` matches the `predicate`.
771fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
772    if predicate == &TimestampRange::min_to_max() {
773        return true;
774    }
775    // end timestamp of a SST is inclusive.
776    let (start, end) = file.time_range();
777    let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end));
778    file_ts_range.intersects(predicate)
779}
780
/// Common input for different scanners.
///
/// Built with [`ScanInput::new`] and then customized through the `with_*` builder methods.
pub struct ScanInput {
    /// Region SST access layer.
    access_layer: AccessLayerRef,
    /// Maps projected Batches to RecordBatches.
    pub(crate) mapper: Arc<FlatProjectionMapper>,
    /// The columns to read from memtables and SSTs.
    /// Note that this differs from the columns in `mapper`, which are the projected columns:
    /// the read columns may also include non-projected columns needed for filtering.
    pub(crate) read_cols: ReadColumns,
    /// Time range filter for time index.
    pub(crate) time_range: Option<TimestampRange>,
    /// Predicate to push down.
    pub(crate) predicate: PredicateGroup,
    /// Region partition expr applied at read time.
    /// Copied from the predicate group whenever the predicate is set.
    region_partition_expr: Option<PartitionExpr>,
    /// Memtable range builders for memtables in the time range.
    pub(crate) memtables: Vec<MemRangeBuilder>,
    /// Handles to SST files to scan.
    pub(crate) files: Vec<FileHandle>,
    /// Cache.
    pub(crate) cache_strategy: CacheStrategy,
    /// Ignores file not found error.
    ignore_file_not_found: bool,
    /// Maximum number of SST files to scan concurrently.
    pub(crate) max_concurrent_scan_files: usize,
    /// Inverted index appliers.
    /// NOTE(review): the meaning of the two slots is not visible in this file section —
    /// confirm against the code that builds the appliers.
    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
    /// Bloom filter index appliers (same two-slot layout as the inverted index appliers).
    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    /// Fulltext index appliers (same two-slot layout as the inverted index appliers).
    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
    /// Vector index applier for KNN search.
    #[cfg(feature = "vector_index")]
    pub(crate) vector_index_applier: Option<VectorIndexApplierRef>,
    /// Over-fetched k for vector index scan.
    #[cfg(feature = "vector_index")]
    pub(crate) vector_index_k: Option<usize>,
    /// Start time of the query.
    pub(crate) query_start: Option<Instant>,
    /// The region is using append mode.
    pub(crate) append_mode: bool,
    /// Whether to remove deletion markers.
    pub(crate) filter_deleted: bool,
    /// Mode to merge duplicate rows.
    pub(crate) merge_mode: MergeMode,
    /// Hint to select rows from time series.
    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
    /// Hint for the required distribution of the scanner.
    pub(crate) distribution: Option<TimeSeriesDistribution>,
    /// Whether the region's configured SST format is flat.
    explain_flat_format: bool,
    /// Snapshot upper bound, bound at scan open and propagated back to the caller.
    pub(crate) snapshot_sequence: Option<SequenceNumber>,
    /// Whether this scan is for compaction.
    pub(crate) compaction: bool,
    /// Extension ranges to scan. They are indexed after memtables and files and their rows
    /// are included in the total row count.
    #[cfg(feature = "enterprise")]
    extension_ranges: Vec<BoxedExtensionRange>,
}
838
839impl ScanInput {
840    /// Creates a new [ScanInput].
841    #[must_use]
842    pub(crate) fn new(access_layer: AccessLayerRef, mapper: FlatProjectionMapper) -> ScanInput {
843        ScanInput {
844            access_layer,
845            read_cols: mapper.read_columns().clone(),
846            mapper: Arc::new(mapper),
847            time_range: None,
848            predicate: PredicateGroup::default(),
849            region_partition_expr: None,
850            memtables: Vec::new(),
851            files: Vec::new(),
852            cache_strategy: CacheStrategy::Disabled,
853            ignore_file_not_found: false,
854            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
855            inverted_index_appliers: [None, None],
856            bloom_filter_index_appliers: [None, None],
857            fulltext_index_appliers: [None, None],
858            #[cfg(feature = "vector_index")]
859            vector_index_applier: None,
860            #[cfg(feature = "vector_index")]
861            vector_index_k: None,
862            query_start: None,
863            append_mode: false,
864            filter_deleted: true,
865            merge_mode: MergeMode::default(),
866            series_row_selector: None,
867            distribution: None,
868            explain_flat_format: false,
869            snapshot_sequence: None,
870            compaction: false,
871            #[cfg(feature = "enterprise")]
872            extension_ranges: Vec::new(),
873        }
874    }
875
876    /// Sets time range filter for time index.
877    #[must_use]
878    pub(crate) fn with_time_range(mut self, time_range: Option<TimestampRange>) -> Self {
879        self.time_range = time_range;
880        self
881    }
882
883    /// Sets predicate to push down.
884    #[must_use]
885    pub(crate) fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
886        self.region_partition_expr = predicate.region_partition_expr().cloned();
887        self.predicate = predicate;
888        self
889    }
890
891    /// Sets memtable range builders.
892    #[must_use]
893    pub(crate) fn with_memtables(mut self, memtables: Vec<MemRangeBuilder>) -> Self {
894        self.memtables = memtables;
895        self
896    }
897
898    /// Sets files to read.
899    #[must_use]
900    pub(crate) fn with_files(mut self, files: Vec<FileHandle>) -> Self {
901        self.files = files;
902        self
903    }
904
905    /// Sets cache for this query.
906    #[must_use]
907    pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self {
908        self.cache_strategy = cache;
909        self
910    }
911
912    /// Ignores file not found error.
913    #[must_use]
914    pub(crate) fn with_ignore_file_not_found(mut self, ignore: bool) -> Self {
915        self.ignore_file_not_found = ignore;
916        self
917    }
918
919    /// Sets maximum number of SST files to scan concurrently.
920    #[must_use]
921    pub(crate) fn with_max_concurrent_scan_files(
922        mut self,
923        max_concurrent_scan_files: usize,
924    ) -> Self {
925        self.max_concurrent_scan_files = max_concurrent_scan_files;
926        self
927    }
928
929    /// Sets inverted index appliers.
930    #[must_use]
931    pub(crate) fn with_inverted_index_appliers(
932        mut self,
933        appliers: [Option<InvertedIndexApplierRef>; 2],
934    ) -> Self {
935        self.inverted_index_appliers = appliers;
936        self
937    }
938
939    /// Sets bloom filter appliers.
940    #[must_use]
941    pub(crate) fn with_bloom_filter_index_appliers(
942        mut self,
943        appliers: [Option<BloomFilterIndexApplierRef>; 2],
944    ) -> Self {
945        self.bloom_filter_index_appliers = appliers;
946        self
947    }
948
949    /// Sets fulltext index appliers.
950    #[must_use]
951    pub(crate) fn with_fulltext_index_appliers(
952        mut self,
953        appliers: [Option<FulltextIndexApplierRef>; 2],
954    ) -> Self {
955        self.fulltext_index_appliers = appliers;
956        self
957    }
958
959    /// Sets vector index applier for KNN search.
960    #[cfg(feature = "vector_index")]
961    #[must_use]
962    pub(crate) fn with_vector_index_applier(
963        mut self,
964        applier: Option<VectorIndexApplierRef>,
965    ) -> Self {
966        self.vector_index_applier = applier;
967        self
968    }
969
970    /// Sets over-fetched k for vector index scan.
971    #[cfg(feature = "vector_index")]
972    #[must_use]
973    pub(crate) fn with_vector_index_k(mut self, k: Option<usize>) -> Self {
974        self.vector_index_k = k;
975        self
976    }
977
978    /// Sets start time of the query.
979    #[must_use]
980    pub(crate) fn with_start_time(mut self, now: Option<Instant>) -> Self {
981        self.query_start = now;
982        self
983    }
984
985    #[must_use]
986    pub(crate) fn with_append_mode(mut self, is_append_mode: bool) -> Self {
987        self.append_mode = is_append_mode;
988        self
989    }
990
991    /// Sets whether to remove deletion markers during scan.
992    #[must_use]
993    pub(crate) fn with_filter_deleted(mut self, filter_deleted: bool) -> Self {
994        self.filter_deleted = filter_deleted;
995        self
996    }
997
998    /// Sets the merge mode.
999    #[must_use]
1000    pub(crate) fn with_merge_mode(mut self, merge_mode: MergeMode) -> Self {
1001        self.merge_mode = merge_mode;
1002        self
1003    }
1004
1005    /// Sets the distribution hint.
1006    #[must_use]
1007    pub(crate) fn with_distribution(
1008        mut self,
1009        distribution: Option<TimeSeriesDistribution>,
1010    ) -> Self {
1011        self.distribution = distribution;
1012        self
1013    }
1014
1015    /// Sets whether the region's configured SST format is flat for explain output.
1016    #[must_use]
1017    pub(crate) fn with_explain_flat_format(mut self, explain_flat_format: bool) -> Self {
1018        self.explain_flat_format = explain_flat_format;
1019        self
1020    }
1021
1022    /// Sets the time series row selector.
1023    #[must_use]
1024    pub(crate) fn with_series_row_selector(
1025        mut self,
1026        series_row_selector: Option<TimeSeriesRowSelector>,
1027    ) -> Self {
1028        self.series_row_selector = series_row_selector;
1029        self
1030    }
1031
1032    #[must_use]
1033    pub(crate) fn with_snapshot_sequence(
1034        mut self,
1035        snapshot_sequence: Option<SequenceNumber>,
1036    ) -> Self {
1037        self.snapshot_sequence = snapshot_sequence;
1038        self
1039    }
1040
1041    /// Sets whether this scan is for compaction.
1042    #[must_use]
1043    pub(crate) fn with_compaction(mut self, compaction: bool) -> Self {
1044        self.compaction = compaction;
1045        self
1046    }
1047
1048    /// Builds memtable ranges to scan by `index`.
1049    pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> {
1050        let memtable = &self.memtables[index.index];
1051        let mut ranges = SmallVec::new();
1052        memtable.build_ranges(index.row_group_index, &mut ranges);
1053        ranges
1054    }
1055
1056    fn predicate_for_file(&self, file: &FileHandle) -> Option<Predicate> {
1057        if self.should_skip_region_partition(file) {
1058            self.predicate.predicate_without_region().cloned()
1059        } else {
1060            self.predicate.predicate().cloned()
1061        }
1062    }
1063
1064    fn should_skip_region_partition(&self, file: &FileHandle) -> bool {
1065        match (
1066            self.region_partition_expr.as_ref(),
1067            file.meta_ref().partition_expr.as_ref(),
1068        ) {
1069            (Some(region_expr), Some(file_expr)) => region_expr == file_expr,
1070            _ => false,
1071        }
1072    }
1073
1074    /// Prunes a file to scan and returns the builder to build readers.
1075    #[tracing::instrument(
1076        skip_all,
1077        fields(
1078            region_id = %self.region_metadata().region_id,
1079            file_id = %file.file_id()
1080        )
1081    )]
1082    pub async fn prune_file(
1083        &self,
1084        file: &FileHandle,
1085        pre_filter_mode: PreFilterMode,
1086        reader_metrics: &mut ReaderMetrics,
1087    ) -> Result<FileRangeBuilder> {
1088        let predicate = self.predicate_for_file(file);
1089        let may_build_selective_row_selection = predicate.is_some();
1090        let decode_pk_values = !self.compaction
1091            && self
1092                .mapper
1093                .read_columns()
1094                .column_ids_iter()
1095                .any(|column_id| self.mapper.metadata().primary_key.contains(&column_id));
1096        let reader = self
1097            .access_layer
1098            .read_sst(file.clone())
1099            .predicate(predicate)
1100            .projection(Some(self.read_cols.clone()))
1101            .cache(self.cache_strategy.clone())
1102            .inverted_index_appliers(self.inverted_index_appliers.clone())
1103            .bloom_filter_index_appliers(self.bloom_filter_index_appliers.clone())
1104            .fulltext_index_appliers(self.fulltext_index_appliers.clone());
1105        let reader = if !self.compaction && may_build_selective_row_selection {
1106            reader.deferred_optional_page_index()
1107        } else {
1108            reader
1109        };
1110        #[cfg(feature = "vector_index")]
1111        let reader = {
1112            let mut reader = reader;
1113            reader =
1114                reader.vector_index_applier(self.vector_index_applier.clone(), self.vector_index_k);
1115            reader
1116        };
1117        let res = reader
1118            .expected_metadata(Some(self.mapper.metadata().clone()))
1119            .compaction(self.compaction)
1120            .pre_filter_mode(pre_filter_mode)
1121            .decode_primary_key_values(decode_pk_values)
1122            .build_reader_input(reader_metrics)
1123            .await;
1124        let read_input = match res {
1125            Ok(x) => x,
1126            Err(e) => {
1127                if e.is_object_not_found() && self.ignore_file_not_found {
1128                    error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
1129                    return Ok(FileRangeBuilder::default());
1130                } else {
1131                    return Err(e);
1132                }
1133            }
1134        };
1135
1136        let Some((mut file_range_ctx, selection)) = read_input else {
1137            return Ok(FileRangeBuilder::default());
1138        };
1139
1140        let need_compat = !compat::has_same_columns_and_pk_encoding(
1141            &self.mapper,
1142            file_range_ctx.read_format(),
1143            self.compaction,
1144        );
1145        if need_compat {
1146            // They have different schema. We need to adapt the batch first so the
1147            // mapper can convert it.
1148            let compat = FlatCompatBatch::try_new(
1149                &self.mapper,
1150                file_range_ctx.read_format(),
1151                self.compaction,
1152            )?;
1153            file_range_ctx.set_compat_batch(compat);
1154        }
1155        Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), selection))
1156    }
1157
1158    /// Scans flat sources (RecordBatch streams) in parallel.
1159    ///
1160    /// # Panics if the input doesn't allow parallel scan.
1161    #[tracing::instrument(
1162        skip(self, sources, semaphore),
1163        fields(
1164            region_id = %self.region_metadata().region_id,
1165            source_count = sources.len()
1166        )
1167    )]
1168    pub(crate) fn create_parallel_flat_sources(
1169        &self,
1170        sources: Vec<BoxedRecordBatchStream>,
1171        semaphore: Arc<Semaphore>,
1172        channel_size: usize,
1173    ) -> Result<Vec<BoxedRecordBatchStream>> {
1174        if sources.len() <= 1 {
1175            return Ok(sources);
1176        }
1177
1178        // Spawn a task for each source.
1179        let sources = sources
1180            .into_iter()
1181            .map(|source| {
1182                let (sender, receiver) = mpsc::channel(channel_size);
1183                self.spawn_flat_scan_task(source, semaphore.clone(), sender);
1184                let stream = Box::pin(ReceiverStream::new(receiver));
1185                Box::pin(stream) as _
1186            })
1187            .collect();
1188        Ok(sources)
1189    }
1190
1191    /// Spawns a task to scan a flat source (RecordBatch stream) asynchronously.
1192    #[tracing::instrument(
1193        skip(self, input, semaphore, sender),
1194        fields(region_id = %self.region_metadata().region_id)
1195    )]
1196    pub(crate) fn spawn_flat_scan_task(
1197        &self,
1198        mut input: BoxedRecordBatchStream,
1199        semaphore: Arc<Semaphore>,
1200        sender: mpsc::Sender<Result<RecordBatch>>,
1201    ) {
1202        let region_id = self.region_metadata().region_id;
1203        let span = tracing::info_span!(
1204            "ScanInput::parallel_scan_task",
1205            region_id = %region_id,
1206            stream_kind = "flat"
1207        );
1208        common_runtime::spawn_query(
1209            async move {
1210                loop {
1211                    // We release the permit before sending result to avoid the task waiting on
1212                    // the channel with the permit held.
1213                    let maybe_batch = {
1214                        // Safety: We never close the semaphore.
1215                        let _permit = semaphore.acquire().await.unwrap();
1216                        input.next().await
1217                    };
1218                    match maybe_batch {
1219                        Some(Ok(batch)) => {
1220                            let _ = sender.send(Ok(batch)).await;
1221                        }
1222                        Some(Err(e)) => {
1223                            let _ = sender.send(Err(e)).await;
1224                            break;
1225                        }
1226                        None => break,
1227                    }
1228                }
1229            }
1230            .instrument(span),
1231        );
1232    }
1233
1234    pub(crate) fn total_rows(&self) -> usize {
1235        let rows_in_files: usize = self.files.iter().map(|f| f.num_rows()).sum();
1236        let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();
1237
1238        let rows = rows_in_files + rows_in_memtables;
1239        #[cfg(feature = "enterprise")]
1240        let rows = rows
1241            + self
1242                .extension_ranges
1243                .iter()
1244                .map(|x| x.num_rows())
1245                .sum::<u64>() as usize;
1246        rows
1247    }
1248
1249    pub(crate) fn predicate_group(&self) -> &PredicateGroup {
1250        &self.predicate
1251    }
1252
1253    /// Returns number of memtables to scan.
1254    pub(crate) fn num_memtables(&self) -> usize {
1255        self.memtables.len()
1256    }
1257
1258    /// Returns number of SST files to scan.
1259    pub(crate) fn num_files(&self) -> usize {
1260        self.files.len()
1261    }
1262
1263    /// Gets the file handle from a row group index.
1264    pub(crate) fn file_from_index(&self, index: RowGroupIndex) -> &FileHandle {
1265        let file_index = index.index - self.num_memtables();
1266        &self.files[file_index]
1267    }
1268
1269    pub fn region_metadata(&self) -> &RegionMetadataRef {
1270        self.mapper.metadata()
1271    }
1272
1273    fn range_pre_filter_mode(&self, source_count: usize) -> PreFilterMode {
1274        if source_count <= 1 {
1275            // Duplicated rows in the same source is not a normal case and we don't provide
1276            // strict dedup semantic (last_row/last_non_null) for it. We expect the duplicated rows
1277            // are exactly identical in the same source so we use PreFilterMode::All for
1278            // performance reason.
1279            return PreFilterMode::All;
1280        }
1281
1282        pre_filter_mode(self.append_mode, self.merge_mode)
1283    }
1284}
1285
#[cfg(feature = "enterprise")]
impl ScanInput {
    /// Replaces the extension ranges to scan.
    #[must_use]
    pub(crate) fn with_extension_ranges(
        mut self,
        extension_ranges: Vec<BoxedExtensionRange>,
    ) -> Self {
        self.extension_ranges = extension_ranges;
        self
    }

    /// Returns all extension ranges of this input.
    #[cfg(feature = "enterprise")]
    pub(crate) fn extension_ranges(&self) -> &[BoxedExtensionRange] {
        self.extension_ranges.as_slice()
    }

    /// Gets a boxed [ExtensionRange] by its index among all ranges.
    ///
    /// Extension ranges are numbered after memtables and files, so both counts are
    /// subtracted from `i`.
    #[cfg(feature = "enterprise")]
    pub(crate) fn extension_range(&self, i: usize) -> &BoxedExtensionRange {
        let after_memtables = i - self.num_memtables();
        &self.extension_ranges[after_memtables - self.num_files()]
    }
}
1307
#[cfg(test)]
impl ScanInput {
    /// Returns the ids of the SST files to scan, in file order.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        let mut ids = Vec::with_capacity(self.files.len());
        for file in &self.files {
            ids.push(file.file_id());
        }
        ids
    }

    /// Returns the index ids of the SST files to scan, in file order.
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        let mut ids = Vec::with_capacity(self.files.len());
        for file in &self.files {
            ids.push(file.index_id());
        }
        ids
    }
}
1319
1320fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode {
1321    if append_mode {
1322        return PreFilterMode::All;
1323    }
1324
1325    match merge_mode {
1326        MergeMode::LastRow => PreFilterMode::SkipFields,
1327        MergeMode::LastNonNull => PreFilterMode::SkipFields,
1328    }
1329}
1330
1331fn narrow_read_columns_by_json_type_hint(
1332    read_columns: &mut ReadColumns,
1333    json_type_hint: &HashMap<String, JsonNativeType>,
1334    metadata: &RegionMetadata,
1335) {
1336    if json_type_hint.is_empty() {
1337        return;
1338    }
1339
1340    for read_column in &mut read_columns.cols {
1341        let Some(column) = metadata.column_by_id(read_column.column_id) else {
1342            continue;
1343        };
1344        let column_name = &column.column_schema.name;
1345        let Some(json_type) = json_type_hint.get(column_name) else {
1346            continue;
1347        };
1348
1349        let mut paths = Vec::new();
1350        let mut current = vec![column_name.clone()];
1351        collect_json_nested_paths(json_type, &mut current, &mut paths);
1352        merge_nested_paths(&mut read_column.nested_paths, paths)
1353    }
1354}
1355
1356fn collect_json_nested_paths(
1357    json_type: &JsonNativeType,
1358    current: &mut NestedPath,
1359    paths: &mut Vec<NestedPath>,
1360) {
1361    match json_type {
1362        JsonNativeType::Object(fields) if !fields.is_empty() => {
1363            for (field, child) in fields {
1364                current.push(field.clone());
1365                collect_json_nested_paths(child, current, paths);
1366                current.pop();
1367            }
1368        }
1369        _ => paths.push(current.clone()),
1370    }
1371}
1372
/// Output of [build_scan_fingerprint]: the cache fingerprint plus the derived
/// implied time range used to decide whether the cache key can drop the time
/// predicates for a given partition (see `build_range_cache_key`).
pub(crate) struct ScanFingerprintBundle {
    /// Fingerprint of the scan request used for partition range caching.
    pub(crate) fingerprint: ScanRequestFingerprint,
    /// `Some(r)` = all time-only predicates are guaranteed true on `r` (in the
    /// column's `TimeUnit`).
    /// `None`    = at least one time-only predicate could not be proven (e.g.
    /// `OR`), so the cache-key optimization is disabled for this scan.
    pub(crate) implied_time_range: Option<TimestampRange>,
}
1384
1385/// Builds a [ScanFingerprintBundle] from a [ScanInput] if the scan is eligible
1386/// for partition range caching.
1387pub(crate) fn build_scan_fingerprint(input: &ScanInput) -> Option<ScanFingerprintBundle> {
1388    let eligible = !input.compaction
1389        && !input.files.is_empty()
1390        && matches!(input.cache_strategy, CacheStrategy::EnableAll(_));
1391
1392    if !eligible {
1393        return None;
1394    }
1395
1396    let metadata = input.region_metadata();
1397    let tag_names: HashSet<&str> = metadata
1398        .column_metadatas
1399        .iter()
1400        .filter(|col| col.semantic_type == SemanticType::Tag)
1401        .map(|col| col.column_schema.name.as_str())
1402        .collect();
1403
1404    let time_index = metadata.time_index_column();
1405    let time_index_name = time_index.column_schema.name.clone();
1406    let ts_col_unit = time_index
1407        .column_schema
1408        .data_type
1409        .as_timestamp()
1410        .expect("Time index must have timestamp-compatible type")
1411        .unit();
1412
1413    let exprs = input
1414        .predicate_group()
1415        .predicate_without_region()
1416        .map(|predicate| predicate.exprs())
1417        .unwrap_or_default();
1418
1419    let mut filters = Vec::new();
1420    let mut time_only_exprs: Vec<&Expr> = Vec::new();
1421    let mut has_tag_filter = false;
1422    let mut columns = HashSet::new();
1423
1424    for expr in exprs {
1425        columns.clear();
1426        let is_time_only = match expr_to_columns(expr, &mut columns) {
1427            Ok(()) if !columns.is_empty() => {
1428                has_tag_filter |= columns
1429                    .iter()
1430                    .any(|col| tag_names.contains(col.name.as_str()));
1431                columns.iter().all(|col| col.name == time_index_name)
1432            }
1433            _ => false,
1434        };
1435
1436        // Route time-only exprs that the legacy extractor recognizes into
1437        // `time_only_exprs` so the implication walker
1438        // (`implied_time_range_from_exprs`, called below) can attempt to drop
1439        // them from the cache key when the partition's `FileTimeRange` is fully
1440        // covered, then stringify them into the fingerprint's `time_filters`
1441        // bucket. Time-only exprs that the extractor doesn't recognize stay in
1442        // `filters` and never get stripped — conservatively correct.
1443        if is_time_only
1444            && extract_time_range_from_expr(&time_index_name, ts_col_unit, expr).is_some()
1445        {
1446            time_only_exprs.push(expr);
1447        } else {
1448            filters.push(expr.to_string());
1449        }
1450    }
1451
1452    if !has_tag_filter {
1453        // We only cache requests that have tag filters to avoid caching all series.
1454        return None;
1455    }
1456
1457    let implied_time_range =
1458        implied_time_range_from_exprs(&time_index_name, ts_col_unit, &time_only_exprs);
1459    let mut time_filters: Vec<String> = time_only_exprs.iter().map(|e| e.to_string()).collect();
1460
1461    // Ensure the filters are sorted for consistent fingerprinting.
1462    filters.sort_unstable();
1463    time_filters.sort_unstable();
1464    let read_columns = input.read_cols.clone();
1465    let fingerprint = crate::read::range_cache::ScanRequestFingerprintBuilder {
1466        read_column_types: read_columns
1467            .column_ids_iter()
1468            .map(|id| {
1469                metadata
1470                    .column_by_id(id)
1471                    .map(|col| col.column_schema.data_type.clone())
1472            })
1473            .collect(),
1474        read_columns,
1475        filters,
1476        time_filters,
1477        series_row_selector: input.series_row_selector,
1478        append_mode: input.append_mode,
1479        filter_deleted: input.filter_deleted,
1480        merge_mode: input.merge_mode,
1481        partition_expr_version: metadata.partition_expr_version,
1482    }
1483    .build();
1484
1485    Some(ScanFingerprintBundle {
1486        fingerprint,
1487        implied_time_range,
1488    })
1489}
1490
/// Context shared by different streams from a scanner.
/// It contains the input and ranges to scan.
pub struct StreamContext {
    /// Input memtables and files.
    pub input: ScanInput,
    /// Metadata for partition ranges.
    pub(crate) ranges: Vec<RangeMeta>,
    /// Precomputed scan fingerprint for partition range caching.
    /// `None` when the scan is not eligible for caching.
    #[allow(dead_code)]
    pub(crate) scan_fingerprint: Option<ScanRequestFingerprint>,
    /// Implied range of every time-only predicate, in the time index column's
    /// `TimeUnit`. Used by `build_range_cache_key` to decide whether the
    /// partition's `FileTimeRange` is fully covered (allowing `time_filters`
    /// to be stripped from the cache key). `None` when caching is ineligible
    /// or when the implication walker bailed on an unsupported shape (e.g.
    /// `OR`).
    pub(crate) scan_implied_time_range: Option<TimestampRange>,

    // Metrics:
    /// The start time of the query. The constructors use the start time recorded in the
    /// input when present and the context creation time otherwise.
    pub(crate) query_start: Instant,
}
1514
1515impl StreamContext {
1516    /// Creates a new [StreamContext] for [SeqScan].
1517    pub(crate) fn seq_scan_ctx(input: ScanInput) -> Self {
1518        let query_start = input.query_start.unwrap_or_else(Instant::now);
1519        let ranges = RangeMeta::seq_scan_ranges(&input);
1520        READ_SST_COUNT.observe(input.num_files() as f64);
1521        let (scan_fingerprint, scan_implied_time_range) = match build_scan_fingerprint(&input) {
1522            Some(b) => (Some(b.fingerprint), b.implied_time_range),
1523            None => (None, None),
1524        };
1525
1526        Self {
1527            input,
1528            ranges,
1529            scan_fingerprint,
1530            scan_implied_time_range,
1531            query_start,
1532        }
1533    }
1534
1535    /// Creates a new [StreamContext] for [UnorderedScan].
1536    pub(crate) fn unordered_scan_ctx(input: ScanInput) -> Self {
1537        let query_start = input.query_start.unwrap_or_else(Instant::now);
1538        let ranges = RangeMeta::unordered_scan_ranges(&input);
1539        READ_SST_COUNT.observe(input.num_files() as f64);
1540        let (scan_fingerprint, scan_implied_time_range) = match build_scan_fingerprint(&input) {
1541            Some(b) => (Some(b.fingerprint), b.implied_time_range),
1542            None => (None, None),
1543        };
1544
1545        Self {
1546            input,
1547            ranges,
1548            scan_fingerprint,
1549            scan_implied_time_range,
1550            query_start,
1551        }
1552    }
1553
1554    /// Returns true if the index refers to a memtable.
1555    pub(crate) fn is_mem_range_index(&self, index: RowGroupIndex) -> bool {
1556        self.input.num_memtables() > index.index
1557    }
1558
1559    pub(crate) fn is_file_range_index(&self, index: RowGroupIndex) -> bool {
1560        !self.is_mem_range_index(index)
1561            && index.index < self.input.num_files() + self.input.num_memtables()
1562    }
1563
1564    pub(crate) fn range_pre_filter_mode(&self, part_range: &PartitionRange) -> PreFilterMode {
1565        let range_meta = &self.ranges[part_range.identifier];
1566        let source_count = range_meta.indices.len();
1567
1568        self.input.range_pre_filter_mode(source_count)
1569    }
1570
1571    /// Retrieves the partition ranges.
1572    pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
1573        self.ranges
1574            .iter()
1575            .enumerate()
1576            .map(|(idx, range_meta)| range_meta.new_partition_range(idx))
1577            .collect()
1578    }
1579
1580    /// Format the context for explain.
1581    pub(crate) fn format_for_explain(&self, verbose: bool, f: &mut fmt::Formatter) -> fmt::Result {
1582        let (mut num_mem_ranges, mut num_file_ranges, mut num_other_ranges) = (0, 0, 0);
1583        for range_meta in &self.ranges {
1584            for idx in &range_meta.row_group_indices {
1585                if self.is_mem_range_index(*idx) {
1586                    num_mem_ranges += 1;
1587                } else if self.is_file_range_index(*idx) {
1588                    num_file_ranges += 1;
1589                } else {
1590                    num_other_ranges += 1;
1591                }
1592            }
1593        }
1594        if verbose {
1595            write!(f, "{{")?;
1596        }
1597        write!(
1598            f,
1599            r#""partition_count":{{"count":{}, "mem_ranges":{}, "files":{}, "file_ranges":{}"#,
1600            self.ranges.len(),
1601            num_mem_ranges,
1602            self.input.num_files(),
1603            num_file_ranges,
1604        )?;
1605        if num_other_ranges > 0 {
1606            write!(f, r#", "other_ranges":{}"#, num_other_ranges)?;
1607        }
1608        write!(f, "}}")?;
1609
1610        if let Some(selector) = &self.input.series_row_selector {
1611            write!(f, ", \"selector\":\"{}\"", selector)?;
1612        }
1613        if let Some(distribution) = &self.input.distribution {
1614            write!(f, ", \"distribution\":\"{}\"", distribution)?;
1615        }
1616
1617        if verbose {
1618            self.format_verbose_content(f)?;
1619        }
1620
1621        Ok(())
1622    }
1623
1624    fn format_verbose_content(&self, f: &mut fmt::Formatter) -> fmt::Result {
1625        struct FileWrapper<'a> {
1626            file: &'a FileHandle,
1627        }
1628
1629        impl fmt::Debug for FileWrapper<'_> {
1630            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1631                let (start, end) = self.file.time_range();
1632                write!(
1633                    f,
1634                    r#"{{"file_id":"{}","time_range_start":"{}::{}","time_range_end":"{}::{}","rows":{},"size":{},"index_size":{}}}"#,
1635                    self.file.file_id(),
1636                    start.value(),
1637                    start.unit(),
1638                    end.value(),
1639                    end.unit(),
1640                    self.file.num_rows(),
1641                    self.file.size(),
1642                    self.file.index_size()
1643                )
1644            }
1645        }
1646
1647        struct InputWrapper<'a> {
1648            input: &'a ScanInput,
1649        }
1650
1651        #[cfg(feature = "enterprise")]
1652        impl InputWrapper<'_> {
1653            fn format_extension_ranges(&self, f: &mut fmt::Formatter) -> fmt::Result {
1654                if self.input.extension_ranges.is_empty() {
1655                    return Ok(());
1656                }
1657
1658                let mut delimiter = "";
1659                write!(f, ", extension_ranges: [")?;
1660                for range in self.input.extension_ranges() {
1661                    write!(f, "{}{:?}", delimiter, range)?;
1662                    delimiter = ", ";
1663                }
1664                write!(f, "]")?;
1665                Ok(())
1666            }
1667        }
1668
1669        impl fmt::Debug for InputWrapper<'_> {
1670            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1671                let output_schema = self.input.mapper.output_schema();
1672                if !output_schema.is_empty() {
1673                    let names: Vec<_> = output_schema
1674                        .column_schemas()
1675                        .iter()
1676                        .map(|col| &col.name)
1677                        .collect();
1678                    write!(f, ", \"projection\": {:?}", names)?;
1679                }
1680                if let Some(predicate) = &self.input.predicate.predicate() {
1681                    if !predicate.exprs().is_empty() {
1682                        let exprs: Vec<_> =
1683                            predicate.exprs().iter().map(|e| e.to_string()).collect();
1684                        write!(f, ", \"filters\": {:?}", exprs)?;
1685                    }
1686                    if !predicate.dyn_filters().is_empty() {
1687                        let dyn_filters: Vec<_> = predicate
1688                            .dyn_filters()
1689                            .iter()
1690                            .map(|f| format!("{}", f))
1691                            .collect();
1692                        write!(f, ", \"dyn_filters\": {:?}", dyn_filters)?;
1693                    }
1694                }
1695                #[cfg(feature = "vector_index")]
1696                if let Some(vector_index_k) = self.input.vector_index_k {
1697                    write!(f, ", \"vector_index_k\": {}", vector_index_k)?;
1698                }
1699                if !self.input.files.is_empty() {
1700                    write!(f, ", \"files\": ")?;
1701                    f.debug_list()
1702                        .entries(self.input.files.iter().map(|file| FileWrapper { file }))
1703                        .finish()?;
1704                }
1705                write!(f, ", \"flat_format\": {}", self.input.explain_flat_format)?;
1706                #[cfg(feature = "enterprise")]
1707                self.format_extension_ranges(f)?;
1708
1709                Ok(())
1710            }
1711        }
1712
1713        write!(f, "{:?}", InputWrapper { input: &self.input })
1714    }
1715
1716    /// Add new dynamic filters to the predicates.
1717    /// Safe after stream creation; in-flight reads may still observe an older snapshot.
1718    pub(crate) fn add_dyn_filter_to_predicate(
1719        self: &Arc<Self>,
1720        filter_exprs: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
1721    ) -> Vec<bool> {
1722        let mut supported = Vec::with_capacity(filter_exprs.len());
1723        let filter_expr = filter_exprs
1724            .into_iter()
1725            .filter_map(|expr| {
1726                if let Ok(dyn_filter) = (expr as Arc<dyn std::any::Any + Send + Sync + 'static>)
1727                .downcast::<datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr>()
1728            {
1729                supported.push(true);
1730                Some(dyn_filter)
1731            } else {
1732                supported.push(false);
1733                None
1734            }
1735            })
1736            .collect();
1737        self.input.predicate.add_dyn_filters(filter_expr);
1738        supported
1739    }
1740}
1741
/// Predicates to evaluate.
///
/// Besides the full predicates, it keeps a separate list of time filters that only
/// contains the expressions [SimpleFilterEvaluator] supports.
#[derive(Clone, Default)]
pub struct PredicateGroup {
    /// Filters built from expressions that reference exactly one column whose
    /// semantic type is `Timestamp`. `None` when no expression qualifies.
    time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
    /// Predicate that includes request filters and region partition expr (if any).
    predicate_all: Predicate,
    /// Predicate that only includes request filters.
    predicate_without_region: Predicate,
    /// Region partition expression restored from metadata.
    region_partition_expr: Option<PartitionExpr>,
}
1754
1755impl PredicateGroup {
1756    /// Creates a new `PredicateGroup` from exprs according to the metadata.
1757    pub fn new(metadata: &RegionMetadata, exprs: &[Expr]) -> Result<Self> {
1758        let mut combined_exprs = exprs.to_vec();
1759        let mut region_partition_expr = None;
1760
1761        if let Some(expr_json) = metadata.partition_expr.as_ref()
1762            && !expr_json.is_empty()
1763            && let Some(expr) = PartitionExpr::from_json_str(expr_json)
1764                .context(InvalidPartitionExprSnafu { expr: expr_json })?
1765        {
1766            let logical_expr = expr
1767                .try_as_logical_expr()
1768                .context(InvalidPartitionExprSnafu {
1769                    expr: expr_json.clone(),
1770                })?;
1771
1772            combined_exprs.push(logical_expr);
1773            region_partition_expr = Some(expr);
1774        }
1775
1776        let mut time_filters = Vec::with_capacity(combined_exprs.len());
1777        // Columns in the expr.
1778        let mut columns = HashSet::new();
1779        for expr in &combined_exprs {
1780            columns.clear();
1781            let Some(filter) = Self::expr_to_filter(expr, metadata, &mut columns) else {
1782                continue;
1783            };
1784            time_filters.push(filter);
1785        }
1786        let time_filters = if time_filters.is_empty() {
1787            None
1788        } else {
1789            Some(Arc::new(time_filters))
1790        };
1791
1792        let predicate_all = Predicate::new(combined_exprs);
1793        let predicate_without_region = Predicate::new(exprs.to_vec());
1794
1795        Ok(Self {
1796            time_filters,
1797            predicate_all,
1798            predicate_without_region,
1799            region_partition_expr,
1800        })
1801    }
1802
1803    /// Returns time filters.
1804    pub(crate) fn time_filters(&self) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
1805        self.time_filters.clone()
1806    }
1807
1808    /// Returns predicate of all exprs (including region partition expr if present).
1809    pub(crate) fn predicate(&self) -> Option<&Predicate> {
1810        if self.predicate_all.is_empty() {
1811            None
1812        } else {
1813            Some(&self.predicate_all)
1814        }
1815    }
1816
1817    /// Returns predicate that excludes region partition expr.
1818    pub(crate) fn predicate_without_region(&self) -> Option<&Predicate> {
1819        if self.predicate_without_region.is_empty() {
1820            None
1821        } else {
1822            Some(&self.predicate_without_region)
1823        }
1824    }
1825
1826    /// Add dynamic filters in the predicates.
1827    pub(crate) fn add_dyn_filters(&self, dyn_filters: Vec<Arc<DynamicFilterPhysicalExpr>>) {
1828        self.predicate_all.add_dyn_filters(dyn_filters.clone());
1829        self.predicate_without_region.add_dyn_filters(dyn_filters);
1830    }
1831
1832    /// Returns the region partition expr from metadata, if any.
1833    pub(crate) fn region_partition_expr(&self) -> Option<&PartitionExpr> {
1834        self.region_partition_expr.as_ref()
1835    }
1836
1837    fn expr_to_filter(
1838        expr: &Expr,
1839        metadata: &RegionMetadata,
1840        columns: &mut HashSet<Column>,
1841    ) -> Option<SimpleFilterEvaluator> {
1842        columns.clear();
1843        // `expr_to_columns` won't return error.
1844        // We still ignore these expressions for safety.
1845        expr_to_columns(expr, columns).ok()?;
1846        if columns.len() > 1 {
1847            // Simple filter doesn't support multiple columns.
1848            return None;
1849        }
1850        let column = columns.iter().next()?;
1851        let column_meta = metadata.column_by_name(&column.name)?;
1852        if column_meta.semantic_type == SemanticType::Timestamp {
1853            SimpleFilterEvaluator::try_new(expr)
1854        } else {
1855            None
1856        }
1857    }
1858}
1859
1860#[cfg(test)]
1861mod tests {
1862    use std::sync::Arc;
1863
1864    use datafusion::physical_plan::expressions::lit as physical_lit;
1865    use datafusion_common::ScalarValue;
1866    use datafusion_expr::{col, lit};
1867    use datatypes::prelude::ConcreteDataType;
1868    use datatypes::schema::ColumnSchema;
1869    use datatypes::types::json_type::JsonObjectType;
1870    use datatypes::value::Value;
1871    use partition::expr::col as partition_col;
1872    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
1873    use store_api::storage::{RegionId, TimeSeriesDistribution, TimeSeriesRowSelector};
1874
1875    use super::*;
1876    use crate::cache::CacheManager;
1877    use crate::error::InvalidMetadataSnafu;
1878    use crate::read::range_cache::ScanRequestFingerprintBuilder;
1879    use crate::read::read_columns::ReadColumn;
1880    use crate::test_util::memtable_util::metadata_with_primary_key;
1881    use crate::test_util::scheduler_util::SchedulerEnv;
1882
1883    async fn new_scan_input(metadata: RegionMetadataRef, filters: Vec<Expr>) -> ScanInput {
1884        let env = SchedulerEnv::new().await;
1885        let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3]).unwrap();
1886        let predicate = PredicateGroup::new(metadata.as_ref(), &filters).unwrap();
1887        let file = FileHandle::new(
1888            crate::sst::file::FileMeta::default(),
1889            Arc::new(crate::sst::file_purger::NoopFilePurger),
1890        );
1891
1892        ScanInput::new(env.access_layer.clone(), mapper)
1893            .with_predicate(predicate)
1894            .with_cache(CacheStrategy::EnableAll(Arc::new(
1895                CacheManager::builder()
1896                    .range_result_cache_size(1024)
1897                    .build(),
1898            )))
1899            .with_files(vec![file])
1900    }
1901
1902    /// Helper to create a timestamp millisecond literal.
1903    fn ts_lit(val: i64) -> datafusion_expr::Expr {
1904        lit(ScalarValue::TimestampMillisecond(Some(val), None))
1905    }
1906
1907    #[test]
1908    fn test_fill_json_nested_paths_from_hint() -> Result<()> {
1909        fn json_projection_test_metadata() -> Result<RegionMetadataRef> {
1910            let mut builder = RegionMetadataBuilder::new(RegionId::new(1024, 0));
1911            builder
1912                .push_column_metadata(ColumnMetadata {
1913                    column_schema: ColumnSchema::new(
1914                        "tag".to_string(),
1915                        ConcreteDataType::string_datatype(),
1916                        true,
1917                    ),
1918                    semantic_type: SemanticType::Tag,
1919                    column_id: 0,
1920                })
1921                .push_column_metadata(ColumnMetadata {
1922                    column_schema: ColumnSchema::new(
1923                        "j".to_string(),
1924                        ConcreteDataType::json2(JsonNativeType::Object(JsonObjectType::new())),
1925                        true,
1926                    ),
1927                    semantic_type: SemanticType::Field,
1928                    column_id: 1,
1929                })
1930                .push_column_metadata(ColumnMetadata {
1931                    column_schema: ColumnSchema::new(
1932                        "ts".to_string(),
1933                        ConcreteDataType::timestamp_millisecond_datatype(),
1934                        false,
1935                    ),
1936                    semantic_type: SemanticType::Timestamp,
1937                    column_id: 2,
1938                });
1939            builder.primary_key(vec![0]);
1940            builder.build().context(InvalidMetadataSnafu).map(Arc::new)
1941        }
1942
1943        let metadata = json_projection_test_metadata()?;
1944        let hint = HashMap::from([(
1945            "j".to_string(),
1946            JsonNativeType::Object(JsonObjectType::from([
1947                ("a".to_string(), JsonNativeType::i64()),
1948                (
1949                    "b".to_string(),
1950                    JsonNativeType::Object(JsonObjectType::from([(
1951                        "c".to_string(),
1952                        JsonNativeType::String,
1953                    )])),
1954                ),
1955            ])),
1956        )]);
1957
1958        fn nested_path(parts: &[&str]) -> NestedPath {
1959            parts.iter().map(|part| part.to_string()).collect()
1960        }
1961
1962        let mut read_columns = ReadColumns {
1963            cols: vec![ReadColumn::new(1, vec![]), ReadColumn::new(0, vec![])],
1964        };
1965        narrow_read_columns_by_json_type_hint(&mut read_columns, &hint, metadata.as_ref());
1966        assert_eq!(
1967            read_columns,
1968            ReadColumns {
1969                cols: vec![
1970                    ReadColumn::new(
1971                        1,
1972                        vec![nested_path(&["j", "a"]), nested_path(&["j", "b", "c"])]
1973                    ),
1974                    ReadColumn::new(0, vec![])
1975                ]
1976            }
1977        );
1978
1979        let mut read_columns = ReadColumns {
1980            cols: vec![ReadColumn::new(0, vec![])],
1981        };
1982        narrow_read_columns_by_json_type_hint(&mut read_columns, &hint, metadata.as_ref());
1983        assert_eq!(
1984            read_columns,
1985            ReadColumns {
1986                cols: vec![ReadColumn::new(0, vec![])]
1987            }
1988        );
1989        Ok(())
1990    }
1991
1992    #[tokio::test]
1993    async fn test_build_scan_fingerprint_for_eligible_scan() {
1994        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
1995        let input = new_scan_input(
1996            metadata.clone(),
1997            vec![
1998                col("ts").gt_eq(ts_lit(1000)),
1999                col("k0").eq(lit("foo")),
2000                col("v0").gt(lit(1)),
2001            ],
2002        )
2003        .await
2004        .with_distribution(Some(TimeSeriesDistribution::PerSeries))
2005        .with_series_row_selector(Some(TimeSeriesRowSelector::LastRow))
2006        .with_merge_mode(MergeMode::LastNonNull)
2007        .with_filter_deleted(false);
2008
2009        let fingerprint = build_scan_fingerprint(&input).unwrap();
2010
2011        let expected = ScanRequestFingerprintBuilder {
2012            read_columns: input.read_cols,
2013            read_column_types: vec![
2014                metadata
2015                    .column_by_id(0)
2016                    .map(|col| col.column_schema.data_type.clone()),
2017                metadata
2018                    .column_by_id(2)
2019                    .map(|col| col.column_schema.data_type.clone()),
2020                metadata
2021                    .column_by_id(3)
2022                    .map(|col| col.column_schema.data_type.clone()),
2023            ],
2024            filters: vec![
2025                col("k0").eq(lit("foo")).to_string(),
2026                col("v0").gt(lit(1)).to_string(),
2027            ],
2028            time_filters: vec![col("ts").gt_eq(ts_lit(1000)).to_string()],
2029            series_row_selector: Some(TimeSeriesRowSelector::LastRow),
2030            append_mode: false,
2031            filter_deleted: false,
2032            merge_mode: MergeMode::LastNonNull,
2033            partition_expr_version: 0,
2034        }
2035        .build();
2036        assert_eq!(expected, fingerprint.fingerprint);
2037    }
2038
2039    #[tokio::test]
2040    async fn test_build_scan_fingerprint_requires_tag_filter() {
2041        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
2042        let input = new_scan_input(
2043            metadata,
2044            vec![col("ts").gt_eq(lit(1000)), col("v0").gt(lit(1))],
2045        )
2046        .await;
2047
2048        assert!(build_scan_fingerprint(&input).is_none());
2049    }
2050
2051    #[tokio::test]
2052    async fn test_build_scan_fingerprint_respects_scan_eligibility() {
2053        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
2054        let filters = vec![col("k0").eq(lit("foo"))];
2055
2056        let disabled = ScanInput::new(
2057            SchedulerEnv::new().await.access_layer.clone(),
2058            FlatProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap(),
2059        )
2060        .with_predicate(PredicateGroup::new(metadata.as_ref(), &filters).unwrap());
2061        assert!(build_scan_fingerprint(&disabled).is_none());
2062
2063        let compaction = new_scan_input(metadata.clone(), filters.clone())
2064            .await
2065            .with_compaction(true);
2066        assert!(build_scan_fingerprint(&compaction).is_none());
2067
2068        // No files to read.
2069        let no_files = new_scan_input(metadata, filters).await.with_files(vec![]);
2070        assert!(build_scan_fingerprint(&no_files).is_none());
2071    }
2072
2073    #[tokio::test]
2074    async fn test_build_scan_fingerprint_tracks_schema_and_partition_expr_changes() {
2075        let base = metadata_with_primary_key(vec![0, 1], false);
2076        let mut builder = RegionMetadataBuilder::from_existing(base);
2077        let partition_expr = partition_col("k0")
2078            .gt_eq(Value::String("foo".into()))
2079            .as_json_str()
2080            .unwrap();
2081        builder.partition_expr_json(Some(partition_expr));
2082        let metadata = Arc::new(builder.build_without_validation().unwrap());
2083
2084        let input = new_scan_input(metadata.clone(), vec![col("k0").eq(lit("foo"))]).await;
2085        let fingerprint = build_scan_fingerprint(&input).unwrap();
2086
2087        let expected = ScanRequestFingerprintBuilder {
2088            read_columns: input.read_cols,
2089            read_column_types: vec![
2090                metadata
2091                    .column_by_id(0)
2092                    .map(|col| col.column_schema.data_type.clone()),
2093                metadata
2094                    .column_by_id(2)
2095                    .map(|col| col.column_schema.data_type.clone()),
2096                metadata
2097                    .column_by_id(3)
2098                    .map(|col| col.column_schema.data_type.clone()),
2099            ],
2100            filters: vec![col("k0").eq(lit("foo")).to_string()],
2101            time_filters: vec![],
2102            series_row_selector: None,
2103            append_mode: false,
2104            filter_deleted: true,
2105            merge_mode: MergeMode::LastRow,
2106            partition_expr_version: metadata.partition_expr_version,
2107        }
2108        .build();
2109        assert_eq!(expected, fingerprint.fingerprint);
2110        assert_ne!(0, metadata.partition_expr_version);
2111    }
2112
2113    #[test]
2114    fn test_update_dyn_filters_with_empty_base_predicates() {
2115        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
2116        let predicate_group = PredicateGroup::new(metadata.as_ref(), &[]).unwrap();
2117        assert!(predicate_group.predicate().is_none());
2118        assert!(predicate_group.predicate_without_region().is_none());
2119
2120        let dyn_filter = Arc::new(DynamicFilterPhysicalExpr::new(vec![], physical_lit(false)));
2121        predicate_group.add_dyn_filters(vec![dyn_filter]);
2122
2123        let predicate_all = predicate_group.predicate().unwrap();
2124        assert!(predicate_all.exprs().is_empty());
2125        assert_eq!(1, predicate_all.dyn_filters().len());
2126
2127        let predicate_without_region = predicate_group.predicate_without_region().unwrap();
2128        assert!(predicate_without_region.exprs().is_empty());
2129        assert_eq!(1, predicate_without_region.dyn_filters().len());
2130    }
2131
2132    #[tokio::test]
2133    async fn test_range_pre_filter_mode() {
2134        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
2135        let cases = [
2136            (true, MergeMode::LastRow, 1, PreFilterMode::All),
2137            (false, MergeMode::LastNonNull, 1, PreFilterMode::All),
2138            (false, MergeMode::LastRow, 2, PreFilterMode::SkipFields),
2139            (true, MergeMode::LastRow, 2, PreFilterMode::All),
2140        ];
2141
2142        for (append_mode, merge_mode, source_count, expected_mode) in cases {
2143            let input = new_scan_input(metadata.clone(), vec![])
2144                .await
2145                .with_append_mode(append_mode)
2146                .with_merge_mode(merge_mode);
2147
2148            assert_eq!(expected_mode, input.range_pre_filter_mode(source_count));
2149        }
2150    }
2151}