mito2/read/
scan_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Scans a region according to the scan request.
16
17use std::collections::HashSet;
18use std::fmt;
19use std::num::NonZeroU64;
20use std::sync::Arc;
21use std::time::Instant;
22
23use api::v1::SemanticType;
24use common_error::ext::BoxedError;
25use common_recordbatch::SendableRecordBatchStream;
26use common_recordbatch::filter::SimpleFilterEvaluator;
27use common_telemetry::tracing::Instrument;
28use common_telemetry::{debug, error, tracing, warn};
29use common_time::range::TimestampRange;
30use datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr;
31use datafusion_common::Column;
32use datafusion_expr::Expr;
33use datafusion_expr::utils::expr_to_columns;
34use futures::StreamExt;
35use partition::expr::PartitionExpr;
36use smallvec::SmallVec;
37use snafu::{OptionExt as _, ResultExt};
38use store_api::metadata::{RegionMetadata, RegionMetadataRef};
39use store_api::region_engine::{PartitionRange, RegionScannerRef};
40use store_api::storage::{
41    ColumnId, RegionId, ScanRequest, SequenceRange, TimeSeriesDistribution, TimeSeriesRowSelector,
42};
43use table::predicate::{Predicate, build_time_range_predicate};
44use tokio::sync::{Semaphore, mpsc};
45use tokio_stream::wrappers::ReceiverStream;
46
47use crate::access_layer::AccessLayerRef;
48use crate::cache::CacheStrategy;
49use crate::config::{DEFAULT_MAX_CONCURRENT_SCAN_FILES, DEFAULT_SCAN_CHANNEL_SIZE};
50use crate::error::{InvalidPartitionExprSnafu, InvalidRequestSnafu, Result};
51#[cfg(feature = "enterprise")]
52use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
53use crate::memtable::{MemtableRange, RangesOptions};
54use crate::metrics::READ_SST_COUNT;
55use crate::read::compat::{self, CompatBatch, FlatCompatBatch, PrimaryKeyCompatBatch};
56use crate::read::projection::ProjectionMapper;
57use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
58use crate::read::range_cache::ScanRequestFingerprint;
59use crate::read::seq_scan::SeqScan;
60use crate::read::series_scan::SeriesScan;
61use crate::read::stream::ScanBatchStream;
62use crate::read::unordered_scan::UnorderedScan;
63use crate::read::{Batch, BoxedRecordBatchStream, RecordBatch, Source};
64use crate::region::options::MergeMode;
65use crate::region::version::VersionRef;
66use crate::sst::FormatType;
67use crate::sst::file::FileHandle;
68use crate::sst::index::bloom_filter::applier::{
69    BloomFilterIndexApplierBuilder, BloomFilterIndexApplierRef,
70};
71use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
72use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
73use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
74use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
75#[cfg(feature = "vector_index")]
76use crate::sst::index::vector_index::applier::{VectorIndexApplier, VectorIndexApplierRef};
77use crate::sst::parquet::file_range::PreFilterMode;
78use crate::sst::parquet::reader::ReaderMetrics;
79
/// Parallel scan channel size for flat format.
///
/// Flat-format batches are large, so a small channel keeps the pipeline
/// busy without buffering too much memory.
const FLAT_SCAN_CHANNEL_SIZE: usize = 2;
/// Over-fetch factor applied to a vector search's `k` when the request also
/// carries filters, so enough candidates survive post-filtering.
#[cfg(feature = "vector_index")]
const VECTOR_INDEX_OVERFETCH_MULTIPLIER: usize = 2;
84
/// A scanner scans a region and returns a [SendableRecordBatchStream].
///
/// The variant is chosen by [ScanRegion::scanner] according to the request.
pub(crate) enum Scanner {
    /// Sequential scan.
    Seq(SeqScan),
    /// Unordered scan.
    Unordered(UnorderedScan),
    /// Per-series scan.
    Series(SeriesScan),
}
94
95impl Scanner {
96    /// Returns a [SendableRecordBatchStream] to retrieve scan results from all partitions.
97    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
98    pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream, BoxedError> {
99        match self {
100            Scanner::Seq(seq_scan) => seq_scan.build_stream(),
101            Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
102            Scanner::Series(series_scan) => series_scan.build_stream().await,
103        }
104    }
105
106    /// Create a stream of [`Batch`] by this scanner.
107    pub(crate) fn scan_batch(&self) -> Result<ScanBatchStream> {
108        match self {
109            Scanner::Seq(x) => x.scan_all_partitions(),
110            Scanner::Unordered(x) => x.scan_all_partitions(),
111            Scanner::Series(x) => x.scan_all_partitions(),
112        }
113    }
114}
115
#[cfg(test)]
impl Scanner {
    /// Returns number of files to scan.
    pub(crate) fn num_files(&self) -> usize {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.input().num_files(),
            Scanner::Unordered(unordered_scan) => unordered_scan.input().num_files(),
            Scanner::Series(series_scan) => series_scan.input().num_files(),
        }
    }

    /// Returns number of memtables to scan.
    pub(crate) fn num_memtables(&self) -> usize {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.input().num_memtables(),
            Scanner::Unordered(unordered_scan) => unordered_scan.input().num_memtables(),
            Scanner::Series(series_scan) => series_scan.input().num_memtables(),
        }
    }

    /// Returns SST file ids to scan.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.input().file_ids(),
            Scanner::Unordered(unordered_scan) => unordered_scan.input().file_ids(),
            Scanner::Series(series_scan) => series_scan.input().file_ids(),
        }
    }

    /// Returns SST index ids to scan.
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.input().index_ids(),
            Scanner::Unordered(unordered_scan) => unordered_scan.input().index_ids(),
            Scanner::Series(series_scan) => series_scan.input().index_ids(),
        }
    }

    /// Sets the target partitions for the scanner. It controls the parallelism of the scanner.
    pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
        use store_api::region_engine::{PrepareRequest, RegionScanner};

        let request = PrepareRequest::default().with_target_partitions(target_partitions);
        match self {
            Scanner::Seq(seq_scan) => seq_scan.prepare(request).unwrap(),
            Scanner::Unordered(unordered_scan) => unordered_scan.prepare(request).unwrap(),
            Scanner::Series(series_scan) => series_scan.prepare(request).unwrap(),
        }
    }
}
165
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Helper to scan a region by [ScanRequest].
///
/// [ScanRegion] collects SSTs and memtables to scan without actually reading them. It
/// creates a [Scanner] to actually scan these targets in [Scanner::scan()].
///
/// ```mermaid
/// classDiagram
/// class ScanRegion {
///     -VersionRef version
///     -ScanRequest request
///     ~scanner() Scanner
///     ~seq_scan() SeqScan
/// }
/// class Scanner {
///     <<enumeration>>
///     SeqScan
///     UnorderedScan
///     +scan() SendableRecordBatchStream
/// }
/// class SeqScan {
///     -ScanInput input
///     +build() SendableRecordBatchStream
/// }
/// class UnorderedScan {
///     -ScanInput input
///     +build() SendableRecordBatchStream
/// }
/// class ScanInput {
///     -ProjectionMapper mapper
///     -Option~TimeRange~ time_range
///     -Option~Predicate~ predicate
///     -Vec~MemtableRef~ memtables
///     -Vec~FileHandle~ files
/// }
/// class ProjectionMapper {
///     ~output_schema() SchemaRef
///     ~convert(Batch) RecordBatch
/// }
/// ScanRegion -- Scanner
/// ScanRegion o-- ScanRequest
/// Scanner o-- SeqScan
/// Scanner o-- UnorderedScan
/// SeqScan o-- ScanInput
/// UnorderedScan o-- ScanInput
/// Scanner -- SendableRecordBatchStream
/// ScanInput o-- ProjectionMapper
/// SeqScan -- SendableRecordBatchStream
/// UnorderedScan -- SendableRecordBatchStream
/// ```
pub(crate) struct ScanRegion {
    /// Version of the region at scan.
    version: VersionRef,
    /// Access layer of the region.
    access_layer: AccessLayerRef,
    /// Scan request.
    request: ScanRequest,
    /// Cache.
    cache_strategy: CacheStrategy,
    /// Capacity of the channel to send data from parallel scan tasks to the main task.
    parallel_scan_channel_size: usize,
    /// Maximum number of SST files to scan concurrently.
    max_concurrent_scan_files: usize,
    /// Whether to ignore inverted index.
    ignore_inverted_index: bool,
    /// Whether to ignore fulltext index.
    ignore_fulltext_index: bool,
    /// Whether to ignore bloom filter.
    ignore_bloom_filter: bool,
    /// Start time of the scan task.
    start_time: Option<Instant>,
    /// Whether to filter out the deleted rows.
    /// Usually true for normal read, and false for scan for compaction.
    filter_deleted: bool,
    /// Optional provider of extra (extension) ranges to scan.
    #[cfg(feature = "enterprise")]
    extension_range_provider: Option<BoxedExtensionRangeProvider>,
}
243
244impl ScanRegion {
    /// Creates a [ScanRegion] with default options.
    ///
    /// Indexes are enabled, deleted rows are filtered out, and channel /
    /// concurrency sizes use the configured defaults; use the `with_*`
    /// builder methods and setters to override.
    pub(crate) fn new(
        version: VersionRef,
        access_layer: AccessLayerRef,
        request: ScanRequest,
        cache_strategy: CacheStrategy,
    ) -> ScanRegion {
        ScanRegion {
            version,
            access_layer,
            request,
            cache_strategy,
            parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
            ignore_inverted_index: false,
            ignore_fulltext_index: false,
            ignore_bloom_filter: false,
            start_time: None,
            filter_deleted: true,
            #[cfg(feature = "enterprise")]
            extension_range_provider: None,
        }
    }
268
    /// Sets parallel scan task channel size.
    ///
    /// This is the capacity of the channel parallel scan tasks use to send
    /// batches back to the main task.
    #[must_use]
    pub(crate) fn with_parallel_scan_channel_size(
        mut self,
        parallel_scan_channel_size: usize,
    ) -> Self {
        self.parallel_scan_channel_size = parallel_scan_channel_size;
        self
    }
278
    /// Sets maximum number of SST files to scan concurrently.
    #[must_use]
    pub(crate) fn with_max_concurrent_scan_files(
        mut self,
        max_concurrent_scan_files: usize,
    ) -> Self {
        self.max_concurrent_scan_files = max_concurrent_scan_files;
        self
    }
288
    /// Sets whether to ignore inverted index.
    #[must_use]
    pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self {
        self.ignore_inverted_index = ignore;
        self
    }
295
    /// Sets whether to ignore fulltext index.
    #[must_use]
    pub(crate) fn with_ignore_fulltext_index(mut self, ignore: bool) -> Self {
        self.ignore_fulltext_index = ignore;
        self
    }
302
    /// Sets whether to ignore bloom filter.
    #[must_use]
    pub(crate) fn with_ignore_bloom_filter(mut self, ignore: bool) -> Self {
        self.ignore_bloom_filter = ignore;
        self
    }
309
    /// Sets the start time of the scan task.
    #[must_use]
    pub(crate) fn with_start_time(mut self, now: Instant) -> Self {
        self.start_time = Some(now);
        self
    }
315
    /// Sets whether to filter out deleted rows.
    ///
    /// Usually true for normal reads, and false when scanning for compaction.
    pub(crate) fn set_filter_deleted(&mut self, filter_deleted: bool) {
        self.filter_deleted = filter_deleted;
    }
319
    /// Sets the provider used to discover extension ranges for this scan.
    #[cfg(feature = "enterprise")]
    pub(crate) fn set_extension_range_provider(
        &mut self,
        extension_range_provider: BoxedExtensionRangeProvider,
    ) {
        self.extension_range_provider = Some(extension_range_provider);
    }
327
    /// Returns a [Scanner] to scan the region.
    ///
    /// Chooses the scanner variant from the request: per-series scan,
    /// unordered scan for append-mode regions, or sequential scan otherwise.
    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
    pub(crate) async fn scanner(self) -> Result<Scanner> {
        if self.use_series_scan() {
            self.series_scan().await.map(Scanner::Series)
        } else if self.use_unordered_scan() {
            // If table is append only and there is no series row selector, we use unordered scan in query.
            // We still use seq scan in compaction.
            self.unordered_scan().await.map(Scanner::Unordered)
        } else {
            self.seq_scan().await.map(Scanner::Seq)
        }
    }
341
    /// Returns a [RegionScanner] to scan the region.
    ///
    /// Uses the same scanner selection rules as [ScanRegion::scanner], but
    /// boxes the result behind the region-scanner trait object.
    #[tracing::instrument(
        level = tracing::Level::DEBUG,
        skip_all,
        fields(region_id = %self.region_id())
    )]
    pub(crate) async fn region_scanner(self) -> Result<RegionScannerRef> {
        if self.use_series_scan() {
            self.series_scan()
                .await
                .map(|scanner| Box::new(scanner) as _)
        } else if self.use_unordered_scan() {
            self.unordered_scan()
                .await
                .map(|scanner| Box::new(scanner) as _)
        } else {
            self.seq_scan().await.map(|scanner| Box::new(scanner) as _)
        }
    }
