// mito2/src/read/scan_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Scans a region according to the scan request.
16
17use std::collections::HashSet;
18use std::fmt;
19use std::num::NonZeroU64;
20use std::sync::Arc;
21use std::time::Instant;
22
23use api::v1::SemanticType;
24use common_error::ext::BoxedError;
25use common_recordbatch::SendableRecordBatchStream;
26use common_recordbatch::filter::SimpleFilterEvaluator;
27use common_telemetry::tracing::Instrument;
28use common_telemetry::{debug, error, tracing, warn};
29use common_time::range::TimestampRange;
30use datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr;
31use datafusion_common::Column;
32use datafusion_expr::Expr;
33use datafusion_expr::utils::expr_to_columns;
34use futures::StreamExt;
35use partition::expr::PartitionExpr;
36use smallvec::SmallVec;
37use snafu::{OptionExt as _, ResultExt};
38use store_api::metadata::{RegionMetadata, RegionMetadataRef};
39use store_api::region_engine::{PartitionRange, RegionScannerRef};
40use store_api::storage::{
41    ColumnId, RegionId, ScanRequest, SequenceNumber, SequenceRange, TimeSeriesDistribution,
42    TimeSeriesRowSelector,
43};
44use table::predicate::{Predicate, build_time_range_predicate, extract_time_range_from_expr};
45use tokio::sync::{Semaphore, mpsc};
46use tokio_stream::wrappers::ReceiverStream;
47
48use crate::access_layer::AccessLayerRef;
49use crate::cache::CacheStrategy;
50use crate::config::DEFAULT_MAX_CONCURRENT_SCAN_FILES;
51use crate::error::{InvalidPartitionExprSnafu, InvalidRequestSnafu, Result};
52#[cfg(feature = "enterprise")]
53use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
54use crate::memtable::{MemtableRange, RangesOptions};
55use crate::metrics::READ_SST_COUNT;
56use crate::read::compat::{self, FlatCompatBatch};
57use crate::read::flat_projection::FlatProjectionMapper;
58use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
59use crate::read::range_cache::ScanRequestFingerprint;
60use crate::read::seq_scan::SeqScan;
61use crate::read::series_scan::SeriesScan;
62use crate::read::stream::ScanBatchStream;
63use crate::read::unordered_scan::UnorderedScan;
64use crate::read::{BoxedRecordBatchStream, RecordBatch};
65use crate::region::options::MergeMode;
66use crate::region::version::VersionRef;
67use crate::sst::file::FileHandle;
68use crate::sst::index::bloom_filter::applier::{
69    BloomFilterIndexApplierBuilder, BloomFilterIndexApplierRef,
70};
71use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
72use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
73use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
74use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
75#[cfg(feature = "vector_index")]
76use crate::sst::index::vector_index::applier::{VectorIndexApplier, VectorIndexApplierRef};
77use crate::sst::parquet::file_range::PreFilterMode;
78use crate::sst::parquet::reader::ReaderMetrics;
79
/// Over-fetch factor applied to the vector-search `k` when the request also
/// carries other filters: extra candidates are fetched so that enough rows
/// survive post-filtering (see `scan_input`, where `k` is multiplied by this
/// constant only if `request.filters` is non-empty).
#[cfg(feature = "vector_index")]
const VECTOR_INDEX_OVERFETCH_MULTIPLIER: usize = 2;
82
/// A scanner scans a region and returns a [SendableRecordBatchStream].
///
/// The variant is chosen by [ScanRegion::scanner] from the region options and
/// the scan request (see `use_series_scan` / `use_unordered_scan`).
pub(crate) enum Scanner {
    /// Sequential scan.
    Seq(SeqScan),
    /// Unordered scan.
    Unordered(UnorderedScan),
    /// Per-series scan.
    Series(SeriesScan),
}
92
93impl Scanner {
94    /// Returns a [SendableRecordBatchStream] to retrieve scan results from all partitions.
95    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
96    pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream, BoxedError> {
97        match self {
98            Scanner::Seq(seq_scan) => seq_scan.build_stream(),
99            Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
100            Scanner::Series(series_scan) => series_scan.build_stream().await,
101        }
102    }
103
104    /// Create a stream of [`Batch`] by this scanner.
105    pub(crate) fn scan_batch(&self) -> Result<ScanBatchStream> {
106        match self {
107            Scanner::Seq(x) => x.scan_all_partitions(),
108            Scanner::Unordered(x) => x.scan_all_partitions(),
109            Scanner::Series(x) => x.scan_all_partitions(),
110        }
111    }
112}
113
#[cfg(test)]
impl Scanner {
    /// Number of SST files this scanner will read.
    pub(crate) fn num_files(&self) -> usize {
        match self {
            Scanner::Seq(s) => s.input().num_files(),
            Scanner::Unordered(s) => s.input().num_files(),
            Scanner::Series(s) => s.input().num_files(),
        }
    }

    /// Number of memtables this scanner will read.
    pub(crate) fn num_memtables(&self) -> usize {
        match self {
            Scanner::Seq(s) => s.input().num_memtables(),
            Scanner::Unordered(s) => s.input().num_memtables(),
            Scanner::Series(s) => s.input().num_memtables(),
        }
    }

