mito2/read/
scan_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Scans a region according to the scan request.
16
17use std::collections::HashSet;
18use std::fmt;
19use std::num::NonZeroU64;
20use std::sync::Arc;
21use std::time::Instant;
22
23use api::v1::SemanticType;
24use common_error::ext::BoxedError;
25use common_recordbatch::SendableRecordBatchStream;
26use common_recordbatch::filter::SimpleFilterEvaluator;
27use common_telemetry::tracing::Instrument;
28use common_telemetry::{debug, error, tracing, warn};
29use common_time::range::TimestampRange;
30use datafusion_common::Column;
31use datafusion_expr::Expr;
32use datafusion_expr::utils::expr_to_columns;
33use futures::StreamExt;
34use partition::expr::PartitionExpr;
35use smallvec::SmallVec;
36use snafu::ResultExt;
37use store_api::metadata::{RegionMetadata, RegionMetadataRef};
38use store_api::region_engine::{PartitionRange, RegionScannerRef};
39use store_api::storage::{
40    RegionId, ScanRequest, SequenceRange, TimeSeriesDistribution, TimeSeriesRowSelector,
41};
42use table::predicate::{Predicate, build_time_range_predicate};
43use tokio::sync::{Semaphore, mpsc};
44use tokio_stream::wrappers::ReceiverStream;
45
46use crate::access_layer::AccessLayerRef;
47use crate::cache::CacheStrategy;
48use crate::config::{DEFAULT_MAX_CONCURRENT_SCAN_FILES, DEFAULT_SCAN_CHANNEL_SIZE};
49use crate::error::{InvalidPartitionExprSnafu, Result};
50#[cfg(feature = "enterprise")]
51use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
52use crate::memtable::{MemtableRange, RangesOptions};
53use crate::metrics::READ_SST_COUNT;
54use crate::read::compat::{self, CompatBatch, FlatCompatBatch, PrimaryKeyCompatBatch};
55use crate::read::projection::ProjectionMapper;
56use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
57use crate::read::seq_scan::SeqScan;
58use crate::read::series_scan::SeriesScan;
59use crate::read::stream::ScanBatchStream;
60use crate::read::unordered_scan::UnorderedScan;
61use crate::read::{Batch, BoxedRecordBatchStream, RecordBatch, Source};
62use crate::region::options::MergeMode;
63use crate::region::version::VersionRef;
64use crate::sst::FormatType;
65use crate::sst::file::FileHandle;
66use crate::sst::index::bloom_filter::applier::{
67    BloomFilterIndexApplierBuilder, BloomFilterIndexApplierRef,
68};
69use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
70use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
71use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
72use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
73#[cfg(feature = "vector_index")]
74use crate::sst::index::vector_index::applier::{VectorIndexApplier, VectorIndexApplierRef};
75use crate::sst::parquet::file_range::PreFilterMode;
76use crate::sst::parquet::reader::ReaderMetrics;
77
/// Parallel scan channel size for flat format.
const FLAT_SCAN_CHANNEL_SIZE: usize = 2;
/// Multiplier used to over-fetch rows for vector (KNN) search when the request
/// also carries filters, since filtering may drop some of the top-k matches.
#[cfg(feature = "vector_index")]
const VECTOR_INDEX_OVERFETCH_MULTIPLIER: usize = 2;
82
/// A scanner scans a region and returns a [SendableRecordBatchStream].
///
/// Each variant wraps the concrete scanner built by [ScanRegion::scanner()].
pub(crate) enum Scanner {
    /// Sequential scan.
    Seq(SeqScan),
    /// Unordered scan.
    Unordered(UnorderedScan),
    /// Per-series scan.
    Series(SeriesScan),
}
92
93impl Scanner {
94    /// Returns a [SendableRecordBatchStream] to retrieve scan results from all partitions.
95    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
96    pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream, BoxedError> {
97        match self {
98            Scanner::Seq(seq_scan) => seq_scan.build_stream(),
99            Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
100            Scanner::Series(series_scan) => series_scan.build_stream().await,
101        }
102    }
103
104    /// Create a stream of [`Batch`] by this scanner.
105    pub(crate) fn scan_batch(&self) -> Result<ScanBatchStream> {
106        match self {
107            Scanner::Seq(x) => x.scan_all_partitions(),
108            Scanner::Unordered(x) => x.scan_all_partitions(),
109            Scanner::Series(x) => x.scan_all_partitions(),
110        }
111    }
112}
113
#[cfg(test)]
impl Scanner {
    /// Returns number of files to scan.
    pub(crate) fn num_files(&self) -> usize {
        match self {
            Scanner::Seq(scanner) => scanner.input().num_files(),
            Scanner::Unordered(scanner) => scanner.input().num_files(),
            Scanner::Series(scanner) => scanner.input().num_files(),
        }
    }

    /// Returns number of memtables to scan.
    pub(crate) fn num_memtables(&self) -> usize {
        match self {
            Scanner::Seq(scanner) => scanner.input().num_memtables(),
            Scanner::Unordered(scanner) => scanner.input().num_memtables(),
            Scanner::Series(scanner) => scanner.input().num_memtables(),
        }
    }

