// mito2/src/read/scan_region.rs
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Scans a region according to the scan request.
use std::collections::{HashMap, HashSet};
use std::fmt;
use std::num::NonZeroU64;
use std::sync::Arc;
use std::time::Instant;

use api::v1::SemanticType;
use common_error::ext::BoxedError;
use common_recordbatch::SendableRecordBatchStream;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::tracing::Instrument;
use common_telemetry::{debug, error, tracing, warn};
use common_time::range::TimestampRange;
use datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr;
use datafusion_common::Column;
use datafusion_expr::Expr;
use datafusion_expr::utils::expr_to_columns;
use datatypes::data_type::ConcreteDataType;
use datatypes::extension::json::is_json_extension_type;
use datatypes::schema::Schema;
use datatypes::schema::ext::ArrowSchemaExt;
use datatypes::types::json_type::JsonNativeType;
use futures::StreamExt;
use partition::expr::PartitionExpr;
use smallvec::SmallVec;
use snafu::ResultExt;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::region_engine::{PartitionRange, RegionScannerRef};
use store_api::storage::{
    RegionId, ScanRequest, SequenceNumber, SequenceRange, TimeSeriesDistribution,
    TimeSeriesRowSelector,
};
use table::predicate::{Predicate, build_time_range_predicate, extract_time_range_from_expr};
use tokio::sync::{Semaphore, mpsc};
use tokio_stream::wrappers::ReceiverStream;

