mito2/read/
scan_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Scans a region according to the scan request.
16
17use std::collections::HashSet;
18use std::fmt;
19use std::num::NonZeroU64;
20use std::sync::Arc;
21use std::time::Instant;
22
23use api::v1::SemanticType;
24use common_error::ext::BoxedError;
25use common_recordbatch::filter::SimpleFilterEvaluator;
26use common_recordbatch::SendableRecordBatchStream;
27use common_telemetry::{debug, error, tracing, warn};
28use common_time::range::TimestampRange;
29use datafusion_common::Column;
30use datafusion_expr::utils::expr_to_columns;
31use datafusion_expr::Expr;
32use futures::StreamExt;
33use smallvec::SmallVec;
34use store_api::metadata::{RegionMetadata, RegionMetadataRef};
35use store_api::region_engine::{PartitionRange, RegionScannerRef};
36use store_api::storage::{RegionId, ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector};
37use table::predicate::{build_time_range_predicate, Predicate};
38use tokio::sync::{mpsc, Semaphore};
39use tokio_stream::wrappers::ReceiverStream;
40
41use crate::access_layer::AccessLayerRef;
42use crate::cache::CacheStrategy;
43use crate::config::{DEFAULT_MAX_CONCURRENT_SCAN_FILES, DEFAULT_SCAN_CHANNEL_SIZE};
44use crate::error::Result;
45#[cfg(feature = "enterprise")]
46use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
47use crate::memtable::MemtableRange;
48use crate::metrics::READ_SST_COUNT;
49use crate::read::compat::{self, CompatBatch, FlatCompatBatch, PrimaryKeyCompatBatch};
50use crate::read::projection::ProjectionMapper;
51use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
52use crate::read::seq_scan::SeqScan;
53use crate::read::series_scan::SeriesScan;
54use crate::read::stream::ScanBatchStream;
55use crate::read::unordered_scan::UnorderedScan;
56use crate::read::{Batch, BoxedRecordBatchStream, RecordBatch, Source};
57use crate::region::options::MergeMode;
58use crate::region::version::VersionRef;
59use crate::sst::file::FileHandle;
60use crate::sst::index::bloom_filter::applier::{
61    BloomFilterIndexApplierBuilder, BloomFilterIndexApplierRef,
62};
63use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
64use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
65use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
66use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
67use crate::sst::parquet::reader::ReaderMetrics;
68
/// Parallel scan channel size for flat format.
///
/// `ScanRegion::scan_input()` uses this value instead of the configured
/// `parallel_scan_channel_size` whenever `flat_format` is enabled.
const FLAT_SCAN_CHANNEL_SIZE: usize = 2;
71
/// A scanner scans a region and returns a [SendableRecordBatchStream].
///
/// [ScanRegion::scanner()] picks the variant: series scan is checked first, then
/// unordered scan, and sequential scan is the fallback.
pub(crate) enum Scanner {
    /// Sequential scan. Used when neither of the other two variants applies.
    Seq(SeqScan),
    /// Unordered scan. Used for append mode regions when the request has no series row
    /// selector and its distribution is either unset or
    /// [TimeSeriesDistribution::TimeWindowed].
    Unordered(UnorderedScan),
    /// Per-series scan. Used when the request asks for
    /// [TimeSeriesDistribution::PerSeries].
    Series(SeriesScan),
}
81
82impl Scanner {
83    /// Returns a [SendableRecordBatchStream] to retrieve scan results from all partitions.
84    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
85    pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream, BoxedError> {
86        match self {
87            Scanner::Seq(seq_scan) => seq_scan.build_stream(),
88            Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
89            Scanner::Series(series_scan) => series_scan.build_stream().await,
90        }
91    }
92
93    /// Create a stream of [`Batch`] by this scanner.
94    pub(crate) fn scan_batch(&self) -> Result<ScanBatchStream> {
95        match self {
96            Scanner::Seq(x) => x.scan_all_partitions(),
97            Scanner::Unordered(x) => x.scan_all_partitions(),
98            Scanner::Series(x) => x.scan_all_partitions(),
99        }
100    }
101}
102
#[cfg(test)]
impl Scanner {
    /// Returns the number of files in the input of the wrapped scanner.
    pub(crate) fn num_files(&self) -> usize {
        match self {
            Self::Seq(scanner) => scanner.input().num_files(),
            Self::Unordered(scanner) => scanner.input().num_files(),
            Self::Series(scanner) => scanner.input().num_files(),
        }
    }

    /// Returns the number of memtables in the input of the wrapped scanner.
    pub(crate) fn num_memtables(&self) -> usize {
        match self {
            Self::Seq(scanner) => scanner.input().num_memtables(),
            Self::Unordered(scanner) => scanner.input().num_memtables(),
            Self::Series(scanner) => scanner.input().num_memtables(),
        }
    }

