mito2/read/
scan_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Scans a region according to the scan request.
16
17use std::collections::HashSet;
18use std::fmt;
19use std::num::NonZeroU64;
20use std::sync::Arc;
21use std::time::Instant;
22
23use api::v1::SemanticType;
24use common_error::ext::BoxedError;
25use common_recordbatch::SendableRecordBatchStream;
26use common_recordbatch::filter::SimpleFilterEvaluator;
27use common_telemetry::{debug, error, tracing, warn};
28use common_time::range::TimestampRange;
29use datafusion_common::Column;
30use datafusion_expr::Expr;
31use datafusion_expr::utils::expr_to_columns;
32use futures::StreamExt;
33use partition::expr::PartitionExpr;
34use smallvec::SmallVec;
35use snafu::ResultExt;
36use store_api::metadata::{RegionMetadata, RegionMetadataRef};
37use store_api::region_engine::{PartitionRange, RegionScannerRef};
38use store_api::storage::{
39    RegionId, ScanRequest, SequenceRange, TimeSeriesDistribution, TimeSeriesRowSelector,
40};
41use table::predicate::{Predicate, build_time_range_predicate};
42use tokio::sync::{Semaphore, mpsc};
43use tokio_stream::wrappers::ReceiverStream;
44
45use crate::access_layer::AccessLayerRef;
46use crate::cache::CacheStrategy;
47use crate::config::{DEFAULT_MAX_CONCURRENT_SCAN_FILES, DEFAULT_SCAN_CHANNEL_SIZE};
48use crate::error::{InvalidPartitionExprSnafu, Result};
49#[cfg(feature = "enterprise")]
50use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
51use crate::memtable::{MemtableRange, RangesOptions};
52use crate::metrics::READ_SST_COUNT;
53use crate::read::compat::{self, CompatBatch, FlatCompatBatch, PrimaryKeyCompatBatch};
54use crate::read::projection::ProjectionMapper;
55use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
56use crate::read::seq_scan::SeqScan;
57use crate::read::series_scan::SeriesScan;
58use crate::read::stream::ScanBatchStream;
59use crate::read::unordered_scan::UnorderedScan;
60use crate::read::{Batch, BoxedRecordBatchStream, RecordBatch, Source};
61use crate::region::options::MergeMode;
62use crate::region::version::VersionRef;
63use crate::sst::FormatType;
64use crate::sst::file::FileHandle;
65use crate::sst::index::bloom_filter::applier::{
66    BloomFilterIndexApplierBuilder, BloomFilterIndexApplierRef,
67};
68use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
69use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
70use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
71use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
72use crate::sst::parquet::file_range::PreFilterMode;
73use crate::sst::parquet::reader::ReaderMetrics;
74
/// Parallel scan channel size for flat format.
///
/// When the region uses the flat format this value replaces the configured
/// parallel scan channel size, because flat batches are already large enough.
const FLAT_SCAN_CHANNEL_SIZE: usize = 2;
77
/// A scanner scans a region and returns a [SendableRecordBatchStream].
///
/// [ScanRegion::scanner()] picks the variant from the scan request and the
/// region options.
pub(crate) enum Scanner {
    /// Sequential scan. Chosen when neither the per-series scan nor the
    /// unordered scan applies.
    Seq(SeqScan),
    /// Unordered scan. Chosen for append mode regions when the request has no
    /// series row selector and requires no distribution or the time windowed one.
    Unordered(UnorderedScan),
    /// Per-series scan. Chosen when the request requires the per-series distribution.
    Series(SeriesScan),
}
87
88impl Scanner {
89    /// Returns a [SendableRecordBatchStream] to retrieve scan results from all partitions.
90    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
91    pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream, BoxedError> {
92        match self {
93            Scanner::Seq(seq_scan) => seq_scan.build_stream(),
94            Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
95            Scanner::Series(series_scan) => series_scan.build_stream().await,
96        }
97    }
98
99    /// Create a stream of [`Batch`] by this scanner.
100    pub(crate) fn scan_batch(&self) -> Result<ScanBatchStream> {
101        match self {
102            Scanner::Seq(x) => x.scan_all_partitions(),
103            Scanner::Unordered(x) => x.scan_all_partitions(),
104            Scanner::Series(x) => x.scan_all_partitions(),
105        }
106    }
107}
108
#[cfg(test)]
impl Scanner {
    /// Returns the number of files the scan input reports.
    pub(crate) fn num_files(&self) -> usize {
        match self {
            Self::Seq(scanner) => scanner.input().num_files(),
            Self::Unordered(scanner) => scanner.input().num_files(),
            Self::Series(scanner) => scanner.input().num_files(),
        }
    }

    /// Returns the number of memtables the scan input reports.
    pub(crate) fn num_memtables(&self) -> usize {
        match self {
            Self::Seq(scanner) => scanner.input().num_memtables(),
            Self::Unordered(scanner) => scanner.input().num_memtables(),
            Self::Series(scanner) => scanner.input().num_memtables(),
        }
    }

