// mito2/read/scan_region.rs
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Scans a region according to the scan request.

use std::collections::HashSet;
use std::fmt;
use std::num::NonZeroU64;
use std::sync::Arc;
use std::time::Instant;

use api::v1::SemanticType;
use common_error::ext::BoxedError;
use common_recordbatch::SendableRecordBatchStream;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::tracing::Instrument;
use common_telemetry::{debug, error, tracing, warn};
use common_time::range::TimestampRange;
use datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr;
use datafusion_common::Column;
use datafusion_expr::Expr;
use datafusion_expr::utils::expr_to_columns;
use futures::StreamExt;
use partition::expr::PartitionExpr;
use smallvec::SmallVec;
use snafu::{OptionExt as _, ResultExt};
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::region_engine::{PartitionRange, RegionScannerRef};
use store_api::storage::{
    ColumnId, RegionId, ScanRequest, SequenceRange, TimeSeriesDistribution, TimeSeriesRowSelector,
};
use table::predicate::{Predicate, build_time_range_predicate, extract_time_range_from_expr};
use tokio::sync::{Semaphore, mpsc};
use tokio_stream::wrappers::ReceiverStream;

use crate::access_layer::AccessLayerRef;
use crate::cache::CacheStrategy;
use crate::config::{DEFAULT_MAX_CONCURRENT_SCAN_FILES, DEFAULT_SCAN_CHANNEL_SIZE};
use crate::error::{InvalidPartitionExprSnafu, InvalidRequestSnafu, Result};
#[cfg(feature = "enterprise")]
use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
use crate::memtable::{MemtableRange, RangesOptions};
use crate::metrics::READ_SST_COUNT;
use crate::read::compat::{self, CompatBatch, FlatCompatBatch, PrimaryKeyCompatBatch};
use crate::read::projection::ProjectionMapper;
use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
use crate::read::range_cache::ScanRequestFingerprint;
use crate::read::seq_scan::SeqScan;
use crate::read::series_scan::SeriesScan;
use crate::read::stream::ScanBatchStream;
use crate::read::unordered_scan::UnorderedScan;
use crate::read::{Batch, BoxedRecordBatchStream, RecordBatch, Source};
use crate::region::options::MergeMode;
use crate::region::version::VersionRef;
use crate::sst::FormatType;
use crate::sst::file::FileHandle;
use crate::sst::index::bloom_filter::applier::{
    BloomFilterIndexApplierBuilder, BloomFilterIndexApplierRef,
};
use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
#[cfg(feature = "vector_index")]
use crate::sst::index::vector_index::applier::{VectorIndexApplier, VectorIndexApplierRef};
use crate::sst::parquet::file_range::PreFilterMode;
use crate::sst::parquet::reader::ReaderMetrics;
79
/// Parallel scan channel size for flat format.
const FLAT_SCAN_CHANNEL_SIZE: usize = 2;
/// Multiplier applied (via `saturating_mul`) to a vector search's `k` when the
/// request also carries filters — presumably to over-fetch candidates so enough
/// rows survive post-filtering; TODO confirm with vector index maintainers.
#[cfg(feature = "vector_index")]
const VECTOR_INDEX_OVERFETCH_MULTIPLIER: usize = 2;
84
/// A scanner scans a region and returns a [SendableRecordBatchStream].
///
/// Each variant wraps a concrete scanner implementation. The variant is selected
/// from the [ScanRequest] by [ScanRegion::scanner].
pub(crate) enum Scanner {
    /// Sequential scan.
    Seq(SeqScan),
    /// Unordered scan.
    Unordered(UnorderedScan),
    /// Per-series scan.
    Series(SeriesScan),
}
94
95impl Scanner {
96    /// Returns a [SendableRecordBatchStream] to retrieve scan results from all partitions.
97    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
98    pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream, BoxedError> {
99        match self {
100            Scanner::Seq(seq_scan) => seq_scan.build_stream(),
101            Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
102            Scanner::Series(series_scan) => series_scan.build_stream().await,
103        }
104    }
105
106    /// Create a stream of [`Batch`] by this scanner.
107    pub(crate) fn scan_batch(&self) -> Result<ScanBatchStream> {
108        match self {
109            Scanner::Seq(x) => x.scan_all_partitions(),
110            Scanner::Unordered(x) => x.scan_all_partitions(),
111            Scanner::Series(x) => x.scan_all_partitions(),
112        }
113    }
114}
115
#[cfg(test)]
impl Scanner {
    /// Returns number of files to scan.
    pub(crate) fn num_files(&self) -> usize {
        match self {
            Scanner::Seq(scan) => scan.input().num_files(),
            Scanner::Unordered(scan) => scan.input().num_files(),
            Scanner::Series(scan) => scan.input().num_files(),
        }
    }

    /// Returns number of memtables to scan.
    pub(crate) fn num_memtables(&self) -> usize {
        match self {
            Scanner::Seq(scan) => scan.input().num_memtables(),
            Scanner::Unordered(scan) => scan.input().num_memtables(),
            Scanner::Series(scan) => scan.input().num_memtables(),
        }
    }