use crate::access_layer::AccessLayerRef;
use crate::cache::CacheStrategy;
use crate::config::DEFAULT_MAX_CONCURRENT_SCAN_FILES;
use crate::error::{InvalidPartitionExprSnafu, Result};
#[cfg(feature = "enterprise")]
use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
use crate::memtable::{MemtableRange, RangesOptions};
use crate::metrics::READ_SST_COUNT;
use crate::read::compat::{self, FlatCompatBatch};
use crate::read::flat_projection::FlatProjectionMapper;
use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
use crate::read::range_cache::ScanRequestFingerprint;
use crate::read::read_columns::{
    ReadColumns, merge, read_columns_from_predicate, read_columns_from_projection,
};
use crate::read::seq_scan::SeqScan;
use crate::read::series_scan::SeriesScan;
use crate::read::stream::ScanBatchStream;
use crate::read::unordered_scan::UnorderedScan;
use crate::read::{BoxedRecordBatchStream, RecordBatch};
use crate::region::options::MergeMode;
use crate::region::version::VersionRef;
use crate::sst::file::FileHandle;
use crate::sst::index::bloom_filter::applier::{
    BloomFilterIndexApplierBuilder, BloomFilterIndexApplierRef,
};
use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
#[cfg(feature = "vector_index")]
use crate::sst::index::vector_index::applier::{VectorIndexApplier, VectorIndexApplierRef};
use crate::sst::parquet::file_range::PreFilterMode;
use crate::sst::parquet::reader::ReaderMetrics;
87
/// Multiplier used to over-fetch vector search candidates (`k * 2`) when the
/// request also carries filters, since filtering may discard some of the
/// top-k rows returned by the vector index.
#[cfg(feature = "vector_index")]
const VECTOR_INDEX_OVERFETCH_MULTIPLIER: usize = 2;
90
91/// A scanner scans a region and returns a [SendableRecordBatchStream].
92pub(crate) enum Scanner {
93    /// Sequential scan.
94    Seq(SeqScan),
95    /// Unordered scan.
96    Unordered(UnorderedScan),
97    /// Per-series scan.
98    Series(SeriesScan),
99}
100
101impl Scanner {
102    /// Returns a [SendableRecordBatchStream] to retrieve scan results from all partitions.
103    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
104    pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream, BoxedError> {
105        match self {
106            Scanner::Seq(seq_scan) => seq_scan.build_stream(),
107            Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
108            Scanner::Series(series_scan) => series_scan.build_stream().await,
109        }
110    }
111
112    /// Create a stream of [`Batch`] by this scanner.
113    pub(crate) fn scan_batch(&self) -> Result<ScanBatchStream> {
114        match self {
115            Scanner::Seq(x) => x.scan_all_partitions(),
116            Scanner::Unordered(x) => x.scan_all_partitions(),
117            Scanner::Series(x) => x.scan_all_partitions(),
118        }
119    }
120}
121
#[cfg(test)]
impl Scanner {
    /// Returns number of files to scan.
    pub(crate) fn num_files(&self) -> usize {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.input().num_files(),
            Scanner::Unordered(unordered_scan) => unordered_scan.input().num_files(),
            Scanner::Series(series_scan) => series_scan.input().num_files(),
        }
    }

    /// Returns number of memtables to scan.
    pub(crate) fn num_memtables(&self) -> usize {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.input().num_memtables(),
            Scanner::Unordered(unordered_scan) => unordered_scan.input().num_memtables(),
            Scanner::Series(series_scan) => series_scan.input().num_memtables(),
        }
    }

    /// Returns SST file ids to scan.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.input().file_ids(),
            Scanner::Unordered(unordered_scan) => unordered_scan.input().file_ids(),
            Scanner::Series(series_scan) => series_scan.input().file_ids(),
        }
    }

    /// Returns index ids of the SSTs to scan.
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.input().index_ids(),
            Scanner::Unordered(unordered_scan) => unordered_scan.input().index_ids(),
            Scanner::Series(series_scan) => series_scan.input().index_ids(),
        }
    }

    /// Returns the snapshot sequence of the scan, if any.
    pub(crate) fn snapshot_sequence(&self) -> Option<SequenceNumber> {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.input().snapshot_sequence,
            Scanner::Unordered(unordered_scan) => unordered_scan.input().snapshot_sequence,
            Scanner::Series(series_scan) => series_scan.input().snapshot_sequence,
        }
    }

    /// Sets the target partitions for the scanner. It can controls the parallelism of the scanner.
    pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
        use store_api::region_engine::{PrepareRequest, RegionScanner};

        let request = PrepareRequest::default().with_target_partitions(target_partitions);
        match self {
            Scanner::Seq(seq_scan) => seq_scan.prepare(request).unwrap(),
            Scanner::Unordered(unordered_scan) => unordered_scan.prepare(request).unwrap(),
            Scanner::Series(series_scan) => series_scan.prepare(request).unwrap(),
        }
    }
}
179
180#[cfg_attr(doc, aquamarine::aquamarine)]
181/// Helper to scans a region by [ScanRequest].
182///
183/// [ScanRegion] collects SSTs and memtables to scan without actually reading them. It
184/// creates a [Scanner] to actually scan these targets in [Scanner::scan()].
185///
186/// ```mermaid
187/// classDiagram
188/// class ScanRegion {
189///     -VersionRef version
190///     -ScanRequest request
191///     ~scanner() Scanner
192///     ~seq_scan() SeqScan
193/// }
194/// class Scanner {
195///     <<enumeration>>
196///     SeqScan
197///     UnorderedScan
198///     +scan() SendableRecordBatchStream
199/// }
200/// class SeqScan {
201///     -ScanInput input
202///     +build() SendableRecordBatchStream
203/// }
204/// class UnorderedScan {
205///     -ScanInput input
206///     +build() SendableRecordBatchStream
207/// }
208/// class ScanInput {
209///     -ProjectionMapper mapper
210///     -Option~TimeRange~ time_range
211///     -Option~Predicate~ predicate
212///     -Vec~MemtableRef~ memtables
213///     -Vec~FileHandle~ files
214/// }
215/// class ProjectionMapper {
216///     ~output_schema() SchemaRef
217///     ~convert(Batch) RecordBatch
218/// }
219/// ScanRegion -- Scanner
220/// ScanRegion o-- ScanRequest
221/// Scanner o-- SeqScan
222/// Scanner o-- UnorderedScan
223/// SeqScan o-- ScanInput
224/// UnorderedScan o-- ScanInput
225/// Scanner -- SendableRecordBatchStream
226/// ScanInput o-- ProjectionMapper
227/// SeqScan -- SendableRecordBatchStream
228/// UnorderedScan -- SendableRecordBatchStream
229/// ```
230pub(crate) struct ScanRegion {
231    /// Version of the region at scan.
232    version: VersionRef,
233    /// Access layer of the region.
234    access_layer: AccessLayerRef,
235    /// Scan request.
236    request: ScanRequest,
237    /// Cache.
238    cache_strategy: CacheStrategy,
239    /// Maximum number of SST files to scan concurrently.
240    max_concurrent_scan_files: usize,
241    /// Whether to ignore inverted index.
242    ignore_inverted_index: bool,
243    /// Whether to ignore fulltext index.
244    ignore_fulltext_index: bool,
245    /// Whether to ignore bloom filter.
246    ignore_bloom_filter: bool,
247    /// Start time of the scan task.
248    start_time: Option<Instant>,
249    /// Whether to filter out the deleted rows.
250    /// Usually true for normal read, and false for scan for compaction.
251    filter_deleted: bool,
252    #[cfg(feature = "enterprise")]
253    extension_range_provider: Option<BoxedExtensionRangeProvider>,
254}
255
256impl ScanRegion {
257    /// Creates a [ScanRegion].
258    pub(crate) fn new(
259        version: VersionRef,
260        access_layer: AccessLayerRef,
261        request: ScanRequest,
262        cache_strategy: CacheStrategy,
263    ) -> ScanRegion {
264        ScanRegion {
265            version,
266            access_layer,
267            request,
268            cache_strategy,
269            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
270            ignore_inverted_index: false,
271            ignore_fulltext_index: false,
272            ignore_bloom_filter: false,
273            start_time: None,
274            filter_deleted: true,
275            #[cfg(feature = "enterprise")]
276            extension_range_provider: None,
277        }
278    }
279
280    /// Sets maximum number of SST files to scan concurrently.
281    #[must_use]
282    pub(crate) fn with_max_concurrent_scan_files(
283        mut self,
284        max_concurrent_scan_files: usize,
285    ) -> Self {
286        self.max_concurrent_scan_files = max_concurrent_scan_files;
287        self
288    }
289
290    /// Sets whether to ignore inverted index.
291    #[must_use]
292    pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self {
293        self.ignore_inverted_index = ignore;
294        self
295    }
296
297    /// Sets whether to ignore fulltext index.
298    #[must_use]
299    pub(crate) fn with_ignore_fulltext_index(mut self, ignore: bool) -> Self {
300        self.ignore_fulltext_index = ignore;
301        self
302    }
303
304    /// Sets whether to ignore bloom filter.
305    #[must_use]
306    pub(crate) fn with_ignore_bloom_filter(mut self, ignore: bool) -> Self {
307        self.ignore_bloom_filter = ignore;
308        self
309    }
310
311    #[must_use]
312    pub(crate) fn with_start_time(mut self, now: Instant) -> Self {
313        self.start_time = Some(now);
314        self
315    }
316
317    pub(crate) fn set_filter_deleted(&mut self, filter_deleted: bool) {
318        self.filter_deleted = filter_deleted;
319    }
320
321    #[cfg(feature = "enterprise")]
322    pub(crate) fn set_extension_range_provider(
323        &mut self,
324        extension_range_provider: BoxedExtensionRangeProvider,
325    ) {
326        self.extension_range_provider = Some(extension_range_provider);
327    }
328
329    /// Returns a [Scanner] to scan the region.
330    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
331    pub(crate) async fn scanner(self) -> Result<Scanner> {
332        if self.use_series_scan() {
333            self.series_scan().await.map(Scanner::Series)
334        } else if self.use_unordered_scan() {
335            // If table is append only and there is no series row selector, we use unordered scan in query.
336            // We still use seq scan in compaction.
337            self.unordered_scan().await.map(Scanner::Unordered)
338        } else {
339            self.seq_scan().await.map(Scanner::Seq)
340        }
341    }
342
343    /// Returns a [RegionScanner] to scan the region.
344    #[tracing::instrument(
345        level = tracing::Level::DEBUG,
346        skip_all,
347        fields(region_id = %self.region_id())
348    )]
349    pub(crate) async fn region_scanner(self) -> Result<RegionScannerRef> {
350        if self.use_series_scan() {
351            self.series_scan()
352                .await
353                .map(|scanner| Box::new(scanner) as _)
354        } else if self.use_unordered_scan() {
355            self.unordered_scan()
356                .await
357                .map(|scanner| Box::new(scanner) as _)
358        } else {
359            self.seq_scan().await.map(|scanner| Box::new(scanner) as _)
360        }
361    }
362
363    /// Scan sequentially.
364    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
365    pub(crate) async fn seq_scan(self) -> Result<SeqScan> {
366        let input = self.scan_input().await?.with_compaction(false);
367        Ok(SeqScan::new(input))
368    }
369
370    /// Unordered scan.
371    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
372    pub(crate) async fn unordered_scan(self) -> Result<UnorderedScan> {
373        let input = self.scan_input().await?;
374        Ok(UnorderedScan::new(input))
375    }
376
377    /// Scans by series.
378    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
379    pub(crate) async fn series_scan(self) -> Result<SeriesScan> {
380        let input = self.scan_input().await?;
381        Ok(SeriesScan::new(input))
382    }
383
384    /// Returns true if the region can use unordered scan for current request.
385    fn use_unordered_scan(&self) -> bool {
386        // We use unordered scan when:
387        // 1. The region is in append mode.
388        // 2. There is no series row selector.
389        // 3. The required distribution is None or TimeSeriesDistribution::TimeWindowed.
390        //
391        // We still use seq scan in compaction.
392        self.version.options.append_mode
393            && self.request.series_row_selector.is_none()
394            && (self.request.distribution.is_none()
395                || self.request.distribution == Some(TimeSeriesDistribution::TimeWindowed))
396    }
397
398    /// Returns true if the region can use series scan for current request.
399    fn use_series_scan(&self) -> bool {
400        self.request.distribution == Some(TimeSeriesDistribution::PerSeries)
401    }
402
403    /// Creates a scan input.
404    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
405    async fn scan_input(self) -> Result<ScanInput> {
406        let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new);
407        let time_range = self.build_time_range_predicate();
408        let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;
409
410        let read_cols = match &self.request.projection_input {
411            Some(p) => {
412                // Read columns include the pushed-down projection and columns
413                // resolved from the predicate.
414                let metadata = &self.version.metadata;
415                let from_projection = read_columns_from_projection(p.clone(), metadata)?;
416                let from_predicate = read_columns_from_predicate(&predicate, metadata);
417                merge(from_projection, from_predicate)
418            }
419            None => {
420                let read_col_ids = self
421                    .version
422                    .metadata
423                    .column_metadatas
424                    .iter()
425                    .map(|col| col.column_id);
426                ReadColumns::from_deduped_column_ids(read_col_ids)
427            }
428        };
429        let read_col_ids = read_cols.column_ids();
430
431        // The mapper always computes projected column ids as the schema of SSTs may change.
432        let mut mapper = match self.request.projection_indices() {
433            Some(p) => FlatProjectionMapper::new_with_read_columns(
434                &self.version.metadata,
435                p.to_vec(),
436                read_cols,
437            )?,
438            None => FlatProjectionMapper::all(&self.version.metadata)?,
439        };
440        concretize_json_types(&mut mapper, &self.request.json_type_hint);
441
442        let ssts = &self.version.ssts;
443        let mut files = Vec::new();
444        for level in ssts.levels() {
445            for file in level.files.values() {
446                let exceed_min_sequence = match (sst_min_sequence, file.meta_ref().sequence) {
447                    (Some(min_sequence), Some(file_sequence)) => file_sequence > min_sequence,
448                    // If the file's sequence is None (or actually is zero), it could mean the file
449                    // is generated and added to the region "directly". In this case, its data should
450                    // be considered as fresh as the memtable. So its sequence is treated greater than
451                    // the min_sequence, whatever the value of min_sequence is. Hence the default
452                    // "true" in this arm.
453                    (Some(_), None) => true,
454                    (None, _) => true,
455                };
456
457                // Finds SST files in range.
458                if exceed_min_sequence && file_in_range(file, &time_range) {
459                    files.push(file.clone());
460                }
461                // There is no need to check and prune for file's sequence here as the sequence number is usually very new,
462                // unless the timing is too good, or the sequence number wouldn't be in file.
463                // and the batch will be filtered out by tree reader anyway.
464            }
465        }
466
467        let memtables = self.version.memtables.list_memtables();
468        // Skip empty memtables and memtables out of time range.
469        let mut mem_range_builders = Vec::new();
470        let filter_mode = pre_filter_mode(
471            self.version.options.append_mode,
472            self.version.options.merge_mode(),
473        );
474
475        for m in memtables {
476            // check if memtable is empty by reading stats.
477            let Some((start, end)) = m.stats().time_range() else {
478                continue;
479            };
480            // The time range of the memtable is inclusive.
481            let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
482            if !memtable_range.intersects(&time_range) {
483                continue;
484            }
485            let ranges_in_memtable = m.ranges(
486                Some(&read_col_ids),
487                RangesOptions::default()
488                    .with_predicate(predicate.clone())
489                    .with_sequence(SequenceRange::new(
490                        self.request.memtable_min_sequence,
491                        self.request.memtable_max_sequence,
492                    ))
493                    .with_pre_filter_mode(filter_mode),
494            )?;
495            mem_range_builders.extend(ranges_in_memtable.ranges.into_values().map(|v| {
496                let stats = v.stats().clone();
497                MemRangeBuilder::new(v, stats)
498            }));
499        }
500
501        let region_id = self.region_id();
502        debug!(
503            "Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}",
504            region_id,
505            self.request,
506            time_range,
507            mem_range_builders.len(),
508            files.len(),
509            self.version.options.append_mode,
510        );
511
512        let (non_field_filters, field_filters) = self.partition_by_field_filters();
513        let inverted_index_appliers = [
514            self.build_invereted_index_applier(&non_field_filters),
515            self.build_invereted_index_applier(&field_filters),
516        ];
517        let bloom_filter_appliers = [
518            self.build_bloom_filter_applier(&non_field_filters),
519            self.build_bloom_filter_applier(&field_filters),
520        ];
521        let fulltext_index_appliers = [
522            self.build_fulltext_index_applier(&non_field_filters),
523            self.build_fulltext_index_applier(&field_filters),
524        ];
525        #[cfg(feature = "vector_index")]
526        let vector_index_applier = self.build_vector_index_applier();
527        #[cfg(feature = "vector_index")]
528        let vector_index_k = self.request.vector_search.as_ref().map(|search| {
529            if self.request.filters.is_empty() {
530                search.k
531            } else {
532                search.k.saturating_mul(VECTOR_INDEX_OVERFETCH_MULTIPLIER)
533            }
534        });
535
536        let input = ScanInput::new(self.access_layer, mapper)
537            .with_time_range(Some(time_range))
538            .with_predicate(predicate)
539            .with_memtables(mem_range_builders)
540            .with_files(files)
541            .with_cache(self.cache_strategy)
542            .with_inverted_index_appliers(inverted_index_appliers)
543            .with_bloom_filter_index_appliers(bloom_filter_appliers)
544            .with_fulltext_index_appliers(fulltext_index_appliers)
545            .with_max_concurrent_scan_files(self.max_concurrent_scan_files)
546            .with_start_time(self.start_time)
547            .with_append_mode(self.version.options.append_mode)
548            .with_filter_deleted(self.filter_deleted)
549            .with_merge_mode(self.version.options.merge_mode())
550            .with_series_row_selector(self.request.series_row_selector)
551            .with_distribution(self.request.distribution)
552            .with_explain_flat_format(
553                self.version.options.sst_format == Some(crate::sst::FormatType::Flat),
554            )
555            .with_snapshot_sequence(
556                self.request
557                    .snapshot_on_scan
558                    .then_some(self.request.memtable_max_sequence)
559                    .flatten(),
560            );
561        #[cfg(feature = "vector_index")]
562        let input = input
563            .with_vector_index_applier(vector_index_applier)
564            .with_vector_index_k(vector_index_k);
565
566        #[cfg(feature = "enterprise")]
567        let input = if let Some(provider) = self.extension_range_provider {
568            let ranges = provider
569                .find_extension_ranges(self.version.flushed_sequence, time_range, &self.request)
570                .await?;
571            debug!("Find extension ranges: {ranges:?}");
572            input.with_extension_ranges(ranges)
573        } else {
574            input
575        };
576        Ok(input)
577    }
578
579    fn region_id(&self) -> RegionId {
580        self.version.metadata.region_id
581    }
582
583    /// Build time range predicate from filters.
584    fn build_time_range_predicate(&self) -> TimestampRange {
585        let time_index = self.version.metadata.time_index_column();
586        let unit = time_index
587            .column_schema
588            .data_type
589            .as_timestamp()
590            .expect("Time index must have timestamp-compatible type")
591            .unit();
592        build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters)
593    }
594
595    /// Partitions filters into two groups: non-field filters and field filters.
596    /// Returns `(non_field_filters, field_filters)`.
597    fn partition_by_field_filters(&self) -> (Vec<Expr>, Vec<Expr>) {
598        let field_columns = self
599            .version
600            .metadata
601            .field_columns()
602            .map(|col| &col.column_schema.name)
603            .collect::<HashSet<_>>();
604
605        let mut columns = HashSet::new();
606
607        self.request.filters.iter().cloned().partition(|expr| {
608            columns.clear();
609            // `expr_to_columns` won't return error.
610            if expr_to_columns(expr, &mut columns).is_err() {
611                // If we can't extract columns, treat it as non-field filter
612                return true;
613            }
614            // Return true for non-field filters (partition puts true cases in first vec)
615            !columns
616                .iter()
617                .any(|column| field_columns.contains(&column.name))
618        })
619    }
620
621    /// Use the latest schema to build the inverted index applier.
622    fn build_invereted_index_applier(&self, filters: &[Expr]) -> Option<InvertedIndexApplierRef> {
623        if self.ignore_inverted_index {
624            return None;
625        }
626
627        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
628        let inverted_index_cache = self.cache_strategy.inverted_index_cache().cloned();
629
630        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
631
632        InvertedIndexApplierBuilder::new(
633            self.access_layer.table_dir().to_string(),
634            self.access_layer.path_type(),
635            self.access_layer.object_store().clone(),
636            self.version.metadata.as_ref(),
637            self.version.metadata.inverted_indexed_column_ids(
638                self.version
639                    .options
640                    .index_options
641                    .inverted_index
642                    .ignore_column_ids
643                    .iter(),
644            ),
645            self.access_layer.puffin_manager_factory().clone(),
646        )
647        .with_file_cache(file_cache)
648        .with_inverted_index_cache(inverted_index_cache)
649        .with_puffin_metadata_cache(puffin_metadata_cache)
650        .build(filters)
651        .inspect_err(|err| warn!(err; "Failed to build invereted index applier"))
652        .ok()
653        .flatten()
654        .map(Arc::new)
655    }
656
657    /// Use the latest schema to build the bloom filter index applier.
658    fn build_bloom_filter_applier(&self, filters: &[Expr]) -> Option<BloomFilterIndexApplierRef> {
659        if self.ignore_bloom_filter {
660            return None;
661        }
662
663        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
664        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
665        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
666
667        BloomFilterIndexApplierBuilder::new(
668            self.access_layer.table_dir().to_string(),
669            self.access_layer.path_type(),
670            self.access_layer.object_store().clone(),
671            self.version.metadata.as_ref(),
672            self.access_layer.puffin_manager_factory().clone(),
673        )
674        .with_file_cache(file_cache)
675        .with_bloom_filter_index_cache(bloom_filter_index_cache)
676        .with_puffin_metadata_cache(puffin_metadata_cache)
677        .build(filters)
678        .inspect_err(|err| warn!(err; "Failed to build bloom filter index applier"))
679        .ok()
680        .flatten()
681        .map(Arc::new)
682    }
683
684    /// Use the latest schema to build the fulltext index applier.
685    fn build_fulltext_index_applier(&self, filters: &[Expr]) -> Option<FulltextIndexApplierRef> {
686        if self.ignore_fulltext_index {
687            return None;
688        }
689
690        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
691        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
692        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
693        FulltextIndexApplierBuilder::new(
694            self.access_layer.table_dir().to_string(),
695            self.access_layer.path_type(),
696            self.access_layer.object_store().clone(),
697            self.access_layer.puffin_manager_factory().clone(),
698            self.version.metadata.as_ref(),
699        )
700        .with_file_cache(file_cache)
701        .with_puffin_metadata_cache(puffin_metadata_cache)
702        .with_bloom_filter_cache(bloom_filter_index_cache)
703        .build(filters)
704        .inspect_err(|err| warn!(err; "Failed to build fulltext index applier"))
705        .ok()
706        .flatten()
707        .map(Arc::new)
708    }
709
710    /// Build the vector index applier from vector search request.
711    #[cfg(feature = "vector_index")]
712    fn build_vector_index_applier(&self) -> Option<VectorIndexApplierRef> {
713        let vector_search = self.request.vector_search.as_ref()?;
714
715        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
716        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
717        let vector_index_cache = self.cache_strategy.vector_index_cache().cloned();
718
719        let applier = VectorIndexApplier::new(
720            self.access_layer.table_dir().to_string(),
721            self.access_layer.path_type(),
722            self.access_layer.object_store().clone(),
723            self.access_layer.puffin_manager_factory().clone(),
724            vector_search.column_id,
725            vector_search.query_vector.clone(),
726            vector_search.metric,
727        )
728        .with_file_cache(file_cache)
729        .with_puffin_metadata_cache(puffin_metadata_cache)
730        .with_vector_index_cache(vector_index_cache);
731
732        Some(Arc::new(applier))
733    }
734}
735
736fn concretize_json_types(
737    mapper: &mut FlatProjectionMapper,
738    json_type_hint: &HashMap<String, JsonNativeType>,
739) {
740    let output_schema = mapper.output_schema();
741    let output_arrow_schema = output_schema.arrow_schema();
742    if !output_arrow_schema.has_json_extension_field() {
743        return;
744    }
745
746    let mut column_schemas = output_schema.column_schemas().to_vec();
747    for (idx, column_schema) in column_schemas.iter_mut().enumerate() {
748        if !is_json_extension_type(&output_arrow_schema.fields()[idx]) {
749            continue;
750        }
751        let Some(json_type) = json_type_hint.get(&column_schema.name) else {
752            continue;
753        };
754        column_schema.data_type = ConcreteDataType::from(json_type);
755    }
756
757    let output_schema = Arc::new(Schema::new_with_version(
758        column_schemas,
759        output_schema.version(),
760    ));
761    mapper.with_output_schema(output_schema);
762}
763
764/// Returns true if the time range of a SST `file` matches the `predicate`.
765fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
766    if predicate == &TimestampRange::min_to_max() {
767        return true;
768    }
769    // end timestamp of a SST is inclusive.
770    let (start, end) = file.time_range();
771    let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end));
772    file_ts_range.intersects(predicate)
773}
774
/// Common input for different scanners.
pub struct ScanInput {
    /// Region SST access layer.
    access_layer: AccessLayerRef,
    /// Maps projected Batches to RecordBatches.
    pub(crate) mapper: Arc<FlatProjectionMapper>,
    /// The columns to read from memtables and SSTs.
    /// Notice this is different from the columns in `mapper` which are projected columns.
    /// But this read columns might also include non-projected columns needed for filtering.
    pub(crate) read_cols: ReadColumns,
    /// Time range filter for time index.
    pub(crate) time_range: Option<TimestampRange>,
    /// Predicate to push down.
    pub(crate) predicate: PredicateGroup,
    /// Region partition expr applied at read time.
    /// Cached from the predicate group in `with_predicate()` so per-file
    /// predicate selection can compare against the file's own expr.
    region_partition_expr: Option<PartitionExpr>,
    /// Memtable range builders for memtables in the time range.
    pub(crate) memtables: Vec<MemRangeBuilder>,
    /// Handles to SST files to scan.
    pub(crate) files: Vec<FileHandle>,
    /// Cache.
    pub(crate) cache_strategy: CacheStrategy,
    /// Ignores file not found error.
    ignore_file_not_found: bool,
    /// Maximum number of SST files to scan concurrently.
    pub(crate) max_concurrent_scan_files: usize,
    /// Index appliers. Each kind carries two slots — NOTE(review): presumably
    /// one per index format/version; confirm against the SST reader that
    /// consumes these arrays.
    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
    /// Vector index applier for KNN search.
    #[cfg(feature = "vector_index")]
    pub(crate) vector_index_applier: Option<VectorIndexApplierRef>,
    /// Over-fetched k for vector index scan.
    #[cfg(feature = "vector_index")]
    pub(crate) vector_index_k: Option<usize>,
    /// Start time of the query.
    pub(crate) query_start: Option<Instant>,
    /// The region is using append mode.
    pub(crate) append_mode: bool,
    /// Whether to remove deletion markers.
    pub(crate) filter_deleted: bool,
    /// Mode to merge duplicate rows.
    pub(crate) merge_mode: MergeMode,
    /// Hint to select rows from time series.
    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
    /// Hint for the required distribution of the scanner.
    pub(crate) distribution: Option<TimeSeriesDistribution>,
    /// Whether the region's configured SST format is flat.
    explain_flat_format: bool,
    /// Snapshot sequence upper bound captured at scan open and propagated back to the caller.
    pub(crate) snapshot_sequence: Option<SequenceNumber>,
    /// Whether this scan is for compaction.
    pub(crate) compaction: bool,
    /// Extra ranges scanned in addition to memtables and files (enterprise builds only).
    #[cfg(feature = "enterprise")]
    extension_ranges: Vec<BoxedExtensionRange>,
}
832
833impl ScanInput {
834    /// Creates a new [ScanInput].
835    #[must_use]
836    pub(crate) fn new(access_layer: AccessLayerRef, mapper: FlatProjectionMapper) -> ScanInput {
837        ScanInput {
838            access_layer,
839            read_cols: mapper.read_columns().clone(),
840            mapper: Arc::new(mapper),
841            time_range: None,
842            predicate: PredicateGroup::default(),
843            region_partition_expr: None,
844            memtables: Vec::new(),
845            files: Vec::new(),
846            cache_strategy: CacheStrategy::Disabled,
847            ignore_file_not_found: false,
848            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
849            inverted_index_appliers: [None, None],
850            bloom_filter_index_appliers: [None, None],
851            fulltext_index_appliers: [None, None],
852            #[cfg(feature = "vector_index")]
853            vector_index_applier: None,
854            #[cfg(feature = "vector_index")]
855            vector_index_k: None,
856            query_start: None,
857            append_mode: false,
858            filter_deleted: true,
859            merge_mode: MergeMode::default(),
860            series_row_selector: None,
861            distribution: None,
862            explain_flat_format: false,
863            snapshot_sequence: None,
864            compaction: false,
865            #[cfg(feature = "enterprise")]
866            extension_ranges: Vec::new(),
867        }
868    }
869
    /// Sets time range filter for time index.
    #[must_use]
    pub(crate) fn with_time_range(mut self, time_range: Option<TimestampRange>) -> Self {
        self.time_range = time_range;
        self
    }

    /// Sets predicate to push down.
    ///
    /// Also caches the region partition expr from the predicate group so
    /// per-file predicate selection can compare against it later.
    #[must_use]
    pub(crate) fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
        self.region_partition_expr = predicate.region_partition_expr().cloned();
        self.predicate = predicate;
        self
    }

    /// Sets memtable range builders.
    #[must_use]
    pub(crate) fn with_memtables(mut self, memtables: Vec<MemRangeBuilder>) -> Self {
        self.memtables = memtables;
        self
    }

    /// Sets files to read.
    #[must_use]
    pub(crate) fn with_files(mut self, files: Vec<FileHandle>) -> Self {
        self.files = files;
        self
    }

    /// Sets cache for this query.
    #[must_use]
    pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self {
        self.cache_strategy = cache;
        self
    }

    /// Ignores file not found error.
    #[must_use]
    pub(crate) fn with_ignore_file_not_found(mut self, ignore: bool) -> Self {
        self.ignore_file_not_found = ignore;
        self
    }

    /// Sets maximum number of SST files to scan concurrently.
    #[must_use]
    pub(crate) fn with_max_concurrent_scan_files(
        mut self,
        max_concurrent_scan_files: usize,
    ) -> Self {
        self.max_concurrent_scan_files = max_concurrent_scan_files;
        self
    }

    /// Sets inverted index appliers.
    #[must_use]
    pub(crate) fn with_inverted_index_appliers(
        mut self,
        appliers: [Option<InvertedIndexApplierRef>; 2],
    ) -> Self {
        self.inverted_index_appliers = appliers;
        self
    }

    /// Sets bloom filter appliers.
    #[must_use]
    pub(crate) fn with_bloom_filter_index_appliers(
        mut self,
        appliers: [Option<BloomFilterIndexApplierRef>; 2],
    ) -> Self {
        self.bloom_filter_index_appliers = appliers;
        self
    }

    /// Sets fulltext index appliers.
    #[must_use]
    pub(crate) fn with_fulltext_index_appliers(
        mut self,
        appliers: [Option<FulltextIndexApplierRef>; 2],
    ) -> Self {
        self.fulltext_index_appliers = appliers;
        self
    }

    /// Sets vector index applier for KNN search.
    #[cfg(feature = "vector_index")]
    #[must_use]
    pub(crate) fn with_vector_index_applier(
        mut self,
        applier: Option<VectorIndexApplierRef>,
    ) -> Self {
        self.vector_index_applier = applier;
        self
    }

    /// Sets over-fetched k for vector index scan.
    #[cfg(feature = "vector_index")]
    #[must_use]
    pub(crate) fn with_vector_index_k(mut self, k: Option<usize>) -> Self {
        self.vector_index_k = k;
        self
    }

    /// Sets start time of the query.
    #[must_use]
    pub(crate) fn with_start_time(mut self, now: Option<Instant>) -> Self {
        self.query_start = now;
        self
    }

    /// Sets whether the region is in append mode.
    #[must_use]
    pub(crate) fn with_append_mode(mut self, is_append_mode: bool) -> Self {
        self.append_mode = is_append_mode;
        self
    }

    /// Sets whether to remove deletion markers during scan.
    #[must_use]
    pub(crate) fn with_filter_deleted(mut self, filter_deleted: bool) -> Self {
        self.filter_deleted = filter_deleted;
        self
    }

    /// Sets the merge mode.
    #[must_use]
    pub(crate) fn with_merge_mode(mut self, merge_mode: MergeMode) -> Self {
        self.merge_mode = merge_mode;
        self
    }

    /// Sets the distribution hint.
    #[must_use]
    pub(crate) fn with_distribution(
        mut self,
        distribution: Option<TimeSeriesDistribution>,
    ) -> Self {
        self.distribution = distribution;
        self
    }

    /// Sets whether the region's configured SST format is flat for explain output.
    #[must_use]
    pub(crate) fn with_explain_flat_format(mut self, explain_flat_format: bool) -> Self {
        self.explain_flat_format = explain_flat_format;
        self
    }

    /// Sets the time series row selector.
    #[must_use]
    pub(crate) fn with_series_row_selector(
        mut self,
        series_row_selector: Option<TimeSeriesRowSelector>,
    ) -> Self {
        self.series_row_selector = series_row_selector;
        self
    }

    /// Sets the snapshot sequence upper bound for this scan.
    #[must_use]
    pub(crate) fn with_snapshot_sequence(
        mut self,
        snapshot_sequence: Option<SequenceNumber>,
    ) -> Self {
        self.snapshot_sequence = snapshot_sequence;
        self
    }

    /// Sets whether this scan is for compaction.
    #[must_use]
    pub(crate) fn with_compaction(mut self, compaction: bool) -> Self {
        self.compaction = compaction;
        self
    }