    /// Returns the ids of the SST files to scan.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        match self {
            Self::Seq(scanner) => scanner.input().file_ids(),
            Self::Unordered(scanner) => scanner.input().file_ids(),
            Self::Series(scanner) => scanner.input().file_ids(),
        }
    }

    /// Returns the index ids reported by the scan input.
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        match self {
            Self::Seq(scanner) => scanner.input().index_ids(),
            Self::Unordered(scanner) => scanner.input().index_ids(),
            Self::Series(scanner) => scanner.input().index_ids(),
        }
    }

    /// Sets the target partitions of the scanner, which controls its parallelism.
    ///
    /// # Panics
    ///
    /// Panics if the underlying scanner fails to prepare the request.
    pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
        use store_api::region_engine::{PrepareRequest, RegionScanner};

        let request = PrepareRequest::default().with_target_partitions(target_partitions);
        match self {
            Self::Seq(scanner) => scanner.prepare(request).unwrap(),
            Self::Unordered(scanner) => scanner.prepare(request).unwrap(),
            Self::Series(scanner) => scanner.prepare(request).unwrap(),
        }
    }
}
158
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Helper to scan a region by [ScanRequest].
///
/// [ScanRegion] collects SSTs and memtables to scan without actually reading them. It
/// creates a [Scanner] to actually scan these targets in [Scanner::scan()].
///
/// ```mermaid
/// classDiagram
/// class ScanRegion {
///     -VersionRef version
///     -ScanRequest request
///     ~scanner() Scanner
///     ~seq_scan() SeqScan
///     ~unordered_scan() UnorderedScan
///     ~series_scan() SeriesScan
/// }
/// class Scanner {
///     <<enumeration>>
///     SeqScan
///     UnorderedScan
///     SeriesScan
///     +scan() SendableRecordBatchStream
/// }
/// class SeqScan {
///     -ScanInput input
///     +build_stream() SendableRecordBatchStream
/// }
/// class UnorderedScan {
///     -ScanInput input
///     +build_stream() SendableRecordBatchStream
/// }
/// class SeriesScan {
///     -ScanInput input
///     +build_stream() SendableRecordBatchStream
/// }
/// class ScanInput {
///     -ProjectionMapper mapper
///     -Option~TimestampRange~ time_range
///     -PredicateGroup predicate
///     -Vec~MemRangeBuilder~ memtables
///     -Vec~FileHandle~ files
/// }
/// class ProjectionMapper {
///     ~output_schema() SchemaRef
///     ~convert(Batch) RecordBatch
/// }
/// ScanRegion -- Scanner
/// ScanRegion o-- ScanRequest
/// Scanner o-- SeqScan
/// Scanner o-- UnorderedScan
/// Scanner o-- SeriesScan
/// SeqScan o-- ScanInput
/// UnorderedScan o-- ScanInput
/// SeriesScan o-- ScanInput
/// Scanner -- SendableRecordBatchStream
/// ScanInput o-- ProjectionMapper
/// SeqScan -- SendableRecordBatchStream
/// UnorderedScan -- SendableRecordBatchStream
/// SeriesScan -- SendableRecordBatchStream
/// ```
pub(crate) struct ScanRegion {
    /// Version of the region at scan.
    version: VersionRef,
    /// Access layer of the region.
    access_layer: AccessLayerRef,
    /// Scan request.
    request: ScanRequest,
    /// Cache strategy passed to the scan input and used to build index appliers.
    cache_strategy: CacheStrategy,
    /// Capacity of the channel to send data from parallel scan tasks to the main task.
    /// Overridden by [FLAT_SCAN_CHANNEL_SIZE] when the region uses the flat format.
    parallel_scan_channel_size: usize,
    /// Maximum number of SST files to scan concurrently.
    max_concurrent_scan_files: usize,
    /// Whether to ignore inverted index.
    ignore_inverted_index: bool,
    /// Whether to ignore fulltext index.
    ignore_fulltext_index: bool,
    /// Whether to ignore bloom filter.
    ignore_bloom_filter: bool,
    /// Start time of the scan task.
    start_time: Option<Instant>,
    /// Whether to filter out the deleted rows.
    /// Usually true for normal read, and false for scan for compaction.
    filter_deleted: bool,
    /// Optional provider that finds extension ranges to add to the scan input.
    #[cfg(feature = "enterprise")]
    extension_range_provider: Option<BoxedExtensionRangeProvider>,
}
236
237impl ScanRegion {
238    /// Creates a [ScanRegion].
239    pub(crate) fn new(
240        version: VersionRef,
241        access_layer: AccessLayerRef,
242        request: ScanRequest,
243        cache_strategy: CacheStrategy,
244    ) -> ScanRegion {
245        ScanRegion {
246            version,
247            access_layer,
248            request,
249            cache_strategy,
250            parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
251            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
252            ignore_inverted_index: false,
253            ignore_fulltext_index: false,
254            ignore_bloom_filter: false,
255            start_time: None,
256            filter_deleted: true,
257            #[cfg(feature = "enterprise")]
258            extension_range_provider: None,
259        }
260    }
261
262    /// Sets parallel scan task channel size.
263    #[must_use]
264    pub(crate) fn with_parallel_scan_channel_size(
265        mut self,
266        parallel_scan_channel_size: usize,
267    ) -> Self {
268        self.parallel_scan_channel_size = parallel_scan_channel_size;
269        self
270    }
271
272    /// Sets maximum number of SST files to scan concurrently.
273    #[must_use]
274    pub(crate) fn with_max_concurrent_scan_files(
275        mut self,
276        max_concurrent_scan_files: usize,
277    ) -> Self {
278        self.max_concurrent_scan_files = max_concurrent_scan_files;
279        self
280    }
281
282    /// Sets whether to ignore inverted index.
283    #[must_use]
284    pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self {
285        self.ignore_inverted_index = ignore;
286        self
287    }
288
289    /// Sets whether to ignore fulltext index.
290    #[must_use]
291    pub(crate) fn with_ignore_fulltext_index(mut self, ignore: bool) -> Self {
292        self.ignore_fulltext_index = ignore;
293        self
294    }
295
296    /// Sets whether to ignore bloom filter.
297    #[must_use]
298    pub(crate) fn with_ignore_bloom_filter(mut self, ignore: bool) -> Self {
299        self.ignore_bloom_filter = ignore;
300        self
301    }
302
303    #[must_use]
304    pub(crate) fn with_start_time(mut self, now: Instant) -> Self {
305        self.start_time = Some(now);
306        self
307    }
308
    /// Sets whether to filter out the deleted rows.
    ///
    /// The value is forwarded to the scan input when it is created.
    pub(crate) fn set_filter_deleted(&mut self, filter_deleted: bool) {
        self.filter_deleted = filter_deleted;
    }
312
    /// Sets the provider used to find extension ranges when the scan input is created.
    ///
    /// Replaces any provider set before.
    #[cfg(feature = "enterprise")]
    pub(crate) fn set_extension_range_provider(
        &mut self,
        extension_range_provider: BoxedExtensionRangeProvider,
    ) {
        self.extension_range_provider = Some(extension_range_provider);
    }
320
321    /// Returns a [Scanner] to scan the region.
322    pub(crate) async fn scanner(self) -> Result<Scanner> {
323        if self.use_series_scan() {
324            self.series_scan().await.map(Scanner::Series)
325        } else if self.use_unordered_scan() {
326            // If table is append only and there is no series row selector, we use unordered scan in query.
327            // We still use seq scan in compaction.
328            self.unordered_scan().await.map(Scanner::Unordered)
329        } else {
330            self.seq_scan().await.map(Scanner::Seq)
331        }
332    }
333
334    /// Returns a [RegionScanner] to scan the region.
335    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
336    pub(crate) async fn region_scanner(self) -> Result<RegionScannerRef> {
337        if self.use_series_scan() {
338            self.series_scan()
339                .await
340                .map(|scanner| Box::new(scanner) as _)
341        } else if self.use_unordered_scan() {
342            self.unordered_scan()
343                .await
344                .map(|scanner| Box::new(scanner) as _)
345        } else {
346            self.seq_scan().await.map(|scanner| Box::new(scanner) as _)
347        }
348    }
349
350    /// Scan sequentially.
351    pub(crate) async fn seq_scan(self) -> Result<SeqScan> {
352        let input = self.scan_input().await?.with_compaction(false);
353        Ok(SeqScan::new(input))
354    }
355
356    /// Unordered scan.
357    pub(crate) async fn unordered_scan(self) -> Result<UnorderedScan> {
358        let input = self.scan_input().await?;
359        Ok(UnorderedScan::new(input))
360    }
361
362    /// Scans by series.
363    pub(crate) async fn series_scan(self) -> Result<SeriesScan> {
364        let input = self.scan_input().await?;
365        Ok(SeriesScan::new(input))
366    }
367
368    /// Returns true if the region can use unordered scan for current request.
369    fn use_unordered_scan(&self) -> bool {
370        // We use unordered scan when:
371        // 1. The region is in append mode.
372        // 2. There is no series row selector.
373        // 3. The required distribution is None or TimeSeriesDistribution::TimeWindowed.
374        //
375        // We still use seq scan in compaction.
376        self.version.options.append_mode
377            && self.request.series_row_selector.is_none()
378            && (self.request.distribution.is_none()
379                || self.request.distribution == Some(TimeSeriesDistribution::TimeWindowed))
380    }
381
382    /// Returns true if the region can use series scan for current request.
383    fn use_series_scan(&self) -> bool {
384        self.request.distribution == Some(TimeSeriesDistribution::PerSeries)
385    }
386
387    /// Returns true if the region use flat format.
388    fn use_flat_format(&self) -> bool {
389        self.version.options.sst_format.unwrap_or_default() == FormatType::Flat
390    }
391
392    /// Creates a scan input.
393    async fn scan_input(mut self) -> Result<ScanInput> {
394        let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new);
395        let time_range = self.build_time_range_predicate();
396        let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;
397        let flat_format = self.use_flat_format();
398
399        // The mapper always computes projected column ids as the schema of SSTs may change.
400        let mapper = match &self.request.projection {
401            Some(p) => {
402                ProjectionMapper::new(&self.version.metadata, p.iter().copied(), flat_format)?
403            }
404            None => ProjectionMapper::all(&self.version.metadata, flat_format)?,
405        };
406
407        let ssts = &self.version.ssts;
408        let mut files = Vec::new();
409        for level in ssts.levels() {
410            for file in level.files.values() {
411                let exceed_min_sequence = match (sst_min_sequence, file.meta_ref().sequence) {
412                    (Some(min_sequence), Some(file_sequence)) => file_sequence > min_sequence,
413                    // If the file's sequence is None (or actually is zero), it could mean the file
414                    // is generated and added to the region "directly". In this case, its data should
415                    // be considered as fresh as the memtable. So its sequence is treated greater than
416                    // the min_sequence, whatever the value of min_sequence is. Hence the default
417                    // "true" in this arm.
418                    (Some(_), None) => true,
419                    (None, _) => true,
420                };
421
422                // Finds SST files in range.
423                if exceed_min_sequence && file_in_range(file, &time_range) {
424                    files.push(file.clone());
425                }
426                // There is no need to check and prune for file's sequence here as the sequence number is usually very new,
427                // unless the timing is too good, or the sequence number wouldn't be in file.
428                // and the batch will be filtered out by tree reader anyway.
429            }
430        }
431
432        let memtables = self.version.memtables.list_memtables();
433        // Skip empty memtables and memtables out of time range.
434        let mut mem_range_builders = Vec::new();
435        let filter_mode = pre_filter_mode(
436            self.version.options.append_mode,
437            self.version.options.merge_mode(),
438        );
439
440        for m in memtables {
441            // check if memtable is empty by reading stats.
442            let Some((start, end)) = m.stats().time_range() else {
443                continue;
444            };
445            // The time range of the memtable is inclusive.
446            let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
447            if !memtable_range.intersects(&time_range) {
448                continue;
449            }
450            let ranges_in_memtable = m.ranges(
451                Some(mapper.column_ids()),
452                RangesOptions::default()
453                    .with_predicate(predicate.clone())
454                    .with_sequence(SequenceRange::new(
455                        self.request.memtable_min_sequence,
456                        self.request.memtable_max_sequence,
457                    ))
458                    .with_pre_filter_mode(filter_mode),
459            )?;
460            mem_range_builders.extend(ranges_in_memtable.ranges.into_values().map(|v| {
461                // todo: we should add stats to MemtableRange
462                let mut stats = ranges_in_memtable.stats.clone();
463                stats.num_ranges = 1;
464                stats.num_rows = v.num_rows();
465                MemRangeBuilder::new(v, stats)
466            }));
467        }
468
469        let region_id = self.region_id();
470        debug!(
471            "Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}, flat_format: {}",
472            region_id,
473            self.request,
474            time_range,
475            mem_range_builders.len(),
476            files.len(),
477            self.version.options.append_mode,
478            flat_format,
479        );
480
481        let (non_field_filters, field_filters) = self.partition_by_field_filters();
482        let inverted_index_appliers = [
483            self.build_invereted_index_applier(&non_field_filters),
484            self.build_invereted_index_applier(&field_filters),
485        ];
486        let bloom_filter_appliers = [
487            self.build_bloom_filter_applier(&non_field_filters),
488            self.build_bloom_filter_applier(&field_filters),
489        ];
490        let fulltext_index_appliers = [
491            self.build_fulltext_index_applier(&non_field_filters),
492            self.build_fulltext_index_applier(&field_filters),
493        ];
494        let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;
495
496        if flat_format {
497            // The batch is already large enough so we use a small channel size here.
498            self.parallel_scan_channel_size = FLAT_SCAN_CHANNEL_SIZE;
499        }
500
501        let input = ScanInput::new(self.access_layer, mapper)
502            .with_time_range(Some(time_range))
503            .with_predicate(predicate)
504            .with_memtables(mem_range_builders)
505            .with_files(files)
506            .with_cache(self.cache_strategy)
507            .with_inverted_index_appliers(inverted_index_appliers)
508            .with_bloom_filter_index_appliers(bloom_filter_appliers)
509            .with_fulltext_index_appliers(fulltext_index_appliers)
510            .with_parallel_scan_channel_size(self.parallel_scan_channel_size)
511            .with_max_concurrent_scan_files(self.max_concurrent_scan_files)
512            .with_start_time(self.start_time)
513            .with_append_mode(self.version.options.append_mode)
514            .with_filter_deleted(self.filter_deleted)
515            .with_merge_mode(self.version.options.merge_mode())
516            .with_series_row_selector(self.request.series_row_selector)
517            .with_distribution(self.request.distribution)
518            .with_flat_format(flat_format);
519
520        #[cfg(feature = "enterprise")]
521        let input = if let Some(provider) = self.extension_range_provider {
522            let ranges = provider
523                .find_extension_ranges(self.version.flushed_sequence, time_range, &self.request)
524                .await?;
525            debug!("Find extension ranges: {ranges:?}");
526            input.with_extension_ranges(ranges)
527        } else {
528            input
529        };
530        Ok(input)
531    }
532
533    fn region_id(&self) -> RegionId {
534        self.version.metadata.region_id
535    }
536
537    /// Build time range predicate from filters.
538    fn build_time_range_predicate(&self) -> TimestampRange {
539        let time_index = self.version.metadata.time_index_column();
540        let unit = time_index
541            .column_schema
542            .data_type
543            .as_timestamp()
544            .expect("Time index must have timestamp-compatible type")
545            .unit();
546        build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters)
547    }
548
549    /// Partitions filters into two groups: non-field filters and field filters.
550    /// Returns `(non_field_filters, field_filters)`.
551    fn partition_by_field_filters(&self) -> (Vec<Expr>, Vec<Expr>) {
552        let field_columns = self
553            .version
554            .metadata
555            .field_columns()
556            .map(|col| &col.column_schema.name)
557            .collect::<HashSet<_>>();
558
559        let mut columns = HashSet::new();
560
561        self.request.filters.iter().cloned().partition(|expr| {
562            columns.clear();
563            // `expr_to_columns` won't return error.
564            if expr_to_columns(expr, &mut columns).is_err() {
565                // If we can't extract columns, treat it as non-field filter
566                return true;
567            }
568            // Return true for non-field filters (partition puts true cases in first vec)
569            !columns
570                .iter()
571                .any(|column| field_columns.contains(&column.name))
572        })
573    }
574
575    /// Use the latest schema to build the inverted index applier.
576    fn build_invereted_index_applier(&self, filters: &[Expr]) -> Option<InvertedIndexApplierRef> {
577        if self.ignore_inverted_index {
578            return None;
579        }
580
581        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
582        let inverted_index_cache = self.cache_strategy.inverted_index_cache().cloned();
583
584        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
585
586        InvertedIndexApplierBuilder::new(
587            self.access_layer.table_dir().to_string(),
588            self.access_layer.path_type(),
589            self.access_layer.object_store().clone(),
590            self.version.metadata.as_ref(),
591            self.version.metadata.inverted_indexed_column_ids(
592                self.version
593                    .options
594                    .index_options
595                    .inverted_index
596                    .ignore_column_ids
597                    .iter(),
598            ),
599            self.access_layer.puffin_manager_factory().clone(),
600        )
601        .with_file_cache(file_cache)
602        .with_inverted_index_cache(inverted_index_cache)
603        .with_puffin_metadata_cache(puffin_metadata_cache)
604        .build(filters)
605        .inspect_err(|err| warn!(err; "Failed to build invereted index applier"))
606        .ok()
607        .flatten()
608        .map(Arc::new)
609    }
610
611    /// Use the latest schema to build the bloom filter index applier.
612    fn build_bloom_filter_applier(&self, filters: &[Expr]) -> Option<BloomFilterIndexApplierRef> {
613        if self.ignore_bloom_filter {
614            return None;
615        }
616
617        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
618        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
619        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
620
621        BloomFilterIndexApplierBuilder::new(
622            self.access_layer.table_dir().to_string(),
623            self.access_layer.path_type(),
624            self.access_layer.object_store().clone(),
625            self.version.metadata.as_ref(),
626            self.access_layer.puffin_manager_factory().clone(),
627        )
628        .with_file_cache(file_cache)
629        .with_bloom_filter_index_cache(bloom_filter_index_cache)
630        .with_puffin_metadata_cache(puffin_metadata_cache)
631        .build(filters)
632        .inspect_err(|err| warn!(err; "Failed to build bloom filter index applier"))
633        .ok()
634        .flatten()
635        .map(Arc::new)
636    }
637
638    /// Use the latest schema to build the fulltext index applier.
639    fn build_fulltext_index_applier(&self, filters: &[Expr]) -> Option<FulltextIndexApplierRef> {
640        if self.ignore_fulltext_index {
641            return None;
642        }
643
644        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
645        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
646        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
647        FulltextIndexApplierBuilder::new(
648            self.access_layer.table_dir().to_string(),
649            self.access_layer.path_type(),
650            self.access_layer.object_store().clone(),
651            self.access_layer.puffin_manager_factory().clone(),
652            self.version.metadata.as_ref(),
653        )
654        .with_file_cache(file_cache)
655        .with_puffin_metadata_cache(puffin_metadata_cache)
656        .with_bloom_filter_cache(bloom_filter_index_cache)
657        .build(filters)
658        .inspect_err(|err| warn!(err; "Failed to build fulltext index applier"))
659        .ok()
660        .flatten()
661        .map(Arc::new)
662    }
663}
664
665/// Returns true if the time range of a SST `file` matches the `predicate`.
666fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
667    if predicate == &TimestampRange::min_to_max() {
668        return true;
669    }
670    // end timestamp of a SST is inclusive.
671    let (start, end) = file.time_range();
672    let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end));
673    file_ts_range.intersects(predicate)
674}
675
/// Common input for different scanners.
pub struct ScanInput {
    /// Region SST access layer.
    access_layer: AccessLayerRef,
    /// Maps projected Batches to RecordBatches.
    pub(crate) mapper: Arc<ProjectionMapper>,
    /// Time range filter for time index.
    time_range: Option<TimestampRange>,
    /// Predicate to push down.
    pub(crate) predicate: PredicateGroup,
    /// Region partition expr applied at read time.
    /// Taken from the predicate group when the predicate is set.
    region_partition_expr: Option<PartitionExpr>,
    /// Memtable range builders for memtables in the time range.
    pub(crate) memtables: Vec<MemRangeBuilder>,
    /// Handles to SST files to scan.
    pub(crate) files: Vec<FileHandle>,
    /// Cache.
    pub(crate) cache_strategy: CacheStrategy,
    /// Ignores file not found error.
    ignore_file_not_found: bool,
    /// Capacity of the channel to send data from parallel scan tasks to the main task.
    pub(crate) parallel_scan_channel_size: usize,
    /// Maximum number of SST files to scan concurrently.
    pub(crate) max_concurrent_scan_files: usize,
    /// Inverted index appliers.
    /// [ScanRegion] fills it as `[applier for non-field filters, applier for field filters]`.
    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
    /// Bloom filter index appliers, in the same order as the inverted index appliers.
    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    /// Fulltext index appliers, in the same order as the inverted index appliers.
    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
    /// Start time of the query.
    pub(crate) query_start: Option<Instant>,
    /// The region is using append mode.
    pub(crate) append_mode: bool,
    /// Whether to remove deletion markers.
    pub(crate) filter_deleted: bool,
    /// Mode to merge duplicate rows.
    pub(crate) merge_mode: MergeMode,
    /// Hint to select rows from time series.
    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
    /// Hint for the required distribution of the scanner.
    pub(crate) distribution: Option<TimeSeriesDistribution>,
    /// Whether to use flat format.
    pub(crate) flat_format: bool,
    /// Whether this scan is for compaction.
    pub(crate) compaction: bool,
    /// Extension ranges to scan.
    /// NOTE(review): presumably set from the ranges the extension range provider finds — confirm.
    #[cfg(feature = "enterprise")]
    extension_ranges: Vec<BoxedExtensionRange>,
}
723
724impl ScanInput {
725    /// Creates a new [ScanInput].
726    #[must_use]
727    pub(crate) fn new(access_layer: AccessLayerRef, mapper: ProjectionMapper) -> ScanInput {
728        ScanInput {
729            access_layer,
730            mapper: Arc::new(mapper),
731            time_range: None,
732            predicate: PredicateGroup::default(),
733            region_partition_expr: None,
734            memtables: Vec::new(),
735            files: Vec::new(),
736            cache_strategy: CacheStrategy::Disabled,
737            ignore_file_not_found: false,
738            parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
739            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
740            inverted_index_appliers: [None, None],
741            bloom_filter_index_appliers: [None, None],
742            fulltext_index_appliers: [None, None],
743            query_start: None,
744            append_mode: false,
745            filter_deleted: true,
746            merge_mode: MergeMode::default(),
747            series_row_selector: None,
748            distribution: None,
749            flat_format: false,
750            compaction: false,
751            #[cfg(feature = "enterprise")]
752            extension_ranges: Vec::new(),
753        }
754    }
755
756    /// Sets time range filter for time index.
757    #[must_use]
758    pub(crate) fn with_time_range(mut self, time_range: Option<TimestampRange>) -> Self {
759        self.time_range = time_range;
760        self
761    }
762
763    /// Sets predicate to push down.
764    #[must_use]
765    pub(crate) fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
766        self.region_partition_expr = predicate.region_partition_expr().cloned();
767        self.predicate = predicate;
768        self
769    }
770
771    /// Sets memtable range builders.
772    #[must_use]
773    pub(crate) fn with_memtables(mut self, memtables: Vec<MemRangeBuilder>) -> Self {
774        self.memtables = memtables;
775        self
776    }
777
778    /// Sets files to read.
779    #[must_use]
780    pub(crate) fn with_files(mut self, files: Vec<FileHandle>) -> Self {
781        self.files = files;
782        self
783    }
784
785    /// Sets cache for this query.
786    #[must_use]
787    pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self {
788        self.cache_strategy = cache;
789        self
790    }
791
792    /// Ignores file not found error.
793    #[must_use]
794    pub(crate) fn with_ignore_file_not_found(mut self, ignore: bool) -> Self {
795        self.ignore_file_not_found = ignore;
796        self
797    }
798
799    /// Sets scan task channel size.
800    #[must_use]
801    pub(crate) fn with_parallel_scan_channel_size(
802        mut self,
803        parallel_scan_channel_size: usize,
804    ) -> Self {
805        self.parallel_scan_channel_size = parallel_scan_channel_size;
806        self
807    }
808
809    /// Sets maximum number of SST files to scan concurrently.
810    #[must_use]
811    pub(crate) fn with_max_concurrent_scan_files(
812        mut self,
813        max_concurrent_scan_files: usize,
814    ) -> Self {
815        self.max_concurrent_scan_files = max_concurrent_scan_files;
816        self
817    }
818
819    /// Sets inverted index appliers.
820    #[must_use]
821    pub(crate) fn with_inverted_index_appliers(
822        mut self,
823        appliers: [Option<InvertedIndexApplierRef>; 2],
824    ) -> Self {
825        self.inverted_index_appliers = appliers;
826        self
827    }
828
829    /// Sets bloom filter appliers.
830    #[must_use]
831    pub(crate) fn with_bloom_filter_index_appliers(
832        mut self,
833        appliers: [Option<BloomFilterIndexApplierRef>; 2],
834    ) -> Self {
835        self.bloom_filter_index_appliers = appliers;
836        self
837    }
838
839    /// Sets fulltext index appliers.
840    #[must_use]
841    pub(crate) fn with_fulltext_index_appliers(
842        mut self,
843        appliers: [Option<FulltextIndexApplierRef>; 2],
844    ) -> Self {
845        self.fulltext_index_appliers = appliers;
846        self
847    }
848
849    /// Sets start time of the query.
850    #[must_use]
851    pub(crate) fn with_start_time(mut self, now: Option<Instant>) -> Self {
852        self.query_start = now;
853        self
854    }
855
856    #[must_use]
857    pub(crate) fn with_append_mode(mut self, is_append_mode: bool) -> Self {
858        self.append_mode = is_append_mode;
859        self
860    }
861
862    /// Sets whether to remove deletion markers during scan.
863    #[must_use]
864    pub(crate) fn with_filter_deleted(mut self, filter_deleted: bool) -> Self {
865        self.filter_deleted = filter_deleted;
866        self
867    }
868
869    /// Sets the merge mode.
870    #[must_use]
871    pub(crate) fn with_merge_mode(mut self, merge_mode: MergeMode) -> Self {
872        self.merge_mode = merge_mode;
873        self
874    }
875
876    /// Sets the distribution hint.
877    #[must_use]
878    pub(crate) fn with_distribution(
879        mut self,
880        distribution: Option<TimeSeriesDistribution>,
881    ) -> Self {
882        self.distribution = distribution;
883        self
884    }
885
886    /// Sets the time series row selector.
887    #[must_use]
888    pub(crate) fn with_series_row_selector(
889        mut self,
890        series_row_selector: Option<TimeSeriesRowSelector>,
891    ) -> Self {
892        self.series_row_selector = series_row_selector;
893        self
894    }
895
896    /// Sets whether to use flat format.
897    #[must_use]
898    pub(crate) fn with_flat_format(mut self, flat_format: bool) -> Self {
899        self.flat_format = flat_format;
900        self
901    }
902
903    /// Sets whether this scan is for compaction.
904    #[must_use]
905    pub(crate) fn with_compaction(mut self, compaction: bool) -> Self {
906        self.compaction = compaction;
907        self
908    }
909
910    /// Scans sources in parallel.
911    ///
912    /// # Panics if the input doesn't allow parallel scan.
913    pub(crate) fn create_parallel_sources(
914        &self,
915        sources: Vec<Source>,
916        semaphore: Arc<Semaphore>,
917    ) -> Result<Vec<Source>> {
918        if sources.len() <= 1 {
919            return Ok(sources);
920        }
921
922        // Spawn a task for each source.
923        let sources = sources
924            .into_iter()
925            .map(|source| {
926                let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
927                self.spawn_scan_task(source, semaphore.clone(), sender);
928                let stream = Box::pin(ReceiverStream::new(receiver));
929                Source::Stream(stream)
930            })
931            .collect();
932        Ok(sources)
933    }
934
935    /// Builds memtable ranges to scan by `index`.
936    pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> {
937        let memtable = &self.memtables[index.index];
938        let mut ranges = SmallVec::new();
939        memtable.build_ranges(index.row_group_index, &mut ranges);
940        ranges
941    }
942
943    fn predicate_for_file(&self, file: &FileHandle) -> Option<Predicate> {
944        if self.should_skip_region_partition(file) {
945            self.predicate.predicate_without_region().cloned()
946        } else {
947            self.predicate.predicate().cloned()
948        }
949    }
950
951    fn should_skip_region_partition(&self, file: &FileHandle) -> bool {
952        match (
953            self.region_partition_expr.as_ref(),
954            file.meta_ref().partition_expr.as_ref(),
955        ) {
956            (Some(region_expr), Some(file_expr)) => region_expr == file_expr,
957            _ => false,
958        }
959    }
960
961    /// Prunes a file to scan and returns the builder to build readers.
962    pub async fn prune_file(
963        &self,
964        file: &FileHandle,
965        reader_metrics: &mut ReaderMetrics,
966    ) -> Result<FileRangeBuilder> {
967        let predicate = self.predicate_for_file(file);
968        let filter_mode = pre_filter_mode(self.append_mode, self.merge_mode);
969        let decode_pk_values = !self.compaction && self.mapper.has_tags();
970        let res = self
971            .access_layer
972            .read_sst(file.clone())
973            .predicate(predicate)
974            .projection(Some(self.mapper.column_ids().to_vec()))
975            .cache(self.cache_strategy.clone())
976            .inverted_index_appliers(self.inverted_index_appliers.clone())
977            .bloom_filter_index_appliers(self.bloom_filter_index_appliers.clone())
978            .fulltext_index_appliers(self.fulltext_index_appliers.clone())
979            .expected_metadata(Some(self.mapper.metadata().clone()))
980            .flat_format(self.flat_format)
981            .compaction(self.compaction)
982            .pre_filter_mode(filter_mode)
983            .decode_primary_key_values(decode_pk_values)
984            .build_reader_input(reader_metrics)
985            .await;
986        let (mut file_range_ctx, selection) = match res {
987            Ok(x) => x,
988            Err(e) => {
989                if e.is_object_not_found() && self.ignore_file_not_found {
990                    error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
991                    return Ok(FileRangeBuilder::default());
992                } else {
993                    return Err(e);
994                }
995            }
996        };
997
998        let need_compat = !compat::has_same_columns_and_pk_encoding(
999            self.mapper.metadata(),
1000            file_range_ctx.read_format().metadata(),
1001        );
1002        if need_compat {
1003            // They have different schema. We need to adapt the batch first so the
1004            // mapper can convert it.
1005            let compat = if let Some(flat_format) = file_range_ctx.read_format().as_flat() {
1006                let mapper = self.mapper.as_flat().unwrap();
1007                FlatCompatBatch::try_new(
1008                    mapper,
1009                    flat_format.metadata(),
1010                    flat_format.format_projection(),
1011                    self.compaction,
1012                )?
1013                .map(CompatBatch::Flat)
1014            } else {
1015                let compact_batch = PrimaryKeyCompatBatch::new(
1016                    &self.mapper,
1017                    file_range_ctx.read_format().metadata().clone(),
1018                )?;
1019                Some(CompatBatch::PrimaryKey(compact_batch))
1020            };
1021            file_range_ctx.set_compat_batch(compat);
1022        }
1023        Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), selection))
1024    }
1025
1026    /// Scans the input source in another task and sends batches to the sender.
1027    pub(crate) fn spawn_scan_task(
1028        &self,
1029        mut input: Source,
1030        semaphore: Arc<Semaphore>,
1031        sender: mpsc::Sender<Result<Batch>>,
1032    ) {
1033        common_runtime::spawn_global(async move {
1034            loop {
1035                // We release the permit before sending result to avoid the task waiting on
1036                // the channel with the permit held.
1037                let maybe_batch = {
1038                    // Safety: We never close the semaphore.
1039                    let _permit = semaphore.acquire().await.unwrap();
1040                    input.next_batch().await
1041                };
1042                match maybe_batch {
1043                    Ok(Some(batch)) => {
1044                        let _ = sender.send(Ok(batch)).await;
1045                    }
1046                    Ok(None) => break,
1047                    Err(e) => {
1048                        let _ = sender.send(Err(e)).await;
1049                        break;
1050                    }
1051                }
1052            }
1053        });
1054    }
1055
1056    /// Scans flat sources (RecordBatch streams) in parallel.
1057    ///
1058    /// # Panics if the input doesn't allow parallel scan.
1059    pub(crate) fn create_parallel_flat_sources(
1060        &self,
1061        sources: Vec<BoxedRecordBatchStream>,
1062        semaphore: Arc<Semaphore>,
1063    ) -> Result<Vec<BoxedRecordBatchStream>> {
1064        if sources.len() <= 1 {
1065            return Ok(sources);
1066        }
1067
1068        // Spawn a task for each source.
1069        let sources = sources
1070            .into_iter()
1071            .map(|source| {
1072                let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
1073                self.spawn_flat_scan_task(source, semaphore.clone(), sender);
1074                let stream = Box::pin(ReceiverStream::new(receiver));
1075                Box::pin(stream) as _
1076            })
1077            .collect();
1078        Ok(sources)
1079    }
1080
1081    /// Spawns a task to scan a flat source (RecordBatch stream) asynchronously.
1082    pub(crate) fn spawn_flat_scan_task(
1083        &self,
1084        mut input: BoxedRecordBatchStream,
1085        semaphore: Arc<Semaphore>,
1086        sender: mpsc::Sender<Result<RecordBatch>>,
1087    ) {
1088        common_runtime::spawn_global(async move {
1089            loop {
1090                // We release the permit before sending result to avoid the task waiting on
1091                // the channel with the permit held.
1092                let maybe_batch = {
1093                    // Safety: We never close the semaphore.
1094                    let _permit = semaphore.acquire().await.unwrap();
1095                    input.next().await
1096                };
1097                match maybe_batch {
1098                    Some(Ok(batch)) => {
1099                        let _ = sender.send(Ok(batch)).await;
1100                    }
1101                    Some(Err(e)) => {
1102                        let _ = sender.send(Err(e)).await;
1103                        break;
1104                    }
1105                    None => break,
1106                }
1107            }
1108        });
1109    }
1110
1111    pub(crate) fn total_rows(&self) -> usize {
1112        let rows_in_files: usize = self.files.iter().map(|f| f.num_rows()).sum();
1113        let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();
1114
1115        let rows = rows_in_files + rows_in_memtables;
1116        #[cfg(feature = "enterprise")]
1117        let rows = rows
1118            + self
1119                .extension_ranges
1120                .iter()
1121                .map(|x| x.num_rows())
1122                .sum::<u64>() as usize;
1123        rows
1124    }
1125
1126    pub(crate) fn predicate_group(&self) -> &PredicateGroup {
1127        &self.predicate
1128    }
1129
1130    /// Returns number of memtables to scan.
1131    pub(crate) fn num_memtables(&self) -> usize {
1132        self.memtables.len()
1133    }
1134
1135    /// Returns number of SST files to scan.
1136    pub(crate) fn num_files(&self) -> usize {
1137        self.files.len()
1138    }
1139
1140    /// Gets the file handle from a row group index.
1141    pub(crate) fn file_from_index(&self, index: RowGroupIndex) -> &FileHandle {
1142        let file_index = index.index - self.num_memtables();
1143        &self.files[file_index]
1144    }
1145
1146    pub fn region_metadata(&self) -> &RegionMetadataRef {
1147        self.mapper.metadata()
1148    }
1149}
1150
#[cfg(feature = "enterprise")]
impl ScanInput {
    /// Replaces the extension ranges to scan.
    #[must_use]
    pub(crate) fn with_extension_ranges(
        mut self,
        extension_ranges: Vec<BoxedExtensionRange>,
    ) -> Self {
        self.extension_ranges = extension_ranges;
        self
    }