361
    /// Scan sequentially.
    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
    pub(crate) async fn seq_scan(self) -> Result<SeqScan> {
        // This entry point serves queries, so the input is marked as
        // non-compaction.
        let input = self.scan_input().await?.with_compaction(false);
        Ok(SeqScan::new(input))
    }
368
    /// Unordered scan.
    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
    pub(crate) async fn unordered_scan(self) -> Result<UnorderedScan> {
        let input = self.scan_input().await?;
        Ok(UnorderedScan::new(input))
    }
375
    /// Scans by series.
    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
    pub(crate) async fn series_scan(self) -> Result<SeriesScan> {
        let input = self.scan_input().await?;
        Ok(SeriesScan::new(input))
    }
382
383    /// Returns true if the region can use unordered scan for current request.
384    fn use_unordered_scan(&self) -> bool {
385        // We use unordered scan when:
386        // 1. The region is in append mode.
387        // 2. There is no series row selector.
388        // 3. The required distribution is None or TimeSeriesDistribution::TimeWindowed.
389        //
390        // We still use seq scan in compaction.
391        self.version.options.append_mode
392            && self.request.series_row_selector.is_none()
393            && (self.request.distribution.is_none()
394                || self.request.distribution == Some(TimeSeriesDistribution::TimeWindowed))
395    }
396
397    /// Returns true if the region can use series scan for current request.
398    fn use_series_scan(&self) -> bool {
399        self.request.distribution == Some(TimeSeriesDistribution::PerSeries)
400    }
401
402    /// Returns true if the region use flat format.
403    fn use_flat_format(&self) -> bool {
404        self.request.force_flat_format
405            || self.version.options.sst_format.unwrap_or_default() == FormatType::Flat
406    }
407
    /// Creates a scan input.
    ///
    /// Collects the SST files and memtable ranges intersecting the request's
    /// time range, builds the projection mapper and index appliers, and
    /// assembles everything into a [ScanInput].
    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
    async fn scan_input(mut self) -> Result<ScanInput> {
        let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new);
        let time_range = self.build_time_range_predicate();
        let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;
        let flat_format = self.use_flat_format();

        // Columns to read: derived from the projection (plus filter /
        // partition-expr columns), or all region columns when unspecified.
        let read_column_ids = match &self.request.projection {
            Some(p) => self.build_read_column_ids(p, &predicate)?,
            None => self
                .version
                .metadata
                .column_metadatas
                .iter()
                .map(|col| col.column_id)
                .collect(),
        };

        // The mapper always computes projected column ids as the schema of SSTs may change.
        let mapper = match &self.request.projection {
            Some(p) => ProjectionMapper::new_with_read_columns(
                &self.version.metadata,
                p.iter().copied(),
                flat_format,
                read_column_ids.clone(),
            )?,
            None => ProjectionMapper::all(&self.version.metadata, flat_format)?,
        };

        // Collect SST files whose sequence and time range qualify.
        let ssts = &self.version.ssts;
        let mut files = Vec::new();
        for level in ssts.levels() {
            for file in level.files.values() {
                let exceed_min_sequence = match (sst_min_sequence, file.meta_ref().sequence) {
                    (Some(min_sequence), Some(file_sequence)) => file_sequence > min_sequence,
                    // If the file's sequence is None (or actually is zero), it could mean the file
                    // is generated and added to the region "directly". In this case, its data should
                    // be considered as fresh as the memtable. So its sequence is treated greater than
                    // the min_sequence, whatever the value of min_sequence is. Hence the default
                    // "true" in this arm.
                    (Some(_), None) => true,
                    (None, _) => true,
                };

                // Finds SST files in range.
                if exceed_min_sequence && file_in_range(file, &time_range) {
                    files.push(file.clone());
                }
                // There is no need to check and prune for file's sequence here as the sequence number is usually very new,
                // unless the timing is too good, or the sequence number wouldn't be in file.
                // and the batch will be filtered out by tree reader anyway.
            }
        }

        let memtables = self.version.memtables.list_memtables();
        // Skip empty memtables and memtables out of time range.
        let mut mem_range_builders = Vec::new();
        let filter_mode = pre_filter_mode(
            self.version.options.append_mode,
            self.version.options.merge_mode(),
        );

        for m in memtables {
            // check if memtable is empty by reading stats.
            let Some((start, end)) = m.stats().time_range() else {
                continue;
            };
            // The time range of the memtable is inclusive.
            let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
            if !memtable_range.intersects(&time_range) {
                continue;
            }
            let ranges_in_memtable = m.ranges(
                Some(read_column_ids.as_slice()),
                RangesOptions::default()
                    .with_predicate(predicate.clone())
                    .with_sequence(SequenceRange::new(
                        self.request.memtable_min_sequence,
                        self.request.memtable_max_sequence,
                    ))
                    .with_pre_filter_mode(filter_mode),
            )?;
            mem_range_builders.extend(ranges_in_memtable.ranges.into_values().map(|v| {
                let stats = v.stats().clone();
                MemRangeBuilder::new(v, stats)
            }));
        }

        let region_id = self.region_id();
        debug!(
            "Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}, flat_format: {}",
            region_id,
            self.request,
            time_range,
            mem_range_builders.len(),
            files.len(),
            self.version.options.append_mode,
            flat_format,
        );

        // Index appliers are built separately for non-field filters and field
        // filters, so each applier array has two slots.
        let (non_field_filters, field_filters) = self.partition_by_field_filters();
        let inverted_index_appliers = [
            self.build_invereted_index_applier(&non_field_filters),
            self.build_invereted_index_applier(&field_filters),
        ];
        let bloom_filter_appliers = [
            self.build_bloom_filter_applier(&non_field_filters),
            self.build_bloom_filter_applier(&field_filters),
        ];
        let fulltext_index_appliers = [
            self.build_fulltext_index_applier(&non_field_filters),
            self.build_fulltext_index_applier(&field_filters),
        ];
        #[cfg(feature = "vector_index")]
        let vector_index_applier = self.build_vector_index_applier();
        // Over-fetch vector candidates when additional filters may discard
        // some of the top-k results.
        #[cfg(feature = "vector_index")]
        let vector_index_k = self.request.vector_search.as_ref().map(|search| {
            if self.request.filters.is_empty() {
                search.k
            } else {
                search.k.saturating_mul(VECTOR_INDEX_OVERFETCH_MULTIPLIER)
            }
        });

        if flat_format {
            // The batch is already large enough so we use a small channel size here.
            self.parallel_scan_channel_size = FLAT_SCAN_CHANNEL_SIZE;
        }

        let input = ScanInput::new(self.access_layer, mapper)
            .with_time_range(Some(time_range))
            .with_predicate(predicate)
            .with_memtables(mem_range_builders)
            .with_files(files)
            .with_cache(self.cache_strategy)
            .with_inverted_index_appliers(inverted_index_appliers)
            .with_bloom_filter_index_appliers(bloom_filter_appliers)
            .with_fulltext_index_appliers(fulltext_index_appliers)
            .with_parallel_scan_channel_size(self.parallel_scan_channel_size)
            .with_max_concurrent_scan_files(self.max_concurrent_scan_files)
            .with_start_time(self.start_time)
            .with_append_mode(self.version.options.append_mode)
            .with_filter_deleted(self.filter_deleted)
            .with_merge_mode(self.version.options.merge_mode())
            .with_series_row_selector(self.request.series_row_selector)
            .with_distribution(self.request.distribution)
            .with_flat_format(flat_format);
        #[cfg(feature = "vector_index")]
        let input = input
            .with_vector_index_applier(vector_index_applier)
            .with_vector_index_k(vector_index_k);

        #[cfg(feature = "enterprise")]
        let input = if let Some(provider) = self.extension_range_provider {
            let ranges = provider
                .find_extension_ranges(self.version.flushed_sequence, time_range, &self.request)
                .await?;
            debug!("Find extension ranges: {ranges:?}");
            input.with_extension_ranges(ranges)
        } else {
            input
        };
        Ok(input)
    }