    /// Returns SST file ids to scan.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        match self {
            Scanner::Seq(scan) => scan.input().file_ids(),
            Scanner::Unordered(scan) => scan.input().file_ids(),
            Scanner::Series(scan) => scan.input().file_ids(),
        }
    }

    /// Returns index ids to scan.
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        match self {
            Scanner::Seq(scan) => scan.input().index_ids(),
            Scanner::Unordered(scan) => scan.input().index_ids(),
            Scanner::Series(scan) => scan.input().index_ids(),
        }
    }

    /// Sets the target partitions for the scanner. It controls the parallelism of the scanner.
    pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
        use store_api::region_engine::{PrepareRequest, RegionScanner};

        let request = PrepareRequest::default().with_target_partitions(target_partitions);
        // Test-only helper: `unwrap` is acceptable since a failure here is a test bug.
        match self {
            Scanner::Seq(scan) => scan.prepare(request).unwrap(),
            Scanner::Unordered(scan) => scan.prepare(request).unwrap(),
            Scanner::Series(scan) => scan.prepare(request).unwrap(),
        }
    }
}
165
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Helper to scan a region by [ScanRequest].
///
/// [ScanRegion] collects SSTs and memtables to scan without actually reading them. It
/// creates a [Scanner] to actually scan these targets in [Scanner::scan()].
///
/// ```mermaid
/// classDiagram
/// class ScanRegion {
///     -VersionRef version
///     -ScanRequest request
///     ~scanner() Scanner
///     ~seq_scan() SeqScan
/// }
/// class Scanner {
///     <<enumeration>>
///     SeqScan
///     UnorderedScan
///     +scan() SendableRecordBatchStream
/// }
/// class SeqScan {
///     -ScanInput input
///     +build() SendableRecordBatchStream
/// }
/// class UnorderedScan {
///     -ScanInput input
///     +build() SendableRecordBatchStream
/// }
/// class ScanInput {
///     -ProjectionMapper mapper
///     -Option~TimeRange~ time_range
///     -Option~Predicate~ predicate
///     -Vec~MemtableRef~ memtables
///     -Vec~FileHandle~ files
/// }
/// class ProjectionMapper {
///     ~output_schema() SchemaRef
///     ~convert(Batch) RecordBatch
/// }
/// ScanRegion -- Scanner
/// ScanRegion o-- ScanRequest
/// Scanner o-- SeqScan
/// Scanner o-- UnorderedScan
/// SeqScan o-- ScanInput
/// UnorderedScan o-- ScanInput
/// Scanner -- SendableRecordBatchStream
/// ScanInput o-- ProjectionMapper
/// SeqScan -- SendableRecordBatchStream
/// UnorderedScan -- SendableRecordBatchStream
/// ```
pub(crate) struct ScanRegion {
    /// Version of the region at scan.
    version: VersionRef,
    /// Access layer of the region.
    access_layer: AccessLayerRef,
    /// Scan request.
    request: ScanRequest,
    /// Cache.
    cache_strategy: CacheStrategy,
    /// Capacity of the channel to send data from parallel scan tasks to the main task.
    parallel_scan_channel_size: usize,
    /// Maximum number of SST files to scan concurrently.
    max_concurrent_scan_files: usize,
    /// Whether to ignore inverted index.
    ignore_inverted_index: bool,
    /// Whether to ignore fulltext index.
    ignore_fulltext_index: bool,
    /// Whether to ignore bloom filter.
    ignore_bloom_filter: bool,
    /// Start time of the scan task.
    start_time: Option<Instant>,
    /// Whether to filter out the deleted rows.
    /// Usually true for normal read, and false for scan for compaction.
    filter_deleted: bool,
    /// Optional provider of extra (extension) ranges to scan; enterprise only.
    #[cfg(feature = "enterprise")]
    extension_range_provider: Option<BoxedExtensionRangeProvider>,
}
243
244impl ScanRegion {
245    /// Creates a [ScanRegion].
246    pub(crate) fn new(
247        version: VersionRef,
248        access_layer: AccessLayerRef,
249        request: ScanRequest,
250        cache_strategy: CacheStrategy,
251    ) -> ScanRegion {
252        ScanRegion {
253            version,
254            access_layer,
255            request,
256            cache_strategy,
257            parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
258            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
259            ignore_inverted_index: false,
260            ignore_fulltext_index: false,
261            ignore_bloom_filter: false,
262            start_time: None,
263            filter_deleted: true,
264            #[cfg(feature = "enterprise")]
265            extension_range_provider: None,
266        }
267    }
268
269    /// Sets parallel scan task channel size.
270    #[must_use]
271    pub(crate) fn with_parallel_scan_channel_size(
272        mut self,
273        parallel_scan_channel_size: usize,
274    ) -> Self {
275        self.parallel_scan_channel_size = parallel_scan_channel_size;
276        self
277    }
278
279    /// Sets maximum number of SST files to scan concurrently.
280    #[must_use]
281    pub(crate) fn with_max_concurrent_scan_files(
282        mut self,
283        max_concurrent_scan_files: usize,
284    ) -> Self {
285        self.max_concurrent_scan_files = max_concurrent_scan_files;
286        self
287    }
288
289    /// Sets whether to ignore inverted index.
290    #[must_use]
291    pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self {
292        self.ignore_inverted_index = ignore;
293        self
294    }
295
296    /// Sets whether to ignore fulltext index.
297    #[must_use]
298    pub(crate) fn with_ignore_fulltext_index(mut self, ignore: bool) -> Self {
299        self.ignore_fulltext_index = ignore;
300        self
301    }
302
303    /// Sets whether to ignore bloom filter.
304    #[must_use]
305    pub(crate) fn with_ignore_bloom_filter(mut self, ignore: bool) -> Self {
306        self.ignore_bloom_filter = ignore;
307        self
308    }
309
310    #[must_use]
311    pub(crate) fn with_start_time(mut self, now: Instant) -> Self {
312        self.start_time = Some(now);
313        self
314    }
315
316    pub(crate) fn set_filter_deleted(&mut self, filter_deleted: bool) {
317        self.filter_deleted = filter_deleted;
318    }
319
320    #[cfg(feature = "enterprise")]
321    pub(crate) fn set_extension_range_provider(
322        &mut self,
323        extension_range_provider: BoxedExtensionRangeProvider,
324    ) {
325        self.extension_range_provider = Some(extension_range_provider);
326    }
327
328    /// Returns a [Scanner] to scan the region.
329    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
330    pub(crate) async fn scanner(self) -> Result<Scanner> {
331        if self.use_series_scan() {
332            self.series_scan().await.map(Scanner::Series)
333        } else if self.use_unordered_scan() {
334            // If table is append only and there is no series row selector, we use unordered scan in query.
335            // We still use seq scan in compaction.
336            self.unordered_scan().await.map(Scanner::Unordered)
337        } else {
338            self.seq_scan().await.map(Scanner::Seq)
339        }
340    }
341
342    /// Returns a [RegionScanner] to scan the region.
343    #[tracing::instrument(
344        level = tracing::Level::DEBUG,
345        skip_all,
346        fields(region_id = %self.region_id())
347    )]
348    pub(crate) async fn region_scanner(self) -> Result<RegionScannerRef> {
349        if self.use_series_scan() {
350            self.series_scan()
351                .await
352                .map(|scanner| Box::new(scanner) as _)
353        } else if self.use_unordered_scan() {
354            self.unordered_scan()
355                .await
356                .map(|scanner| Box::new(scanner) as _)
357        } else {
358            self.seq_scan().await.map(|scanner| Box::new(scanner) as _)
359        }
360    }
361
362    /// Scan sequentially.
363    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
364    pub(crate) async fn seq_scan(self) -> Result<SeqScan> {
365        let input = self.scan_input().await?.with_compaction(false);
366        Ok(SeqScan::new(input))
367    }
368
369    /// Unordered scan.
370    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
371    pub(crate) async fn unordered_scan(self) -> Result<UnorderedScan> {
372        let input = self.scan_input().await?;
373        Ok(UnorderedScan::new(input))
374    }
375
376    /// Scans by series.
377    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
378    pub(crate) async fn series_scan(self) -> Result<SeriesScan> {
379        let input = self.scan_input().await?;
380        Ok(SeriesScan::new(input))
381    }
382
383    /// Returns true if the region can use unordered scan for current request.
384    fn use_unordered_scan(&self) -> bool {
385        // We use unordered scan when:
386        // 1. The region is in append mode.
387        // 2. There is no series row selector.
388        // 3. The required distribution is None or TimeSeriesDistribution::TimeWindowed.
389        //
390        // We still use seq scan in compaction.
391        self.version.options.append_mode
392            && self.request.series_row_selector.is_none()
393            && (self.request.distribution.is_none()
394                || self.request.distribution == Some(TimeSeriesDistribution::TimeWindowed))
395    }
396
397    /// Returns true if the region can use series scan for current request.
398    fn use_series_scan(&self) -> bool {
399        self.request.distribution == Some(TimeSeriesDistribution::PerSeries)
400    }
401
402    /// Returns true if the region use flat format.
403    fn use_flat_format(&self) -> bool {
404        self.request.force_flat_format
405            || self.version.options.sst_format.unwrap_or_default() == FormatType::Flat
406    }
407
408    /// Creates a scan input.
409    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
410    async fn scan_input(mut self) -> Result<ScanInput> {
411        let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new);
412        let time_range = self.build_time_range_predicate();
413        let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;
414        let flat_format = self.use_flat_format();
415
416        let read_column_ids = match &self.request.projection {
417            Some(p) => self.build_read_column_ids(p, &predicate)?,
418            None => self
419                .version
420                .metadata
421                .column_metadatas
422                .iter()
423                .map(|col| col.column_id)
424                .collect(),
425        };
426
427        // The mapper always computes projected column ids as the schema of SSTs may change.
428        let mapper = match &self.request.projection {
429            Some(p) => ProjectionMapper::new_with_read_columns(
430                &self.version.metadata,
431                p.iter().copied(),
432                flat_format,
433                read_column_ids.clone(),
434            )?,
435            None => ProjectionMapper::all(&self.version.metadata, flat_format)?,
436        };
437
438        let ssts = &self.version.ssts;
439        let mut files = Vec::new();
440        for level in ssts.levels() {
441            for file in level.files.values() {
442                let exceed_min_sequence = match (sst_min_sequence, file.meta_ref().sequence) {
443                    (Some(min_sequence), Some(file_sequence)) => file_sequence > min_sequence,
444                    // If the file's sequence is None (or actually is zero), it could mean the file
445                    // is generated and added to the region "directly". In this case, its data should
446                    // be considered as fresh as the memtable. So its sequence is treated greater than
447                    // the min_sequence, whatever the value of min_sequence is. Hence the default
448                    // "true" in this arm.
449                    (Some(_), None) => true,
450                    (None, _) => true,
451                };
452
453                // Finds SST files in range.
454                if exceed_min_sequence && file_in_range(file, &time_range) {
455                    files.push(file.clone());
456                }
457                // There is no need to check and prune for file's sequence here as the sequence number is usually very new,
458                // unless the timing is too good, or the sequence number wouldn't be in file.
459                // and the batch will be filtered out by tree reader anyway.
460            }
461        }
462
463        let memtables = self.version.memtables.list_memtables();
464        // Skip empty memtables and memtables out of time range.
465        let mut mem_range_builders = Vec::new();
466        let filter_mode = pre_filter_mode(
467            self.version.options.append_mode,
468            self.version.options.merge_mode(),
469        );
470
471        for m in memtables {
472            // check if memtable is empty by reading stats.
473            let Some((start, end)) = m.stats().time_range() else {
474                continue;
475            };
476            // The time range of the memtable is inclusive.
477            let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
478            if !memtable_range.intersects(&time_range) {
479                continue;
480            }
481            let ranges_in_memtable = m.ranges(
482                Some(read_column_ids.as_slice()),
483                RangesOptions::default()
484                    .with_predicate(predicate.clone())
485                    .with_sequence(SequenceRange::new(
486                        self.request.memtable_min_sequence,
487                        self.request.memtable_max_sequence,
488                    ))
489                    .with_pre_filter_mode(filter_mode),
490            )?;
491            mem_range_builders.extend(ranges_in_memtable.ranges.into_values().map(|v| {
492                let stats = v.stats().clone();
493                MemRangeBuilder::new(v, stats)
494            }));
495        }
496
497        let region_id = self.region_id();
498        debug!(
499            "Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}, flat_format: {}",
500            region_id,
501            self.request,
502            time_range,
503            mem_range_builders.len(),
504            files.len(),
505            self.version.options.append_mode,
506            flat_format,
507        );
508
509        let (non_field_filters, field_filters) = self.partition_by_field_filters();
510        let inverted_index_appliers = [
511            self.build_invereted_index_applier(&non_field_filters),
512            self.build_invereted_index_applier(&field_filters),
513        ];
514        let bloom_filter_appliers = [
515            self.build_bloom_filter_applier(&non_field_filters),
516            self.build_bloom_filter_applier(&field_filters),
517        ];
518        let fulltext_index_appliers = [
519            self.build_fulltext_index_applier(&non_field_filters),
520            self.build_fulltext_index_applier(&field_filters),
521        ];
522        #[cfg(feature = "vector_index")]
523        let vector_index_applier = self.build_vector_index_applier();
524        #[cfg(feature = "vector_index")]
525        let vector_index_k = self.request.vector_search.as_ref().map(|search| {
526            if self.request.filters.is_empty() {
527                search.k
528            } else {
529                search.k.saturating_mul(VECTOR_INDEX_OVERFETCH_MULTIPLIER)
530            }
531        });
532
533        if flat_format {
534            // The batch is already large enough so we use a small channel size here.
535            self.parallel_scan_channel_size = FLAT_SCAN_CHANNEL_SIZE;
536        }
537
538        let input = ScanInput::new(self.access_layer, mapper)
539            .with_time_range(Some(time_range))
540            .with_predicate(predicate)
541            .with_memtables(mem_range_builders)
542            .with_files(files)
543            .with_cache(self.cache_strategy)
544            .with_inverted_index_appliers(inverted_index_appliers)
545            .with_bloom_filter_index_appliers(bloom_filter_appliers)
546            .with_fulltext_index_appliers(fulltext_index_appliers)
547            .with_parallel_scan_channel_size(self.parallel_scan_channel_size)
548            .with_max_concurrent_scan_files(self.max_concurrent_scan_files)
549            .with_start_time(self.start_time)
550            .with_append_mode(self.version.options.append_mode)
551            .with_filter_deleted(self.filter_deleted)
552            .with_merge_mode(self.version.options.merge_mode())
553            .with_series_row_selector(self.request.series_row_selector)
554            .with_distribution(self.request.distribution)
555            .with_flat_format(flat_format);
556        #[cfg(feature = "vector_index")]
557        let input = input
558            .with_vector_index_applier(vector_index_applier)
559            .with_vector_index_k(vector_index_k);
560
561        #[cfg(feature = "enterprise")]
562        let input = if let Some(provider) = self.extension_range_provider {
563            let ranges = provider
564                .find_extension_ranges(self.version.flushed_sequence, time_range, &self.request)
565                .await?;
566            debug!("Find extension ranges: {ranges:?}");
567            input.with_extension_ranges(ranges)
568        } else {
569            input
570        };
571        Ok(input)
572    }
573
574    fn region_id(&self) -> RegionId {
575        self.version.metadata.region_id
576    }
577
578    /// Build time range predicate from filters.
579    fn build_time_range_predicate(&self) -> TimestampRange {
580        let time_index = self.version.metadata.time_index_column();
581        let unit = time_index
582            .column_schema
583            .data_type
584            .as_timestamp()
585            .expect("Time index must have timestamp-compatible type")
586            .unit();
587        build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters)
588    }
589
590    /// Return all columns id to read according to the projection and filters.
591    fn build_read_column_ids(
592        &self,
593        projection: &[usize],
594        predicate: &PredicateGroup,
595    ) -> Result<Vec<ColumnId>> {
596        let metadata = &self.version.metadata;
597        // use Vec for read_column_ids to keep the order of columns.
598        let mut read_column_ids = Vec::new();
599        let mut seen = HashSet::new();
600
601        for idx in projection {
602            let column =
603                metadata
604                    .column_metadatas
605                    .get(*idx)
606                    .with_context(|| InvalidRequestSnafu {
607                        region_id: metadata.region_id,
608                        reason: format!("projection index {} is out of bound", idx),
609                    })?;
610            seen.insert(column.column_id);
611            // keep the projection order
612            read_column_ids.push(column.column_id);
613        }
614
615        if projection.is_empty() {
616            let time_index = metadata.time_index_column().column_id;
617            if seen.insert(time_index) {
618                read_column_ids.push(time_index);
619            }
620        }
621
622        let mut extra_names = HashSet::new();
623        let mut columns = HashSet::new();
624
625        for expr in &self.request.filters {
626            columns.clear();
627            if expr_to_columns(expr, &mut columns).is_err() {
628                continue;
629            }
630            extra_names.extend(columns.iter().map(|column| column.name.clone()));
631        }
632
633        if let Some(expr) = predicate.region_partition_expr() {
634            expr.collect_column_names(&mut extra_names);
635        }
636
637        if !extra_names.is_empty() {
638            for column in &metadata.column_metadatas {
639                if extra_names.contains(column.column_schema.name.as_str())
640                    && !seen.contains(&column.column_id)
641                {
642                    read_column_ids.push(column.column_id);
643                }
644                extra_names.remove(column.column_schema.name.as_str());
645            }
646            if !extra_names.is_empty() {
647                warn!(
648                    "Some columns in filters are not found in region {}: {:?}",
649                    metadata.region_id, extra_names
650                );
651            }
652        }
653        Ok(read_column_ids)
654    }
655
656    /// Partitions filters into two groups: non-field filters and field filters.
657    /// Returns `(non_field_filters, field_filters)`.
658    fn partition_by_field_filters(&self) -> (Vec<Expr>, Vec<Expr>) {
659        let field_columns = self
660            .version
661            .metadata
662            .field_columns()
663            .map(|col| &col.column_schema.name)
664            .collect::<HashSet<_>>();
665
666        let mut columns = HashSet::new();
667
668        self.request.filters.iter().cloned().partition(|expr| {
669            columns.clear();
670            // `expr_to_columns` won't return error.
671            if expr_to_columns(expr, &mut columns).is_err() {
672                // If we can't extract columns, treat it as non-field filter
673                return true;
674            }
675            // Return true for non-field filters (partition puts true cases in first vec)
676            !columns
677                .iter()
678                .any(|column| field_columns.contains(&column.name))
679        })
680    }
681
682    /// Use the latest schema to build the inverted index applier.
683    fn build_invereted_index_applier(&self, filters: &[Expr]) -> Option<InvertedIndexApplierRef> {
684        if self.ignore_inverted_index {
685            return None;
686        }
687
688        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
689        let inverted_index_cache = self.cache_strategy.inverted_index_cache().cloned();
690
691        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
692
693        InvertedIndexApplierBuilder::new(
694            self.access_layer.table_dir().to_string(),
695            self.access_layer.path_type(),
696            self.access_layer.object_store().clone(),
697            self.version.metadata.as_ref(),
698            self.version.metadata.inverted_indexed_column_ids(
699                self.version
700                    .options
701                    .index_options
702                    .inverted_index
703                    .ignore_column_ids
704                    .iter(),
705            ),
706            self.access_layer.puffin_manager_factory().clone(),
707        )
708        .with_file_cache(file_cache)
709        .with_inverted_index_cache(inverted_index_cache)
710        .with_puffin_metadata_cache(puffin_metadata_cache)
711        .build(filters)
712        .inspect_err(|err| warn!(err; "Failed to build invereted index applier"))
713        .ok()
714        .flatten()
715        .map(Arc::new)
716    }
717
718    /// Use the latest schema to build the bloom filter index applier.
719    fn build_bloom_filter_applier(&self, filters: &[Expr]) -> Option<BloomFilterIndexApplierRef> {
720        if self.ignore_bloom_filter {
721            return None;
722        }
723
724        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
725        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
726        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
727
728        BloomFilterIndexApplierBuilder::new(
729            self.access_layer.table_dir().to_string(),
730            self.access_layer.path_type(),
731            self.access_layer.object_store().clone(),
732            self.version.metadata.as_ref(),
733            self.access_layer.puffin_manager_factory().clone(),
734        )
735        .with_file_cache(file_cache)
736        .with_bloom_filter_index_cache(bloom_filter_index_cache)
737        .with_puffin_metadata_cache(puffin_metadata_cache)
738        .build(filters)
739        .inspect_err(|err| warn!(err; "Failed to build bloom filter index applier"))
740        .ok()
741        .flatten()
742        .map(Arc::new)
743    }
744
745    /// Use the latest schema to build the fulltext index applier.
746    fn build_fulltext_index_applier(&self, filters: &[Expr]) -> Option<FulltextIndexApplierRef> {
747        if self.ignore_fulltext_index {
748            return None;
749        }
750
751        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
752        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
753        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
754        FulltextIndexApplierBuilder::new(
755            self.access_layer.table_dir().to_string(),
756            self.access_layer.path_type(),
757            self.access_layer.object_store().clone(),
758            self.access_layer.puffin_manager_factory().clone(),
759            self.version.metadata.as_ref(),
760        )
761        .with_file_cache(file_cache)
762        .with_puffin_metadata_cache(puffin_metadata_cache)
763        .with_bloom_filter_cache(bloom_filter_index_cache)
764        .build(filters)
765        .inspect_err(|err| warn!(err; "Failed to build fulltext index applier"))
766        .ok()
767        .flatten()
768        .map(Arc::new)
769    }
770
771    /// Build the vector index applier from vector search request.
772    #[cfg(feature = "vector_index")]
773    fn build_vector_index_applier(&self) -> Option<VectorIndexApplierRef> {
774        let vector_search = self.request.vector_search.as_ref()?;
775
776        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
777        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
778        let vector_index_cache = self.cache_strategy.vector_index_cache().cloned();
779
780        let applier = VectorIndexApplier::new(
781            self.access_layer.table_dir().to_string(),
782            self.access_layer.path_type(),
783            self.access_layer.object_store().clone(),
784            self.access_layer.puffin_manager_factory().clone(),
785            vector_search.column_id,
786            vector_search.query_vector.clone(),
787            vector_search.metric,
788        )
789        .with_file_cache(file_cache)
790        .with_puffin_metadata_cache(puffin_metadata_cache)
791        .with_vector_index_cache(vector_index_cache);
792
793        Some(Arc::new(applier))
794    }
795}
796
797/// Returns true if the time range of a SST `file` matches the `predicate`.
798fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
799    if predicate == &TimestampRange::min_to_max() {
800        return true;
801    }
802    // end timestamp of a SST is inclusive.
803    let (start, end) = file.time_range();
804    let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end));
805    file_ts_range.intersects(predicate)
806}
807
/// Common input for different scanners.
pub struct ScanInput {
    /// Region SST access layer.
    access_layer: AccessLayerRef,
    /// Maps projected Batches to RecordBatches.
    pub(crate) mapper: Arc<ProjectionMapper>,
    /// Column ids to read from memtables and SSTs.
    /// Notice this is different from the columns in `mapper` which are projected columns.
    /// These read columns may also include non-projected columns needed for filtering.
    pub(crate) read_column_ids: Vec<ColumnId>,
    /// Time range filter for time index.
    pub(crate) time_range: Option<TimestampRange>,
    /// Predicate to push down.
    pub(crate) predicate: PredicateGroup,
    /// Region partition expr applied at read time.
    region_partition_expr: Option<PartitionExpr>,
    /// Memtable range builders for memtables in the time range.
    pub(crate) memtables: Vec<MemRangeBuilder>,
    /// Handles to SST files to scan.
    pub(crate) files: Vec<FileHandle>,
    /// Cache strategy used while reading.
    pub(crate) cache_strategy: CacheStrategy,
    /// Ignores file not found error.
    ignore_file_not_found: bool,
    /// Capacity of the channel to send data from parallel scan tasks to the main task.
    pub(crate) parallel_scan_channel_size: usize,
    /// Maximum number of SST files to scan concurrently.
    pub(crate) max_concurrent_scan_files: usize,
    /// Inverted index appliers (two slots; NOTE(review): slot semantics not visible here — confirm).
    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
    /// Bloom filter index appliers (same two-slot layout as above).
    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    /// Fulltext index appliers (same two-slot layout as above).
    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
    /// Vector index applier for KNN search.
    #[cfg(feature = "vector_index")]
    pub(crate) vector_index_applier: Option<VectorIndexApplierRef>,
    /// Over-fetched k for vector index scan.
    #[cfg(feature = "vector_index")]
    pub(crate) vector_index_k: Option<usize>,
    /// Start time of the query.
    pub(crate) query_start: Option<Instant>,
    /// The region is using append mode.
    pub(crate) append_mode: bool,
    /// Whether to remove deletion markers.
    pub(crate) filter_deleted: bool,
    /// Mode to merge duplicate rows.
    pub(crate) merge_mode: MergeMode,
    /// Hint to select rows from time series.
    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
    /// Hint for the required distribution of the scanner.
    pub(crate) distribution: Option<TimeSeriesDistribution>,
    /// Whether to use flat format.
    pub(crate) flat_format: bool,
    /// Whether this scan is for compaction.
    pub(crate) compaction: bool,
    /// Additional ranges contributed by enterprise extensions.
    #[cfg(feature = "enterprise")]
    extension_ranges: Vec<BoxedExtensionRange>,
}
865
866impl ScanInput {
867    /// Creates a new [ScanInput].
868    #[must_use]
869    pub(crate) fn new(access_layer: AccessLayerRef, mapper: ProjectionMapper) -> ScanInput {
870        ScanInput {
871            access_layer,
872            read_column_ids: mapper.column_ids().to_vec(),
873            mapper: Arc::new(mapper),
874            time_range: None,
875            predicate: PredicateGroup::default(),
876            region_partition_expr: None,
877            memtables: Vec::new(),
878            files: Vec::new(),
879            cache_strategy: CacheStrategy::Disabled,
880            ignore_file_not_found: false,
881            parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
882            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
883            inverted_index_appliers: [None, None],
884            bloom_filter_index_appliers: [None, None],
885            fulltext_index_appliers: [None, None],
886            #[cfg(feature = "vector_index")]
887            vector_index_applier: None,
888            #[cfg(feature = "vector_index")]
889            vector_index_k: None,
890            query_start: None,
891            append_mode: false,
892            filter_deleted: true,
893            merge_mode: MergeMode::default(),
894            series_row_selector: None,
895            distribution: None,
896            flat_format: false,
897            compaction: false,
898            #[cfg(feature = "enterprise")]
899            extension_ranges: Vec::new(),
900        }
901    }
902
903    /// Sets time range filter for time index.
904    #[must_use]
905    pub(crate) fn with_time_range(mut self, time_range: Option<TimestampRange>) -> Self {
906        self.time_range = time_range;
907        self
908    }
909
910    /// Sets predicate to push down.
911    #[must_use]
912    pub(crate) fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
913        self.region_partition_expr = predicate.region_partition_expr().cloned();
914        self.predicate = predicate;
915        self
916    }
917
918    /// Sets memtable range builders.
919    #[must_use]
920    pub(crate) fn with_memtables(mut self, memtables: Vec<MemRangeBuilder>) -> Self {
921        self.memtables = memtables;
922        self
923    }
924
925    /// Sets files to read.
926    #[must_use]
927    pub(crate) fn with_files(mut self, files: Vec<FileHandle>) -> Self {
928        self.files = files;
929        self
930    }
931
932    /// Sets cache for this query.
933    #[must_use]
934    pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self {
935        self.cache_strategy = cache;
936        self
937    }
938
939    /// Ignores file not found error.
940    #[must_use]
941    pub(crate) fn with_ignore_file_not_found(mut self, ignore: bool) -> Self {
942        self.ignore_file_not_found = ignore;
943        self
944    }
945
946    /// Sets scan task channel size.
947    #[must_use]
948    pub(crate) fn with_parallel_scan_channel_size(
949        mut self,
950        parallel_scan_channel_size: usize,
951    ) -> Self {
952        self.parallel_scan_channel_size = parallel_scan_channel_size;
953        self
954    }
955
956    /// Sets maximum number of SST files to scan concurrently.
957    #[must_use]
958    pub(crate) fn with_max_concurrent_scan_files(
959        mut self,
960        max_concurrent_scan_files: usize,
961    ) -> Self {
962        self.max_concurrent_scan_files = max_concurrent_scan_files;
963        self
964    }
965
966    /// Sets inverted index appliers.
967    #[must_use]
968    pub(crate) fn with_inverted_index_appliers(
969        mut self,
970        appliers: [Option<InvertedIndexApplierRef>; 2],
971    ) -> Self {
972        self.inverted_index_appliers = appliers;
973        self
974    }
975
976    /// Sets bloom filter appliers.
977    #[must_use]
978    pub(crate) fn with_bloom_filter_index_appliers(
979        mut self,
980        appliers: [Option<BloomFilterIndexApplierRef>; 2],
981    ) -> Self {
982        self.bloom_filter_index_appliers = appliers;
983        self
984    }
985
986    /// Sets fulltext index appliers.
987    #[must_use]
988    pub(crate) fn with_fulltext_index_appliers(
989        mut self,
990        appliers: [Option<FulltextIndexApplierRef>; 2],
991    ) -> Self {
992        self.fulltext_index_appliers = appliers;
993        self
994    }
995
996    /// Sets vector index applier for KNN search.
997    #[cfg(feature = "vector_index")]
998    #[must_use]
999    pub(crate) fn with_vector_index_applier(
1000        mut self,
1001        applier: Option<VectorIndexApplierRef>,
1002    ) -> Self {
1003        self.vector_index_applier = applier;
1004        self
1005    }
1006
1007    /// Sets over-fetched k for vector index scan.
1008    #[cfg(feature = "vector_index")]
1009    #[must_use]
1010    pub(crate) fn with_vector_index_k(mut self, k: Option<usize>) -> Self {
1011        self.vector_index_k = k;
1012        self
1013    }
1014
1015    /// Sets start time of the query.
1016    #[must_use]
1017    pub(crate) fn with_start_time(mut self, now: Option<Instant>) -> Self {
1018        self.query_start = now;
1019        self
1020    }
1021
1022    #[must_use]
1023    pub(crate) fn with_append_mode(mut self, is_append_mode: bool) -> Self {
1024        self.append_mode = is_append_mode;
1025        self
1026    }
1027
1028    /// Sets whether to remove deletion markers during scan.
1029    #[must_use]
1030    pub(crate) fn with_filter_deleted(mut self, filter_deleted: bool) -> Self {
1031        self.filter_deleted = filter_deleted;
1032        self
1033    }
1034
1035    /// Sets the merge mode.
1036    #[must_use]
1037    pub(crate) fn with_merge_mode(mut self, merge_mode: MergeMode) -> Self {
1038        self.merge_mode = merge_mode;
1039        self
1040    }
1041
1042    /// Sets the distribution hint.
1043    #[must_use]
1044    pub(crate) fn with_distribution(
1045        mut self,
1046        distribution: Option<TimeSeriesDistribution>,
1047    ) -> Self {
1048        self.distribution = distribution;
1049        self
1050    }
1051
1052    /// Sets the time series row selector.
1053    #[must_use]
1054    pub(crate) fn with_series_row_selector(
1055        mut self,
1056        series_row_selector: Option<TimeSeriesRowSelector>,
1057    ) -> Self {
1058        self.series_row_selector = series_row_selector;
1059        self
1060    }
1061
1062    /// Sets whether to use flat format.
1063    #[must_use]
1064    pub(crate) fn with_flat_format(mut self, flat_format: bool) -> Self {
1065        self.flat_format = flat_format;
1066        self
1067    }
1068
1069    /// Sets whether this scan is for compaction.
1070    #[must_use]
1071    pub(crate) fn with_compaction(mut self, compaction: bool) -> Self {
1072        self.compaction = compaction;
1073        self
1074    }
1075
1076    /// Scans sources in parallel.
1077    ///
1078    /// # Panics if the input doesn't allow parallel scan.
1079    #[tracing::instrument(
1080        skip(self, sources, semaphore),
1081        fields(
1082            region_id = %self.region_metadata().region_id,
1083            source_count = sources.len()
1084        )
1085    )]
1086    pub(crate) fn create_parallel_sources(
1087        &self,
1088        sources: Vec<Source>,
1089        semaphore: Arc<Semaphore>,
1090    ) -> Result<Vec<Source>> {
1091        if sources.len() <= 1 {
1092            return Ok(sources);
1093        }
1094
1095        // Spawn a task for each source.
1096        let sources = sources
1097            .into_iter()
1098            .map(|source| {
1099                let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
1100                self.spawn_scan_task(source, semaphore.clone(), sender);
1101                let stream = Box::pin(ReceiverStream::new(receiver));
1102                Source::Stream(stream)
1103            })
1104            .collect();
1105        Ok(sources)
1106    }
1107
1108    /// Builds memtable ranges to scan by `index`.
1109    pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> {
1110        let memtable = &self.memtables[index.index];
1111        let mut ranges = SmallVec::new();
1112        memtable.build_ranges(index.row_group_index, &mut ranges);
1113        ranges
1114    }
1115
1116    fn predicate_for_file(&self, file: &FileHandle) -> Option<Predicate> {
1117        if self.should_skip_region_partition(file) {
1118            self.predicate.predicate_without_region().cloned()
1119        } else {
1120            self.predicate.predicate().cloned()
1121        }
1122    }
1123
1124    fn should_skip_region_partition(&self, file: &FileHandle) -> bool {
1125        match (
1126            self.region_partition_expr.as_ref(),
1127            file.meta_ref().partition_expr.as_ref(),
1128        ) {
1129            (Some(region_expr), Some(file_expr)) => region_expr == file_expr,
1130            _ => false,
1131        }
1132    }
1133
    /// Prunes a file to scan and returns the builder to build readers.
    ///
    /// Returns a default (empty) [FileRangeBuilder] when the reader produces
    /// no input for this file, or when the file is missing and
    /// `ignore_file_not_found` is set.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.region_metadata().region_id,
            file_id = %file.file_id()
        )
    )]
    pub async fn prune_file(
        &self,
        file: &FileHandle,
        reader_metrics: &mut ReaderMetrics,
    ) -> Result<FileRangeBuilder> {
        let predicate = self.predicate_for_file(file);
        let filter_mode = pre_filter_mode(self.append_mode, self.merge_mode);
        // Decode primary key values only when not compacting and the mapper
        // projects tag columns.
        let decode_pk_values = !self.compaction && self.mapper.has_tags();
        let reader = self
            .access_layer
            .read_sst(file.clone())
            .predicate(predicate)
            .projection(Some(self.read_column_ids.clone()))
            .cache(self.cache_strategy.clone())
            .inverted_index_appliers(self.inverted_index_appliers.clone())
            .bloom_filter_index_appliers(self.bloom_filter_index_appliers.clone())
            .fulltext_index_appliers(self.fulltext_index_appliers.clone());
        // Attach the vector index applier only when the feature is compiled in.
        #[cfg(feature = "vector_index")]
        let reader = {
            let mut reader = reader;
            reader =
                reader.vector_index_applier(self.vector_index_applier.clone(), self.vector_index_k);
            reader
        };
        let res = reader
            .expected_metadata(Some(self.mapper.metadata().clone()))
            .flat_format(self.flat_format)
            .compaction(self.compaction)
            .pre_filter_mode(filter_mode)
            .decode_primary_key_values(decode_pk_values)
            .build_reader_input(reader_metrics)
            .await;
        let read_input = match res {
            Ok(x) => x,
            Err(e) => {
                // Tolerate missing files only when the caller opted in via
                // `ignore_file_not_found`; otherwise propagate the error.
                if e.is_object_not_found() && self.ignore_file_not_found {
                    error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
                    return Ok(FileRangeBuilder::default());
                } else {
                    return Err(e);
                }
            }
        };

        // `None` means the reader produced no input for this file.
        let Some((mut file_range_ctx, selection)) = read_input else {
            return Ok(FileRangeBuilder::default());
        };

        let need_compat = !compat::has_same_columns_and_pk_encoding(
            self.mapper.metadata(),
            file_range_ctx.read_format().metadata(),
        );
        if need_compat {
            // They have different schema. We need to adapt the batch first so the
            // mapper can convert it.
            let compat = if let Some(flat_format) = file_range_ctx.read_format().as_flat() {
                let mapper = self.mapper.as_flat().unwrap();
                FlatCompatBatch::try_new(
                    mapper,
                    flat_format.metadata(),
                    flat_format.format_projection(),
                    self.compaction,
                )?
                .map(CompatBatch::Flat)
            } else {
                let compact_batch = PrimaryKeyCompatBatch::new(
                    &self.mapper,
                    file_range_ctx.read_format().metadata().clone(),
                )?;
                Some(CompatBatch::PrimaryKey(compact_batch))
            };
            file_range_ctx.set_compat_batch(compat);
        }
        Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), selection))
    }
