mito2/read/scan_region.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Scans a region according to the scan request.

use std::collections::HashSet;
use std::fmt;
use std::num::NonZeroU64;
use std::sync::Arc;
use std::time::Instant;

use api::v1::SemanticType;
use common_error::ext::BoxedError;
use common_recordbatch::SendableRecordBatchStream;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::{debug, error, tracing, warn};
use common_time::range::TimestampRange;
use datafusion_common::Column;
use datafusion_expr::Expr;
use datafusion_expr::utils::expr_to_columns;
use futures::StreamExt;
use partition::expr::PartitionExpr;
use smallvec::SmallVec;
use snafu::ResultExt;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::region_engine::{PartitionRange, RegionScannerRef};
use store_api::storage::{RegionId, ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector};
use table::predicate::{Predicate, build_time_range_predicate};
use tokio::sync::{Semaphore, mpsc};
use tokio_stream::wrappers::ReceiverStream;

use crate::access_layer::AccessLayerRef;
use crate::cache::CacheStrategy;
use crate::config::{DEFAULT_MAX_CONCURRENT_SCAN_FILES, DEFAULT_SCAN_CHANNEL_SIZE};
use crate::error::{InvalidPartitionExprSnafu, Result};
#[cfg(feature = "enterprise")]
use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
use crate::memtable::MemtableRange;
use crate::metrics::READ_SST_COUNT;
use crate::read::compat::{self, CompatBatch, FlatCompatBatch, PrimaryKeyCompatBatch};
use crate::read::projection::ProjectionMapper;
use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
use crate::read::seq_scan::SeqScan;
use crate::read::series_scan::SeriesScan;
use crate::read::stream::ScanBatchStream;
use crate::read::unordered_scan::UnorderedScan;
use crate::read::{Batch, BoxedRecordBatchStream, RecordBatch, Source};
use crate::region::options::MergeMode;
use crate::region::version::VersionRef;
use crate::sst::file::FileHandle;
use crate::sst::index::bloom_filter::applier::{
    BloomFilterIndexApplierBuilder, BloomFilterIndexApplierRef,
};
use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
use crate::sst::parquet::reader::ReaderMetrics;

/// Parallel scan channel size for flat format.
const FLAT_SCAN_CHANNEL_SIZE: usize = 2;

/// A scanner scans a region and returns a [SendableRecordBatchStream].
pub(crate) enum Scanner {
    /// Sequential scan.
    Seq(SeqScan),
    /// Unordered scan.
    Unordered(UnorderedScan),
    /// Per-series scan.
    Series(SeriesScan),
}

impl Scanner {
    /// Returns a [SendableRecordBatchStream] to retrieve scan results from all partitions.
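    ///
    /// A consumption sketch, assuming a prepared `scanner` and a `StreamExt`
    /// import in the caller's scope (illustrative, not a compiled doctest):
    ///
    /// ```ignore
    /// let mut stream = scanner.scan().await?;
    /// while let Some(batch) = stream.next().await {
    ///     let batch = batch?;
    ///     // Process the RecordBatch.
    /// }
    /// ```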
    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
    pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream, BoxedError> {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.build_stream(),
            Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
            Scanner::Series(series_scan) => series_scan.build_stream().await,
        }
    }

    /// Creates a stream of [`Batch`]es from this scanner.
    pub(crate) fn scan_batch(&self) -> Result<ScanBatchStream> {
        match self {
            Scanner::Seq(x) => x.scan_all_partitions(),
            Scanner::Unordered(x) => x.scan_all_partitions(),
            Scanner::Series(x) => x.scan_all_partitions(),
        }
    }
}

#[cfg(test)]
impl Scanner {
    /// Returns the number of files to scan.
    pub(crate) fn num_files(&self) -> usize {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.input().num_files(),
            Scanner::Unordered(unordered_scan) => unordered_scan.input().num_files(),
            Scanner::Series(series_scan) => series_scan.input().num_files(),
        }
    }

    /// Returns the number of memtables to scan.
    pub(crate) fn num_memtables(&self) -> usize {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.input().num_memtables(),
            Scanner::Unordered(unordered_scan) => unordered_scan.input().num_memtables(),
            Scanner::Series(series_scan) => series_scan.input().num_memtables(),
        }
    }

    /// Returns the SST file ids to scan.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.input().file_ids(),
            Scanner::Unordered(unordered_scan) => unordered_scan.input().file_ids(),
            Scanner::Series(series_scan) => series_scan.input().file_ids(),
        }
    }

    /// Sets the target partitions for the scanner. It controls the parallelism of the scanner.
    pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
        use store_api::region_engine::{PrepareRequest, RegionScanner};

        let request = PrepareRequest::default().with_target_partitions(target_partitions);
        match self {
            Scanner::Seq(seq_scan) => seq_scan.prepare(request).unwrap(),
            Scanner::Unordered(unordered_scan) => unordered_scan.prepare(request).unwrap(),
            Scanner::Series(series_scan) => series_scan.prepare(request).unwrap(),
        }
    }
}

#[cfg_attr(doc, aquamarine::aquamarine)]
/// Helper to scan a region by [ScanRequest].
///
/// [ScanRegion] collects the SSTs and memtables to scan without actually reading them.
/// It creates a [Scanner] to read these targets in [Scanner::scan()].
///
/// ```mermaid
/// classDiagram
/// class ScanRegion {
///     -VersionRef version
///     -ScanRequest request
///     ~scanner() Scanner
///     ~seq_scan() SeqScan
/// }
/// class Scanner {
///     <<enumeration>>
///     SeqScan
///     UnorderedScan
///     +scan() SendableRecordBatchStream
/// }
/// class SeqScan {
///     -ScanInput input
///     +build() SendableRecordBatchStream
/// }
/// class UnorderedScan {
///     -ScanInput input
///     +build() SendableRecordBatchStream
/// }
/// class ScanInput {
///     -ProjectionMapper mapper
///     -Option~TimeRange~ time_range
///     -Option~Predicate~ predicate
///     -Vec~MemtableRef~ memtables
///     -Vec~FileHandle~ files
/// }
/// class ProjectionMapper {
///     ~output_schema() SchemaRef
///     ~convert(Batch) RecordBatch
/// }
/// ScanRegion -- Scanner
/// ScanRegion o-- ScanRequest
/// Scanner o-- SeqScan
/// Scanner o-- UnorderedScan
/// SeqScan o-- ScanInput
/// UnorderedScan o-- ScanInput
/// Scanner -- SendableRecordBatchStream
/// ScanInput o-- ProjectionMapper
/// SeqScan -- SendableRecordBatchStream
/// UnorderedScan -- SendableRecordBatchStream
/// ```
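///
/// # Example
///
/// A minimal usage sketch, assuming the caller has already prepared the
/// `version`, `access_layer`, `request` and `cache` values (placeholder
/// names, not items defined in this file):
///
/// ```ignore
/// let scan_region = ScanRegion::new(version, access_layer, request, cache)
///     .with_parallel_scan_channel_size(DEFAULT_SCAN_CHANNEL_SIZE)
///     .with_start_time(Instant::now());
/// let scanner = scan_region.scanner().await?;
/// let stream = scanner.scan().await?;
/// ```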
pub(crate) struct ScanRegion {
    /// Version of the region at scan.
    version: VersionRef,
    /// Access layer of the region.
    access_layer: AccessLayerRef,
    /// Scan request.
    request: ScanRequest,
    /// Cache.
    cache_strategy: CacheStrategy,
    /// Capacity of the channel to send data from parallel scan tasks to the main task.
    parallel_scan_channel_size: usize,
    /// Maximum number of SST files to scan concurrently.
    max_concurrent_scan_files: usize,
    /// Whether to ignore inverted index.
    ignore_inverted_index: bool,
    /// Whether to ignore fulltext index.
    ignore_fulltext_index: bool,
    /// Whether to ignore bloom filter.
    ignore_bloom_filter: bool,
    /// Start time of the scan task.
    start_time: Option<Instant>,
    /// Whether to filter out the deleted rows.
    /// Usually true for normal reads, and false when scanning for compaction.
    filter_deleted: bool,
    /// Whether to use flat format.
    flat_format: bool,
    #[cfg(feature = "enterprise")]
    extension_range_provider: Option<BoxedExtensionRangeProvider>,
}

impl ScanRegion {
    /// Creates a [ScanRegion].
    pub(crate) fn new(
        version: VersionRef,
        access_layer: AccessLayerRef,
        request: ScanRequest,
        cache_strategy: CacheStrategy,
    ) -> ScanRegion {
        ScanRegion {
            version,
            access_layer,
            request,
            cache_strategy,
            parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
            ignore_inverted_index: false,
            ignore_fulltext_index: false,
            ignore_bloom_filter: false,
            start_time: None,
            filter_deleted: true,
            flat_format: false,
            #[cfg(feature = "enterprise")]
            extension_range_provider: None,
        }
    }

    /// Sets parallel scan task channel size.
    #[must_use]
    pub(crate) fn with_parallel_scan_channel_size(
        mut self,
        parallel_scan_channel_size: usize,
    ) -> Self {
        self.parallel_scan_channel_size = parallel_scan_channel_size;
        self
    }

    /// Sets the maximum number of SST files to scan concurrently.
    #[must_use]
    pub(crate) fn with_max_concurrent_scan_files(
        mut self,
        max_concurrent_scan_files: usize,
    ) -> Self {
        self.max_concurrent_scan_files = max_concurrent_scan_files;
        self
    }

    /// Sets whether to ignore inverted index.
    #[must_use]
    pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self {
        self.ignore_inverted_index = ignore;
        self
    }

    /// Sets whether to ignore fulltext index.
    #[must_use]
    pub(crate) fn with_ignore_fulltext_index(mut self, ignore: bool) -> Self {
        self.ignore_fulltext_index = ignore;
        self
    }

    /// Sets whether to ignore bloom filter.
    #[must_use]
    pub(crate) fn with_ignore_bloom_filter(mut self, ignore: bool) -> Self {
        self.ignore_bloom_filter = ignore;
        self
    }

    /// Sets the start time of the scan.
    #[must_use]
    pub(crate) fn with_start_time(mut self, now: Instant) -> Self {
        self.start_time = Some(now);
        self
    }

    /// Sets whether to filter out the deleted rows.
    pub(crate) fn set_filter_deleted(&mut self, filter_deleted: bool) {
        self.filter_deleted = filter_deleted;
    }

    /// Sets whether to use flat format.
    #[must_use]
    pub(crate) fn with_flat_format(mut self, flat_format: bool) -> Self {
        self.flat_format = flat_format;
        self
    }

    #[cfg(feature = "enterprise")]
    pub(crate) fn set_extension_range_provider(
        &mut self,
        extension_range_provider: BoxedExtensionRangeProvider,
    ) {
        self.extension_range_provider = Some(extension_range_provider);
    }

    /// Returns a [Scanner] to scan the region.
    pub(crate) async fn scanner(self) -> Result<Scanner> {
        if self.use_series_scan() {
            self.series_scan().await.map(Scanner::Series)
        } else if self.use_unordered_scan() {
            // If the table is append-only and there is no series row selector, we use
            // an unordered scan for queries. Compaction still uses a seq scan.
            self.unordered_scan().await.map(Scanner::Unordered)
        } else {
            self.seq_scan().await.map(Scanner::Seq)
        }
    }

    /// Returns a [RegionScanner] to scan the region.
    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
    pub(crate) async fn region_scanner(self) -> Result<RegionScannerRef> {
        if self.use_series_scan() {
            self.series_scan()
                .await
                .map(|scanner| Box::new(scanner) as _)
        } else if self.use_unordered_scan() {
            self.unordered_scan()
                .await
                .map(|scanner| Box::new(scanner) as _)
        } else {
            self.seq_scan().await.map(|scanner| Box::new(scanner) as _)
        }
    }

    /// Scans sequentially.
    pub(crate) async fn seq_scan(self) -> Result<SeqScan> {
        let input = self.scan_input().await?.with_compaction(false);
        Ok(SeqScan::new(input))
    }

    /// Scans without ordering guarantees.
    pub(crate) async fn unordered_scan(self) -> Result<UnorderedScan> {
        let input = self.scan_input().await?;
        Ok(UnorderedScan::new(input))
    }

    /// Scans by series.
    pub(crate) async fn series_scan(self) -> Result<SeriesScan> {
        let input = self.scan_input().await?;
        Ok(SeriesScan::new(input))
    }

    /// Returns true if the region can use unordered scan for the current request.
    fn use_unordered_scan(&self) -> bool {
        // We use unordered scan when:
        // 1. The region is in append mode.
        // 2. There is no series row selector.
        // 3. The required distribution is None or TimeSeriesDistribution::TimeWindowed.
        //
        // We still use seq scan in compaction.
        self.version.options.append_mode
            && self.request.series_row_selector.is_none()
            && (self.request.distribution.is_none()
                || self.request.distribution == Some(TimeSeriesDistribution::TimeWindowed))
    }

    /// Returns true if the region can use series scan for the current request.
    fn use_series_scan(&self) -> bool {
        self.request.distribution == Some(TimeSeriesDistribution::PerSeries)
    }

    /// Creates a scan input.
    async fn scan_input(mut self) -> Result<ScanInput> {
        let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new);
        let time_range = self.build_time_range_predicate();
        let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;

        // The mapper always computes projected column ids as the schema of SSTs may change.
        let mapper = match &self.request.projection {
            Some(p) => {
                ProjectionMapper::new(&self.version.metadata, p.iter().copied(), self.flat_format)?
            }
            None => ProjectionMapper::all(&self.version.metadata, self.flat_format)?,
        };

        let ssts = &self.version.ssts;
        let mut files = Vec::new();
        for level in ssts.levels() {
            for file in level.files.values() {
                let exceed_min_sequence = match (sst_min_sequence, file.meta_ref().sequence) {
                    (Some(min_sequence), Some(file_sequence)) => file_sequence > min_sequence,
                    // If the file's sequence is None (actually zero), the file was likely
                    // generated and added to the region "directly". In this case, its data
                    // should be considered as fresh as the memtable, so we treat its sequence
                    // as greater than the min_sequence, whatever the value of min_sequence
                    // is. Hence the "true" in this arm.
                    (Some(_), None) => true,
                    (None, _) => true,
                };

                // Keeps SST files in the time range.
                if exceed_min_sequence && file_in_range(file, &time_range) {
                    files.push(file.clone());
                }
                // We don't prune files by the requested sequence here: a sequence that new
                // is unlikely to appear in an SST file unless the timing is exceptional,
                // and out-of-range batches are filtered out by the tree reader anyway.
            }
        }

        let memtables = self.version.memtables.list_memtables();
        // Skip empty memtables and memtables out of the time range.
        let mut mem_range_builders = Vec::new();

        for m in memtables {
            // Check whether the memtable is empty by reading its stats.
            let Some((start, end)) = m.stats().time_range() else {
                continue;
            };
            // The time range of the memtable is inclusive.
            let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
            if !memtable_range.intersects(&time_range) {
                continue;
            }
            let ranges_in_memtable = m.ranges(
                Some(mapper.column_ids()),
                predicate.clone(),
                self.request.sequence,
                false,
            )?;
            mem_range_builders.extend(ranges_in_memtable.ranges.into_values().map(|v| {
                // TODO: we should add stats to MemtableRange.
                let mut stats = ranges_in_memtable.stats.clone();
                stats.num_ranges = 1;
                stats.num_rows = v.num_rows();
                MemRangeBuilder::new(v, stats)
            }));
        }

        let region_id = self.region_id();
        debug!(
            "Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}",
            region_id,
            self.request,
            time_range,
            mem_range_builders.len(),
            files.len(),
            self.version.options.append_mode,
        );

        // Remove field filters for LastNonNull mode after logging the request.
        self.maybe_remove_field_filters();

        let inverted_index_applier = self.build_inverted_index_applier();
        let bloom_filter_applier = self.build_bloom_filter_applier();
        let fulltext_index_applier = self.build_fulltext_index_applier();
        // Rebuild the predicate group since `maybe_remove_field_filters()` may have
        // changed the request filters.
        let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;

        if self.flat_format {
            // The batch is already large enough so we use a small channel size here.
            self.parallel_scan_channel_size = FLAT_SCAN_CHANNEL_SIZE;
        }

        let input = ScanInput::new(self.access_layer, mapper)
            .with_time_range(Some(time_range))
            .with_predicate(predicate)
            .with_memtables(mem_range_builders)
            .with_files(files)
            .with_cache(self.cache_strategy)
            .with_inverted_index_applier(inverted_index_applier)
            .with_bloom_filter_index_applier(bloom_filter_applier)
            .with_fulltext_index_applier(fulltext_index_applier)
            .with_parallel_scan_channel_size(self.parallel_scan_channel_size)
            .with_max_concurrent_scan_files(self.max_concurrent_scan_files)
            .with_start_time(self.start_time)
            .with_append_mode(self.version.options.append_mode)
            .with_filter_deleted(self.filter_deleted)
            .with_merge_mode(self.version.options.merge_mode())
            .with_series_row_selector(self.request.series_row_selector)
            .with_distribution(self.request.distribution)
            .with_flat_format(self.flat_format);

        #[cfg(feature = "enterprise")]
        let input = if let Some(provider) = self.extension_range_provider {
            let ranges = provider
                .find_extension_ranges(self.version.flushed_sequence, time_range, &self.request)
                .await?;
            debug!("Found extension ranges: {ranges:?}");
            input.with_extension_ranges(ranges)
        } else {
            input
        };
        Ok(input)
    }

    fn region_id(&self) -> RegionId {
        self.version.metadata.region_id
    }

    /// Builds the time range predicate from filters.
    fn build_time_range_predicate(&self) -> TimestampRange {
        let time_index = self.version.metadata.time_index_column();
        let unit = time_index
            .column_schema
            .data_type
            .as_timestamp()
            .expect("Time index must have timestamp-compatible type")
            .unit();
        build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters)
    }

    /// Removes field filters if the merge mode is [MergeMode::LastNonNull].
    ///
    /// In this mode a null field may be filled by an older row from another source,
    /// so evaluating a field filter before merging could drop rows incorrectly.
    fn maybe_remove_field_filters(&mut self) {
        if self.version.options.merge_mode() != MergeMode::LastNonNull {
            return;
        }

        // TODO(yingwen): We can ignore field filters only when there are multiple sources in the same time window.
        let field_columns = self
            .version
            .metadata
            .field_columns()
            .map(|col| &col.column_schema.name)
            .collect::<HashSet<_>>();
        // Columns in the expr.
        let mut columns = HashSet::new();

        self.request.filters.retain(|expr| {
            columns.clear();
            // `expr_to_columns` won't return an error, but we drop the expr if it does.
            if expr_to_columns(expr, &mut columns).is_err() {
                return false;
            }
            for column in &columns {
                if field_columns.contains(&column.name) {
                    // This expr uses a field column.
                    return false;
                }
            }
            true
        });
    }

    /// Uses the latest schema to build the inverted index applier.
    fn build_inverted_index_applier(&self) -> Option<InvertedIndexApplierRef> {
        if self.ignore_inverted_index {
            return None;
        }

        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
        let inverted_index_cache = self.cache_strategy.inverted_index_cache().cloned();

        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();

        InvertedIndexApplierBuilder::new(
            self.access_layer.table_dir().to_string(),
            self.access_layer.path_type(),
            self.access_layer.object_store().clone(),
            self.version.metadata.as_ref(),
            self.version.metadata.inverted_indexed_column_ids(
                self.version
                    .options
                    .index_options
                    .inverted_index
                    .ignore_column_ids
                    .iter(),
            ),
            self.access_layer.puffin_manager_factory().clone(),
        )
        .with_file_cache(file_cache)
        .with_inverted_index_cache(inverted_index_cache)
        .with_puffin_metadata_cache(puffin_metadata_cache)
        .build(&self.request.filters)
        .inspect_err(|err| warn!(err; "Failed to build inverted index applier"))
        .ok()
        .flatten()
        .map(Arc::new)
    }

    /// Uses the latest schema to build the bloom filter index applier.
    fn build_bloom_filter_applier(&self) -> Option<BloomFilterIndexApplierRef> {
        if self.ignore_bloom_filter {
            return None;
        }

        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();

        BloomFilterIndexApplierBuilder::new(
            self.access_layer.table_dir().to_string(),
            self.access_layer.path_type(),
            self.access_layer.object_store().clone(),
            self.version.metadata.as_ref(),
            self.access_layer.puffin_manager_factory().clone(),
        )
        .with_file_cache(file_cache)
        .with_bloom_filter_index_cache(bloom_filter_index_cache)
        .with_puffin_metadata_cache(puffin_metadata_cache)
        .build(&self.request.filters)
        .inspect_err(|err| warn!(err; "Failed to build bloom filter index applier"))
        .ok()
        .flatten()
        .map(Arc::new)
    }

    /// Uses the latest schema to build the fulltext index applier.
    fn build_fulltext_index_applier(&self) -> Option<FulltextIndexApplierRef> {
        if self.ignore_fulltext_index {
            return None;
        }

        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
        FulltextIndexApplierBuilder::new(
            self.access_layer.table_dir().to_string(),
            self.access_layer.path_type(),
            self.access_layer.object_store().clone(),
            self.access_layer.puffin_manager_factory().clone(),
            self.version.metadata.as_ref(),
        )
        .with_file_cache(file_cache)
        .with_puffin_metadata_cache(puffin_metadata_cache)
        .with_bloom_filter_cache(bloom_filter_index_cache)
        .build(&self.request.filters)
        .inspect_err(|err| warn!(err; "Failed to build fulltext index applier"))
        .ok()
        .flatten()
        .map(Arc::new)
    }
}

/// Returns true if the time range of an SST `file` matches the `predicate`.
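///
/// A sketch of the inclusive-intersection semantics (`ts` stands in for a
/// hypothetical helper that builds a `Timestamp` value):
///
/// ```ignore
/// // A file covering [10, 20] matches a predicate of [15, 30].
/// let file_range = TimestampRange::new_inclusive(Some(ts(10)), Some(ts(20)));
/// let predicate = TimestampRange::new_inclusive(Some(ts(15)), Some(ts(30)));
/// assert!(file_range.intersects(&predicate));
/// ```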
fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
    if predicate == &TimestampRange::min_to_max() {
        return true;
    }
    // The end timestamp of an SST is inclusive.
    let (start, end) = file.time_range();
    let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end));
    file_ts_range.intersects(predicate)
}

/// Common input for different scanners.
pub struct ScanInput {
    /// Region SST access layer.
    access_layer: AccessLayerRef,
    /// Maps projected Batches to RecordBatches.
    pub(crate) mapper: Arc<ProjectionMapper>,
    /// Time range filter for time index.
    time_range: Option<TimestampRange>,
    /// Predicate to push down.
    pub(crate) predicate: PredicateGroup,
    /// Region partition expr applied at read time.
    region_partition_expr: Option<PartitionExpr>,
    /// Memtable range builders for memtables in the time range.
    pub(crate) memtables: Vec<MemRangeBuilder>,
    /// Handles to SST files to scan.
    pub(crate) files: Vec<FileHandle>,
    /// Cache.
    pub(crate) cache_strategy: CacheStrategy,
    /// Ignores file not found error.
    ignore_file_not_found: bool,
    /// Capacity of the channel to send data from parallel scan tasks to the main task.
    pub(crate) parallel_scan_channel_size: usize,
    /// Maximum number of SST files to scan concurrently.
    pub(crate) max_concurrent_scan_files: usize,
    /// Index appliers.
    inverted_index_applier: Option<InvertedIndexApplierRef>,
    bloom_filter_index_applier: Option<BloomFilterIndexApplierRef>,
    fulltext_index_applier: Option<FulltextIndexApplierRef>,
    /// Start time of the query.
    pub(crate) query_start: Option<Instant>,
    /// The region is using append mode.
    pub(crate) append_mode: bool,
    /// Whether to remove deletion markers.
    pub(crate) filter_deleted: bool,
    /// Mode to merge duplicate rows.
    pub(crate) merge_mode: MergeMode,
    /// Hint to select rows from time series.
    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
    /// Hint for the required distribution of the scanner.
    pub(crate) distribution: Option<TimeSeriesDistribution>,
    /// Whether to use flat format.
    pub(crate) flat_format: bool,
    /// Whether this scan is for compaction.
    pub(crate) compaction: bool,
    #[cfg(feature = "enterprise")]
    extension_ranges: Vec<BoxedExtensionRange>,
}

impl ScanInput {
    /// Creates a new [ScanInput].
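    ///
    /// Construction typically chains the builder-style setters below; a sketch
    /// with placeholder values:
    ///
    /// ```ignore
    /// let input = ScanInput::new(access_layer, mapper)
    ///     .with_time_range(Some(time_range))
    ///     .with_files(files)
    ///     .with_cache(cache_strategy);
    /// ```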
    #[must_use]
    pub(crate) fn new(access_layer: AccessLayerRef, mapper: ProjectionMapper) -> ScanInput {
        ScanInput {
            access_layer,
            mapper: Arc::new(mapper),
            time_range: None,
            predicate: PredicateGroup::default(),
            region_partition_expr: None,
            memtables: Vec::new(),
            files: Vec::new(),
            cache_strategy: CacheStrategy::Disabled,
            ignore_file_not_found: false,
            parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
            inverted_index_applier: None,
            bloom_filter_index_applier: None,
            fulltext_index_applier: None,
            query_start: None,
            append_mode: false,
            filter_deleted: true,
            merge_mode: MergeMode::default(),
            series_row_selector: None,
            distribution: None,
            flat_format: false,
            compaction: false,
            #[cfg(feature = "enterprise")]
            extension_ranges: Vec::new(),
        }
    }

    /// Sets time range filter for time index.
    #[must_use]
    pub(crate) fn with_time_range(mut self, time_range: Option<TimestampRange>) -> Self {
        self.time_range = time_range;
        self
    }

    /// Sets predicate to push down.
    #[must_use]
    pub(crate) fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
        self.region_partition_expr = predicate.region_partition_expr().cloned();
        self.predicate = predicate;
        self
    }

    /// Sets memtable range builders.
    #[must_use]
    pub(crate) fn with_memtables(mut self, memtables: Vec<MemRangeBuilder>) -> Self {
        self.memtables = memtables;
        self
    }

    /// Sets files to read.
    #[must_use]
    pub(crate) fn with_files(mut self, files: Vec<FileHandle>) -> Self {
        self.files = files;
        self
    }

    /// Sets cache for this query.
    #[must_use]
    pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self {
        self.cache_strategy = cache;
        self
    }

    /// Ignores file not found error.
    #[must_use]
    pub(crate) fn with_ignore_file_not_found(mut self, ignore: bool) -> Self {
        self.ignore_file_not_found = ignore;
        self
    }

    /// Sets scan task channel size.
    #[must_use]
    pub(crate) fn with_parallel_scan_channel_size(
        mut self,
        parallel_scan_channel_size: usize,
    ) -> Self {
        self.parallel_scan_channel_size = parallel_scan_channel_size;
        self
    }

    /// Sets the maximum number of SST files to scan concurrently.
    #[must_use]
    pub(crate) fn with_max_concurrent_scan_files(
        mut self,
        max_concurrent_scan_files: usize,
    ) -> Self {
        self.max_concurrent_scan_files = max_concurrent_scan_files;
        self
    }

    /// Sets inverted index applier.
    #[must_use]
    pub(crate) fn with_inverted_index_applier(
        mut self,
        applier: Option<InvertedIndexApplierRef>,
    ) -> Self {
        self.inverted_index_applier = applier;
        self
    }

    /// Sets bloom filter applier.
    #[must_use]
    pub(crate) fn with_bloom_filter_index_applier(
        mut self,
        applier: Option<BloomFilterIndexApplierRef>,
    ) -> Self {
        self.bloom_filter_index_applier = applier;
        self
    }

    /// Sets fulltext index applier.
    #[must_use]
    pub(crate) fn with_fulltext_index_applier(
        mut self,
        applier: Option<FulltextIndexApplierRef>,
    ) -> Self {
        self.fulltext_index_applier = applier;
        self
    }

    /// Sets start time of the query.
    #[must_use]
    pub(crate) fn with_start_time(mut self, now: Option<Instant>) -> Self {
        self.query_start = now;
        self
    }

    /// Sets whether the region is in append mode.
    #[must_use]
    pub(crate) fn with_append_mode(mut self, is_append_mode: bool) -> Self {
        self.append_mode = is_append_mode;
        self
    }

    /// Sets whether to remove deletion markers during scan.
    #[must_use]
    pub(crate) fn with_filter_deleted(mut self, filter_deleted: bool) -> Self {
        self.filter_deleted = filter_deleted;
        self
    }

    /// Sets the merge mode.
    #[must_use]
    pub(crate) fn with_merge_mode(mut self, merge_mode: MergeMode) -> Self {
        self.merge_mode = merge_mode;
        self
    }

    /// Sets the distribution hint.
    #[must_use]
    pub(crate) fn with_distribution(
        mut self,
        distribution: Option<TimeSeriesDistribution>,
    ) -> Self {
        self.distribution = distribution;
        self
    }

    /// Sets the time series row selector.
    #[must_use]
    pub(crate) fn with_series_row_selector(
        mut self,
        series_row_selector: Option<TimeSeriesRowSelector>,
    ) -> Self {
        self.series_row_selector = series_row_selector;
        self
    }

    /// Sets whether to use flat format.
    #[must_use]
    pub(crate) fn with_flat_format(mut self, flat_format: bool) -> Self {
        self.flat_format = flat_format;
        self
    }

    /// Sets whether this scan is for compaction.
    #[must_use]
    pub(crate) fn with_compaction(mut self, compaction: bool) -> Self {
        self.compaction = compaction;
        self
    }

    /// Scans sources in parallel.
    ///
    /// # Panics
    ///
    /// Panics if the input doesn't allow parallel scan.
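    ///
    /// A wiring sketch (the permit count is a placeholder chosen by the caller):
    ///
    /// ```ignore
    /// let semaphore = Arc::new(Semaphore::new(parallelism));
    /// let sources = input.create_parallel_sources(sources, semaphore)?;
    /// ```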
    pub(crate) fn create_parallel_sources(
        &self,
        sources: Vec<Source>,
        semaphore: Arc<Semaphore>,
    ) -> Result<Vec<Source>> {
        if sources.len() <= 1 {
            return Ok(sources);
        }

        // Spawn a task for each source.
        let sources = sources
            .into_iter()
            .map(|source| {
                let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
                self.spawn_scan_task(source, semaphore.clone(), sender);
                let stream = Box::pin(ReceiverStream::new(receiver));
                Source::Stream(stream)
            })
            .collect();
        Ok(sources)
    }

    /// Builds memtable ranges to scan by `index`.
    pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> {
        let memtable = &self.memtables[index.index];
        let mut ranges = SmallVec::new();
        memtable.build_ranges(index.row_group_index, &mut ranges);
        ranges
    }

    /// Returns the predicate to push down for the `file`, excluding the region
    /// partition expr when the file already carries the same expr.
    fn predicate_for_file(&self, file: &FileHandle) -> Option<Predicate> {
        if self.should_skip_region_partition(file) {
            self.predicate.predicate_without_region().cloned()
        } else {
            self.predicate.predicate().cloned()
        }
    }

    /// Returns true if the file's partition expr matches the region's, so the
    /// region partition expr doesn't need to be evaluated for this file.
    fn should_skip_region_partition(&self, file: &FileHandle) -> bool {
        match (
            self.region_partition_expr.as_ref(),
            file.meta_ref().partition_expr.as_ref(),
        ) {
            (Some(region_expr), Some(file_expr)) => region_expr == file_expr,
            _ => false,
        }
    }

    /// Prunes a file to scan and returns the builder to build readers.
    pub async fn prune_file(
        &self,
        file: &FileHandle,
        reader_metrics: &mut ReaderMetrics,
    ) -> Result<FileRangeBuilder> {
        let predicate = self.predicate_for_file(file);
        let res = self
            .access_layer
            .read_sst(file.clone())
            .predicate(predicate)
            .projection(Some(self.mapper.column_ids().to_vec()))
            .cache(self.cache_strategy.clone())
            .inverted_index_applier(self.inverted_index_applier.clone())
            .bloom_filter_index_applier(self.bloom_filter_index_applier.clone())
            .fulltext_index_applier(self.fulltext_index_applier.clone())
            .expected_metadata(Some(self.mapper.metadata().clone()))
            .flat_format(self.flat_format)
            .compaction(self.compaction)
            .build_reader_input(reader_metrics)
            .await;
        let (mut file_range_ctx, selection) = match res {
            Ok(x) => x,
            Err(e) => {
                if e.is_object_not_found() && self.ignore_file_not_found {
                    error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
                    return Ok(FileRangeBuilder::default());
                } else {
                    return Err(e);
                }
            }
        };

        let need_compat = !compat::has_same_columns_and_pk_encoding(
            self.mapper.metadata(),
            file_range_ctx.read_format().metadata(),
        );
        if need_compat {
            // They have different schemas. We need to adapt the batch first so the
            // mapper can convert it.
            let compat = if let Some(flat_format) = file_range_ctx.read_format().as_flat() {
                let mapper = self.mapper.as_flat().unwrap();
                FlatCompatBatch::try_new(
                    mapper,
                    flat_format.metadata(),
                    flat_format.format_projection(),
                    self.compaction,
                )?
                .map(CompatBatch::Flat)
            } else {
                let compat_batch = PrimaryKeyCompatBatch::new(
                    &self.mapper,
                    file_range_ctx.read_format().metadata().clone(),
                )?;
                Some(CompatBatch::PrimaryKey(compat_batch))
            };
            file_range_ctx.set_compat_batch(compat);
        }
        Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), selection))
    }

    /// Scans the input source in another task and sends batches to the sender.
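    ///
    /// A wiring sketch mirroring [Self::create_parallel_sources] (the channel
    /// size is a placeholder):
    ///
    /// ```ignore
    /// let (sender, receiver) = mpsc::channel(channel_size);
    /// input.spawn_scan_task(source, semaphore.clone(), sender);
    /// let stream = Source::Stream(Box::pin(ReceiverStream::new(receiver)));
    /// ```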
    pub(crate) fn spawn_scan_task(
        &self,
        mut input: Source,
        semaphore: Arc<Semaphore>,
        sender: mpsc::Sender<Result<Batch>>,
    ) {
        common_runtime::spawn_global(async move {
            loop {
                // We release the permit before sending the result to avoid the task waiting on
                // the channel with the permit held.
                let maybe_batch = {
                    // Safety: We never close the semaphore.
                    let _permit = semaphore.acquire().await.unwrap();
                    input.next_batch().await
                };
                match maybe_batch {
                    Ok(Some(batch)) => {
                        let _ = sender.send(Ok(batch)).await;
                    }
                    Ok(None) => break,
                    Err(e) => {
                        let _ = sender.send(Err(e)).await;
                        break;
                    }
                }
            }
        });
    }

    /// Scans flat sources (RecordBatch streams) in parallel.
    ///
    /// # Panics
    ///
    /// Panics if the input doesn't allow parallel scan.
    pub(crate) fn create_parallel_flat_sources(
        &self,
        sources: Vec<BoxedRecordBatchStream>,
        semaphore: Arc<Semaphore>,
    ) -> Result<Vec<BoxedRecordBatchStream>> {
        if sources.len() <= 1 {
            return Ok(sources);
        }

        // Spawn a task for each source.
        let sources = sources
            .into_iter()
            .map(|source| {
                let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
                self.spawn_flat_scan_task(source, semaphore.clone(), sender);
                // A single `Box::pin` turns the receiver into a boxed stream.
                Box::pin(ReceiverStream::new(receiver)) as _
            })
            .collect();
        Ok(sources)
    }

    /// Spawns a task to scan a flat source (RecordBatch stream) asynchronously.
    pub(crate) fn spawn_flat_scan_task(
        &self,
        mut input: BoxedRecordBatchStream,
        semaphore: Arc<Semaphore>,
        sender: mpsc::Sender<Result<RecordBatch>>,
    ) {
        common_runtime::spawn_global(async move {
            loop {
                // We release the permit before sending the result to avoid the task waiting on
                // the channel with the permit held.
                let maybe_batch = {
                    // Safety: We never close the semaphore.
                    let _permit = semaphore.acquire().await.unwrap();
                    input.next().await
                };
                match maybe_batch {
                    Some(Ok(batch)) => {
                        let _ = sender.send(Ok(batch)).await;
                    }
                    Some(Err(e)) => {
                        let _ = sender.send(Err(e)).await;
                        break;
                    }
                    None => break,
                }
            }
        });
    }

    /// Returns the total number of rows to scan across files and memtables.
    pub(crate) fn total_rows(&self) -> usize {
        let rows_in_files: usize = self.files.iter().map(|f| f.num_rows()).sum();
        let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();

        let rows = rows_in_files + rows_in_memtables;
        #[cfg(feature = "enterprise")]
        let rows = rows
            + self
                .extension_ranges
                .iter()
                .map(|x| x.num_rows())
                .sum::<u64>() as usize;
        rows
    }

    /// Returns the table predicate of all exprs.
    pub(crate) fn predicate(&self) -> Option<&Predicate> {
        self.predicate.predicate()
    }

    /// Returns the number of memtables to scan.
    pub(crate) fn num_memtables(&self) -> usize {
        self.memtables.len()
    }

    /// Returns the number of SST files to scan.
    pub(crate) fn num_files(&self) -> usize {
        self.files.len()
    }

    /// Returns the region metadata used by the projection mapper.
    pub fn region_metadata(&self) -> &RegionMetadataRef {
        self.mapper.metadata()
    }
}

#[cfg(feature = "enterprise")]
impl ScanInput {
    /// Sets the extension ranges to scan.
    #[must_use]
    pub(crate) fn with_extension_ranges(self, extension_ranges: Vec<BoxedExtensionRange>) -> Self {
        Self {
            extension_ranges,
            ..self
        }
    }

    /// Returns all extension ranges.
    pub(crate) fn extension_ranges(&self) -> &[BoxedExtensionRange] {
        &self.extension_ranges
    }

    /// Gets a boxed [ExtensionRange] by its index among all ranges.
    ///
    /// The global range index layout is `[memtables..][files..][extension ranges..]`.
    pub(crate) fn extension_range(&self, i: usize) -> &BoxedExtensionRange {
        &self.extension_ranges[i - self.num_memtables() - self.num_files()]
    }
}

#[cfg(test)]
impl ScanInput {
    /// Returns SST file ids to scan.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        self.files.iter().map(|file| file.file_id()).collect()
    }
}

/// Context shared by different streams from a scanner.
/// It contains the input and ranges to scan.
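///
/// A construction sketch (`input` is a prepared [ScanInput]):
///
/// ```ignore
/// let ctx = StreamContext::seq_scan_ctx(input);
/// let partition_ranges = ctx.partition_ranges();
/// ```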
pub struct StreamContext {
    /// Input memtables and files.
    pub input: ScanInput,
    /// Metadata for partition ranges.
    pub(crate) ranges: Vec<RangeMeta>,

    // Metrics:
    /// The start time of the query.
    pub(crate) query_start: Instant,
}

impl StreamContext {
    /// Creates a new [StreamContext] for [SeqScan].
    pub(crate) fn seq_scan_ctx(input: ScanInput) -> Self {
        let query_start = input.query_start.unwrap_or_else(Instant::now);
        let ranges = RangeMeta::seq_scan_ranges(&input);
        READ_SST_COUNT.observe(input.num_files() as f64);

        Self {
            input,
            ranges,
            query_start,
        }
    }

    /// Creates a new [StreamContext] for [UnorderedScan].
    pub(crate) fn unordered_scan_ctx(input: ScanInput) -> Self {
        let query_start = input.query_start.unwrap_or_else(Instant::now);
        let ranges = RangeMeta::unordered_scan_ranges(&input);
        READ_SST_COUNT.observe(input.num_files() as f64);

        Self {
            input,
            ranges,
            query_start,
        }
    }

    /// Returns true if the index refers to a memtable.
    pub(crate) fn is_mem_range_index(&self, index: RowGroupIndex) -> bool {
        self.input.num_memtables() > index.index
    }

    /// Returns true if the index refers to a file range.
    pub(crate) fn is_file_range_index(&self, index: RowGroupIndex) -> bool {
        !self.is_mem_range_index(index)
            && index.index < self.input.num_files() + self.input.num_memtables()
    }

    /// Retrieves the partition ranges.
    pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
        self.ranges
            .iter()
            .enumerate()
            .map(|(idx, range_meta)| range_meta.new_partition_range(idx))
            .collect()
    }

    /// Formats the context for explain.
    pub(crate) fn format_for_explain(&self, verbose: bool, f: &mut fmt::Formatter) -> fmt::Result {
        let (mut num_mem_ranges, mut num_file_ranges, mut num_other_ranges) = (0, 0, 0);
        for range_meta in &self.ranges {
            for idx in &range_meta.row_group_indices {
                if self.is_mem_range_index(*idx) {
                    num_mem_ranges += 1;
                } else if self.is_file_range_index(*idx) {
                    num_file_ranges += 1;
                } else {
                    num_other_ranges += 1;
                }
            }
        }
        if verbose {
            write!(f, "{{")?;
        }
        write!(
            f,
            r#""partition_count":{{"count":{}, "mem_ranges":{}, "files":{}, "file_ranges":{}"#,
            self.ranges.len(),
            num_mem_ranges,
            self.input.num_files(),
            num_file_ranges,
        )?;
        if num_other_ranges > 0 {
            write!(f, r#", "other_ranges":{}"#, num_other_ranges)?;
        }
        write!(f, "}}")?;

        if let Some(selector) = &self.input.series_row_selector {
            write!(f, ", \"selector\":\"{}\"", selector)?;
        }
        if let Some(distribution) = &self.input.distribution {
            write!(f, ", \"distribution\":\"{}\"", distribution)?;
        }

        if verbose {
            self.format_verbose_content(f)?;
        }

        Ok(())
    }

    fn format_verbose_content(&self, f: &mut fmt::Formatter) -> fmt::Result {
        struct FileWrapper<'a> {
            file: &'a FileHandle,
        }

        impl fmt::Debug for FileWrapper<'_> {
            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                let (start, end) = self.file.time_range();
                write!(
                    f,
                    r#"{{"file_id":"{}","time_range_start":"{}::{}","time_range_end":"{}::{}","rows":{},"size":{},"index_size":{}}}"#,
                    self.file.file_id(),
                    start.value(),
                    start.unit(),
                    end.value(),
                    end.unit(),
                    self.file.num_rows(),
                    self.file.size(),
                    self.file.index_size()
                )
            }
        }

        struct InputWrapper<'a> {
            input: &'a ScanInput,
        }

        #[cfg(feature = "enterprise")]
        impl InputWrapper<'_> {
            fn format_extension_ranges(&self, f: &mut fmt::Formatter) -> fmt::Result {
                if self.input.extension_ranges.is_empty() {
                    return Ok(());
                }

                let mut delimiter = "";
                write!(f, ", extension_ranges: [")?;
                for range in self.input.extension_ranges() {
                    write!(f, "{}{:?}", delimiter, range)?;
                    delimiter = ", ";
                }
                write!(f, "]")?;
                Ok(())
            }
        }

        impl fmt::Debug for InputWrapper<'_> {
            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                let output_schema = self.input.mapper.output_schema();
                if !output_schema.is_empty() {
                    let names: Vec<_> = output_schema
                        .column_schemas()
                        .iter()
                        .map(|col| &col.name)
                        .collect();
                    write!(f, ", \"projection\": {:?}", names)?;
                }
                if let Some(predicate) = &self.input.predicate.predicate()
                    && !predicate.exprs().is_empty()
                {
                    let exprs: Vec<_> = predicate.exprs().iter().map(|e| e.to_string()).collect();
                    write!(f, ", \"filters\": {:?}", exprs)?;
                }
                if !self.input.files.is_empty() {
                    write!(f, ", \"files\": ")?;
                    f.debug_list()
                        .entries(self.input.files.iter().map(|file| FileWrapper { file }))
                        .finish()?;
                }

                #[cfg(feature = "enterprise")]
                self.format_extension_ranges(f)?;

                Ok(())
            }
        }

        write!(f, "{:?}", InputWrapper { input: &self.input })
    }
}

/// Predicates to evaluate.
/// Its time filters only keep exprs that [SimpleFilterEvaluator] supports.
#[derive(Clone, Default)]
pub struct PredicateGroup {
    time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
    /// Predicate that includes request filters and the region partition expr (if any).
    predicate_all: Option<Predicate>,
    /// Predicate that only includes request filters.
    predicate_without_region: Option<Predicate>,
    /// Region partition expression restored from metadata.
    region_partition_expr: Option<PartitionExpr>,
}

impl PredicateGroup {
    /// Creates a new `PredicateGroup` from exprs according to the metadata.
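    ///
    /// A construction sketch, assuming `metadata` and the filter `exprs` come
    /// from the caller:
    ///
    /// ```ignore
    /// let group = PredicateGroup::new(&version.metadata, &request.filters)?;
    /// // `predicate()` holds the request filters plus the region partition
    /// // expr (if any); `predicate_without_region()` excludes the latter.
    /// let all = group.predicate();
    /// let without_region = group.predicate_without_region();
    /// ```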
    pub fn new(metadata: &RegionMetadata, exprs: &[Expr]) -> Result<Self> {
        let mut combined_exprs = exprs.to_vec();
        let mut region_partition_expr = None;

        if let Some(expr_json) = metadata.partition_expr.as_ref()
            && !expr_json.is_empty()
            && let Some(expr) = PartitionExpr::from_json_str(expr_json)
                .context(InvalidPartitionExprSnafu { expr: expr_json })?
        {
            let logical_expr = expr
                .try_as_logical_expr()
                .context(InvalidPartitionExprSnafu {
                    expr: expr_json.clone(),
                })?;

            combined_exprs.push(logical_expr);
            region_partition_expr = Some(expr);
        }

        let mut time_filters = Vec::with_capacity(combined_exprs.len());
        // Columns in the expr.
        let mut columns = HashSet::new();
        for expr in &combined_exprs {
            columns.clear();
            let Some(filter) = Self::expr_to_filter(expr, metadata, &mut columns) else {
                continue;
            };
            time_filters.push(filter);
        }
        let time_filters = if time_filters.is_empty() {
            None
        } else {
            Some(Arc::new(time_filters))
        };

        let predicate_all = if combined_exprs.is_empty() {
            None
        } else {
            Some(Predicate::new(combined_exprs))
        };
        let predicate_without_region = if exprs.is_empty() {
            None
        } else {
            Some(Predicate::new(exprs.to_vec()))
        };

        Ok(Self {
            time_filters,
            predicate_all,
            predicate_without_region,
            region_partition_expr,
        })
    }

    /// Returns time filters.
    pub(crate) fn time_filters(&self) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
        self.time_filters.clone()
    }

    /// Returns the predicate of all exprs (including the region partition expr if present).
    pub(crate) fn predicate(&self) -> Option<&Predicate> {
        self.predicate_all.as_ref()
    }

    /// Returns the predicate that excludes the region partition expr.
    pub(crate) fn predicate_without_region(&self) -> Option<&Predicate> {
        self.predicate_without_region.as_ref()
    }

    /// Returns the region partition expr from metadata, if any.
    pub(crate) fn region_partition_expr(&self) -> Option<&PartitionExpr> {
        self.region_partition_expr.as_ref()
    }

    fn expr_to_filter(
        expr: &Expr,
        metadata: &RegionMetadata,
        columns: &mut HashSet<Column>,
    ) -> Option<SimpleFilterEvaluator> {
        columns.clear();
        // `expr_to_columns` won't return an error, but we still ignore failing
        // expressions for safety.
        expr_to_columns(expr, columns).ok()?;
        if columns.len() > 1 {
            // Simple filter doesn't support multiple columns.
            return None;
        }
        let column = columns.iter().next()?;
        let column_meta = metadata.column_by_name(&column.name)?;
        if column_meta.semantic_type == SemanticType::Timestamp {
            SimpleFilterEvaluator::try_new(expr)
        } else {
            None
        }
    }
}
1447}