    /// Ids of the SST files to scan.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        match self {
            Scanner::Seq(s) => s.input().file_ids(),
            Scanner::Unordered(s) => s.input().file_ids(),
            Scanner::Series(s) => s.input().file_ids(),
        }
    }

    /// Ids of the indexes to scan.
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        match self {
            Scanner::Seq(s) => s.input().index_ids(),
            Scanner::Unordered(s) => s.input().index_ids(),
            Scanner::Series(s) => s.input().index_ids(),
        }
    }

    /// Snapshot sequence of the scan, if any.
    pub(crate) fn snapshot_sequence(&self) -> Option<SequenceNumber> {
        match self {
            Scanner::Seq(s) => s.input().snapshot_sequence,
            Scanner::Unordered(s) => s.input().snapshot_sequence,
            Scanner::Series(s) => s.input().snapshot_sequence,
        }
    }

    /// Sets the target partitions for the scanner, which controls its parallelism.
    pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
        use store_api::region_engine::{PrepareRequest, RegionScanner};

        let request = PrepareRequest::default().with_target_partitions(target_partitions);
        match self {
            Scanner::Seq(s) => s.prepare(request).unwrap(),
            Scanner::Unordered(s) => s.prepare(request).unwrap(),
            Scanner::Series(s) => s.prepare(request).unwrap(),
        }
    }
}
171
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Helper to scans a region by [ScanRequest].
///
/// [ScanRegion] collects SSTs and memtables to scan without actually reading them. It
/// creates a [Scanner] to actually scan these targets in [Scanner::scan()].
///
/// ```mermaid
/// classDiagram
/// class ScanRegion {
///     -VersionRef version
///     -ScanRequest request
///     ~scanner() Scanner
///     ~seq_scan() SeqScan
/// }
/// class Scanner {
///     <<enumeration>>
///     SeqScan
///     UnorderedScan
///     +scan() SendableRecordBatchStream
/// }
/// class SeqScan {
///     -ScanInput input
///     +build() SendableRecordBatchStream
/// }
/// class UnorderedScan {
///     -ScanInput input
///     +build() SendableRecordBatchStream
/// }
/// class ScanInput {
///     -ProjectionMapper mapper
///     -Option~TimeRange~ time_range
///     -Option~Predicate~ predicate
///     -Vec~MemtableRef~ memtables
///     -Vec~FileHandle~ files
/// }
/// class ProjectionMapper {
///     ~output_schema() SchemaRef
///     ~convert(Batch) RecordBatch
/// }
/// ScanRegion -- Scanner
/// ScanRegion o-- ScanRequest
/// Scanner o-- SeqScan
/// Scanner o-- UnorderedScan
/// SeqScan o-- ScanInput
/// UnorderedScan o-- ScanInput
/// Scanner -- SendableRecordBatchStream
/// ScanInput o-- ProjectionMapper
/// SeqScan -- SendableRecordBatchStream
/// UnorderedScan -- SendableRecordBatchStream
/// ```
pub(crate) struct ScanRegion {
    /// Version of the region at scan.
    version: VersionRef,
    /// Access layer of the region.
    access_layer: AccessLayerRef,
    /// Scan request.
    request: ScanRequest,
    /// Cache.
    cache_strategy: CacheStrategy,
    /// Maximum number of SST files to scan concurrently.
    /// Defaults to [DEFAULT_MAX_CONCURRENT_SCAN_FILES].
    max_concurrent_scan_files: usize,
    /// Whether to ignore inverted index.
    ignore_inverted_index: bool,
    /// Whether to ignore fulltext index.
    ignore_fulltext_index: bool,
    /// Whether to ignore bloom filter.
    ignore_bloom_filter: bool,
    /// Start time of the scan task.
    start_time: Option<Instant>,
    /// Whether to filter out the deleted rows.
    /// Usually true for normal read, and false for scan for compaction.
    filter_deleted: bool,
    /// Optional provider of extra (extension) ranges to scan (enterprise only).
    #[cfg(feature = "enterprise")]
    extension_range_provider: Option<BoxedExtensionRangeProvider>,
}
247
248impl ScanRegion {
249    /// Creates a [ScanRegion].
250    pub(crate) fn new(
251        version: VersionRef,
252        access_layer: AccessLayerRef,
253        request: ScanRequest,
254        cache_strategy: CacheStrategy,
255    ) -> ScanRegion {
256        ScanRegion {
257            version,
258            access_layer,
259            request,
260            cache_strategy,
261            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
262            ignore_inverted_index: false,
263            ignore_fulltext_index: false,
264            ignore_bloom_filter: false,
265            start_time: None,
266            filter_deleted: true,
267            #[cfg(feature = "enterprise")]
268            extension_range_provider: None,
269        }
270    }
271
272    /// Sets maximum number of SST files to scan concurrently.
273    #[must_use]
274    pub(crate) fn with_max_concurrent_scan_files(
275        mut self,
276        max_concurrent_scan_files: usize,
277    ) -> Self {
278        self.max_concurrent_scan_files = max_concurrent_scan_files;
279        self
280    }
281
282    /// Sets whether to ignore inverted index.
283    #[must_use]
284    pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self {
285        self.ignore_inverted_index = ignore;
286        self
287    }
288
289    /// Sets whether to ignore fulltext index.
290    #[must_use]
291    pub(crate) fn with_ignore_fulltext_index(mut self, ignore: bool) -> Self {
292        self.ignore_fulltext_index = ignore;
293        self
294    }
295
296    /// Sets whether to ignore bloom filter.
297    #[must_use]
298    pub(crate) fn with_ignore_bloom_filter(mut self, ignore: bool) -> Self {
299        self.ignore_bloom_filter = ignore;
300        self
301    }
302
303    #[must_use]
304    pub(crate) fn with_start_time(mut self, now: Instant) -> Self {
305        self.start_time = Some(now);
306        self
307    }
308
309    pub(crate) fn set_filter_deleted(&mut self, filter_deleted: bool) {
310        self.filter_deleted = filter_deleted;
311    }
312
313    #[cfg(feature = "enterprise")]
314    pub(crate) fn set_extension_range_provider(
315        &mut self,
316        extension_range_provider: BoxedExtensionRangeProvider,
317    ) {
318        self.extension_range_provider = Some(extension_range_provider);
319    }
320
321    /// Returns a [Scanner] to scan the region.
322    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
323    pub(crate) async fn scanner(self) -> Result<Scanner> {
324        if self.use_series_scan() {
325            self.series_scan().await.map(Scanner::Series)
326        } else if self.use_unordered_scan() {
327            // If table is append only and there is no series row selector, we use unordered scan in query.
328            // We still use seq scan in compaction.
329            self.unordered_scan().await.map(Scanner::Unordered)
330        } else {
331            self.seq_scan().await.map(Scanner::Seq)
332        }
333    }
334
335    /// Returns a [RegionScanner] to scan the region.
336    #[tracing::instrument(
337        level = tracing::Level::DEBUG,
338        skip_all,
339        fields(region_id = %self.region_id())
340    )]
341    pub(crate) async fn region_scanner(self) -> Result<RegionScannerRef> {
342        if self.use_series_scan() {
343            self.series_scan()
344                .await
345                .map(|scanner| Box::new(scanner) as _)
346        } else if self.use_unordered_scan() {
347            self.unordered_scan()
348                .await
349                .map(|scanner| Box::new(scanner) as _)
350        } else {
351            self.seq_scan().await.map(|scanner| Box::new(scanner) as _)
352        }
353    }
354
355    /// Scan sequentially.
356    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
357    pub(crate) async fn seq_scan(self) -> Result<SeqScan> {
358        let input = self.scan_input().await?.with_compaction(false);
359        Ok(SeqScan::new(input))
360    }
361
362    /// Unordered scan.
363    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
364    pub(crate) async fn unordered_scan(self) -> Result<UnorderedScan> {
365        let input = self.scan_input().await?;
366        Ok(UnorderedScan::new(input))
367    }
368
369    /// Scans by series.
370    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
371    pub(crate) async fn series_scan(self) -> Result<SeriesScan> {
372        let input = self.scan_input().await?;
373        Ok(SeriesScan::new(input))
374    }
375
376    /// Returns true if the region can use unordered scan for current request.
377    fn use_unordered_scan(&self) -> bool {
378        // We use unordered scan when:
379        // 1. The region is in append mode.
380        // 2. There is no series row selector.
381        // 3. The required distribution is None or TimeSeriesDistribution::TimeWindowed.
382        //
383        // We still use seq scan in compaction.
384        self.version.options.append_mode
385            && self.request.series_row_selector.is_none()
386            && (self.request.distribution.is_none()
387                || self.request.distribution == Some(TimeSeriesDistribution::TimeWindowed))
388    }
389
390    /// Returns true if the region can use series scan for current request.
391    fn use_series_scan(&self) -> bool {
392        self.request.distribution == Some(TimeSeriesDistribution::PerSeries)
393    }
394
395    /// Creates a scan input.
396    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
397    async fn scan_input(self) -> Result<ScanInput> {
398        let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new);
399        let time_range = self.build_time_range_predicate();
400        let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;
401
402        let read_column_ids = match self.request.projection_indices() {
403            Some(p) => self.build_read_column_ids(p, &predicate)?,
404            None => self
405                .version
406                .metadata
407                .column_metadatas
408                .iter()
409                .map(|col| col.column_id)
410                .collect(),
411        };
412
413        // The mapper always computes projected column ids as the schema of SSTs may change.
414        let mapper = match self.request.projection_indices() {
415            Some(p) => FlatProjectionMapper::new_with_read_columns(
416                &self.version.metadata,
417                p.to_vec(),
418                read_column_ids.clone(),
419            )?,
420            None => FlatProjectionMapper::all(&self.version.metadata)?,
421        };
422
423        let ssts = &self.version.ssts;
424        let mut files = Vec::new();
425        for level in ssts.levels() {
426            for file in level.files.values() {
427                let exceed_min_sequence = match (sst_min_sequence, file.meta_ref().sequence) {
428                    (Some(min_sequence), Some(file_sequence)) => file_sequence > min_sequence,
429                    // If the file's sequence is None (or actually is zero), it could mean the file
430                    // is generated and added to the region "directly". In this case, its data should
431                    // be considered as fresh as the memtable. So its sequence is treated greater than
432                    // the min_sequence, whatever the value of min_sequence is. Hence the default
433                    // "true" in this arm.
434                    (Some(_), None) => true,
435                    (None, _) => true,
436                };
437
438                // Finds SST files in range.
439                if exceed_min_sequence && file_in_range(file, &time_range) {
440                    files.push(file.clone());
441                }
442                // There is no need to check and prune for file's sequence here as the sequence number is usually very new,
443                // unless the timing is too good, or the sequence number wouldn't be in file.
444                // and the batch will be filtered out by tree reader anyway.
445            }
446        }
447
448        let memtables = self.version.memtables.list_memtables();
449        // Skip empty memtables and memtables out of time range.
450        let mut mem_range_builders = Vec::new();
451        let filter_mode = pre_filter_mode(
452            self.version.options.append_mode,
453            self.version.options.merge_mode(),
454        );
455
456        for m in memtables {
457            // check if memtable is empty by reading stats.
458            let Some((start, end)) = m.stats().time_range() else {
459                continue;
460            };
461            // The time range of the memtable is inclusive.
462            let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
463            if !memtable_range.intersects(&time_range) {
464                continue;
465            }
466            let ranges_in_memtable = m.ranges(
467                Some(read_column_ids.as_slice()),
468                RangesOptions::default()
469                    .with_predicate(predicate.clone())
470                    .with_sequence(SequenceRange::new(
471                        self.request.memtable_min_sequence,
472                        self.request.memtable_max_sequence,
473                    ))
474                    .with_pre_filter_mode(filter_mode),
475            )?;
476            mem_range_builders.extend(ranges_in_memtable.ranges.into_values().map(|v| {
477                let stats = v.stats().clone();
478                MemRangeBuilder::new(v, stats)
479            }));
480        }
481
482        let region_id = self.region_id();
483        debug!(
484            "Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}",
485            region_id,
486            self.request,
487            time_range,
488            mem_range_builders.len(),
489            files.len(),
490            self.version.options.append_mode,
491        );
492
493        let (non_field_filters, field_filters) = self.partition_by_field_filters();
494        let inverted_index_appliers = [
495            self.build_invereted_index_applier(&non_field_filters),
496            self.build_invereted_index_applier(&field_filters),
497        ];
498        let bloom_filter_appliers = [
499            self.build_bloom_filter_applier(&non_field_filters),
500            self.build_bloom_filter_applier(&field_filters),
501        ];
502        let fulltext_index_appliers = [
503            self.build_fulltext_index_applier(&non_field_filters),
504            self.build_fulltext_index_applier(&field_filters),
505        ];
506        #[cfg(feature = "vector_index")]
507        let vector_index_applier = self.build_vector_index_applier();
508        #[cfg(feature = "vector_index")]
509        let vector_index_k = self.request.vector_search.as_ref().map(|search| {
510            if self.request.filters.is_empty() {
511                search.k
512            } else {
513                search.k.saturating_mul(VECTOR_INDEX_OVERFETCH_MULTIPLIER)
514            }
515        });
516
517        let input = ScanInput::new(self.access_layer, mapper)
518            .with_time_range(Some(time_range))
519            .with_predicate(predicate)
520            .with_memtables(mem_range_builders)
521            .with_files(files)
522            .with_cache(self.cache_strategy)
523            .with_inverted_index_appliers(inverted_index_appliers)
524            .with_bloom_filter_index_appliers(bloom_filter_appliers)
525            .with_fulltext_index_appliers(fulltext_index_appliers)
526            .with_max_concurrent_scan_files(self.max_concurrent_scan_files)
527            .with_start_time(self.start_time)
528            .with_append_mode(self.version.options.append_mode)
529            .with_filter_deleted(self.filter_deleted)
530            .with_merge_mode(self.version.options.merge_mode())
531            .with_series_row_selector(self.request.series_row_selector)
532            .with_distribution(self.request.distribution)
533            .with_explain_flat_format(
534                self.version.options.sst_format == Some(crate::sst::FormatType::Flat),
535            )
536            .with_snapshot_sequence(
537                self.request
538                    .snapshot_on_scan
539                    .then_some(self.request.memtable_max_sequence)
540                    .flatten(),
541            );
542        #[cfg(feature = "vector_index")]
543        let input = input
544            .with_vector_index_applier(vector_index_applier)
545            .with_vector_index_k(vector_index_k);
546
547        #[cfg(feature = "enterprise")]
548        let input = if let Some(provider) = self.extension_range_provider {
549            let ranges = provider
550                .find_extension_ranges(self.version.flushed_sequence, time_range, &self.request)
551                .await?;
552            debug!("Find extension ranges: {ranges:?}");
553            input.with_extension_ranges(ranges)
554        } else {
555            input
556        };
557        Ok(input)
558    }
559
560    fn region_id(&self) -> RegionId {
561        self.version.metadata.region_id
562    }
563
564    /// Build time range predicate from filters.
565    fn build_time_range_predicate(&self) -> TimestampRange {
566        let time_index = self.version.metadata.time_index_column();
567        let unit = time_index
568            .column_schema
569            .data_type
570            .as_timestamp()
571            .expect("Time index must have timestamp-compatible type")
572            .unit();
573        build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters)
574    }
575
576    /// Return all columns id to read according to the projection and filters.
577    fn build_read_column_ids(
578        &self,
579        projection: &[usize],
580        predicate: &PredicateGroup,
581    ) -> Result<Vec<ColumnId>> {
582        let metadata = &self.version.metadata;
583        // use Vec for read_column_ids to keep the order of columns.
584        let mut read_column_ids = Vec::new();
585        let mut seen = HashSet::new();
586
587        for idx in projection {
588            let column =
589                metadata
590                    .column_metadatas
591                    .get(*idx)
592                    .with_context(|| InvalidRequestSnafu {
593                        region_id: metadata.region_id,
594                        reason: format!("projection index {} is out of bound", idx),
595                    })?;
596            seen.insert(column.column_id);
597            // keep the projection order
598            read_column_ids.push(column.column_id);
599        }
600
601        if projection.is_empty() {
602            let time_index = metadata.time_index_column().column_id;
603            if seen.insert(time_index) {
604                read_column_ids.push(time_index);
605            }
606        }
607
608        let mut extra_names = HashSet::new();
609        let mut columns = HashSet::new();
610
611        for expr in &self.request.filters {
612            columns.clear();
613            if expr_to_columns(expr, &mut columns).is_err() {
614                continue;
615            }
616            extra_names.extend(columns.iter().map(|column| column.name.clone()));
617        }
618
619        if let Some(expr) = predicate.region_partition_expr() {
620            expr.collect_column_names(&mut extra_names);
621        }
622
623        if !extra_names.is_empty() {
624            for column in &metadata.column_metadatas {
625                if extra_names.contains(column.column_schema.name.as_str())
626                    && !seen.contains(&column.column_id)
627                {
628                    read_column_ids.push(column.column_id);
629                }
630                extra_names.remove(column.column_schema.name.as_str());
631            }
632            if !extra_names.is_empty() {
633                warn!(
634                    "Some columns in filters are not found in region {}: {:?}",
635                    metadata.region_id, extra_names
636                );
637            }
638        }
639        Ok(read_column_ids)
640    }
641
642    /// Partitions filters into two groups: non-field filters and field filters.
643    /// Returns `(non_field_filters, field_filters)`.
644    fn partition_by_field_filters(&self) -> (Vec<Expr>, Vec<Expr>) {
645        let field_columns = self
646            .version
647            .metadata
648            .field_columns()
649            .map(|col| &col.column_schema.name)
650            .collect::<HashSet<_>>();
651
652        let mut columns = HashSet::new();
653
654        self.request.filters.iter().cloned().partition(|expr| {
655            columns.clear();
656            // `expr_to_columns` won't return error.
657            if expr_to_columns(expr, &mut columns).is_err() {
658                // If we can't extract columns, treat it as non-field filter
659                return true;
660            }
661            // Return true for non-field filters (partition puts true cases in first vec)
662            !columns
663                .iter()
664                .any(|column| field_columns.contains(&column.name))
665        })
666    }
667
668    /// Use the latest schema to build the inverted index applier.
669    fn build_invereted_index_applier(&self, filters: &[Expr]) -> Option<InvertedIndexApplierRef> {
670        if self.ignore_inverted_index {
671            return None;
672        }
673
674        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
675        let inverted_index_cache = self.cache_strategy.inverted_index_cache().cloned();
676
677        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
678
679        InvertedIndexApplierBuilder::new(
680            self.access_layer.table_dir().to_string(),
681            self.access_layer.path_type(),
682            self.access_layer.object_store().clone(),
683            self.version.metadata.as_ref(),
684            self.version.metadata.inverted_indexed_column_ids(
685                self.version
686                    .options
687                    .index_options
688                    .inverted_index
689                    .ignore_column_ids
690                    .iter(),
691            ),
692            self.access_layer.puffin_manager_factory().clone(),
693        )
694        .with_file_cache(file_cache)
695        .with_inverted_index_cache(inverted_index_cache)
696        .with_puffin_metadata_cache(puffin_metadata_cache)
697        .build(filters)
698        .inspect_err(|err| warn!(err; "Failed to build invereted index applier"))
699        .ok()
700        .flatten()
701        .map(Arc::new)
702    }
703
704    /// Use the latest schema to build the bloom filter index applier.
705    fn build_bloom_filter_applier(&self, filters: &[Expr]) -> Option<BloomFilterIndexApplierRef> {
706        if self.ignore_bloom_filter {
707            return None;
708        }
709
710        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
711        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
712        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
713
714        BloomFilterIndexApplierBuilder::new(
715            self.access_layer.table_dir().to_string(),
716            self.access_layer.path_type(),
717            self.access_layer.object_store().clone(),
718            self.version.metadata.as_ref(),
719            self.access_layer.puffin_manager_factory().clone(),
720        )
721        .with_file_cache(file_cache)
722        .with_bloom_filter_index_cache(bloom_filter_index_cache)
723        .with_puffin_metadata_cache(puffin_metadata_cache)
724        .build(filters)
725        .inspect_err(|err| warn!(err; "Failed to build bloom filter index applier"))
726        .ok()
727        .flatten()
728        .map(Arc::new)
729    }
730
731    /// Use the latest schema to build the fulltext index applier.
732    fn build_fulltext_index_applier(&self, filters: &[Expr]) -> Option<FulltextIndexApplierRef> {
733        if self.ignore_fulltext_index {
734            return None;
735        }
736
737        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
738        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
739        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
740        FulltextIndexApplierBuilder::new(
741            self.access_layer.table_dir().to_string(),
742            self.access_layer.path_type(),
743            self.access_layer.object_store().clone(),
744            self.access_layer.puffin_manager_factory().clone(),
745            self.version.metadata.as_ref(),
746        )
747        .with_file_cache(file_cache)
748        .with_puffin_metadata_cache(puffin_metadata_cache)
749        .with_bloom_filter_cache(bloom_filter_index_cache)
750        .build(filters)
751        .inspect_err(|err| warn!(err; "Failed to build fulltext index applier"))
752        .ok()
753        .flatten()
754        .map(Arc::new)
755    }
756
757    /// Build the vector index applier from vector search request.
758    #[cfg(feature = "vector_index")]
759    fn build_vector_index_applier(&self) -> Option<VectorIndexApplierRef> {
760        let vector_search = self.request.vector_search.as_ref()?;
761
762        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
763        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
764        let vector_index_cache = self.cache_strategy.vector_index_cache().cloned();
765
766        let applier = VectorIndexApplier::new(
767            self.access_layer.table_dir().to_string(),
768            self.access_layer.path_type(),
769            self.access_layer.object_store().clone(),
770            self.access_layer.puffin_manager_factory().clone(),
771            vector_search.column_id,
772            vector_search.query_vector.clone(),
773            vector_search.metric,
774        )
775        .with_file_cache(file_cache)
776        .with_puffin_metadata_cache(puffin_metadata_cache)
777        .with_vector_index_cache(vector_index_cache);
778
779        Some(Arc::new(applier))
780    }
781}
782
783/// Returns true if the time range of a SST `file` matches the `predicate`.
784fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
785    if predicate == &TimestampRange::min_to_max() {
786        return true;
787    }
788    // end timestamp of a SST is inclusive.
789    let (start, end) = file.time_range();
790    let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end));
791    file_ts_range.intersects(predicate)
792}
793
/// Common input for different scanners.
///
/// Holds the memtables, SST files, predicates, caches and index appliers
/// a scan needs, assembled via the `with_*` builder methods.
pub struct ScanInput {
    /// Region SST access layer.
    access_layer: AccessLayerRef,
    /// Maps projected Batches to RecordBatches.
    pub(crate) mapper: Arc<FlatProjectionMapper>,
    /// Column ids to read from memtables and SSTs.
    /// Notice this is different from the columns in `mapper` which are projected columns.
    /// But the read columns might also include non-projected columns needed for filtering.
    pub(crate) read_column_ids: Vec<ColumnId>,
    /// Time range filter for time index.
    pub(crate) time_range: Option<TimestampRange>,
    /// Predicate to push down.
    pub(crate) predicate: PredicateGroup,
    /// Region partition expr applied at read time.
    region_partition_expr: Option<PartitionExpr>,
    /// Memtable range builders for memtables in the time range.
    pub(crate) memtables: Vec<MemRangeBuilder>,
    /// Handles to SST files to scan.
    pub(crate) files: Vec<FileHandle>,
    /// Cache.
    pub(crate) cache_strategy: CacheStrategy,
    /// Ignores file not found error.
    ignore_file_not_found: bool,
    /// Maximum number of SST files to scan concurrently.
    pub(crate) max_concurrent_scan_files: usize,
    /// Index appliers.
    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
    /// Vector index applier for KNN search.
    #[cfg(feature = "vector_index")]
    pub(crate) vector_index_applier: Option<VectorIndexApplierRef>,
    /// Over-fetched k for vector index scan.
    #[cfg(feature = "vector_index")]
    pub(crate) vector_index_k: Option<usize>,
    /// Start time of the query.
    pub(crate) query_start: Option<Instant>,
    /// The region is using append mode.
    pub(crate) append_mode: bool,
    /// Whether to remove deletion markers.
    pub(crate) filter_deleted: bool,
    /// Mode to merge duplicate rows.
    pub(crate) merge_mode: MergeMode,
    /// Hint to select rows from time series.
    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
    /// Hint for the required distribution of the scanner.
    pub(crate) distribution: Option<TimeSeriesDistribution>,
    /// Whether the region's configured SST format is flat.
    explain_flat_format: bool,
    /// Snapshot sequence upper bound captured at scan open and propagated back to the caller.
    pub(crate) snapshot_sequence: Option<SequenceNumber>,
    /// Whether this scan is for compaction.
    pub(crate) compaction: bool,
    /// Extra ranges provided by the enterprise edition.
    #[cfg(feature = "enterprise")]
    extension_ranges: Vec<BoxedExtensionRange>,
}
851
852impl ScanInput {
    /// Creates a new [ScanInput] with empty sources and default options.
    #[must_use]
    pub(crate) fn new(access_layer: AccessLayerRef, mapper: FlatProjectionMapper) -> ScanInput {
        ScanInput {
            access_layer,
            // Capture the column ids before `mapper` is moved into the Arc.
            read_column_ids: mapper.column_ids().to_vec(),
            mapper: Arc::new(mapper),
            time_range: None,
            predicate: PredicateGroup::default(),
            region_partition_expr: None,
            memtables: Vec::new(),
            files: Vec::new(),
            cache_strategy: CacheStrategy::Disabled,
            ignore_file_not_found: false,
            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
            inverted_index_appliers: [None, None],
            bloom_filter_index_appliers: [None, None],
            fulltext_index_appliers: [None, None],
            #[cfg(feature = "vector_index")]
            vector_index_applier: None,
            #[cfg(feature = "vector_index")]
            vector_index_k: None,
            query_start: None,
            append_mode: false,
            // Deletion markers are removed by default.
            filter_deleted: true,
            merge_mode: MergeMode::default(),
            series_row_selector: None,
            distribution: None,
            explain_flat_format: false,
            snapshot_sequence: None,
            compaction: false,
            #[cfg(feature = "enterprise")]
            extension_ranges: Vec::new(),
        }
    }
888
    /// Sets time range filter for time index.
    ///
    /// `None` means no time range filter.
    #[must_use]
    pub(crate) fn with_time_range(mut self, time_range: Option<TimestampRange>) -> Self {
        self.time_range = time_range;
        self
    }
895
    /// Sets predicate to push down.
    ///
    /// Also caches the region partition expression from the group so it can be
    /// skipped per-file later.
    #[must_use]
    pub(crate) fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
        self.region_partition_expr = predicate.region_partition_expr().cloned();
        self.predicate = predicate;
        self
    }