1217
1218    /// Scans the input source in another task and sends batches to the sender.
1219    #[tracing::instrument(
1220        skip(self, input, semaphore, sender),
1221        fields(region_id = %self.region_metadata().region_id)
1222    )]
1223    pub(crate) fn spawn_scan_task(
1224        &self,
1225        mut input: Source,
1226        semaphore: Arc<Semaphore>,
1227        sender: mpsc::Sender<Result<Batch>>,
1228    ) {
1229        let region_id = self.region_metadata().region_id;
1230        let span = tracing::info_span!(
1231            "ScanInput::parallel_scan_task",
1232            region_id = %region_id,
1233            stream_kind = "batch"
1234        );
1235        common_runtime::spawn_global(
1236            async move {
1237                loop {
1238                    // We release the permit before sending result to avoid the task waiting on
1239                    // the channel with the permit held.
1240                    let maybe_batch = {
1241                        // Safety: We never close the semaphore.
1242                        let _permit = semaphore.acquire().await.unwrap();
1243                        input.next_batch().await
1244                    };
1245                    match maybe_batch {
1246                        Ok(Some(batch)) => {
1247                            let _ = sender.send(Ok(batch)).await;
1248                        }
1249                        Ok(None) => break,
1250                        Err(e) => {
1251                            let _ = sender.send(Err(e)).await;
1252                            break;
1253                        }
1254                    }
1255                }
1256            }
1257            .instrument(span),
1258        );
1259    }
1260
1261    /// Scans flat sources (RecordBatch streams) in parallel.
1262    ///
1263    /// # Panics if the input doesn't allow parallel scan.
1264    #[tracing::instrument(
1265        skip(self, sources, semaphore),
1266        fields(
1267            region_id = %self.region_metadata().region_id,
1268            source_count = sources.len()
1269        )
1270    )]
1271    pub(crate) fn create_parallel_flat_sources(
1272        &self,
1273        sources: Vec<BoxedRecordBatchStream>,
1274        semaphore: Arc<Semaphore>,
1275    ) -> Result<Vec<BoxedRecordBatchStream>> {
1276        if sources.len() <= 1 {
1277            return Ok(sources);
1278        }
1279
1280        // Spawn a task for each source.
1281        let sources = sources
1282            .into_iter()
1283            .map(|source| {
1284                let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
1285                self.spawn_flat_scan_task(source, semaphore.clone(), sender);
1286                let stream = Box::pin(ReceiverStream::new(receiver));
1287                Box::pin(stream) as _
1288            })
1289            .collect();
1290        Ok(sources)
1291    }
1292
1293    /// Spawns a task to scan a flat source (RecordBatch stream) asynchronously.
1294    #[tracing::instrument(
1295        skip(self, input, semaphore, sender),
1296        fields(region_id = %self.region_metadata().region_id)
1297    )]
1298    pub(crate) fn spawn_flat_scan_task(
1299        &self,
1300        mut input: BoxedRecordBatchStream,
1301        semaphore: Arc<Semaphore>,
1302        sender: mpsc::Sender<Result<RecordBatch>>,
1303    ) {
1304        let region_id = self.region_metadata().region_id;
1305        let span = tracing::info_span!(
1306            "ScanInput::parallel_scan_task",
1307            region_id = %region_id,
1308            stream_kind = "flat"
1309        );
1310        common_runtime::spawn_global(
1311            async move {
1312                loop {
1313                    // We release the permit before sending result to avoid the task waiting on
1314                    // the channel with the permit held.
1315                    let maybe_batch = {
1316                        // Safety: We never close the semaphore.
1317                        let _permit = semaphore.acquire().await.unwrap();
1318                        input.next().await
1319                    };
1320                    match maybe_batch {
1321                        Some(Ok(batch)) => {
1322                            let _ = sender.send(Ok(batch)).await;
1323                        }
1324                        Some(Err(e)) => {
1325                            let _ = sender.send(Err(e)).await;
1326                            break;
1327                        }
1328                        None => break,
1329                    }
1330                }
1331            }
1332            .instrument(span),
1333        );
1334    }
1335
1336    pub(crate) fn total_rows(&self) -> usize {
1337        let rows_in_files: usize = self.files.iter().map(|f| f.num_rows()).sum();
1338        let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();
1339
1340        let rows = rows_in_files + rows_in_memtables;
1341        #[cfg(feature = "enterprise")]
1342        let rows = rows
1343            + self
1344                .extension_ranges
1345                .iter()
1346                .map(|x| x.num_rows())
1347                .sum::<u64>() as usize;
1348        rows
1349    }
1350
    /// Returns the predicate group pushed down to this scan.
    pub(crate) fn predicate_group(&self) -> &PredicateGroup {
        &self.predicate
    }