    /// Returns SST file ids to scan.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        match self {
            Scanner::Seq(scanner) => scanner.input().file_ids(),
            Scanner::Unordered(scanner) => scanner.input().file_ids(),
            Scanner::Series(scanner) => scanner.input().file_ids(),
        }
    }

    /// Returns index ids of the SSTs to scan.
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        match self {
            Scanner::Seq(scanner) => scanner.input().index_ids(),
            Scanner::Unordered(scanner) => scanner.input().index_ids(),
            Scanner::Series(scanner) => scanner.input().index_ids(),
        }
    }

    /// Sets the target partitions for the scanner. It can controls the parallelism of the scanner.
    pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
        use store_api::region_engine::{PrepareRequest, RegionScanner};

        let request = PrepareRequest::default().with_target_partitions(target_partitions);
        // Test-only helper: failures here indicate a test bug, so unwrap.
        match self {
            Scanner::Seq(scanner) => scanner.prepare(request).unwrap(),
            Scanner::Unordered(scanner) => scanner.prepare(request).unwrap(),
            Scanner::Series(scanner) => scanner.prepare(request).unwrap(),
        }
    }
}
163
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Helper to scan a region by [ScanRequest].
///
/// [ScanRegion] collects SSTs and memtables to scan without actually reading them. It
/// creates a [Scanner] to actually scan these targets in [Scanner::scan()].
///
/// ```mermaid
/// classDiagram
/// class ScanRegion {
///     -VersionRef version
///     -ScanRequest request
///     ~scanner() Scanner
///     ~seq_scan() SeqScan
/// }
/// class Scanner {
///     <<enumeration>>
///     SeqScan
///     UnorderedScan
///     +scan() SendableRecordBatchStream
/// }
/// class SeqScan {
///     -ScanInput input
///     +build() SendableRecordBatchStream
/// }
/// class UnorderedScan {
///     -ScanInput input
///     +build() SendableRecordBatchStream
/// }
/// class ScanInput {
///     -ProjectionMapper mapper
///     -Option~TimeRange~ time_range
///     -Option~Predicate~ predicate
///     -Vec~MemtableRef~ memtables
///     -Vec~FileHandle~ files
/// }
/// class ProjectionMapper {
///     ~output_schema() SchemaRef
///     ~convert(Batch) RecordBatch
/// }
/// ScanRegion -- Scanner
/// ScanRegion o-- ScanRequest
/// Scanner o-- SeqScan
/// Scanner o-- UnorderedScan
/// SeqScan o-- ScanInput
/// UnorderedScan o-- ScanInput
/// Scanner -- SendableRecordBatchStream
/// ScanInput o-- ProjectionMapper
/// SeqScan -- SendableRecordBatchStream
/// UnorderedScan -- SendableRecordBatchStream
/// ```
pub(crate) struct ScanRegion {
    /// Version of the region at scan.
    version: VersionRef,
    /// Access layer of the region.
    access_layer: AccessLayerRef,
    /// Scan request.
    request: ScanRequest,
    /// Cache.
    cache_strategy: CacheStrategy,
    /// Capacity of the channel to send data from parallel scan tasks to the main task.
    parallel_scan_channel_size: usize,
    /// Maximum number of SST files to scan concurrently.
    max_concurrent_scan_files: usize,
    /// Whether to ignore inverted index.
    ignore_inverted_index: bool,
    /// Whether to ignore fulltext index.
    ignore_fulltext_index: bool,
    /// Whether to ignore bloom filter.
    ignore_bloom_filter: bool,
    /// Start time of the scan task.
    start_time: Option<Instant>,
    /// Whether to filter out the deleted rows.
    /// Usually true for normal read, and false for scan for compaction.
    filter_deleted: bool,
    /// Optional provider of extension ranges attached to the scan input.
    #[cfg(feature = "enterprise")]
    extension_range_provider: Option<BoxedExtensionRangeProvider>,
}
241
242impl ScanRegion {
    /// Creates a [ScanRegion] with default scan options.
    ///
    /// Callers can tune the defaults with the `with_*` builder methods below.
    pub(crate) fn new(
        version: VersionRef,
        access_layer: AccessLayerRef,
        request: ScanRequest,
        cache_strategy: CacheStrategy,
    ) -> ScanRegion {
        ScanRegion {
            version,
            access_layer,
            request,
            cache_strategy,
            // Defaults come from the config module.
            parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
            // All indexes are used unless the caller explicitly ignores them.
            ignore_inverted_index: false,
            ignore_fulltext_index: false,
            ignore_bloom_filter: false,
            start_time: None,
            // Normal reads filter deleted rows; compaction turns this off.
            filter_deleted: true,
            #[cfg(feature = "enterprise")]
            extension_range_provider: None,
        }
    }
266
267    /// Sets parallel scan task channel size.
268    #[must_use]
269    pub(crate) fn with_parallel_scan_channel_size(
270        mut self,
271        parallel_scan_channel_size: usize,
272    ) -> Self {
273        self.parallel_scan_channel_size = parallel_scan_channel_size;
274        self
275    }
276
277    /// Sets maximum number of SST files to scan concurrently.
278    #[must_use]
279    pub(crate) fn with_max_concurrent_scan_files(
280        mut self,
281        max_concurrent_scan_files: usize,
282    ) -> Self {
283        self.max_concurrent_scan_files = max_concurrent_scan_files;
284        self
285    }
286
287    /// Sets whether to ignore inverted index.
288    #[must_use]
289    pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self {
290        self.ignore_inverted_index = ignore;
291        self
292    }
293
294    /// Sets whether to ignore fulltext index.
295    #[must_use]
296    pub(crate) fn with_ignore_fulltext_index(mut self, ignore: bool) -> Self {
297        self.ignore_fulltext_index = ignore;
298        self
299    }
300
301    /// Sets whether to ignore bloom filter.
302    #[must_use]
303    pub(crate) fn with_ignore_bloom_filter(mut self, ignore: bool) -> Self {
304        self.ignore_bloom_filter = ignore;
305        self
306    }
307
308    #[must_use]
309    pub(crate) fn with_start_time(mut self, now: Instant) -> Self {
310        self.start_time = Some(now);
311        self
312    }
313
    /// Sets whether to filter out deleted rows.
    ///
    /// Usually true for normal reads; set to false for compaction scans.
    pub(crate) fn set_filter_deleted(&mut self, filter_deleted: bool) {
        self.filter_deleted = filter_deleted;
    }
317
    /// Sets the provider used to find extension ranges for this scan.
    #[cfg(feature = "enterprise")]
    pub(crate) fn set_extension_range_provider(
        &mut self,
        extension_range_provider: BoxedExtensionRangeProvider,
    ) {
        self.extension_range_provider = Some(extension_range_provider);
    }
325
326    /// Returns a [Scanner] to scan the region.
327    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
328    pub(crate) async fn scanner(self) -> Result<Scanner> {
329        if self.use_series_scan() {
330            self.series_scan().await.map(Scanner::Series)
331        } else if self.use_unordered_scan() {
332            // If table is append only and there is no series row selector, we use unordered scan in query.
333            // We still use seq scan in compaction.
334            self.unordered_scan().await.map(Scanner::Unordered)
335        } else {
336            self.seq_scan().await.map(Scanner::Seq)
337        }
338    }
339
340    /// Returns a [RegionScanner] to scan the region.
341    #[tracing::instrument(
342        level = tracing::Level::DEBUG,
343        skip_all,
344        fields(region_id = %self.region_id())
345    )]
346    pub(crate) async fn region_scanner(self) -> Result<RegionScannerRef> {
347        if self.use_series_scan() {
348            self.series_scan()
349                .await
350                .map(|scanner| Box::new(scanner) as _)
351        } else if self.use_unordered_scan() {
352            self.unordered_scan()
353                .await
354                .map(|scanner| Box::new(scanner) as _)
355        } else {
356            self.seq_scan().await.map(|scanner| Box::new(scanner) as _)
357        }
358    }
359
360    /// Scan sequentially.
361    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
362    pub(crate) async fn seq_scan(self) -> Result<SeqScan> {
363        let input = self.scan_input().await?.with_compaction(false);
364        Ok(SeqScan::new(input))
365    }
366
367    /// Unordered scan.
368    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
369    pub(crate) async fn unordered_scan(self) -> Result<UnorderedScan> {
370        let input = self.scan_input().await?;
371        Ok(UnorderedScan::new(input))
372    }
373
374    /// Scans by series.
375    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
376    pub(crate) async fn series_scan(self) -> Result<SeriesScan> {
377        let input = self.scan_input().await?;
378        Ok(SeriesScan::new(input))
379    }
380
381    /// Returns true if the region can use unordered scan for current request.
382    fn use_unordered_scan(&self) -> bool {
383        // We use unordered scan when:
384        // 1. The region is in append mode.
385        // 2. There is no series row selector.
386        // 3. The required distribution is None or TimeSeriesDistribution::TimeWindowed.
387        //
388        // We still use seq scan in compaction.
389        self.version.options.append_mode
390            && self.request.series_row_selector.is_none()
391            && (self.request.distribution.is_none()
392                || self.request.distribution == Some(TimeSeriesDistribution::TimeWindowed))
393    }
394
395    /// Returns true if the region can use series scan for current request.
396    fn use_series_scan(&self) -> bool {
397        self.request.distribution == Some(TimeSeriesDistribution::PerSeries)
398    }
399
400    /// Returns true if the region use flat format.
401    fn use_flat_format(&self) -> bool {
402        self.version.options.sst_format.unwrap_or_default() == FormatType::Flat
403    }
404
405    /// Creates a scan input.
406    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
407    async fn scan_input(mut self) -> Result<ScanInput> {
408        let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new);
409        let time_range = self.build_time_range_predicate();
410        let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;
411        let flat_format = self.use_flat_format();
412
413        // The mapper always computes projected column ids as the schema of SSTs may change.
414        let mapper = match &self.request.projection {
415            Some(p) => {
416                ProjectionMapper::new(&self.version.metadata, p.iter().copied(), flat_format)?
417            }
418            None => ProjectionMapper::all(&self.version.metadata, flat_format)?,
419        };
420
421        let ssts = &self.version.ssts;
422        let mut files = Vec::new();
423        for level in ssts.levels() {
424            for file in level.files.values() {
425                let exceed_min_sequence = match (sst_min_sequence, file.meta_ref().sequence) {
426                    (Some(min_sequence), Some(file_sequence)) => file_sequence > min_sequence,
427                    // If the file's sequence is None (or actually is zero), it could mean the file
428                    // is generated and added to the region "directly". In this case, its data should
429                    // be considered as fresh as the memtable. So its sequence is treated greater than
430                    // the min_sequence, whatever the value of min_sequence is. Hence the default
431                    // "true" in this arm.
432                    (Some(_), None) => true,
433                    (None, _) => true,
434                };
435
436                // Finds SST files in range.
437                if exceed_min_sequence && file_in_range(file, &time_range) {
438                    files.push(file.clone());
439                }
440                // There is no need to check and prune for file's sequence here as the sequence number is usually very new,
441                // unless the timing is too good, or the sequence number wouldn't be in file.
442                // and the batch will be filtered out by tree reader anyway.
443            }
444        }
445
446        let memtables = self.version.memtables.list_memtables();
447        // Skip empty memtables and memtables out of time range.
448        let mut mem_range_builders = Vec::new();
449        let filter_mode = pre_filter_mode(
450            self.version.options.append_mode,
451            self.version.options.merge_mode(),
452        );
453
454        for m in memtables {
455            // check if memtable is empty by reading stats.
456            let Some((start, end)) = m.stats().time_range() else {
457                continue;
458            };
459            // The time range of the memtable is inclusive.
460            let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
461            if !memtable_range.intersects(&time_range) {
462                continue;
463            }
464            let ranges_in_memtable = m.ranges(
465                Some(mapper.column_ids()),
466                RangesOptions::default()
467                    .with_predicate(predicate.clone())
468                    .with_sequence(SequenceRange::new(
469                        self.request.memtable_min_sequence,
470                        self.request.memtable_max_sequence,
471                    ))
472                    .with_pre_filter_mode(filter_mode),
473            )?;
474            mem_range_builders.extend(ranges_in_memtable.ranges.into_values().map(|v| {
475                let stats = v.stats().clone();
476                MemRangeBuilder::new(v, stats)
477            }));
478        }
479
480        let region_id = self.region_id();
481        debug!(
482            "Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}, flat_format: {}",
483            region_id,
484            self.request,
485            time_range,
486            mem_range_builders.len(),
487            files.len(),
488            self.version.options.append_mode,
489            flat_format,
490        );
491
492        let (non_field_filters, field_filters) = self.partition_by_field_filters();
493        let inverted_index_appliers = [
494            self.build_invereted_index_applier(&non_field_filters),
495            self.build_invereted_index_applier(&field_filters),
496        ];
497        let bloom_filter_appliers = [
498            self.build_bloom_filter_applier(&non_field_filters),
499            self.build_bloom_filter_applier(&field_filters),
500        ];
501        let fulltext_index_appliers = [
502            self.build_fulltext_index_applier(&non_field_filters),
503            self.build_fulltext_index_applier(&field_filters),
504        ];
505        #[cfg(feature = "vector_index")]
506        let vector_index_applier = self.build_vector_index_applier();
507        #[cfg(feature = "vector_index")]
508        let vector_index_k = self.request.vector_search.as_ref().map(|search| {
509            if self.request.filters.is_empty() {
510                search.k
511            } else {
512                search.k.saturating_mul(VECTOR_INDEX_OVERFETCH_MULTIPLIER)
513            }
514        });
515        let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;
516
517        if flat_format {
518            // The batch is already large enough so we use a small channel size here.
519            self.parallel_scan_channel_size = FLAT_SCAN_CHANNEL_SIZE;
520        }
521
522        let input = ScanInput::new(self.access_layer, mapper)
523            .with_time_range(Some(time_range))
524            .with_predicate(predicate)
525            .with_memtables(mem_range_builders)
526            .with_files(files)
527            .with_cache(self.cache_strategy)
528            .with_inverted_index_appliers(inverted_index_appliers)
529            .with_bloom_filter_index_appliers(bloom_filter_appliers)
530            .with_fulltext_index_appliers(fulltext_index_appliers)
531            .with_parallel_scan_channel_size(self.parallel_scan_channel_size)
532            .with_max_concurrent_scan_files(self.max_concurrent_scan_files)
533            .with_start_time(self.start_time)
534            .with_append_mode(self.version.options.append_mode)
535            .with_filter_deleted(self.filter_deleted)
536            .with_merge_mode(self.version.options.merge_mode())
537            .with_series_row_selector(self.request.series_row_selector)
538            .with_distribution(self.request.distribution)
539            .with_flat_format(flat_format);
540        #[cfg(feature = "vector_index")]
541        let input = input
542            .with_vector_index_applier(vector_index_applier)
543            .with_vector_index_k(vector_index_k);
544
545        #[cfg(feature = "enterprise")]
546        let input = if let Some(provider) = self.extension_range_provider {
547            let ranges = provider
548                .find_extension_ranges(self.version.flushed_sequence, time_range, &self.request)
549                .await?;
550            debug!("Find extension ranges: {ranges:?}");
551            input.with_extension_ranges(ranges)
552        } else {
553            input
554        };
555        Ok(input)
556    }
557
    /// Returns the id of the region being scanned.
    fn region_id(&self) -> RegionId {
        self.version.metadata.region_id
    }
