mito2/read/
scan_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Scans a region according to the scan request.
16
17use std::collections::HashSet;
18use std::fmt;
19use std::num::NonZeroU64;
20use std::sync::Arc;
21use std::time::Instant;
22
23use api::v1::SemanticType;
24use common_error::ext::BoxedError;
25use common_recordbatch::SendableRecordBatchStream;
26use common_recordbatch::filter::SimpleFilterEvaluator;
27use common_telemetry::{debug, error, tracing, warn};
28use common_time::range::TimestampRange;
29use datafusion_common::Column;
30use datafusion_expr::Expr;
31use datafusion_expr::utils::expr_to_columns;
32use futures::StreamExt;
33use partition::expr::PartitionExpr;
34use smallvec::SmallVec;
35use snafu::ResultExt;
36use store_api::metadata::{RegionMetadata, RegionMetadataRef};
37use store_api::region_engine::{PartitionRange, RegionScannerRef};
38use store_api::storage::{
39    RegionId, ScanRequest, SequenceRange, TimeSeriesDistribution, TimeSeriesRowSelector,
40};
41use table::predicate::{Predicate, build_time_range_predicate};
42use tokio::sync::{Semaphore, mpsc};
43use tokio_stream::wrappers::ReceiverStream;
44
45use crate::access_layer::AccessLayerRef;
46use crate::cache::CacheStrategy;
47use crate::config::{DEFAULT_MAX_CONCURRENT_SCAN_FILES, DEFAULT_SCAN_CHANNEL_SIZE};
48use crate::error::{InvalidPartitionExprSnafu, Result};
49#[cfg(feature = "enterprise")]
50use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
51use crate::memtable::{MemtableRange, RangesOptions};
52use crate::metrics::READ_SST_COUNT;
53use crate::read::compat::{self, CompatBatch, FlatCompatBatch, PrimaryKeyCompatBatch};
54use crate::read::projection::ProjectionMapper;
55use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
56use crate::read::seq_scan::SeqScan;
57use crate::read::series_scan::SeriesScan;
58use crate::read::stream::ScanBatchStream;
59use crate::read::unordered_scan::UnorderedScan;
60use crate::read::{Batch, BoxedRecordBatchStream, RecordBatch, Source};
61use crate::region::options::MergeMode;
62use crate::region::version::VersionRef;
63use crate::sst::FormatType;
64use crate::sst::file::FileHandle;
65use crate::sst::index::bloom_filter::applier::{
66    BloomFilterIndexApplierBuilder, BloomFilterIndexApplierRef,
67};
68use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
69use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
70use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
71use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
72use crate::sst::parquet::file_range::PreFilterMode;
73use crate::sst::parquet::reader::ReaderMetrics;
74
/// Parallel scan channel size for flat format.
///
/// `ScanRegion::scan_input()` uses this value instead of the configured channel size
/// when the region uses the flat format, because flat batches are already large.
const FLAT_SCAN_CHANNEL_SIZE: usize = 2;
77
/// A scanner scans a region and returns a [SendableRecordBatchStream].
///
/// [ScanRegion::scanner()] picks the variant from the scan request and the region options.
pub(crate) enum Scanner {
    /// Sequential scan. Used when neither of the other scanners is selected.
    Seq(SeqScan),
    /// Unordered scan. Selected for append mode regions when the request has no series row
    /// selector and the required distribution is unset or `TimeWindowed`.
    Unordered(UnorderedScan),
    /// Per-series scan. Selected when the request requires the `PerSeries` distribution.
    Series(SeriesScan),
}
87
88impl Scanner {
89    /// Returns a [SendableRecordBatchStream] to retrieve scan results from all partitions.
90    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
91    pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream, BoxedError> {
92        match self {
93            Scanner::Seq(seq_scan) => seq_scan.build_stream(),
94            Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
95            Scanner::Series(series_scan) => series_scan.build_stream().await,
96        }
97    }
98
99    /// Create a stream of [`Batch`] by this scanner.
100    pub(crate) fn scan_batch(&self) -> Result<ScanBatchStream> {
101        match self {
102            Scanner::Seq(x) => x.scan_all_partitions(),
103            Scanner::Unordered(x) => x.scan_all_partitions(),
104            Scanner::Series(x) => x.scan_all_partitions(),
105        }
106    }
107}
108
#[cfg(test)]
impl Scanner {
    /// Number of SST files the wrapped scanner is going to read.
    pub(crate) fn num_files(&self) -> usize {
        match self {
            Self::Seq(scanner) => scanner.input().num_files(),
            Self::Unordered(scanner) => scanner.input().num_files(),
            Self::Series(scanner) => scanner.input().num_files(),
        }
    }

    /// Number of memtables the wrapped scanner is going to read.
    pub(crate) fn num_memtables(&self) -> usize {
        match self {
            Self::Seq(scanner) => scanner.input().num_memtables(),
            Self::Unordered(scanner) => scanner.input().num_memtables(),
            Self::Series(scanner) => scanner.input().num_memtables(),
        }
    }