1354
    /// Returns the number of memtables to scan.
    /// Memtable indices occupy the first `num_memtables()` slots of the
    /// combined range index space.
    pub(crate) fn num_memtables(&self) -> usize {
        self.memtables.len()
    }
1359
    /// Returns the number of SST files to scan.
    /// File indices follow the memtable indices in the combined range index space.
    pub(crate) fn num_files(&self) -> usize {
        self.files.len()
    }
1364
    /// Gets the file handle from a row group index.
    ///
    /// `index.index` addresses the combined list (memtables first, then files),
    /// so the memtable count is subtracted to obtain the file position.
    pub(crate) fn file_from_index(&self, index: RowGroupIndex) -> &FileHandle {
        let file_index = index.index - self.num_memtables();
        &self.files[file_index]
    }
1370
    /// Returns the region metadata held by the projection mapper.
    pub fn region_metadata(&self) -> &RegionMetadataRef {
        self.mapper.metadata()
    }
1374}
1375
// The whole `impl` is gated on the `enterprise` feature, so the per-method
// `#[cfg(feature = "enterprise")]` attributes that used to appear here were
// redundant and have been removed.
#[cfg(feature = "enterprise")]
impl ScanInput {
    /// Sets the extension ranges to scan in addition to memtables and files.
    #[must_use]
    pub(crate) fn with_extension_ranges(self, extension_ranges: Vec<BoxedExtensionRange>) -> Self {
        Self {
            extension_ranges,
            ..self
        }
    }