561
562    /// Build time range predicate from filters.
563    fn build_time_range_predicate(&self) -> TimestampRange {
564        let time_index = self.version.metadata.time_index_column();
565        let unit = time_index
566            .column_schema
567            .data_type
568            .as_timestamp()
569            .expect("Time index must have timestamp-compatible type")
570            .unit();
571        build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters)
572    }
573
574    /// Partitions filters into two groups: non-field filters and field filters.
575    /// Returns `(non_field_filters, field_filters)`.
576    fn partition_by_field_filters(&self) -> (Vec<Expr>, Vec<Expr>) {
577        let field_columns = self
578            .version
579            .metadata
580            .field_columns()
581            .map(|col| &col.column_schema.name)
582            .collect::<HashSet<_>>();
583
584        let mut columns = HashSet::new();
585
586        self.request.filters.iter().cloned().partition(|expr| {
587            columns.clear();
588            // `expr_to_columns` won't return error.
589            if expr_to_columns(expr, &mut columns).is_err() {
590                // If we can't extract columns, treat it as non-field filter
591                return true;
592            }
593            // Return true for non-field filters (partition puts true cases in first vec)
594            !columns
595                .iter()
596                .any(|column| field_columns.contains(&column.name))
597        })
598    }
599
600    /// Use the latest schema to build the inverted index applier.
601    fn build_invereted_index_applier(&self, filters: &[Expr]) -> Option<InvertedIndexApplierRef> {
602        if self.ignore_inverted_index {
603            return None;
604        }
605
606        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
607        let inverted_index_cache = self.cache_strategy.inverted_index_cache().cloned();
608
609        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
610
611        InvertedIndexApplierBuilder::new(
612            self.access_layer.table_dir().to_string(),
613            self.access_layer.path_type(),
614            self.access_layer.object_store().clone(),
615            self.version.metadata.as_ref(),
616            self.version.metadata.inverted_indexed_column_ids(
617                self.version
618                    .options
619                    .index_options
620                    .inverted_index
621                    .ignore_column_ids
622                    .iter(),
623            ),
624            self.access_layer.puffin_manager_factory().clone(),
625        )
626        .with_file_cache(file_cache)
627        .with_inverted_index_cache(inverted_index_cache)
628        .with_puffin_metadata_cache(puffin_metadata_cache)
629        .build(filters)
630        .inspect_err(|err| warn!(err; "Failed to build invereted index applier"))
631        .ok()
632        .flatten()
633        .map(Arc::new)
634    }
635
636    /// Use the latest schema to build the bloom filter index applier.
637    fn build_bloom_filter_applier(&self, filters: &[Expr]) -> Option<BloomFilterIndexApplierRef> {
638        if self.ignore_bloom_filter {
639            return None;
640        }
641
642        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
643        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
644        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
645
646        BloomFilterIndexApplierBuilder::new(
647            self.access_layer.table_dir().to_string(),
648            self.access_layer.path_type(),
649            self.access_layer.object_store().clone(),
650            self.version.metadata.as_ref(),
651            self.access_layer.puffin_manager_factory().clone(),
652        )
653        .with_file_cache(file_cache)
654        .with_bloom_filter_index_cache(bloom_filter_index_cache)
655        .with_puffin_metadata_cache(puffin_metadata_cache)
656        .build(filters)
657        .inspect_err(|err| warn!(err; "Failed to build bloom filter index applier"))
658        .ok()
659        .flatten()
660        .map(Arc::new)
661    }
662
663    /// Use the latest schema to build the fulltext index applier.
664    fn build_fulltext_index_applier(&self, filters: &[Expr]) -> Option<FulltextIndexApplierRef> {
665        if self.ignore_fulltext_index {
666            return None;
667        }
668
669        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
670        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
671        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
672        FulltextIndexApplierBuilder::new(
673            self.access_layer.table_dir().to_string(),
674            self.access_layer.path_type(),
675            self.access_layer.object_store().clone(),
676            self.access_layer.puffin_manager_factory().clone(),
677            self.version.metadata.as_ref(),
678        )
679        .with_file_cache(file_cache)
680        .with_puffin_metadata_cache(puffin_metadata_cache)
681        .with_bloom_filter_cache(bloom_filter_index_cache)
682        .build(filters)
683        .inspect_err(|err| warn!(err; "Failed to build fulltext index applier"))
684        .ok()
685        .flatten()
686        .map(Arc::new)
687    }
688
689    /// Build the vector index applier from vector search request.
690    #[cfg(feature = "vector_index")]
691    fn build_vector_index_applier(&self) -> Option<VectorIndexApplierRef> {
692        let vector_search = self.request.vector_search.as_ref()?;
693
694        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
695        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
696        let vector_index_cache = self.cache_strategy.vector_index_cache().cloned();
697
698        let applier = VectorIndexApplier::new(
699            self.access_layer.table_dir().to_string(),
700            self.access_layer.path_type(),
701            self.access_layer.object_store().clone(),
702            self.access_layer.puffin_manager_factory().clone(),
703            vector_search.column_id,
704            vector_search.query_vector.clone(),
705            vector_search.metric,
706        )
707        .with_file_cache(file_cache)
708        .with_puffin_metadata_cache(puffin_metadata_cache)
709        .with_vector_index_cache(vector_index_cache);
710
711        Some(Arc::new(applier))
712    }
713}
714
715/// Returns true if the time range of a SST `file` matches the `predicate`.
716fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
717    if predicate == &TimestampRange::min_to_max() {
718        return true;
719    }
720    // end timestamp of a SST is inclusive.
721    let (start, end) = file.time_range();
722    let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end));
723    file_ts_range.intersects(predicate)
724}
725
/// Common input for different scanners.
pub struct ScanInput {
    /// Region SST access layer.
    access_layer: AccessLayerRef,
    /// Maps projected Batches to RecordBatches.
    pub(crate) mapper: Arc<ProjectionMapper>,
    /// Time range filter for time index.
    time_range: Option<TimestampRange>,
    /// Predicate to push down.
    pub(crate) predicate: PredicateGroup,
    /// Region partition expr applied at read time.
    region_partition_expr: Option<PartitionExpr>,
    /// Memtable range builders for memtables in the time range.
    pub(crate) memtables: Vec<MemRangeBuilder>,
    /// Handles to SST files to scan.
    pub(crate) files: Vec<FileHandle>,
    /// Cache.
    pub(crate) cache_strategy: CacheStrategy,
    /// Ignores file not found error.
    ignore_file_not_found: bool,
    /// Capacity of the channel to send data from parallel scan tasks to the main task.
    pub(crate) parallel_scan_channel_size: usize,
    /// Maximum number of SST files to scan concurrently.
    pub(crate) max_concurrent_scan_files: usize,
    /// Index appliers.
    /// Each array holds appliers built from non-field filters and field filters.
    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
    /// Vector index applier for KNN search.
    #[cfg(feature = "vector_index")]
    pub(crate) vector_index_applier: Option<VectorIndexApplierRef>,
    /// Over-fetched k for vector index scan.
    #[cfg(feature = "vector_index")]
    pub(crate) vector_index_k: Option<usize>,
    /// Start time of the query.
    pub(crate) query_start: Option<Instant>,
    /// The region is using append mode.
    pub(crate) append_mode: bool,
    /// Whether to remove deletion markers.
    pub(crate) filter_deleted: bool,
    /// Mode to merge duplicate rows.
    pub(crate) merge_mode: MergeMode,
    /// Hint to select rows from time series.
    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
    /// Hint for the required distribution of the scanner.
    pub(crate) distribution: Option<TimeSeriesDistribution>,
    /// Whether to use flat format.
    pub(crate) flat_format: bool,
    /// Whether this scan is for compaction.
    pub(crate) compaction: bool,
    /// Extra ranges to scan besides memtables and SSTs.
    #[cfg(feature = "enterprise")]
    extension_ranges: Vec<BoxedExtensionRange>,
}
779
780impl ScanInput {
    /// Creates a new [ScanInput] with default options.
    ///
    /// Defaults: no sources, caching disabled, deleted rows filtered out,
    /// channel size and file concurrency taken from the config constants.
    #[must_use]
    pub(crate) fn new(access_layer: AccessLayerRef, mapper: ProjectionMapper) -> ScanInput {
        ScanInput {
            access_layer,
            mapper: Arc::new(mapper),
            time_range: None,
            predicate: PredicateGroup::default(),
            region_partition_expr: None,
            memtables: Vec::new(),
            files: Vec::new(),
            cache_strategy: CacheStrategy::Disabled,
            ignore_file_not_found: false,
            parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
            inverted_index_appliers: [None, None],
            bloom_filter_index_appliers: [None, None],
            fulltext_index_appliers: [None, None],
            #[cfg(feature = "vector_index")]
            vector_index_applier: None,
            #[cfg(feature = "vector_index")]
            vector_index_k: None,
            query_start: None,
            append_mode: false,
            // Deleted rows are removed by default; compaction and special scans
            // opt out via `with_filter_deleted`.
            filter_deleted: true,
            merge_mode: MergeMode::default(),
            series_row_selector: None,
            distribution: None,
            flat_format: false,
            compaction: false,
            #[cfg(feature = "enterprise")]
            extension_ranges: Vec::new(),
        }
    }
815
816    /// Sets time range filter for time index.
817    #[must_use]
818    pub(crate) fn with_time_range(mut self, time_range: Option<TimestampRange>) -> Self {
819        self.time_range = time_range;
820        self
821    }
822
823    /// Sets predicate to push down.
824    #[must_use]
825    pub(crate) fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
826        self.region_partition_expr = predicate.region_partition_expr().cloned();
827        self.predicate = predicate;
828        self
829    }
830
831    /// Sets memtable range builders.
832    #[must_use]
833    pub(crate) fn with_memtables(mut self, memtables: Vec<MemRangeBuilder>) -> Self {
834        self.memtables = memtables;
835        self
836    }
837
838    /// Sets files to read.
839    #[must_use]
840    pub(crate) fn with_files(mut self, files: Vec<FileHandle>) -> Self {
841        self.files = files;
842        self
843    }
844
845    /// Sets cache for this query.
846    #[must_use]
847    pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self {
848        self.cache_strategy = cache;
849        self
850    }
851
852    /// Ignores file not found error.
853    #[must_use]
854    pub(crate) fn with_ignore_file_not_found(mut self, ignore: bool) -> Self {
855        self.ignore_file_not_found = ignore;
856        self
857    }
858
859    /// Sets scan task channel size.
860    #[must_use]
861    pub(crate) fn with_parallel_scan_channel_size(
862        mut self,
863        parallel_scan_channel_size: usize,
864    ) -> Self {
865        self.parallel_scan_channel_size = parallel_scan_channel_size;
866        self
867    }
868
869    /// Sets maximum number of SST files to scan concurrently.
870    #[must_use]
871    pub(crate) fn with_max_concurrent_scan_files(
872        mut self,
873        max_concurrent_scan_files: usize,
874    ) -> Self {
875        self.max_concurrent_scan_files = max_concurrent_scan_files;
876        self
877    }
878
879    /// Sets inverted index appliers.
880    #[must_use]
881    pub(crate) fn with_inverted_index_appliers(
882        mut self,
883        appliers: [Option<InvertedIndexApplierRef>; 2],
884    ) -> Self {
885        self.inverted_index_appliers = appliers;
886        self
887    }
888
889    /// Sets bloom filter appliers.
890    #[must_use]
891    pub(crate) fn with_bloom_filter_index_appliers(
892        mut self,
893        appliers: [Option<BloomFilterIndexApplierRef>; 2],
894    ) -> Self {
895        self.bloom_filter_index_appliers = appliers;
896        self
897    }
898
899    /// Sets fulltext index appliers.
900    #[must_use]
901    pub(crate) fn with_fulltext_index_appliers(
902        mut self,
903        appliers: [Option<FulltextIndexApplierRef>; 2],
904    ) -> Self {
905        self.fulltext_index_appliers = appliers;
906        self
907    }
908
909    /// Sets vector index applier for KNN search.
910    #[cfg(feature = "vector_index")]
911    #[must_use]
912    pub(crate) fn with_vector_index_applier(
913        mut self,
914        applier: Option<VectorIndexApplierRef>,
915    ) -> Self {
916        self.vector_index_applier = applier;
917        self
918    }
919
920    /// Sets over-fetched k for vector index scan.
921    #[cfg(feature = "vector_index")]
922    #[must_use]
923    pub(crate) fn with_vector_index_k(mut self, k: Option<usize>) -> Self {
924        self.vector_index_k = k;
925        self
926    }
927
928    /// Sets start time of the query.
929    #[must_use]
930    pub(crate) fn with_start_time(mut self, now: Option<Instant>) -> Self {
931        self.query_start = now;
932        self
933    }
934
935    #[must_use]
936    pub(crate) fn with_append_mode(mut self, is_append_mode: bool) -> Self {
937        self.append_mode = is_append_mode;
938        self
939    }
940
941    /// Sets whether to remove deletion markers during scan.
942    #[must_use]
943    pub(crate) fn with_filter_deleted(mut self, filter_deleted: bool) -> Self {
944        self.filter_deleted = filter_deleted;
945        self
946    }
947
948    /// Sets the merge mode.
949    #[must_use]
950    pub(crate) fn with_merge_mode(mut self, merge_mode: MergeMode) -> Self {
951        self.merge_mode = merge_mode;
952        self
953    }
954
955    /// Sets the distribution hint.
956    #[must_use]
957    pub(crate) fn with_distribution(
958        mut self,
959        distribution: Option<TimeSeriesDistribution>,
960    ) -> Self {
961        self.distribution = distribution;
962        self
963    }
964
965    /// Sets the time series row selector.
966    #[must_use]
967    pub(crate) fn with_series_row_selector(
968        mut self,
969        series_row_selector: Option<TimeSeriesRowSelector>,
970    ) -> Self {
971        self.series_row_selector = series_row_selector;
972        self
973    }
974
975    /// Sets whether to use flat format.
976    #[must_use]
977    pub(crate) fn with_flat_format(mut self, flat_format: bool) -> Self {
978        self.flat_format = flat_format;
979        self
980    }
981
982    /// Sets whether this scan is for compaction.
983    #[must_use]
984    pub(crate) fn with_compaction(mut self, compaction: bool) -> Self {
985        self.compaction = compaction;
986        self
987    }
988
989    /// Scans sources in parallel.
990    ///
991    /// # Panics if the input doesn't allow parallel scan.
992    #[tracing::instrument(
993        skip(self, sources, semaphore),
994        fields(
995            region_id = %self.region_metadata().region_id,
996            source_count = sources.len()
997        )
998    )]
999    pub(crate) fn create_parallel_sources(
1000        &self,
1001        sources: Vec<Source>,
1002        semaphore: Arc<Semaphore>,
1003    ) -> Result<Vec<Source>> {
1004        if sources.len() <= 1 {
1005            return Ok(sources);
1006        }
1007
1008        // Spawn a task for each source.
1009        let sources = sources
1010            .into_iter()
1011            .map(|source| {
1012                let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
1013                self.spawn_scan_task(source, semaphore.clone(), sender);
1014                let stream = Box::pin(ReceiverStream::new(receiver));
1015                Source::Stream(stream)
1016            })
1017            .collect();
1018        Ok(sources)
1019    }
1020
1021    /// Builds memtable ranges to scan by `index`.
1022    pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> {
1023        let memtable = &self.memtables[index.index];
1024        let mut ranges = SmallVec::new();
1025        memtable.build_ranges(index.row_group_index, &mut ranges);
1026        ranges
1027    }
1028
1029    fn predicate_for_file(&self, file: &FileHandle) -> Option<Predicate> {
1030        if self.should_skip_region_partition(file) {
1031            self.predicate.predicate_without_region().cloned()
1032        } else {
1033            self.predicate.predicate().cloned()
1034        }
1035    }
1036
1037    fn should_skip_region_partition(&self, file: &FileHandle) -> bool {
1038        match (
1039            self.region_partition_expr.as_ref(),
1040            file.meta_ref().partition_expr.as_ref(),
1041        ) {
1042            (Some(region_expr), Some(file_expr)) => region_expr == file_expr,
1043            _ => false,
1044        }
1045    }
1046
    /// Prunes a file to scan and returns the builder to build readers.
    ///
    /// Builds the SST reader input with all pushdown state (predicate, index
    /// appliers, cache) and, when the file's schema differs from the current
    /// region schema, attaches a compat adapter so the mapper can convert
    /// batches. Returns an empty builder when the file is missing and
    /// `ignore_file_not_found` is set; other errors are propagated.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.region_metadata().region_id,
            file_id = %file.file_id()
        )
    )]
    pub async fn prune_file(
        &self,
        file: &FileHandle,
        reader_metrics: &mut ReaderMetrics,
    ) -> Result<FileRangeBuilder> {
        // Drop the region partition filter when the file shares the same
        // partition expr (see `predicate_for_file`).
        let predicate = self.predicate_for_file(file);
        let filter_mode = pre_filter_mode(self.append_mode, self.merge_mode);
        // Compaction works on encoded keys; only decode pk values when the
        // projection actually outputs tag columns.
        let decode_pk_values = !self.compaction && self.mapper.has_tags();
        let reader = self
            .access_layer
            .read_sst(file.clone())
            .predicate(predicate)
            .projection(Some(self.mapper.column_ids().to_vec()))
            .cache(self.cache_strategy.clone())
            .inverted_index_appliers(self.inverted_index_appliers.clone())
            .bloom_filter_index_appliers(self.bloom_filter_index_appliers.clone())
            .fulltext_index_appliers(self.fulltext_index_appliers.clone());
        // The vector index applier is only part of the builder when the
        // feature is enabled, so it is attached in a cfg-gated step.
        #[cfg(feature = "vector_index")]
        let reader = {
            let mut reader = reader;
            reader =
                reader.vector_index_applier(self.vector_index_applier.clone(), self.vector_index_k);
            reader
        };
        let res = reader
            .expected_metadata(Some(self.mapper.metadata().clone()))
            .flat_format(self.flat_format)
            .compaction(self.compaction)
            .pre_filter_mode(filter_mode)
            .decode_primary_key_values(decode_pk_values)
            .build_reader_input(reader_metrics)
            .await;
        let (mut file_range_ctx, selection) = match res {
            Ok(x) => x,
            Err(e) => {
                // A missing object is tolerated when requested (e.g. the file
                // was purged concurrently); everything else is fatal.
                if e.is_object_not_found() && self.ignore_file_not_found {
                    error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
                    return Ok(FileRangeBuilder::default());
                } else {
                    return Err(e);
                }
            }
        };

        let need_compat = !compat::has_same_columns_and_pk_encoding(
            self.mapper.metadata(),
            file_range_ctx.read_format().metadata(),
        );
        if need_compat {
            // They have different schema. We need to adapt the batch first so the
            // mapper can convert it.
            let compat = if let Some(flat_format) = file_range_ctx.read_format().as_flat() {
                let mapper = self.mapper.as_flat().unwrap();
                FlatCompatBatch::try_new(
                    mapper,
                    flat_format.metadata(),
                    flat_format.format_projection(),
                    self.compaction,
                )?
                .map(CompatBatch::Flat)
            } else {
                let compact_batch = PrimaryKeyCompatBatch::new(
                    &self.mapper,
                    file_range_ctx.read_format().metadata().clone(),
                )?;
                Some(CompatBatch::PrimaryKey(compact_batch))
            };
            file_range_ctx.set_compat_batch(compat);
        }
        Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), selection))
    }