    /// Ids of the SST files the wrapped scanner is going to read.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        match self {
            Self::Seq(scanner) => scanner.input().file_ids(),
            Self::Unordered(scanner) => scanner.input().file_ids(),
            Self::Series(scanner) => scanner.input().file_ids(),
        }
    }

    /// Sets the target partitions of the scanner, which controls its parallelism.
    ///
    /// # Panics
    /// Panics if the wrapped scanner fails to prepare with the new target partitions.
    pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
        use store_api::region_engine::{PrepareRequest, RegionScanner};

        let request = PrepareRequest::default().with_target_partitions(target_partitions);
        match self {
            Self::Seq(scanner) => scanner.prepare(request).unwrap(),
            Self::Unordered(scanner) => scanner.prepare(request).unwrap(),
            Self::Series(scanner) => scanner.prepare(request).unwrap(),
        }
    }
}
150
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Helper to scan a region by [ScanRequest].
///
/// [ScanRegion] collects SSTs and memtables to scan without actually reading them. It
/// creates a [Scanner] to actually scan these targets in [Scanner::scan()].
///
/// ```mermaid
/// classDiagram
/// class ScanRegion {
///     -VersionRef version
///     -ScanRequest request
///     ~scanner() Scanner
///     ~seq_scan() SeqScan
/// }
/// class Scanner {
///     <<enumeration>>
///     SeqScan
///     UnorderedScan
///     SeriesScan
///     +scan() SendableRecordBatchStream
/// }
/// class SeqScan {
///     -ScanInput input
///     +build() SendableRecordBatchStream
/// }
/// class UnorderedScan {
///     -ScanInput input
///     +build() SendableRecordBatchStream
/// }
/// class ScanInput {
///     -ProjectionMapper mapper
///     -Option~TimeRange~ time_range
///     -Option~Predicate~ predicate
///     -Vec~MemtableRef~ memtables
///     -Vec~FileHandle~ files
/// }
/// class ProjectionMapper {
///     ~output_schema() SchemaRef
///     ~convert(Batch) RecordBatch
/// }
/// ScanRegion -- Scanner
/// ScanRegion o-- ScanRequest
/// Scanner o-- SeqScan
/// Scanner o-- UnorderedScan
/// Scanner o-- SeriesScan
/// SeqScan o-- ScanInput
/// UnorderedScan o-- ScanInput
/// SeriesScan o-- ScanInput
/// Scanner -- SendableRecordBatchStream
/// ScanInput o-- ProjectionMapper
/// SeqScan -- SendableRecordBatchStream
/// UnorderedScan -- SendableRecordBatchStream
/// ```
pub(crate) struct ScanRegion {
    /// Version of the region at scan.
    version: VersionRef,
    /// Access layer of the region.
    access_layer: AccessLayerRef,
    /// Scan request.
    request: ScanRequest,
    /// Cache.
    cache_strategy: CacheStrategy,
    /// Capacity of the channel to send data from parallel scan tasks to the main task.
    parallel_scan_channel_size: usize,
    /// Maximum number of SST files to scan concurrently.
    max_concurrent_scan_files: usize,
    /// Whether to ignore inverted index.
    ignore_inverted_index: bool,
    /// Whether to ignore fulltext index.
    ignore_fulltext_index: bool,
    /// Whether to ignore bloom filter.
    ignore_bloom_filter: bool,
    /// Start time of the scan task.
    start_time: Option<Instant>,
    /// Whether to filter out the deleted rows.
    /// Usually true for normal read, and false for scan for compaction.
    filter_deleted: bool,
    /// Optional provider of extension ranges. When set, the scan input is extended with
    /// the ranges it finds for the request.
    #[cfg(feature = "enterprise")]
    extension_range_provider: Option<BoxedExtensionRangeProvider>,
}
228
229impl ScanRegion {
230    /// Creates a [ScanRegion].
231    pub(crate) fn new(
232        version: VersionRef,
233        access_layer: AccessLayerRef,
234        request: ScanRequest,
235        cache_strategy: CacheStrategy,
236    ) -> ScanRegion {
237        ScanRegion {
238            version,
239            access_layer,
240            request,
241            cache_strategy,
242            parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
243            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
244            ignore_inverted_index: false,
245            ignore_fulltext_index: false,
246            ignore_bloom_filter: false,
247            start_time: None,
248            filter_deleted: true,
249            #[cfg(feature = "enterprise")]
250            extension_range_provider: None,
251        }
252    }
253
254    /// Sets parallel scan task channel size.
255    #[must_use]
256    pub(crate) fn with_parallel_scan_channel_size(
257        mut self,
258        parallel_scan_channel_size: usize,
259    ) -> Self {
260        self.parallel_scan_channel_size = parallel_scan_channel_size;
261        self
262    }
263
264    /// Sets maximum number of SST files to scan concurrently.
265    #[must_use]
266    pub(crate) fn with_max_concurrent_scan_files(
267        mut self,
268        max_concurrent_scan_files: usize,
269    ) -> Self {
270        self.max_concurrent_scan_files = max_concurrent_scan_files;
271        self
272    }
273
274    /// Sets whether to ignore inverted index.
275    #[must_use]
276    pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self {
277        self.ignore_inverted_index = ignore;
278        self
279    }
280
281    /// Sets whether to ignore fulltext index.
282    #[must_use]
283    pub(crate) fn with_ignore_fulltext_index(mut self, ignore: bool) -> Self {
284        self.ignore_fulltext_index = ignore;
285        self
286    }
287
288    /// Sets whether to ignore bloom filter.
289    #[must_use]
290    pub(crate) fn with_ignore_bloom_filter(mut self, ignore: bool) -> Self {
291        self.ignore_bloom_filter = ignore;
292        self
293    }
294
295    #[must_use]
296    pub(crate) fn with_start_time(mut self, now: Instant) -> Self {
297        self.start_time = Some(now);
298        self
299    }
300
    /// Sets whether to filter out the deleted rows.
    pub(crate) fn set_filter_deleted(&mut self, filter_deleted: bool) {
        self.filter_deleted = filter_deleted;
    }
304
    /// Sets the provider used to find extension ranges for the scan.
    ///
    /// Replaces any provider set before.
    #[cfg(feature = "enterprise")]
    pub(crate) fn set_extension_range_provider(
        &mut self,
        extension_range_provider: BoxedExtensionRangeProvider,
    ) {
        self.extension_range_provider = Some(extension_range_provider);
    }
312
313    /// Returns a [Scanner] to scan the region.
314    pub(crate) async fn scanner(self) -> Result<Scanner> {
315        if self.use_series_scan() {
316            self.series_scan().await.map(Scanner::Series)
317        } else if self.use_unordered_scan() {
318            // If table is append only and there is no series row selector, we use unordered scan in query.
319            // We still use seq scan in compaction.
320            self.unordered_scan().await.map(Scanner::Unordered)
321        } else {
322            self.seq_scan().await.map(Scanner::Seq)
323        }
324    }
325
326    /// Returns a [RegionScanner] to scan the region.
327    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
328    pub(crate) async fn region_scanner(self) -> Result<RegionScannerRef> {
329        if self.use_series_scan() {
330            self.series_scan()
331                .await
332                .map(|scanner| Box::new(scanner) as _)
333        } else if self.use_unordered_scan() {
334            self.unordered_scan()
335                .await
336                .map(|scanner| Box::new(scanner) as _)
337        } else {
338            self.seq_scan().await.map(|scanner| Box::new(scanner) as _)
339        }
340    }
341
342    /// Scan sequentially.
343    pub(crate) async fn seq_scan(self) -> Result<SeqScan> {
344        let input = self.scan_input().await?.with_compaction(false);
345        Ok(SeqScan::new(input))
346    }
347
348    /// Unordered scan.
349    pub(crate) async fn unordered_scan(self) -> Result<UnorderedScan> {
350        let input = self.scan_input().await?;
351        Ok(UnorderedScan::new(input))
352    }
353
354    /// Scans by series.
355    pub(crate) async fn series_scan(self) -> Result<SeriesScan> {
356        let input = self.scan_input().await?;
357        Ok(SeriesScan::new(input))
358    }
359
360    /// Returns true if the region can use unordered scan for current request.
361    fn use_unordered_scan(&self) -> bool {
362        // We use unordered scan when:
363        // 1. The region is in append mode.
364        // 2. There is no series row selector.
365        // 3. The required distribution is None or TimeSeriesDistribution::TimeWindowed.
366        //
367        // We still use seq scan in compaction.
368        self.version.options.append_mode
369            && self.request.series_row_selector.is_none()
370            && (self.request.distribution.is_none()
371                || self.request.distribution == Some(TimeSeriesDistribution::TimeWindowed))
372    }
373
374    /// Returns true if the region can use series scan for current request.
375    fn use_series_scan(&self) -> bool {
376        self.request.distribution == Some(TimeSeriesDistribution::PerSeries)
377    }
378
379    /// Returns true if the region use flat format.
380    fn use_flat_format(&self) -> bool {
381        self.version.options.sst_format.unwrap_or_default() == FormatType::Flat
382    }
383
384    /// Creates a scan input.
385    async fn scan_input(mut self) -> Result<ScanInput> {
386        let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new);
387        let time_range = self.build_time_range_predicate();
388        let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;
389        let flat_format = self.use_flat_format();
390
391        // The mapper always computes projected column ids as the schema of SSTs may change.
392        let mapper = match &self.request.projection {
393            Some(p) => {
394                ProjectionMapper::new(&self.version.metadata, p.iter().copied(), flat_format)?
395            }
396            None => ProjectionMapper::all(&self.version.metadata, flat_format)?,
397        };
398
399        let ssts = &self.version.ssts;
400        let mut files = Vec::new();
401        for level in ssts.levels() {
402            for file in level.files.values() {
403                let exceed_min_sequence = match (sst_min_sequence, file.meta_ref().sequence) {
404                    (Some(min_sequence), Some(file_sequence)) => file_sequence > min_sequence,
405                    // If the file's sequence is None (or actually is zero), it could mean the file
406                    // is generated and added to the region "directly". In this case, its data should
407                    // be considered as fresh as the memtable. So its sequence is treated greater than
408                    // the min_sequence, whatever the value of min_sequence is. Hence the default
409                    // "true" in this arm.
410                    (Some(_), None) => true,
411                    (None, _) => true,
412                };
413
414                // Finds SST files in range.
415                if exceed_min_sequence && file_in_range(file, &time_range) {
416                    files.push(file.clone());
417                }
418                // There is no need to check and prune for file's sequence here as the sequence number is usually very new,
419                // unless the timing is too good, or the sequence number wouldn't be in file.
420                // and the batch will be filtered out by tree reader anyway.
421            }
422        }
423
424        let memtables = self.version.memtables.list_memtables();
425        // Skip empty memtables and memtables out of time range.
426        let mut mem_range_builders = Vec::new();
427        let filter_mode = pre_filter_mode(
428            self.version.options.append_mode,
429            self.version.options.merge_mode(),
430        );
431
432        for m in memtables {
433            // check if memtable is empty by reading stats.
434            let Some((start, end)) = m.stats().time_range() else {
435                continue;
436            };
437            // The time range of the memtable is inclusive.
438            let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
439            if !memtable_range.intersects(&time_range) {
440                continue;
441            }
442            let ranges_in_memtable = m.ranges(
443                Some(mapper.column_ids()),
444                RangesOptions::default()
445                    .with_predicate(predicate.clone())
446                    .with_sequence(SequenceRange::new(
447                        self.request.memtable_min_sequence,
448                        self.request.memtable_max_sequence,
449                    ))
450                    .with_pre_filter_mode(filter_mode),
451            )?;
452            mem_range_builders.extend(ranges_in_memtable.ranges.into_values().map(|v| {
453                // todo: we should add stats to MemtableRange
454                let mut stats = ranges_in_memtable.stats.clone();
455                stats.num_ranges = 1;
456                stats.num_rows = v.num_rows();
457                MemRangeBuilder::new(v, stats)
458            }));
459        }
460
461        let region_id = self.region_id();
462        debug!(
463            "Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}, flat_format: {}",
464            region_id,
465            self.request,
466            time_range,
467            mem_range_builders.len(),
468            files.len(),
469            self.version.options.append_mode,
470            flat_format,
471        );
472
473        let (non_field_filters, field_filters) = self.partition_by_field_filters();
474        let inverted_index_appliers = [
475            self.build_invereted_index_applier(&non_field_filters),
476            self.build_invereted_index_applier(&field_filters),
477        ];
478        let bloom_filter_appliers = [
479            self.build_bloom_filter_applier(&non_field_filters),
480            self.build_bloom_filter_applier(&field_filters),
481        ];
482        let fulltext_index_appliers = [
483            self.build_fulltext_index_applier(&non_field_filters),
484            self.build_fulltext_index_applier(&field_filters),
485        ];
486        let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;
487
488        if flat_format {
489            // The batch is already large enough so we use a small channel size here.
490            self.parallel_scan_channel_size = FLAT_SCAN_CHANNEL_SIZE;
491        }
492
493        let input = ScanInput::new(self.access_layer, mapper)
494            .with_time_range(Some(time_range))
495            .with_predicate(predicate)
496            .with_memtables(mem_range_builders)
497            .with_files(files)
498            .with_cache(self.cache_strategy)
499            .with_inverted_index_appliers(inverted_index_appliers)
500            .with_bloom_filter_index_appliers(bloom_filter_appliers)
501            .with_fulltext_index_appliers(fulltext_index_appliers)
502            .with_parallel_scan_channel_size(self.parallel_scan_channel_size)
503            .with_max_concurrent_scan_files(self.max_concurrent_scan_files)
504            .with_start_time(self.start_time)
505            .with_append_mode(self.version.options.append_mode)
506            .with_filter_deleted(self.filter_deleted)
507            .with_merge_mode(self.version.options.merge_mode())
508            .with_series_row_selector(self.request.series_row_selector)
509            .with_distribution(self.request.distribution)
510            .with_flat_format(flat_format);
511
512        #[cfg(feature = "enterprise")]
513        let input = if let Some(provider) = self.extension_range_provider {
514            let ranges = provider
515                .find_extension_ranges(self.version.flushed_sequence, time_range, &self.request)
516                .await?;
517            debug!("Find extension ranges: {ranges:?}");
518            input.with_extension_ranges(ranges)
519        } else {
520            input
521        };
522        Ok(input)
523    }
524
    /// Returns the id of the region to scan, taken from the region metadata.
    fn region_id(&self) -> RegionId {
        self.version.metadata.region_id
    }