    /// Returns all extension ranges of this scan.
    pub(crate) fn extension_ranges(&self) -> &[BoxedExtensionRange] {
        &self.extension_ranges
    }

    /// Get a boxed [ExtensionRange] by the index in all ranges.
    ///
    /// `i` addresses the combined range list (memtables, then files, then
    /// extension ranges), so both prefixes are subtracted first.
    pub(crate) fn extension_range(&self, i: usize) -> &BoxedExtensionRange {
        &self.extension_ranges[i - self.num_memtables() - self.num_files()]
    }
}
1397
// Test-only accessors.
#[cfg(test)]
impl ScanInput {
    /// Returns SST file ids to scan.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        self.files.iter().map(|file| file.file_id()).collect()
    }

    /// Returns index ids of the SST files to scan.
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        self.files.iter().map(|file| file.index_id()).collect()
    }
}
1409
1410fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode {
1411    if append_mode {
1412        return PreFilterMode::All;
1413    }
1414
1415    match merge_mode {
1416        MergeMode::LastRow => PreFilterMode::SkipFieldsOnDelete,
1417        MergeMode::LastNonNull => PreFilterMode::SkipFields,
1418    }
1419}
1420
/// Builds a [ScanRequestFingerprint] from a [ScanInput] if the scan is eligible
/// for partition range caching.
///
/// Returns `None` when the scan is ineligible (not flat format, is a
/// compaction, has no files, or the cache is not fully enabled) or when the
/// request carries no filter on a tag column.
pub(crate) fn build_scan_fingerprint(input: &ScanInput) -> Option<ScanRequestFingerprint> {
    // Only flat-format, non-compaction scans over files with the full cache
    // enabled are worth fingerprinting.
    let eligible = input.flat_format
        && !input.compaction
        && !input.files.is_empty()
        && matches!(input.cache_strategy, CacheStrategy::EnableAll(_));

    if !eligible {
        return None;
    }

    let metadata = input.region_metadata();
    // Names of tag columns, used below to detect whether any filter touches a tag.
    let tag_names: HashSet<&str> = metadata
        .column_metadatas
        .iter()
        .filter(|col| col.semantic_type == SemanticType::Tag)
        .map(|col| col.column_schema.name.as_str())
        .collect();

    let time_index = metadata.time_index_column();
    let time_index_name = time_index.column_schema.name.clone();
    let ts_col_unit = time_index
        .column_schema
        .data_type
        .as_timestamp()
        .expect("Time index must have timestamp-compatible type")
        .unit();

    // Region partition expressions are excluded from the fingerprint.
    let exprs = input
        .predicate_group()
        .predicate_without_region()
        .map(|predicate| predicate.exprs())
        .unwrap_or_default();

    let mut filters = Vec::new();
    let mut time_filters = Vec::new();
    let mut has_tag_filter = false;
    // Scratch set of columns referenced by the current expression; reused
    // across iterations.
    let mut columns = HashSet::new();

    for expr in exprs {
        columns.clear();
        let is_time_only = match expr_to_columns(expr, &mut columns) {
            Ok(()) if !columns.is_empty() => {
                has_tag_filter |= columns
                    .iter()
                    .any(|col| tag_names.contains(col.name.as_str()));
                columns.iter().all(|col| col.name == time_index_name)
            }
            _ => false,
        };

        if is_time_only
            && extract_time_range_from_expr(&time_index_name, ts_col_unit, expr).is_some()
        {
            // Range-reducible time predicates can be safely dropped from the
            // cache key when the query time range covers the partition range.
            time_filters.push(expr.to_string());
        } else {
            // Non-time filters and non-range time predicates (those that
            // extract_time_range_from_expr cannot convert to a TimestampRange)
            // always stay in the cache key.
            filters.push(expr.to_string());
        }
    }

    if !has_tag_filter {
        // We only cache requests that have tag filters to avoid caching all series.
        return None;
    }

    // Ensure the filters are sorted for consistent fingerprinting.
    filters.sort_unstable();
    time_filters.sort_unstable();

    Some(
        crate::read::range_cache::ScanRequestFingerprintBuilder {
            read_column_ids: input.read_column_ids.clone(),
            read_column_types: input
                .read_column_ids
                .iter()
                .map(|id| {
                    metadata
                        .column_by_id(*id)
                        .map(|col| col.column_schema.data_type.clone())
                })
                .collect(),
            filters,
            time_filters,
            series_row_selector: input.series_row_selector,
            append_mode: input.append_mode,
            filter_deleted: input.filter_deleted,
            merge_mode: input.merge_mode,
            partition_expr_version: metadata.partition_expr_version,
        }
        .build(),
    )
}
1519
/// Context shared by different streams from a scanner.
/// It contains the input and ranges to scan.
pub struct StreamContext {
    /// Input memtables and files.
    pub input: ScanInput,
    /// Metadata for partition ranges.
    pub(crate) ranges: Vec<RangeMeta>,
    /// Precomputed scan fingerprint for partition range caching.
    /// `None` when the scan is not eligible for caching.
    #[allow(dead_code)]
    pub(crate) scan_fingerprint: Option<ScanRequestFingerprint>,