1126
1127    /// Scans the input source in another task and sends batches to the sender.
1128    #[tracing::instrument(
1129        skip(self, input, semaphore, sender),
1130        fields(region_id = %self.region_metadata().region_id)
1131    )]
1132    pub(crate) fn spawn_scan_task(
1133        &self,
1134        mut input: Source,
1135        semaphore: Arc<Semaphore>,
1136        sender: mpsc::Sender<Result<Batch>>,
1137    ) {
1138        let region_id = self.region_metadata().region_id;
1139        let span = tracing::info_span!(
1140            "ScanInput::parallel_scan_task",
1141            region_id = %region_id,
1142            stream_kind = "batch"
1143        );
1144        common_runtime::spawn_global(
1145            async move {
1146                loop {
1147                    // We release the permit before sending result to avoid the task waiting on
1148                    // the channel with the permit held.
1149                    let maybe_batch = {
1150                        // Safety: We never close the semaphore.
1151                        let _permit = semaphore.acquire().await.unwrap();
1152                        input.next_batch().await
1153                    };
1154                    match maybe_batch {
1155                        Ok(Some(batch)) => {
1156                            let _ = sender.send(Ok(batch)).await;
1157                        }
1158                        Ok(None) => break,
1159                        Err(e) => {
1160                            let _ = sender.send(Err(e)).await;
1161                            break;
1162                        }
1163                    }
1164                }
1165            }
1166            .instrument(span),
1167        );
1168    }
1169
1170    /// Scans flat sources (RecordBatch streams) in parallel.
1171    ///
1172    /// # Panics if the input doesn't allow parallel scan.
1173    #[tracing::instrument(
1174        skip(self, sources, semaphore),
1175        fields(
1176            region_id = %self.region_metadata().region_id,
1177            source_count = sources.len()
1178        )
1179    )]
1180    pub(crate) fn create_parallel_flat_sources(
1181        &self,
1182        sources: Vec<BoxedRecordBatchStream>,
1183        semaphore: Arc<Semaphore>,
1184    ) -> Result<Vec<BoxedRecordBatchStream>> {
1185        if sources.len() <= 1 {
1186            return Ok(sources);
1187        }
1188
1189        // Spawn a task for each source.
1190        let sources = sources
1191            .into_iter()
1192            .map(|source| {
1193                let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
1194                self.spawn_flat_scan_task(source, semaphore.clone(), sender);
1195                let stream = Box::pin(ReceiverStream::new(receiver));
1196                Box::pin(stream) as _
1197            })
1198            .collect();
1199        Ok(sources)
1200    }
1201
1202    /// Spawns a task to scan a flat source (RecordBatch stream) asynchronously.
1203    #[tracing::instrument(
1204        skip(self, input, semaphore, sender),
1205        fields(region_id = %self.region_metadata().region_id)
1206    )]
1207    pub(crate) fn spawn_flat_scan_task(
1208        &self,
1209        mut input: BoxedRecordBatchStream,
1210        semaphore: Arc<Semaphore>,
1211        sender: mpsc::Sender<Result<RecordBatch>>,
1212    ) {
1213        let region_id = self.region_metadata().region_id;
1214        let span = tracing::info_span!(
1215            "ScanInput::parallel_scan_task",
1216            region_id = %region_id,
1217            stream_kind = "flat"
1218        );
1219        common_runtime::spawn_global(
1220            async move {
1221                loop {
1222                    // We release the permit before sending result to avoid the task waiting on
1223                    // the channel with the permit held.
1224                    let maybe_batch = {
1225                        // Safety: We never close the semaphore.
1226                        let _permit = semaphore.acquire().await.unwrap();
1227                        input.next().await
1228                    };
1229                    match maybe_batch {
1230                        Some(Ok(batch)) => {
1231                            let _ = sender.send(Ok(batch)).await;
1232                        }
1233                        Some(Err(e)) => {
1234                            let _ = sender.send(Err(e)).await;
1235                            break;
1236                        }
1237                        None => break,
1238                    }
1239                }
1240            }
1241            .instrument(span),
1242        );
1243    }
1244
1245    pub(crate) fn total_rows(&self) -> usize {
1246        let rows_in_files: usize = self.files.iter().map(|f| f.num_rows()).sum();
1247        let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();
1248
1249        let rows = rows_in_files + rows_in_memtables;
1250        #[cfg(feature = "enterprise")]
1251        let rows = rows
1252            + self
1253                .extension_ranges
1254                .iter()
1255                .map(|x| x.num_rows())
1256                .sum::<u64>() as usize;
1257        rows
1258    }
1259
    /// Returns the predicate group pushed down to this scan.
    pub(crate) fn predicate_group(&self) -> &PredicateGroup {
        &self.predicate
    }

    /// Returns number of memtables to scan.
    pub(crate) fn num_memtables(&self) -> usize {
        self.memtables.len()
    }

    /// Returns number of SST files to scan.
    pub(crate) fn num_files(&self) -> usize {
        self.files.len()
    }

    /// Gets the file handle from a row group index.
    pub(crate) fn file_from_index(&self, index: RowGroupIndex) -> &FileHandle {
        // Row group indices place memtables before files, so offset by the
        // memtable count to get the position in `files`.
        let file_index = index.index - self.num_memtables();
        &self.files[file_index]
    }

    /// Returns the metadata of the region after projection mapping.
    pub fn region_metadata(&self) -> &RegionMetadataRef {
        self.mapper.metadata()
    }