    /// Returns all extension ranges of this input.
    pub(crate) fn extension_ranges(&self) -> &[BoxedExtensionRange] {
        self.extension_ranges.as_slice()
    }

    /// Get a boxed [ExtensionRange] by the index in all ranges.
    ///
    /// Extension ranges follow memtables and files, so both counts are subtracted
    /// from `i` to locate the range.
    ///
    /// # Panics
    ///
    /// Panics if the resulting position is not a valid extension range index.
    pub(crate) fn extension_range(&self, i: usize) -> &BoxedExtensionRange {
        let preceding = self.num_memtables() + self.num_files();
        &self.extension_ranges[i - preceding]
    }
}
1172
#[cfg(test)]
impl ScanInput {
    /// Returns SST file ids to scan.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        let mut ids = Vec::with_capacity(self.files.len());
        for file in &self.files {
            ids.push(file.file_id());
        }
        ids
    }

    /// Returns the index ids of the SST files to scan.
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        let mut ids = Vec::with_capacity(self.files.len());
        for file in &self.files {
            ids.push(file.index_id());
        }
        ids
    }
}
1184
1185fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode {
1186    if append_mode {
1187        return PreFilterMode::All;
1188    }
1189
1190    match merge_mode {
1191        MergeMode::LastRow => PreFilterMode::SkipFieldsOnDelete,
1192        MergeMode::LastNonNull => PreFilterMode::SkipFields,
1193    }
1194}
1195
/// Context shared by different streams from a scanner.
/// It contains the input and ranges to scan.
///
/// Built by [`StreamContext::seq_scan_ctx`] or [`StreamContext::unordered_scan_ctx`],
/// which derive `ranges` from the input.
pub struct StreamContext {
    /// Input memtables and files.
    pub input: ScanInput,
    /// Metadata for partition ranges.
    pub(crate) ranges: Vec<RangeMeta>,