1041
1042    /// Builds memtable ranges to scan by `index`.
1043    pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> {
1044        let memtable = &self.memtables[index.index];
1045        let mut ranges = SmallVec::new();
1046        memtable.build_ranges(index.row_group_index, &mut ranges);
1047        ranges
1048    }
1049
1050    fn predicate_for_file(&self, file: &FileHandle) -> Option<Predicate> {
1051        if self.should_skip_region_partition(file) {
1052            self.predicate.predicate_without_region().cloned()
1053        } else {
1054            self.predicate.predicate().cloned()
1055        }
1056    }
1057
1058    fn should_skip_region_partition(&self, file: &FileHandle) -> bool {
1059        match (
1060            self.region_partition_expr.as_ref(),
1061            file.meta_ref().partition_expr.as_ref(),
1062        ) {
1063            (Some(region_expr), Some(file_expr)) => region_expr == file_expr,
1064            _ => false,
1065        }
1066    }
1067
    /// Prunes a file to scan and returns the builder to build readers.
    ///
    /// Applies the per-file predicate, projection, caches and index appliers to
    /// the SST reader. Returns a default (empty) builder when the file yields
    /// nothing, or when the file is missing and `ignore_file_not_found` is set.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.region_metadata().region_id,
            file_id = %file.file_id()
        )
    )]
    pub async fn prune_file(
        &self,
        file: &FileHandle,
        pre_filter_mode: PreFilterMode,
        reader_metrics: &mut ReaderMetrics,
    ) -> Result<FileRangeBuilder> {
        let predicate = self.predicate_for_file(file);
        // Decode primary key values only for user queries (not compaction)
        // that actually read a primary key column.
        let decode_pk_values = !self.compaction
            && self
                .mapper
                .read_columns()
                .column_ids_iter()
                .any(|column_id| self.mapper.metadata().primary_key.contains(&column_id));
        let reader = self
            .access_layer
            .read_sst(file.clone())
            .predicate(predicate)
            .projection(Some(self.read_cols.clone()))
            .cache(self.cache_strategy.clone())
            .inverted_index_appliers(self.inverted_index_appliers.clone())
            .bloom_filter_index_appliers(self.bloom_filter_index_appliers.clone())
            .fulltext_index_appliers(self.fulltext_index_appliers.clone());
        // The vector index applier is only attached when the feature is compiled in.
        #[cfg(feature = "vector_index")]
        let reader = {
            let mut reader = reader;
            reader =
                reader.vector_index_applier(self.vector_index_applier.clone(), self.vector_index_k);
            reader
        };
        let res = reader
            .expected_metadata(Some(self.mapper.metadata().clone()))
            .compaction(self.compaction)
            .pre_filter_mode(pre_filter_mode)
            .decode_primary_key_values(decode_pk_values)
            .build_reader_input(reader_metrics)
            .await;
        let read_input = match res {
            Ok(x) => x,
            Err(e) => {
                // A missing object is tolerated only when the caller opted in;
                // any other error is propagated.
                if e.is_object_not_found() && self.ignore_file_not_found {
                    error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
                    return Ok(FileRangeBuilder::default());
                } else {
                    return Err(e);
                }
            }
        };

        // `None` means the whole file was pruned out.
        let Some((mut file_range_ctx, selection)) = read_input else {
            return Ok(FileRangeBuilder::default());
        };

        let need_compat = !compat::has_same_columns_and_pk_encoding(
            self.mapper.metadata(),
            file_range_ctx.read_format().metadata(),
        );
        if need_compat {
            // They have different schema. We need to adapt the batch first so the
            // mapper can convert it.
            let compat = FlatCompatBatch::try_new(
                &self.mapper,
                file_range_ctx.read_format().metadata(),
                file_range_ctx.read_format().format_projection(),
                self.compaction,
            )?;
            file_range_ctx.set_compat_batch(compat);
        }
        Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), selection))
    }
