mito2/read/scan_region.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Scans a region according to the scan request.

use std::collections::HashSet;
use std::fmt;
use std::num::NonZeroU64;
use std::sync::Arc;
use std::time::Instant;

use api::v1::SemanticType;
use common_error::ext::BoxedError;
use common_recordbatch::SendableRecordBatchStream;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::{debug, error, tracing, warn};
use common_time::range::TimestampRange;
use datafusion_common::Column;
use datafusion_expr::Expr;
use datafusion_expr::utils::expr_to_columns;
use futures::StreamExt;
use partition::expr::PartitionExpr;
use smallvec::SmallVec;
use snafu::ResultExt;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::region_engine::{PartitionRange, RegionScannerRef};
use store_api::storage::{
    RegionId, ScanRequest, SequenceRange, TimeSeriesDistribution, TimeSeriesRowSelector,
};
use table::predicate::{Predicate, build_time_range_predicate};
use tokio::sync::{Semaphore, mpsc};
use tokio_stream::wrappers::ReceiverStream;

use crate::access_layer::AccessLayerRef;
use crate::cache::CacheStrategy;
use crate::config::{DEFAULT_MAX_CONCURRENT_SCAN_FILES, DEFAULT_SCAN_CHANNEL_SIZE};
use crate::error::{InvalidPartitionExprSnafu, Result};
#[cfg(feature = "enterprise")]
use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
use crate::memtable::{MemtableRange, RangesOptions};
use crate::metrics::READ_SST_COUNT;
use crate::read::compat::{self, CompatBatch, FlatCompatBatch, PrimaryKeyCompatBatch};
use crate::read::projection::ProjectionMapper;
use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
use crate::read::seq_scan::SeqScan;
use crate::read::series_scan::SeriesScan;
use crate::read::stream::ScanBatchStream;
use crate::read::unordered_scan::UnorderedScan;
use crate::read::{Batch, BoxedRecordBatchStream, RecordBatch, Source};
use crate::region::options::MergeMode;
use crate::region::version::VersionRef;
use crate::sst::file::FileHandle;
use crate::sst::index::bloom_filter::applier::{
    BloomFilterIndexApplierBuilder, BloomFilterIndexApplierRef,
};
use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
use crate::sst::parquet::file_range::PreFilterMode;
use crate::sst::parquet::reader::ReaderMetrics;

/// Parallel scan channel size for flat format.
const FLAT_SCAN_CHANNEL_SIZE: usize = 2;

/// A scanner scans a region and returns a [SendableRecordBatchStream].
pub(crate) enum Scanner {
    /// Sequential scan.
    Seq(SeqScan),
    /// Unordered scan.
    Unordered(UnorderedScan),
    /// Per-series scan.
    Series(SeriesScan),
}

impl Scanner {
    /// Returns a [SendableRecordBatchStream] to retrieve scan results from all partitions.
    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
    pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream, BoxedError> {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.build_stream(),
            Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
            Scanner::Series(series_scan) => series_scan.build_stream().await,
        }
    }

    /// Creates a stream of [`Batch`]es from this scanner.
    pub(crate) fn scan_batch(&self) -> Result<ScanBatchStream> {
        match self {
            Scanner::Seq(x) => x.scan_all_partitions(),
            Scanner::Unordered(x) => x.scan_all_partitions(),
            Scanner::Series(x) => x.scan_all_partitions(),
        }
    }
}
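
// A minimal usage sketch of the scanner entry points (illustrative only; it
// assumes an already-built `ScanRegion` and a running tokio runtime, and it
// elides error conversion between the two error types):
//
// ```ignore
// let scanner = scan_region.scanner().await?;
// // Stream `RecordBatch`es from all partitions.
// let mut stream = scanner.scan().await?;
// while let Some(batch) = stream.next().await {
//     let batch = batch?;
//     // Consume the record batch here.
// }
// ```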

#[cfg(test)]
impl Scanner {
    /// Returns number of files to scan.
    pub(crate) fn num_files(&self) -> usize {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.input().num_files(),
            Scanner::Unordered(unordered_scan) => unordered_scan.input().num_files(),
            Scanner::Series(series_scan) => series_scan.input().num_files(),
        }
    }

    /// Returns number of memtables to scan.
    pub(crate) fn num_memtables(&self) -> usize {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.input().num_memtables(),
            Scanner::Unordered(unordered_scan) => unordered_scan.input().num_memtables(),
            Scanner::Series(series_scan) => series_scan.input().num_memtables(),
        }
    }

    /// Returns SST file ids to scan.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.input().file_ids(),
            Scanner::Unordered(unordered_scan) => unordered_scan.input().file_ids(),
            Scanner::Series(series_scan) => series_scan.input().file_ids(),
        }
    }

    /// Sets the target partitions for the scanner. It controls the parallelism of the scanner.
    pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
        use store_api::region_engine::{PrepareRequest, RegionScanner};

        let request = PrepareRequest::default().with_target_partitions(target_partitions);
        match self {
            Scanner::Seq(seq_scan) => seq_scan.prepare(request).unwrap(),
            Scanner::Unordered(unordered_scan) => unordered_scan.prepare(request).unwrap(),
            Scanner::Series(series_scan) => series_scan.prepare(request).unwrap(),
        }
    }
}

#[cfg_attr(doc, aquamarine::aquamarine)]
/// Helper to scan a region by [ScanRequest].
///
/// [ScanRegion] collects SSTs and memtables to scan without actually reading them. It
/// creates a [Scanner] to actually scan these targets in [Scanner::scan()].
///
/// ```mermaid
/// classDiagram
/// class ScanRegion {
///     -VersionRef version
///     -ScanRequest request
///     ~scanner() Scanner
///     ~seq_scan() SeqScan
/// }
/// class Scanner {
///     <<enumeration>>
///     SeqScan
///     UnorderedScan
///     +scan() SendableRecordBatchStream
/// }
/// class SeqScan {
///     -ScanInput input
///     +build() SendableRecordBatchStream
/// }
/// class UnorderedScan {
///     -ScanInput input
///     +build() SendableRecordBatchStream
/// }
/// class ScanInput {
///     -ProjectionMapper mapper
///     -Option~TimeRange~ time_range
///     -Option~Predicate~ predicate
///     -Vec~MemtableRef~ memtables
///     -Vec~FileHandle~ files
/// }
/// class ProjectionMapper {
///     ~output_schema() SchemaRef
///     ~convert(Batch) RecordBatch
/// }
/// ScanRegion -- Scanner
/// ScanRegion o-- ScanRequest
/// Scanner o-- SeqScan
/// Scanner o-- UnorderedScan
/// SeqScan o-- ScanInput
/// UnorderedScan o-- ScanInput
/// Scanner -- SendableRecordBatchStream
/// ScanInput o-- ProjectionMapper
/// SeqScan -- SendableRecordBatchStream
/// UnorderedScan -- SendableRecordBatchStream
/// ```
pub(crate) struct ScanRegion {
    /// Version of the region at scan.
    version: VersionRef,
    /// Access layer of the region.
    access_layer: AccessLayerRef,
    /// Scan request.
    request: ScanRequest,
    /// Cache.
    cache_strategy: CacheStrategy,
    /// Capacity of the channel to send data from parallel scan tasks to the main task.
    parallel_scan_channel_size: usize,
    /// Maximum number of SST files to scan concurrently.
    max_concurrent_scan_files: usize,
    /// Whether to ignore inverted index.
    ignore_inverted_index: bool,
    /// Whether to ignore fulltext index.
    ignore_fulltext_index: bool,
    /// Whether to ignore bloom filter.
    ignore_bloom_filter: bool,
    /// Start time of the scan task.
    start_time: Option<Instant>,
    /// Whether to filter out the deleted rows.
    /// Usually true for normal read, and false for scan for compaction.
    filter_deleted: bool,
    /// Whether to use flat format.
    flat_format: bool,
    #[cfg(feature = "enterprise")]
    extension_range_provider: Option<BoxedExtensionRangeProvider>,
}

impl ScanRegion {
    /// Creates a [ScanRegion].
    pub(crate) fn new(
        version: VersionRef,
        access_layer: AccessLayerRef,
        request: ScanRequest,
        cache_strategy: CacheStrategy,
    ) -> ScanRegion {
        ScanRegion {
            version,
            access_layer,
            request,
            cache_strategy,
            parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
            ignore_inverted_index: false,
            ignore_fulltext_index: false,
            ignore_bloom_filter: false,
            start_time: None,
            filter_deleted: true,
            flat_format: false,
            #[cfg(feature = "enterprise")]
            extension_range_provider: None,
        }
    }
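
    // A sketch of how a caller might assemble a `ScanRegion` with the
    // builder-style setters below (illustrative; `version`, `access_layer`,
    // `request` and `cache_strategy` are assumed to come from the region
    // worker):
    //
    // ```ignore
    // let scanner = ScanRegion::new(version, access_layer, request, cache_strategy)
    //     .with_parallel_scan_channel_size(32)
    //     .with_ignore_fulltext_index(false)
    //     .with_start_time(Instant::now())
    //     .scanner()
    //     .await?;
    // ```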

    /// Sets parallel scan task channel size.
    #[must_use]
    pub(crate) fn with_parallel_scan_channel_size(
        mut self,
        parallel_scan_channel_size: usize,
    ) -> Self {
        self.parallel_scan_channel_size = parallel_scan_channel_size;
        self
    }

    /// Sets maximum number of SST files to scan concurrently.
    #[must_use]
    pub(crate) fn with_max_concurrent_scan_files(
        mut self,
        max_concurrent_scan_files: usize,
    ) -> Self {
        self.max_concurrent_scan_files = max_concurrent_scan_files;
        self
    }

    /// Sets whether to ignore inverted index.
    #[must_use]
    pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self {
        self.ignore_inverted_index = ignore;
        self
    }

    /// Sets whether to ignore fulltext index.
    #[must_use]
    pub(crate) fn with_ignore_fulltext_index(mut self, ignore: bool) -> Self {
        self.ignore_fulltext_index = ignore;
        self
    }

    /// Sets whether to ignore bloom filter.
    #[must_use]
    pub(crate) fn with_ignore_bloom_filter(mut self, ignore: bool) -> Self {
        self.ignore_bloom_filter = ignore;
        self
    }

    #[must_use]
    pub(crate) fn with_start_time(mut self, now: Instant) -> Self {
        self.start_time = Some(now);
        self
    }

    pub(crate) fn set_filter_deleted(&mut self, filter_deleted: bool) {
        self.filter_deleted = filter_deleted;
    }

    /// Sets whether to use flat format.
    #[must_use]
    pub(crate) fn with_flat_format(mut self, flat_format: bool) -> Self {
        self.flat_format = flat_format;
        self
    }

    #[cfg(feature = "enterprise")]
    pub(crate) fn set_extension_range_provider(
        &mut self,
        extension_range_provider: BoxedExtensionRangeProvider,
    ) {
        self.extension_range_provider = Some(extension_range_provider);
    }

    /// Returns a [Scanner] to scan the region.
    pub(crate) async fn scanner(self) -> Result<Scanner> {
        if self.use_series_scan() {
            self.series_scan().await.map(Scanner::Series)
        } else if self.use_unordered_scan() {
            // If table is append only and there is no series row selector, we use unordered scan in query.
            // We still use seq scan in compaction.
            self.unordered_scan().await.map(Scanner::Unordered)
        } else {
            self.seq_scan().await.map(Scanner::Seq)
        }
    }

    /// Returns a [RegionScanner] to scan the region.
    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
    pub(crate) async fn region_scanner(self) -> Result<RegionScannerRef> {
        if self.use_series_scan() {
            self.series_scan()
                .await
                .map(|scanner| Box::new(scanner) as _)
        } else if self.use_unordered_scan() {
            self.unordered_scan()
                .await
                .map(|scanner| Box::new(scanner) as _)
        } else {
            self.seq_scan().await.map(|scanner| Box::new(scanner) as _)
        }
    }

    /// Scans sequentially.
    pub(crate) async fn seq_scan(self) -> Result<SeqScan> {
        let input = self.scan_input().await?.with_compaction(false);
        Ok(SeqScan::new(input))
    }

    /// Unordered scan.
    pub(crate) async fn unordered_scan(self) -> Result<UnorderedScan> {
        let input = self.scan_input().await?;
        Ok(UnorderedScan::new(input))
    }

    /// Scans by series.
    pub(crate) async fn series_scan(self) -> Result<SeriesScan> {
        let input = self.scan_input().await?;
        Ok(SeriesScan::new(input))
    }

    /// Returns true if the region can use unordered scan for current request.
    fn use_unordered_scan(&self) -> bool {
        // We use unordered scan when:
        // 1. The region is in append mode.
        // 2. There is no series row selector.
        // 3. The required distribution is None or TimeSeriesDistribution::TimeWindowed.
        //
        // We still use seq scan in compaction.
        self.version.options.append_mode
            && self.request.series_row_selector.is_none()
            && (self.request.distribution.is_none()
                || self.request.distribution == Some(TimeSeriesDistribution::TimeWindowed))
    }

    /// Returns true if the region can use series scan for current request.
    fn use_series_scan(&self) -> bool {
        self.request.distribution == Some(TimeSeriesDistribution::PerSeries)
    }

    /// Creates a scan input.
    async fn scan_input(mut self) -> Result<ScanInput> {
        let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new);
        let time_range = self.build_time_range_predicate();
        let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;

        // The mapper always computes projected column ids as the schema of SSTs may change.
        let mapper = match &self.request.projection {
            Some(p) => {
                ProjectionMapper::new(&self.version.metadata, p.iter().copied(), self.flat_format)?
            }
            None => ProjectionMapper::all(&self.version.metadata, self.flat_format)?,
        };

        let ssts = &self.version.ssts;
        let mut files = Vec::new();
        for level in ssts.levels() {
            for file in level.files.values() {
                let exceed_min_sequence = match (sst_min_sequence, file.meta_ref().sequence) {
                    (Some(min_sequence), Some(file_sequence)) => file_sequence > min_sequence,
                    // If the file's sequence is None (or actually is zero), it could mean the file
                    // is generated and added to the region "directly". In this case, its data should
                    // be considered as fresh as the memtable. So its sequence is treated greater than
                    // the min_sequence, whatever the value of min_sequence is. Hence the default
                    // "true" in this arm.
                    (Some(_), None) => true,
                    (None, _) => true,
                };

                // Finds SST files in range.
                if exceed_min_sequence && file_in_range(file, &time_range) {
                    files.push(file.clone());
                }
                // There is no need to further check and prune by the file's sequence here: the
                // requested sequence number is usually very recent, so it rarely falls within a
                // file unless the timing is exceptional, and such batches will be filtered out
                // by the tree reader anyway.
            }
        }

        let memtables = self.version.memtables.list_memtables();
        // Skip empty memtables and memtables out of time range.
        let mut mem_range_builders = Vec::new();
        let filter_mode = pre_filter_mode(
            self.version.options.append_mode,
            self.version.options.merge_mode(),
        );

        for m in memtables {
            // Check if the memtable is empty by reading its stats.
            let Some((start, end)) = m.stats().time_range() else {
                continue;
            };
            // The time range of the memtable is inclusive.
            let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
            if !memtable_range.intersects(&time_range) {
                continue;
            }
            let ranges_in_memtable = m.ranges(
                Some(mapper.column_ids()),
                RangesOptions::default()
                    .with_predicate(predicate.clone())
                    .with_sequence(SequenceRange::new(
                        self.request.memtable_min_sequence,
                        self.request.memtable_max_sequence,
                    ))
                    .with_pre_filter_mode(filter_mode),
            )?;
            mem_range_builders.extend(ranges_in_memtable.ranges.into_values().map(|v| {
                // TODO: we should add stats to MemtableRange.
                let mut stats = ranges_in_memtable.stats.clone();
                stats.num_ranges = 1;
                stats.num_rows = v.num_rows();
                MemRangeBuilder::new(v, stats)
            }));
        }

        let region_id = self.region_id();
        debug!(
            "Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}",
            region_id,
            self.request,
            time_range,
            mem_range_builders.len(),
            files.len(),
            self.version.options.append_mode,
        );

        let (non_field_filters, field_filters) = self.partition_by_field_filters();
        let inverted_index_appliers = [
            self.build_inverted_index_applier(&non_field_filters),
            self.build_inverted_index_applier(&field_filters),
        ];
        let bloom_filter_appliers = [
            self.build_bloom_filter_applier(&non_field_filters),
            self.build_bloom_filter_applier(&field_filters),
        ];
        let fulltext_index_appliers = [
            self.build_fulltext_index_applier(&non_field_filters),
            self.build_fulltext_index_applier(&field_filters),
        ];

        if self.flat_format {
            // The batch is already large enough so we use a small channel size here.
            self.parallel_scan_channel_size = FLAT_SCAN_CHANNEL_SIZE;
        }

        let input = ScanInput::new(self.access_layer, mapper)
            .with_time_range(Some(time_range))
            .with_predicate(predicate)
            .with_memtables(mem_range_builders)
            .with_files(files)
            .with_cache(self.cache_strategy)
            .with_inverted_index_appliers(inverted_index_appliers)
            .with_bloom_filter_index_appliers(bloom_filter_appliers)
            .with_fulltext_index_appliers(fulltext_index_appliers)
            .with_parallel_scan_channel_size(self.parallel_scan_channel_size)
            .with_max_concurrent_scan_files(self.max_concurrent_scan_files)
            .with_start_time(self.start_time)
            .with_append_mode(self.version.options.append_mode)
            .with_filter_deleted(self.filter_deleted)
            .with_merge_mode(self.version.options.merge_mode())
            .with_series_row_selector(self.request.series_row_selector)
            .with_distribution(self.request.distribution)
            .with_flat_format(self.flat_format);

        #[cfg(feature = "enterprise")]
        let input = if let Some(provider) = self.extension_range_provider {
            let ranges = provider
                .find_extension_ranges(self.version.flushed_sequence, time_range, &self.request)
                .await?;
            debug!("Find extension ranges: {ranges:?}");
            input.with_extension_ranges(ranges)
        } else {
            input
        };
        Ok(input)
    }

    fn region_id(&self) -> RegionId {
        self.version.metadata.region_id
    }

    /// Builds the time range predicate from filters.
    fn build_time_range_predicate(&self) -> TimestampRange {
        let time_index = self.version.metadata.time_index_column();
        let unit = time_index
            .column_schema
            .data_type
            .as_timestamp()
            .expect("Time index must have timestamp-compatible type")
            .unit();
        build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters)
    }

    /// Partitions filters into two groups: non-field filters and field filters.
    /// Returns `(non_field_filters, field_filters)`.
    fn partition_by_field_filters(&self) -> (Vec<Expr>, Vec<Expr>) {
        let field_columns = self
            .version
            .metadata
            .field_columns()
            .map(|col| &col.column_schema.name)
            .collect::<HashSet<_>>();

        let mut columns = HashSet::new();

        self.request.filters.iter().cloned().partition(|expr| {
            columns.clear();
            // `expr_to_columns` should not fail; if it does, fall back to treating
            // the expr as a non-field filter.
            if expr_to_columns(expr, &mut columns).is_err() {
                return true;
            }
            // Return true for non-field filters (partition puts true cases in the first vec).
            !columns
                .iter()
                .any(|column| field_columns.contains(&column.name))
        })
    }
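
    // Worked example (hypothetical schema): for a region with tag `host`, time
    // index `ts` and field `cpu`, the filters `[host = 'a', cpu > 0.5]` are
    // partitioned into `(non_field_filters, field_filters) =
    // ([host = 'a'], [cpu > 0.5])`, since only the second expr references a
    // field column. The two groups drive separate index appliers below.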

    /// Uses the latest schema to build the inverted index applier.
    fn build_inverted_index_applier(&self, filters: &[Expr]) -> Option<InvertedIndexApplierRef> {
        if self.ignore_inverted_index {
            return None;
        }

        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
        let inverted_index_cache = self.cache_strategy.inverted_index_cache().cloned();

        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();

        InvertedIndexApplierBuilder::new(
            self.access_layer.table_dir().to_string(),
            self.access_layer.path_type(),
            self.access_layer.object_store().clone(),
            self.version.metadata.as_ref(),
            self.version.metadata.inverted_indexed_column_ids(
                self.version
                    .options
                    .index_options
                    .inverted_index
                    .ignore_column_ids
                    .iter(),
            ),
            self.access_layer.puffin_manager_factory().clone(),
        )
        .with_file_cache(file_cache)
        .with_inverted_index_cache(inverted_index_cache)
        .with_puffin_metadata_cache(puffin_metadata_cache)
        .build(filters)
        .inspect_err(|err| warn!(err; "Failed to build inverted index applier"))
        .ok()
        .flatten()
        .map(Arc::new)
    }

    /// Uses the latest schema to build the bloom filter index applier.
    fn build_bloom_filter_applier(&self, filters: &[Expr]) -> Option<BloomFilterIndexApplierRef> {
        if self.ignore_bloom_filter {
            return None;
        }

        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();

        BloomFilterIndexApplierBuilder::new(
            self.access_layer.table_dir().to_string(),
            self.access_layer.path_type(),
            self.access_layer.object_store().clone(),
            self.version.metadata.as_ref(),
            self.access_layer.puffin_manager_factory().clone(),
        )
        .with_file_cache(file_cache)
        .with_bloom_filter_index_cache(bloom_filter_index_cache)
        .with_puffin_metadata_cache(puffin_metadata_cache)
        .build(filters)
        .inspect_err(|err| warn!(err; "Failed to build bloom filter index applier"))
        .ok()
        .flatten()
        .map(Arc::new)
    }

    /// Uses the latest schema to build the fulltext index applier.
    fn build_fulltext_index_applier(&self, filters: &[Expr]) -> Option<FulltextIndexApplierRef> {
        if self.ignore_fulltext_index {
            return None;
        }

        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
        FulltextIndexApplierBuilder::new(
            self.access_layer.table_dir().to_string(),
            self.access_layer.path_type(),
            self.access_layer.object_store().clone(),
            self.access_layer.puffin_manager_factory().clone(),
            self.version.metadata.as_ref(),
        )
        .with_file_cache(file_cache)
        .with_puffin_metadata_cache(puffin_metadata_cache)
        .with_bloom_filter_cache(bloom_filter_index_cache)
        .build(filters)
        .inspect_err(|err| warn!(err; "Failed to build fulltext index applier"))
        .ok()
        .flatten()
        .map(Arc::new)
    }
}

/// Returns true if the time range of an SST `file` matches the `predicate`.
fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
    if predicate == &TimestampRange::min_to_max() {
        return true;
    }
    // The end timestamp of an SST is inclusive.
    let (start, end) = file.time_range();
    let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end));
    file_ts_range.intersects(predicate)
}
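
// Worked example: a file covering `[10, 20]` intersects the predicate range
// `[20, 30)` because the file's end timestamp is inclusive, so `file_in_range`
// returns true; a file covering `[10, 19]` does not. A `min_to_max` predicate
// matches every file without even building the file's range.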

/// Common input for different scanners.
pub struct ScanInput {
    /// Region SST access layer.
    access_layer: AccessLayerRef,
    /// Maps projected Batches to RecordBatches.
    pub(crate) mapper: Arc<ProjectionMapper>,
    /// Time range filter for time index.
    time_range: Option<TimestampRange>,
    /// Predicate to push down.
    pub(crate) predicate: PredicateGroup,
    /// Region partition expr applied at read time.
    region_partition_expr: Option<PartitionExpr>,
    /// Memtable range builders for memtables in the time range.
    pub(crate) memtables: Vec<MemRangeBuilder>,
    /// Handles to SST files to scan.
    pub(crate) files: Vec<FileHandle>,
    /// Cache.
    pub(crate) cache_strategy: CacheStrategy,
    /// Ignores file not found error.
    ignore_file_not_found: bool,
    /// Capacity of the channel to send data from parallel scan tasks to the main task.
    pub(crate) parallel_scan_channel_size: usize,
    /// Maximum number of SST files to scan concurrently.
    pub(crate) max_concurrent_scan_files: usize,
    /// Index appliers, each indexed by filter group: `[non-field filters, field filters]`.
    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
    /// Start time of the query.
    pub(crate) query_start: Option<Instant>,
    /// The region is using append mode.
    pub(crate) append_mode: bool,
    /// Whether to remove deletion markers.
    pub(crate) filter_deleted: bool,
    /// Mode to merge duplicate rows.
    pub(crate) merge_mode: MergeMode,
    /// Hint to select rows from time series.
    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
    /// Hint for the required distribution of the scanner.
    pub(crate) distribution: Option<TimeSeriesDistribution>,
    /// Whether to use flat format.
    pub(crate) flat_format: bool,
    /// Whether this scan is for compaction.
    pub(crate) compaction: bool,
    #[cfg(feature = "enterprise")]
    extension_ranges: Vec<BoxedExtensionRange>,
}

impl ScanInput {
    /// Creates a new [ScanInput].
    #[must_use]
    pub(crate) fn new(access_layer: AccessLayerRef, mapper: ProjectionMapper) -> ScanInput {
        ScanInput {
            access_layer,
            mapper: Arc::new(mapper),
            time_range: None,
            predicate: PredicateGroup::default(),
            region_partition_expr: None,
            memtables: Vec::new(),
            files: Vec::new(),
            cache_strategy: CacheStrategy::Disabled,
            ignore_file_not_found: false,
            parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
            inverted_index_appliers: [None, None],
            bloom_filter_index_appliers: [None, None],
            fulltext_index_appliers: [None, None],
            query_start: None,
            append_mode: false,
            filter_deleted: true,
            merge_mode: MergeMode::default(),
            series_row_selector: None,
            distribution: None,
            flat_format: false,
            compaction: false,
            #[cfg(feature = "enterprise")]
            extension_ranges: Vec::new(),
        }
    }

    /// Sets time range filter for time index.
    #[must_use]
    pub(crate) fn with_time_range(mut self, time_range: Option<TimestampRange>) -> Self {
        self.time_range = time_range;
        self
    }

    /// Sets predicate to push down.
    #[must_use]
    pub(crate) fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
        self.region_partition_expr = predicate.region_partition_expr().cloned();
        self.predicate = predicate;
        self
    }

    /// Sets memtable range builders.
    #[must_use]
    pub(crate) fn with_memtables(mut self, memtables: Vec<MemRangeBuilder>) -> Self {
        self.memtables = memtables;
        self
    }

    /// Sets files to read.
    #[must_use]
    pub(crate) fn with_files(mut self, files: Vec<FileHandle>) -> Self {
        self.files = files;
        self
    }

    /// Sets cache for this query.
    #[must_use]
    pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self {
        self.cache_strategy = cache;
        self
    }

    /// Ignores file not found error.
    #[must_use]
    pub(crate) fn with_ignore_file_not_found(mut self, ignore: bool) -> Self {
        self.ignore_file_not_found = ignore;
        self
    }

    /// Sets scan task channel size.
    #[must_use]
    pub(crate) fn with_parallel_scan_channel_size(
        mut self,
        parallel_scan_channel_size: usize,
    ) -> Self {
        self.parallel_scan_channel_size = parallel_scan_channel_size;
        self
    }

    /// Sets maximum number of SST files to scan concurrently.
    #[must_use]
    pub(crate) fn with_max_concurrent_scan_files(
        mut self,
        max_concurrent_scan_files: usize,
    ) -> Self {
        self.max_concurrent_scan_files = max_concurrent_scan_files;
        self
    }

    /// Sets inverted index appliers.
    #[must_use]
    pub(crate) fn with_inverted_index_appliers(
        mut self,
        appliers: [Option<InvertedIndexApplierRef>; 2],
    ) -> Self {
        self.inverted_index_appliers = appliers;
        self
    }

    /// Sets bloom filter appliers.
    #[must_use]
    pub(crate) fn with_bloom_filter_index_appliers(
        mut self,
        appliers: [Option<BloomFilterIndexApplierRef>; 2],
    ) -> Self {
        self.bloom_filter_index_appliers = appliers;
        self
    }

    /// Sets fulltext index appliers.
    #[must_use]
    pub(crate) fn with_fulltext_index_appliers(
        mut self,
        appliers: [Option<FulltextIndexApplierRef>; 2],
    ) -> Self {
        self.fulltext_index_appliers = appliers;
        self
    }

    /// Sets start time of the query.
    #[must_use]
    pub(crate) fn with_start_time(mut self, now: Option<Instant>) -> Self {
        self.query_start = now;
        self
    }

    #[must_use]
    pub(crate) fn with_append_mode(mut self, is_append_mode: bool) -> Self {
        self.append_mode = is_append_mode;
        self
    }

    /// Sets whether to remove deletion markers during scan.
    #[must_use]
    pub(crate) fn with_filter_deleted(mut self, filter_deleted: bool) -> Self {
        self.filter_deleted = filter_deleted;
        self
    }

    /// Sets the merge mode.
    #[must_use]
    pub(crate) fn with_merge_mode(mut self, merge_mode: MergeMode) -> Self {
        self.merge_mode = merge_mode;
        self
    }

    /// Sets the distribution hint.
    #[must_use]
    pub(crate) fn with_distribution(
        mut self,
        distribution: Option<TimeSeriesDistribution>,
    ) -> Self {
        self.distribution = distribution;
        self
    }

    /// Sets the time series row selector.
    #[must_use]
    pub(crate) fn with_series_row_selector(
        mut self,
        series_row_selector: Option<TimeSeriesRowSelector>,
    ) -> Self {
        self.series_row_selector = series_row_selector;
        self
    }

    /// Sets whether to use flat format.
    #[must_use]
    pub(crate) fn with_flat_format(mut self, flat_format: bool) -> Self {
        self.flat_format = flat_format;
        self
    }

    /// Sets whether this scan is for compaction.
    #[must_use]
    pub(crate) fn with_compaction(mut self, compaction: bool) -> Self {
        self.compaction = compaction;
        self
    }

    /// Scans sources in parallel.
    ///
    /// # Panics
    ///
    /// Panics if the input doesn't allow parallel scan.
    pub(crate) fn create_parallel_sources(
        &self,
        sources: Vec<Source>,
        semaphore: Arc<Semaphore>,
    ) -> Result<Vec<Source>> {
        if sources.len() <= 1 {
            return Ok(sources);
        }

        // Spawn a task for each source.
        let sources = sources
            .into_iter()
            .map(|source| {
                let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
                self.spawn_scan_task(source, semaphore.clone(), sender);
                let stream = Box::pin(ReceiverStream::new(receiver));
                Source::Stream(stream)
            })
            .collect();
        Ok(sources)
    }

    /// Builds memtable ranges to scan by `index`.
    pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> {
        let memtable = &self.memtables[index.index];
        let mut ranges = SmallVec::new();
        memtable.build_ranges(index.row_group_index, &mut ranges);
        ranges
    }

    fn predicate_for_file(&self, file: &FileHandle) -> Option<Predicate> {
        if self.should_skip_region_partition(file) {
            self.predicate.predicate_without_region().cloned()
        } else {
            self.predicate.predicate().cloned()
        }
    }

    fn should_skip_region_partition(&self, file: &FileHandle) -> bool {
        match (
            self.region_partition_expr.as_ref(),
            file.meta_ref().partition_expr.as_ref(),
        ) {
            (Some(region_expr), Some(file_expr)) => region_expr == file_expr,
            _ => false,
        }
    }
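
    // In other words: if an SST was written under the same partition expr that
    // the region currently has, every row in it already satisfies that expr,
    // so `predicate_for_file` drops the expr and pushes down only the request
    // filters. If either side has no expr, or the exprs differ (e.g. the
    // region was repartitioned after the file was written), the full predicate
    // including the partition expr is kept.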

    /// Prunes a file to scan and returns the builder to build readers.
    pub async fn prune_file(
        &self,
        file: &FileHandle,
        reader_metrics: &mut ReaderMetrics,
    ) -> Result<FileRangeBuilder> {
        let predicate = self.predicate_for_file(file);
        let filter_mode = pre_filter_mode(self.append_mode, self.merge_mode);
        let res = self
            .access_layer
            .read_sst(file.clone())
            .predicate(predicate)
            .projection(Some(self.mapper.column_ids().to_vec()))
            .cache(self.cache_strategy.clone())
            .inverted_index_appliers(self.inverted_index_appliers.clone())
            .bloom_filter_index_appliers(self.bloom_filter_index_appliers.clone())
            .fulltext_index_appliers(self.fulltext_index_appliers.clone())
            .expected_metadata(Some(self.mapper.metadata().clone()))
            .flat_format(self.flat_format)
            .compaction(self.compaction)
            .pre_filter_mode(filter_mode)
            .build_reader_input(reader_metrics)
            .await;
        let (mut file_range_ctx, selection) = match res {
            Ok(x) => x,
            Err(e) => {
                if e.is_object_not_found() && self.ignore_file_not_found {
                    error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
                    return Ok(FileRangeBuilder::default());
                } else {
                    return Err(e);
                }
            }
        };

        let need_compat = !compat::has_same_columns_and_pk_encoding(
            self.mapper.metadata(),
            file_range_ctx.read_format().metadata(),
        );
        if need_compat {
            // They have different schemas. We need to adapt the batch first so the
            // mapper can convert it.
            let compat = if let Some(flat_format) = file_range_ctx.read_format().as_flat() {
                let mapper = self.mapper.as_flat().unwrap();
                FlatCompatBatch::try_new(
                    mapper,
                    flat_format.metadata(),
                    flat_format.format_projection(),
                    self.compaction,
                )?
                .map(CompatBatch::Flat)
            } else {
                let compat_batch = PrimaryKeyCompatBatch::new(
                    &self.mapper,
                    file_range_ctx.read_format().metadata().clone(),
                )?;
                Some(CompatBatch::PrimaryKey(compat_batch))
            };
            file_range_ctx.set_compat_batch(compat);
        }
        Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), selection))
    }

    /// Scans the input source in another task and sends batches to the sender.
    pub(crate) fn spawn_scan_task(
        &self,
        mut input: Source,
        semaphore: Arc<Semaphore>,
        sender: mpsc::Sender<Result<Batch>>,
    ) {
        common_runtime::spawn_global(async move {
            loop {
                // We release the permit before sending the result to avoid the task waiting on
                // the channel with the permit held.
                let maybe_batch = {
                    // Safety: We never close the semaphore.
                    let _permit = semaphore.acquire().await.unwrap();
                    input.next_batch().await
                };
                match maybe_batch {
                    Ok(Some(batch)) => {
                        let _ = sender.send(Ok(batch)).await;
                    }
                    Ok(None) => break,
                    Err(e) => {
                        let _ = sender.send(Err(e)).await;
                        break;
                    }
                }
            }
        });
    }
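
    // The permit scoping above is the key detail: the semaphore bounds how many
    // sources read concurrently, while sends happen outside the permit so a slow
    // consumer doesn't pin a scan slot. A standalone sketch of the pattern
    // (hypothetical `produce_next` and `sender`):
    //
    // ```ignore
    // let item = {
    //     let _permit = semaphore.acquire().await.unwrap(); // bound the concurrency
    //     produce_next().await                              // do the bounded work
    // }; // permit dropped here, before the potentially blocking send
    // sender.send(item).await;
    // ```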

    /// Scans flat sources (RecordBatch streams) in parallel.
    ///
    /// # Panics
    ///
    /// Panics if the input doesn't allow parallel scan.
    pub(crate) fn create_parallel_flat_sources(
        &self,
        sources: Vec<BoxedRecordBatchStream>,
        semaphore: Arc<Semaphore>,
    ) -> Result<Vec<BoxedRecordBatchStream>> {
        if sources.len() <= 1 {
            return Ok(sources);
        }

        // Spawn a task for each source.
        let sources = sources
            .into_iter()
            .map(|source| {
                let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
                self.spawn_flat_scan_task(source, semaphore.clone(), sender);
                let stream = Box::pin(ReceiverStream::new(receiver));
                Box::pin(stream) as _
            })
            .collect();
        Ok(sources)
    }

    /// Spawns a task to scan a flat source (RecordBatch stream) asynchronously.
    pub(crate) fn spawn_flat_scan_task(
        &self,
        mut input: BoxedRecordBatchStream,
        semaphore: Arc<Semaphore>,
        sender: mpsc::Sender<Result<RecordBatch>>,
    ) {
        common_runtime::spawn_global(async move {
            loop {
                // We release the permit before sending the result to avoid the task waiting on
                // the channel with the permit held.
                let maybe_batch = {
                    // Safety: We never close the semaphore.
                    let _permit = semaphore.acquire().await.unwrap();
                    input.next().await
                };
                match maybe_batch {
                    Some(Ok(batch)) => {
                        let _ = sender.send(Ok(batch)).await;
                    }
                    Some(Err(e)) => {
                        let _ = sender.send(Err(e)).await;
                        break;
                    }
                    None => break,
                }
            }
        });
    }

    pub(crate) fn total_rows(&self) -> usize {
        let rows_in_files: usize = self.files.iter().map(|f| f.num_rows()).sum();
        let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();

        let rows = rows_in_files + rows_in_memtables;
        #[cfg(feature = "enterprise")]
        let rows = rows
            + self
                .extension_ranges
                .iter()
                .map(|x| x.num_rows())
                .sum::<u64>() as usize;
        rows
    }

    pub(crate) fn predicate_group(&self) -> &PredicateGroup {
        &self.predicate
    }

    /// Returns number of memtables to scan.
    pub(crate) fn num_memtables(&self) -> usize {
        self.memtables.len()
    }

    /// Returns number of SST files to scan.
    pub(crate) fn num_files(&self) -> usize {
        self.files.len()
    }

    pub fn region_metadata(&self) -> &RegionMetadataRef {
        self.mapper.metadata()
    }
}

#[cfg(feature = "enterprise")]
impl ScanInput {
    #[must_use]
    pub(crate) fn with_extension_ranges(self, extension_ranges: Vec<BoxedExtensionRange>) -> Self {
        Self {
            extension_ranges,
            ..self
        }
    }

    pub(crate) fn extension_ranges(&self) -> &[BoxedExtensionRange] {
        &self.extension_ranges
    }

    /// Gets a boxed [ExtensionRange] by its index among all ranges.
    pub(crate) fn extension_range(&self, i: usize) -> &BoxedExtensionRange {
        &self.extension_ranges[i - self.num_memtables() - self.num_files()]
    }
}

#[cfg(test)]
impl ScanInput {
    /// Returns SST file ids to scan.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        self.files.iter().map(|file| file.file_id()).collect()
    }
}

fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode {
    if append_mode {
        return PreFilterMode::All;
    }

    match merge_mode {
        MergeMode::LastRow => PreFilterMode::SkipFieldsOnDelete,
        MergeMode::LastNonNull => PreFilterMode::SkipFields,
    }
}
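
// For reference, the mapping implemented above:
//
//   append_mode == true                          => PreFilterMode::All
//   append_mode == false, MergeMode::LastRow     => PreFilterMode::SkipFieldsOnDelete
//   append_mode == false, MergeMode::LastNonNull => PreFilterMode::SkipFields
//
// Append-mode regions have no deletions or duplicate keys to merge, so it is
// safe to pre-filter all columns up front.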

/// Context shared by different streams from a scanner.
/// It contains the input and ranges to scan.
pub struct StreamContext {
    /// Input memtables and files.
    pub input: ScanInput,
    /// Metadata for partition ranges.
    pub(crate) ranges: Vec<RangeMeta>,

    // Metrics:
    /// The start time of the query.
    pub(crate) query_start: Instant,
}

impl StreamContext {
    /// Creates a new [StreamContext] for [SeqScan].
    pub(crate) fn seq_scan_ctx(input: ScanInput) -> Self {
        let query_start = input.query_start.unwrap_or_else(Instant::now);
        let ranges = RangeMeta::seq_scan_ranges(&input);
        READ_SST_COUNT.observe(input.num_files() as f64);

        Self {
            input,
            ranges,
            query_start,
        }
    }

    /// Creates a new [StreamContext] for [UnorderedScan].
    pub(crate) fn unordered_scan_ctx(input: ScanInput) -> Self {
        let query_start = input.query_start.unwrap_or_else(Instant::now);
        let ranges = RangeMeta::unordered_scan_ranges(&input);
        READ_SST_COUNT.observe(input.num_files() as f64);

        Self {
            input,
            ranges,
            query_start,
        }
    }

    /// Returns true if the index refers to a memtable.
    pub(crate) fn is_mem_range_index(&self, index: RowGroupIndex) -> bool {
        self.input.num_memtables() > index.index
    }

    pub(crate) fn is_file_range_index(&self, index: RowGroupIndex) -> bool {
        !self.is_mem_range_index(index)
            && index.index < self.input.num_files() + self.input.num_memtables()
    }
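
    // Range indices are laid out contiguously: `[0, num_memtables)` are memtable
    // ranges, `[num_memtables, num_memtables + num_files)` are file ranges, and
    // any remaining indices refer to extension ranges (see
    // `ScanInput::extension_range`, which subtracts both counts).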

    /// Retrieves the partition ranges.
    pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
        self.ranges
            .iter()
            .enumerate()
            .map(|(idx, range_meta)| range_meta.new_partition_range(idx))
            .collect()
    }

    /// Formats the context for explain.
    pub(crate) fn format_for_explain(&self, verbose: bool, f: &mut fmt::Formatter) -> fmt::Result {
        let (mut num_mem_ranges, mut num_file_ranges, mut num_other_ranges) = (0, 0, 0);
        for range_meta in &self.ranges {
            for idx in &range_meta.row_group_indices {
                if self.is_mem_range_index(*idx) {
                    num_mem_ranges += 1;
                } else if self.is_file_range_index(*idx) {
                    num_file_ranges += 1;
                } else {
                    num_other_ranges += 1;
                }
            }
        }
        if verbose {
            write!(f, "{{")?;
        }
        write!(
            f,
            r#""partition_count":{{"count":{}, "mem_ranges":{}, "files":{}, "file_ranges":{}"#,
            self.ranges.len(),
            num_mem_ranges,
            self.input.num_files(),
            num_file_ranges,
        )?;
        if num_other_ranges > 0 {
            write!(f, r#", "other_ranges":{}"#, num_other_ranges)?;
        }
        write!(f, "}}")?;

        if let Some(selector) = &self.input.series_row_selector {
            write!(f, ", \"selector\":\"{}\"", selector)?;
        }
        if let Some(distribution) = &self.input.distribution {
            write!(f, ", \"distribution\":\"{}\"", distribution)?;
        }

        if verbose {
            self.format_verbose_content(f)?;
        }

        Ok(())
    }
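
    // The non-verbose output written above has this shape (values illustrative):
    //
    //   "partition_count":{"count":4, "mem_ranges":1, "files":2, "file_ranges":8}, "selector":"last_row"
    //
    // The verbose form additionally opens a `{` and appends the projection,
    // filters and per-file details produced by `format_verbose_content` (the
    // caller is presumably responsible for the closing brace).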

    fn format_verbose_content(&self, f: &mut fmt::Formatter) -> fmt::Result {
        struct FileWrapper<'a> {
            file: &'a FileHandle,
        }

        impl fmt::Debug for FileWrapper<'_> {
            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                let (start, end) = self.file.time_range();
                write!(
                    f,
                    r#"{{"file_id":"{}","time_range_start":"{}::{}","time_range_end":"{}::{}","rows":{},"size":{},"index_size":{}}}"#,
                    self.file.file_id(),
                    start.value(),
                    start.unit(),
                    end.value(),
                    end.unit(),
                    self.file.num_rows(),
                    self.file.size(),
                    self.file.index_size()
                )
            }
        }

        struct InputWrapper<'a> {
            input: &'a ScanInput,
        }

        #[cfg(feature = "enterprise")]
        impl InputWrapper<'_> {
            fn format_extension_ranges(&self, f: &mut fmt::Formatter) -> fmt::Result {
                if self.input.extension_ranges.is_empty() {
                    return Ok(());
                }

                let mut delimiter = "";
                write!(f, ", extension_ranges: [")?;
                for range in self.input.extension_ranges() {
                    write!(f, "{}{:?}", delimiter, range)?;
                    delimiter = ", ";
                }
                write!(f, "]")?;
                Ok(())
            }
        }

        impl fmt::Debug for InputWrapper<'_> {
            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                let output_schema = self.input.mapper.output_schema();
                if !output_schema.is_empty() {
                    let names: Vec<_> = output_schema
                        .column_schemas()
                        .iter()
                        .map(|col| &col.name)
                        .collect();
                    write!(f, ", \"projection\": {:?}", names)?;
                }
                if let Some(predicate) = &self.input.predicate.predicate()
                    && !predicate.exprs().is_empty()
                {
                    let exprs: Vec<_> = predicate.exprs().iter().map(|e| e.to_string()).collect();
                    write!(f, ", \"filters\": {:?}", exprs)?;
                }
                if !self.input.files.is_empty() {
                    write!(f, ", \"files\": ")?;
                    f.debug_list()
                        .entries(self.input.files.iter().map(|file| FileWrapper { file }))
                        .finish()?;
                }

                #[cfg(feature = "enterprise")]
                self.format_extension_ranges(f)?;

                Ok(())
            }
        }

        write!(f, "{:?}", InputWrapper { input: &self.input })
    }
}

/// Predicates to evaluate.
/// Only filters that [SimpleFilterEvaluator] supports are kept as time filters.
#[derive(Clone, Default)]
pub struct PredicateGroup {
    time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
    /// Predicate that includes request filters and region partition expr (if any).
    predicate_all: Option<Predicate>,
    /// Predicate that only includes request filters.
    predicate_without_region: Option<Predicate>,
    /// Region partition expression restored from metadata.
    region_partition_expr: Option<PartitionExpr>,
}

impl PredicateGroup {
    /// Creates a new `PredicateGroup` from exprs according to the metadata.
    pub fn new(metadata: &RegionMetadata, exprs: &[Expr]) -> Result<Self> {
        let mut combined_exprs = exprs.to_vec();
        let mut region_partition_expr = None;

        if let Some(expr_json) = metadata.partition_expr.as_ref()
            && !expr_json.is_empty()
            && let Some(expr) = PartitionExpr::from_json_str(expr_json)
                .context(InvalidPartitionExprSnafu { expr: expr_json })?
        {
            let logical_expr = expr
                .try_as_logical_expr()
                .context(InvalidPartitionExprSnafu {
                    expr: expr_json.clone(),
                })?;

            combined_exprs.push(logical_expr);
            region_partition_expr = Some(expr);
        }

        let mut time_filters = Vec::with_capacity(combined_exprs.len());
        // Columns in the expr.
        let mut columns = HashSet::new();
        for expr in &combined_exprs {
            columns.clear();
            let Some(filter) = Self::expr_to_filter(expr, metadata, &mut columns) else {
                continue;
            };
            time_filters.push(filter);
        }
        let time_filters = if time_filters.is_empty() {
            None
        } else {
            Some(Arc::new(time_filters))
        };

        let predicate_all = if combined_exprs.is_empty() {
            None
        } else {
            Some(Predicate::new(combined_exprs))
        };
        let predicate_without_region = if exprs.is_empty() {
            None
        } else {
            Some(Predicate::new(exprs.to_vec()))
        };

        Ok(Self {
            time_filters,
            predicate_all,
            predicate_without_region,
            region_partition_expr,
        })
    }
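
    // A sketch of what the constructor yields (illustrative; assumes region
    // metadata whose `partition_expr` is set):
    //
    // ```ignore
    // let group = PredicateGroup::new(&metadata, &request.filters)?;
    // // Request filters plus the region partition expr:
    // let all = group.predicate();
    // // Request filters only, for files written under the same partition expr:
    // let without_region = group.predicate_without_region();
    // // Simple evaluators for filters on the time index column:
    // let time_filters = group.time_filters();
    // ```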

    /// Returns time filters.
    pub(crate) fn time_filters(&self) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
        self.time_filters.clone()
    }

    /// Returns predicate of all exprs (including region partition expr if present).
    pub(crate) fn predicate(&self) -> Option<&Predicate> {
        self.predicate_all.as_ref()
    }

    /// Returns predicate that excludes region partition expr.
    pub(crate) fn predicate_without_region(&self) -> Option<&Predicate> {
        self.predicate_without_region.as_ref()
    }

    /// Returns the region partition expr from metadata, if any.
    pub(crate) fn region_partition_expr(&self) -> Option<&PartitionExpr> {
        self.region_partition_expr.as_ref()
    }

    fn expr_to_filter(
        expr: &Expr,
        metadata: &RegionMetadata,
        columns: &mut HashSet<Column>,
    ) -> Option<SimpleFilterEvaluator> {
        columns.clear();
        // `expr_to_columns` should not fail; we still ignore failing expressions for safety.
        expr_to_columns(expr, columns).ok()?;
        if columns.len() > 1 {
            // Simple filter doesn't support multiple columns.
            return None;
        }
        let column = columns.iter().next()?;
        let column_meta = metadata.column_by_name(&column.name)?;
        if column_meta.semantic_type == SemanticType::Timestamp {
            SimpleFilterEvaluator::try_new(expr)
        } else {
            None
        }
    }
}
1471}