    // Metrics:
    /// The start time of the query.
    /// Falls back to `Instant::now()` at construction when the input carries
    /// no start time.
    pub(crate) query_start: Instant,
}
1536
1537impl StreamContext {
1538    /// Creates a new [StreamContext] for [SeqScan].
1539    pub(crate) fn seq_scan_ctx(input: ScanInput) -> Self {
1540        let query_start = input.query_start.unwrap_or_else(Instant::now);
1541        let ranges = RangeMeta::seq_scan_ranges(&input);
1542        READ_SST_COUNT.observe(input.num_files() as f64);
1543        let scan_fingerprint = build_scan_fingerprint(&input);
1544
1545        Self {
1546            input,
1547            ranges,
1548            scan_fingerprint,
1549            query_start,
1550        }
1551    }
1552
1553    /// Creates a new [StreamContext] for [UnorderedScan].
1554    pub(crate) fn unordered_scan_ctx(input: ScanInput) -> Self {
1555        let query_start = input.query_start.unwrap_or_else(Instant::now);
1556        let ranges = RangeMeta::unordered_scan_ranges(&input);
1557        READ_SST_COUNT.observe(input.num_files() as f64);
1558        let scan_fingerprint = build_scan_fingerprint(&input);
1559
1560        Self {
1561            input,
1562            ranges,
1563            scan_fingerprint,
1564            query_start,
1565        }
1566    }
1567
1568    /// Returns true if the index refers to a memtable.
1569    pub(crate) fn is_mem_range_index(&self, index: RowGroupIndex) -> bool {
1570        self.input.num_memtables() > index.index
1571    }
1572
1573    pub(crate) fn is_file_range_index(&self, index: RowGroupIndex) -> bool {
1574        !self.is_mem_range_index(index)
1575            && index.index < self.input.num_files() + self.input.num_memtables()
1576    }
1577
1578    /// Retrieves the partition ranges.
1579    pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
1580        self.ranges
1581            .iter()
1582            .enumerate()
1583            .map(|(idx, range_meta)| range_meta.new_partition_range(idx))
1584            .collect()
1585    }
1586
    /// Format the context for explain.
    ///
    /// Writes a JSON-like summary of the partition/range counts plus optional
    /// selector and distribution hints; `verbose` wraps the output in braces
    /// and appends detailed per-source content.
    pub(crate) fn format_for_explain(&self, verbose: bool, f: &mut fmt::Formatter) -> fmt::Result {
        // Classify every row group index as memtable, file, or other
        // (anything beyond the memtable/file index space).
        let (mut num_mem_ranges, mut num_file_ranges, mut num_other_ranges) = (0, 0, 0);
        for range_meta in &self.ranges {
            for idx in &range_meta.row_group_indices {
                if self.is_mem_range_index(*idx) {
                    num_mem_ranges += 1;
                } else if self.is_file_range_index(*idx) {
                    num_file_ranges += 1;
                } else {
                    num_other_ranges += 1;
                }
            }
        }
        if verbose {
            write!(f, "{{")?;
        }
        write!(
            f,
            r#""partition_count":{{"count":{}, "mem_ranges":{}, "files":{}, "file_ranges":{}"#,
            self.ranges.len(),
            num_mem_ranges,
            self.input.num_files(),
            num_file_ranges,
        )?;
        // "other_ranges" is emitted only when present to keep the output short.
        if num_other_ranges > 0 {
            write!(f, r#", "other_ranges":{}"#, num_other_ranges)?;
        }
        write!(f, "}}")?;

        if let Some(selector) = &self.input.series_row_selector {
            write!(f, ", \"selector\":\"{}\"", selector)?;
        }
        if let Some(distribution) = &self.input.distribution {
            write!(f, ", \"distribution\":\"{}\"", distribution)?;
        }

        if verbose {
            self.format_verbose_content(f)?;
        }

        Ok(())
    }
1630
    /// Writes the verbose portion of the explain output: projection, filters,
    /// dynamic filters, per-file details, format flags and (with the
    /// `enterprise` feature) extension ranges.
    fn format_verbose_content(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Debug adapter that renders one SST file as a JSON-like object.
        struct FileWrapper<'a> {
            file: &'a FileHandle,
        }

        impl fmt::Debug for FileWrapper<'_> {
            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                let (start, end) = self.file.time_range();
                write!(
                    f,
                    r#"{{"file_id":"{}","time_range_start":"{}::{}","time_range_end":"{}::{}","rows":{},"size":{},"index_size":{}}}"#,
                    self.file.file_id(),
                    start.value(),
                    start.unit(),
                    end.value(),
                    end.unit(),
                    self.file.num_rows(),
                    self.file.size(),
                    self.file.index_size()
                )
            }
        }

        // Debug adapter over the whole ScanInput.
        struct InputWrapper<'a> {
            input: &'a ScanInput,
        }

        #[cfg(feature = "enterprise")]
        impl InputWrapper<'_> {
            // Appends the extension ranges (if any) after the regular fields.
            fn format_extension_ranges(&self, f: &mut fmt::Formatter) -> fmt::Result {
                if self.input.extension_ranges.is_empty() {
                    return Ok(());
                }

                let mut delimiter = "";
                write!(f, ", extension_ranges: [")?;
                for range in self.input.extension_ranges() {
                    write!(f, "{}{:?}", delimiter, range)?;
                    delimiter = ", ";
                }
                write!(f, "]")?;
                Ok(())
            }
        }

        impl fmt::Debug for InputWrapper<'_> {
            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                // Projected output column names, skipped when empty.
                let output_schema = self.input.mapper.output_schema();
                if !output_schema.is_empty() {
                    let names: Vec<_> = output_schema
                        .column_schemas()
                        .iter()
                        .map(|col| &col.name)
                        .collect();
                    write!(f, ", \"projection\": {:?}", names)?;
                }
                // Static filter exprs and dynamic filters, each skipped when empty.
                if let Some(predicate) = &self.input.predicate.predicate() {
                    if !predicate.exprs().is_empty() {
                        let exprs: Vec<_> =
                            predicate.exprs().iter().map(|e| e.to_string()).collect();
                        write!(f, ", \"filters\": {:?}", exprs)?;
                    }
                    if !predicate.dyn_filters().is_empty() {
                        let dyn_filters: Vec<_> = predicate
                            .dyn_filters()
                            .iter()
                            .map(|f| format!("{}", f))
                            .collect();
                        write!(f, ", \"dyn_filters\": {:?}", dyn_filters)?;
                    }
                }
                #[cfg(feature = "vector_index")]
                if let Some(vector_index_k) = self.input.vector_index_k {
                    write!(f, ", \"vector_index_k\": {}", vector_index_k)?;
                }
                if !self.input.files.is_empty() {
                    write!(f, ", \"files\": ")?;
                    f.debug_list()
                        .entries(self.input.files.iter().map(|file| FileWrapper { file }))
                        .finish()?;
                }
                write!(f, ", \"flat_format\": {}", self.input.flat_format)?;

                #[cfg(feature = "enterprise")]
                self.format_extension_ranges(f)?;

                Ok(())
            }
        }

        write!(f, "{:?}", InputWrapper { input: &self.input })
    }