903
    /// Sets memtable range builders for the memtables to scan.
    #[must_use]
    pub(crate) fn with_memtables(mut self, memtables: Vec<MemRangeBuilder>) -> Self {
        self.memtables = memtables;
        self
    }
910
    /// Sets SST file handles to read.
    #[must_use]
    pub(crate) fn with_files(mut self, files: Vec<FileHandle>) -> Self {
        self.files = files;
        self
    }
917
    /// Sets cache strategy for this query.
    #[must_use]
    pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self {
        self.cache_strategy = cache;
        self
    }
924
    /// Sets whether to ignore file-not-found errors while scanning.
    #[must_use]
    pub(crate) fn with_ignore_file_not_found(mut self, ignore: bool) -> Self {
        self.ignore_file_not_found = ignore;
        self
    }
931
    /// Sets maximum number of SST files to scan concurrently.
    #[must_use]
    pub(crate) fn with_max_concurrent_scan_files(
        mut self,
        max_concurrent_scan_files: usize,
    ) -> Self {
        self.max_concurrent_scan_files = max_concurrent_scan_files;
        self
    }
941
    /// Sets inverted index appliers.
    #[must_use]
    pub(crate) fn with_inverted_index_appliers(
        mut self,
        appliers: [Option<InvertedIndexApplierRef>; 2],
    ) -> Self {
        self.inverted_index_appliers = appliers;
        self
    }