1145
1146    /// Scans flat sources (RecordBatch streams) in parallel.
1147    ///
1148    /// # Panics if the input doesn't allow parallel scan.
1149    #[tracing::instrument(
1150        skip(self, sources, semaphore),
1151        fields(
1152            region_id = %self.region_metadata().region_id,
1153            source_count = sources.len()
1154        )
1155    )]
1156    pub(crate) fn create_parallel_flat_sources(
1157        &self,
1158        sources: Vec<BoxedRecordBatchStream>,
1159        semaphore: Arc<Semaphore>,
1160        channel_size: usize,
1161    ) -> Result<Vec<BoxedRecordBatchStream>> {
1162        if sources.len() <= 1 {
1163            return Ok(sources);
1164        }
1165
1166        // Spawn a task for each source.
1167        let sources = sources
1168            .into_iter()
1169            .map(|source| {
1170                let (sender, receiver) = mpsc::channel(channel_size);
1171                self.spawn_flat_scan_task(source, semaphore.clone(), sender);
1172                let stream = Box::pin(ReceiverStream::new(receiver));
1173                Box::pin(stream) as _
1174            })
1175            .collect();
1176        Ok(sources)
1177    }
1178
    /// Spawns a task to scan a flat source (RecordBatch stream) asynchronously.
    ///
    /// The task pulls one batch at a time while holding a semaphore permit and
    /// forwards batches to `sender`. It stops on the first error or when the
    /// input stream ends; send failures (receiver dropped) are ignored.
    #[tracing::instrument(
        skip(self, input, semaphore, sender),
        fields(region_id = %self.region_metadata().region_id)
    )]
    pub(crate) fn spawn_flat_scan_task(
        &self,
        mut input: BoxedRecordBatchStream,
        semaphore: Arc<Semaphore>,
        sender: mpsc::Sender<Result<RecordBatch>>,
    ) {
        let region_id = self.region_metadata().region_id;
        let span = tracing::info_span!(
            "ScanInput::parallel_scan_task",
            region_id = %region_id,
            stream_kind = "flat"
        );
        common_runtime::spawn_global(
            async move {
                loop {
                    // We release the permit before sending result to avoid the task waiting on
                    // the channel with the permit held.
                    let maybe_batch = {
                        // Safety: We never close the semaphore.
                        let _permit = semaphore.acquire().await.unwrap();
                        input.next().await
                    };
                    match maybe_batch {
                        Some(Ok(batch)) => {
                            let _ = sender.send(Ok(batch)).await;
                        }
                        Some(Err(e)) => {
                            // Forward the error and stop scanning this source.
                            let _ = sender.send(Err(e)).await;
                            break;
                        }
                        None => break,
                    }
                }
            }
            .instrument(span),
        );
    }