1283}
1284
#[cfg(feature = "enterprise")]
impl ScanInput {
    /// Sets the extension ranges supplied by enterprise extensions.
    #[must_use]
    pub(crate) fn with_extension_ranges(self, extension_ranges: Vec<BoxedExtensionRange>) -> Self {
        Self {
            extension_ranges,
            ..self
        }
    }

    /// Returns all extension ranges.
    // The inner `#[cfg(feature = "enterprise")]` attributes previously placed
    // on these methods were redundant: the whole impl block is already gated
    // by the same feature (and `with_extension_ranges` never had one).
    pub(crate) fn extension_ranges(&self) -> &[BoxedExtensionRange] {
        &self.extension_ranges
    }

    /// Get a boxed [ExtensionRange] by the index in all ranges
    /// (memtables first, then files, then extension ranges).
    pub(crate) fn extension_range(&self, i: usize) -> &BoxedExtensionRange {
        &self.extension_ranges[i - self.num_memtables() - self.num_files()]
    }
}
1306
// Test-only helpers for inspecting the scan input.
#[cfg(test)]
impl ScanInput {
    /// Returns SST file ids to scan.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        self.files.iter().map(|file| file.file_id()).collect()
    }

    /// Returns index ids of the SST files to scan.
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        self.files.iter().map(|file| file.index_id()).collect()
    }
}
1318
1319fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode {
1320    if append_mode {
1321        return PreFilterMode::All;
1322    }
1323
1324    match merge_mode {
1325        MergeMode::LastRow => PreFilterMode::SkipFieldsOnDelete,
1326        MergeMode::LastNonNull => PreFilterMode::SkipFields,
1327    }
1328}
1329
/// Context shared by different streams from a scanner.
/// It contains the input and ranges to scan.
pub struct StreamContext {
    /// Input memtables and files.
    pub input: ScanInput,
    /// Metadata for partition ranges.
    pub(crate) ranges: Vec<RangeMeta>,