951
    /// Sets bloom filter index appliers.
    #[must_use]
    pub(crate) fn with_bloom_filter_index_appliers(
        mut self,
        appliers: [Option<BloomFilterIndexApplierRef>; 2],
    ) -> Self {
        self.bloom_filter_index_appliers = appliers;
        self
    }
961
    /// Sets fulltext index appliers.
    #[must_use]
    pub(crate) fn with_fulltext_index_appliers(
        mut self,
        appliers: [Option<FulltextIndexApplierRef>; 2],
    ) -> Self {
        self.fulltext_index_appliers = appliers;
        self
    }
971
    /// Sets vector index applier for KNN search.
    #[cfg(feature = "vector_index")]
    #[must_use]
    pub(crate) fn with_vector_index_applier(
        mut self,
        applier: Option<VectorIndexApplierRef>,
    ) -> Self {
        self.vector_index_applier = applier;
        self
    }
982
    /// Sets over-fetched k for vector index scan.
    #[cfg(feature = "vector_index")]
    #[must_use]
    pub(crate) fn with_vector_index_k(mut self, k: Option<usize>) -> Self {
        self.vector_index_k = k;
        self
    }
990
    /// Sets start time of the query, used for metrics.
    #[must_use]
    pub(crate) fn with_start_time(mut self, now: Option<Instant>) -> Self {
        self.query_start = now;
        self
    }
997
    /// Sets whether the region is using append mode.
    #[must_use]
    pub(crate) fn with_append_mode(mut self, is_append_mode: bool) -> Self {
        self.append_mode = is_append_mode;
        self
    }
1003
    /// Sets whether to remove deletion markers during scan.
    #[must_use]
    pub(crate) fn with_filter_deleted(mut self, filter_deleted: bool) -> Self {
        self.filter_deleted = filter_deleted;
        self
    }
1010
    /// Sets the mode used to merge duplicate rows.
    #[must_use]
    pub(crate) fn with_merge_mode(mut self, merge_mode: MergeMode) -> Self {
        self.merge_mode = merge_mode;
        self
    }
1017
    /// Sets the time series distribution hint for the scanner.
    #[must_use]
    pub(crate) fn with_distribution(
        mut self,
        distribution: Option<TimeSeriesDistribution>,
    ) -> Self {
        self.distribution = distribution;
        self
    }
1027
    /// Sets whether the region's configured SST format is flat for explain output.
    #[must_use]
    pub(crate) fn with_explain_flat_format(mut self, explain_flat_format: bool) -> Self {
        self.explain_flat_format = explain_flat_format;
        self
    }
1034
    /// Sets the time series row selector hint.
    #[must_use]
    pub(crate) fn with_series_row_selector(
        mut self,
        series_row_selector: Option<TimeSeriesRowSelector>,
    ) -> Self {
        self.series_row_selector = series_row_selector;
        self
    }
1044
    /// Sets the snapshot sequence upper bound captured at scan open.
    #[must_use]
    pub(crate) fn with_snapshot_sequence(
        mut self,
        snapshot_sequence: Option<SequenceNumber>,
    ) -> Self {
        self.snapshot_sequence = snapshot_sequence;
        self
    }
1053
    /// Sets whether this scan is for compaction.
    #[must_use]
    pub(crate) fn with_compaction(mut self, compaction: bool) -> Self {
        self.compaction = compaction;
        self
    }