1221
1222    pub(crate) fn total_rows(&self) -> usize {
1223        let rows_in_files: usize = self.files.iter().map(|f| f.num_rows()).sum();
1224        let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();
1225
1226        let rows = rows_in_files + rows_in_memtables;
1227        #[cfg(feature = "enterprise")]
1228        let rows = rows
1229            + self
1230                .extension_ranges
1231                .iter()
1232                .map(|x| x.num_rows())
1233                .sum::<u64>() as usize;
1234        rows
1235    }
1236
    /// Returns the predicate group pushed down to this scan.
    pub(crate) fn predicate_group(&self) -> &PredicateGroup {
        &self.predicate
    }

    /// Returns number of memtables to scan.
    pub(crate) fn num_memtables(&self) -> usize {
        self.memtables.len()
    }

    /// Returns number of SST files to scan.
    pub(crate) fn num_files(&self) -> usize {
        self.files.len()
    }

    /// Gets the file handle from a row group index.
    ///
    /// Row group indices place memtables first and files after them, so the
    /// file position is the index minus the memtable count.
    pub(crate) fn file_from_index(&self, index: RowGroupIndex) -> &FileHandle {
        let file_index = index.index - self.num_memtables();
        &self.files[file_index]
    }

    /// Returns the region metadata held by the projection mapper.
    pub fn region_metadata(&self) -> &RegionMetadataRef {
        self.mapper.metadata()
    }
1260
1261    fn range_pre_filter_mode(&self, source_count: usize) -> PreFilterMode {
1262        if source_count <= 1 {
1263            // Duplicated rows in the same source is not a normal case and we don't provide
1264            // strict dedup semantic (last_row/last_non_null) for it. We expect the duplicated rows
1265            // are exactly identical in the same source so we use PreFilterMode::All for
1266            // performance reason.
1267            return PreFilterMode::All;
1268        }
1269
1270        pre_filter_mode(self.append_mode, self.merge_mode)
1271    }
1272}
1273
#[cfg(feature = "enterprise")]
impl ScanInput {
    /// Replaces the extension ranges of this scan.
    #[must_use]
    pub(crate) fn with_extension_ranges(self, extension_ranges: Vec<BoxedExtensionRange>) -> Self {
        Self {
            extension_ranges,
            ..self
        }
    }

    /// Returns all extension ranges of this scan.
    // The whole impl is already gated on the `enterprise` feature, so the
    // per-item `#[cfg]` attributes the methods used to carry were redundant
    // and have been removed.
    pub(crate) fn extension_ranges(&self) -> &[BoxedExtensionRange] {
        &self.extension_ranges
    }