    /// Returns the ids of the SST files in the input of the wrapped scanner.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        match self {
            Self::Seq(scanner) => scanner.input().file_ids(),
            Self::Unordered(scanner) => scanner.input().file_ids(),
            Self::Series(scanner) => scanner.input().file_ids(),
        }
    }

    /// Sets the target partitions for the scanner. It controls the parallelism of the scanner.
    ///
    /// # Panics
    ///
    /// Panics if the wrapped scanner fails to prepare with the new request.
    pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
        use store_api::region_engine::{PrepareRequest, RegionScanner};

        let prepare = PrepareRequest::default().with_target_partitions(target_partitions);
        match self {
            Self::Seq(scanner) => scanner.prepare(prepare).unwrap(),
            Self::Unordered(scanner) => scanner.prepare(prepare).unwrap(),
            Self::Series(scanner) => scanner.prepare(prepare).unwrap(),
        }
    }
}
144
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Helper to scan a region by [ScanRequest].
///
/// [ScanRegion] collects SSTs and memtables to scan without actually reading them. It
/// creates a [Scanner] to actually scan these targets in [Scanner::scan()].
///
/// ```mermaid
/// classDiagram
/// class ScanRegion {
///     -VersionRef version
///     -ScanRequest request
///     ~scanner() Scanner
///     ~seq_scan() SeqScan
///     ~unordered_scan() UnorderedScan
///     ~series_scan() SeriesScan
/// }
/// class Scanner {
///     <<enumeration>>
///     SeqScan
///     UnorderedScan
///     SeriesScan
///     +scan() SendableRecordBatchStream
/// }
/// class SeqScan {
///     -ScanInput input
///     +build_stream() SendableRecordBatchStream
/// }
/// class UnorderedScan {
///     -ScanInput input
///     +build_stream() SendableRecordBatchStream
/// }
/// class SeriesScan {
///     -ScanInput input
///     +build_stream() SendableRecordBatchStream
/// }
/// class ScanInput {
///     -ProjectionMapper mapper
///     -Option~TimeRange~ time_range
///     -Option~Predicate~ predicate
///     -Vec~MemtableRef~ memtables
///     -Vec~FileHandle~ files
/// }
/// class ProjectionMapper {
///     ~output_schema() SchemaRef
///     ~convert(Batch) RecordBatch
/// }
/// ScanRegion -- Scanner
/// ScanRegion o-- ScanRequest
/// Scanner o-- SeqScan
/// Scanner o-- UnorderedScan
/// Scanner o-- SeriesScan
/// SeqScan o-- ScanInput
/// UnorderedScan o-- ScanInput
/// SeriesScan o-- ScanInput
/// Scanner -- SendableRecordBatchStream
/// ScanInput o-- ProjectionMapper
/// SeqScan -- SendableRecordBatchStream
/// UnorderedScan -- SendableRecordBatchStream
/// SeriesScan -- SendableRecordBatchStream
/// ```
pub(crate) struct ScanRegion {
    /// Version of the region at scan.
    version: VersionRef,
    /// Access layer of the region.
    access_layer: AccessLayerRef,
    /// Scan request.
    request: ScanRequest,
    /// Cache.
    cache_strategy: CacheStrategy,
    /// Capacity of the channel to send data from parallel scan tasks to the main task.
    parallel_scan_channel_size: usize,
    /// Maximum number of SST files to scan concurrently.
    max_concurrent_scan_files: usize,
    /// Whether to ignore inverted index.
    ignore_inverted_index: bool,
    /// Whether to ignore fulltext index.
    ignore_fulltext_index: bool,
    /// Whether to ignore bloom filter.
    ignore_bloom_filter: bool,
    /// Start time of the scan task.
    start_time: Option<Instant>,
    /// Whether to filter out the deleted rows.
    /// Usually true for normal read, and false for scan for compaction.
    filter_deleted: bool,
    /// Whether to use flat format.
    flat_format: bool,
    /// Optional provider queried for extension ranges while building the scan input.
    /// Only available with the `enterprise` feature.
    #[cfg(feature = "enterprise")]
    extension_range_provider: Option<BoxedExtensionRangeProvider>,
}
224
225impl ScanRegion {
226    /// Creates a [ScanRegion].
227    pub(crate) fn new(
228        version: VersionRef,
229        access_layer: AccessLayerRef,
230        request: ScanRequest,
231        cache_strategy: CacheStrategy,
232    ) -> ScanRegion {
233        ScanRegion {
234            version,
235            access_layer,
236            request,
237            cache_strategy,
238            parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
239            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
240            ignore_inverted_index: false,
241            ignore_fulltext_index: false,
242            ignore_bloom_filter: false,
243            start_time: None,
244            filter_deleted: true,
245            flat_format: false,
246            #[cfg(feature = "enterprise")]
247            extension_range_provider: None,
248        }
249    }
250
251    /// Sets parallel scan task channel size.
252    #[must_use]
253    pub(crate) fn with_parallel_scan_channel_size(
254        mut self,
255        parallel_scan_channel_size: usize,
256    ) -> Self {
257        self.parallel_scan_channel_size = parallel_scan_channel_size;
258        self
259    }
260
261    /// Sets maximum number of SST files to scan concurrently.
262    #[must_use]
263    pub(crate) fn with_max_concurrent_scan_files(
264        mut self,
265        max_concurrent_scan_files: usize,
266    ) -> Self {
267        self.max_concurrent_scan_files = max_concurrent_scan_files;
268        self
269    }
270
271    /// Sets whether to ignore inverted index.
272    #[must_use]
273    pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self {
274        self.ignore_inverted_index = ignore;
275        self
276    }
277
278    /// Sets whether to ignore fulltext index.
279    #[must_use]
280    pub(crate) fn with_ignore_fulltext_index(mut self, ignore: bool) -> Self {
281        self.ignore_fulltext_index = ignore;
282        self
283    }
284
285    /// Sets whether to ignore bloom filter.
286    #[must_use]
287    pub(crate) fn with_ignore_bloom_filter(mut self, ignore: bool) -> Self {
288        self.ignore_bloom_filter = ignore;
289        self
290    }
291
292    #[must_use]
293    pub(crate) fn with_start_time(mut self, now: Instant) -> Self {
294        self.start_time = Some(now);
295        self
296    }
297
    /// Sets whether to filter out the deleted rows.
    ///
    /// Unlike the `with_*` builders this mutates the [ScanRegion] in place.
    pub(crate) fn set_filter_deleted(&mut self, filter_deleted: bool) {
        self.filter_deleted = filter_deleted;
    }
301
302    /// Sets whether to use flat format.
303    #[must_use]
304    pub(crate) fn with_flat_format(mut self, flat_format: bool) -> Self {
305        self.flat_format = flat_format;
306        self
307    }
308
    /// Sets the provider that is asked for extension ranges when the scan input is built.
    ///
    /// Only available with the `enterprise` feature. Replaces any provider set before.
    #[cfg(feature = "enterprise")]
    pub(crate) fn set_extension_range_provider(
        &mut self,
        extension_range_provider: BoxedExtensionRangeProvider,
    ) {
        self.extension_range_provider = Some(extension_range_provider);
    }
316
317    /// Returns a [Scanner] to scan the region.
318    pub(crate) async fn scanner(self) -> Result<Scanner> {
319        if self.use_series_scan() {
320            self.series_scan().await.map(Scanner::Series)
321        } else if self.use_unordered_scan() {
322            // If table is append only and there is no series row selector, we use unordered scan in query.
323            // We still use seq scan in compaction.
324            self.unordered_scan().await.map(Scanner::Unordered)
325        } else {
326            self.seq_scan().await.map(Scanner::Seq)
327        }
328    }
329
330    /// Returns a [RegionScanner] to scan the region.
331    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
332    pub(crate) async fn region_scanner(self) -> Result<RegionScannerRef> {
333        if self.use_series_scan() {
334            self.series_scan()
335                .await
336                .map(|scanner| Box::new(scanner) as _)
337        } else if self.use_unordered_scan() {
338            self.unordered_scan()
339                .await
340                .map(|scanner| Box::new(scanner) as _)
341        } else {
342            self.seq_scan().await.map(|scanner| Box::new(scanner) as _)
343        }
344    }
345
346    /// Scan sequentially.
347    pub(crate) async fn seq_scan(self) -> Result<SeqScan> {
348        let input = self.scan_input().await?;
349        Ok(SeqScan::new(input, false))
350    }
351
352    /// Unordered scan.
353    pub(crate) async fn unordered_scan(self) -> Result<UnorderedScan> {
354        let input = self.scan_input().await?;
355        Ok(UnorderedScan::new(input))
356    }
357
358    /// Scans by series.
359    pub(crate) async fn series_scan(self) -> Result<SeriesScan> {
360        let input = self.scan_input().await?;
361        Ok(SeriesScan::new(input))
362    }
363
364    /// Returns true if the region can use unordered scan for current request.
365    fn use_unordered_scan(&self) -> bool {
366        // We use unordered scan when:
367        // 1. The region is in append mode.
368        // 2. There is no series row selector.
369        // 3. The required distribution is None or TimeSeriesDistribution::TimeWindowed.
370        //
371        // We still use seq scan in compaction.
372        self.version.options.append_mode
373            && self.request.series_row_selector.is_none()
374            && (self.request.distribution.is_none()
375                || self.request.distribution == Some(TimeSeriesDistribution::TimeWindowed))
376    }
377
378    /// Returns true if the region can use series scan for current request.
379    fn use_series_scan(&self) -> bool {
380        self.request.distribution == Some(TimeSeriesDistribution::PerSeries)
381    }
382
    /// Creates a scan input.
    ///
    /// Steps, in order:
    /// 1. Derives the time range of the request from its filters.
    /// 2. Builds the projection mapper from the request projection (or all columns).
    /// 3. Collects SST files of every level that pass the `sst_min_sequence` check and
    ///    whose time range intersects the request time range.
    /// 4. Collects range builders of non-empty memtables that intersect the time range.
    /// 5. Logs the request, removes field filters if required by the merge mode, then
    ///    builds the index appliers and the predicate pushed to the [ScanInput].
    /// 6. With the `enterprise` feature, attaches extension ranges found by the provider.
    ///
    /// # Errors
    ///
    /// Returns the error of the projection mapper, of listing memtable ranges, or (with
    /// the `enterprise` feature) of finding extension ranges.
    async fn scan_input(mut self) -> Result<ScanInput> {
        // A zero `sst_min_sequence` maps to `None`, which disables the sequence check below.
        let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new);
        let time_range = self.build_time_range_predicate();
        // NOTE(review): this predicate is built before `maybe_remove_field_filters()` runs
        // and is the one handed to the memtables below, so in LastNonNull mode it still
        // contains field filters while the predicate stored in the ScanInput does not.
        // Confirm this difference is intended.
        let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters);

        // The mapper always computes projected column ids as the schema of SSTs may change.
        let mapper = match &self.request.projection {
            Some(p) => {
                ProjectionMapper::new(&self.version.metadata, p.iter().copied(), self.flat_format)?
            }
            None => ProjectionMapper::all(&self.version.metadata, self.flat_format)?,
        };

        let ssts = &self.version.ssts;
        let mut files = Vec::new();
        for level in ssts.levels() {
            for file in level.files.values() {
                let exceed_min_sequence = match (sst_min_sequence, file.meta_ref().sequence) {
                    (Some(min_sequence), Some(file_sequence)) => file_sequence > min_sequence,
                    // If the file's sequence is None (or actually is zero), it could mean the file
                    // is generated and added to the region "directly". In this case, its data should
                    // be considered as fresh as the memtable. So its sequence is treated greater than
                    // the min_sequence, whatever the value of min_sequence is. Hence the default
                    // "true" in this arm.
                    (Some(_), None) => true,
                    // No minimal sequence is requested, so every file qualifies.
                    (None, _) => true,
                };

                // Finds SST files in range.
                if exceed_min_sequence && file_in_range(file, &time_range) {
                    files.push(file.clone());
                }
                // There is no need to check and prune for file's sequence here as the sequence number is usually very new,
                // unless the timing is too good, or the sequence number wouldn't be in file.
                // and the batch will be filtered out by tree reader anyway.
            }
        }

        let memtables = self.version.memtables.list_memtables();
        // Skip empty memtables and memtables out of time range.
        let mut mem_range_builders = Vec::new();

        for m in memtables {
            // check if memtable is empty by reading stats.
            let Some((start, end)) = m.stats().time_range() else {
                continue;
            };
            // The time range of the memtable is inclusive.
            let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
            if !memtable_range.intersects(&time_range) {
                continue;
            }
            let ranges_in_memtable = m.ranges(
                Some(mapper.column_ids()),
                predicate.clone(),
                self.request.sequence,
            )?;
            // One builder per range. Each builder gets a copy of the memtable stats with
            // the range count and row count narrowed down to that single range.
            mem_range_builders.extend(ranges_in_memtable.ranges.into_values().map(|v| {
                // todo: we should add stats to MemtableRange
                let mut stats = ranges_in_memtable.stats.clone();
                stats.num_ranges = 1;
                stats.num_rows = v.num_rows();
                MemRangeBuilder::new(v, stats)
            }));
        }

        let region_id = self.region_id();
        debug!(
            "Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}",
            region_id,
            self.request,
            time_range,
            mem_range_builders.len(),
            files.len(),
            self.version.options.append_mode,
        );

        // Remove field filters for LastNonNull mode after logging the request.
        self.maybe_remove_field_filters();

        // The appliers and the predicate below only see the filters that remain.
        let inverted_index_applier = self.build_invereted_index_applier();
        let bloom_filter_applier = self.build_bloom_filter_applier();
        let fulltext_index_applier = self.build_fulltext_index_applier();
        let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters);

        if self.flat_format {
            // The batch is already large enough so we use a small channel size here.
            self.parallel_scan_channel_size = FLAT_SCAN_CHANNEL_SIZE;
        }

        let input = ScanInput::new(self.access_layer, mapper)
            .with_time_range(Some(time_range))
            .with_predicate(predicate)
            .with_memtables(mem_range_builders)
            .with_files(files)
            .with_cache(self.cache_strategy)
            .with_inverted_index_applier(inverted_index_applier)
            .with_bloom_filter_index_applier(bloom_filter_applier)
            .with_fulltext_index_applier(fulltext_index_applier)
            .with_parallel_scan_channel_size(self.parallel_scan_channel_size)
            .with_max_concurrent_scan_files(self.max_concurrent_scan_files)
            .with_start_time(self.start_time)
            .with_append_mode(self.version.options.append_mode)
            .with_filter_deleted(self.filter_deleted)
            .with_merge_mode(self.version.options.merge_mode())
            .with_series_row_selector(self.request.series_row_selector)
            .with_distribution(self.request.distribution)
            .with_flat_format(self.flat_format);

        // Extension ranges are only looked up when a provider has been set.
        #[cfg(feature = "enterprise")]
        let input = if let Some(provider) = self.extension_range_provider {
            let ranges = provider
                .find_extension_ranges(self.version.flushed_sequence, time_range, &self.request)
                .await?;
            debug!("Find extension ranges: {ranges:?}");
            input.with_extension_ranges(ranges)
        } else {
            input
        };
        Ok(input)
    }