573
    /// Returns the id of the region to scan.
    fn region_id(&self) -> RegionId {
        self.version.metadata.region_id
    }
577
    /// Build time range predicate from filters.
    fn build_time_range_predicate(&self) -> TimestampRange {
        let time_index = self.version.metadata.time_index_column();
        // The expect is safe per its message: the time index column always has
        // a timestamp-compatible type.
        let unit = time_index
            .column_schema
            .data_type
            .as_timestamp()
            .expect("Time index must have timestamp-compatible type")
            .unit();
        build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters)
    }
589
    /// Return all columns id to read according to the projection and filters.
    ///
    /// The result keeps the projection order first, then appends columns
    /// referenced by filters or the region partition expression that are not
    /// already included. Filter column names missing from the region's schema
    /// are logged and skipped.
    fn build_read_column_ids(
        &self,
        projection: &[usize],
        predicate: &PredicateGroup,
    ) -> Result<Vec<ColumnId>> {
        let metadata = &self.version.metadata;
        // use Vec for read_column_ids to keep the order of columns.
        let mut read_column_ids = Vec::new();
        let mut seen = HashSet::new();

        for idx in projection {
            let column =
                metadata
                    .column_metadatas
                    .get(*idx)
                    .with_context(|| InvalidRequestSnafu {
                        region_id: metadata.region_id,
                        reason: format!("projection index {} is out of bound", idx),
                    })?;
            seen.insert(column.column_id);
            // keep the projection order
            read_column_ids.push(column.column_id);
        }

        // An empty projection still reads the time index column so the scan
        // has at least one column.
        if projection.is_empty() {
            let time_index = metadata.time_index_column().column_id;
            if seen.insert(time_index) {
                read_column_ids.push(time_index);
            }
        }

        let mut extra_names = HashSet::new();
        let mut columns = HashSet::new();

        // Collect names of all columns referenced by the request filters.
        for expr in &self.request.filters {
            columns.clear();
            if expr_to_columns(expr, &mut columns).is_err() {
                continue;
            }
            extra_names.extend(columns.iter().map(|column| column.name.clone()));
        }

        // The partition expression may reference columns outside the projection.
        if let Some(expr) = predicate.region_partition_expr() {
            expr.collect_column_names(&mut extra_names);
        }

        if !extra_names.is_empty() {
            for column in &metadata.column_metadatas {
                if extra_names.contains(column.column_schema.name.as_str())
                    && !seen.contains(&column.column_id)
                {
                    read_column_ids.push(column.column_id);
                }
                extra_names.remove(column.column_schema.name.as_str());
            }
            // Any names left over do not exist in the region's schema.
            if !extra_names.is_empty() {
                warn!(
                    "Some columns in filters are not found in region {}: {:?}",
                    metadata.region_id, extra_names
                );
            }
        }
        Ok(read_column_ids)
    }
655
656    /// Partitions filters into two groups: non-field filters and field filters.
657    /// Returns `(non_field_filters, field_filters)`.
658    fn partition_by_field_filters(&self) -> (Vec<Expr>, Vec<Expr>) {
659        let field_columns = self
660            .version
661            .metadata
662            .field_columns()
663            .map(|col| &col.column_schema.name)
664            .collect::<HashSet<_>>();
665
666        let mut columns = HashSet::new();
667
668        self.request.filters.iter().cloned().partition(|expr| {
669            columns.clear();
670            // `expr_to_columns` won't return error.
671            if expr_to_columns(expr, &mut columns).is_err() {
672                // If we can't extract columns, treat it as non-field filter
673                return true;
674            }
675            // Return true for non-field filters (partition puts true cases in first vec)
676            !columns
677                .iter()
678                .any(|column| field_columns.contains(&column.name))
679        })
680    }
681
682    /// Use the latest schema to build the inverted index applier.
683    fn build_invereted_index_applier(&self, filters: &[Expr]) -> Option<InvertedIndexApplierRef> {
684        if self.ignore_inverted_index {
685            return None;
686        }
687
688        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
689        let inverted_index_cache = self.cache_strategy.inverted_index_cache().cloned();
690
691        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
692
693        InvertedIndexApplierBuilder::new(
694            self.access_layer.table_dir().to_string(),
695            self.access_layer.path_type(),
696            self.access_layer.object_store().clone(),
697            self.version.metadata.as_ref(),
698            self.version.metadata.inverted_indexed_column_ids(
699                self.version
700                    .options
701                    .index_options
702                    .inverted_index
703                    .ignore_column_ids
704                    .iter(),
705            ),
706            self.access_layer.puffin_manager_factory().clone(),
707        )
708        .with_file_cache(file_cache)
709        .with_inverted_index_cache(inverted_index_cache)
710        .with_puffin_metadata_cache(puffin_metadata_cache)
711        .build(filters)
712        .inspect_err(|err| warn!(err; "Failed to build invereted index applier"))
713        .ok()
714        .flatten()
715        .map(Arc::new)
716    }
717
    /// Use the latest schema to build the bloom filter index applier.
    ///
    /// Returns `None` when bloom filters are ignored for this scan, or when
    /// building the applier fails — failures are logged and swallowed so the
    /// scan can proceed without the index.
    fn build_bloom_filter_applier(&self, filters: &[Expr]) -> Option<BloomFilterIndexApplierRef> {
        if self.ignore_bloom_filter {
            return None;
        }

        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();

        BloomFilterIndexApplierBuilder::new(
            self.access_layer.table_dir().to_string(),
            self.access_layer.path_type(),
            self.access_layer.object_store().clone(),
            self.version.metadata.as_ref(),
            self.access_layer.puffin_manager_factory().clone(),
        )
        .with_file_cache(file_cache)
        .with_bloom_filter_index_cache(bloom_filter_index_cache)
        .with_puffin_metadata_cache(puffin_metadata_cache)
        .build(filters)
        .inspect_err(|err| warn!(err; "Failed to build bloom filter index applier"))
        .ok()
        .flatten()
        .map(Arc::new)
    }
744
    /// Use the latest schema to build the fulltext index applier.
    ///
    /// Returns `None` when the fulltext index is ignored for this scan, or
    /// when building the applier fails — failures are logged and swallowed so
    /// the scan can proceed without the index.
    fn build_fulltext_index_applier(&self, filters: &[Expr]) -> Option<FulltextIndexApplierRef> {
        if self.ignore_fulltext_index {
            return None;
        }

        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
        FulltextIndexApplierBuilder::new(
            self.access_layer.table_dir().to_string(),
            self.access_layer.path_type(),
            self.access_layer.object_store().clone(),
            self.access_layer.puffin_manager_factory().clone(),
            self.version.metadata.as_ref(),
        )
        .with_file_cache(file_cache)
        .with_puffin_metadata_cache(puffin_metadata_cache)
        .with_bloom_filter_cache(bloom_filter_index_cache)
        .build(filters)
        .inspect_err(|err| warn!(err; "Failed to build fulltext index applier"))
        .ok()
        .flatten()
        .map(Arc::new)
    }