    /// Get a boxed [ExtensionRange] by the index in all ranges.
    ///
    /// Extension ranges are indexed after memtables and files, hence the
    /// subtraction of both counts.
    pub(crate) fn extension_range(&self, i: usize) -> &BoxedExtensionRange {
        &self.extension_ranges[i - self.num_memtables() - self.num_files()]
    }
}
1295
#[cfg(test)]
impl ScanInput {
    /// Returns SST file ids to scan.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        self.files.iter().map(|file| file.file_id()).collect()
    }

    /// Returns index ids of the SST files to scan.
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        self.files.iter().map(|file| file.index_id()).collect()
    }
}
1307
1308fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode {
1309    if append_mode {
1310        return PreFilterMode::All;
1311    }
1312
1313    match merge_mode {
1314        MergeMode::LastRow => PreFilterMode::SkipFields,
1315        MergeMode::LastNonNull => PreFilterMode::SkipFields,
1316    }
1317}
1318
/// Builds a [ScanRequestFingerprint] from a [ScanInput] if the scan is eligible
/// for partition range caching.
///
/// Returns `None` for compaction scans, scans with no files, scans without a
/// fully-enabled cache, or scans that have no tag filter (caching those would
/// cache all series).
pub(crate) fn build_scan_fingerprint(input: &ScanInput) -> Option<ScanRequestFingerprint> {
    let eligible = !input.compaction
        && !input.files.is_empty()
        && matches!(input.cache_strategy, CacheStrategy::EnableAll(_));

    if !eligible {
        return None;
    }

    let metadata = input.region_metadata();
    // Tag column names, used to detect whether any filter touches a tag.
    let tag_names: HashSet<&str> = metadata
        .column_metadatas
        .iter()
        .filter(|col| col.semantic_type == SemanticType::Tag)
        .map(|col| col.column_schema.name.as_str())
        .collect();

    let time_index = metadata.time_index_column();
    let time_index_name = time_index.column_schema.name.clone();
    let ts_col_unit = time_index
        .column_schema
        .data_type
        .as_timestamp()
        .expect("Time index must have timestamp-compatible type")
        .unit();

    let exprs = input
        .predicate_group()
        .predicate_without_region()
        .map(|predicate| predicate.exprs())
        .unwrap_or_default();

    let mut filters = Vec::new();
    let mut time_filters = Vec::new();
    let mut has_tag_filter = false;
    // Scratch set reused across expressions to collect referenced columns.
    let mut columns = HashSet::new();

    for expr in exprs {
        columns.clear();
        let is_time_only = match expr_to_columns(expr, &mut columns) {
            Ok(()) if !columns.is_empty() => {
                has_tag_filter |= columns
                    .iter()
                    .any(|col| tag_names.contains(col.name.as_str()));
                columns.iter().all(|col| col.name == time_index_name)
            }
            _ => false,
        };

        // TODO(yingwen): The split between `time_filters` and `filters` is currently inert
        // because `build_range_cache_key()` always keeps both in the cache key. We used to
        // strip `time_filters` when the query's `TimestampRange` covered the partition's
        // `FileTimeRange`, but `extract_time_range_from_expr` is not precise enough to prove
        // a time predicate is implied by that range (it can return a wider range than the
        // predicate, and it does not analyze AND/OR shapes), which let the cache reuse rows
        // that should have been filtered. Reviving the optimization needs a per-predicate
        // implication check that walks each time-only `Expr` (recursing through AND/OR/NOT)
        // and proves the predicate holds for every timestamp inside the partition's
        // `FileTimeRange`; until then both buckets land in the fingerprint.
        if is_time_only
            && extract_time_range_from_expr(&time_index_name, ts_col_unit, expr).is_some()
        {
            time_filters.push(expr.to_string());
        } else {
            filters.push(expr.to_string());
        }
    }

    if !has_tag_filter {
        // We only cache requests that have tag filters to avoid caching all series.
        return None;
    }

    // Ensure the filters are sorted for consistent fingerprinting.
    filters.sort_unstable();
    time_filters.sort_unstable();
    let read_columns = input.read_cols.clone();
    Some(
        crate::read::range_cache::ScanRequestFingerprintBuilder {
            read_column_types: read_columns
                .column_ids_iter()
                .map(|id| {
                    metadata
                        .column_by_id(id)
                        .map(|col| col.column_schema.data_type.clone())
                })
                .collect(),
            read_columns,
            filters,
            time_filters,
            series_row_selector: input.series_row_selector,
            append_mode: input.append_mode,
            filter_deleted: input.filter_deleted,
            merge_mode: input.merge_mode,
            partition_expr_version: metadata.partition_expr_version,
        }
        .build(),
    )
}
1420
/// Context shared by different streams from a scanner.
/// It contains the input and ranges to scan.
pub struct StreamContext {
    /// Input memtables and files.
    pub input: ScanInput,
    /// Metadata for partition ranges.
    pub(crate) ranges: Vec<RangeMeta>,
    /// Precomputed scan fingerprint for partition range caching.
    /// `None` when the scan is not eligible for caching.
    #[allow(dead_code)]
    pub(crate) scan_fingerprint: Option<ScanRequestFingerprint>,

    // Metrics:
    /// The start time of the query. Falls back to the context creation time
    /// when the input carries no start time.
    pub(crate) query_start: Instant,
}
1437
1438impl StreamContext {
1439    /// Creates a new [StreamContext] for [SeqScan].
1440    pub(crate) fn seq_scan_ctx(input: ScanInput) -> Self {
1441        let query_start = input.query_start.unwrap_or_else(Instant::now);
1442        let ranges = RangeMeta::seq_scan_ranges(&input);
1443        READ_SST_COUNT.observe(input.num_files() as f64);
1444        let scan_fingerprint = build_scan_fingerprint(&input);
1445
1446        Self {
1447            input,
1448            ranges,
1449            scan_fingerprint,
1450            query_start,
1451        }
1452    }
1453
1454    /// Creates a new [StreamContext] for [UnorderedScan].
1455    pub(crate) fn unordered_scan_ctx(input: ScanInput) -> Self {
1456        let query_start = input.query_start.unwrap_or_else(Instant::now);
1457        let ranges = RangeMeta::unordered_scan_ranges(&input);
1458        READ_SST_COUNT.observe(input.num_files() as f64);
1459        let scan_fingerprint = build_scan_fingerprint(&input);
1460
1461        Self {
1462            input,
1463            ranges,
1464            scan_fingerprint,
1465            query_start,
1466        }
1467    }
1468
1469    /// Returns true if the index refers to a memtable.
1470    pub(crate) fn is_mem_range_index(&self, index: RowGroupIndex) -> bool {
1471        self.input.num_memtables() > index.index
1472    }
1473
1474    pub(crate) fn is_file_range_index(&self, index: RowGroupIndex) -> bool {
1475        !self.is_mem_range_index(index)
1476            && index.index < self.input.num_files() + self.input.num_memtables()
1477    }
1478
1479    pub(crate) fn range_pre_filter_mode(&self, part_range: &PartitionRange) -> PreFilterMode {
1480        let range_meta = &self.ranges[part_range.identifier];
1481        let source_count = range_meta.indices.len();
1482
1483        self.input.range_pre_filter_mode(source_count)
1484    }
1485
1486    /// Retrieves the partition ranges.
1487    pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
1488        self.ranges
1489            .iter()
1490            .enumerate()
1491            .map(|(idx, range_meta)| range_meta.new_partition_range(idx))
1492            .collect()
1493    }
1494
    /// Format the context for explain.
    ///
    /// Emits a JSON-ish summary of partition/range counts plus optional
    /// selector and distribution hints; `verbose` additionally wraps the
    /// output in braces and appends per-file details.
    pub(crate) fn format_for_explain(&self, verbose: bool, f: &mut fmt::Formatter) -> fmt::Result {
        // Classify every row group index as memtable, file, or other
        // (e.g. extension) range.
        let (mut num_mem_ranges, mut num_file_ranges, mut num_other_ranges) = (0, 0, 0);
        for range_meta in &self.ranges {
            for idx in &range_meta.row_group_indices {
                if self.is_mem_range_index(*idx) {
                    num_mem_ranges += 1;
                } else if self.is_file_range_index(*idx) {
                    num_file_ranges += 1;
                } else {
                    num_other_ranges += 1;
                }
            }
        }
        if verbose {
            write!(f, "{{")?;
        }
        write!(
            f,
            r#""partition_count":{{"count":{}, "mem_ranges":{}, "files":{}, "file_ranges":{}"#,
            self.ranges.len(),
            num_mem_ranges,
            self.input.num_files(),
            num_file_ranges,
        )?;
        // Only mention "other" ranges when some exist.
        if num_other_ranges > 0 {
            write!(f, r#", "other_ranges":{}"#, num_other_ranges)?;
        }
        write!(f, "}}")?;

        if let Some(selector) = &self.input.series_row_selector {
            write!(f, ", \"selector\":\"{}\"", selector)?;
        }
        if let Some(distribution) = &self.input.distribution {
            write!(f, ", \"distribution\":\"{}\"", distribution)?;
        }

        if verbose {
            self.format_verbose_content(f)?;
        }

        Ok(())
    }
1538
    /// Formats the verbose part of explain: projection, filters, files and
    /// other scan details, via local wrapper types with custom `Debug` impls.
    fn format_verbose_content(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Renders one SST file as a JSON-ish object.
        struct FileWrapper<'a> {
            file: &'a FileHandle,
        }

        impl fmt::Debug for FileWrapper<'_> {
            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                let (start, end) = self.file.time_range();
                write!(
                    f,
                    r#"{{"file_id":"{}","time_range_start":"{}::{}","time_range_end":"{}::{}","rows":{},"size":{},"index_size":{}}}"#,
                    self.file.file_id(),
                    start.value(),
                    start.unit(),
                    end.value(),
                    end.unit(),
                    self.file.num_rows(),
                    self.file.size(),
                    self.file.index_size()
                )
            }
        }

        // Renders the whole scan input (projection, filters, files, ...).
        struct InputWrapper<'a> {
            input: &'a ScanInput,
        }

        #[cfg(feature = "enterprise")]
        impl InputWrapper<'_> {
            // Appends extension ranges to the output, if any exist.
            fn format_extension_ranges(&self, f: &mut fmt::Formatter) -> fmt::Result {
                if self.input.extension_ranges.is_empty() {
                    return Ok(());
                }

                let mut delimiter = "";
                write!(f, ", extension_ranges: [")?;
                for range in self.input.extension_ranges() {
                    write!(f, "{}{:?}", delimiter, range)?;
                    delimiter = ", ";
                }
                write!(f, "]")?;
                Ok(())
            }
        }

        impl fmt::Debug for InputWrapper<'_> {
            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                let output_schema = self.input.mapper.output_schema();
                if !output_schema.is_empty() {
                    let names: Vec<_> = output_schema
                        .column_schemas()
                        .iter()
                        .map(|col| &col.name)
                        .collect();
                    write!(f, ", \"projection\": {:?}", names)?;
                }
                if let Some(predicate) = &self.input.predicate.predicate() {
                    if !predicate.exprs().is_empty() {
                        let exprs: Vec<_> =
                            predicate.exprs().iter().map(|e| e.to_string()).collect();
                        write!(f, ", \"filters\": {:?}", exprs)?;
                    }
                    if !predicate.dyn_filters().is_empty() {
                        let dyn_filters: Vec<_> = predicate
                            .dyn_filters()
                            .iter()
                            .map(|f| format!("{}", f))
                            .collect();
                        write!(f, ", \"dyn_filters\": {:?}", dyn_filters)?;
                    }
                }
                #[cfg(feature = "vector_index")]
                if let Some(vector_index_k) = self.input.vector_index_k {
                    write!(f, ", \"vector_index_k\": {}", vector_index_k)?;
                }
                if !self.input.files.is_empty() {
                    write!(f, ", \"files\": ")?;
                    f.debug_list()
                        .entries(self.input.files.iter().map(|file| FileWrapper { file }))
                        .finish()?;
                }
                write!(f, ", \"flat_format\": {}", self.input.explain_flat_format)?;
                #[cfg(feature = "enterprise")]
                self.format_extension_ranges(f)?;

                Ok(())
            }
        }

        write!(f, "{:?}", InputWrapper { input: &self.input })
    }