1723
1724    /// Add new dynamic filters to the predicates.
1725    /// Safe after stream creation; in-flight reads may still observe an older snapshot.
1726    pub(crate) fn add_dyn_filter_to_predicate(
1727        self: &Arc<Self>,
1728        filter_exprs: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
1729    ) -> Vec<bool> {
1730        let mut supported = Vec::with_capacity(filter_exprs.len());
1731        let filter_expr = filter_exprs
1732            .into_iter()
1733            .filter_map(|expr| {
1734                if let Ok(dyn_filter) = (expr as Arc<dyn std::any::Any + Send + Sync + 'static>)
1735                .downcast::<datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr>()
1736            {
1737                supported.push(true);
1738                Some(dyn_filter)
1739            } else {
1740                supported.push(false);
1741                None
1742            }
1743            })
1744            .collect();
1745        self.input.predicate.add_dyn_filters(filter_expr);
1746        supported
1747    }
1748}
1749
/// Predicates to evaluate.
/// It only keeps filters that [SimpleFilterEvaluator] supports.
#[derive(Clone, Default)]
pub struct PredicateGroup {
    /// Simple filters on the time index column, shared by reference.
    /// `None` when no filter targets the time index.
    time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
    /// Predicate that includes request filters and region partition expr (if any).
    predicate_all: Predicate,
    /// Predicate that only includes request filters.
    predicate_without_region: Predicate,
    /// Region partition expression restored from metadata.
    region_partition_expr: Option<PartitionExpr>,
}
1762
1763impl PredicateGroup {
1764    /// Creates a new `PredicateGroup` from exprs according to the metadata.
1765    pub fn new(metadata: &RegionMetadata, exprs: &[Expr]) -> Result<Self> {
1766        let mut combined_exprs = exprs.to_vec();
1767        let mut region_partition_expr = None;
1768
1769        if let Some(expr_json) = metadata.partition_expr.as_ref()
1770            && !expr_json.is_empty()
1771            && let Some(expr) = PartitionExpr::from_json_str(expr_json)
1772                .context(InvalidPartitionExprSnafu { expr: expr_json })?
1773        {
1774            let logical_expr = expr
1775                .try_as_logical_expr()
1776                .context(InvalidPartitionExprSnafu {
1777                    expr: expr_json.clone(),
1778                })?;
1779
1780            combined_exprs.push(logical_expr);
1781            region_partition_expr = Some(expr);
1782        }
1783
1784        let mut time_filters = Vec::with_capacity(combined_exprs.len());
1785        // Columns in the expr.
1786        let mut columns = HashSet::new();
1787        for expr in &combined_exprs {
1788            columns.clear();
1789            let Some(filter) = Self::expr_to_filter(expr, metadata, &mut columns) else {
1790                continue;
1791            };
1792            time_filters.push(filter);
1793        }
1794        let time_filters = if time_filters.is_empty() {
1795            None
1796        } else {
1797            Some(Arc::new(time_filters))
1798        };
1799
1800        let predicate_all = Predicate::new(combined_exprs);
1801        let predicate_without_region = Predicate::new(exprs.to_vec());
1802
1803        Ok(Self {
1804            time_filters,
1805            predicate_all,
1806            predicate_without_region,
1807            region_partition_expr,
1808        })
1809    }
1810
1811    /// Returns time filters.
1812    pub(crate) fn time_filters(&self) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
1813        self.time_filters.clone()
1814    }
1815
1816    /// Returns predicate of all exprs (including region partition expr if present).
1817    pub(crate) fn predicate(&self) -> Option<&Predicate> {
1818        if self.predicate_all.is_empty() {
1819            None
1820        } else {
1821            Some(&self.predicate_all)
1822        }
1823    }
1824
1825    /// Returns predicate that excludes region partition expr.
1826    pub(crate) fn predicate_without_region(&self) -> Option<&Predicate> {
1827        if self.predicate_without_region.is_empty() {
1828            None
1829        } else {
1830            Some(&self.predicate_without_region)
1831        }
1832    }
1833
1834    /// Add dynamic filters in the predicates.
1835    pub(crate) fn add_dyn_filters(&self, dyn_filters: Vec<Arc<DynamicFilterPhysicalExpr>>) {
1836        self.predicate_all.add_dyn_filters(dyn_filters.clone());
1837        self.predicate_without_region.add_dyn_filters(dyn_filters);
1838    }
1839
1840    /// Returns the region partition expr from metadata, if any.
1841    pub(crate) fn region_partition_expr(&self) -> Option<&PartitionExpr> {
1842        self.region_partition_expr.as_ref()
1843    }
1844
1845    fn expr_to_filter(
1846        expr: &Expr,
1847        metadata: &RegionMetadata,
1848        columns: &mut HashSet<Column>,
1849    ) -> Option<SimpleFilterEvaluator> {
1850        columns.clear();
1851        // `expr_to_columns` won't return error.
1852        // We still ignore these expressions for safety.
1853        expr_to_columns(expr, columns).ok()?;
1854        if columns.len() > 1 {
1855            // Simple filter doesn't support multiple columns.
1856            return None;
1857        }
1858        let column = columns.iter().next()?;
1859        let column_meta = metadata.column_by_name(&column.name)?;
1860        if column_meta.semantic_type == SemanticType::Timestamp {
1861            SimpleFilterEvaluator::try_new(expr)
1862        } else {
1863            None
1864        }
1865    }
1866}
1867
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datafusion::physical_plan::expressions::lit as physical_lit;
    use datafusion_common::ScalarValue;
    use datafusion_expr::{col, lit};
    use datatypes::value::Value;
    use partition::expr::col as partition_col;
    use store_api::metadata::RegionMetadataBuilder;
    use store_api::storage::{ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector};

    use super::*;
    use crate::cache::CacheManager;
    use crate::memtable::time_partition::TimePartitions;
    use crate::read::range_cache::ScanRequestFingerprintBuilder;
    use crate::region::options::RegionOptions;
    use crate::region::version::VersionBuilder;
    use crate::sst::FormatType;
    use crate::test_util::memtable_util::{EmptyMemtableBuilder, metadata_with_primary_key};
    use crate::test_util::scheduler_util::SchedulerEnv;

    /// Builds a version with a single empty mutable memtable for `metadata`.
    fn new_version(metadata: RegionMetadataRef) -> VersionRef {
        let mutable = Arc::new(TimePartitions::new(
            metadata.clone(),
            Arc::new(EmptyMemtableBuilder::default()),
            0,
            None,
        ));
        Arc::new(VersionBuilder::new(metadata, mutable).build())
    }

    /// Like `new_version` but also sets the region's SST format option.
    fn new_version_with_sst_format(
        metadata: RegionMetadataRef,
        sst_format: Option<FormatType>,
    ) -> VersionRef {
        let mutable = Arc::new(TimePartitions::new(
            metadata.clone(),
            Arc::new(EmptyMemtableBuilder::default()),
            0,
            None,
        ));
        let options = RegionOptions {
            sst_format,
            ..Default::default()
        };
        Arc::new(
            VersionBuilder::new(metadata, mutable)
                .options(options)
                .build(),
        )
    }

    /// Builds a `ScanInput` with the given filters over one default file,
    /// flat format enabled, and the range result cache turned on.
    async fn new_scan_input(metadata: RegionMetadataRef, filters: Vec<Expr>) -> ScanInput {
        let env = SchedulerEnv::new().await;
        let mapper = ProjectionMapper::new(&metadata, [0, 2, 3].into_iter(), true).unwrap();
        let predicate = PredicateGroup::new(metadata.as_ref(), &filters).unwrap();
        let file = FileHandle::new(
            crate::sst::file::FileMeta::default(),
            Arc::new(crate::sst::file_purger::NoopFilePurger),
        );

        ScanInput::new(env.access_layer.clone(), mapper)
            .with_predicate(predicate)
            .with_cache(CacheStrategy::EnableAll(Arc::new(
                CacheManager::builder()
                    .range_result_cache_size(1024)
                    .build(),
            )))
            .with_flat_format(true)
            .with_files(vec![file])
    }

    /// Columns referenced only by filters must still be part of the read ids.
    #[tokio::test]
    async fn test_build_read_column_ids_includes_filters() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let version = new_version(metadata.clone());
        let env = SchedulerEnv::new().await;
        let request = ScanRequest {
            projection: Some(vec![4]),
            filters: vec![
                col("v0").gt(lit(1)),
                col("ts").gt(lit(0)),
                col("k0").eq(lit("foo")),
            ],
            ..Default::default()
        };
        let scan_region = ScanRegion::new(
            version,
            env.access_layer.clone(),
            request,
            CacheStrategy::Disabled,
        );
        let predicate =
            PredicateGroup::new(metadata.as_ref(), &scan_region.request.filters).unwrap();
        let projection = scan_region.request.projection.as_ref().unwrap();
        let read_ids = scan_region
            .build_read_column_ids(projection, &predicate)
            .unwrap();
        assert_eq!(vec![4, 0, 2, 3], read_ids);
    }

    /// An empty projection must still read the time index column.
    #[tokio::test]
    async fn test_build_read_column_ids_empty_projection() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let version = new_version(metadata.clone());
        let env = SchedulerEnv::new().await;
        let request = ScanRequest {
            projection: Some(vec![]),
            ..Default::default()
        };
        let scan_region = ScanRegion::new(
            version,
            env.access_layer.clone(),
            request,
            CacheStrategy::Disabled,
        );
        let predicate =
            PredicateGroup::new(metadata.as_ref(), &scan_region.request.filters).unwrap();
        let projection = scan_region.request.projection.as_ref().unwrap();
        let read_ids = scan_region
            .build_read_column_ids(projection, &predicate)
            .unwrap();
        // Empty projection should still read the time index column (id 2 in this test schema).
        assert_eq!(vec![2], read_ids);
    }

    /// Requested projection order wins; filter columns are appended after it.
    #[tokio::test]
    async fn test_build_read_column_ids_keeps_projection_order() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let version = new_version(metadata.clone());
        let env = SchedulerEnv::new().await;
        let request = ScanRequest {
            projection: Some(vec![4, 1]),
            filters: vec![col("v0").gt(lit(1))],
            ..Default::default()
        };
        let scan_region = ScanRegion::new(
            version,
            env.access_layer.clone(),
            request,
            CacheStrategy::Disabled,
        );
        let predicate =
            PredicateGroup::new(metadata.as_ref(), &scan_region.request.filters).unwrap();
        let projection = scan_region.request.projection.as_ref().unwrap();
        let read_ids = scan_region
            .build_read_column_ids(projection, &predicate)
            .unwrap();
        // Projection order preserved, extra columns appended in schema order.
        assert_eq!(vec![4, 1, 3], read_ids);
    }

    /// `force_flat_format` in the request overrides the region's SST format.
    #[tokio::test]
    async fn test_use_flat_format_honors_request_override() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let env = SchedulerEnv::new().await;

        // Default request against a primary-key region: not flat.
        let primary_key_version =
            new_version_with_sst_format(metadata.clone(), Some(FormatType::PrimaryKey));
        let request = ScanRequest::default();
        let scan_region = ScanRegion::new(
            primary_key_version.clone(),
            env.access_layer.clone(),
            request,
            CacheStrategy::Disabled,
        );
        assert!(!scan_region.use_flat_format());

        // The same region with the force flag set: flat.
        let request = ScanRequest {
            force_flat_format: true,
            ..Default::default()
        };
        let scan_region = ScanRegion::new(
            primary_key_version,
            env.access_layer.clone(),
            request,
            CacheStrategy::Disabled,
        );
        assert!(scan_region.use_flat_format());

        // A flat-format region is flat without any request flag.
        let flat_version = new_version_with_sst_format(metadata, Some(FormatType::Flat));
        let request = ScanRequest::default();
        let scan_region = ScanRegion::new(
            flat_version,
            env.access_layer.clone(),
            request,
            CacheStrategy::Disabled,
        );
        assert!(scan_region.use_flat_format());
    }

    /// Helper to create a timestamp millisecond literal.
    fn ts_lit(val: i64) -> datafusion_expr::Expr {
        lit(ScalarValue::TimestampMillisecond(Some(val), None))
    }

    /// A flat-format scan with a tag filter gets a fingerprint built from the
    /// read columns, split time/non-time filters, and the scan options.
    #[tokio::test]
    async fn test_build_scan_fingerprint_for_eligible_scan() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let input = new_scan_input(
            metadata.clone(),
            vec![
                col("ts").gt_eq(ts_lit(1000)),
                col("k0").eq(lit("foo")),
                col("v0").gt(lit(1)),
            ],
        )
        .await
        .with_distribution(Some(TimeSeriesDistribution::PerSeries))
        .with_series_row_selector(Some(TimeSeriesRowSelector::LastRow))
        .with_merge_mode(MergeMode::LastNonNull)
        .with_filter_deleted(false);

        let fingerprint = build_scan_fingerprint(&input).unwrap();

        let expected = ScanRequestFingerprintBuilder {
            read_column_ids: input.read_column_ids.clone(),
            read_column_types: vec![
                metadata
                    .column_by_id(0)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(2)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(3)
                    .map(|col| col.column_schema.data_type.clone()),
            ],
            filters: vec![
                col("k0").eq(lit("foo")).to_string(),
                col("v0").gt(lit(1)).to_string(),
            ],
            time_filters: vec![col("ts").gt_eq(ts_lit(1000)).to_string()],
            series_row_selector: Some(TimeSeriesRowSelector::LastRow),
            append_mode: false,
            filter_deleted: false,
            merge_mode: MergeMode::LastNonNull,
            partition_expr_version: 0,
        }
        .build();
        assert_eq!(expected, fingerprint);
    }

    /// Without a tag (primary key) filter, no fingerprint is produced.
    #[tokio::test]
    async fn test_build_scan_fingerprint_requires_tag_filter() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let input = new_scan_input(
            metadata,
            vec![col("ts").gt_eq(lit(1000)), col("v0").gt(lit(1))],
        )
        .await;

        assert!(build_scan_fingerprint(&input).is_none());
    }

    /// Fingerprints are skipped for ineligible scans: cache disabled,
    /// non-flat format, compaction scans, or scans without files.
    #[tokio::test]
    async fn test_build_scan_fingerprint_respects_scan_eligibility() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let filters = vec![col("k0").eq(lit("foo"))];

        // No cache configured on this input.
        let disabled = ScanInput::new(
            SchedulerEnv::new().await.access_layer.clone(),
            ProjectionMapper::new(&metadata, [0, 2, 3].into_iter(), true).unwrap(),
        )
        .with_predicate(PredicateGroup::new(metadata.as_ref(), &filters).unwrap())
        .with_flat_format(true);
        assert!(build_scan_fingerprint(&disabled).is_none());

        let non_flat = new_scan_input(metadata.clone(), filters.clone())
            .await
            .with_flat_format(false);
        assert!(build_scan_fingerprint(&non_flat).is_none());

        let compaction = new_scan_input(metadata.clone(), filters.clone())
            .await
            .with_compaction(true);
        assert!(build_scan_fingerprint(&compaction).is_none());

        // No files to read.
        let no_files = new_scan_input(metadata, filters).await.with_files(vec![]);
        assert!(build_scan_fingerprint(&no_files).is_none());
    }

    /// A region partition expr bumps `partition_expr_version`, which is part
    /// of the fingerprint, so schema/partition changes invalidate cache hits.
    #[tokio::test]
    async fn test_build_scan_fingerprint_tracks_schema_and_partition_expr_changes() {
        let base = metadata_with_primary_key(vec![0, 1], false);
        let mut builder = RegionMetadataBuilder::from_existing(base);
        let partition_expr = partition_col("k0")
            .gt_eq(Value::String("foo".into()))
            .as_json_str()
            .unwrap();
        builder.partition_expr_json(Some(partition_expr));
        let metadata = Arc::new(builder.build_without_validation().unwrap());

        let input = new_scan_input(metadata.clone(), vec![col("k0").eq(lit("foo"))]).await;
        let fingerprint = build_scan_fingerprint(&input).unwrap();

        let expected = ScanRequestFingerprintBuilder {
            read_column_ids: input.read_column_ids.clone(),
            read_column_types: vec![
                metadata
                    .column_by_id(0)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(2)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(3)
                    .map(|col| col.column_schema.data_type.clone()),
            ],
            filters: vec![col("k0").eq(lit("foo")).to_string()],
            time_filters: vec![],
            series_row_selector: None,
            append_mode: false,
            filter_deleted: true,
            merge_mode: MergeMode::LastRow,
            partition_expr_version: metadata.partition_expr_version,
        }
        .build();
        assert_eq!(expected, fingerprint);
        assert_ne!(0, metadata.partition_expr_version);
    }

    /// Dynamic filters can be added even when both base predicates start
    /// empty; both predicates must then become non-empty.
    #[test]
    fn test_update_dyn_filters_with_empty_base_predicates() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let predicate_group = PredicateGroup::new(metadata.as_ref(), &[]).unwrap();
        assert!(predicate_group.predicate().is_none());
        assert!(predicate_group.predicate_without_region().is_none());

        let dyn_filter = Arc::new(DynamicFilterPhysicalExpr::new(vec![], physical_lit(false)));
        predicate_group.add_dyn_filters(vec![dyn_filter]);

        let predicate_all = predicate_group.predicate().unwrap();
        assert!(predicate_all.exprs().is_empty());
        assert_eq!(1, predicate_all.dyn_filters().len());

        let predicate_without_region = predicate_group.predicate_without_region().unwrap();
        assert!(predicate_without_region.exprs().is_empty());
        assert_eq!(1, predicate_without_region.dyn_filters().len());
    }
}