1060
1061    /// Builds memtable ranges to scan by `index`.
1062    pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> {
1063        let memtable = &self.memtables[index.index];
1064        let mut ranges = SmallVec::new();
1065        memtable.build_ranges(index.row_group_index, &mut ranges);
1066        ranges
1067    }
1068
1069    fn predicate_for_file(&self, file: &FileHandle) -> Option<Predicate> {
1070        if self.should_skip_region_partition(file) {
1071            self.predicate.predicate_without_region().cloned()
1072        } else {
1073            self.predicate.predicate().cloned()
1074        }
1075    }
1076
1077    fn should_skip_region_partition(&self, file: &FileHandle) -> bool {
1078        match (
1079            self.region_partition_expr.as_ref(),
1080            file.meta_ref().partition_expr.as_ref(),
1081        ) {
1082            (Some(region_expr), Some(file_expr)) => region_expr == file_expr,
1083            _ => false,
1084        }
1085    }
1086
    /// Prunes a file to scan and returns the builder to build readers.
    ///
    /// Returns a default (empty) [FileRangeBuilder] when the file is pruned
    /// away entirely, or when the file is missing and
    /// `ignore_file_not_found` is set.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.region_metadata().region_id,
            file_id = %file.file_id()
        )
    )]
    pub async fn prune_file(
        &self,
        file: &FileHandle,
        pre_filter_mode: PreFilterMode,
        reader_metrics: &mut ReaderMetrics,
    ) -> Result<FileRangeBuilder> {
        let predicate = self.predicate_for_file(file);
        // Decode primary key values only for normal queries that project
        // primary key columns; compaction reads keys in encoded form.
        let decode_pk_values = !self.compaction
            && self
                .mapper
                .column_ids()
                .iter()
                .any(|column_id| self.mapper.metadata().primary_key.contains(column_id));
        let reader = self
            .access_layer
            .read_sst(file.clone())
            .predicate(predicate)
            .projection(Some(self.read_column_ids.clone()))
            .cache(self.cache_strategy.clone())
            .inverted_index_appliers(self.inverted_index_appliers.clone())
            .bloom_filter_index_appliers(self.bloom_filter_index_appliers.clone())
            .fulltext_index_appliers(self.fulltext_index_appliers.clone());
        // Attach the vector index applier when the feature is enabled.
        #[cfg(feature = "vector_index")]
        let reader = {
            let mut reader = reader;
            reader =
                reader.vector_index_applier(self.vector_index_applier.clone(), self.vector_index_k);
            reader
        };
        let res = reader
            .expected_metadata(Some(self.mapper.metadata().clone()))
            .compaction(self.compaction)
            .pre_filter_mode(pre_filter_mode)
            .decode_primary_key_values(decode_pk_values)
            .build_reader_input(reader_metrics)
            .await;
        let read_input = match res {
            Ok(x) => x,
            Err(e) => {
                // A missing file is tolerated only when explicitly requested.
                if e.is_object_not_found() && self.ignore_file_not_found {
                    error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
                    return Ok(FileRangeBuilder::default());
                } else {
                    return Err(e);
                }
            }
        };

        // `None` means the whole file is pruned by the predicate/indexes.
        let Some((mut file_range_ctx, selection)) = read_input else {
            return Ok(FileRangeBuilder::default());
        };

        let need_compat = !compat::has_same_columns_and_pk_encoding(
            self.mapper.metadata(),
            file_range_ctx.read_format().metadata(),
        );
        if need_compat {
            // They have different schema. We need to adapt the batch first so the
            // mapper can convert it.
            let compat = FlatCompatBatch::try_new(
                &self.mapper,
                file_range_ctx.read_format().metadata(),
                file_range_ctx.read_format().format_projection(),
                self.compaction,
            )?;
            file_range_ctx.set_compat_batch(compat);
        }
        Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), selection))
    }
1164
1165    /// Scans flat sources (RecordBatch streams) in parallel.
1166    ///
1167    /// # Panics if the input doesn't allow parallel scan.
1168    #[tracing::instrument(
1169        skip(self, sources, semaphore),
1170        fields(
1171            region_id = %self.region_metadata().region_id,
1172            source_count = sources.len()
1173        )
1174    )]
1175    pub(crate) fn create_parallel_flat_sources(
1176        &self,
1177        sources: Vec<BoxedRecordBatchStream>,
1178        semaphore: Arc<Semaphore>,
1179        channel_size: usize,
1180    ) -> Result<Vec<BoxedRecordBatchStream>> {
1181        if sources.len() <= 1 {
1182            return Ok(sources);
1183        }
1184
1185        // Spawn a task for each source.
1186        let sources = sources
1187            .into_iter()
1188            .map(|source| {
1189                let (sender, receiver) = mpsc::channel(channel_size);
1190                self.spawn_flat_scan_task(source, semaphore.clone(), sender);
1191                let stream = Box::pin(ReceiverStream::new(receiver));
1192                Box::pin(stream) as _
1193            })
1194            .collect();
1195        Ok(sources)
1196    }
1197
    /// Spawns a task to scan a flat source (RecordBatch stream) asynchronously.
    ///
    /// The task repeatedly acquires a permit from `semaphore`, pulls one item
    /// from `input`, then forwards it through `sender`. It stops at the end of
    /// the stream or after forwarding the first error.
    #[tracing::instrument(
        skip(self, input, semaphore, sender),
        fields(region_id = %self.region_metadata().region_id)
    )]
    pub(crate) fn spawn_flat_scan_task(
        &self,
        mut input: BoxedRecordBatchStream,
        semaphore: Arc<Semaphore>,
        sender: mpsc::Sender<Result<RecordBatch>>,
    ) {
        let region_id = self.region_metadata().region_id;
        let span = tracing::info_span!(
            "ScanInput::parallel_scan_task",
            region_id = %region_id,
            stream_kind = "flat"
        );
        common_runtime::spawn_global(
            async move {
                loop {
                    // We release the permit before sending result to avoid the task waiting on
                    // the channel with the permit held.
                    let maybe_batch = {
                        // Safety: We never close the semaphore.
                        let _permit = semaphore.acquire().await.unwrap();
                        input.next().await
                    };
                    match maybe_batch {
                        Some(Ok(batch)) => {
                            // NOTE(review): a failed send (receiver dropped) does not
                            // break the loop here; the task keeps polling `input` —
                            // confirm this is intended.
                            let _ = sender.send(Ok(batch)).await;
                        }
                        Some(Err(e)) => {
                            let _ = sender.send(Err(e)).await;
                            break;
                        }
                        None => break,
                    }
                }
            }
            .instrument(span),
        );
    }
1240
1241    pub(crate) fn total_rows(&self) -> usize {
1242        let rows_in_files: usize = self.files.iter().map(|f| f.num_rows()).sum();
1243        let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();
1244
1245        let rows = rows_in_files + rows_in_memtables;
1246        #[cfg(feature = "enterprise")]
1247        let rows = rows
1248            + self
1249                .extension_ranges
1250                .iter()
1251                .map(|x| x.num_rows())
1252                .sum::<u64>() as usize;
1253        rows
1254    }
1255
    /// Returns the predicate group pushed down to this scan.
    pub(crate) fn predicate_group(&self) -> &PredicateGroup {
        &self.predicate
    }
1259
    /// Returns the number of memtables to scan.
    pub(crate) fn num_memtables(&self) -> usize {
        self.memtables.len()
    }
1264
    /// Returns the number of SST files to scan.
    pub(crate) fn num_files(&self) -> usize {
        self.files.len()
    }
1269
    /// Gets the file handle from a row group index.
    ///
    /// Row group indices place memtables before files, so the file position is
    /// `index.index - num_memtables()`. Panics if `index` does not refer to a file.
    pub(crate) fn file_from_index(&self, index: RowGroupIndex) -> &FileHandle {
        let file_index = index.index - self.num_memtables();
        &self.files[file_index]
    }
1275
    /// Returns the region metadata used by the projection mapper.
    pub fn region_metadata(&self) -> &RegionMetadataRef {
        self.mapper.metadata()
    }
1279
1280    fn range_pre_filter_mode(&self, source_count: usize) -> PreFilterMode {
1281        if source_count <= 1 {
1282            // Duplicated rows in the same source is not a normal case and we don't provide
1283            // strict dedup semantic (last_row/last_non_null) for it. We expect the duplicated rows
1284            // are exactly identical in the same source so we use PreFilterMode::All for
1285            // performance reason.
1286            return PreFilterMode::All;
1287        }
1288
1289        pre_filter_mode(self.append_mode, self.merge_mode)
1290    }
1291}
1292
#[cfg(feature = "enterprise")]
impl ScanInput {
    /// Sets extension ranges provided by the enterprise edition.
    #[must_use]
    pub(crate) fn with_extension_ranges(self, extension_ranges: Vec<BoxedExtensionRange>) -> Self {
        Self {
            extension_ranges,
            ..self
        }
    }

    /// Returns all extension ranges.
    // The per-method `#[cfg(feature = "enterprise")]` attributes were removed:
    // the whole impl block is already gated by that feature, so they were redundant.
    pub(crate) fn extension_ranges(&self) -> &[BoxedExtensionRange] {
        &self.extension_ranges
    }