770
771    /// Build the vector index applier from vector search request.
772    #[cfg(feature = "vector_index")]
773    fn build_vector_index_applier(&self) -> Option<VectorIndexApplierRef> {
774        let vector_search = self.request.vector_search.as_ref()?;
775
776        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
777        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
778        let vector_index_cache = self.cache_strategy.vector_index_cache().cloned();
779
780        let applier = VectorIndexApplier::new(
781            self.access_layer.table_dir().to_string(),
782            self.access_layer.path_type(),
783            self.access_layer.object_store().clone(),
784            self.access_layer.puffin_manager_factory().clone(),
785            vector_search.column_id,
786            vector_search.query_vector.clone(),
787            vector_search.metric,
788        )
789        .with_file_cache(file_cache)
790        .with_puffin_metadata_cache(puffin_metadata_cache)
791        .with_vector_index_cache(vector_index_cache);
792
793        Some(Arc::new(applier))
794    }
795}
796
797/// Returns true if the time range of a SST `file` matches the `predicate`.
798fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
799    if predicate == &TimestampRange::min_to_max() {
800        return true;
801    }
802    // end timestamp of a SST is inclusive.
803    let (start, end) = file.time_range();
804    let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end));
805    file_ts_range.intersects(predicate)
806}
807
/// Common input for different scanners.
pub struct ScanInput {
    /// Region SST access layer.
    access_layer: AccessLayerRef,
    /// Maps projected Batches to RecordBatches.
    pub(crate) mapper: Arc<ProjectionMapper>,
    /// Column ids to read from memtables and SSTs.
    /// Notice this is different from the columns in `mapper` which are projected columns.
    /// But this read columns might also include non-projected columns needed for filtering.
    pub(crate) read_column_ids: Vec<ColumnId>,
    /// Time range filter for time index.
    pub(crate) time_range: Option<TimestampRange>,
    /// Predicate to push down.
    pub(crate) predicate: PredicateGroup,
    /// Region partition expr applied at read time.
    region_partition_expr: Option<PartitionExpr>,
    /// Memtable range builders for memtables in the time range.
    pub(crate) memtables: Vec<MemRangeBuilder>,
    /// Handles to SST files to scan.
    pub(crate) files: Vec<FileHandle>,
    /// Cache.
    pub(crate) cache_strategy: CacheStrategy,
    /// Ignores file not found error.
    ignore_file_not_found: bool,
    /// Capacity of the channel to send data from parallel scan tasks to the main task.
    pub(crate) parallel_scan_channel_size: usize,
    /// Maximum number of SST files to scan concurrently.
    pub(crate) max_concurrent_scan_files: usize,
    /// Inverted index appliers.
    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
    /// Bloom filter index appliers.
    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    /// Fulltext index appliers.
    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
    /// Vector index applier for KNN search.
    #[cfg(feature = "vector_index")]
    pub(crate) vector_index_applier: Option<VectorIndexApplierRef>,
    /// Over-fetched k for vector index scan.
    #[cfg(feature = "vector_index")]
    pub(crate) vector_index_k: Option<usize>,
    /// Start time of the query.
    pub(crate) query_start: Option<Instant>,
    /// The region is using append mode.
    pub(crate) append_mode: bool,
    /// Whether to remove deletion markers.
    pub(crate) filter_deleted: bool,
    /// Mode to merge duplicate rows.
    pub(crate) merge_mode: MergeMode,
    /// Hint to select rows from time series.
    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
    /// Hint for the required distribution of the scanner.
    pub(crate) distribution: Option<TimeSeriesDistribution>,
    /// Whether to use flat format.
    pub(crate) flat_format: bool,
    /// Whether this scan is for compaction.
    pub(crate) compaction: bool,
    /// Additional ranges provided by enterprise extensions.
    #[cfg(feature = "enterprise")]
    extension_ranges: Vec<BoxedExtensionRange>,
}
865
866impl ScanInput {
    /// Creates a new [ScanInput].
    ///
    /// All tunables start at their defaults; callers customize the input
    /// through the `with_*` builder methods below.
    #[must_use]
    pub(crate) fn new(access_layer: AccessLayerRef, mapper: ProjectionMapper) -> ScanInput {
        ScanInput {
            access_layer,
            // Copy the column ids before `mapper` is moved into the `Arc`.
            read_column_ids: mapper.column_ids().to_vec(),
            mapper: Arc::new(mapper),
            time_range: None,
            predicate: PredicateGroup::default(),
            region_partition_expr: None,
            memtables: Vec::new(),
            files: Vec::new(),
            // Caching is opt-in; see `with_cache`.
            cache_strategy: CacheStrategy::Disabled,
            ignore_file_not_found: false,
            parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
            inverted_index_appliers: [None, None],
            bloom_filter_index_appliers: [None, None],
            fulltext_index_appliers: [None, None],
            #[cfg(feature = "vector_index")]
            vector_index_applier: None,
            #[cfg(feature = "vector_index")]
            vector_index_k: None,
            query_start: None,
            append_mode: false,
            // Deletion markers are removed by default for normal reads.
            filter_deleted: true,
            merge_mode: MergeMode::default(),
            series_row_selector: None,
            distribution: None,
            flat_format: false,
            compaction: false,
            #[cfg(feature = "enterprise")]
            extension_ranges: Vec::new(),
        }
    }
902
903    /// Sets time range filter for time index.
904    #[must_use]
905    pub(crate) fn with_time_range(mut self, time_range: Option<TimestampRange>) -> Self {
906        self.time_range = time_range;
907        self
908    }
909
910    /// Sets predicate to push down.
911    #[must_use]
912    pub(crate) fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
913        self.region_partition_expr = predicate.region_partition_expr().cloned();
914        self.predicate = predicate;
915        self
916    }
917
918    /// Sets memtable range builders.
919    #[must_use]
920    pub(crate) fn with_memtables(mut self, memtables: Vec<MemRangeBuilder>) -> Self {
921        self.memtables = memtables;
922        self
923    }
924
925    /// Sets files to read.
926    #[must_use]
927    pub(crate) fn with_files(mut self, files: Vec<FileHandle>) -> Self {
928        self.files = files;
929        self
930    }
931
932    /// Sets cache for this query.
933    #[must_use]
934    pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self {
935        self.cache_strategy = cache;
936        self
937    }
938
939    /// Ignores file not found error.
940    #[must_use]
941    pub(crate) fn with_ignore_file_not_found(mut self, ignore: bool) -> Self {
942        self.ignore_file_not_found = ignore;
943        self
944    }
945
946    /// Sets scan task channel size.
947    #[must_use]
948    pub(crate) fn with_parallel_scan_channel_size(
949        mut self,
950        parallel_scan_channel_size: usize,
951    ) -> Self {
952        self.parallel_scan_channel_size = parallel_scan_channel_size;
953        self
954    }
955
956    /// Sets maximum number of SST files to scan concurrently.
957    #[must_use]
958    pub(crate) fn with_max_concurrent_scan_files(
959        mut self,
960        max_concurrent_scan_files: usize,
961    ) -> Self {
962        self.max_concurrent_scan_files = max_concurrent_scan_files;
963        self
964    }
965
966    /// Sets inverted index appliers.
967    #[must_use]
968    pub(crate) fn with_inverted_index_appliers(
969        mut self,
970        appliers: [Option<InvertedIndexApplierRef>; 2],
971    ) -> Self {
972        self.inverted_index_appliers = appliers;
973        self
974    }
975
976    /// Sets bloom filter appliers.
977    #[must_use]
978    pub(crate) fn with_bloom_filter_index_appliers(
979        mut self,
980        appliers: [Option<BloomFilterIndexApplierRef>; 2],
981    ) -> Self {
982        self.bloom_filter_index_appliers = appliers;
983        self
984    }
985
986    /// Sets fulltext index appliers.
987    #[must_use]
988    pub(crate) fn with_fulltext_index_appliers(
989        mut self,
990        appliers: [Option<FulltextIndexApplierRef>; 2],
991    ) -> Self {
992        self.fulltext_index_appliers = appliers;
993        self
994    }
995
996    /// Sets vector index applier for KNN search.
997    #[cfg(feature = "vector_index")]
998    #[must_use]
999    pub(crate) fn with_vector_index_applier(
1000        mut self,
1001        applier: Option<VectorIndexApplierRef>,
1002    ) -> Self {
1003        self.vector_index_applier = applier;
1004        self
1005    }
1006
1007    /// Sets over-fetched k for vector index scan.
1008    #[cfg(feature = "vector_index")]
1009    #[must_use]
1010    pub(crate) fn with_vector_index_k(mut self, k: Option<usize>) -> Self {
1011        self.vector_index_k = k;
1012        self
1013    }
1014
1015    /// Sets start time of the query.
1016    #[must_use]
1017    pub(crate) fn with_start_time(mut self, now: Option<Instant>) -> Self {
1018        self.query_start = now;
1019        self
1020    }
1021
1022    #[must_use]
1023    pub(crate) fn with_append_mode(mut self, is_append_mode: bool) -> Self {
1024        self.append_mode = is_append_mode;
1025        self
1026    }
1027
1028    /// Sets whether to remove deletion markers during scan.
1029    #[must_use]
1030    pub(crate) fn with_filter_deleted(mut self, filter_deleted: bool) -> Self {
1031        self.filter_deleted = filter_deleted;
1032        self
1033    }
1034
1035    /// Sets the merge mode.
1036    #[must_use]
1037    pub(crate) fn with_merge_mode(mut self, merge_mode: MergeMode) -> Self {
1038        self.merge_mode = merge_mode;
1039        self
1040    }
1041
1042    /// Sets the distribution hint.
1043    #[must_use]
1044    pub(crate) fn with_distribution(
1045        mut self,
1046        distribution: Option<TimeSeriesDistribution>,
1047    ) -> Self {
1048        self.distribution = distribution;
1049        self
1050    }
1051
1052    /// Sets the time series row selector.
1053    #[must_use]
1054    pub(crate) fn with_series_row_selector(
1055        mut self,
1056        series_row_selector: Option<TimeSeriesRowSelector>,
1057    ) -> Self {
1058        self.series_row_selector = series_row_selector;
1059        self
1060    }
1061
1062    /// Sets whether to use flat format.
1063    #[must_use]
1064    pub(crate) fn with_flat_format(mut self, flat_format: bool) -> Self {
1065        self.flat_format = flat_format;
1066        self
1067    }
1068
1069    /// Sets whether this scan is for compaction.
1070    #[must_use]
1071    pub(crate) fn with_compaction(mut self, compaction: bool) -> Self {
1072        self.compaction = compaction;
1073        self
1074    }
1075
1076    /// Scans sources in parallel.
1077    ///
1078    /// # Panics if the input doesn't allow parallel scan.
1079    #[tracing::instrument(
1080        skip(self, sources, semaphore),
1081        fields(
1082            region_id = %self.region_metadata().region_id,
1083            source_count = sources.len()
1084        )
1085    )]
1086    pub(crate) fn create_parallel_sources(
1087        &self,
1088        sources: Vec<Source>,
1089        semaphore: Arc<Semaphore>,
1090    ) -> Result<Vec<Source>> {
1091        if sources.len() <= 1 {
1092            return Ok(sources);
1093        }
1094
1095        // Spawn a task for each source.
1096        let sources = sources
1097            .into_iter()
1098            .map(|source| {
1099                let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
1100                self.spawn_scan_task(source, semaphore.clone(), sender);
1101                let stream = Box::pin(ReceiverStream::new(receiver));
1102                Source::Stream(stream)
1103            })
1104            .collect();
1105        Ok(sources)
1106    }
1107
1108    /// Builds memtable ranges to scan by `index`.
1109    pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> {
1110        let memtable = &self.memtables[index.index];
1111        let mut ranges = SmallVec::new();
1112        memtable.build_ranges(index.row_group_index, &mut ranges);
1113        ranges
1114    }
1115
1116    fn predicate_for_file(&self, file: &FileHandle) -> Option<Predicate> {
1117        if self.should_skip_region_partition(file) {
1118            self.predicate.predicate_without_region().cloned()
1119        } else {
1120            self.predicate.predicate().cloned()
1121        }
1122    }
1123
1124    fn should_skip_region_partition(&self, file: &FileHandle) -> bool {
1125        match (
1126            self.region_partition_expr.as_ref(),
1127            file.meta_ref().partition_expr.as_ref(),
1128        ) {
1129            (Some(region_expr), Some(file_expr)) => region_expr == file_expr,
1130            _ => false,
1131        }
1132    }
1133
    /// Prunes a file to scan and returns the builder to build readers.
    ///
    /// Pushes the predicate, projection, caches, and index appliers down to the
    /// SST reader. Returns an empty [FileRangeBuilder] when the file is fully
    /// pruned, or (if configured) when the file no longer exists.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.region_metadata().region_id,
            file_id = %file.file_id()
        )
    )]
    pub async fn prune_file(
        &self,
        file: &FileHandle,
        reader_metrics: &mut ReaderMetrics,
    ) -> Result<FileRangeBuilder> {
        // The predicate may omit the region partition expr for this file.
        let predicate = self.predicate_for_file(file);
        let filter_mode = pre_filter_mode(self.append_mode, self.merge_mode);
        // Compaction keeps primary keys encoded; otherwise decode when tags are projected.
        let decode_pk_values = !self.compaction && self.mapper.has_tags();
        let reader = self
            .access_layer
            .read_sst(file.clone())
            .predicate(predicate)
            .projection(Some(self.read_column_ids.clone()))
            .cache(self.cache_strategy.clone())
            .inverted_index_appliers(self.inverted_index_appliers.clone())
            .bloom_filter_index_appliers(self.bloom_filter_index_appliers.clone())
            .fulltext_index_appliers(self.fulltext_index_appliers.clone());
        // Splice the vector index applier into the builder chain when the feature is on.
        #[cfg(feature = "vector_index")]
        let reader = {
            let mut reader = reader;
            reader =
                reader.vector_index_applier(self.vector_index_applier.clone(), self.vector_index_k);
            reader
        };
        let res = reader
            .expected_metadata(Some(self.mapper.metadata().clone()))
            .flat_format(self.flat_format)
            .compaction(self.compaction)
            .pre_filter_mode(filter_mode)
            .decode_primary_key_values(decode_pk_values)
            .build_reader_input(reader_metrics)
            .await;
        let read_input = match res {
            Ok(x) => x,
            Err(e) => {
                // Optionally tolerate missing files (e.g. removed by other compactions).
                if e.is_object_not_found() && self.ignore_file_not_found {
                    error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
                    return Ok(FileRangeBuilder::default());
                } else {
                    return Err(e);
                }
            }
        };

        // `None` means the whole file is pruned out; return an empty builder.
        let Some((mut file_range_ctx, selection)) = read_input else {
            return Ok(FileRangeBuilder::default());
        };

        let need_compat = !compat::has_same_columns_and_pk_encoding(
            self.mapper.metadata(),
            file_range_ctx.read_format().metadata(),
        );
        if need_compat {
            // They have different schema. We need to adapt the batch first so the
            // mapper can convert it.
            let compat = if let Some(flat_format) = file_range_ctx.read_format().as_flat() {
                let mapper = self.mapper.as_flat().unwrap();
                FlatCompatBatch::try_new(
                    mapper,
                    flat_format.metadata(),
                    flat_format.format_projection(),
                    self.compaction,
                )?
                .map(CompatBatch::Flat)
            } else {
                let compact_batch = PrimaryKeyCompatBatch::new(
                    &self.mapper,
                    file_range_ctx.read_format().metadata().clone(),
                )?;
                Some(CompatBatch::PrimaryKey(compact_batch))
            };
            file_range_ctx.set_compat_batch(compat);
        }
        Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), selection))
    }
