mito2/read/scan_region.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Scans a region according to the scan request.

use std::collections::HashSet;
use std::fmt;
use std::num::NonZeroU64;
use std::sync::Arc;
use std::time::Instant;

use api::v1::SemanticType;
use common_error::ext::BoxedError;
use common_recordbatch::SendableRecordBatchStream;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::tracing::Instrument;
use common_telemetry::{debug, error, tracing, warn};
use common_time::range::TimestampRange;
use datafusion_common::Column;
use datafusion_expr::Expr;
use datafusion_expr::utils::expr_to_columns;
use futures::StreamExt;
use partition::expr::PartitionExpr;
use smallvec::SmallVec;
use snafu::{OptionExt as _, ResultExt};
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::region_engine::{PartitionRange, RegionScannerRef};
use store_api::storage::{
    ColumnId, RegionId, ScanRequest, SequenceRange, TimeSeriesDistribution, TimeSeriesRowSelector,
};
use table::predicate::{Predicate, build_time_range_predicate};
use tokio::sync::{Semaphore, mpsc};
use tokio_stream::wrappers::ReceiverStream;

use crate::access_layer::AccessLayerRef;
use crate::cache::CacheStrategy;
use crate::config::{DEFAULT_MAX_CONCURRENT_SCAN_FILES, DEFAULT_SCAN_CHANNEL_SIZE};
use crate::error::{InvalidPartitionExprSnafu, InvalidRequestSnafu, Result};
#[cfg(feature = "enterprise")]
use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
use crate::memtable::{MemtableRange, RangesOptions};
use crate::metrics::READ_SST_COUNT;
use crate::read::compat::{self, CompatBatch, FlatCompatBatch, PrimaryKeyCompatBatch};
use crate::read::projection::ProjectionMapper;
use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
use crate::read::seq_scan::SeqScan;
use crate::read::series_scan::SeriesScan;
use crate::read::stream::ScanBatchStream;
use crate::read::unordered_scan::UnorderedScan;
use crate::read::{Batch, BoxedRecordBatchStream, RecordBatch, Source};
use crate::region::options::MergeMode;
use crate::region::version::VersionRef;
use crate::sst::FormatType;
use crate::sst::file::FileHandle;
use crate::sst::index::bloom_filter::applier::{
    BloomFilterIndexApplierBuilder, BloomFilterIndexApplierRef,
};
use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
#[cfg(feature = "vector_index")]
use crate::sst::index::vector_index::applier::{VectorIndexApplier, VectorIndexApplierRef};
use crate::sst::parquet::file_range::PreFilterMode;
use crate::sst::parquet::reader::ReaderMetrics;

/// Parallel scan channel size for flat format.
const FLAT_SCAN_CHANNEL_SIZE: usize = 2;
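/// Multiplier to over-fetch vector index candidates when the request also
/// carries filters, since filtering may discard some of the top-k matches.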
#[cfg(feature = "vector_index")]
const VECTOR_INDEX_OVERFETCH_MULTIPLIER: usize = 2;

/// A scanner scans a region and returns a [SendableRecordBatchStream].
pub(crate) enum Scanner {
    /// Sequential scan.
    Seq(SeqScan),
    /// Unordered scan.
    Unordered(UnorderedScan),
    /// Per-series scan.
    Series(SeriesScan),
}

impl Scanner {
    /// Returns a [SendableRecordBatchStream] to retrieve scan results from all partitions.
    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
    pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream, BoxedError> {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.build_stream(),
            Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
            Scanner::Series(series_scan) => series_scan.build_stream().await,
        }
    }

    /// Creates a stream of [`Batch`]es from this scanner.
    pub(crate) fn scan_batch(&self) -> Result<ScanBatchStream> {
        match self {
            Scanner::Seq(x) => x.scan_all_partitions(),
            Scanner::Unordered(x) => x.scan_all_partitions(),
            Scanner::Series(x) => x.scan_all_partitions(),
        }
    }
}

#[cfg(test)]
impl Scanner {
    /// Returns number of files to scan.
    pub(crate) fn num_files(&self) -> usize {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.input().num_files(),
            Scanner::Unordered(unordered_scan) => unordered_scan.input().num_files(),
            Scanner::Series(series_scan) => series_scan.input().num_files(),
        }
    }

    /// Returns number of memtables to scan.
    pub(crate) fn num_memtables(&self) -> usize {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.input().num_memtables(),
            Scanner::Unordered(unordered_scan) => unordered_scan.input().num_memtables(),
            Scanner::Series(series_scan) => series_scan.input().num_memtables(),
        }
    }

    /// Returns SST file ids to scan.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.input().file_ids(),
            Scanner::Unordered(unordered_scan) => unordered_scan.input().file_ids(),
            Scanner::Series(series_scan) => series_scan.input().file_ids(),
        }
    }

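    /// Returns SST index ids to scan.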
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.input().index_ids(),
            Scanner::Unordered(unordered_scan) => unordered_scan.input().index_ids(),
            Scanner::Series(series_scan) => series_scan.input().index_ids(),
        }
    }

    /// Sets the target partitions for the scanner. It controls the parallelism of the scanner.
    pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
        use store_api::region_engine::{PrepareRequest, RegionScanner};

        let request = PrepareRequest::default().with_target_partitions(target_partitions);
        match self {
            Scanner::Seq(seq_scan) => seq_scan.prepare(request).unwrap(),
            Scanner::Unordered(unordered_scan) => unordered_scan.prepare(request).unwrap(),
            Scanner::Series(series_scan) => series_scan.prepare(request).unwrap(),
        }
    }
}

#[cfg_attr(doc, aquamarine::aquamarine)]
/// Helper to scan a region by [ScanRequest].
///
/// [ScanRegion] collects SSTs and memtables to scan without actually reading them. It
/// creates a [Scanner] to scan these targets in [Scanner::scan()].
///
/// ```mermaid
/// classDiagram
/// class ScanRegion {
///     -VersionRef version
///     -ScanRequest request
///     ~scanner() Scanner
///     ~seq_scan() SeqScan
/// }
/// class Scanner {
///     <<enumeration>>
///     SeqScan
///     UnorderedScan
///     +scan() SendableRecordBatchStream
/// }
/// class SeqScan {
///     -ScanInput input
///     +build() SendableRecordBatchStream
/// }
/// class UnorderedScan {
///     -ScanInput input
///     +build() SendableRecordBatchStream
/// }
/// class ScanInput {
///     -ProjectionMapper mapper
///     -Option~TimeRange~ time_range
///     -Option~Predicate~ predicate
///     -Vec~MemtableRef~ memtables
///     -Vec~FileHandle~ files
/// }
/// class ProjectionMapper {
///     ~output_schema() SchemaRef
///     ~convert(Batch) RecordBatch
/// }
/// ScanRegion -- Scanner
/// ScanRegion o-- ScanRequest
/// Scanner o-- SeqScan
/// Scanner o-- UnorderedScan
/// SeqScan o-- ScanInput
/// UnorderedScan o-- ScanInput
/// Scanner -- SendableRecordBatchStream
/// ScanInput o-- ProjectionMapper
/// SeqScan -- SendableRecordBatchStream
/// UnorderedScan -- SendableRecordBatchStream
/// ```
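///
/// # Example
///
/// A minimal sketch of the typical call sequence (not a compiling doctest;
/// assumes a prepared `version`, `access_layer`, `request` and `cache_strategy`):
///
/// ```ignore
/// let scan_region = ScanRegion::new(version, access_layer, request, cache_strategy);
/// let scanner = scan_region.scanner().await?;
/// let stream = scanner.scan().await?;
/// ```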
pub(crate) struct ScanRegion {
    /// Version of the region at scan.
    version: VersionRef,
    /// Access layer of the region.
    access_layer: AccessLayerRef,
    /// Scan request.
    request: ScanRequest,
    /// Cache.
    cache_strategy: CacheStrategy,
    /// Capacity of the channel to send data from parallel scan tasks to the main task.
    parallel_scan_channel_size: usize,
    /// Maximum number of SST files to scan concurrently.
    max_concurrent_scan_files: usize,
    /// Whether to ignore inverted index.
    ignore_inverted_index: bool,
    /// Whether to ignore fulltext index.
    ignore_fulltext_index: bool,
    /// Whether to ignore bloom filter.
    ignore_bloom_filter: bool,
    /// Start time of the scan task.
    start_time: Option<Instant>,
    /// Whether to filter out deleted rows.
    /// Usually true for normal reads, and false for compaction scans.
    filter_deleted: bool,
    #[cfg(feature = "enterprise")]
    extension_range_provider: Option<BoxedExtensionRangeProvider>,
}

impl ScanRegion {
    /// Creates a [ScanRegion].
    pub(crate) fn new(
        version: VersionRef,
        access_layer: AccessLayerRef,
        request: ScanRequest,
        cache_strategy: CacheStrategy,
    ) -> ScanRegion {
        ScanRegion {
            version,
            access_layer,
            request,
            cache_strategy,
            parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
            ignore_inverted_index: false,
            ignore_fulltext_index: false,
            ignore_bloom_filter: false,
            start_time: None,
            filter_deleted: true,
            #[cfg(feature = "enterprise")]
            extension_range_provider: None,
        }
    }

    /// Sets parallel scan task channel size.
    #[must_use]
    pub(crate) fn with_parallel_scan_channel_size(
        mut self,
        parallel_scan_channel_size: usize,
    ) -> Self {
        self.parallel_scan_channel_size = parallel_scan_channel_size;
        self
    }

    /// Sets maximum number of SST files to scan concurrently.
    #[must_use]
    pub(crate) fn with_max_concurrent_scan_files(
        mut self,
        max_concurrent_scan_files: usize,
    ) -> Self {
        self.max_concurrent_scan_files = max_concurrent_scan_files;
        self
    }

    /// Sets whether to ignore inverted index.
    #[must_use]
    pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self {
        self.ignore_inverted_index = ignore;
        self
    }

    /// Sets whether to ignore fulltext index.
    #[must_use]
    pub(crate) fn with_ignore_fulltext_index(mut self, ignore: bool) -> Self {
        self.ignore_fulltext_index = ignore;
        self
    }

    /// Sets whether to ignore bloom filter.
    #[must_use]
    pub(crate) fn with_ignore_bloom_filter(mut self, ignore: bool) -> Self {
        self.ignore_bloom_filter = ignore;
        self
    }

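    /// Sets the start time of the scan task.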
    #[must_use]
    pub(crate) fn with_start_time(mut self, now: Instant) -> Self {
        self.start_time = Some(now);
        self
    }

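    /// Sets whether to filter out deleted rows.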
    pub(crate) fn set_filter_deleted(&mut self, filter_deleted: bool) {
        self.filter_deleted = filter_deleted;
    }

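    /// Sets the provider that supplies extension ranges for this scan.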
    #[cfg(feature = "enterprise")]
    pub(crate) fn set_extension_range_provider(
        &mut self,
        extension_range_provider: BoxedExtensionRangeProvider,
    ) {
        self.extension_range_provider = Some(extension_range_provider);
    }

    /// Returns a [Scanner] to scan the region.
    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
    pub(crate) async fn scanner(self) -> Result<Scanner> {
        if self.use_series_scan() {
            self.series_scan().await.map(Scanner::Series)
        } else if self.use_unordered_scan() {
            // If the table is append-only and there is no series row selector, we use unordered
            // scan for queries. We still use seq scan in compaction.
            self.unordered_scan().await.map(Scanner::Unordered)
        } else {
            self.seq_scan().await.map(Scanner::Seq)
        }
    }

    /// Returns a [RegionScanner] to scan the region.
    #[tracing::instrument(
        level = tracing::Level::DEBUG,
        skip_all,
        fields(region_id = %self.region_id())
    )]
    pub(crate) async fn region_scanner(self) -> Result<RegionScannerRef> {
        if self.use_series_scan() {
            self.series_scan()
                .await
                .map(|scanner| Box::new(scanner) as _)
        } else if self.use_unordered_scan() {
            self.unordered_scan()
                .await
                .map(|scanner| Box::new(scanner) as _)
        } else {
            self.seq_scan().await.map(|scanner| Box::new(scanner) as _)
        }
    }

    /// Scans sequentially.
    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
    pub(crate) async fn seq_scan(self) -> Result<SeqScan> {
        let input = self.scan_input().await?.with_compaction(false);
        Ok(SeqScan::new(input))
    }

    /// Unordered scan.
    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
    pub(crate) async fn unordered_scan(self) -> Result<UnorderedScan> {
        let input = self.scan_input().await?;
        Ok(UnorderedScan::new(input))
    }

    /// Scans by series.
    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
    pub(crate) async fn series_scan(self) -> Result<SeriesScan> {
        let input = self.scan_input().await?;
        Ok(SeriesScan::new(input))
    }

    /// Returns true if the region can use unordered scan for the current request.
    fn use_unordered_scan(&self) -> bool {
        // We use unordered scan when:
        // 1. The region is in append mode.
        // 2. There is no series row selector.
        // 3. The required distribution is None or TimeSeriesDistribution::TimeWindowed.
        //
        // We still use seq scan in compaction.
        self.version.options.append_mode
            && self.request.series_row_selector.is_none()
            && (self.request.distribution.is_none()
                || self.request.distribution == Some(TimeSeriesDistribution::TimeWindowed))
    }

    /// Returns true if the region can use series scan for the current request.
    fn use_series_scan(&self) -> bool {
        self.request.distribution == Some(TimeSeriesDistribution::PerSeries)
    }

    /// Returns true if the region uses the flat format.
    fn use_flat_format(&self) -> bool {
        self.version.options.sst_format.unwrap_or_default() == FormatType::Flat
    }

    /// Creates a scan input.
    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
    async fn scan_input(mut self) -> Result<ScanInput> {
        let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new);
        let time_range = self.build_time_range_predicate();
        let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;
        let flat_format = self.use_flat_format();

        let read_column_ids = match &self.request.projection {
            Some(p) => self.build_read_column_ids(p, &predicate)?,
            None => self
                .version
                .metadata
                .column_metadatas
                .iter()
                .map(|col| col.column_id)
                .collect(),
        };

        // The mapper always computes projected column ids as the schema of SSTs may change.
        let mapper = match &self.request.projection {
            Some(p) => ProjectionMapper::new_with_read_columns(
                &self.version.metadata,
                p.iter().copied(),
                flat_format,
                read_column_ids.clone(),
            )?,
            None => ProjectionMapper::all(&self.version.metadata, flat_format)?,
        };

        let ssts = &self.version.ssts;
        let mut files = Vec::new();
        for level in ssts.levels() {
            for file in level.files.values() {
                let exceed_min_sequence = match (sst_min_sequence, file.meta_ref().sequence) {
                    (Some(min_sequence), Some(file_sequence)) => file_sequence > min_sequence,
                    // If the file's sequence is None (or actually zero), the file may have been
                    // generated and added to the region "directly". In this case, its data should
                    // be considered as fresh as the memtable, so we treat its sequence as greater
                    // than `min_sequence`, whatever that value is. Hence the "true" in this arm.
                    (Some(_), None) => true,
                    (None, _) => true,
                };

                // Find SST files within the time range.
                if exceed_min_sequence && file_in_range(file, &time_range) {
                    files.push(file.clone());
                }
                // There is no need to further prune by the file's sequence here: the requested
                // sequence is usually very recent, so unless the timing is exceptional it won't
                // appear in any file, and such batches are filtered out by the tree reader anyway.
            }
        }

        let memtables = self.version.memtables.list_memtables();
        // Skip empty memtables and memtables out of the time range.
        let mut mem_range_builders = Vec::new();
        let filter_mode = pre_filter_mode(
            self.version.options.append_mode,
            self.version.options.merge_mode(),
        );

        for m in memtables {
            // Check whether the memtable is empty by reading its stats.
            let Some((start, end)) = m.stats().time_range() else {
                continue;
            };
            // The time range of the memtable is inclusive.
            let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
            if !memtable_range.intersects(&time_range) {
                continue;
            }
            let ranges_in_memtable = m.ranges(
                Some(read_column_ids.as_slice()),
                RangesOptions::default()
                    .with_predicate(predicate.clone())
                    .with_sequence(SequenceRange::new(
                        self.request.memtable_min_sequence,
                        self.request.memtable_max_sequence,
                    ))
                    .with_pre_filter_mode(filter_mode),
            )?;
            mem_range_builders.extend(ranges_in_memtable.ranges.into_values().map(|v| {
                let stats = v.stats().clone();
                MemRangeBuilder::new(v, stats)
            }));
        }

        let region_id = self.region_id();
        debug!(
            "Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}, flat_format: {}",
            region_id,
            self.request,
            time_range,
            mem_range_builders.len(),
            files.len(),
            self.version.options.append_mode,
            flat_format,
        );

        let (non_field_filters, field_filters) = self.partition_by_field_filters();
        let inverted_index_appliers = [
            self.build_inverted_index_applier(&non_field_filters),
            self.build_inverted_index_applier(&field_filters),
        ];
        let bloom_filter_appliers = [
            self.build_bloom_filter_applier(&non_field_filters),
            self.build_bloom_filter_applier(&field_filters),
        ];
        let fulltext_index_appliers = [
            self.build_fulltext_index_applier(&non_field_filters),
            self.build_fulltext_index_applier(&field_filters),
        ];
        #[cfg(feature = "vector_index")]
        let vector_index_applier = self.build_vector_index_applier();
        #[cfg(feature = "vector_index")]
        let vector_index_k = self.request.vector_search.as_ref().map(|search| {
            if self.request.filters.is_empty() {
                search.k
            } else {
                search.k.saturating_mul(VECTOR_INDEX_OVERFETCH_MULTIPLIER)
            }
        });

        if flat_format {
            // The batch is already large enough, so we use a small channel size here.
            self.parallel_scan_channel_size = FLAT_SCAN_CHANNEL_SIZE;
        }

        let input = ScanInput::new(self.access_layer, mapper)
            .with_time_range(Some(time_range))
            .with_predicate(predicate)
            .with_memtables(mem_range_builders)
            .with_files(files)
            .with_cache(self.cache_strategy)
            .with_inverted_index_appliers(inverted_index_appliers)
            .with_bloom_filter_index_appliers(bloom_filter_appliers)
            .with_fulltext_index_appliers(fulltext_index_appliers)
            .with_parallel_scan_channel_size(self.parallel_scan_channel_size)
            .with_max_concurrent_scan_files(self.max_concurrent_scan_files)
            .with_start_time(self.start_time)
            .with_append_mode(self.version.options.append_mode)
            .with_filter_deleted(self.filter_deleted)
            .with_merge_mode(self.version.options.merge_mode())
            .with_series_row_selector(self.request.series_row_selector)
            .with_distribution(self.request.distribution)
            .with_flat_format(flat_format);
        #[cfg(feature = "vector_index")]
        let input = input
            .with_vector_index_applier(vector_index_applier)
            .with_vector_index_k(vector_index_k);

        #[cfg(feature = "enterprise")]
        let input = if let Some(provider) = self.extension_range_provider {
            let ranges = provider
                .find_extension_ranges(self.version.flushed_sequence, time_range, &self.request)
                .await?;
            debug!("Found extension ranges: {ranges:?}");
            input.with_extension_ranges(ranges)
        } else {
            input
        };
        Ok(input)
    }

    fn region_id(&self) -> RegionId {
        self.version.metadata.region_id
    }

    /// Builds the time range predicate from filters.
    fn build_time_range_predicate(&self) -> TimestampRange {
        let time_index = self.version.metadata.time_index_column();
        let unit = time_index
            .column_schema
            .data_type
            .as_timestamp()
            .expect("Time index must have timestamp-compatible type")
            .unit();
        build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters)
    }

    /// Returns all column ids to read according to the projection and filters.
    fn build_read_column_ids(
        &self,
        projection: &[usize],
        predicate: &PredicateGroup,
    ) -> Result<Vec<ColumnId>> {
        let metadata = &self.version.metadata;
        // Use a Vec for read_column_ids to preserve the column order.
        let mut read_column_ids = Vec::new();
        let mut seen = HashSet::new();

        for idx in projection {
            let column =
                metadata
                    .column_metadatas
                    .get(*idx)
                    .with_context(|| InvalidRequestSnafu {
                        region_id: metadata.region_id,
                        reason: format!("projection index {} is out of bound", idx),
                    })?;
            seen.insert(column.column_id);
            // Keep the projection order.
            read_column_ids.push(column.column_id);
        }

        if projection.is_empty() {
            let time_index = metadata.time_index_column().column_id;
            if seen.insert(time_index) {
                read_column_ids.push(time_index);
            }
        }

        let mut extra_names = HashSet::new();
        let mut columns = HashSet::new();

        for expr in &self.request.filters {
            columns.clear();
            if expr_to_columns(expr, &mut columns).is_err() {
                continue;
            }
            extra_names.extend(columns.iter().map(|column| column.name.clone()));
        }

        if let Some(expr) = predicate.region_partition_expr() {
            expr.collect_column_names(&mut extra_names);
        }

        if !extra_names.is_empty() {
            for column in &metadata.column_metadatas {
                if extra_names.contains(column.column_schema.name.as_str())
                    && !seen.contains(&column.column_id)
                {
                    read_column_ids.push(column.column_id);
                }
                extra_names.remove(column.column_schema.name.as_str());
            }
            if !extra_names.is_empty() {
                warn!(
                    "Some columns in filters are not found in region {}: {:?}",
                    metadata.region_id, extra_names
                );
            }
        }
        Ok(read_column_ids)
    }

    /// Partitions filters into two groups: non-field filters and field filters.
    /// Returns `(non_field_filters, field_filters)`.
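    ///
    /// For example, with a tag column `host` and a field column `cpu`, the
    /// filters `host = 'a'` and `cpu > 0.5` are split into
    /// `([host = 'a'], [cpu > 0.5])`.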
    fn partition_by_field_filters(&self) -> (Vec<Expr>, Vec<Expr>) {
        let field_columns = self
            .version
            .metadata
            .field_columns()
            .map(|col| &col.column_schema.name)
            .collect::<HashSet<_>>();

        let mut columns = HashSet::new();

        self.request.filters.iter().cloned().partition(|expr| {
            columns.clear();
            if expr_to_columns(expr, &mut columns).is_err() {
                // If we can't extract columns, treat it as a non-field filter.
                return true;
            }
            // Return true for non-field filters (`partition` puts the true cases in the first vec).
            !columns
                .iter()
                .any(|column| field_columns.contains(&column.name))
        })
    }

    /// Uses the latest schema to build the inverted index applier.
    fn build_inverted_index_applier(&self, filters: &[Expr]) -> Option<InvertedIndexApplierRef> {
        if self.ignore_inverted_index {
            return None;
        }

        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
        let inverted_index_cache = self.cache_strategy.inverted_index_cache().cloned();

        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();

        InvertedIndexApplierBuilder::new(
            self.access_layer.table_dir().to_string(),
            self.access_layer.path_type(),
            self.access_layer.object_store().clone(),
            self.version.metadata.as_ref(),
            self.version.metadata.inverted_indexed_column_ids(
                self.version
                    .options
                    .index_options
                    .inverted_index
                    .ignore_column_ids
                    .iter(),
            ),
            self.access_layer.puffin_manager_factory().clone(),
        )
        .with_file_cache(file_cache)
        .with_inverted_index_cache(inverted_index_cache)
        .with_puffin_metadata_cache(puffin_metadata_cache)
        .build(filters)
        .inspect_err(|err| warn!(err; "Failed to build inverted index applier"))
        .ok()
        .flatten()
        .map(Arc::new)
    }

    /// Uses the latest schema to build the bloom filter index applier.
    fn build_bloom_filter_applier(&self, filters: &[Expr]) -> Option<BloomFilterIndexApplierRef> {
        if self.ignore_bloom_filter {
            return None;
        }

        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();

        BloomFilterIndexApplierBuilder::new(
            self.access_layer.table_dir().to_string(),
            self.access_layer.path_type(),
            self.access_layer.object_store().clone(),
            self.version.metadata.as_ref(),
            self.access_layer.puffin_manager_factory().clone(),
        )
        .with_file_cache(file_cache)
        .with_bloom_filter_index_cache(bloom_filter_index_cache)
        .with_puffin_metadata_cache(puffin_metadata_cache)
        .build(filters)
        .inspect_err(|err| warn!(err; "Failed to build bloom filter index applier"))
        .ok()
        .flatten()
        .map(Arc::new)
    }

    /// Uses the latest schema to build the fulltext index applier.
    fn build_fulltext_index_applier(&self, filters: &[Expr]) -> Option<FulltextIndexApplierRef> {
        if self.ignore_fulltext_index {
            return None;
        }

        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
        FulltextIndexApplierBuilder::new(
            self.access_layer.table_dir().to_string(),
            self.access_layer.path_type(),
            self.access_layer.object_store().clone(),
            self.access_layer.puffin_manager_factory().clone(),
            self.version.metadata.as_ref(),
        )
        .with_file_cache(file_cache)
        .with_puffin_metadata_cache(puffin_metadata_cache)
        .with_bloom_filter_cache(bloom_filter_index_cache)
        .build(filters)
        .inspect_err(|err| warn!(err; "Failed to build fulltext index applier"))
        .ok()
        .flatten()
        .map(Arc::new)
    }

    /// Builds the vector index applier from the vector search request.
    #[cfg(feature = "vector_index")]
    fn build_vector_index_applier(&self) -> Option<VectorIndexApplierRef> {
        let vector_search = self.request.vector_search.as_ref()?;

        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
        let vector_index_cache = self.cache_strategy.vector_index_cache().cloned();

        let applier = VectorIndexApplier::new(
            self.access_layer.table_dir().to_string(),
            self.access_layer.path_type(),
            self.access_layer.object_store().clone(),
            self.access_layer.puffin_manager_factory().clone(),
            vector_search.column_id,
            vector_search.query_vector.clone(),
            vector_search.metric,
        )
        .with_file_cache(file_cache)
        .with_puffin_metadata_cache(puffin_metadata_cache)
        .with_vector_index_cache(vector_index_cache);

        Some(Arc::new(applier))
    }
}

/// Returns true if the time range of the SST `file` intersects the `predicate` range.
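/// Both endpoints of a file's time range are inclusive, so a file covering
/// `[10, 20]` still matches a predicate range that starts at `20`.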
fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
    if predicate == &TimestampRange::min_to_max() {
        return true;
    }
    // The end timestamp of an SST is inclusive.
    let (start, end) = file.time_range();
    let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end));
    file_ts_range.intersects(predicate)
}

/// Common input for different scanners.
pub struct ScanInput {
    /// Region SST access layer.
    access_layer: AccessLayerRef,
    /// Maps projected Batches to RecordBatches.
    pub(crate) mapper: Arc<ProjectionMapper>,
    /// Column ids to read from memtables and SSTs.
    /// Note that this differs from the columns in `mapper`, which are the projected columns:
    /// these read columns may also include non-projected columns needed for filtering.
    pub(crate) read_column_ids: Vec<ColumnId>,
    /// Time range filter for time index.
    time_range: Option<TimestampRange>,
    /// Predicate to push down.
    pub(crate) predicate: PredicateGroup,
    /// Region partition expr applied at read time.
    region_partition_expr: Option<PartitionExpr>,
    /// Memtable range builders for memtables in the time range.
    pub(crate) memtables: Vec<MemRangeBuilder>,
    /// Handles to SST files to scan.
    pub(crate) files: Vec<FileHandle>,
    /// Cache.
    pub(crate) cache_strategy: CacheStrategy,
    /// Ignores file not found error.
    ignore_file_not_found: bool,
    /// Capacity of the channel to send data from parallel scan tasks to the main task.
    pub(crate) parallel_scan_channel_size: usize,
    /// Maximum number of SST files to scan concurrently.
    pub(crate) max_concurrent_scan_files: usize,
    /// Index appliers.
    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
    /// Vector index applier for KNN search.
    #[cfg(feature = "vector_index")]
    pub(crate) vector_index_applier: Option<VectorIndexApplierRef>,
    /// Over-fetched k for vector index scan.
    #[cfg(feature = "vector_index")]
    pub(crate) vector_index_k: Option<usize>,
    /// Start time of the query.
    pub(crate) query_start: Option<Instant>,
    /// The region is using append mode.
    pub(crate) append_mode: bool,
    /// Whether to remove deletion markers.
    pub(crate) filter_deleted: bool,
    /// Mode to merge duplicate rows.
    pub(crate) merge_mode: MergeMode,
    /// Hint to select rows from time series.
    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
    /// Hint for the required distribution of the scanner.
    pub(crate) distribution: Option<TimeSeriesDistribution>,
    /// Whether to use flat format.
    pub(crate) flat_format: bool,
    /// Whether this scan is for compaction.
    pub(crate) compaction: bool,
    #[cfg(feature = "enterprise")]
    extension_ranges: Vec<BoxedExtensionRange>,
}

impl ScanInput {
    /// Creates a new [ScanInput].
    #[must_use]
    pub(crate) fn new(access_layer: AccessLayerRef, mapper: ProjectionMapper) -> ScanInput {
        ScanInput {
            access_layer,
            read_column_ids: mapper.column_ids().to_vec(),
            mapper: Arc::new(mapper),
            time_range: None,
            predicate: PredicateGroup::default(),
            region_partition_expr: None,
            memtables: Vec::new(),
            files: Vec::new(),
            cache_strategy: CacheStrategy::Disabled,
            ignore_file_not_found: false,
            parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
            inverted_index_appliers: [None, None],
            bloom_filter_index_appliers: [None, None],
            fulltext_index_appliers: [None, None],
            #[cfg(feature = "vector_index")]
            vector_index_applier: None,
            #[cfg(feature = "vector_index")]
            vector_index_k: None,
            query_start: None,
            append_mode: false,
            filter_deleted: true,
            merge_mode: MergeMode::default(),
            series_row_selector: None,
            distribution: None,
            flat_format: false,
            compaction: false,
            #[cfg(feature = "enterprise")]
            extension_ranges: Vec::new(),
        }
    }

    /// Sets time range filter for time index.
    #[must_use]
    pub(crate) fn with_time_range(mut self, time_range: Option<TimestampRange>) -> Self {
        self.time_range = time_range;
        self
    }

    /// Sets predicate to push down.
    #[must_use]
    pub(crate) fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
        self.region_partition_expr = predicate.region_partition_expr().cloned();
        self.predicate = predicate;
        self
    }

    /// Sets memtable range builders.
    #[must_use]
    pub(crate) fn with_memtables(mut self, memtables: Vec<MemRangeBuilder>) -> Self {
        self.memtables = memtables;
        self
    }

    /// Sets files to read.
    #[must_use]
    pub(crate) fn with_files(mut self, files: Vec<FileHandle>) -> Self {
        self.files = files;
        self
    }

    /// Sets cache for this query.
    #[must_use]
    pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self {
        self.cache_strategy = cache;
        self
    }

    /// Sets whether to ignore file not found errors.
    #[must_use]
    pub(crate) fn with_ignore_file_not_found(mut self, ignore: bool) -> Self {
        self.ignore_file_not_found = ignore;
        self
    }

    /// Sets scan task channel size.
    #[must_use]
    pub(crate) fn with_parallel_scan_channel_size(
        mut self,
        parallel_scan_channel_size: usize,
    ) -> Self {
        self.parallel_scan_channel_size = parallel_scan_channel_size;
        self
    }

    /// Sets maximum number of SST files to scan concurrently.
    #[must_use]
    pub(crate) fn with_max_concurrent_scan_files(
        mut self,
        max_concurrent_scan_files: usize,
    ) -> Self {
        self.max_concurrent_scan_files = max_concurrent_scan_files;
        self
    }

    /// Sets inverted index appliers.
    #[must_use]
    pub(crate) fn with_inverted_index_appliers(
        mut self,
        appliers: [Option<InvertedIndexApplierRef>; 2],
    ) -> Self {
        self.inverted_index_appliers = appliers;
        self
    }

    /// Sets bloom filter appliers.
    #[must_use]
    pub(crate) fn with_bloom_filter_index_appliers(
        mut self,
        appliers: [Option<BloomFilterIndexApplierRef>; 2],
    ) -> Self {
        self.bloom_filter_index_appliers = appliers;
        self
    }

    /// Sets fulltext index appliers.
    #[must_use]
    pub(crate) fn with_fulltext_index_appliers(
        mut self,
        appliers: [Option<FulltextIndexApplierRef>; 2],
    ) -> Self {
        self.fulltext_index_appliers = appliers;
        self
    }

    /// Sets vector index applier for KNN search.
    #[cfg(feature = "vector_index")]
    #[must_use]
    pub(crate) fn with_vector_index_applier(
        mut self,
        applier: Option<VectorIndexApplierRef>,
    ) -> Self {
        self.vector_index_applier = applier;
        self
    }

    /// Sets over-fetched k for vector index scan.
    #[cfg(feature = "vector_index")]
    #[must_use]
    pub(crate) fn with_vector_index_k(mut self, k: Option<usize>) -> Self {
        self.vector_index_k = k;
        self
    }

    /// Sets start time of the query.
    #[must_use]
    pub(crate) fn with_start_time(mut self, now: Option<Instant>) -> Self {
        self.query_start = now;
        self
    }

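    /// Sets whether the region is in append mode.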
    #[must_use]
    pub(crate) fn with_append_mode(mut self, is_append_mode: bool) -> Self {
        self.append_mode = is_append_mode;
        self
    }

    /// Sets whether to remove deletion markers during scan.
    #[must_use]
    pub(crate) fn with_filter_deleted(mut self, filter_deleted: bool) -> Self {
        self.filter_deleted = filter_deleted;
        self
    }

    /// Sets the merge mode.
    #[must_use]
    pub(crate) fn with_merge_mode(mut self, merge_mode: MergeMode) -> Self {
        self.merge_mode = merge_mode;
        self
    }

    /// Sets the distribution hint.
    #[must_use]
    pub(crate) fn with_distribution(
        mut self,
        distribution: Option<TimeSeriesDistribution>,
    ) -> Self {
        self.distribution = distribution;
        self
    }

    /// Sets the time series row selector.
    #[must_use]
    pub(crate) fn with_series_row_selector(
        mut self,
        series_row_selector: Option<TimeSeriesRowSelector>,
    ) -> Self {
        self.series_row_selector = series_row_selector;
        self
    }

    /// Sets whether to use flat format.
    #[must_use]
    pub(crate) fn with_flat_format(mut self, flat_format: bool) -> Self {
        self.flat_format = flat_format;
        self
    }

    /// Sets whether this scan is for compaction.
    #[must_use]
    pub(crate) fn with_compaction(mut self, compaction: bool) -> Self {
        self.compaction = compaction;
        self
    }

    /// Scans sources in parallel.
    ///
    /// # Panics
    ///
    /// Panics if the input doesn't allow parallel scan.
    #[tracing::instrument(
        skip(self, sources, semaphore),
        fields(
            region_id = %self.region_metadata().region_id,
            source_count = sources.len()
        )
    )]
    pub(crate) fn create_parallel_sources(
        &self,
        sources: Vec<Source>,
        semaphore: Arc<Semaphore>,
    ) -> Result<Vec<Source>> {
        if sources.len() <= 1 {
            return Ok(sources);
        }

        // Spawn a task for each source.
        let sources = sources
            .into_iter()
            .map(|source| {
                let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
                self.spawn_scan_task(source, semaphore.clone(), sender);
                let stream = Box::pin(ReceiverStream::new(receiver));
                Source::Stream(stream)
            })
            .collect();
        Ok(sources)
    }

    /// Builds memtable ranges to scan by `index`.
    pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> {
        let memtable = &self.memtables[index.index];
        let mut ranges = SmallVec::new();
        memtable.build_ranges(index.row_group_index, &mut ranges);
        ranges
    }

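    /// Returns the predicate to push down when reading the given file.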
    fn predicate_for_file(&self, file: &FileHandle) -> Option<Predicate> {
        if self.should_skip_region_partition(file) {
            self.predicate.predicate_without_region().cloned()
        } else {
            self.predicate.predicate().cloned()
        }
    }

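    /// Returns true if the file was written under the same partition expr as
    /// the region, in which case the region partition filter is redundant for
    /// this file and can be skipped.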
    fn should_skip_region_partition(&self, file: &FileHandle) -> bool {
        match (
            self.region_partition_expr.as_ref(),
            file.meta_ref().partition_expr.as_ref(),
        ) {
            (Some(region_expr), Some(file_expr)) => region_expr == file_expr,
            _ => false,
        }
    }

    /// Prunes a file to scan and returns the builder to build readers.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.region_metadata().region_id,
            file_id = %file.file_id()
        )
    )]
    pub async fn prune_file(
        &self,
        file: &FileHandle,
        reader_metrics: &mut ReaderMetrics,
    ) -> Result<FileRangeBuilder> {
        let predicate = self.predicate_for_file(file);
        let filter_mode = pre_filter_mode(self.append_mode, self.merge_mode);
        let decode_pk_values = !self.compaction && self.mapper.has_tags();
        let reader = self
            .access_layer
            .read_sst(file.clone())
            .predicate(predicate)
            .projection(Some(self.read_column_ids.clone()))
            .cache(self.cache_strategy.clone())
            .inverted_index_appliers(self.inverted_index_appliers.clone())
            .bloom_filter_index_appliers(self.bloom_filter_index_appliers.clone())
            .fulltext_index_appliers(self.fulltext_index_appliers.clone());
        #[cfg(feature = "vector_index")]
        let reader =
            reader.vector_index_applier(self.vector_index_applier.clone(), self.vector_index_k);
        let res = reader
            .expected_metadata(Some(self.mapper.metadata().clone()))
            .flat_format(self.flat_format)
            .compaction(self.compaction)
            .pre_filter_mode(filter_mode)
            .decode_primary_key_values(decode_pk_values)
            .build_reader_input(reader_metrics)
            .await;
        let (mut file_range_ctx, selection) = match res {
            Ok(x) => x,
            Err(e) => {
                if e.is_object_not_found() && self.ignore_file_not_found {
                    error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
                    return Ok(FileRangeBuilder::default());
                } else {
                    return Err(e);
                }
            }
        };

        let need_compat = !compat::has_same_columns_and_pk_encoding(
            self.mapper.metadata(),
            file_range_ctx.read_format().metadata(),
        );
        if need_compat {
            // They have different schemas. We need to adapt the batch first so the
            // mapper can convert it.
            let compat = if let Some(flat_format) = file_range_ctx.read_format().as_flat() {
                let mapper = self.mapper.as_flat().unwrap();
                FlatCompatBatch::try_new(
                    mapper,
                    flat_format.metadata(),
                    flat_format.format_projection(),
                    self.compaction,
                )?
                .map(CompatBatch::Flat)
            } else {
                let compat_batch = PrimaryKeyCompatBatch::new(
                    &self.mapper,
                    file_range_ctx.read_format().metadata().clone(),
                )?;
                Some(CompatBatch::PrimaryKey(compat_batch))
            };
            file_range_ctx.set_compat_batch(compat);
        }
        Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), selection))
    }

    /// Scans the input source in another task and sends batches to the sender.
    #[tracing::instrument(
        skip(self, input, semaphore, sender),
        fields(region_id = %self.region_metadata().region_id)
    )]
    pub(crate) fn spawn_scan_task(
        &self,
        mut input: Source,
        semaphore: Arc<Semaphore>,
        sender: mpsc::Sender<Result<Batch>>,
    ) {
        let region_id = self.region_metadata().region_id;
        let span = tracing::info_span!(
            "ScanInput::parallel_scan_task",
            region_id = %region_id,
            stream_kind = "batch"
        );
        common_runtime::spawn_global(
            async move {
                loop {
                    // We release the permit before sending the result to avoid the task
                    // waiting on the channel with the permit held.
                    let maybe_batch = {
                        // Safety: We never close the semaphore.
                        let _permit = semaphore.acquire().await.unwrap();
                        input.next_batch().await
                    };
                    match maybe_batch {
                        Ok(Some(batch)) => {
                            let _ = sender.send(Ok(batch)).await;
                        }
                        Ok(None) => break,
                        Err(e) => {
                            let _ = sender.send(Err(e)).await;
                            break;
                        }
                    }
                }
            }
            .instrument(span),
        );
    }

    /// Scans flat sources (RecordBatch streams) in parallel.
    ///
    /// # Panics
    ///
    /// Panics if the input doesn't allow parallel scan.
    #[tracing::instrument(
        skip(self, sources, semaphore),
        fields(
            region_id = %self.region_metadata().region_id,
            source_count = sources.len()
        )
    )]
    pub(crate) fn create_parallel_flat_sources(
        &self,
        sources: Vec<BoxedRecordBatchStream>,
        semaphore: Arc<Semaphore>,
    ) -> Result<Vec<BoxedRecordBatchStream>> {
        if sources.len() <= 1 {
            return Ok(sources);
        }

        // Spawn a task for each source.
        let sources = sources
            .into_iter()
            .map(|source| {
                let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
                self.spawn_flat_scan_task(source, semaphore.clone(), sender);
                let stream = Box::pin(ReceiverStream::new(receiver));
                Box::pin(stream) as _
            })
            .collect();
        Ok(sources)
    }

    /// Spawns a task to scan a flat source (RecordBatch stream) asynchronously.
    #[tracing::instrument(
        skip(self, input, semaphore, sender),
        fields(region_id = %self.region_metadata().region_id)
    )]
    pub(crate) fn spawn_flat_scan_task(
        &self,
        mut input: BoxedRecordBatchStream,
        semaphore: Arc<Semaphore>,
        sender: mpsc::Sender<Result<RecordBatch>>,
    ) {
        let region_id = self.region_metadata().region_id;
        let span = tracing::info_span!(
            "ScanInput::parallel_scan_task",
            region_id = %region_id,
            stream_kind = "flat"
        );
        common_runtime::spawn_global(
            async move {
                loop {
                    // We release the permit before sending the result to avoid the task
                    // waiting on the channel with the permit held.
                    let maybe_batch = {
                        // Safety: We never close the semaphore.
                        let _permit = semaphore.acquire().await.unwrap();
                        input.next().await
                    };
                    match maybe_batch {
                        Some(Ok(batch)) => {
                            let _ = sender.send(Ok(batch)).await;
                        }
                        Some(Err(e)) => {
                            let _ = sender.send(Err(e)).await;
                            break;
                        }
                        None => break,
                    }
                }
            }
            .instrument(span),
        );
    }

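    /// Returns the total number of rows to scan in memtables and files
    /// (plus extension ranges when enabled).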
    pub(crate) fn total_rows(&self) -> usize {
        let rows_in_files: usize = self.files.iter().map(|f| f.num_rows()).sum();
        let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();

        let rows = rows_in_files + rows_in_memtables;
        #[cfg(feature = "enterprise")]
        let rows = rows
            + self
                .extension_ranges
                .iter()
                .map(|x| x.num_rows())
                .sum::<u64>() as usize;
        rows
    }

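    /// Returns the predicate group of the scan.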
    pub(crate) fn predicate_group(&self) -> &PredicateGroup {
        &self.predicate
    }

    /// Returns number of memtables to scan.
    pub(crate) fn num_memtables(&self) -> usize {
        self.memtables.len()
    }

    /// Returns number of SST files to scan.
    pub(crate) fn num_files(&self) -> usize {
        self.files.len()
    }

    /// Gets the file handle from a row group index.
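    ///
    /// Range indices are assigned globally, memtables first and then files
    /// (then extension ranges), so the file position is the index minus the
    /// number of memtables.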
    pub(crate) fn file_from_index(&self, index: RowGroupIndex) -> &FileHandle {
        let file_index = index.index - self.num_memtables();
        &self.files[file_index]
    }

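    /// Returns the metadata of the region to scan.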
    pub fn region_metadata(&self) -> &RegionMetadataRef {
        self.mapper.metadata()
    }
}

#[cfg(feature = "enterprise")]
impl ScanInput {
    #[must_use]
    pub(crate) fn with_extension_ranges(self, extension_ranges: Vec<BoxedExtensionRange>) -> Self {
        Self {
            extension_ranges,
            ..self
        }
    }

    pub(crate) fn extension_ranges(&self) -> &[BoxedExtensionRange] {
        &self.extension_ranges
    }

    /// Gets a boxed [ExtensionRange] by its index among all ranges.
    pub(crate) fn extension_range(&self, i: usize) -> &BoxedExtensionRange {
        &self.extension_ranges[i - self.num_memtables() - self.num_files()]
    }
}

#[cfg(test)]
impl ScanInput {
    /// Returns SST file ids to scan.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        self.files.iter().map(|file| file.file_id()).collect()
    }

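    /// Returns SST index ids to scan.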
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        self.files.iter().map(|file| file.index_id()).collect()
    }
}

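/// Selects the [PreFilterMode] to use based on the region's append and merge
/// modes.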
fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode {
    if append_mode {
        return PreFilterMode::All;
    }

    match merge_mode {
        MergeMode::LastRow => PreFilterMode::SkipFieldsOnDelete,
        MergeMode::LastNonNull => PreFilterMode::SkipFields,
    }
}
1413
/// Context shared by different streams from a scanner.
/// It contains the input and ranges to scan.
pub struct StreamContext {
    /// Input memtables and files.
    pub input: ScanInput,
    /// Metadata for partition ranges.
    pub(crate) ranges: Vec<RangeMeta>,

    // Metrics:
    /// The start time of the query.
    pub(crate) query_start: Instant,
}

impl StreamContext {
    /// Creates a new [StreamContext] for [SeqScan].
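    ///
    /// A minimal usage sketch (doc test ignored; `input` is assumed to be a
    /// prepared [ScanInput]):
    ///
    /// ```ignore
    /// let ctx = StreamContext::seq_scan_ctx(input);
    /// // Each range meta becomes exactly one partition range.
    /// assert_eq!(ctx.partition_ranges().len(), ctx.ranges.len());
    /// ```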
    pub(crate) fn seq_scan_ctx(input: ScanInput) -> Self {
        let query_start = input.query_start.unwrap_or_else(Instant::now);
        let ranges = RangeMeta::seq_scan_ranges(&input);
        READ_SST_COUNT.observe(input.num_files() as f64);

        Self {
            input,
            ranges,
            query_start,
        }
    }

    /// Creates a new [StreamContext] for [UnorderedScan].
    pub(crate) fn unordered_scan_ctx(input: ScanInput) -> Self {
        let query_start = input.query_start.unwrap_or_else(Instant::now);
        let ranges = RangeMeta::unordered_scan_ranges(&input);
        READ_SST_COUNT.observe(input.num_files() as f64);

        Self {
            input,
            ranges,
            query_start,
        }
    }

    /// Returns true if the index refers to a memtable.
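    /// Memtable ranges occupy the first `num_memtables()` indices.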
    pub(crate) fn is_mem_range_index(&self, index: RowGroupIndex) -> bool {
        index.index < self.input.num_memtables()
    }

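    /// Returns true if the index refers to a file range.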
    pub(crate) fn is_file_range_index(&self, index: RowGroupIndex) -> bool {
        !self.is_mem_range_index(index)
            && index.index < self.input.num_files() + self.input.num_memtables()
    }

    /// Retrieves the partition ranges.
    pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
        self.ranges
            .iter()
            .enumerate()
            .map(|(idx, range_meta)| range_meta.new_partition_range(idx))
            .collect()
    }

    /// Formats the context for explain.
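    ///
    /// Example output (illustrative values):
    /// `"partition_count":{"count":2, "mem_ranges":1, "files":1, "file_ranges":4}`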
    pub(crate) fn format_for_explain(&self, verbose: bool, f: &mut fmt::Formatter) -> fmt::Result {
        let (mut num_mem_ranges, mut num_file_ranges, mut num_other_ranges) = (0, 0, 0);
        for range_meta in &self.ranges {
            for idx in &range_meta.row_group_indices {
                if self.is_mem_range_index(*idx) {
                    num_mem_ranges += 1;
                } else if self.is_file_range_index(*idx) {
                    num_file_ranges += 1;
                } else {
                    num_other_ranges += 1;
                }
            }
        }
        if verbose {
            write!(f, "{{")?;
        }
        write!(
            f,
            r#""partition_count":{{"count":{}, "mem_ranges":{}, "files":{}, "file_ranges":{}"#,
            self.ranges.len(),
            num_mem_ranges,
            self.input.num_files(),
            num_file_ranges,
        )?;
        if num_other_ranges > 0 {
            write!(f, r#", "other_ranges":{}"#, num_other_ranges)?;
        }
        write!(f, "}}")?;

        if let Some(selector) = &self.input.series_row_selector {
            write!(f, ", \"selector\":\"{}\"", selector)?;
        }
        if let Some(distribution) = &self.input.distribution {
            write!(f, ", \"distribution\":\"{}\"", distribution)?;
        }

        if verbose {
            self.format_verbose_content(f)?;
        }

        Ok(())
    }

    fn format_verbose_content(&self, f: &mut fmt::Formatter) -> fmt::Result {
        struct FileWrapper<'a> {
            file: &'a FileHandle,
        }

        impl fmt::Debug for FileWrapper<'_> {
            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                let (start, end) = self.file.time_range();
                write!(
                    f,
                    r#"{{"file_id":"{}","time_range_start":"{}::{}","time_range_end":"{}::{}","rows":{},"size":{},"index_size":{}}}"#,
                    self.file.file_id(),
                    start.value(),
                    start.unit(),
                    end.value(),
                    end.unit(),
                    self.file.num_rows(),
                    self.file.size(),
                    self.file.index_size()
                )
            }
        }

        struct InputWrapper<'a> {
            input: &'a ScanInput,
        }

        #[cfg(feature = "enterprise")]
        impl InputWrapper<'_> {
            fn format_extension_ranges(&self, f: &mut fmt::Formatter) -> fmt::Result {
                if self.input.extension_ranges.is_empty() {
                    return Ok(());
                }

                let mut delimiter = "";
                write!(f, ", extension_ranges: [")?;
                for range in self.input.extension_ranges() {
                    write!(f, "{}{:?}", delimiter, range)?;
                    delimiter = ", ";
                }
                write!(f, "]")?;
                Ok(())
            }
        }

        impl fmt::Debug for InputWrapper<'_> {
            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                let output_schema = self.input.mapper.output_schema();
                if !output_schema.is_empty() {
                    let names: Vec<_> = output_schema
                        .column_schemas()
                        .iter()
                        .map(|col| &col.name)
                        .collect();
                    write!(f, ", \"projection\": {:?}", names)?;
                }
                if let Some(predicate) = self.input.predicate.predicate()
                    && !predicate.exprs().is_empty()
                {
                    let exprs: Vec<_> = predicate.exprs().iter().map(|e| e.to_string()).collect();
                    write!(f, ", \"filters\": {:?}", exprs)?;
                }
                if !self.input.files.is_empty() {
                    write!(f, ", \"files\": ")?;
                    f.debug_list()
                        .entries(self.input.files.iter().map(|file| FileWrapper { file }))
                        .finish()?;
                }
                write!(f, ", \"flat_format\": {}", self.input.flat_format)?;

                #[cfg(feature = "enterprise")]
                self.format_extension_ranges(f)?;

                Ok(())
            }
        }

        write!(f, "{:?}", InputWrapper { input: &self.input })
    }
}

/// Predicates to evaluate.
/// It only keeps filters that [SimpleFilterEvaluator] supports.
#[derive(Clone, Default)]
pub struct PredicateGroup {
    time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
    /// Predicate that includes request filters and region partition expr (if any).
    predicate_all: Option<Predicate>,
    /// Predicate that only includes request filters.
    predicate_without_region: Option<Predicate>,
    /// Region partition expression restored from metadata.
    region_partition_expr: Option<PartitionExpr>,
}

impl PredicateGroup {
    /// Creates a new `PredicateGroup` from exprs according to the metadata.
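    ///
    /// A minimal usage sketch (doc test ignored; `metadata` is assumed to be a
    /// prepared [RegionMetadata] whose time index column is `ts`):
    ///
    /// ```ignore
    /// use datafusion_expr::{col, lit};
    ///
    /// // Only the single-column filter on the time index becomes a time filter.
    /// let exprs = vec![col("ts").gt(lit(0)), col("k0").eq(lit("foo"))];
    /// let group = PredicateGroup::new(&metadata, &exprs).unwrap();
    /// assert!(group.time_filters().is_some());
    /// assert!(group.predicate().is_some());
    /// ```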
    pub fn new(metadata: &RegionMetadata, exprs: &[Expr]) -> Result<Self> {
        let mut combined_exprs = exprs.to_vec();
        let mut region_partition_expr = None;

        if let Some(expr_json) = metadata.partition_expr.as_ref()
            && !expr_json.is_empty()
            && let Some(expr) = PartitionExpr::from_json_str(expr_json)
                .context(InvalidPartitionExprSnafu { expr: expr_json })?
        {
            let logical_expr = expr
                .try_as_logical_expr()
                .context(InvalidPartitionExprSnafu {
                    expr: expr_json.clone(),
                })?;

            combined_exprs.push(logical_expr);
            region_partition_expr = Some(expr);
        }

        let mut time_filters = Vec::with_capacity(combined_exprs.len());
        // Columns in the expr.
        let mut columns = HashSet::new();
        for expr in &combined_exprs {
            columns.clear();
            let Some(filter) = Self::expr_to_filter(expr, metadata, &mut columns) else {
                continue;
            };
            time_filters.push(filter);
        }
        let time_filters = if time_filters.is_empty() {
            None
        } else {
            Some(Arc::new(time_filters))
        };

        let predicate_all = if combined_exprs.is_empty() {
            None
        } else {
            Some(Predicate::new(combined_exprs))
        };
        let predicate_without_region = if exprs.is_empty() {
            None
        } else {
            Some(Predicate::new(exprs.to_vec()))
        };

        Ok(Self {
            time_filters,
            predicate_all,
            predicate_without_region,
            region_partition_expr,
        })
    }

    /// Returns the time filters.
    pub(crate) fn time_filters(&self) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
        self.time_filters.clone()
    }

    /// Returns the predicate of all exprs (including the region partition expr if present).
    pub(crate) fn predicate(&self) -> Option<&Predicate> {
        self.predicate_all.as_ref()
    }

    /// Returns the predicate that excludes the region partition expr.
    pub(crate) fn predicate_without_region(&self) -> Option<&Predicate> {
        self.predicate_without_region.as_ref()
    }

    /// Returns the region partition expr from metadata, if any.
    pub(crate) fn region_partition_expr(&self) -> Option<&PartitionExpr> {
        self.region_partition_expr.as_ref()
    }

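    /// Converts the expr to a [SimpleFilterEvaluator] if it is a single-column
    /// filter on the time index column; otherwise returns `None`.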
    fn expr_to_filter(
        expr: &Expr,
        metadata: &RegionMetadata,
        columns: &mut HashSet<Column>,
    ) -> Option<SimpleFilterEvaluator> {
        columns.clear();
        // `expr_to_columns` shouldn't fail for these exprs, but we still skip the
        // expression for safety if it does.
        expr_to_columns(expr, columns).ok()?;
        if columns.len() > 1 {
            // Simple filter doesn't support multiple columns.
            return None;
        }
        let column = columns.iter().next()?;
        let column_meta = metadata.column_by_name(&column.name)?;
        if column_meta.semantic_type == SemanticType::Timestamp {
            SimpleFilterEvaluator::try_new(expr)
        } else {
            None
        }
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datafusion_expr::{col, lit};
    use store_api::storage::ScanRequest;

    use super::*;
    use crate::memtable::time_partition::TimePartitions;
    use crate::region::version::VersionBuilder;
    use crate::test_util::memtable_util::{EmptyMemtableBuilder, metadata_with_primary_key};
    use crate::test_util::scheduler_util::SchedulerEnv;

    fn new_version(metadata: RegionMetadataRef) -> VersionRef {
        let mutable = Arc::new(TimePartitions::new(
            metadata.clone(),
            Arc::new(EmptyMemtableBuilder::default()),
            0,
            None,
        ));
        Arc::new(VersionBuilder::new(metadata, mutable).build())
    }

    #[tokio::test]
    async fn test_build_read_column_ids_includes_filters() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let version = new_version(metadata.clone());
        let env = SchedulerEnv::new().await;
        let request = ScanRequest {
            projection: Some(vec![4]),
            filters: vec![
                col("v0").gt(lit(1)),
                col("ts").gt(lit(0)),
                col("k0").eq(lit("foo")),
            ],
            ..Default::default()
        };
        let scan_region = ScanRegion::new(
            version,
            env.access_layer.clone(),
            request,
            CacheStrategy::Disabled,
        );
        let predicate =
            PredicateGroup::new(metadata.as_ref(), &scan_region.request.filters).unwrap();
        let projection = scan_region.request.projection.as_ref().unwrap();
        let read_ids = scan_region
            .build_read_column_ids(projection, &predicate)
            .unwrap();
        assert_eq!(vec![4, 0, 2, 3], read_ids);
    }

    #[tokio::test]
    async fn test_build_read_column_ids_empty_projection() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let version = new_version(metadata.clone());
        let env = SchedulerEnv::new().await;
        let request = ScanRequest {
            projection: Some(vec![]),
            ..Default::default()
        };
        let scan_region = ScanRegion::new(
            version,
            env.access_layer.clone(),
            request,
            CacheStrategy::Disabled,
        );
        let predicate =
            PredicateGroup::new(metadata.as_ref(), &scan_region.request.filters).unwrap();
        let projection = scan_region.request.projection.as_ref().unwrap();
        let read_ids = scan_region
            .build_read_column_ids(projection, &predicate)
            .unwrap();
        // Empty projection should still read the time index column (id 2 in this test schema).
        assert_eq!(vec![2], read_ids);
    }

    #[tokio::test]
    async fn test_build_read_column_ids_keeps_projection_order() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let version = new_version(metadata.clone());
        let env = SchedulerEnv::new().await;
        let request = ScanRequest {
            projection: Some(vec![4, 1]),
            filters: vec![col("v0").gt(lit(1))],
            ..Default::default()
        };
        let scan_region = ScanRegion::new(
            version,
            env.access_layer.clone(),
            request,
            CacheStrategy::Disabled,
        );
        let predicate =
            PredicateGroup::new(metadata.as_ref(), &scan_region.request.filters).unwrap();
        let projection = scan_region.request.projection.as_ref().unwrap();
        let read_ids = scan_region
            .build_read_column_ids(projection, &predicate)
            .unwrap();
        // Projection order preserved, extra columns appended in schema order.
        assert_eq!(vec![4, 1, 3], read_ids);
    }
}