    // Metrics:
    /// The start time of the query: the input's `query_start` when it is set,
    /// otherwise the instant at which this context was created.
    pub(crate) query_start: Instant,
}
1208
1209impl StreamContext {
1210    /// Creates a new [StreamContext] for [SeqScan].
1211    pub(crate) fn seq_scan_ctx(input: ScanInput) -> Self {
1212        let query_start = input.query_start.unwrap_or_else(Instant::now);
1213        let ranges = RangeMeta::seq_scan_ranges(&input);
1214        READ_SST_COUNT.observe(input.num_files() as f64);
1215
1216        Self {
1217            input,
1218            ranges,
1219            query_start,
1220        }
1221    }
1222
1223    /// Creates a new [StreamContext] for [UnorderedScan].
1224    pub(crate) fn unordered_scan_ctx(input: ScanInput) -> Self {
1225        let query_start = input.query_start.unwrap_or_else(Instant::now);
1226        let ranges = RangeMeta::unordered_scan_ranges(&input);
1227        READ_SST_COUNT.observe(input.num_files() as f64);
1228
1229        Self {
1230            input,
1231            ranges,
1232            query_start,
1233        }
1234    }
1235
1236    /// Returns true if the index refers to a memtable.
1237    pub(crate) fn is_mem_range_index(&self, index: RowGroupIndex) -> bool {
1238        self.input.num_memtables() > index.index
1239    }
1240
1241    pub(crate) fn is_file_range_index(&self, index: RowGroupIndex) -> bool {
1242        !self.is_mem_range_index(index)
1243            && index.index < self.input.num_files() + self.input.num_memtables()
1244    }
1245
1246    /// Retrieves the partition ranges.
1247    pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
1248        self.ranges
1249            .iter()
1250            .enumerate()
1251            .map(|(idx, range_meta)| range_meta.new_partition_range(idx))
1252            .collect()
1253    }
1254
1255    /// Format the context for explain.
1256    pub(crate) fn format_for_explain(&self, verbose: bool, f: &mut fmt::Formatter) -> fmt::Result {
1257        let (mut num_mem_ranges, mut num_file_ranges, mut num_other_ranges) = (0, 0, 0);
1258        for range_meta in &self.ranges {
1259            for idx in &range_meta.row_group_indices {
1260                if self.is_mem_range_index(*idx) {
1261                    num_mem_ranges += 1;
1262                } else if self.is_file_range_index(*idx) {
1263                    num_file_ranges += 1;
1264                } else {
1265                    num_other_ranges += 1;
1266                }
1267            }
1268        }
1269        if verbose {
1270            write!(f, "{{")?;
1271        }
1272        write!(
1273            f,
1274            r#""partition_count":{{"count":{}, "mem_ranges":{}, "files":{}, "file_ranges":{}"#,
1275            self.ranges.len(),
1276            num_mem_ranges,
1277            self.input.num_files(),
1278            num_file_ranges,
1279        )?;
1280        if num_other_ranges > 0 {
1281            write!(f, r#", "other_ranges":{}"#, num_other_ranges)?;
1282        }
1283        write!(f, "}}")?;
1284
1285        if let Some(selector) = &self.input.series_row_selector {
1286            write!(f, ", \"selector\":\"{}\"", selector)?;
1287        }
1288        if let Some(distribution) = &self.input.distribution {
1289            write!(f, ", \"distribution\":\"{}\"", distribution)?;
1290        }
1291
1292        if verbose {
1293            self.format_verbose_content(f)?;
1294        }
1295
1296        Ok(())
1297    }
1298
1299    fn format_verbose_content(&self, f: &mut fmt::Formatter) -> fmt::Result {
1300        struct FileWrapper<'a> {
1301            file: &'a FileHandle,
1302        }
1303
1304        impl fmt::Debug for FileWrapper<'_> {
1305            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1306                let (start, end) = self.file.time_range();
1307                write!(
1308                    f,
1309                    r#"{{"file_id":"{}","time_range_start":"{}::{}","time_range_end":"{}::{}","rows":{},"size":{},"index_size":{}}}"#,
1310                    self.file.file_id(),
1311                    start.value(),
1312                    start.unit(),
1313                    end.value(),
1314                    end.unit(),
1315                    self.file.num_rows(),
1316                    self.file.size(),
1317                    self.file.index_size()
1318                )
1319            }
1320        }
1321
1322        struct InputWrapper<'a> {
1323            input: &'a ScanInput,
1324        }
1325
1326        #[cfg(feature = "enterprise")]
1327        impl InputWrapper<'_> {
1328            fn format_extension_ranges(&self, f: &mut fmt::Formatter) -> fmt::Result {
1329                if self.input.extension_ranges.is_empty() {
1330                    return Ok(());
1331                }
1332
1333                let mut delimiter = "";
1334                write!(f, ", extension_ranges: [")?;
1335                for range in self.input.extension_ranges() {
1336                    write!(f, "{}{:?}", delimiter, range)?;
1337                    delimiter = ", ";
1338                }
1339                write!(f, "]")?;
1340                Ok(())
1341            }
1342        }
1343
1344        impl fmt::Debug for InputWrapper<'_> {
1345            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1346                let output_schema = self.input.mapper.output_schema();
1347                if !output_schema.is_empty() {
1348                    let names: Vec<_> = output_schema
1349                        .column_schemas()
1350                        .iter()
1351                        .map(|col| &col.name)
1352                        .collect();
1353                    write!(f, ", \"projection\": {:?}", names)?;
1354                }
1355                if let Some(predicate) = &self.input.predicate.predicate()
1356                    && !predicate.exprs().is_empty()
1357                {
1358                    let exprs: Vec<_> = predicate.exprs().iter().map(|e| e.to_string()).collect();
1359                    write!(f, ", \"filters\": {:?}", exprs)?;
1360                }
1361                if !self.input.files.is_empty() {
1362                    write!(f, ", \"files\": ")?;
1363                    f.debug_list()
1364                        .entries(self.input.files.iter().map(|file| FileWrapper { file }))
1365                        .finish()?;
1366                }
1367
1368                #[cfg(feature = "enterprise")]
1369                self.format_extension_ranges(f)?;
1370
1371                Ok(())
1372            }
1373        }
1374
1375        write!(f, "{:?}", InputWrapper { input: &self.input })
1376    }
1377}
1378
/// Predicates to evaluate.
///
/// Groups the predicates built from the request filters and, when the region
/// metadata carries one, the region partition expression. The time filters only
/// keep expressions that [SimpleFilterEvaluator] supports.
#[derive(Clone, Default)]
pub struct PredicateGroup {
    /// Simple filters over the time index column, built from the expressions that
    /// reference only that column. `None` when there is no such filter.
    time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
    /// Predicate that includes request filters and region partition expr (if any).
    predicate_all: Option<Predicate>,
    /// Predicate that only includes request filters.
    predicate_without_region: Option<Predicate>,
    /// Region partition expression restored from metadata.
    region_partition_expr: Option<PartitionExpr>,
}
1391
1392impl PredicateGroup {
1393    /// Creates a new `PredicateGroup` from exprs according to the metadata.
1394    pub fn new(metadata: &RegionMetadata, exprs: &[Expr]) -> Result<Self> {
1395        let mut combined_exprs = exprs.to_vec();
1396        let mut region_partition_expr = None;
1397
1398        if let Some(expr_json) = metadata.partition_expr.as_ref()
1399            && !expr_json.is_empty()
1400            && let Some(expr) = PartitionExpr::from_json_str(expr_json)
1401                .context(InvalidPartitionExprSnafu { expr: expr_json })?
1402        {
1403            let logical_expr = expr
1404                .try_as_logical_expr()
1405                .context(InvalidPartitionExprSnafu {
1406                    expr: expr_json.clone(),
1407                })?;
1408
1409            combined_exprs.push(logical_expr);
1410            region_partition_expr = Some(expr);
1411        }
1412
1413        let mut time_filters = Vec::with_capacity(combined_exprs.len());
1414        // Columns in the expr.
1415        let mut columns = HashSet::new();
1416        for expr in &combined_exprs {
1417            columns.clear();
1418            let Some(filter) = Self::expr_to_filter(expr, metadata, &mut columns) else {
1419                continue;
1420            };
1421            time_filters.push(filter);
1422        }
1423        let time_filters = if time_filters.is_empty() {
1424            None
1425        } else {
1426            Some(Arc::new(time_filters))
1427        };
1428
1429        let predicate_all = if combined_exprs.is_empty() {
1430            None
1431        } else {
1432            Some(Predicate::new(combined_exprs))
1433        };
1434        let predicate_without_region = if exprs.is_empty() {
1435            None
1436        } else {
1437            Some(Predicate::new(exprs.to_vec()))
1438        };
1439
1440        Ok(Self {
1441            time_filters,
1442            predicate_all,
1443            predicate_without_region,
1444            region_partition_expr,
1445        })
1446    }
1447
1448    /// Returns time filters.
1449    pub(crate) fn time_filters(&self) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
1450        self.time_filters.clone()
1451    }
1452
1453    /// Returns predicate of all exprs (including region partition expr if present).
1454    pub(crate) fn predicate(&self) -> Option<&Predicate> {
1455        self.predicate_all.as_ref()
1456    }
1457
1458    /// Returns predicate that excludes region partition expr.
1459    pub(crate) fn predicate_without_region(&self) -> Option<&Predicate> {
1460        self.predicate_without_region.as_ref()
1461    }
1462
1463    /// Returns the region partition expr from metadata, if any.
1464    pub(crate) fn region_partition_expr(&self) -> Option<&PartitionExpr> {
1465        self.region_partition_expr.as_ref()
1466    }
1467
1468    fn expr_to_filter(
1469        expr: &Expr,
1470        metadata: &RegionMetadata,
1471        columns: &mut HashSet<Column>,
1472    ) -> Option<SimpleFilterEvaluator> {
1473        columns.clear();
1474        // `expr_to_columns` won't return error.
1475        // We still ignore these expressions for safety.
1476        expr_to_columns(expr, columns).ok()?;
1477        if columns.len() > 1 {
1478            // Simple filter doesn't support multiple columns.
1479            return None;
1480        }
1481        let column = columns.iter().next()?;
1482        let column_meta = metadata.column_by_name(&column.name)?;
1483        if column_meta.semantic_type == SemanticType::Timestamp {
1484            SimpleFilterEvaluator::try_new(expr)
1485        } else {
1486            None
1487        }
1488    }
1489}