528
529    /// Build time range predicate from filters.
530    fn build_time_range_predicate(&self) -> TimestampRange {
531        let time_index = self.version.metadata.time_index_column();
532        let unit = time_index
533            .column_schema
534            .data_type
535            .as_timestamp()
536            .expect("Time index must have timestamp-compatible type")
537            .unit();
538        build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters)
539    }
540
541    /// Partitions filters into two groups: non-field filters and field filters.
542    /// Returns `(non_field_filters, field_filters)`.
543    fn partition_by_field_filters(&self) -> (Vec<Expr>, Vec<Expr>) {
544        let field_columns = self
545            .version
546            .metadata
547            .field_columns()
548            .map(|col| &col.column_schema.name)
549            .collect::<HashSet<_>>();
550
551        let mut columns = HashSet::new();
552
553        self.request.filters.iter().cloned().partition(|expr| {
554            columns.clear();
555            // `expr_to_columns` won't return error.
556            if expr_to_columns(expr, &mut columns).is_err() {
557                // If we can't extract columns, treat it as non-field filter
558                return true;
559            }
560            // Return true for non-field filters (partition puts true cases in first vec)
561            !columns
562                .iter()
563                .any(|column| field_columns.contains(&column.name))
564        })
565    }
566
567    /// Use the latest schema to build the inverted index applier.
568    fn build_invereted_index_applier(&self, filters: &[Expr]) -> Option<InvertedIndexApplierRef> {
569        if self.ignore_inverted_index {
570            return None;
571        }
572
573        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
574        let inverted_index_cache = self.cache_strategy.inverted_index_cache().cloned();
575
576        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
577
578        InvertedIndexApplierBuilder::new(
579            self.access_layer.table_dir().to_string(),
580            self.access_layer.path_type(),
581            self.access_layer.object_store().clone(),
582            self.version.metadata.as_ref(),
583            self.version.metadata.inverted_indexed_column_ids(
584                self.version
585                    .options
586                    .index_options
587                    .inverted_index
588                    .ignore_column_ids
589                    .iter(),
590            ),
591            self.access_layer.puffin_manager_factory().clone(),
592        )
593        .with_file_cache(file_cache)
594        .with_inverted_index_cache(inverted_index_cache)
595        .with_puffin_metadata_cache(puffin_metadata_cache)
596        .build(filters)
597        .inspect_err(|err| warn!(err; "Failed to build invereted index applier"))
598        .ok()
599        .flatten()
600        .map(Arc::new)
601    }
602
603    /// Use the latest schema to build the bloom filter index applier.
604    fn build_bloom_filter_applier(&self, filters: &[Expr]) -> Option<BloomFilterIndexApplierRef> {
605        if self.ignore_bloom_filter {
606            return None;
607        }
608
609        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
610        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
611        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
612
613        BloomFilterIndexApplierBuilder::new(
614            self.access_layer.table_dir().to_string(),
615            self.access_layer.path_type(),
616            self.access_layer.object_store().clone(),
617            self.version.metadata.as_ref(),
618            self.access_layer.puffin_manager_factory().clone(),
619        )
620        .with_file_cache(file_cache)
621        .with_bloom_filter_index_cache(bloom_filter_index_cache)
622        .with_puffin_metadata_cache(puffin_metadata_cache)
623        .build(filters)
624        .inspect_err(|err| warn!(err; "Failed to build bloom filter index applier"))
625        .ok()
626        .flatten()
627        .map(Arc::new)
628    }
629
630    /// Use the latest schema to build the fulltext index applier.
631    fn build_fulltext_index_applier(&self, filters: &[Expr]) -> Option<FulltextIndexApplierRef> {
632        if self.ignore_fulltext_index {
633            return None;
634        }
635
636        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
637        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
638        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
639        FulltextIndexApplierBuilder::new(
640            self.access_layer.table_dir().to_string(),
641            self.access_layer.path_type(),
642            self.access_layer.object_store().clone(),
643            self.access_layer.puffin_manager_factory().clone(),
644            self.version.metadata.as_ref(),
645        )
646        .with_file_cache(file_cache)
647        .with_puffin_metadata_cache(puffin_metadata_cache)
648        .with_bloom_filter_cache(bloom_filter_index_cache)
649        .build(filters)
650        .inspect_err(|err| warn!(err; "Failed to build fulltext index applier"))
651        .ok()
652        .flatten()
653        .map(Arc::new)
654    }
655}
656
657/// Returns true if the time range of a SST `file` matches the `predicate`.
658fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
659    if predicate == &TimestampRange::min_to_max() {
660        return true;
661    }
662    // end timestamp of a SST is inclusive.
663    let (start, end) = file.time_range();
664    let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end));
665    file_ts_range.intersects(predicate)
666}
667
/// Common input for different scanners.
pub struct ScanInput {
    /// Region SST access layer.
    access_layer: AccessLayerRef,
    /// Maps projected Batches to RecordBatches.
    pub(crate) mapper: Arc<ProjectionMapper>,
    /// Time range filter for time index.
    time_range: Option<TimestampRange>,
    /// Predicate to push down.
    pub(crate) predicate: PredicateGroup,
    /// Region partition expr applied at read time.
    /// It is taken from the predicate when the predicate is set.
    region_partition_expr: Option<PartitionExpr>,
    /// Memtable range builders for memtables in the time range.
    pub(crate) memtables: Vec<MemRangeBuilder>,
    /// Handles to SST files to scan.
    pub(crate) files: Vec<FileHandle>,
    /// Cache.
    pub(crate) cache_strategy: CacheStrategy,
    /// Ignores file not found error.
    ignore_file_not_found: bool,
    /// Capacity of the channel to send data from parallel scan tasks to the main task.
    pub(crate) parallel_scan_channel_size: usize,
    /// Maximum number of SST files to scan concurrently.
    pub(crate) max_concurrent_scan_files: usize,
    /// Index appliers.
    ///
    /// Each array holds two optional appliers. When the input is built by `ScanRegion`,
    /// the first one is built from the non-field filters and the second one from the
    /// field filters.
    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
    /// Start time of the query.
    pub(crate) query_start: Option<Instant>,
    /// The region is using append mode.
    pub(crate) append_mode: bool,
    /// Whether to remove deletion markers.
    pub(crate) filter_deleted: bool,
    /// Mode to merge duplicate rows.
    pub(crate) merge_mode: MergeMode,
    /// Hint to select rows from time series.
    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
    /// Hint for the required distribution of the scanner.
    pub(crate) distribution: Option<TimeSeriesDistribution>,
    /// Whether to use flat format.
    pub(crate) flat_format: bool,
    /// Whether this scan is for compaction.
    pub(crate) compaction: bool,
    /// Extension ranges to scan in addition to memtables and files.
    #[cfg(feature = "enterprise")]
    extension_ranges: Vec<BoxedExtensionRange>,
}
715
716impl ScanInput {
717    /// Creates a new [ScanInput].
718    #[must_use]
719    pub(crate) fn new(access_layer: AccessLayerRef, mapper: ProjectionMapper) -> ScanInput {
720        ScanInput {
721            access_layer,
722            mapper: Arc::new(mapper),
723            time_range: None,
724            predicate: PredicateGroup::default(),
725            region_partition_expr: None,
726            memtables: Vec::new(),
727            files: Vec::new(),
728            cache_strategy: CacheStrategy::Disabled,
729            ignore_file_not_found: false,
730            parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
731            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
732            inverted_index_appliers: [None, None],
733            bloom_filter_index_appliers: [None, None],
734            fulltext_index_appliers: [None, None],
735            query_start: None,
736            append_mode: false,
737            filter_deleted: true,
738            merge_mode: MergeMode::default(),
739            series_row_selector: None,
740            distribution: None,
741            flat_format: false,
742            compaction: false,
743            #[cfg(feature = "enterprise")]
744            extension_ranges: Vec::new(),
745        }
746    }
747
748    /// Sets time range filter for time index.
749    #[must_use]
750    pub(crate) fn with_time_range(mut self, time_range: Option<TimestampRange>) -> Self {
751        self.time_range = time_range;
752        self
753    }
754
755    /// Sets predicate to push down.
756    #[must_use]
757    pub(crate) fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
758        self.region_partition_expr = predicate.region_partition_expr().cloned();
759        self.predicate = predicate;
760        self
761    }
762
763    /// Sets memtable range builders.
764    #[must_use]
765    pub(crate) fn with_memtables(mut self, memtables: Vec<MemRangeBuilder>) -> Self {
766        self.memtables = memtables;
767        self
768    }
769
770    /// Sets files to read.
771    #[must_use]
772    pub(crate) fn with_files(mut self, files: Vec<FileHandle>) -> Self {
773        self.files = files;
774        self
775    }
776
777    /// Sets cache for this query.
778    #[must_use]
779    pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self {
780        self.cache_strategy = cache;
781        self
782    }
783
784    /// Ignores file not found error.
785    #[must_use]
786    pub(crate) fn with_ignore_file_not_found(mut self, ignore: bool) -> Self {
787        self.ignore_file_not_found = ignore;
788        self
789    }
790
791    /// Sets scan task channel size.
792    #[must_use]
793    pub(crate) fn with_parallel_scan_channel_size(
794        mut self,
795        parallel_scan_channel_size: usize,
796    ) -> Self {
797        self.parallel_scan_channel_size = parallel_scan_channel_size;
798        self
799    }
800
801    /// Sets maximum number of SST files to scan concurrently.
802    #[must_use]
803    pub(crate) fn with_max_concurrent_scan_files(
804        mut self,
805        max_concurrent_scan_files: usize,
806    ) -> Self {
807        self.max_concurrent_scan_files = max_concurrent_scan_files;
808        self
809    }
810
811    /// Sets inverted index appliers.
812    #[must_use]
813    pub(crate) fn with_inverted_index_appliers(
814        mut self,
815        appliers: [Option<InvertedIndexApplierRef>; 2],
816    ) -> Self {
817        self.inverted_index_appliers = appliers;
818        self
819    }
820
821    /// Sets bloom filter appliers.
822    #[must_use]
823    pub(crate) fn with_bloom_filter_index_appliers(
824        mut self,
825        appliers: [Option<BloomFilterIndexApplierRef>; 2],
826    ) -> Self {
827        self.bloom_filter_index_appliers = appliers;
828        self
829    }
830
831    /// Sets fulltext index appliers.
832    #[must_use]
833    pub(crate) fn with_fulltext_index_appliers(
834        mut self,
835        appliers: [Option<FulltextIndexApplierRef>; 2],
836    ) -> Self {
837        self.fulltext_index_appliers = appliers;
838        self
839    }
840
841    /// Sets start time of the query.
842    #[must_use]
843    pub(crate) fn with_start_time(mut self, now: Option<Instant>) -> Self {
844        self.query_start = now;
845        self
846    }
847
848    #[must_use]
849    pub(crate) fn with_append_mode(mut self, is_append_mode: bool) -> Self {
850        self.append_mode = is_append_mode;
851        self
852    }
853
854    /// Sets whether to remove deletion markers during scan.
855    #[must_use]
856    pub(crate) fn with_filter_deleted(mut self, filter_deleted: bool) -> Self {
857        self.filter_deleted = filter_deleted;
858        self
859    }
860
861    /// Sets the merge mode.
862    #[must_use]
863    pub(crate) fn with_merge_mode(mut self, merge_mode: MergeMode) -> Self {
864        self.merge_mode = merge_mode;
865        self
866    }
867
868    /// Sets the distribution hint.
869    #[must_use]
870    pub(crate) fn with_distribution(
871        mut self,
872        distribution: Option<TimeSeriesDistribution>,
873    ) -> Self {
874        self.distribution = distribution;
875        self
876    }
877
878    /// Sets the time series row selector.
879    #[must_use]
880    pub(crate) fn with_series_row_selector(
881        mut self,
882        series_row_selector: Option<TimeSeriesRowSelector>,
883    ) -> Self {
884        self.series_row_selector = series_row_selector;
885        self
886    }
887
888    /// Sets whether to use flat format.
889    #[must_use]
890    pub(crate) fn with_flat_format(mut self, flat_format: bool) -> Self {
891        self.flat_format = flat_format;
892        self
893    }
894
895    /// Sets whether this scan is for compaction.
896    #[must_use]
897    pub(crate) fn with_compaction(mut self, compaction: bool) -> Self {
898        self.compaction = compaction;
899        self
900    }
901
902    /// Scans sources in parallel.
903    ///
904    /// # Panics if the input doesn't allow parallel scan.
905    pub(crate) fn create_parallel_sources(
906        &self,
907        sources: Vec<Source>,
908        semaphore: Arc<Semaphore>,
909    ) -> Result<Vec<Source>> {
910        if sources.len() <= 1 {
911            return Ok(sources);
912        }
913
914        // Spawn a task for each source.
915        let sources = sources
916            .into_iter()
917            .map(|source| {
918                let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
919                self.spawn_scan_task(source, semaphore.clone(), sender);
920                let stream = Box::pin(ReceiverStream::new(receiver));
921                Source::Stream(stream)
922            })
923            .collect();
924        Ok(sources)
925    }
926
927    /// Builds memtable ranges to scan by `index`.
928    pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> {
929        let memtable = &self.memtables[index.index];
930        let mut ranges = SmallVec::new();
931        memtable.build_ranges(index.row_group_index, &mut ranges);
932        ranges
933    }
934
935    fn predicate_for_file(&self, file: &FileHandle) -> Option<Predicate> {
936        if self.should_skip_region_partition(file) {
937            self.predicate.predicate_without_region().cloned()
938        } else {
939            self.predicate.predicate().cloned()
940        }
941    }
942
943    fn should_skip_region_partition(&self, file: &FileHandle) -> bool {
944        match (
945            self.region_partition_expr.as_ref(),
946            file.meta_ref().partition_expr.as_ref(),
947        ) {
948            (Some(region_expr), Some(file_expr)) => region_expr == file_expr,
949            _ => false,
950        }
951    }
952
953    /// Prunes a file to scan and returns the builder to build readers.
954    pub async fn prune_file(
955        &self,
956        file: &FileHandle,
957        reader_metrics: &mut ReaderMetrics,
958    ) -> Result<FileRangeBuilder> {
959        let predicate = self.predicate_for_file(file);
960        let filter_mode = pre_filter_mode(self.append_mode, self.merge_mode);
961        let res = self
962            .access_layer
963            .read_sst(file.clone())
964            .predicate(predicate)
965            .projection(Some(self.mapper.column_ids().to_vec()))
966            .cache(self.cache_strategy.clone())
967            .inverted_index_appliers(self.inverted_index_appliers.clone())
968            .bloom_filter_index_appliers(self.bloom_filter_index_appliers.clone())
969            .fulltext_index_appliers(self.fulltext_index_appliers.clone())
970            .expected_metadata(Some(self.mapper.metadata().clone()))
971            .flat_format(self.flat_format)
972            .compaction(self.compaction)
973            .pre_filter_mode(filter_mode)
974            .build_reader_input(reader_metrics)
975            .await;
976        let (mut file_range_ctx, selection) = match res {
977            Ok(x) => x,
978            Err(e) => {
979                if e.is_object_not_found() && self.ignore_file_not_found {
980                    error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
981                    return Ok(FileRangeBuilder::default());
982                } else {
983                    return Err(e);
984                }
985            }
986        };
987
988        let need_compat = !compat::has_same_columns_and_pk_encoding(
989            self.mapper.metadata(),
990            file_range_ctx.read_format().metadata(),
991        );
992        if need_compat {
993            // They have different schema. We need to adapt the batch first so the
994            // mapper can convert it.
995            let compat = if let Some(flat_format) = file_range_ctx.read_format().as_flat() {
996                let mapper = self.mapper.as_flat().unwrap();
997                FlatCompatBatch::try_new(
998                    mapper,
999                    flat_format.metadata(),
1000                    flat_format.format_projection(),
1001                    self.compaction,
1002                )?
1003                .map(CompatBatch::Flat)
1004            } else {
1005                let compact_batch = PrimaryKeyCompatBatch::new(
1006                    &self.mapper,
1007                    file_range_ctx.read_format().metadata().clone(),
1008                )?;
1009                Some(CompatBatch::PrimaryKey(compact_batch))
1010            };
1011            file_range_ctx.set_compat_batch(compat);
1012        }
1013        Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), selection))
1014    }
1015
1016    /// Scans the input source in another task and sends batches to the sender.
1017    pub(crate) fn spawn_scan_task(
1018        &self,
1019        mut input: Source,
1020        semaphore: Arc<Semaphore>,
1021        sender: mpsc::Sender<Result<Batch>>,
1022    ) {
1023        common_runtime::spawn_global(async move {
1024            loop {
1025                // We release the permit before sending result to avoid the task waiting on
1026                // the channel with the permit held.
1027                let maybe_batch = {
1028                    // Safety: We never close the semaphore.
1029                    let _permit = semaphore.acquire().await.unwrap();
1030                    input.next_batch().await
1031                };
1032                match maybe_batch {
1033                    Ok(Some(batch)) => {
1034                        let _ = sender.send(Ok(batch)).await;
1035                    }
1036                    Ok(None) => break,
1037                    Err(e) => {
1038                        let _ = sender.send(Err(e)).await;
1039                        break;
1040                    }
1041                }
1042            }
1043        });
1044    }
1045
1046    /// Scans flat sources (RecordBatch streams) in parallel.
1047    ///
1048    /// # Panics if the input doesn't allow parallel scan.
1049    pub(crate) fn create_parallel_flat_sources(
1050        &self,
1051        sources: Vec<BoxedRecordBatchStream>,
1052        semaphore: Arc<Semaphore>,
1053    ) -> Result<Vec<BoxedRecordBatchStream>> {
1054        if sources.len() <= 1 {
1055            return Ok(sources);
1056        }
1057
1058        // Spawn a task for each source.
1059        let sources = sources
1060            .into_iter()
1061            .map(|source| {
1062                let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
1063                self.spawn_flat_scan_task(source, semaphore.clone(), sender);
1064                let stream = Box::pin(ReceiverStream::new(receiver));
1065                Box::pin(stream) as _
1066            })
1067            .collect();
1068        Ok(sources)
1069    }
1070
1071    /// Spawns a task to scan a flat source (RecordBatch stream) asynchronously.
1072    pub(crate) fn spawn_flat_scan_task(
1073        &self,
1074        mut input: BoxedRecordBatchStream,
1075        semaphore: Arc<Semaphore>,
1076        sender: mpsc::Sender<Result<RecordBatch>>,
1077    ) {
1078        common_runtime::spawn_global(async move {
1079            loop {
1080                // We release the permit before sending result to avoid the task waiting on
1081                // the channel with the permit held.
1082                let maybe_batch = {
1083                    // Safety: We never close the semaphore.
1084                    let _permit = semaphore.acquire().await.unwrap();
1085                    input.next().await
1086                };
1087                match maybe_batch {
1088                    Some(Ok(batch)) => {
1089                        let _ = sender.send(Ok(batch)).await;
1090                    }
1091                    Some(Err(e)) => {
1092                        let _ = sender.send(Err(e)).await;
1093                        break;
1094                    }
1095                    None => break,
1096                }
1097            }
1098        });
1099    }
1100
1101    pub(crate) fn total_rows(&self) -> usize {
1102        let rows_in_files: usize = self.files.iter().map(|f| f.num_rows()).sum();
1103        let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();
1104
1105        let rows = rows_in_files + rows_in_memtables;
1106        #[cfg(feature = "enterprise")]
1107        let rows = rows
1108            + self
1109                .extension_ranges
1110                .iter()
1111                .map(|x| x.num_rows())
1112                .sum::<u64>() as usize;
1113        rows
1114    }
1115
1116    pub(crate) fn predicate_group(&self) -> &PredicateGroup {
1117        &self.predicate
1118    }
1119
1120    /// Returns number of memtables to scan.
1121    pub(crate) fn num_memtables(&self) -> usize {
1122        self.memtables.len()
1123    }
1124
1125    /// Returns number of SST files to scan.
1126    pub(crate) fn num_files(&self) -> usize {
1127        self.files.len()
1128    }
1129
1130    pub fn region_metadata(&self) -> &RegionMetadataRef {
1131        self.mapper.metadata()
1132    }
1133}
1134
#[cfg(feature = "enterprise")]
impl ScanInput {
    /// Replaces the extension ranges to scan.
    #[must_use]
    pub(crate) fn with_extension_ranges(
        mut self,
        extension_ranges: Vec<BoxedExtensionRange>,
    ) -> Self {
        self.extension_ranges = extension_ranges;
        self
    }