505
    /// Returns the id of the region to scan, read from the metadata of the version.
    fn region_id(&self) -> RegionId {
        self.version.metadata.region_id
    }
509
510    /// Build time range predicate from filters.
511    fn build_time_range_predicate(&self) -> TimestampRange {
512        let time_index = self.version.metadata.time_index_column();
513        let unit = time_index
514            .column_schema
515            .data_type
516            .as_timestamp()
517            .expect("Time index must have timestamp-compatible type")
518            .unit();
519        build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters)
520    }
521
522    /// Remove field filters if the merge mode is [MergeMode::LastNonNull].
523    fn maybe_remove_field_filters(&mut self) {
524        if self.version.options.merge_mode() != MergeMode::LastNonNull {
525            return;
526        }
527
528        // TODO(yingwen): We can ignore field filters only when there are multiple sources in the same time window.
529        let field_columns = self
530            .version
531            .metadata
532            .field_columns()
533            .map(|col| &col.column_schema.name)
534            .collect::<HashSet<_>>();
535        // Columns in the expr.
536        let mut columns = HashSet::new();
537
538        self.request.filters.retain(|expr| {
539            columns.clear();
540            // `expr_to_columns` won't return error.
541            if expr_to_columns(expr, &mut columns).is_err() {
542                return false;
543            }
544            for column in &columns {
545                if field_columns.contains(&column.name) {
546                    // This expr uses the field column.
547                    return false;
548                }
549            }
550            true
551        });
552    }
553
554    /// Use the latest schema to build the inverted index applier.
555    fn build_invereted_index_applier(&self) -> Option<InvertedIndexApplierRef> {
556        if self.ignore_inverted_index {
557            return None;
558        }
559
560        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
561        let inverted_index_cache = self.cache_strategy.inverted_index_cache().cloned();
562
563        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
564
565        InvertedIndexApplierBuilder::new(
566            self.access_layer.table_dir().to_string(),
567            self.access_layer.path_type(),
568            self.access_layer.object_store().clone(),
569            self.version.metadata.as_ref(),
570            self.version.metadata.inverted_indexed_column_ids(
571                self.version
572                    .options
573                    .index_options
574                    .inverted_index
575                    .ignore_column_ids
576                    .iter(),
577            ),
578            self.access_layer.puffin_manager_factory().clone(),
579        )
580        .with_file_cache(file_cache)
581        .with_inverted_index_cache(inverted_index_cache)
582        .with_puffin_metadata_cache(puffin_metadata_cache)
583        .build(&self.request.filters)
584        .inspect_err(|err| warn!(err; "Failed to build invereted index applier"))
585        .ok()
586        .flatten()
587        .map(Arc::new)
588    }
589
590    /// Use the latest schema to build the bloom filter index applier.
591    fn build_bloom_filter_applier(&self) -> Option<BloomFilterIndexApplierRef> {
592        if self.ignore_bloom_filter {
593            return None;
594        }
595
596        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
597        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
598        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
599
600        BloomFilterIndexApplierBuilder::new(
601            self.access_layer.table_dir().to_string(),
602            self.access_layer.path_type(),
603            self.access_layer.object_store().clone(),
604            self.version.metadata.as_ref(),
605            self.access_layer.puffin_manager_factory().clone(),
606        )
607        .with_file_cache(file_cache)
608        .with_bloom_filter_index_cache(bloom_filter_index_cache)
609        .with_puffin_metadata_cache(puffin_metadata_cache)
610        .build(&self.request.filters)
611        .inspect_err(|err| warn!(err; "Failed to build bloom filter index applier"))
612        .ok()
613        .flatten()
614        .map(Arc::new)
615    }
616
617    /// Use the latest schema to build the fulltext index applier.
618    fn build_fulltext_index_applier(&self) -> Option<FulltextIndexApplierRef> {
619        if self.ignore_fulltext_index {
620            return None;
621        }
622
623        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
624        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
625        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
626        FulltextIndexApplierBuilder::new(
627            self.access_layer.table_dir().to_string(),
628            self.access_layer.path_type(),
629            self.access_layer.object_store().clone(),
630            self.access_layer.puffin_manager_factory().clone(),
631            self.version.metadata.as_ref(),
632        )
633        .with_file_cache(file_cache)
634        .with_puffin_metadata_cache(puffin_metadata_cache)
635        .with_bloom_filter_cache(bloom_filter_index_cache)
636        .build(&self.request.filters)
637        .inspect_err(|err| warn!(err; "Failed to build fulltext index applier"))
638        .ok()
639        .flatten()
640        .map(Arc::new)
641    }
642}
643
644/// Returns true if the time range of a SST `file` matches the `predicate`.
645fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
646    if predicate == &TimestampRange::min_to_max() {
647        return true;
648    }
649    // end timestamp of a SST is inclusive.
650    let (start, end) = file.time_range();
651    let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end));
652    file_ts_range.intersects(predicate)
653}
654
/// Common input for different scanners.
///
/// Built by `ScanRegion::scan_input()` through [ScanInput::new()] and the `with_*`
/// builder methods.
pub struct ScanInput {
    /// Region SST access layer.
    access_layer: AccessLayerRef,
    /// Maps projected Batches to RecordBatches.
    pub(crate) mapper: Arc<ProjectionMapper>,
    /// Time range filter for time index.
    time_range: Option<TimestampRange>,
    /// Predicate to push down.
    pub(crate) predicate: PredicateGroup,
    /// Memtable range builders for memtables in the time range.
    pub(crate) memtables: Vec<MemRangeBuilder>,
    /// Handles to SST files to scan.
    pub(crate) files: Vec<FileHandle>,
    /// Cache.
    pub(crate) cache_strategy: CacheStrategy,
    /// Ignores file not found error.
    ignore_file_not_found: bool,
    /// Capacity of the channel to send data from parallel scan tasks to the main task.
    pub(crate) parallel_scan_channel_size: usize,
    /// Maximum number of SST files to scan concurrently.
    pub(crate) max_concurrent_scan_files: usize,
    /// Inverted index applier, if any.
    inverted_index_applier: Option<InvertedIndexApplierRef>,
    /// Bloom filter index applier, if any.
    bloom_filter_index_applier: Option<BloomFilterIndexApplierRef>,
    /// Fulltext index applier, if any.
    fulltext_index_applier: Option<FulltextIndexApplierRef>,
    /// Start time of the query.
    pub(crate) query_start: Option<Instant>,
    /// The region is using append mode.
    pub(crate) append_mode: bool,
    /// Whether to remove deletion markers.
    pub(crate) filter_deleted: bool,
    /// Mode to merge duplicate rows.
    pub(crate) merge_mode: MergeMode,
    /// Hint to select rows from time series.
    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
    /// Hint for the required distribution of the scanner.
    pub(crate) distribution: Option<TimeSeriesDistribution>,
    /// Whether to use flat format.
    pub(crate) flat_format: bool,
    /// Extension ranges attached to this input. Only available with the `enterprise`
    /// feature.
    #[cfg(feature = "enterprise")]
    extension_ranges: Vec<BoxedExtensionRange>,
}
698
699impl ScanInput {
700    /// Creates a new [ScanInput].
701    #[must_use]
702    pub(crate) fn new(access_layer: AccessLayerRef, mapper: ProjectionMapper) -> ScanInput {
703        ScanInput {
704            access_layer,
705            mapper: Arc::new(mapper),
706            time_range: None,
707            predicate: PredicateGroup::default(),
708            memtables: Vec::new(),
709            files: Vec::new(),
710            cache_strategy: CacheStrategy::Disabled,
711            ignore_file_not_found: false,
712            parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
713            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
714            inverted_index_applier: None,
715            bloom_filter_index_applier: None,
716            fulltext_index_applier: None,
717            query_start: None,
718            append_mode: false,
719            filter_deleted: true,
720            merge_mode: MergeMode::default(),
721            series_row_selector: None,
722            distribution: None,
723            flat_format: false,
724            #[cfg(feature = "enterprise")]
725            extension_ranges: Vec::new(),
726        }
727    }
728
729    /// Sets time range filter for time index.
730    #[must_use]
731    pub(crate) fn with_time_range(mut self, time_range: Option<TimestampRange>) -> Self {
732        self.time_range = time_range;
733        self
734    }
735
736    /// Sets predicate to push down.
737    #[must_use]
738    pub(crate) fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
739        self.predicate = predicate;
740        self
741    }
742
743    /// Sets memtable range builders.
744    #[must_use]
745    pub(crate) fn with_memtables(mut self, memtables: Vec<MemRangeBuilder>) -> Self {
746        self.memtables = memtables;
747        self
748    }
749
750    /// Sets files to read.
751    #[must_use]
752    pub(crate) fn with_files(mut self, files: Vec<FileHandle>) -> Self {
753        self.files = files;
754        self
755    }
756
757    /// Sets cache for this query.
758    #[must_use]
759    pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self {
760        self.cache_strategy = cache;
761        self
762    }
763
764    /// Ignores file not found error.
765    #[must_use]
766    pub(crate) fn with_ignore_file_not_found(mut self, ignore: bool) -> Self {
767        self.ignore_file_not_found = ignore;
768        self
769    }
770
771    /// Sets scan task channel size.
772    #[must_use]
773    pub(crate) fn with_parallel_scan_channel_size(
774        mut self,
775        parallel_scan_channel_size: usize,
776    ) -> Self {
777        self.parallel_scan_channel_size = parallel_scan_channel_size;
778        self
779    }
780
781    /// Sets maximum number of SST files to scan concurrently.
782    #[must_use]
783    pub(crate) fn with_max_concurrent_scan_files(
784        mut self,
785        max_concurrent_scan_files: usize,
786    ) -> Self {
787        self.max_concurrent_scan_files = max_concurrent_scan_files;
788        self
789    }
790
791    /// Sets invereted index applier.
792    #[must_use]
793    pub(crate) fn with_inverted_index_applier(
794        mut self,
795        applier: Option<InvertedIndexApplierRef>,
796    ) -> Self {
797        self.inverted_index_applier = applier;
798        self
799    }
800
801    /// Sets bloom filter applier.
802    #[must_use]
803    pub(crate) fn with_bloom_filter_index_applier(
804        mut self,
805        applier: Option<BloomFilterIndexApplierRef>,
806    ) -> Self {
807        self.bloom_filter_index_applier = applier;
808        self
809    }
810
811    /// Sets fulltext index applier.
812    #[must_use]
813    pub(crate) fn with_fulltext_index_applier(
814        mut self,
815        applier: Option<FulltextIndexApplierRef>,
816    ) -> Self {
817        self.fulltext_index_applier = applier;
818        self
819    }
820
821    /// Sets start time of the query.
822    #[must_use]
823    pub(crate) fn with_start_time(mut self, now: Option<Instant>) -> Self {
824        self.query_start = now;
825        self
826    }
827
828    #[must_use]
829    pub(crate) fn with_append_mode(mut self, is_append_mode: bool) -> Self {
830        self.append_mode = is_append_mode;
831        self
832    }
833
834    /// Sets whether to remove deletion markers during scan.
835    #[must_use]
836    pub(crate) fn with_filter_deleted(mut self, filter_deleted: bool) -> Self {
837        self.filter_deleted = filter_deleted;
838        self
839    }
840
841    /// Sets the merge mode.
842    #[must_use]
843    pub(crate) fn with_merge_mode(mut self, merge_mode: MergeMode) -> Self {
844        self.merge_mode = merge_mode;
845        self
846    }
847
848    /// Sets the distribution hint.
849    #[must_use]
850    pub(crate) fn with_distribution(
851        mut self,
852        distribution: Option<TimeSeriesDistribution>,
853    ) -> Self {
854        self.distribution = distribution;
855        self
856    }
857
858    /// Sets the time series row selector.
859    #[must_use]
860    pub(crate) fn with_series_row_selector(
861        mut self,
862        series_row_selector: Option<TimeSeriesRowSelector>,
863    ) -> Self {
864        self.series_row_selector = series_row_selector;
865        self
866    }
867
868    /// Sets whether to use flat format.
869    #[must_use]
870    pub(crate) fn with_flat_format(mut self, flat_format: bool) -> Self {
871        self.flat_format = flat_format;
872        self
873    }
874
875    /// Scans sources in parallel.
876    ///
877    /// # Panics if the input doesn't allow parallel scan.
878    pub(crate) fn create_parallel_sources(
879        &self,
880        sources: Vec<Source>,
881        semaphore: Arc<Semaphore>,
882    ) -> Result<Vec<Source>> {
883        if sources.len() <= 1 {
884            return Ok(sources);
885        }
886
887        // Spawn a task for each source.
888        let sources = sources
889            .into_iter()
890            .map(|source| {
891                let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
892                self.spawn_scan_task(source, semaphore.clone(), sender);
893                let stream = Box::pin(ReceiverStream::new(receiver));
894                Source::Stream(stream)
895            })
896            .collect();
897        Ok(sources)
898    }
899
900    /// Builds memtable ranges to scan by `index`.
901    pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> {
902        let memtable = &self.memtables[index.index];
903        let mut ranges = SmallVec::new();
904        memtable.build_ranges(index.row_group_index, &mut ranges);
905        ranges
906    }
907
908    /// Prunes a file to scan and returns the builder to build readers.
909    pub async fn prune_file(
910        &self,
911        file: &FileHandle,
912        reader_metrics: &mut ReaderMetrics,
913    ) -> Result<FileRangeBuilder> {
914        let res = self
915            .access_layer
916            .read_sst(file.clone())
917            .predicate(self.predicate.predicate().cloned())
918            .projection(Some(self.mapper.column_ids().to_vec()))
919            .cache(self.cache_strategy.clone())
920            .inverted_index_applier(self.inverted_index_applier.clone())
921            .bloom_filter_index_applier(self.bloom_filter_index_applier.clone())
922            .fulltext_index_applier(self.fulltext_index_applier.clone())
923            .expected_metadata(Some(self.mapper.metadata().clone()))
924            .flat_format(self.flat_format)
925            .build_reader_input(reader_metrics)
926            .await;
927        let (mut file_range_ctx, selection) = match res {
928            Ok(x) => x,
929            Err(e) => {
930                if e.is_object_not_found() && self.ignore_file_not_found {
931                    error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
932                    return Ok(FileRangeBuilder::default());
933                } else {
934                    return Err(e);
935                }
936            }
937        };
938
939        let need_compat = !compat::has_same_columns_and_pk_encoding(
940            self.mapper.metadata(),
941            file_range_ctx.read_format().metadata(),
942        );
943        if need_compat {
944            // They have different schema. We need to adapt the batch first so the
945            // mapper can convert it.
946            let compat = if let Some(flat_format) = file_range_ctx.read_format().as_flat() {
947                let mapper = self.mapper.as_flat().unwrap();
948                Some(CompatBatch::Flat(FlatCompatBatch::try_new(
949                    mapper,
950                    flat_format.metadata(),
951                    flat_format.format_projection(),
952                )?))
953            } else {
954                let compact_batch = PrimaryKeyCompatBatch::new(
955                    &self.mapper,
956                    file_range_ctx.read_format().metadata().clone(),
957                )?;
958                Some(CompatBatch::PrimaryKey(compact_batch))
959            };
960            file_range_ctx.set_compat_batch(compat);
961        }
962        Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), selection))
963    }
964
965    /// Scans the input source in another task and sends batches to the sender.
966    pub(crate) fn spawn_scan_task(
967        &self,
968        mut input: Source,
969        semaphore: Arc<Semaphore>,
970        sender: mpsc::Sender<Result<Batch>>,
971    ) {
972        common_runtime::spawn_global(async move {
973            loop {
974                // We release the permit before sending result to avoid the task waiting on
975                // the channel with the permit held.
976                let maybe_batch = {
977                    // Safety: We never close the semaphore.
978                    let _permit = semaphore.acquire().await.unwrap();
979                    input.next_batch().await
980                };
981                match maybe_batch {
982                    Ok(Some(batch)) => {
983                        let _ = sender.send(Ok(batch)).await;
984                    }
985                    Ok(None) => break,
986                    Err(e) => {
987                        let _ = sender.send(Err(e)).await;
988                        break;
989                    }
990                }
991            }
992        });
993    }
994
995    /// Scans flat sources (RecordBatch streams) in parallel.
996    ///
997    /// # Panics if the input doesn't allow parallel scan.
998    pub(crate) fn create_parallel_flat_sources(
999        &self,
1000        sources: Vec<BoxedRecordBatchStream>,
1001        semaphore: Arc<Semaphore>,
1002    ) -> Result<Vec<BoxedRecordBatchStream>> {
1003        if sources.len() <= 1 {
1004            return Ok(sources);
1005        }
1006
1007        // Spawn a task for each source.
1008        let sources = sources
1009            .into_iter()
1010            .map(|source| {
1011                let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
1012                self.spawn_flat_scan_task(source, semaphore.clone(), sender);
1013                let stream = Box::pin(ReceiverStream::new(receiver));
1014                Box::pin(stream) as _
1015            })
1016            .collect();
1017        Ok(sources)
1018    }
1019
1020    /// Spawns a task to scan a flat source (RecordBatch stream) asynchronously.
1021    pub(crate) fn spawn_flat_scan_task(
1022        &self,
1023        mut input: BoxedRecordBatchStream,
1024        semaphore: Arc<Semaphore>,
1025        sender: mpsc::Sender<Result<RecordBatch>>,
1026    ) {
1027        common_runtime::spawn_global(async move {
1028            loop {
1029                // We release the permit before sending result to avoid the task waiting on
1030                // the channel with the permit held.
1031                let maybe_batch = {
1032                    // Safety: We never close the semaphore.
1033                    let _permit = semaphore.acquire().await.unwrap();
1034                    input.next().await
1035                };
1036                match maybe_batch {
1037                    Some(Ok(batch)) => {
1038                        let _ = sender.send(Ok(batch)).await;
1039                    }
1040                    Some(Err(e)) => {
1041                        let _ = sender.send(Err(e)).await;
1042                        break;
1043                    }
1044                    None => break,
1045                }
1046            }
1047        });
1048    }
1049
1050    pub(crate) fn total_rows(&self) -> usize {
1051        let rows_in_files: usize = self.files.iter().map(|f| f.num_rows()).sum();
1052        let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();
1053
1054        let rows = rows_in_files + rows_in_memtables;
1055        #[cfg(feature = "enterprise")]
1056        let rows = rows
1057            + self
1058                .extension_ranges
1059                .iter()
1060                .map(|x| x.num_rows())
1061                .sum::<u64>() as usize;
1062        rows
1063    }
1064
1065    /// Returns table predicate of all exprs.
1066    pub(crate) fn predicate(&self) -> Option<&Predicate> {
1067        self.predicate.predicate()
1068    }
1069
1070    /// Returns number of memtables to scan.
1071    pub(crate) fn num_memtables(&self) -> usize {
1072        self.memtables.len()
1073    }
1074
1075    /// Returns number of SST files to scan.
1076    pub(crate) fn num_files(&self) -> usize {
1077        self.files.len()
1078    }
1079
1080    pub fn region_metadata(&self) -> &RegionMetadataRef {
1081        self.mapper.metadata()
1082    }
1083}
1084
#[cfg(feature = "enterprise")]
impl ScanInput {
    /// Replaces the extension ranges to scan.
    #[must_use]
    pub(crate) fn with_extension_ranges(
        mut self,
        extension_ranges: Vec<BoxedExtensionRange>,
    ) -> Self {
        self.extension_ranges = extension_ranges;
        self
    }