    // Metrics:
    /// The start time of the query.
    /// Taken from the input, or the context creation time when unset.
    pub(crate) query_start: Instant,
}
1342
1343impl StreamContext {
1344    /// Creates a new [StreamContext] for [SeqScan].
1345    pub(crate) fn seq_scan_ctx(input: ScanInput) -> Self {
1346        let query_start = input.query_start.unwrap_or_else(Instant::now);
1347        let ranges = RangeMeta::seq_scan_ranges(&input);
1348        READ_SST_COUNT.observe(input.num_files() as f64);
1349
1350        Self {
1351            input,
1352            ranges,
1353            query_start,
1354        }
1355    }
1356
1357    /// Creates a new [StreamContext] for [UnorderedScan].
1358    pub(crate) fn unordered_scan_ctx(input: ScanInput) -> Self {
1359        let query_start = input.query_start.unwrap_or_else(Instant::now);
1360        let ranges = RangeMeta::unordered_scan_ranges(&input);
1361        READ_SST_COUNT.observe(input.num_files() as f64);
1362
1363        Self {
1364            input,
1365            ranges,
1366            query_start,
1367        }
1368    }
1369
1370    /// Returns true if the index refers to a memtable.
1371    pub(crate) fn is_mem_range_index(&self, index: RowGroupIndex) -> bool {
1372        self.input.num_memtables() > index.index
1373    }
1374
1375    pub(crate) fn is_file_range_index(&self, index: RowGroupIndex) -> bool {
1376        !self.is_mem_range_index(index)
1377            && index.index < self.input.num_files() + self.input.num_memtables()
1378    }
1379
1380    /// Retrieves the partition ranges.
1381    pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
1382        self.ranges
1383            .iter()
1384            .enumerate()
1385            .map(|(idx, range_meta)| range_meta.new_partition_range(idx))
1386            .collect()
1387    }
1388
    /// Format the context for explain.
    ///
    /// Writes a compact JSON-like summary of the partition ranges; with
    /// `verbose`, also appends projection, filters and per-file details.
    pub(crate) fn format_for_explain(&self, verbose: bool, f: &mut fmt::Formatter) -> fmt::Result {
        // Classify every row group index into memtable / file / other ranges.
        let (mut num_mem_ranges, mut num_file_ranges, mut num_other_ranges) = (0, 0, 0);
        for range_meta in &self.ranges {
            for idx in &range_meta.row_group_indices {
                if self.is_mem_range_index(*idx) {
                    num_mem_ranges += 1;
                } else if self.is_file_range_index(*idx) {
                    num_file_ranges += 1;
                } else {
                    num_other_ranges += 1;
                }
            }
        }
        // NOTE(review): the "{" opened here in verbose mode is not closed in
        // this method or in `format_verbose_content` — presumably the caller
        // closes it; confirm.
        if verbose {
            write!(f, "{{")?;
        }
        write!(
            f,
            r#""partition_count":{{"count":{}, "mem_ranges":{}, "files":{}, "file_ranges":{}"#,
            self.ranges.len(),
            num_mem_ranges,
            self.input.num_files(),
            num_file_ranges,
        )?;
        // Only mention other ranges when present to keep the output short.
        if num_other_ranges > 0 {
            write!(f, r#", "other_ranges":{}"#, num_other_ranges)?;
        }
        write!(f, "}}")?;

        if let Some(selector) = &self.input.series_row_selector {
            write!(f, ", \"selector\":\"{}\"", selector)?;
        }
        if let Some(distribution) = &self.input.distribution {
            write!(f, ", \"distribution\":\"{}\"", distribution)?;
        }

        if verbose {
            self.format_verbose_content(f)?;
        }

        Ok(())
    }
1432
    /// Appends verbose details (projection, filters, files, format flags and
    /// enterprise extension ranges) to the explain output.
    fn format_verbose_content(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Formats one SST file as a JSON-like object.
        struct FileWrapper<'a> {
            file: &'a FileHandle,
        }

        impl fmt::Debug for FileWrapper<'_> {
            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                let (start, end) = self.file.time_range();
                write!(
                    f,
                    r#"{{"file_id":"{}","time_range_start":"{}::{}","time_range_end":"{}::{}","rows":{},"size":{},"index_size":{}}}"#,
                    self.file.file_id(),
                    start.value(),
                    start.unit(),
                    end.value(),
                    end.unit(),
                    self.file.num_rows(),
                    self.file.size(),
                    self.file.index_size()
                )
            }
        }

        // Formats the whole scan input; sections are skipped when empty.
        struct InputWrapper<'a> {
            input: &'a ScanInput,
        }

        #[cfg(feature = "enterprise")]
        impl InputWrapper<'_> {
            // Appends the extension ranges section (enterprise only).
            fn format_extension_ranges(&self, f: &mut fmt::Formatter) -> fmt::Result {
                if self.input.extension_ranges.is_empty() {
                    return Ok(());
                }

                let mut delimiter = "";
                write!(f, ", extension_ranges: [")?;
                for range in self.input.extension_ranges() {
                    write!(f, "{}{:?}", delimiter, range)?;
                    delimiter = ", ";
                }
                write!(f, "]")?;
                Ok(())
            }
        }

        impl fmt::Debug for InputWrapper<'_> {
            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                // Projected column names.
                let output_schema = self.input.mapper.output_schema();
                if !output_schema.is_empty() {
                    let names: Vec<_> = output_schema
                        .column_schemas()
                        .iter()
                        .map(|col| &col.name)
                        .collect();
                    write!(f, ", \"projection\": {:?}", names)?;
                }
                // Pushed-down filter expressions.
                if let Some(predicate) = &self.input.predicate.predicate()
                    && !predicate.exprs().is_empty()
                {
                    let exprs: Vec<_> = predicate.exprs().iter().map(|e| e.to_string()).collect();
                    write!(f, ", \"filters\": {:?}", exprs)?;
                }
                // SST files with their time ranges and sizes.
                if !self.input.files.is_empty() {
                    write!(f, ", \"files\": ")?;
                    f.debug_list()
                        .entries(self.input.files.iter().map(|file| FileWrapper { file }))
                        .finish()?;
                }
                write!(f, ", \"flat_format\": {}", self.input.flat_format)?;

                #[cfg(feature = "enterprise")]
                self.format_extension_ranges(f)?;

                Ok(())
            }
        }

        write!(f, "{:?}", InputWrapper { input: &self.input })
    }