    /// Get a boxed [ExtensionRange] by the index in all ranges
    /// (memtables first, then files, then extension ranges).
    pub(crate) fn extension_range(&self, i: usize) -> &BoxedExtensionRange {
        &self.extension_ranges[i - self.num_memtables() - self.num_files()]
    }
}
1314
#[cfg(test)]
impl ScanInput {
    /// Returns SST file ids to scan.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        self.files.iter().map(|file| file.file_id()).collect()
    }

    /// Returns index ids of the SST files to scan.
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        self.files.iter().map(|file| file.index_id()).collect()
    }
}
1326
1327fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode {
1328    if append_mode {
1329        return PreFilterMode::All;
1330    }
1331
1332    match merge_mode {
1333        MergeMode::LastRow => PreFilterMode::SkipFields,
1334        MergeMode::LastNonNull => PreFilterMode::SkipFields,
1335    }
1336}
1337
/// Builds a [ScanRequestFingerprint] from a [ScanInput] if the scan is eligible
/// for partition range caching.
///
/// Returns `None` when the scan is a compaction, reads no files, does not use
/// the full cache strategy, or carries no tag filter.
pub(crate) fn build_scan_fingerprint(input: &ScanInput) -> Option<ScanRequestFingerprint> {
    // Only cache normal queries over files with all caches enabled.
    let eligible = !input.compaction
        && !input.files.is_empty()
        && matches!(input.cache_strategy, CacheStrategy::EnableAll(_));

    if !eligible {
        return None;
    }

    let metadata = input.region_metadata();
    // Tag column names are used to detect whether any filter touches a tag.
    let tag_names: HashSet<&str> = metadata
        .column_metadatas
        .iter()
        .filter(|col| col.semantic_type == SemanticType::Tag)
        .map(|col| col.column_schema.name.as_str())
        .collect();

    let time_index = metadata.time_index_column();
    let time_index_name = time_index.column_schema.name.clone();
    let ts_col_unit = time_index
        .column_schema
        .data_type
        .as_timestamp()
        .expect("Time index must have timestamp-compatible type")
        .unit();

    // Classify the predicates (excluding the region partition expression).
    let exprs = input
        .predicate_group()
        .predicate_without_region()
        .map(|predicate| predicate.exprs())
        .unwrap_or_default();

    let mut filters = Vec::new();
    let mut time_filters = Vec::new();
    let mut has_tag_filter = false;
    // Reused across iterations to avoid reallocating per expression.
    let mut columns = HashSet::new();

    for expr in exprs {
        columns.clear();
        let is_time_only = match expr_to_columns(expr, &mut columns) {
            Ok(()) if !columns.is_empty() => {
                has_tag_filter |= columns
                    .iter()
                    .any(|col| tag_names.contains(col.name.as_str()));
                columns.iter().all(|col| col.name == time_index_name)
            }
            _ => false,
        };

        if is_time_only
            && extract_time_range_from_expr(&time_index_name, ts_col_unit, expr).is_some()
        {
            // Range-reducible time predicates can be safely dropped from the
            // cache key when the query time range covers the partition range.
            time_filters.push(expr.to_string());
        } else {
            // Non-time filters and non-range time predicates (those that
            // extract_time_range_from_expr cannot convert to a TimestampRange)
            // always stay in the cache key.
            filters.push(expr.to_string());
        }
    }

    if !has_tag_filter {
        // We only cache requests that have tag filters to avoid caching all series.
        return None;
    }

    // Ensure the filters are sorted for consistent fingerprinting.
    filters.sort_unstable();
    time_filters.sort_unstable();

    Some(
        crate::read::range_cache::ScanRequestFingerprintBuilder {
            read_column_ids: input.read_column_ids.clone(),
            read_column_types: input
                .read_column_ids
                .iter()
                .map(|id| {
                    metadata
                        .column_by_id(*id)
                        .map(|col| col.column_schema.data_type.clone())
                })
                .collect(),
            filters,
            time_filters,
            series_row_selector: input.series_row_selector,
            append_mode: input.append_mode,
            filter_deleted: input.filter_deleted,
            merge_mode: input.merge_mode,
            partition_expr_version: metadata.partition_expr_version,
        }
        .build(),
    )
}
1435
/// Context shared by different streams from a scanner.
/// It contains the input and ranges to scan.
pub struct StreamContext {
    /// Input memtables and files.
    pub input: ScanInput,
    /// Metadata for partition ranges.
    pub(crate) ranges: Vec<RangeMeta>,
    /// Precomputed scan fingerprint for partition range caching.
    /// `None` when the scan is not eligible for caching.
    #[allow(dead_code)]
    pub(crate) scan_fingerprint: Option<ScanRequestFingerprint>,