1217
    /// Scans the input source in another task and sends batches to the sender.
    ///
    /// The task stops when the source is exhausted, an error occurs, or the
    /// receiver side of `sender` is dropped.
    #[tracing::instrument(
        skip(self, input, semaphore, sender),
        fields(region_id = %self.region_metadata().region_id)
    )]
    pub(crate) fn spawn_scan_task(
        &self,
        mut input: Source,
        semaphore: Arc<Semaphore>,
        sender: mpsc::Sender<Result<Batch>>,
    ) {
        let region_id = self.region_metadata().region_id;
        let span = tracing::info_span!(
            "ScanInput::parallel_scan_task",
            region_id = %region_id,
            stream_kind = "batch"
        );
        common_runtime::spawn_global(
            async move {
                loop {
                    // We release the permit before sending result to avoid the task waiting on
                    // the channel with the permit held.
                    let maybe_batch = {
                        // Safety: We never close the semaphore.
                        let _permit = semaphore.acquire().await.unwrap();
                        input.next_batch().await
                    };
                    match maybe_batch {
                        Ok(Some(batch)) => {
                            // Send failure means the receiver is gone; keep draining
                            // until the source ends or errors.
                            let _ = sender.send(Ok(batch)).await;
                        }
                        // Source exhausted: end the task normally.
                        Ok(None) => break,
                        Err(e) => {
                            // Forward the error (best effort) and stop scanning.
                            let _ = sender.send(Err(e)).await;
                            break;
                        }
                    }
                }
            }
            .instrument(span),
        );
    }
1260
1261    /// Scans flat sources (RecordBatch streams) in parallel.
1262    ///
1263    /// # Panics if the input doesn't allow parallel scan.
1264    #[tracing::instrument(
1265        skip(self, sources, semaphore),
1266        fields(
1267            region_id = %self.region_metadata().region_id,
1268            source_count = sources.len()
1269        )
1270    )]
1271    pub(crate) fn create_parallel_flat_sources(
1272        &self,
1273        sources: Vec<BoxedRecordBatchStream>,
1274        semaphore: Arc<Semaphore>,
1275    ) -> Result<Vec<BoxedRecordBatchStream>> {
1276        if sources.len() <= 1 {
1277            return Ok(sources);
1278        }
1279
1280        // Spawn a task for each source.
1281        let sources = sources
1282            .into_iter()
1283            .map(|source| {
1284                let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
1285                self.spawn_flat_scan_task(source, semaphore.clone(), sender);
1286                let stream = Box::pin(ReceiverStream::new(receiver));
1287                Box::pin(stream) as _
1288            })
1289            .collect();
1290        Ok(sources)
1291    }
1292
    /// Spawns a task to scan a flat source (RecordBatch stream) asynchronously.
    ///
    /// The task stops when the stream ends, yields an error, or the receiver
    /// side of `sender` is dropped.
    #[tracing::instrument(
        skip(self, input, semaphore, sender),
        fields(region_id = %self.region_metadata().region_id)
    )]
    pub(crate) fn spawn_flat_scan_task(
        &self,
        mut input: BoxedRecordBatchStream,
        semaphore: Arc<Semaphore>,
        sender: mpsc::Sender<Result<RecordBatch>>,
    ) {
        let region_id = self.region_metadata().region_id;
        let span = tracing::info_span!(
            "ScanInput::parallel_scan_task",
            region_id = %region_id,
            stream_kind = "flat"
        );
        common_runtime::spawn_global(
            async move {
                loop {
                    // We release the permit before sending result to avoid the task waiting on
                    // the channel with the permit held.
                    let maybe_batch = {
                        // Safety: We never close the semaphore.
                        let _permit = semaphore.acquire().await.unwrap();
                        input.next().await
                    };
                    match maybe_batch {
                        Some(Ok(batch)) => {
                            // Send failure means the receiver is gone; keep draining
                            // until the stream ends or errors.
                            let _ = sender.send(Ok(batch)).await;
                        }
                        Some(Err(e)) => {
                            // Forward the error (best effort) and stop scanning.
                            let _ = sender.send(Err(e)).await;
                            break;
                        }
                        // Stream exhausted: end the task normally.
                        None => break,
                    }
                }
            }
            .instrument(span),
        );
    }
1335
1336    pub(crate) fn total_rows(&self) -> usize {
1337        let rows_in_files: usize = self.files.iter().map(|f| f.num_rows()).sum();
1338        let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();
1339
1340        let rows = rows_in_files + rows_in_memtables;
1341        #[cfg(feature = "enterprise")]
1342        let rows = rows
1343            + self
1344                .extension_ranges
1345                .iter()
1346                .map(|x| x.num_rows())
1347                .sum::<u64>() as usize;
1348        rows
1349    }
1350
    /// Returns the predicate group pushed down to this scan.
    pub(crate) fn predicate_group(&self) -> &PredicateGroup {
        &self.predicate
    }
1354
    /// Returns number of memtables to scan.
    pub(crate) fn num_memtables(&self) -> usize {
        self.memtables.len()
    }
1359
    /// Returns number of SST files to scan.
    pub(crate) fn num_files(&self) -> usize {
        self.files.len()
    }