1630
1631    /// Add new dynamic filters to the predicates.
1632    /// Safe after stream creation; in-flight reads may still observe an older snapshot.
1633    pub(crate) fn add_dyn_filter_to_predicate(
1634        self: &Arc<Self>,
1635        filter_exprs: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
1636    ) -> Vec<bool> {
1637        let mut supported = Vec::with_capacity(filter_exprs.len());
1638        let filter_expr = filter_exprs
1639            .into_iter()
1640            .filter_map(|expr| {
1641                if let Ok(dyn_filter) = (expr as Arc<dyn std::any::Any + Send + Sync + 'static>)
1642                .downcast::<datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr>()
1643            {
1644                supported.push(true);
1645                Some(dyn_filter)
1646            } else {
1647                supported.push(false);
1648                None
1649            }
1650            })
1651            .collect();
1652        self.input.predicate.add_dyn_filters(filter_expr);
1653        supported
1654    }
1655}
1656
/// Predicates to evaluate.
/// It only keeps filters that [SimpleFilterEvaluator] supports.
#[derive(Clone, Default)]
pub struct PredicateGroup {
    /// Simple filters on the time index column; `None` when there are none.
    time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
    /// Predicate that includes request filters and region partition expr (if any).
    predicate_all: Predicate,
    /// Predicate that only includes request filters.
    predicate_without_region: Predicate,
    /// Region partition expression restored from metadata.
    region_partition_expr: Option<PartitionExpr>,
}
1669
1670impl PredicateGroup {
1671    /// Creates a new `PredicateGroup` from exprs according to the metadata.
1672    pub fn new(metadata: &RegionMetadata, exprs: &[Expr]) -> Result<Self> {
1673        let mut combined_exprs = exprs.to_vec();
1674        let mut region_partition_expr = None;
1675
1676        if let Some(expr_json) = metadata.partition_expr.as_ref()
1677            && !expr_json.is_empty()
1678            && let Some(expr) = PartitionExpr::from_json_str(expr_json)
1679                .context(InvalidPartitionExprSnafu { expr: expr_json })?
1680        {
1681            let logical_expr = expr
1682                .try_as_logical_expr()
1683                .context(InvalidPartitionExprSnafu {
1684                    expr: expr_json.clone(),
1685                })?;
1686
1687            combined_exprs.push(logical_expr);
1688            region_partition_expr = Some(expr);
1689        }
1690
1691        let mut time_filters = Vec::with_capacity(combined_exprs.len());
1692        // Columns in the expr.
1693        let mut columns = HashSet::new();
1694        for expr in &combined_exprs {
1695            columns.clear();
1696            let Some(filter) = Self::expr_to_filter(expr, metadata, &mut columns) else {
1697                continue;
1698            };
1699            time_filters.push(filter);
1700        }
1701        let time_filters = if time_filters.is_empty() {
1702            None
1703        } else {
1704            Some(Arc::new(time_filters))
1705        };
1706
1707        let predicate_all = Predicate::new(combined_exprs);
1708        let predicate_without_region = Predicate::new(exprs.to_vec());
1709
1710        Ok(Self {
1711            time_filters,
1712            predicate_all,
1713            predicate_without_region,
1714            region_partition_expr,
1715        })
1716    }
1717
1718    /// Returns time filters.
1719    pub(crate) fn time_filters(&self) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
1720        self.time_filters.clone()
1721    }
1722
1723    /// Returns predicate of all exprs (including region partition expr if present).
1724    pub(crate) fn predicate(&self) -> Option<&Predicate> {
1725        if self.predicate_all.is_empty() {
1726            None
1727        } else {
1728            Some(&self.predicate_all)
1729        }
1730    }
1731
1732    /// Returns predicate that excludes region partition expr.
1733    pub(crate) fn predicate_without_region(&self) -> Option<&Predicate> {
1734        if self.predicate_without_region.is_empty() {
1735            None
1736        } else {
1737            Some(&self.predicate_without_region)
1738        }
1739    }
1740
1741    /// Add dynamic filters in the predicates.
1742    pub(crate) fn add_dyn_filters(&self, dyn_filters: Vec<Arc<DynamicFilterPhysicalExpr>>) {
1743        self.predicate_all.add_dyn_filters(dyn_filters.clone());
1744        self.predicate_without_region.add_dyn_filters(dyn_filters);
1745    }
1746
1747    /// Returns the region partition expr from metadata, if any.
1748    pub(crate) fn region_partition_expr(&self) -> Option<&PartitionExpr> {
1749        self.region_partition_expr.as_ref()
1750    }
1751
1752    fn expr_to_filter(
1753        expr: &Expr,
1754        metadata: &RegionMetadata,
1755        columns: &mut HashSet<Column>,
1756    ) -> Option<SimpleFilterEvaluator> {
1757        columns.clear();
1758        // `expr_to_columns` won't return error.
1759        // We still ignore these expressions for safety.
1760        expr_to_columns(expr, columns).ok()?;
1761        if columns.len() > 1 {
1762            // Simple filter doesn't support multiple columns.
1763            return None;
1764        }
1765        let column = columns.iter().next()?;
1766        let column_meta = metadata.column_by_name(&column.name)?;
1767        if column_meta.semantic_type == SemanticType::Timestamp {
1768            SimpleFilterEvaluator::try_new(expr)
1769        } else {
1770            None
1771        }
1772    }
1773}
1774
#[cfg(test)]
mod tests {
    use std::collections::HashMap;
    use std::sync::Arc;

    use datafusion::physical_plan::expressions::lit as physical_lit;
    use datafusion_common::ScalarValue;
    use datafusion_expr::{col, lit};
    use datatypes::extension::json::{JsonExtensionType, JsonMetadata};
    use datatypes::schema::ColumnSchema;
    use datatypes::types::json_type::JsonObjectType;
    use datatypes::value::Value;
    use partition::expr::col as partition_col;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::{RegionId, TimeSeriesDistribution, TimeSeriesRowSelector};

    use super::*;
    use crate::cache::CacheManager;
    use crate::read::range_cache::ScanRequestFingerprintBuilder;
    use crate::test_util::memtable_util::metadata_with_primary_key;
    use crate::test_util::scheduler_util::SchedulerEnv;