    /// Returns all extension ranges.
    pub(crate) fn extension_ranges(&self) -> &[BoxedExtensionRange] {
        self.extension_ranges.as_slice()
    }

    /// Get a boxed [ExtensionRange] by the index in all ranges.
    ///
    /// Extension ranges are numbered after memtables and files, so `i` is shifted down
    /// by their combined count before indexing. Panics if the shifted index is out of
    /// bounds.
    pub(crate) fn extension_range(&self, i: usize) -> &BoxedExtensionRange {
        let first_extension_index = self.num_memtables() + self.num_files();
        &self.extension_ranges[i - first_extension_index]
    }
}
1156
#[cfg(test)]
impl ScanInput {
    /// Returns the ids of the SST files to scan, in input order.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        let mut ids = Vec::with_capacity(self.files.len());
        for file in &self.files {
            ids.push(file.file_id());
        }
        ids
    }
}
1164
1165fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode {
1166    if append_mode {
1167        return PreFilterMode::All;
1168    }
1169
1170    match merge_mode {
1171        MergeMode::LastRow => PreFilterMode::SkipFieldsOnDelete,
1172        MergeMode::LastNonNull => PreFilterMode::SkipFields,
1173    }
1174}
1175
1176/// Context shared by different streams from a scanner.
1177/// It contains the input and ranges to scan.
1178pub struct StreamContext {
1179    /// Input memtables and files.
1180    pub input: ScanInput,
1181    /// Metadata for partition ranges.
1182    pub(crate) ranges: Vec<RangeMeta>,
1183
1184    // Metrics:
1185    /// The start time of the query.
1186    pub(crate) query_start: Instant,
1187}
1188
1189impl StreamContext {
1190    /// Creates a new [StreamContext] for [SeqScan].
1191    pub(crate) fn seq_scan_ctx(input: ScanInput) -> Self {
1192        let query_start = input.query_start.unwrap_or_else(Instant::now);
1193        let ranges = RangeMeta::seq_scan_ranges(&input);
1194        READ_SST_COUNT.observe(input.num_files() as f64);
1195
1196        Self {
1197            input,
1198            ranges,
1199            query_start,
1200        }
1201    }
1202
1203    /// Creates a new [StreamContext] for [UnorderedScan].
1204    pub(crate) fn unordered_scan_ctx(input: ScanInput) -> Self {
1205        let query_start = input.query_start.unwrap_or_else(Instant::now);
1206        let ranges = RangeMeta::unordered_scan_ranges(&input);
1207        READ_SST_COUNT.observe(input.num_files() as f64);
1208
1209        Self {
1210            input,
1211            ranges,
1212            query_start,
1213        }
1214    }
1215
1216    /// Returns true if the index refers to a memtable.
1217    pub(crate) fn is_mem_range_index(&self, index: RowGroupIndex) -> bool {
1218        self.input.num_memtables() > index.index
1219    }
1220
1221    pub(crate) fn is_file_range_index(&self, index: RowGroupIndex) -> bool {
1222        !self.is_mem_range_index(index)
1223            && index.index < self.input.num_files() + self.input.num_memtables()
1224    }
1225
1226    /// Retrieves the partition ranges.
1227    pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
1228        self.ranges
1229            .iter()
1230            .enumerate()
1231            .map(|(idx, range_meta)| range_meta.new_partition_range(idx))
1232            .collect()
1233    }
1234
1235    /// Format the context for explain.
1236    pub(crate) fn format_for_explain(&self, verbose: bool, f: &mut fmt::Formatter) -> fmt::Result {
1237        let (mut num_mem_ranges, mut num_file_ranges, mut num_other_ranges) = (0, 0, 0);
1238        for range_meta in &self.ranges {
1239            for idx in &range_meta.row_group_indices {
1240                if self.is_mem_range_index(*idx) {
1241                    num_mem_ranges += 1;
1242                } else if self.is_file_range_index(*idx) {
1243                    num_file_ranges += 1;
1244                } else {
1245                    num_other_ranges += 1;
1246                }
1247            }
1248        }
1249        if verbose {
1250            write!(f, "{{")?;
1251        }
1252        write!(
1253            f,
1254            r#""partition_count":{{"count":{}, "mem_ranges":{}, "files":{}, "file_ranges":{}"#,
1255            self.ranges.len(),
1256            num_mem_ranges,
1257            self.input.num_files(),
1258            num_file_ranges,
1259        )?;
1260        if num_other_ranges > 0 {
1261            write!(f, r#", "other_ranges":{}"#, num_other_ranges)?;
1262        }
1263        write!(f, "}}")?;
1264
1265        if let Some(selector) = &self.input.series_row_selector {
1266            write!(f, ", \"selector\":\"{}\"", selector)?;
1267        }
1268        if let Some(distribution) = &self.input.distribution {
1269            write!(f, ", \"distribution\":\"{}\"", distribution)?;
1270        }
1271
1272        if verbose {
1273            self.format_verbose_content(f)?;
1274        }
1275
1276        Ok(())
1277    }
1278
1279    fn format_verbose_content(&self, f: &mut fmt::Formatter) -> fmt::Result {
1280        struct FileWrapper<'a> {
1281            file: &'a FileHandle,
1282        }
1283
1284        impl fmt::Debug for FileWrapper<'_> {
1285            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1286                let (start, end) = self.file.time_range();
1287                write!(
1288                    f,
1289                    r#"{{"file_id":"{}","time_range_start":"{}::{}","time_range_end":"{}::{}","rows":{},"size":{},"index_size":{}}}"#,
1290                    self.file.file_id(),
1291                    start.value(),
1292                    start.unit(),
1293                    end.value(),
1294                    end.unit(),
1295                    self.file.num_rows(),
1296                    self.file.size(),
1297                    self.file.index_size()
1298                )
1299            }
1300        }
1301
1302        struct InputWrapper<'a> {
1303            input: &'a ScanInput,
1304        }
1305
1306        #[cfg(feature = "enterprise")]
1307        impl InputWrapper<'_> {
1308            fn format_extension_ranges(&self, f: &mut fmt::Formatter) -> fmt::Result {
1309                if self.input.extension_ranges.is_empty() {
1310                    return Ok(());
1311                }
1312
1313                let mut delimiter = "";
1314                write!(f, ", extension_ranges: [")?;
1315                for range in self.input.extension_ranges() {
1316                    write!(f, "{}{:?}", delimiter, range)?;
1317                    delimiter = ", ";
1318                }
1319                write!(f, "]")?;
1320                Ok(())
1321            }
1322        }
1323
1324        impl fmt::Debug for InputWrapper<'_> {
1325            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1326                let output_schema = self.input.mapper.output_schema();
1327                if !output_schema.is_empty() {
1328                    let names: Vec<_> = output_schema
1329                        .column_schemas()
1330                        .iter()
1331                        .map(|col| &col.name)
1332                        .collect();
1333                    write!(f, ", \"projection\": {:?}", names)?;
1334                }
1335                if let Some(predicate) = &self.input.predicate.predicate()
1336                    && !predicate.exprs().is_empty()
1337                {
1338                    let exprs: Vec<_> = predicate.exprs().iter().map(|e| e.to_string()).collect();
1339                    write!(f, ", \"filters\": {:?}", exprs)?;
1340                }
1341                if !self.input.files.is_empty() {
1342                    write!(f, ", \"files\": ")?;
1343                    f.debug_list()
1344                        .entries(self.input.files.iter().map(|file| FileWrapper { file }))
1345                        .finish()?;
1346                }
1347
1348                #[cfg(feature = "enterprise")]
1349                self.format_extension_ranges(f)?;
1350
1351                Ok(())
1352            }
1353        }
1354
1355        write!(f, "{:?}", InputWrapper { input: &self.input })
1356    }
1357}
1358
1359/// Predicates to evaluate.
1360/// It only keeps filters that [SimpleFilterEvaluator] supports.
1361#[derive(Clone, Default)]
1362pub struct PredicateGroup {
1363    time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
1364    /// Predicate that includes request filters and region partition expr (if any).
1365    predicate_all: Option<Predicate>,
1366    /// Predicate that only includes request filters.
1367    predicate_without_region: Option<Predicate>,
1368    /// Region partition expression restored from metadata.
1369    region_partition_expr: Option<PartitionExpr>,
1370}
1371
1372impl PredicateGroup {
1373    /// Creates a new `PredicateGroup` from exprs according to the metadata.
1374    pub fn new(metadata: &RegionMetadata, exprs: &[Expr]) -> Result<Self> {
1375        let mut combined_exprs = exprs.to_vec();
1376        let mut region_partition_expr = None;
1377
1378        if let Some(expr_json) = metadata.partition_expr.as_ref()
1379            && !expr_json.is_empty()
1380            && let Some(expr) = PartitionExpr::from_json_str(expr_json)
1381                .context(InvalidPartitionExprSnafu { expr: expr_json })?
1382        {
1383            let logical_expr = expr
1384                .try_as_logical_expr()
1385                .context(InvalidPartitionExprSnafu {
1386                    expr: expr_json.clone(),
1387                })?;
1388
1389            combined_exprs.push(logical_expr);
1390            region_partition_expr = Some(expr);
1391        }
1392
1393        let mut time_filters = Vec::with_capacity(combined_exprs.len());
1394        // Columns in the expr.
1395        let mut columns = HashSet::new();
1396        for expr in &combined_exprs {
1397            columns.clear();
1398            let Some(filter) = Self::expr_to_filter(expr, metadata, &mut columns) else {
1399                continue;
1400            };
1401            time_filters.push(filter);
1402        }
1403        let time_filters = if time_filters.is_empty() {
1404            None
1405        } else {
1406            Some(Arc::new(time_filters))
1407        };
1408
1409        let predicate_all = if combined_exprs.is_empty() {
1410            None
1411        } else {
1412            Some(Predicate::new(combined_exprs))
1413        };
1414        let predicate_without_region = if exprs.is_empty() {
1415            None
1416        } else {
1417            Some(Predicate::new(exprs.to_vec()))
1418        };
1419
1420        Ok(Self {
1421            time_filters,
1422            predicate_all,
1423            predicate_without_region,
1424            region_partition_expr,
1425        })
1426    }
1427
1428    /// Returns time filters.
1429    pub(crate) fn time_filters(&self) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
1430        self.time_filters.clone()
1431    }
1432
1433    /// Returns predicate of all exprs (including region partition expr if present).
1434    pub(crate) fn predicate(&self) -> Option<&Predicate> {
1435        self.predicate_all.as_ref()
1436    }
1437
1438    /// Returns predicate that excludes region partition expr.
1439    pub(crate) fn predicate_without_region(&self) -> Option<&Predicate> {
1440        self.predicate_without_region.as_ref()
1441    }
1442
1443    /// Returns the region partition expr from metadata, if any.
1444    pub(crate) fn region_partition_expr(&self) -> Option<&PartitionExpr> {
1445        self.region_partition_expr.as_ref()
1446    }
1447
1448    fn expr_to_filter(
1449        expr: &Expr,
1450        metadata: &RegionMetadata,
1451        columns: &mut HashSet<Column>,
1452    ) -> Option<SimpleFilterEvaluator> {
1453        columns.clear();
1454        // `expr_to_columns` won't return error.
1455        // We still ignore these expressions for safety.
1456        expr_to_columns(expr, columns).ok()?;
1457        if columns.len() > 1 {
1458            // Simple filter doesn't support multiple columns.
1459            return None;
1460        }
1461        let column = columns.iter().next()?;
1462        let column_meta = metadata.column_by_name(&column.name)?;
1463        if column_meta.semantic_type == SemanticType::Timestamp {
1464            SimpleFilterEvaluator::try_new(expr)
1465        } else {
1466            None
1467        }
1468    }
1469}