1364
    /// Gets the file handle from a row group index.
    ///
    /// Indices are laid out with memtables first, then files, so the file
    /// position is `index.index - num_memtables()`. Panics if the index does
    /// not refer to a file.
    pub(crate) fn file_from_index(&self, index: RowGroupIndex) -> &FileHandle {
        let file_index = index.index - self.num_memtables();
        &self.files[file_index]
    }
1370
    /// Returns the region metadata of this scan (from the projection mapper).
    pub fn region_metadata(&self) -> &RegionMetadataRef {
        self.mapper.metadata()
    }
1374}
1375
#[cfg(feature = "enterprise")]
impl ScanInput {
    /// Sets the extension ranges to scan in addition to memtables and files.
    #[must_use]
    pub(crate) fn with_extension_ranges(self, extension_ranges: Vec<BoxedExtensionRange>) -> Self {
        Self {
            extension_ranges,
            ..self
        }
    }

    /// Returns all extension ranges.
    // Note: the whole impl is already gated by `enterprise`, so per-method
    // `#[cfg]` attributes (present in the old code) were redundant and removed.
    pub(crate) fn extension_ranges(&self) -> &[BoxedExtensionRange] {
        &self.extension_ranges
    }

    /// Get a boxed [ExtensionRange] by the index in all ranges.
    ///
    /// Extension ranges are indexed after memtables and files.
    pub(crate) fn extension_range(&self, i: usize) -> &BoxedExtensionRange {
        &self.extension_ranges[i - self.num_memtables() - self.num_files()]
    }
}
1397
#[cfg(test)]
impl ScanInput {
    /// Returns SST file ids to scan.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        let mut ids = Vec::with_capacity(self.files.len());
        for handle in &self.files {
            ids.push(handle.file_id());
        }
        ids
    }

    /// Returns SST index ids to scan.
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        let mut ids = Vec::with_capacity(self.files.len());
        for handle in &self.files {
            ids.push(handle.index_id());
        }
        ids
    }
}
1409
1410fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode {
1411    if append_mode {
1412        return PreFilterMode::All;
1413    }
1414
1415    match merge_mode {
1416        MergeMode::LastRow => PreFilterMode::SkipFieldsOnDelete,
1417        MergeMode::LastNonNull => PreFilterMode::SkipFields,
1418    }
1419}
1420
/// Builds a [ScanRequestFingerprint] from a [ScanInput] if the scan is eligible
/// for partition range caching.
///
/// Returns `None` when the scan is not eligible: non-flat format, compaction,
/// no SST files, caching disabled, or no tag filter in the predicate.
#[cfg_attr(not(test), allow(dead_code))]
pub(crate) fn build_scan_fingerprint(input: &ScanInput) -> Option<ScanRequestFingerprint> {
    let eligible = input.flat_format
        && !input.compaction
        && !input.files.is_empty()
        && matches!(input.cache_strategy, CacheStrategy::EnableAll(_));

    if !eligible {
        return None;
    }

    let metadata = input.region_metadata();
    // Tag column names, used to detect whether any filter touches a tag.
    let tag_names: HashSet<&str> = metadata
        .column_metadatas
        .iter()
        .filter(|col| col.semantic_type == SemanticType::Tag)
        .map(|col| col.column_schema.name.as_str())
        .collect();

    let time_index_name = metadata.time_index_column().column_schema.name.clone();

    // Use the predicate without the region partition expr so fingerprints are
    // comparable across regions.
    let exprs = input
        .predicate_group()
        .predicate_without_region()
        .map(|predicate| predicate.exprs())
        .unwrap_or_default();

    // Split filters into time-index-only filters and everything else.
    let mut filters = Vec::new();
    let mut time_filters = Vec::new();
    let mut has_tag_filter = false;
    let mut columns = HashSet::new();

    for expr in exprs {
        // `columns` is reused across iterations; clear the previous expr's columns.
        columns.clear();
        let is_time_only = match expr_to_columns(expr, &mut columns) {
            Ok(()) if !columns.is_empty() => {
                has_tag_filter |= columns
                    .iter()
                    .any(|col| tag_names.contains(col.name.as_str()));
                columns.iter().all(|col| col.name == time_index_name)
            }
            // Column extraction failed or the expr references no columns:
            // treat it as a generic (non-time) filter.
            _ => false,
        };

        if is_time_only {
            time_filters.push(expr.to_string());
        } else {
            filters.push(expr.to_string());
        }
    }

    if !has_tag_filter {
        // We only cache requests that have tag filters to avoid caching all series.
        return None;
    }

    // Ensure the filters are sorted for consistent fingerprinting.
    filters.sort_unstable();
    time_filters.sort_unstable();

    Some(
        crate::read::range_cache::ScanRequestFingerprintBuilder {
            read_column_ids: input.read_column_ids.clone(),
            // Column types participate in the fingerprint so schema changes
            // invalidate cached entries; unknown ids map to `None`.
            read_column_types: input
                .read_column_ids
                .iter()
                .map(|id| {
                    metadata
                        .column_by_id(*id)
                        .map(|col| col.column_schema.data_type.clone())
                })
                .collect(),
            filters,
            time_filters,
            series_row_selector: input.series_row_selector,
            append_mode: input.append_mode,
            filter_deleted: input.filter_deleted,
            merge_mode: input.merge_mode,
            partition_expr_version: metadata.partition_expr_version,
        }
        .build(),
    )
}
1506
/// Context shared by different streams from a scanner.
/// It contains the input and ranges to scan.
pub struct StreamContext {
    /// Input memtables and files.
    pub input: ScanInput,
    /// Metadata for partition ranges.
    pub(crate) ranges: Vec<RangeMeta>,

    // Metrics:
    /// The start time of the query.
    pub(crate) query_start: Instant,
}
1519
1520impl StreamContext {
1521    /// Creates a new [StreamContext] for [SeqScan].
1522    pub(crate) fn seq_scan_ctx(input: ScanInput) -> Self {
1523        let query_start = input.query_start.unwrap_or_else(Instant::now);
1524        let ranges = RangeMeta::seq_scan_ranges(&input);
1525        READ_SST_COUNT.observe(input.num_files() as f64);
1526
1527        Self {
1528            input,
1529            ranges,
1530            query_start,
1531        }
1532    }
1533
1534    /// Creates a new [StreamContext] for [UnorderedScan].
1535    pub(crate) fn unordered_scan_ctx(input: ScanInput) -> Self {
1536        let query_start = input.query_start.unwrap_or_else(Instant::now);
1537        let ranges = RangeMeta::unordered_scan_ranges(&input);
1538        READ_SST_COUNT.observe(input.num_files() as f64);
1539
1540        Self {
1541            input,
1542            ranges,
1543            query_start,
1544        }
1545    }
1546
1547    /// Returns true if the index refers to a memtable.
1548    pub(crate) fn is_mem_range_index(&self, index: RowGroupIndex) -> bool {
1549        self.input.num_memtables() > index.index
1550    }
1551
1552    pub(crate) fn is_file_range_index(&self, index: RowGroupIndex) -> bool {
1553        !self.is_mem_range_index(index)
1554            && index.index < self.input.num_files() + self.input.num_memtables()
1555    }
1556
1557    /// Retrieves the partition ranges.
1558    pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
1559        self.ranges
1560            .iter()
1561            .enumerate()
1562            .map(|(idx, range_meta)| range_meta.new_partition_range(idx))
1563            .collect()
1564    }
1565
    /// Format the context for explain.
    ///
    /// Writes a JSON-like summary of partition/range counts plus optional
    /// selector and distribution hints; `verbose` wraps the output in `{}` and
    /// appends per-file details.
    pub(crate) fn format_for_explain(&self, verbose: bool, f: &mut fmt::Formatter) -> fmt::Result {
        // Classify every row group index as memtable, file, or other (e.g. extension).
        let (mut num_mem_ranges, mut num_file_ranges, mut num_other_ranges) = (0, 0, 0);
        for range_meta in &self.ranges {
            for idx in &range_meta.row_group_indices {
                if self.is_mem_range_index(*idx) {
                    num_mem_ranges += 1;
                } else if self.is_file_range_index(*idx) {
                    num_file_ranges += 1;
                } else {
                    num_other_ranges += 1;
                }
            }
        }
        if verbose {
            write!(f, "{{")?;
        }
        write!(
            f,
            r#""partition_count":{{"count":{}, "mem_ranges":{}, "files":{}, "file_ranges":{}"#,
            self.ranges.len(),
            num_mem_ranges,
            self.input.num_files(),
            num_file_ranges,
        )?;
        // Only mention other ranges when there are any.
        if num_other_ranges > 0 {
            write!(f, r#", "other_ranges":{}"#, num_other_ranges)?;
        }
        write!(f, "}}")?;

        if let Some(selector) = &self.input.series_row_selector {
            write!(f, ", \"selector\":\"{}\"", selector)?;
        }
        if let Some(distribution) = &self.input.distribution {
            write!(f, ", \"distribution\":\"{}\"", distribution)?;
        }

        if verbose {
            self.format_verbose_content(f)?;
        }

        Ok(())
    }
1609
    /// Writes the verbose details (projection, filters, files, format flags) of the
    /// scan input for explain output.
    fn format_verbose_content(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Formats one SST file as a compact JSON-like object.
        struct FileWrapper<'a> {
            file: &'a FileHandle,
        }

        impl fmt::Debug for FileWrapper<'_> {
            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                let (start, end) = self.file.time_range();
                write!(
                    f,
                    r#"{{"file_id":"{}","time_range_start":"{}::{}","time_range_end":"{}::{}","rows":{},"size":{},"index_size":{}}}"#,
                    self.file.file_id(),
                    start.value(),
                    start.unit(),
                    end.value(),
                    end.unit(),
                    self.file.num_rows(),
                    self.file.size(),
                    self.file.index_size()
                )
            }
        }

        // Formats the whole `ScanInput` for the verbose section.
        struct InputWrapper<'a> {
            input: &'a ScanInput,
        }

        #[cfg(feature = "enterprise")]
        impl InputWrapper<'_> {
            // Appends the extension ranges (enterprise builds only); no-op when empty.
            fn format_extension_ranges(&self, f: &mut fmt::Formatter) -> fmt::Result {
                if self.input.extension_ranges.is_empty() {
                    return Ok(());
                }

                let mut delimiter = "";
                write!(f, ", extension_ranges: [")?;
                for range in self.input.extension_ranges() {
                    write!(f, "{}{:?}", delimiter, range)?;
                    delimiter = ", ";
                }
                write!(f, "]")?;
                Ok(())
            }
        }

        impl fmt::Debug for InputWrapper<'_> {
            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                // Projected output column names, if any columns are projected.
                let output_schema = self.input.mapper.output_schema();
                if !output_schema.is_empty() {
                    let names: Vec<_> = output_schema
                        .column_schemas()
                        .iter()
                        .map(|col| &col.name)
                        .collect();
                    write!(f, ", \"projection\": {:?}", names)?;
                }
                // Static filters and dynamic filters are printed as separate lists.
                if let Some(predicate) = &self.input.predicate.predicate() {
                    if !predicate.exprs().is_empty() {
                        let exprs: Vec<_> =
                            predicate.exprs().iter().map(|e| e.to_string()).collect();
                        write!(f, ", \"filters\": {:?}", exprs)?;
                    }
                    if !predicate.dyn_filters().is_empty() {
                        let dyn_filters: Vec<_> = predicate
                            .dyn_filters()
                            .iter()
                            .map(|f| format!("{}", f))
                            .collect();
                        write!(f, ", \"dyn_filters\": {:?}", dyn_filters)?;
                    }
                }
                #[cfg(feature = "vector_index")]
                if let Some(vector_index_k) = self.input.vector_index_k {
                    write!(f, ", \"vector_index_k\": {}", vector_index_k)?;
                }
                if !self.input.files.is_empty() {
                    write!(f, ", \"files\": ")?;
                    f.debug_list()
                        .entries(self.input.files.iter().map(|file| FileWrapper { file }))
                        .finish()?;
                }
                write!(f, ", \"flat_format\": {}", self.input.flat_format)?;

                #[cfg(feature = "enterprise")]
                self.format_extension_ranges(f)?;

                Ok(())
            }
        }

        write!(f, "{:?}", InputWrapper { input: &self.input })
    }