    /// Builds a `ScanInput` with projection `[0, 2, 3]`, the given filters,
    /// a range-result cache enabled, and one default SST file.
    async fn new_scan_input(metadata: RegionMetadataRef, filters: Vec<Expr>) -> ScanInput {
        let env = SchedulerEnv::new().await;
        let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3]).unwrap();
        let predicate = PredicateGroup::new(metadata.as_ref(), &filters).unwrap();
        let file = FileHandle::new(
            crate::sst::file::FileMeta::default(),
            Arc::new(crate::sst::file_purger::NoopFilePurger),
        );

        ScanInput::new(env.access_layer.clone(), mapper)
            .with_predicate(predicate)
            .with_cache(CacheStrategy::EnableAll(Arc::new(
                CacheManager::builder()
                    .range_result_cache_size(1024)
                    .build(),
            )))
            .with_files(vec![file])
    }

    /// Helper to create a timestamp millisecond literal.
    fn ts_lit(val: i64) -> datafusion_expr::Expr {
        lit(ScalarValue::TimestampMillisecond(Some(val), None))
    }

    /// Builds region metadata with a string tag ("host"), a timestamp ("ts"),
    /// a JSON extension field ("payload"), and an int64 field ("value").
    fn metadata_with_json_field() -> RegionMetadataRef {
        let mut json_column = ColumnSchema::new(
            "payload",
            ConcreteDataType::from(&JsonNativeType::Object(JsonObjectType::new())),
            true,
        );
        json_column
            .with_extension_type(&JsonExtensionType::new(Arc::new(JsonMetadata::default())))
            .unwrap();

        let mut builder = RegionMetadataBuilder::new(RegionId::new(1024, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "host",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: json_column,
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("value", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .primary_key(vec![1]);

        Arc::new(builder.build().unwrap())
    }

    /// A concrete JSON object type used as the expected concretization target.
    fn concrete_json_type_hint() -> JsonNativeType {
        JsonNativeType::Object(JsonObjectType::from([
            ("active".to_string(), JsonNativeType::Bool),
            ("name".to_string(), JsonNativeType::String),
        ]))
    }

    /// A JSON column with a matching type hint should be rewritten to the
    /// concrete type in the mapper's output schema.
    #[test]
    fn test_concretize_json_types_rewrites_json_output_schema() -> Result<()> {
        let metadata = metadata_with_json_field();
        let mut mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3])?;
        let output_schema = mapper.output_schema();
        let original_arrow_schema = output_schema.arrow_schema();
        assert!(original_arrow_schema.has_json_extension_field());
        assert!(is_json_extension_type(&original_arrow_schema.fields()[1]));

        let expected_type = concrete_json_type_hint();
        concretize_json_types(
            &mut mapper,
            &HashMap::from([("payload".to_string(), expected_type.clone())]),
        );

        let output_schema = mapper.output_schema();
        let output_arrow_schema = output_schema.arrow_schema();
        assert!(output_arrow_schema.has_json_extension_field());
        assert_eq!(
            ConcreteDataType::string_datatype(),
            output_schema.column_schemas()[0].data_type
        );
        assert_eq!(
            ConcreteDataType::from(&expected_type),
            output_schema.column_schemas()[1].data_type
        );
        assert_eq!(
            ConcreteDataType::int64_datatype(),
            output_schema.column_schemas()[2].data_type
        );
        Ok(())
    }

    /// A JSON column without a matching hint must keep its original
    /// (un-concretized) JSON extension type.
    #[test]
    fn test_concretize_json_types_keeps_json_without_hint() {
        let metadata = metadata_with_json_field();
        let mut mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3]).unwrap();

        concretize_json_types(
            &mut mapper,
            &HashMap::from([("missing".to_string(), JsonNativeType::String)]),
        );

        let output_schema = mapper.output_schema();
        let output_arrow_schema = output_schema.arrow_schema();
        assert!(output_arrow_schema.has_json_extension_field());
        assert!(is_json_extension_type(&output_arrow_schema.fields()[1]));

        // Assert that the expected JSON type stays un-concretized: empty object.
        assert_eq!(
            ConcreteDataType::from(&JsonNativeType::Object(JsonObjectType::new())),
            output_schema.column_schemas()[1].data_type
        );
    }

    /// A scan with a tag filter, time filter and field filter should produce
    /// a fingerprint that splits them into the expected builder fields.
    #[tokio::test]
    async fn test_build_scan_fingerprint_for_eligible_scan() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let input = new_scan_input(
            metadata.clone(),
            vec![
                col("ts").gt_eq(ts_lit(1000)),
                col("k0").eq(lit("foo")),
                col("v0").gt(lit(1)),
            ],
        )
        .await
        .with_distribution(Some(TimeSeriesDistribution::PerSeries))
        .with_series_row_selector(Some(TimeSeriesRowSelector::LastRow))
        .with_merge_mode(MergeMode::LastNonNull)
        .with_filter_deleted(false);

        let fingerprint = build_scan_fingerprint(&input).unwrap();

        let expected = ScanRequestFingerprintBuilder {
            read_columns: input.read_cols,
            read_column_types: vec![
                metadata
                    .column_by_id(0)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(2)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(3)
                    .map(|col| col.column_schema.data_type.clone()),
            ],
            filters: vec![
                col("k0").eq(lit("foo")).to_string(),
                col("v0").gt(lit(1)).to_string(),
            ],
            time_filters: vec![col("ts").gt_eq(ts_lit(1000)).to_string()],
            series_row_selector: Some(TimeSeriesRowSelector::LastRow),
            append_mode: false,
            filter_deleted: false,
            merge_mode: MergeMode::LastNonNull,
            partition_expr_version: 0,
        }
        .build();
        assert_eq!(expected, fingerprint);
    }

    /// Without any tag filter, no fingerprint should be produced.
    #[tokio::test]
    async fn test_build_scan_fingerprint_requires_tag_filter() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let input = new_scan_input(
            metadata,
            vec![col("ts").gt_eq(lit(1000)), col("v0").gt(lit(1))],
        )
        .await;

        assert!(build_scan_fingerprint(&input).is_none());
    }

    /// Fingerprinting is disabled when the cache is off, the scan is for
    /// compaction, or there are no files to read.
    #[tokio::test]
    async fn test_build_scan_fingerprint_respects_scan_eligibility() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let filters = vec![col("k0").eq(lit("foo"))];

        let disabled = ScanInput::new(
            SchedulerEnv::new().await.access_layer.clone(),
            FlatProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap(),
        )
        .with_predicate(PredicateGroup::new(metadata.as_ref(), &filters).unwrap());
        assert!(build_scan_fingerprint(&disabled).is_none());

        let compaction = new_scan_input(metadata.clone(), filters.clone())
            .await
            .with_compaction(true);
        assert!(build_scan_fingerprint(&compaction).is_none());

        // No files to read.
        let no_files = new_scan_input(metadata, filters).await.with_files(vec![]);
        assert!(build_scan_fingerprint(&no_files).is_none());
    }

    /// A region partition expression in the metadata must flow into the
    /// fingerprint via a non-zero `partition_expr_version`.
    #[tokio::test]
    async fn test_build_scan_fingerprint_tracks_schema_and_partition_expr_changes() {
        let base = metadata_with_primary_key(vec![0, 1], false);
        let mut builder = RegionMetadataBuilder::from_existing(base);
        let partition_expr = partition_col("k0")
            .gt_eq(Value::String("foo".into()))
            .as_json_str()
            .unwrap();
        builder.partition_expr_json(Some(partition_expr));
        let metadata = Arc::new(builder.build_without_validation().unwrap());

        let input = new_scan_input(metadata.clone(), vec![col("k0").eq(lit("foo"))]).await;
        let fingerprint = build_scan_fingerprint(&input).unwrap();

        let expected = ScanRequestFingerprintBuilder {
            read_columns: input.read_cols,
            read_column_types: vec![
                metadata
                    .column_by_id(0)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(2)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(3)
                    .map(|col| col.column_schema.data_type.clone()),
            ],
            filters: vec![col("k0").eq(lit("foo")).to_string()],
            time_filters: vec![],
            series_row_selector: None,
            append_mode: false,
            filter_deleted: true,
            merge_mode: MergeMode::LastRow,
            partition_expr_version: metadata.partition_expr_version,
        }
        .build();
        assert_eq!(expected, fingerprint);
        assert_ne!(0, metadata.partition_expr_version);
    }

    /// Adding a dynamic filter to an empty `PredicateGroup` makes both
    /// predicates non-empty while keeping their static expr lists empty.
    #[test]
    fn test_update_dyn_filters_with_empty_base_predicates() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let predicate_group = PredicateGroup::new(metadata.as_ref(), &[]).unwrap();
        assert!(predicate_group.predicate().is_none());
        assert!(predicate_group.predicate_without_region().is_none());

        let dyn_filter = Arc::new(DynamicFilterPhysicalExpr::new(vec![], physical_lit(false)));
        predicate_group.add_dyn_filters(vec![dyn_filter]);

        let predicate_all = predicate_group.predicate().unwrap();
        assert!(predicate_all.exprs().is_empty());
        assert_eq!(1, predicate_all.dyn_filters().len());

        let predicate_without_region = predicate_group.predicate_without_region().unwrap();
        assert!(predicate_without_region.exprs().is_empty());
        assert_eq!(1, predicate_without_region.dyn_filters().len());
    }

    /// Pre-filter mode depends on append mode, merge mode, and source count.
    #[tokio::test]
    async fn test_range_pre_filter_mode() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let cases = [
            (true, MergeMode::LastRow, 1, PreFilterMode::All),
            (false, MergeMode::LastNonNull, 1, PreFilterMode::All),
            (false, MergeMode::LastRow, 2, PreFilterMode::SkipFields),
            (true, MergeMode::LastRow, 2, PreFilterMode::All),
        ];

        for (append_mode, merge_mode, source_count, expected_mode) in cases {
            let input = new_scan_input(metadata.clone(), vec![])
                .await
                .with_append_mode(append_mode)
                .with_merge_mode(merge_mode);

            assert_eq!(expected_mode, input.range_pre_filter_mode(source_count));
        }
    }
}