1512}
1513
/// Predicates to evaluate.
/// It only keeps filters that [SimpleFilterEvaluator] supports.
#[derive(Clone, Default)]
pub struct PredicateGroup {
    /// Simple filters on the time index column, extracted from the exprs.
    time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
    /// Predicate that includes request filters and region partition expr (if any).
    predicate_all: Option<Predicate>,
    /// Predicate that only includes request filters.
    predicate_without_region: Option<Predicate>,
    /// Region partition expression restored from metadata.
    region_partition_expr: Option<PartitionExpr>,
}
1526
1527impl PredicateGroup {
1528    /// Creates a new `PredicateGroup` from exprs according to the metadata.
1529    pub fn new(metadata: &RegionMetadata, exprs: &[Expr]) -> Result<Self> {
1530        let mut combined_exprs = exprs.to_vec();
1531        let mut region_partition_expr = None;
1532
1533        if let Some(expr_json) = metadata.partition_expr.as_ref()
1534            && !expr_json.is_empty()
1535            && let Some(expr) = PartitionExpr::from_json_str(expr_json)
1536                .context(InvalidPartitionExprSnafu { expr: expr_json })?
1537        {
1538            let logical_expr = expr
1539                .try_as_logical_expr()
1540                .context(InvalidPartitionExprSnafu {
1541                    expr: expr_json.clone(),
1542                })?;
1543
1544            combined_exprs.push(logical_expr);
1545            region_partition_expr = Some(expr);
1546        }
1547
1548        let mut time_filters = Vec::with_capacity(combined_exprs.len());
1549        // Columns in the expr.
1550        let mut columns = HashSet::new();
1551        for expr in &combined_exprs {
1552            columns.clear();
1553            let Some(filter) = Self::expr_to_filter(expr, metadata, &mut columns) else {
1554                continue;
1555            };
1556            time_filters.push(filter);
1557        }
1558        let time_filters = if time_filters.is_empty() {
1559            None
1560        } else {
1561            Some(Arc::new(time_filters))
1562        };
1563
1564        let predicate_all = if combined_exprs.is_empty() {
1565            None
1566        } else {
1567            Some(Predicate::new(combined_exprs))
1568        };
1569        let predicate_without_region = if exprs.is_empty() {
1570            None
1571        } else {
1572            Some(Predicate::new(exprs.to_vec()))
1573        };
1574
1575        Ok(Self {
1576            time_filters,
1577            predicate_all,
1578            predicate_without_region,
1579            region_partition_expr,
1580        })
1581    }
1582
    /// Returns time filters.
    ///
    /// Cloning only bumps the `Arc` refcount.
    pub(crate) fn time_filters(&self) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
        self.time_filters.clone()
    }

    /// Returns predicate of all exprs (including region partition expr if present).
    pub(crate) fn predicate(&self) -> Option<&Predicate> {
        self.predicate_all.as_ref()
    }

    /// Returns predicate that excludes region partition expr.
    pub(crate) fn predicate_without_region(&self) -> Option<&Predicate> {
        self.predicate_without_region.as_ref()
    }

    /// Returns the region partition expr from metadata, if any.
    pub(crate) fn region_partition_expr(&self) -> Option<&PartitionExpr> {
        self.region_partition_expr.as_ref()
    }
1602
1603    fn expr_to_filter(
1604        expr: &Expr,
1605        metadata: &RegionMetadata,
1606        columns: &mut HashSet<Column>,
1607    ) -> Option<SimpleFilterEvaluator> {
1608        columns.clear();
1609        // `expr_to_columns` won't return error.
1610        // We still ignore these expressions for safety.
1611        expr_to_columns(expr, columns).ok()?;
1612        if columns.len() > 1 {
1613            // Simple filter doesn't support multiple columns.
1614            return None;
1615        }
1616        let column = columns.iter().next()?;
1617        let column_meta = metadata.column_by_name(&column.name)?;
1618        if column_meta.semantic_type == SemanticType::Timestamp {
1619            SimpleFilterEvaluator::try_new(expr)
1620        } else {
1621            None
1622        }
1623    }
1624}