1702
1703    /// Add new dynamic filters to the predicates.
1704    /// Safe after stream creation; in-flight reads may still observe an older snapshot.
1705    pub(crate) fn add_dyn_filter_to_predicate(
1706        self: &Arc<Self>,
1707        filter_exprs: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
1708    ) -> Vec<bool> {
1709        let mut supported = Vec::with_capacity(filter_exprs.len());
1710        let filter_expr = filter_exprs
1711            .into_iter()
1712            .filter_map(|expr| {
1713                if let Ok(dyn_filter) = (expr as Arc<dyn std::any::Any + Send + Sync + 'static>)
1714                .downcast::<datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr>()
1715            {
1716                supported.push(true);
1717                Some(dyn_filter)
1718            } else {
1719                supported.push(false);
1720                None
1721            }
1722            })
1723            .collect();
1724        self.input.predicate.add_dyn_filters(filter_expr);
1725        supported
1726    }
1727}
1728
/// Predicates to evaluate.
/// It only keeps filters that [SimpleFilterEvaluator] supports.
#[derive(Clone, Default)]
pub struct PredicateGroup {
    /// Simple filters whose single referenced column has `Timestamp` semantic type.
    time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
    /// Predicate that includes request filters and region partition expr (if any).
    predicate_all: Predicate,
    /// Predicate that only includes request filters.
    predicate_without_region: Predicate,
    /// Region partition expression restored from metadata.
    region_partition_expr: Option<PartitionExpr>,
}
1741
1742impl PredicateGroup {
1743    /// Creates a new `PredicateGroup` from exprs according to the metadata.
1744    pub fn new(metadata: &RegionMetadata, exprs: &[Expr]) -> Result<Self> {
1745        let mut combined_exprs = exprs.to_vec();
1746        let mut region_partition_expr = None;
1747
1748        if let Some(expr_json) = metadata.partition_expr.as_ref()
1749            && !expr_json.is_empty()
1750            && let Some(expr) = PartitionExpr::from_json_str(expr_json)
1751                .context(InvalidPartitionExprSnafu { expr: expr_json })?
1752        {
1753            let logical_expr = expr
1754                .try_as_logical_expr()
1755                .context(InvalidPartitionExprSnafu {
1756                    expr: expr_json.clone(),
1757                })?;
1758
1759            combined_exprs.push(logical_expr);
1760            region_partition_expr = Some(expr);
1761        }
1762
1763        let mut time_filters = Vec::with_capacity(combined_exprs.len());
1764        // Columns in the expr.
1765        let mut columns = HashSet::new();
1766        for expr in &combined_exprs {
1767            columns.clear();
1768            let Some(filter) = Self::expr_to_filter(expr, metadata, &mut columns) else {
1769                continue;
1770            };
1771            time_filters.push(filter);
1772        }
1773        let time_filters = if time_filters.is_empty() {
1774            None
1775        } else {
1776            Some(Arc::new(time_filters))
1777        };
1778
1779        let predicate_all = Predicate::new(combined_exprs);
1780        let predicate_without_region = Predicate::new(exprs.to_vec());
1781
1782        Ok(Self {
1783            time_filters,
1784            predicate_all,
1785            predicate_without_region,
1786            region_partition_expr,
1787        })
1788    }
1789
1790    /// Returns time filters.
1791    pub(crate) fn time_filters(&self) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
1792        self.time_filters.clone()
1793    }
1794
1795    /// Returns predicate of all exprs (including region partition expr if present).
1796    pub(crate) fn predicate(&self) -> Option<&Predicate> {
1797        if self.predicate_all.is_empty() {
1798            None
1799        } else {
1800            Some(&self.predicate_all)
1801        }
1802    }
1803
1804    /// Returns predicate that excludes region partition expr.
1805    pub(crate) fn predicate_without_region(&self) -> Option<&Predicate> {
1806        if self.predicate_without_region.is_empty() {
1807            None
1808        } else {
1809            Some(&self.predicate_without_region)
1810        }
1811    }
1812
1813    /// Add dynamic filters in the predicates.
1814    pub(crate) fn add_dyn_filters(&self, dyn_filters: Vec<Arc<DynamicFilterPhysicalExpr>>) {
1815        self.predicate_all.add_dyn_filters(dyn_filters.clone());
1816        self.predicate_without_region.add_dyn_filters(dyn_filters);
1817    }
1818
1819    /// Returns the region partition expr from metadata, if any.
1820    pub(crate) fn region_partition_expr(&self) -> Option<&PartitionExpr> {
1821        self.region_partition_expr.as_ref()
1822    }
1823
1824    fn expr_to_filter(
1825        expr: &Expr,
1826        metadata: &RegionMetadata,
1827        columns: &mut HashSet<Column>,
1828    ) -> Option<SimpleFilterEvaluator> {
1829        columns.clear();
1830        // `expr_to_columns` won't return error.
1831        // We still ignore these expressions for safety.
1832        expr_to_columns(expr, columns).ok()?;
1833        if columns.len() > 1 {
1834            // Simple filter doesn't support multiple columns.
1835            return None;
1836        }
1837        let column = columns.iter().next()?;
1838        let column_meta = metadata.column_by_name(&column.name)?;
1839        if column_meta.semantic_type == SemanticType::Timestamp {
1840            SimpleFilterEvaluator::try_new(expr)
1841        } else {
1842            None
1843        }
1844    }
1845}
1846
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datafusion::physical_plan::expressions::lit as physical_lit;
    use datafusion_expr::{col, lit};
    use datatypes::value::Value;
    use partition::expr::col as partition_col;
    use store_api::metadata::RegionMetadataBuilder;
    use store_api::storage::{ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector};

    use super::*;
    use crate::cache::CacheManager;
    use crate::memtable::time_partition::TimePartitions;
    use crate::read::range_cache::ScanRequestFingerprintBuilder;
    use crate::region::options::RegionOptions;
    use crate::region::version::VersionBuilder;
    use crate::sst::FormatType;
    use crate::test_util::memtable_util::{EmptyMemtableBuilder, metadata_with_primary_key};
    use crate::test_util::scheduler_util::SchedulerEnv;

    // Builds a version with an empty mutable memtable and default region options.
    fn new_version(metadata: RegionMetadataRef) -> VersionRef {
        let mutable = Arc::new(TimePartitions::new(
            metadata.clone(),
            Arc::new(EmptyMemtableBuilder::default()),
            0,
            None,
        ));
        Arc::new(VersionBuilder::new(metadata, mutable).build())
    }

    // Like `new_version` but sets the SST format option explicitly.
    fn new_version_with_sst_format(
        metadata: RegionMetadataRef,
        sst_format: Option<FormatType>,
    ) -> VersionRef {
        let mutable = Arc::new(TimePartitions::new(
            metadata.clone(),
            Arc::new(EmptyMemtableBuilder::default()),
            0,
            None,
        ));
        let options = RegionOptions {
            sst_format,
            ..Default::default()
        };
        Arc::new(
            VersionBuilder::new(metadata, mutable)
                .options(options)
                .build(),
        )
    }

    // Builds a `ScanInput` over a single default file with the given filters,
    // flat format enabled and the range result cache turned on.
    async fn new_scan_input(metadata: RegionMetadataRef, filters: Vec<Expr>) -> ScanInput {
        let env = SchedulerEnv::new().await;
        let mapper = ProjectionMapper::new(&metadata, [0, 2, 3].into_iter(), true).unwrap();
        let predicate = PredicateGroup::new(metadata.as_ref(), &filters).unwrap();
        let file = FileHandle::new(
            crate::sst::file::FileMeta::default(),
            Arc::new(crate::sst::file_purger::NoopFilePurger),
        );

        ScanInput::new(env.access_layer.clone(), mapper)
            .with_predicate(predicate)
            .with_cache(CacheStrategy::EnableAll(Arc::new(
                CacheManager::builder()
                    .range_result_cache_size(1024)
                    .build(),
            )))
            .with_flat_format(true)
            .with_files(vec![file])
    }

    // Columns referenced by filters must be added to the read column ids even
    // when the projection does not include them.
    #[tokio::test]
    async fn test_build_read_column_ids_includes_filters() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let version = new_version(metadata.clone());
        let env = SchedulerEnv::new().await;
        let request = ScanRequest {
            projection: Some(vec![4]),
            filters: vec![
                col("v0").gt(lit(1)),
                col("ts").gt(lit(0)),
                col("k0").eq(lit("foo")),
            ],
            ..Default::default()
        };
        let scan_region = ScanRegion::new(
            version,
            env.access_layer.clone(),
            request,
            CacheStrategy::Disabled,
        );
        let predicate =
            PredicateGroup::new(metadata.as_ref(), &scan_region.request.filters).unwrap();
        let projection = scan_region.request.projection.as_ref().unwrap();
        let read_ids = scan_region
            .build_read_column_ids(projection, &predicate)
            .unwrap();
        assert_eq!(vec![4, 0, 2, 3], read_ids);
    }

    #[tokio::test]
    async fn test_build_read_column_ids_empty_projection() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let version = new_version(metadata.clone());
        let env = SchedulerEnv::new().await;
        let request = ScanRequest {
            projection: Some(vec![]),
            ..Default::default()
        };
        let scan_region = ScanRegion::new(
            version,
            env.access_layer.clone(),
            request,
            CacheStrategy::Disabled,
        );
        let predicate =
            PredicateGroup::new(metadata.as_ref(), &scan_region.request.filters).unwrap();
        let projection = scan_region.request.projection.as_ref().unwrap();
        let read_ids = scan_region
            .build_read_column_ids(projection, &predicate)
            .unwrap();
        // Empty projection should still read the time index column (id 2 in this test schema).
        assert_eq!(vec![2], read_ids);
    }

    #[tokio::test]
    async fn test_build_read_column_ids_keeps_projection_order() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let version = new_version(metadata.clone());
        let env = SchedulerEnv::new().await;
        let request = ScanRequest {
            projection: Some(vec![4, 1]),
            filters: vec![col("v0").gt(lit(1))],
            ..Default::default()
        };
        let scan_region = ScanRegion::new(
            version,
            env.access_layer.clone(),
            request,
            CacheStrategy::Disabled,
        );
        let predicate =
            PredicateGroup::new(metadata.as_ref(), &scan_region.request.filters).unwrap();
        let projection = scan_region.request.projection.as_ref().unwrap();
        let read_ids = scan_region
            .build_read_column_ids(projection, &predicate)
            .unwrap();
        // Projection order preserved, extra columns appended in schema order.
        assert_eq!(vec![4, 1, 3], read_ids);
    }

    // `force_flat_format` in the request should win over the region's SST format.
    #[tokio::test]
    async fn test_use_flat_format_honors_request_override() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let env = SchedulerEnv::new().await;

        let primary_key_version =
            new_version_with_sst_format(metadata.clone(), Some(FormatType::PrimaryKey));
        let request = ScanRequest::default();
        let scan_region = ScanRegion::new(
            primary_key_version.clone(),
            env.access_layer.clone(),
            request,
            CacheStrategy::Disabled,
        );
        assert!(!scan_region.use_flat_format());

        let request = ScanRequest {
            force_flat_format: true,
            ..Default::default()
        };
        let scan_region = ScanRegion::new(
            primary_key_version,
            env.access_layer.clone(),
            request,
            CacheStrategy::Disabled,
        );
        assert!(scan_region.use_flat_format());

        let flat_version = new_version_with_sst_format(metadata, Some(FormatType::Flat));
        let request = ScanRequest::default();
        let scan_region = ScanRegion::new(
            flat_version,
            env.access_layer.clone(),
            request,
            CacheStrategy::Disabled,
        );
        assert!(scan_region.use_flat_format());
    }

    // An eligible scan (flat format, files present, tag filter, not compaction)
    // should produce a fingerprint matching the builder's fields.
    #[tokio::test]
    async fn test_build_scan_fingerprint_for_eligible_scan() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let input = new_scan_input(
            metadata.clone(),
            vec![
                col("ts").gt_eq(lit(1000)),
                col("k0").eq(lit("foo")),
                col("v0").gt(lit(1)),
            ],
        )
        .await
        .with_distribution(Some(TimeSeriesDistribution::PerSeries))
        .with_series_row_selector(Some(TimeSeriesRowSelector::LastRow))
        .with_merge_mode(MergeMode::LastNonNull)
        .with_filter_deleted(false);

        let fingerprint = build_scan_fingerprint(&input).unwrap();

        let expected = ScanRequestFingerprintBuilder {
            read_column_ids: input.read_column_ids.clone(),
            read_column_types: vec![
                metadata
                    .column_by_id(0)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(2)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(3)
                    .map(|col| col.column_schema.data_type.clone()),
            ],
            filters: vec![
                col("k0").eq(lit("foo")).to_string(),
                col("v0").gt(lit(1)).to_string(),
            ],
            time_filters: vec![col("ts").gt_eq(lit(1000)).to_string()],
            series_row_selector: Some(TimeSeriesRowSelector::LastRow),
            append_mode: false,
            filter_deleted: false,
            merge_mode: MergeMode::LastNonNull,
            partition_expr_version: 0,
        }
        .build();
        assert_eq!(expected, fingerprint);
    }

    // Without a filter on a tag column, no fingerprint should be built.
    #[tokio::test]
    async fn test_build_scan_fingerprint_requires_tag_filter() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let input = new_scan_input(
            metadata,
            vec![col("ts").gt_eq(lit(1000)), col("v0").gt(lit(1))],
        )
        .await;

        assert!(build_scan_fingerprint(&input).is_none());
    }

    // Fingerprints are skipped for cache-disabled, non-flat, compaction, or
    // file-less scans.
    #[tokio::test]
    async fn test_build_scan_fingerprint_respects_scan_eligibility() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let filters = vec![col("k0").eq(lit("foo"))];

        let disabled = ScanInput::new(
            SchedulerEnv::new().await.access_layer.clone(),
            ProjectionMapper::new(&metadata, [0, 2, 3].into_iter(), true).unwrap(),
        )
        .with_predicate(PredicateGroup::new(metadata.as_ref(), &filters).unwrap())
        .with_flat_format(true);
        assert!(build_scan_fingerprint(&disabled).is_none());

        let non_flat = new_scan_input(metadata.clone(), filters.clone())
            .await
            .with_flat_format(false);
        assert!(build_scan_fingerprint(&non_flat).is_none());

        let compaction = new_scan_input(metadata.clone(), filters.clone())
            .await
            .with_compaction(true);
        assert!(build_scan_fingerprint(&compaction).is_none());

        // No files to read.
        let no_files = new_scan_input(metadata, filters).await.with_files(vec![]);
        assert!(build_scan_fingerprint(&no_files).is_none());
    }

    // A region partition expr in metadata must be reflected in the fingerprint
    // via `partition_expr_version`.
    #[tokio::test]
    async fn test_build_scan_fingerprint_tracks_schema_and_partition_expr_changes() {
        let base = metadata_with_primary_key(vec![0, 1], false);
        let mut builder = RegionMetadataBuilder::from_existing(base);
        let partition_expr = partition_col("k0")
            .gt_eq(Value::String("foo".into()))
            .as_json_str()
            .unwrap();
        builder.partition_expr_json(Some(partition_expr));
        let metadata = Arc::new(builder.build_without_validation().unwrap());

        let input = new_scan_input(metadata.clone(), vec![col("k0").eq(lit("foo"))]).await;
        let fingerprint = build_scan_fingerprint(&input).unwrap();

        let expected = ScanRequestFingerprintBuilder {
            read_column_ids: input.read_column_ids.clone(),
            read_column_types: vec![
                metadata
                    .column_by_id(0)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(2)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(3)
                    .map(|col| col.column_schema.data_type.clone()),
            ],
            filters: vec![col("k0").eq(lit("foo")).to_string()],
            time_filters: vec![],
            series_row_selector: None,
            append_mode: false,
            filter_deleted: true,
            merge_mode: MergeMode::LastRow,
            partition_expr_version: metadata.partition_expr_version,
        }
        .build();
        assert_eq!(expected, fingerprint);
        assert_ne!(0, metadata.partition_expr_version);
    }

    // Adding dynamic filters to an empty `PredicateGroup` should make both
    // predicates non-empty even though they still hold no static exprs.
    #[test]
    fn test_update_dyn_filters_with_empty_base_predicates() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let predicate_group = PredicateGroup::new(metadata.as_ref(), &[]).unwrap();
        assert!(predicate_group.predicate().is_none());
        assert!(predicate_group.predicate_without_region().is_none());

        let dyn_filter = Arc::new(DynamicFilterPhysicalExpr::new(vec![], physical_lit(false)));
        predicate_group.add_dyn_filters(vec![dyn_filter]);

        let predicate_all = predicate_group.predicate().unwrap();
        assert!(predicate_all.exprs().is_empty());
        assert_eq!(1, predicate_all.dyn_filters().len());

        let predicate_without_region = predicate_group.predicate_without_region().unwrap();
        assert!(predicate_without_region.exprs().is_empty());
        assert_eq!(1, predicate_without_region.dyn_filters().len());
    }
}