    // Metrics:
    /// The start time of the query.
    pub(crate) query_start: Instant,
}
1452
1453impl StreamContext {
1454    /// Creates a new [StreamContext] for [SeqScan].
1455    pub(crate) fn seq_scan_ctx(input: ScanInput) -> Self {
1456        let query_start = input.query_start.unwrap_or_else(Instant::now);
1457        let ranges = RangeMeta::seq_scan_ranges(&input);
1458        READ_SST_COUNT.observe(input.num_files() as f64);
1459        let scan_fingerprint = build_scan_fingerprint(&input);
1460
1461        Self {
1462            input,
1463            ranges,
1464            scan_fingerprint,
1465            query_start,
1466        }
1467    }
1468
1469    /// Creates a new [StreamContext] for [UnorderedScan].
1470    pub(crate) fn unordered_scan_ctx(input: ScanInput) -> Self {
1471        let query_start = input.query_start.unwrap_or_else(Instant::now);
1472        let ranges = RangeMeta::unordered_scan_ranges(&input);
1473        READ_SST_COUNT.observe(input.num_files() as f64);
1474        let scan_fingerprint = build_scan_fingerprint(&input);
1475
1476        Self {
1477            input,
1478            ranges,
1479            scan_fingerprint,
1480            query_start,
1481        }
1482    }
1483
1484    /// Returns true if the index refers to a memtable.
1485    pub(crate) fn is_mem_range_index(&self, index: RowGroupIndex) -> bool {
1486        self.input.num_memtables() > index.index
1487    }
1488
1489    pub(crate) fn is_file_range_index(&self, index: RowGroupIndex) -> bool {
1490        !self.is_mem_range_index(index)
1491            && index.index < self.input.num_files() + self.input.num_memtables()
1492    }
1493
1494    pub(crate) fn range_pre_filter_mode(&self, part_range: &PartitionRange) -> PreFilterMode {
1495        let range_meta = &self.ranges[part_range.identifier];
1496        let source_count = range_meta.indices.len();
1497
1498        self.input.range_pre_filter_mode(source_count)
1499    }
1500
1501    /// Retrieves the partition ranges.
1502    pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
1503        self.ranges
1504            .iter()
1505            .enumerate()
1506            .map(|(idx, range_meta)| range_meta.new_partition_range(idx))
1507            .collect()
1508    }
1509
    /// Format the context for explain.
    ///
    /// Writes a JSON-ish summary of partition/range counts, followed by
    /// optional selector/distribution hints; `verbose` wraps the output in
    /// braces and appends the verbose content.
    pub(crate) fn format_for_explain(&self, verbose: bool, f: &mut fmt::Formatter) -> fmt::Result {
        // Count ranges by kind (memtable / file / other, e.g. extension).
        let (mut num_mem_ranges, mut num_file_ranges, mut num_other_ranges) = (0, 0, 0);
        for range_meta in &self.ranges {
            for idx in &range_meta.row_group_indices {
                if self.is_mem_range_index(*idx) {
                    num_mem_ranges += 1;
                } else if self.is_file_range_index(*idx) {
                    num_file_ranges += 1;
                } else {
                    num_other_ranges += 1;
                }
            }
        }
        if verbose {
            write!(f, "{{")?;
        }
        write!(
            f,
            r#""partition_count":{{"count":{}, "mem_ranges":{}, "files":{}, "file_ranges":{}"#,
            self.ranges.len(),
            num_mem_ranges,
            self.input.num_files(),
            num_file_ranges,
        )?;
        // Only printed when present, keeping the common output compact.
        if num_other_ranges > 0 {
            write!(f, r#", "other_ranges":{}"#, num_other_ranges)?;
        }
        write!(f, "}}")?;

        if let Some(selector) = &self.input.series_row_selector {
            write!(f, ", \"selector\":\"{}\"", selector)?;
        }
        if let Some(distribution) = &self.input.distribution {
            write!(f, ", \"distribution\":\"{}\"", distribution)?;
        }

        if verbose {
            self.format_verbose_content(f)?;
        }

        Ok(())
    }
1553
1554    fn format_verbose_content(&self, f: &mut fmt::Formatter) -> fmt::Result {
1555        struct FileWrapper<'a> {
1556            file: &'a FileHandle,
1557        }
1558
1559        impl fmt::Debug for FileWrapper<'_> {
1560            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1561                let (start, end) = self.file.time_range();
1562                write!(
1563                    f,
1564                    r#"{{"file_id":"{}","time_range_start":"{}::{}","time_range_end":"{}::{}","rows":{},"size":{},"index_size":{}}}"#,
1565                    self.file.file_id(),
1566                    start.value(),
1567                    start.unit(),
1568                    end.value(),
1569                    end.unit(),
1570                    self.file.num_rows(),
1571                    self.file.size(),
1572                    self.file.index_size()
1573                )
1574            }
1575        }
1576
1577        struct InputWrapper<'a> {
1578            input: &'a ScanInput,
1579        }
1580
1581        #[cfg(feature = "enterprise")]
1582        impl InputWrapper<'_> {
1583            fn format_extension_ranges(&self, f: &mut fmt::Formatter) -> fmt::Result {
1584                if self.input.extension_ranges.is_empty() {
1585                    return Ok(());
1586                }
1587
1588                let mut delimiter = "";
1589                write!(f, ", extension_ranges: [")?;
1590                for range in self.input.extension_ranges() {
1591                    write!(f, "{}{:?}", delimiter, range)?;
1592                    delimiter = ", ";
1593                }
1594                write!(f, "]")?;
1595                Ok(())
1596            }
1597        }
1598
1599        impl fmt::Debug for InputWrapper<'_> {
1600            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1601                let output_schema = self.input.mapper.output_schema();
1602                if !output_schema.is_empty() {
1603                    let names: Vec<_> = output_schema
1604                        .column_schemas()
1605                        .iter()
1606                        .map(|col| &col.name)
1607                        .collect();
1608                    write!(f, ", \"projection\": {:?}", names)?;
1609                }
1610                if let Some(predicate) = &self.input.predicate.predicate() {
1611                    if !predicate.exprs().is_empty() {
1612                        let exprs: Vec<_> =
1613                            predicate.exprs().iter().map(|e| e.to_string()).collect();
1614                        write!(f, ", \"filters\": {:?}", exprs)?;
1615                    }
1616                    if !predicate.dyn_filters().is_empty() {
1617                        let dyn_filters: Vec<_> = predicate
1618                            .dyn_filters()
1619                            .iter()
1620                            .map(|f| format!("{}", f))
1621                            .collect();
1622                        write!(f, ", \"dyn_filters\": {:?}", dyn_filters)?;
1623                    }
1624                }
1625                #[cfg(feature = "vector_index")]
1626                if let Some(vector_index_k) = self.input.vector_index_k {
1627                    write!(f, ", \"vector_index_k\": {}", vector_index_k)?;
1628                }
1629                if !self.input.files.is_empty() {
1630                    write!(f, ", \"files\": ")?;
1631                    f.debug_list()
1632                        .entries(self.input.files.iter().map(|file| FileWrapper { file }))
1633                        .finish()?;
1634                }
1635                write!(f, ", \"flat_format\": {}", self.input.explain_flat_format)?;
1636                #[cfg(feature = "enterprise")]
1637                self.format_extension_ranges(f)?;
1638
1639                Ok(())
1640            }
1641        }
1642
1643        write!(f, "{:?}", InputWrapper { input: &self.input })
1644    }
1645
1646    /// Add new dynamic filters to the predicates.
1647    /// Safe after stream creation; in-flight reads may still observe an older snapshot.
1648    pub(crate) fn add_dyn_filter_to_predicate(
1649        self: &Arc<Self>,
1650        filter_exprs: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
1651    ) -> Vec<bool> {
1652        let mut supported = Vec::with_capacity(filter_exprs.len());
1653        let filter_expr = filter_exprs
1654            .into_iter()
1655            .filter_map(|expr| {
1656                if let Ok(dyn_filter) = (expr as Arc<dyn std::any::Any + Send + Sync + 'static>)
1657                .downcast::<datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr>()
1658            {
1659                supported.push(true);
1660                Some(dyn_filter)
1661            } else {
1662                supported.push(false);
1663                None
1664            }
1665            })
1666            .collect();
1667        self.input.predicate.add_dyn_filters(filter_expr);
1668        supported
1669    }
1670}
1671
/// Predicates to evaluate.
/// It only keeps filters that [SimpleFilterEvaluator] supports.
#[derive(Clone, Default)]
pub struct PredicateGroup {
    /// Simple filters built from exprs that reference only a timestamp column
    /// (see `expr_to_filter`); `None` when no such filter exists.
    time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
    /// Predicate that includes request filters and region partition expr (if any).
    predicate_all: Predicate,
    /// Predicate that only includes request filters.
    predicate_without_region: Predicate,
    /// Region partition expression restored from metadata.
    region_partition_expr: Option<PartitionExpr>,
}
1684
1685impl PredicateGroup {
1686    /// Creates a new `PredicateGroup` from exprs according to the metadata.
1687    pub fn new(metadata: &RegionMetadata, exprs: &[Expr]) -> Result<Self> {
1688        let mut combined_exprs = exprs.to_vec();
1689        let mut region_partition_expr = None;
1690
1691        if let Some(expr_json) = metadata.partition_expr.as_ref()
1692            && !expr_json.is_empty()
1693            && let Some(expr) = PartitionExpr::from_json_str(expr_json)
1694                .context(InvalidPartitionExprSnafu { expr: expr_json })?
1695        {
1696            let logical_expr = expr
1697                .try_as_logical_expr()
1698                .context(InvalidPartitionExprSnafu {
1699                    expr: expr_json.clone(),
1700                })?;
1701
1702            combined_exprs.push(logical_expr);
1703            region_partition_expr = Some(expr);
1704        }
1705
1706        let mut time_filters = Vec::with_capacity(combined_exprs.len());
1707        // Columns in the expr.
1708        let mut columns = HashSet::new();
1709        for expr in &combined_exprs {
1710            columns.clear();
1711            let Some(filter) = Self::expr_to_filter(expr, metadata, &mut columns) else {
1712                continue;
1713            };
1714            time_filters.push(filter);
1715        }
1716        let time_filters = if time_filters.is_empty() {
1717            None
1718        } else {
1719            Some(Arc::new(time_filters))
1720        };
1721
1722        let predicate_all = Predicate::new(combined_exprs);
1723        let predicate_without_region = Predicate::new(exprs.to_vec());
1724
1725        Ok(Self {
1726            time_filters,
1727            predicate_all,
1728            predicate_without_region,
1729            region_partition_expr,
1730        })
1731    }
1732
1733    /// Returns time filters.
1734    pub(crate) fn time_filters(&self) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
1735        self.time_filters.clone()
1736    }
1737
1738    /// Returns predicate of all exprs (including region partition expr if present).
1739    pub(crate) fn predicate(&self) -> Option<&Predicate> {
1740        if self.predicate_all.is_empty() {
1741            None
1742        } else {
1743            Some(&self.predicate_all)
1744        }
1745    }
1746
1747    /// Returns predicate that excludes region partition expr.
1748    pub(crate) fn predicate_without_region(&self) -> Option<&Predicate> {
1749        if self.predicate_without_region.is_empty() {
1750            None
1751        } else {
1752            Some(&self.predicate_without_region)
1753        }
1754    }
1755
1756    /// Add dynamic filters in the predicates.
1757    pub(crate) fn add_dyn_filters(&self, dyn_filters: Vec<Arc<DynamicFilterPhysicalExpr>>) {
1758        self.predicate_all.add_dyn_filters(dyn_filters.clone());
1759        self.predicate_without_region.add_dyn_filters(dyn_filters);
1760    }
1761
1762    /// Returns the region partition expr from metadata, if any.
1763    pub(crate) fn region_partition_expr(&self) -> Option<&PartitionExpr> {
1764        self.region_partition_expr.as_ref()
1765    }
1766
1767    fn expr_to_filter(
1768        expr: &Expr,
1769        metadata: &RegionMetadata,
1770        columns: &mut HashSet<Column>,
1771    ) -> Option<SimpleFilterEvaluator> {
1772        columns.clear();
1773        // `expr_to_columns` won't return error.
1774        // We still ignore these expressions for safety.
1775        expr_to_columns(expr, columns).ok()?;
1776        if columns.len() > 1 {
1777            // Simple filter doesn't support multiple columns.
1778            return None;
1779        }
1780        let column = columns.iter().next()?;
1781        let column_meta = metadata.column_by_name(&column.name)?;
1782        if column_meta.semantic_type == SemanticType::Timestamp {
1783            SimpleFilterEvaluator::try_new(expr)
1784        } else {
1785            None
1786        }
1787    }
1788}
1789
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datafusion::physical_plan::expressions::lit as physical_lit;
    use datafusion_common::ScalarValue;
    use datafusion_expr::{col, lit};
    use datatypes::value::Value;
    use partition::expr::col as partition_col;
    use store_api::metadata::RegionMetadataBuilder;
    use store_api::storage::{
        ProjectionInput, ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector,
    };

    use super::*;
    use crate::cache::CacheManager;
    use crate::memtable::time_partition::TimePartitions;
    use crate::read::range_cache::ScanRequestFingerprintBuilder;
    use crate::region::version::VersionBuilder;
    use crate::test_util::memtable_util::{EmptyMemtableBuilder, metadata_with_primary_key};
    use crate::test_util::scheduler_util::SchedulerEnv;

    /// Builds a region version over an empty mutable memtable for `metadata`.
    fn new_version(metadata: RegionMetadataRef) -> VersionRef {
        let mutable = Arc::new(TimePartitions::new(
            metadata.clone(),
            Arc::new(EmptyMemtableBuilder::default()),
            0,
            None,
        ));
        Arc::new(VersionBuilder::new(metadata, mutable).build())
    }

    /// Builds a `ScanInput` with the given `filters`, a projection over columns
    /// `[0, 2, 3]`, one default file and the range result cache enabled.
    async fn new_scan_input(metadata: RegionMetadataRef, filters: Vec<Expr>) -> ScanInput {
        let env = SchedulerEnv::new().await;
        let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap();
        let predicate = PredicateGroup::new(metadata.as_ref(), &filters).unwrap();
        let file = FileHandle::new(
            crate::sst::file::FileMeta::default(),
            Arc::new(crate::sst::file_purger::NoopFilePurger),
        );

        ScanInput::new(env.access_layer.clone(), mapper)
            .with_predicate(predicate)
            .with_cache(CacheStrategy::EnableAll(Arc::new(
                CacheManager::builder()
                    .range_result_cache_size(1024)
                    .build(),
            )))
            .with_files(vec![file])
    }

    /// Columns referenced by filters are appended to the requested projection.
    #[tokio::test]
    async fn test_build_read_column_ids_includes_filters() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let version = new_version(metadata.clone());
        let env = SchedulerEnv::new().await;
        let request = ScanRequest {
            projection_input: Some(vec![4].into()),
            filters: vec![
                col("v0").gt(lit(1)),
                col("ts").gt(lit(0)),
                col("k0").eq(lit("foo")),
            ],
            ..Default::default()
        };
        let scan_region = ScanRegion::new(
            version,
            env.access_layer.clone(),
            request,
            CacheStrategy::Disabled,
        );
        let predicate =
            PredicateGroup::new(metadata.as_ref(), &scan_region.request.filters).unwrap();
        let projection = &scan_region.request.projection_indices().unwrap();
        let read_ids = scan_region
            .build_read_column_ids(projection, &predicate)
            .unwrap();
        assert_eq!(vec![4, 0, 2, 3], read_ids);
    }

    /// An empty projection still yields the time index column.
    #[tokio::test]
    async fn test_build_read_column_ids_empty_projection() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let version = new_version(metadata.clone());
        let env = SchedulerEnv::new().await;
        let request = ScanRequest {
            projection_input: Some(ProjectionInput::default()),
            ..Default::default()
        };
        let scan_region = ScanRegion::new(
            version,
            env.access_layer.clone(),
            request,
            CacheStrategy::Disabled,
        );
        let predicate =
            PredicateGroup::new(metadata.as_ref(), &scan_region.request.filters).unwrap();
        let projection = &scan_region.request.projection_indices().unwrap();
        let read_ids = scan_region
            .build_read_column_ids(projection, &predicate)
            .unwrap();
        // Empty projection should still read the time index column (id 2 in this test schema).
        assert_eq!(vec![2], read_ids);
    }

    /// Requested projection order is preserved; filter columns come after.
    #[tokio::test]
    async fn test_build_read_column_ids_keeps_projection_order() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let version = new_version(metadata.clone());
        let env = SchedulerEnv::new().await;
        let request = ScanRequest {
            projection_input: Some(vec![4, 1].into()),
            filters: vec![col("v0").gt(lit(1))],
            ..Default::default()
        };
        let scan_region = ScanRegion::new(
            version,
            env.access_layer.clone(),
            request,
            CacheStrategy::Disabled,
        );
        let predicate =
            PredicateGroup::new(metadata.as_ref(), &scan_region.request.filters).unwrap();
        let projection = &scan_region.request.projection_indices().unwrap();
        let read_ids = scan_region
            .build_read_column_ids(projection, &predicate)
            .unwrap();
        // Projection order preserved, extra columns appended in schema order.
        assert_eq!(vec![4, 1, 3], read_ids);
    }

    /// Helper to create a timestamp millisecond literal.
    fn ts_lit(val: i64) -> datafusion_expr::Expr {
        lit(ScalarValue::TimestampMillisecond(Some(val), None))
    }

    /// An eligible scan (tag filter present, non-compaction, has files) produces
    /// a fingerprint that matches the one built field-by-field here.
    #[tokio::test]
    async fn test_build_scan_fingerprint_for_eligible_scan() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let input = new_scan_input(
            metadata.clone(),
            vec![
                col("ts").gt_eq(ts_lit(1000)),
                col("k0").eq(lit("foo")),
                col("v0").gt(lit(1)),
            ],
        )
        .await
        .with_distribution(Some(TimeSeriesDistribution::PerSeries))
        .with_series_row_selector(Some(TimeSeriesRowSelector::LastRow))
        .with_merge_mode(MergeMode::LastNonNull)
        .with_filter_deleted(false);

        let fingerprint = build_scan_fingerprint(&input).unwrap();

        let expected = ScanRequestFingerprintBuilder {
            read_column_ids: input.read_column_ids.clone(),
            read_column_types: vec![
                metadata
                    .column_by_id(0)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(2)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(3)
                    .map(|col| col.column_schema.data_type.clone()),
            ],
            filters: vec![
                col("k0").eq(lit("foo")).to_string(),
                col("v0").gt(lit(1)).to_string(),
            ],
            time_filters: vec![col("ts").gt_eq(ts_lit(1000)).to_string()],
            series_row_selector: Some(TimeSeriesRowSelector::LastRow),
            append_mode: false,
            filter_deleted: false,
            merge_mode: MergeMode::LastNonNull,
            partition_expr_version: 0,
        }
        .build();
        assert_eq!(expected, fingerprint);
    }

    /// Without a tag-column filter, no fingerprint is produced.
    #[tokio::test]
    async fn test_build_scan_fingerprint_requires_tag_filter() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let input = new_scan_input(
            metadata,
            vec![col("ts").gt_eq(lit(1000)), col("v0").gt(lit(1))],
        )
        .await;

        assert!(build_scan_fingerprint(&input).is_none());
    }

    /// Fingerprinting is skipped for cache-disabled inputs, compaction scans,
    /// and scans with no files.
    #[tokio::test]
    async fn test_build_scan_fingerprint_respects_scan_eligibility() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let filters = vec![col("k0").eq(lit("foo"))];

        let disabled = ScanInput::new(
            SchedulerEnv::new().await.access_layer.clone(),
            FlatProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap(),
        )
        .with_predicate(PredicateGroup::new(metadata.as_ref(), &filters).unwrap());
        assert!(build_scan_fingerprint(&disabled).is_none());

        let compaction = new_scan_input(metadata.clone(), filters.clone())
            .await
            .with_compaction(true);
        assert!(build_scan_fingerprint(&compaction).is_none());

        // No files to read.
        let no_files = new_scan_input(metadata, filters).await.with_files(vec![]);
        assert!(build_scan_fingerprint(&no_files).is_none());
    }

    /// A partition expr in the metadata is reflected in the fingerprint via
    /// `partition_expr_version`.
    #[tokio::test]
    async fn test_build_scan_fingerprint_tracks_schema_and_partition_expr_changes() {
        let base = metadata_with_primary_key(vec![0, 1], false);
        let mut builder = RegionMetadataBuilder::from_existing(base);
        let partition_expr = partition_col("k0")
            .gt_eq(Value::String("foo".into()))
            .as_json_str()
            .unwrap();
        builder.partition_expr_json(Some(partition_expr));
        let metadata = Arc::new(builder.build_without_validation().unwrap());

        let input = new_scan_input(metadata.clone(), vec![col("k0").eq(lit("foo"))]).await;
        let fingerprint = build_scan_fingerprint(&input).unwrap();

        let expected = ScanRequestFingerprintBuilder {
            read_column_ids: input.read_column_ids.clone(),
            read_column_types: vec![
                metadata
                    .column_by_id(0)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(2)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(3)
                    .map(|col| col.column_schema.data_type.clone()),
            ],
            filters: vec![col("k0").eq(lit("foo")).to_string()],
            time_filters: vec![],
            series_row_selector: None,
            append_mode: false,
            filter_deleted: true,
            merge_mode: MergeMode::LastRow,
            partition_expr_version: metadata.partition_expr_version,
        }
        .build();
        assert_eq!(expected, fingerprint);
        assert_ne!(0, metadata.partition_expr_version);
    }

    /// Dynamic filters can be added even when both base predicates are empty,
    /// and are visible through both predicate accessors afterwards.
    #[test]
    fn test_update_dyn_filters_with_empty_base_predicates() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let predicate_group = PredicateGroup::new(metadata.as_ref(), &[]).unwrap();
        assert!(predicate_group.predicate().is_none());
        assert!(predicate_group.predicate_without_region().is_none());

        let dyn_filter = Arc::new(DynamicFilterPhysicalExpr::new(vec![], physical_lit(false)));
        predicate_group.add_dyn_filters(vec![dyn_filter]);

        let predicate_all = predicate_group.predicate().unwrap();
        assert!(predicate_all.exprs().is_empty());
        assert_eq!(1, predicate_all.dyn_filters().len());

        let predicate_without_region = predicate_group.predicate_without_region().unwrap();
        assert!(predicate_without_region.exprs().is_empty());
        assert_eq!(1, predicate_without_region.dyn_filters().len());
    }

    /// Pre-filter mode per (append_mode, merge_mode, source_count) combination.
    #[tokio::test]
    async fn test_range_pre_filter_mode() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let cases = [
            (true, MergeMode::LastRow, 1, PreFilterMode::All),
            (false, MergeMode::LastNonNull, 1, PreFilterMode::All),
            (false, MergeMode::LastRow, 2, PreFilterMode::SkipFields),
            (true, MergeMode::LastRow, 2, PreFilterMode::All),
        ];

        for (append_mode, merge_mode, source_count, expected_mode) in cases {
            let input = new_scan_input(metadata.clone(), vec![])
                .await
                .with_append_mode(append_mode)
                .with_merge_mode(merge_mode);

            assert_eq!(expected_mode, input.range_pre_filter_mode(source_count));
        }
    }
}