    /// Returns all extension ranges of this input.
    pub(crate) fn extension_ranges(&self) -> &[BoxedExtensionRange] {
        self.extension_ranges.as_slice()
    }

    /// Get a boxed [ExtensionRange] by the index in all ranges.
    ///
    /// `i` is translated to a position in the extension ranges by
    /// subtracting the number of memtables and the number of files.
    pub(crate) fn extension_range(&self, i: usize) -> &BoxedExtensionRange {
        let position = i - self.num_memtables() - self.num_files();
        &self.extension_ranges[position]
    }
}
1106
#[cfg(test)]
impl ScanInput {
    /// Collects the ids of the SST files to scan, in input order.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        let mut ids = Vec::with_capacity(self.files.len());
        ids.extend(self.files.iter().map(|handle| handle.file_id()));
        ids
    }
}
1114
1115/// Context shared by different streams from a scanner.
1116/// It contains the input and ranges to scan.
1117pub struct StreamContext {
1118    /// Input memtables and files.
1119    pub input: ScanInput,
1120    /// Metadata for partition ranges.
1121    pub(crate) ranges: Vec<RangeMeta>,
1122
1123    // Metrics:
1124    /// The start time of the query.
1125    pub(crate) query_start: Instant,
1126}
1127
1128impl StreamContext {
1129    /// Creates a new [StreamContext] for [SeqScan].
1130    pub(crate) fn seq_scan_ctx(input: ScanInput, compaction: bool) -> Self {
1131        let query_start = input.query_start.unwrap_or_else(Instant::now);
1132        let ranges = RangeMeta::seq_scan_ranges(&input, compaction);
1133        READ_SST_COUNT.observe(input.num_files() as f64);
1134
1135        Self {
1136            input,
1137            ranges,
1138            query_start,
1139        }
1140    }
1141
1142    /// Creates a new [StreamContext] for [UnorderedScan].
1143    pub(crate) fn unordered_scan_ctx(input: ScanInput) -> Self {
1144        let query_start = input.query_start.unwrap_or_else(Instant::now);
1145        let ranges = RangeMeta::unordered_scan_ranges(&input);
1146        READ_SST_COUNT.observe(input.num_files() as f64);
1147
1148        Self {
1149            input,
1150            ranges,
1151            query_start,
1152        }
1153    }
1154
1155    /// Returns true if the index refers to a memtable.
1156    pub(crate) fn is_mem_range_index(&self, index: RowGroupIndex) -> bool {
1157        self.input.num_memtables() > index.index
1158    }
1159
1160    pub(crate) fn is_file_range_index(&self, index: RowGroupIndex) -> bool {
1161        !self.is_mem_range_index(index)
1162            && index.index < self.input.num_files() + self.input.num_memtables()
1163    }
1164
1165    /// Retrieves the partition ranges.
1166    pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
1167        self.ranges
1168            .iter()
1169            .enumerate()
1170            .map(|(idx, range_meta)| range_meta.new_partition_range(idx))
1171            .collect()
1172    }
1173
1174    /// Format the context for explain.
1175    pub(crate) fn format_for_explain(&self, verbose: bool, f: &mut fmt::Formatter) -> fmt::Result {
1176        let (mut num_mem_ranges, mut num_file_ranges, mut num_other_ranges) = (0, 0, 0);
1177        for range_meta in &self.ranges {
1178            for idx in &range_meta.row_group_indices {
1179                if self.is_mem_range_index(*idx) {
1180                    num_mem_ranges += 1;
1181                } else if self.is_file_range_index(*idx) {
1182                    num_file_ranges += 1;
1183                } else {
1184                    num_other_ranges += 1;
1185                }
1186            }
1187        }
1188        if verbose {
1189            write!(f, "{{")?;
1190        }
1191        write!(
1192            f,
1193            r#""partition_count":{{"count":{}, "mem_ranges":{}, "files":{}, "file_ranges":{}"#,
1194            self.ranges.len(),
1195            num_mem_ranges,
1196            self.input.num_files(),
1197            num_file_ranges,
1198        )?;
1199        if num_other_ranges > 0 {
1200            write!(f, r#", "other_ranges":{}"#, num_other_ranges)?;
1201        }
1202        write!(f, "}}")?;
1203
1204        if let Some(selector) = &self.input.series_row_selector {
1205            write!(f, ", \"selector\":\"{}\"", selector)?;
1206        }
1207        if let Some(distribution) = &self.input.distribution {
1208            write!(f, ", \"distribution\":\"{}\"", distribution)?;
1209        }
1210
1211        if verbose {
1212            self.format_verbose_content(f)?;
1213        }
1214
1215        Ok(())
1216    }
1217
1218    fn format_verbose_content(&self, f: &mut fmt::Formatter) -> fmt::Result {
1219        struct FileWrapper<'a> {
1220            file: &'a FileHandle,
1221        }
1222
1223        impl fmt::Debug for FileWrapper<'_> {
1224            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1225                let (start, end) = self.file.time_range();
1226                write!(
1227                    f,
1228                    r#"{{"file_id":"{}","time_range_start":"{}::{}","time_range_end":"{}::{}","rows":{},"size":{},"index_size":{}}}"#,
1229                    self.file.file_id(),
1230                    start.value(),
1231                    start.unit(),
1232                    end.value(),
1233                    end.unit(),
1234                    self.file.num_rows(),
1235                    self.file.size(),
1236                    self.file.index_size()
1237                )
1238            }
1239        }
1240
1241        struct InputWrapper<'a> {
1242            input: &'a ScanInput,
1243        }
1244
1245        #[cfg(feature = "enterprise")]
1246        impl InputWrapper<'_> {
1247            fn format_extension_ranges(&self, f: &mut fmt::Formatter) -> fmt::Result {
1248                if self.input.extension_ranges.is_empty() {
1249                    return Ok(());
1250                }
1251
1252                let mut delimiter = "";
1253                write!(f, ", extension_ranges: [")?;
1254                for range in self.input.extension_ranges() {
1255                    write!(f, "{}{:?}", delimiter, range)?;
1256                    delimiter = ", ";
1257                }
1258                write!(f, "]")?;
1259                Ok(())
1260            }
1261        }
1262
1263        impl fmt::Debug for InputWrapper<'_> {
1264            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1265                let output_schema = self.input.mapper.output_schema();
1266                if !output_schema.is_empty() {
1267                    let names: Vec<_> = output_schema
1268                        .column_schemas()
1269                        .iter()
1270                        .map(|col| &col.name)
1271                        .collect();
1272                    write!(f, ", \"projection\": {:?}", names)?;
1273                }
1274                if let Some(predicate) = &self.input.predicate.predicate() {
1275                    if !predicate.exprs().is_empty() {
1276                        let exprs: Vec<_> =
1277                            predicate.exprs().iter().map(|e| e.to_string()).collect();
1278                        write!(f, ", \"filters\": {:?}", exprs)?;
1279                    }
1280                }
1281                if !self.input.files.is_empty() {
1282                    write!(f, ", \"files\": ")?;
1283                    f.debug_list()
1284                        .entries(self.input.files.iter().map(|file| FileWrapper { file }))
1285                        .finish()?;
1286                }
1287
1288                #[cfg(feature = "enterprise")]
1289                self.format_extension_ranges(f)?;
1290
1291                Ok(())
1292            }
1293        }
1294
1295        write!(f, "{:?}", InputWrapper { input: &self.input })
1296    }
1297}
1298
1299/// Predicates to evaluate.
1300/// It only keeps filters that [SimpleFilterEvaluator] supports.
1301#[derive(Clone, Default)]
1302pub struct PredicateGroup {
1303    time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
1304
1305    /// Table predicate for all logical exprs to evaluate.
1306    /// Parquet reader uses it to prune row groups.
1307    predicate: Option<Predicate>,
1308}
1309
1310impl PredicateGroup {
1311    /// Creates a new `PredicateGroup` from exprs according to the metadata.
1312    pub fn new(metadata: &RegionMetadata, exprs: &[Expr]) -> Self {
1313        let mut time_filters = Vec::with_capacity(exprs.len());
1314        // Columns in the expr.
1315        let mut columns = HashSet::new();
1316        for expr in exprs {
1317            columns.clear();
1318            let Some(filter) = Self::expr_to_filter(expr, metadata, &mut columns) else {
1319                continue;
1320            };
1321            time_filters.push(filter);
1322        }
1323        let time_filters = if time_filters.is_empty() {
1324            None
1325        } else {
1326            Some(Arc::new(time_filters))
1327        };
1328        let predicate = Predicate::new(exprs.to_vec());
1329
1330        Self {
1331            time_filters,
1332            predicate: Some(predicate),
1333        }
1334    }
1335
1336    /// Returns time filters.
1337    pub(crate) fn time_filters(&self) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
1338        self.time_filters.clone()
1339    }
1340
1341    /// Returns predicate of all exprs.
1342    pub(crate) fn predicate(&self) -> Option<&Predicate> {
1343        self.predicate.as_ref()
1344    }
1345
1346    fn expr_to_filter(
1347        expr: &Expr,
1348        metadata: &RegionMetadata,
1349        columns: &mut HashSet<Column>,
1350    ) -> Option<SimpleFilterEvaluator> {
1351        columns.clear();
1352        // `expr_to_columns` won't return error.
1353        // We still ignore these expressions for safety.
1354        expr_to_columns(expr, columns).ok()?;
1355        if columns.len() > 1 {
1356            // Simple filter doesn't support multiple columns.
1357            return None;
1358        }
1359        let column = columns.iter().next()?;
1360        let column_meta = metadata.column_by_name(&column.name)?;
1361        if column_meta.semantic_type == SemanticType::Timestamp {
1362            SimpleFilterEvaluator::try_new(expr)
1363        } else {
1364            None
1365        }
1366    }
1367}