// mito2/read/scan_region.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Scans a region according to the scan request.
16
17use std::collections::HashSet;
18use std::fmt;
19use std::num::NonZeroU64;
20use std::sync::Arc;
21use std::time::Instant;
22
23use api::v1::SemanticType;
24use common_error::ext::BoxedError;
25use common_recordbatch::SendableRecordBatchStream;
26use common_recordbatch::filter::SimpleFilterEvaluator;
27use common_telemetry::tracing::Instrument;
28use common_telemetry::{debug, error, tracing, warn};
29use common_time::range::TimestampRange;
30use datafusion_common::Column;
31use datafusion_expr::Expr;
32use datafusion_expr::utils::expr_to_columns;
33use futures::StreamExt;
34use partition::expr::PartitionExpr;
35use smallvec::SmallVec;
36use snafu::ResultExt;
37use store_api::metadata::{RegionMetadata, RegionMetadataRef};
38use store_api::region_engine::{PartitionRange, RegionScannerRef};
39use store_api::storage::{
40    RegionId, ScanRequest, SequenceRange, TimeSeriesDistribution, TimeSeriesRowSelector,
41};
42use table::predicate::{Predicate, build_time_range_predicate};
43use tokio::sync::{Semaphore, mpsc};
44use tokio_stream::wrappers::ReceiverStream;
45
46use crate::access_layer::AccessLayerRef;
47use crate::cache::CacheStrategy;
48use crate::config::{DEFAULT_MAX_CONCURRENT_SCAN_FILES, DEFAULT_SCAN_CHANNEL_SIZE};
49use crate::error::{InvalidPartitionExprSnafu, Result};
50#[cfg(feature = "enterprise")]
51use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
52use crate::memtable::{MemtableRange, RangesOptions};
53use crate::metrics::READ_SST_COUNT;
54use crate::read::compat::{self, CompatBatch, FlatCompatBatch, PrimaryKeyCompatBatch};
55use crate::read::projection::ProjectionMapper;
56use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
57use crate::read::seq_scan::SeqScan;
58use crate::read::series_scan::SeriesScan;
59use crate::read::stream::ScanBatchStream;
60use crate::read::unordered_scan::UnorderedScan;
61use crate::read::{Batch, BoxedRecordBatchStream, RecordBatch, Source};
62use crate::region::options::MergeMode;
63use crate::region::version::VersionRef;
64use crate::sst::FormatType;
65use crate::sst::file::FileHandle;
66use crate::sst::index::bloom_filter::applier::{
67    BloomFilterIndexApplierBuilder, BloomFilterIndexApplierRef,
68};
69use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
70use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
71use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
72use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
73use crate::sst::parquet::file_range::PreFilterMode;
74use crate::sst::parquet::reader::ReaderMetrics;
75
/// Parallel scan channel size for flat format.
///
/// Flat-format batches are already large, so a small channel bound is enough;
/// `ScanRegion::scan_input` overrides the configured channel size with this
/// value when the region uses the flat SST format.
const FLAT_SCAN_CHANNEL_SIZE: usize = 2;
78
/// A scanner scans a region and returns a [SendableRecordBatchStream].
///
/// The variant is chosen by [ScanRegion::scanner()] based on the scan request
/// and the region options.
pub(crate) enum Scanner {
    /// Sequential scan.
    Seq(SeqScan),
    /// Unordered scan.
    Unordered(UnorderedScan),
    /// Per-series scan.
    Series(SeriesScan),
}
88
89impl Scanner {
90    /// Returns a [SendableRecordBatchStream] to retrieve scan results from all partitions.
91    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
92    pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream, BoxedError> {
93        match self {
94            Scanner::Seq(seq_scan) => seq_scan.build_stream(),
95            Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
96            Scanner::Series(series_scan) => series_scan.build_stream().await,
97        }
98    }
99
100    /// Create a stream of [`Batch`] by this scanner.
101    pub(crate) fn scan_batch(&self) -> Result<ScanBatchStream> {
102        match self {
103            Scanner::Seq(x) => x.scan_all_partitions(),
104            Scanner::Unordered(x) => x.scan_all_partitions(),
105            Scanner::Series(x) => x.scan_all_partitions(),
106        }
107    }
108}
109
#[cfg(test)]
impl Scanner {
    /// Returns number of files to scan.
    pub(crate) fn num_files(&self) -> usize {
        match self {
            Self::Seq(scan) => scan.input().num_files(),
            Self::Unordered(scan) => scan.input().num_files(),
            Self::Series(scan) => scan.input().num_files(),
        }
    }

    /// Returns number of memtables to scan.
    pub(crate) fn num_memtables(&self) -> usize {
        match self {
            Self::Seq(scan) => scan.input().num_memtables(),
            Self::Unordered(scan) => scan.input().num_memtables(),
            Self::Series(scan) => scan.input().num_memtables(),
        }
    }

    /// Returns SST file ids to scan.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        match self {
            Self::Seq(scan) => scan.input().file_ids(),
            Self::Unordered(scan) => scan.input().file_ids(),
            Self::Series(scan) => scan.input().file_ids(),
        }
    }

    /// Returns index ids of the SST files to scan.
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        match self {
            Self::Seq(scan) => scan.input().index_ids(),
            Self::Unordered(scan) => scan.input().index_ids(),
            Self::Series(scan) => scan.input().index_ids(),
        }
    }

    /// Sets the target partitions for the scanner. It controls the parallelism of the scanner.
    pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
        use store_api::region_engine::{PrepareRequest, RegionScanner};

        // Test-only helper, so unwrap on prepare failures.
        let prepare = PrepareRequest::default().with_target_partitions(target_partitions);
        match self {
            Self::Seq(scan) => scan.prepare(prepare).unwrap(),
            Self::Unordered(scan) => scan.prepare(prepare).unwrap(),
            Self::Series(scan) => scan.prepare(prepare).unwrap(),
        }
    }
}
159
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Helper to scan a region by [ScanRequest].
///
/// [ScanRegion] collects SSTs and memtables to scan without actually reading them. It
/// creates a [Scanner] to actually scan these targets in [Scanner::scan()].
///
/// ```mermaid
/// classDiagram
/// class ScanRegion {
///     -VersionRef version
///     -ScanRequest request
///     ~scanner() Scanner
///     ~seq_scan() SeqScan
/// }
/// class Scanner {
///     <<enumeration>>
///     SeqScan
///     UnorderedScan
///     +scan() SendableRecordBatchStream
/// }
/// class SeqScan {
///     -ScanInput input
///     +build() SendableRecordBatchStream
/// }
/// class UnorderedScan {
///     -ScanInput input
///     +build() SendableRecordBatchStream
/// }
/// class ScanInput {
///     -ProjectionMapper mapper
///     -Option~TimeRange~ time_range
///     -Option~Predicate~ predicate
///     -Vec~MemtableRef~ memtables
///     -Vec~FileHandle~ files
/// }
/// class ProjectionMapper {
///     ~output_schema() SchemaRef
///     ~convert(Batch) RecordBatch
/// }
/// ScanRegion -- Scanner
/// ScanRegion o-- ScanRequest
/// Scanner o-- SeqScan
/// Scanner o-- UnorderedScan
/// SeqScan o-- ScanInput
/// UnorderedScan o-- ScanInput
/// Scanner -- SendableRecordBatchStream
/// ScanInput o-- ProjectionMapper
/// SeqScan -- SendableRecordBatchStream
/// UnorderedScan -- SendableRecordBatchStream
/// ```
pub(crate) struct ScanRegion {
    /// Version of the region at scan.
    version: VersionRef,
    /// Access layer of the region.
    access_layer: AccessLayerRef,
    /// Scan request.
    request: ScanRequest,
    /// Cache.
    cache_strategy: CacheStrategy,
    /// Capacity of the channel to send data from parallel scan tasks to the main task.
    parallel_scan_channel_size: usize,
    /// Maximum number of SST files to scan concurrently.
    max_concurrent_scan_files: usize,
    /// Whether to ignore inverted index.
    ignore_inverted_index: bool,
    /// Whether to ignore fulltext index.
    ignore_fulltext_index: bool,
    /// Whether to ignore bloom filter.
    ignore_bloom_filter: bool,
    /// Start time of the scan task. Propagated to [ScanInput] as the query start.
    start_time: Option<Instant>,
    /// Whether to filter out the deleted rows.
    /// Usually true for normal read, and false for scan for compaction.
    filter_deleted: bool,
    /// Optional provider of extra ranges to scan besides memtables and SSTs.
    #[cfg(feature = "enterprise")]
    extension_range_provider: Option<BoxedExtensionRangeProvider>,
}
237
238impl ScanRegion {
239    /// Creates a [ScanRegion].
240    pub(crate) fn new(
241        version: VersionRef,
242        access_layer: AccessLayerRef,
243        request: ScanRequest,
244        cache_strategy: CacheStrategy,
245    ) -> ScanRegion {
246        ScanRegion {
247            version,
248            access_layer,
249            request,
250            cache_strategy,
251            parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
252            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
253            ignore_inverted_index: false,
254            ignore_fulltext_index: false,
255            ignore_bloom_filter: false,
256            start_time: None,
257            filter_deleted: true,
258            #[cfg(feature = "enterprise")]
259            extension_range_provider: None,
260        }
261    }
262
263    /// Sets parallel scan task channel size.
264    #[must_use]
265    pub(crate) fn with_parallel_scan_channel_size(
266        mut self,
267        parallel_scan_channel_size: usize,
268    ) -> Self {
269        self.parallel_scan_channel_size = parallel_scan_channel_size;
270        self
271    }
272
273    /// Sets maximum number of SST files to scan concurrently.
274    #[must_use]
275    pub(crate) fn with_max_concurrent_scan_files(
276        mut self,
277        max_concurrent_scan_files: usize,
278    ) -> Self {
279        self.max_concurrent_scan_files = max_concurrent_scan_files;
280        self
281    }
282
283    /// Sets whether to ignore inverted index.
284    #[must_use]
285    pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self {
286        self.ignore_inverted_index = ignore;
287        self
288    }
289
290    /// Sets whether to ignore fulltext index.
291    #[must_use]
292    pub(crate) fn with_ignore_fulltext_index(mut self, ignore: bool) -> Self {
293        self.ignore_fulltext_index = ignore;
294        self
295    }
296
297    /// Sets whether to ignore bloom filter.
298    #[must_use]
299    pub(crate) fn with_ignore_bloom_filter(mut self, ignore: bool) -> Self {
300        self.ignore_bloom_filter = ignore;
301        self
302    }
303
304    #[must_use]
305    pub(crate) fn with_start_time(mut self, now: Instant) -> Self {
306        self.start_time = Some(now);
307        self
308    }
309
310    pub(crate) fn set_filter_deleted(&mut self, filter_deleted: bool) {
311        self.filter_deleted = filter_deleted;
312    }
313
314    #[cfg(feature = "enterprise")]
315    pub(crate) fn set_extension_range_provider(
316        &mut self,
317        extension_range_provider: BoxedExtensionRangeProvider,
318    ) {
319        self.extension_range_provider = Some(extension_range_provider);
320    }
321
322    /// Returns a [Scanner] to scan the region.
323    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
324    pub(crate) async fn scanner(self) -> Result<Scanner> {
325        if self.use_series_scan() {
326            self.series_scan().await.map(Scanner::Series)
327        } else if self.use_unordered_scan() {
328            // If table is append only and there is no series row selector, we use unordered scan in query.
329            // We still use seq scan in compaction.
330            self.unordered_scan().await.map(Scanner::Unordered)
331        } else {
332            self.seq_scan().await.map(Scanner::Seq)
333        }
334    }
335
336    /// Returns a [RegionScanner] to scan the region.
337    #[tracing::instrument(
338        level = tracing::Level::DEBUG,
339        skip_all,
340        fields(region_id = %self.region_id())
341    )]
342    pub(crate) async fn region_scanner(self) -> Result<RegionScannerRef> {
343        if self.use_series_scan() {
344            self.series_scan()
345                .await
346                .map(|scanner| Box::new(scanner) as _)
347        } else if self.use_unordered_scan() {
348            self.unordered_scan()
349                .await
350                .map(|scanner| Box::new(scanner) as _)
351        } else {
352            self.seq_scan().await.map(|scanner| Box::new(scanner) as _)
353        }
354    }
355
356    /// Scan sequentially.
357    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
358    pub(crate) async fn seq_scan(self) -> Result<SeqScan> {
359        let input = self.scan_input().await?.with_compaction(false);
360        Ok(SeqScan::new(input))
361    }
362
363    /// Unordered scan.
364    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
365    pub(crate) async fn unordered_scan(self) -> Result<UnorderedScan> {
366        let input = self.scan_input().await?;
367        Ok(UnorderedScan::new(input))
368    }
369
370    /// Scans by series.
371    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
372    pub(crate) async fn series_scan(self) -> Result<SeriesScan> {
373        let input = self.scan_input().await?;
374        Ok(SeriesScan::new(input))
375    }
376
377    /// Returns true if the region can use unordered scan for current request.
378    fn use_unordered_scan(&self) -> bool {
379        // We use unordered scan when:
380        // 1. The region is in append mode.
381        // 2. There is no series row selector.
382        // 3. The required distribution is None or TimeSeriesDistribution::TimeWindowed.
383        //
384        // We still use seq scan in compaction.
385        self.version.options.append_mode
386            && self.request.series_row_selector.is_none()
387            && (self.request.distribution.is_none()
388                || self.request.distribution == Some(TimeSeriesDistribution::TimeWindowed))
389    }
390
391    /// Returns true if the region can use series scan for current request.
392    fn use_series_scan(&self) -> bool {
393        self.request.distribution == Some(TimeSeriesDistribution::PerSeries)
394    }
395
396    /// Returns true if the region use flat format.
397    fn use_flat_format(&self) -> bool {
398        self.version.options.sst_format.unwrap_or_default() == FormatType::Flat
399    }
400
401    /// Creates a scan input.
402    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
403    async fn scan_input(mut self) -> Result<ScanInput> {
404        let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new);
405        let time_range = self.build_time_range_predicate();
406        let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;
407        let flat_format = self.use_flat_format();
408
409        // The mapper always computes projected column ids as the schema of SSTs may change.
410        let mapper = match &self.request.projection {
411            Some(p) => {
412                ProjectionMapper::new(&self.version.metadata, p.iter().copied(), flat_format)?
413            }
414            None => ProjectionMapper::all(&self.version.metadata, flat_format)?,
415        };
416
417        let ssts = &self.version.ssts;
418        let mut files = Vec::new();
419        for level in ssts.levels() {
420            for file in level.files.values() {
421                let exceed_min_sequence = match (sst_min_sequence, file.meta_ref().sequence) {
422                    (Some(min_sequence), Some(file_sequence)) => file_sequence > min_sequence,
423                    // If the file's sequence is None (or actually is zero), it could mean the file
424                    // is generated and added to the region "directly". In this case, its data should
425                    // be considered as fresh as the memtable. So its sequence is treated greater than
426                    // the min_sequence, whatever the value of min_sequence is. Hence the default
427                    // "true" in this arm.
428                    (Some(_), None) => true,
429                    (None, _) => true,
430                };
431
432                // Finds SST files in range.
433                if exceed_min_sequence && file_in_range(file, &time_range) {
434                    files.push(file.clone());
435                }
436                // There is no need to check and prune for file's sequence here as the sequence number is usually very new,
437                // unless the timing is too good, or the sequence number wouldn't be in file.
438                // and the batch will be filtered out by tree reader anyway.
439            }
440        }
441
442        let memtables = self.version.memtables.list_memtables();
443        // Skip empty memtables and memtables out of time range.
444        let mut mem_range_builders = Vec::new();
445        let filter_mode = pre_filter_mode(
446            self.version.options.append_mode,
447            self.version.options.merge_mode(),
448        );
449
450        for m in memtables {
451            // check if memtable is empty by reading stats.
452            let Some((start, end)) = m.stats().time_range() else {
453                continue;
454            };
455            // The time range of the memtable is inclusive.
456            let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
457            if !memtable_range.intersects(&time_range) {
458                continue;
459            }
460            let ranges_in_memtable = m.ranges(
461                Some(mapper.column_ids()),
462                RangesOptions::default()
463                    .with_predicate(predicate.clone())
464                    .with_sequence(SequenceRange::new(
465                        self.request.memtable_min_sequence,
466                        self.request.memtable_max_sequence,
467                    ))
468                    .with_pre_filter_mode(filter_mode),
469            )?;
470            mem_range_builders.extend(ranges_in_memtable.ranges.into_values().map(|v| {
471                let stats = v.stats().clone();
472                MemRangeBuilder::new(v, stats)
473            }));
474        }
475
476        let region_id = self.region_id();
477        debug!(
478            "Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}, flat_format: {}",
479            region_id,
480            self.request,
481            time_range,
482            mem_range_builders.len(),
483            files.len(),
484            self.version.options.append_mode,
485            flat_format,
486        );
487
488        let (non_field_filters, field_filters) = self.partition_by_field_filters();
489        let inverted_index_appliers = [
490            self.build_invereted_index_applier(&non_field_filters),
491            self.build_invereted_index_applier(&field_filters),
492        ];
493        let bloom_filter_appliers = [
494            self.build_bloom_filter_applier(&non_field_filters),
495            self.build_bloom_filter_applier(&field_filters),
496        ];
497        let fulltext_index_appliers = [
498            self.build_fulltext_index_applier(&non_field_filters),
499            self.build_fulltext_index_applier(&field_filters),
500        ];
501        let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;
502
503        if flat_format {
504            // The batch is already large enough so we use a small channel size here.
505            self.parallel_scan_channel_size = FLAT_SCAN_CHANNEL_SIZE;
506        }
507
508        let input = ScanInput::new(self.access_layer, mapper)
509            .with_time_range(Some(time_range))
510            .with_predicate(predicate)
511            .with_memtables(mem_range_builders)
512            .with_files(files)
513            .with_cache(self.cache_strategy)
514            .with_inverted_index_appliers(inverted_index_appliers)
515            .with_bloom_filter_index_appliers(bloom_filter_appliers)
516            .with_fulltext_index_appliers(fulltext_index_appliers)
517            .with_parallel_scan_channel_size(self.parallel_scan_channel_size)
518            .with_max_concurrent_scan_files(self.max_concurrent_scan_files)
519            .with_start_time(self.start_time)
520            .with_append_mode(self.version.options.append_mode)
521            .with_filter_deleted(self.filter_deleted)
522            .with_merge_mode(self.version.options.merge_mode())
523            .with_series_row_selector(self.request.series_row_selector)
524            .with_distribution(self.request.distribution)
525            .with_flat_format(flat_format);
526
527        #[cfg(feature = "enterprise")]
528        let input = if let Some(provider) = self.extension_range_provider {
529            let ranges = provider
530                .find_extension_ranges(self.version.flushed_sequence, time_range, &self.request)
531                .await?;
532            debug!("Find extension ranges: {ranges:?}");
533            input.with_extension_ranges(ranges)
534        } else {
535            input
536        };
537        Ok(input)
538    }
539
540    fn region_id(&self) -> RegionId {
541        self.version.metadata.region_id
542    }
543
544    /// Build time range predicate from filters.
545    fn build_time_range_predicate(&self) -> TimestampRange {
546        let time_index = self.version.metadata.time_index_column();
547        let unit = time_index
548            .column_schema
549            .data_type
550            .as_timestamp()
551            .expect("Time index must have timestamp-compatible type")
552            .unit();
553        build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters)
554    }
555
556    /// Partitions filters into two groups: non-field filters and field filters.
557    /// Returns `(non_field_filters, field_filters)`.
558    fn partition_by_field_filters(&self) -> (Vec<Expr>, Vec<Expr>) {
559        let field_columns = self
560            .version
561            .metadata
562            .field_columns()
563            .map(|col| &col.column_schema.name)
564            .collect::<HashSet<_>>();
565
566        let mut columns = HashSet::new();
567
568        self.request.filters.iter().cloned().partition(|expr| {
569            columns.clear();
570            // `expr_to_columns` won't return error.
571            if expr_to_columns(expr, &mut columns).is_err() {
572                // If we can't extract columns, treat it as non-field filter
573                return true;
574            }
575            // Return true for non-field filters (partition puts true cases in first vec)
576            !columns
577                .iter()
578                .any(|column| field_columns.contains(&column.name))
579        })
580    }
581
582    /// Use the latest schema to build the inverted index applier.
583    fn build_invereted_index_applier(&self, filters: &[Expr]) -> Option<InvertedIndexApplierRef> {
584        if self.ignore_inverted_index {
585            return None;
586        }
587
588        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
589        let inverted_index_cache = self.cache_strategy.inverted_index_cache().cloned();
590
591        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
592
593        InvertedIndexApplierBuilder::new(
594            self.access_layer.table_dir().to_string(),
595            self.access_layer.path_type(),
596            self.access_layer.object_store().clone(),
597            self.version.metadata.as_ref(),
598            self.version.metadata.inverted_indexed_column_ids(
599                self.version
600                    .options
601                    .index_options
602                    .inverted_index
603                    .ignore_column_ids
604                    .iter(),
605            ),
606            self.access_layer.puffin_manager_factory().clone(),
607        )
608        .with_file_cache(file_cache)
609        .with_inverted_index_cache(inverted_index_cache)
610        .with_puffin_metadata_cache(puffin_metadata_cache)
611        .build(filters)
612        .inspect_err(|err| warn!(err; "Failed to build invereted index applier"))
613        .ok()
614        .flatten()
615        .map(Arc::new)
616    }
617
618    /// Use the latest schema to build the bloom filter index applier.
619    fn build_bloom_filter_applier(&self, filters: &[Expr]) -> Option<BloomFilterIndexApplierRef> {
620        if self.ignore_bloom_filter {
621            return None;
622        }
623
624        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
625        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
626        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
627
628        BloomFilterIndexApplierBuilder::new(
629            self.access_layer.table_dir().to_string(),
630            self.access_layer.path_type(),
631            self.access_layer.object_store().clone(),
632            self.version.metadata.as_ref(),
633            self.access_layer.puffin_manager_factory().clone(),
634        )
635        .with_file_cache(file_cache)
636        .with_bloom_filter_index_cache(bloom_filter_index_cache)
637        .with_puffin_metadata_cache(puffin_metadata_cache)
638        .build(filters)
639        .inspect_err(|err| warn!(err; "Failed to build bloom filter index applier"))
640        .ok()
641        .flatten()
642        .map(Arc::new)
643    }
644
645    /// Use the latest schema to build the fulltext index applier.
646    fn build_fulltext_index_applier(&self, filters: &[Expr]) -> Option<FulltextIndexApplierRef> {
647        if self.ignore_fulltext_index {
648            return None;
649        }
650
651        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
652        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
653        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
654        FulltextIndexApplierBuilder::new(
655            self.access_layer.table_dir().to_string(),
656            self.access_layer.path_type(),
657            self.access_layer.object_store().clone(),
658            self.access_layer.puffin_manager_factory().clone(),
659            self.version.metadata.as_ref(),
660        )
661        .with_file_cache(file_cache)
662        .with_puffin_metadata_cache(puffin_metadata_cache)
663        .with_bloom_filter_cache(bloom_filter_index_cache)
664        .build(filters)
665        .inspect_err(|err| warn!(err; "Failed to build fulltext index applier"))
666        .ok()
667        .flatten()
668        .map(Arc::new)
669    }
670}
671
672/// Returns true if the time range of a SST `file` matches the `predicate`.
673fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
674    if predicate == &TimestampRange::min_to_max() {
675        return true;
676    }
677    // end timestamp of a SST is inclusive.
678    let (start, end) = file.time_range();
679    let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end));
680    file_ts_range.intersects(predicate)
681}
682
/// Common input for different scanners.
pub struct ScanInput {
    /// Region SST access layer.
    access_layer: AccessLayerRef,
    /// Maps projected Batches to RecordBatches.
    pub(crate) mapper: Arc<ProjectionMapper>,
    /// Time range filter for time index.
    time_range: Option<TimestampRange>,
    /// Predicate to push down.
    pub(crate) predicate: PredicateGroup,
    /// Region partition expr applied at read time.
    region_partition_expr: Option<PartitionExpr>,
    /// Memtable range builders for memtables in the time range.
    pub(crate) memtables: Vec<MemRangeBuilder>,
    /// Handles to SST files to scan.
    pub(crate) files: Vec<FileHandle>,
    /// Cache.
    pub(crate) cache_strategy: CacheStrategy,
    /// Ignores file not found error.
    ignore_file_not_found: bool,
    /// Capacity of the channel to send data from parallel scan tasks to the main task.
    pub(crate) parallel_scan_channel_size: usize,
    /// Maximum number of SST files to scan concurrently.
    pub(crate) max_concurrent_scan_files: usize,
    /// Index appliers. Element 0 is built from non-field filters, element 1
    /// from field filters.
    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
    /// Start time of the query.
    pub(crate) query_start: Option<Instant>,
    /// The region is using append mode.
    pub(crate) append_mode: bool,
    /// Whether to remove deletion markers.
    pub(crate) filter_deleted: bool,
    /// Mode to merge duplicate rows.
    pub(crate) merge_mode: MergeMode,
    /// Hint to select rows from time series.
    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
    /// Hint for the required distribution of the scanner.
    pub(crate) distribution: Option<TimeSeriesDistribution>,
    /// Whether to use flat format.
    pub(crate) flat_format: bool,
    /// Whether this scan is for compaction.
    pub(crate) compaction: bool,
    /// Extra ranges to scan besides memtables and SSTs.
    #[cfg(feature = "enterprise")]
    extension_ranges: Vec<BoxedExtensionRange>,
}
730
731impl ScanInput {
732    /// Creates a new [ScanInput].
733    #[must_use]
734    pub(crate) fn new(access_layer: AccessLayerRef, mapper: ProjectionMapper) -> ScanInput {
735        ScanInput {
736            access_layer,
737            mapper: Arc::new(mapper),
738            time_range: None,
739            predicate: PredicateGroup::default(),
740            region_partition_expr: None,
741            memtables: Vec::new(),
742            files: Vec::new(),
743            cache_strategy: CacheStrategy::Disabled,
744            ignore_file_not_found: false,
745            parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
746            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
747            inverted_index_appliers: [None, None],
748            bloom_filter_index_appliers: [None, None],
749            fulltext_index_appliers: [None, None],
750            query_start: None,
751            append_mode: false,
752            filter_deleted: true,
753            merge_mode: MergeMode::default(),
754            series_row_selector: None,
755            distribution: None,
756            flat_format: false,
757            compaction: false,
758            #[cfg(feature = "enterprise")]
759            extension_ranges: Vec::new(),
760        }
761    }
762
763    /// Sets time range filter for time index.
764    #[must_use]
765    pub(crate) fn with_time_range(mut self, time_range: Option<TimestampRange>) -> Self {
766        self.time_range = time_range;
767        self
768    }
769
770    /// Sets predicate to push down.
771    #[must_use]
772    pub(crate) fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
773        self.region_partition_expr = predicate.region_partition_expr().cloned();
774        self.predicate = predicate;
775        self
776    }
777
778    /// Sets memtable range builders.
779    #[must_use]
780    pub(crate) fn with_memtables(mut self, memtables: Vec<MemRangeBuilder>) -> Self {
781        self.memtables = memtables;
782        self
783    }
784
785    /// Sets files to read.
786    #[must_use]
787    pub(crate) fn with_files(mut self, files: Vec<FileHandle>) -> Self {
788        self.files = files;
789        self
790    }
791
792    /// Sets cache for this query.
793    #[must_use]
794    pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self {
795        self.cache_strategy = cache;
796        self
797    }
798
799    /// Ignores file not found error.
800    #[must_use]
801    pub(crate) fn with_ignore_file_not_found(mut self, ignore: bool) -> Self {
802        self.ignore_file_not_found = ignore;
803        self
804    }
805
806    /// Sets scan task channel size.
807    #[must_use]
808    pub(crate) fn with_parallel_scan_channel_size(
809        mut self,
810        parallel_scan_channel_size: usize,
811    ) -> Self {
812        self.parallel_scan_channel_size = parallel_scan_channel_size;
813        self
814    }
815
816    /// Sets maximum number of SST files to scan concurrently.
817    #[must_use]
818    pub(crate) fn with_max_concurrent_scan_files(
819        mut self,
820        max_concurrent_scan_files: usize,
821    ) -> Self {
822        self.max_concurrent_scan_files = max_concurrent_scan_files;
823        self
824    }
825
826    /// Sets inverted index appliers.
827    #[must_use]
828    pub(crate) fn with_inverted_index_appliers(
829        mut self,
830        appliers: [Option<InvertedIndexApplierRef>; 2],
831    ) -> Self {
832        self.inverted_index_appliers = appliers;
833        self
834    }
835
836    /// Sets bloom filter appliers.
837    #[must_use]
838    pub(crate) fn with_bloom_filter_index_appliers(
839        mut self,
840        appliers: [Option<BloomFilterIndexApplierRef>; 2],
841    ) -> Self {
842        self.bloom_filter_index_appliers = appliers;
843        self
844    }
845
846    /// Sets fulltext index appliers.
847    #[must_use]
848    pub(crate) fn with_fulltext_index_appliers(
849        mut self,
850        appliers: [Option<FulltextIndexApplierRef>; 2],
851    ) -> Self {
852        self.fulltext_index_appliers = appliers;
853        self
854    }
855
856    /// Sets start time of the query.
857    #[must_use]
858    pub(crate) fn with_start_time(mut self, now: Option<Instant>) -> Self {
859        self.query_start = now;
860        self
861    }
862
863    #[must_use]
864    pub(crate) fn with_append_mode(mut self, is_append_mode: bool) -> Self {
865        self.append_mode = is_append_mode;
866        self
867    }
868
869    /// Sets whether to remove deletion markers during scan.
870    #[must_use]
871    pub(crate) fn with_filter_deleted(mut self, filter_deleted: bool) -> Self {
872        self.filter_deleted = filter_deleted;
873        self
874    }
875
876    /// Sets the merge mode.
877    #[must_use]
878    pub(crate) fn with_merge_mode(mut self, merge_mode: MergeMode) -> Self {
879        self.merge_mode = merge_mode;
880        self
881    }
882
883    /// Sets the distribution hint.
884    #[must_use]
885    pub(crate) fn with_distribution(
886        mut self,
887        distribution: Option<TimeSeriesDistribution>,
888    ) -> Self {
889        self.distribution = distribution;
890        self
891    }
892
893    /// Sets the time series row selector.
894    #[must_use]
895    pub(crate) fn with_series_row_selector(
896        mut self,
897        series_row_selector: Option<TimeSeriesRowSelector>,
898    ) -> Self {
899        self.series_row_selector = series_row_selector;
900        self
901    }
902
903    /// Sets whether to use flat format.
904    #[must_use]
905    pub(crate) fn with_flat_format(mut self, flat_format: bool) -> Self {
906        self.flat_format = flat_format;
907        self
908    }
909
910    /// Sets whether this scan is for compaction.
911    #[must_use]
912    pub(crate) fn with_compaction(mut self, compaction: bool) -> Self {
913        self.compaction = compaction;
914        self
915    }
916
917    /// Scans sources in parallel.
918    ///
919    /// # Panics if the input doesn't allow parallel scan.
920    #[tracing::instrument(
921        skip(self, sources, semaphore),
922        fields(
923            region_id = %self.region_metadata().region_id,
924            source_count = sources.len()
925        )
926    )]
927    pub(crate) fn create_parallel_sources(
928        &self,
929        sources: Vec<Source>,
930        semaphore: Arc<Semaphore>,
931    ) -> Result<Vec<Source>> {
932        if sources.len() <= 1 {
933            return Ok(sources);
934        }
935
936        // Spawn a task for each source.
937        let sources = sources
938            .into_iter()
939            .map(|source| {
940                let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
941                self.spawn_scan_task(source, semaphore.clone(), sender);
942                let stream = Box::pin(ReceiverStream::new(receiver));
943                Source::Stream(stream)
944            })
945            .collect();
946        Ok(sources)
947    }
948
949    /// Builds memtable ranges to scan by `index`.
950    pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> {
951        let memtable = &self.memtables[index.index];
952        let mut ranges = SmallVec::new();
953        memtable.build_ranges(index.row_group_index, &mut ranges);
954        ranges
955    }
956
957    fn predicate_for_file(&self, file: &FileHandle) -> Option<Predicate> {
958        if self.should_skip_region_partition(file) {
959            self.predicate.predicate_without_region().cloned()
960        } else {
961            self.predicate.predicate().cloned()
962        }
963    }
964
965    fn should_skip_region_partition(&self, file: &FileHandle) -> bool {
966        match (
967            self.region_partition_expr.as_ref(),
968            file.meta_ref().partition_expr.as_ref(),
969        ) {
970            (Some(region_expr), Some(file_expr)) => region_expr == file_expr,
971            _ => false,
972        }
973    }
974
975    /// Prunes a file to scan and returns the builder to build readers.
976    #[tracing::instrument(
977        skip_all,
978        fields(
979            region_id = %self.region_metadata().region_id,
980            file_id = %file.file_id()
981        )
982    )]
983    pub async fn prune_file(
984        &self,
985        file: &FileHandle,
986        reader_metrics: &mut ReaderMetrics,
987    ) -> Result<FileRangeBuilder> {
988        let predicate = self.predicate_for_file(file);
989        let filter_mode = pre_filter_mode(self.append_mode, self.merge_mode);
990        let decode_pk_values = !self.compaction && self.mapper.has_tags();
991        let res = self
992            .access_layer
993            .read_sst(file.clone())
994            .predicate(predicate)
995            .projection(Some(self.mapper.column_ids().to_vec()))
996            .cache(self.cache_strategy.clone())
997            .inverted_index_appliers(self.inverted_index_appliers.clone())
998            .bloom_filter_index_appliers(self.bloom_filter_index_appliers.clone())
999            .fulltext_index_appliers(self.fulltext_index_appliers.clone())
1000            .expected_metadata(Some(self.mapper.metadata().clone()))
1001            .flat_format(self.flat_format)
1002            .compaction(self.compaction)
1003            .pre_filter_mode(filter_mode)
1004            .decode_primary_key_values(decode_pk_values)
1005            .build_reader_input(reader_metrics)
1006            .await;
1007        let (mut file_range_ctx, selection) = match res {
1008            Ok(x) => x,
1009            Err(e) => {
1010                if e.is_object_not_found() && self.ignore_file_not_found {
1011                    error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
1012                    return Ok(FileRangeBuilder::default());
1013                } else {
1014                    return Err(e);
1015                }
1016            }
1017        };
1018
1019        let need_compat = !compat::has_same_columns_and_pk_encoding(
1020            self.mapper.metadata(),
1021            file_range_ctx.read_format().metadata(),
1022        );
1023        if need_compat {
1024            // They have different schema. We need to adapt the batch first so the
1025            // mapper can convert it.
1026            let compat = if let Some(flat_format) = file_range_ctx.read_format().as_flat() {
1027                let mapper = self.mapper.as_flat().unwrap();
1028                FlatCompatBatch::try_new(
1029                    mapper,
1030                    flat_format.metadata(),
1031                    flat_format.format_projection(),
1032                    self.compaction,
1033                )?
1034                .map(CompatBatch::Flat)
1035            } else {
1036                let compact_batch = PrimaryKeyCompatBatch::new(
1037                    &self.mapper,
1038                    file_range_ctx.read_format().metadata().clone(),
1039                )?;
1040                Some(CompatBatch::PrimaryKey(compact_batch))
1041            };
1042            file_range_ctx.set_compat_batch(compat);
1043        }
1044        Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), selection))
1045    }
1046
1047    /// Scans the input source in another task and sends batches to the sender.
1048    #[tracing::instrument(
1049        skip(self, input, semaphore, sender),
1050        fields(region_id = %self.region_metadata().region_id)
1051    )]
1052    pub(crate) fn spawn_scan_task(
1053        &self,
1054        mut input: Source,
1055        semaphore: Arc<Semaphore>,
1056        sender: mpsc::Sender<Result<Batch>>,
1057    ) {
1058        let region_id = self.region_metadata().region_id;
1059        let span = tracing::info_span!(
1060            "ScanInput::parallel_scan_task",
1061            region_id = %region_id,
1062            stream_kind = "batch"
1063        );
1064        common_runtime::spawn_global(
1065            async move {
1066                loop {
1067                    // We release the permit before sending result to avoid the task waiting on
1068                    // the channel with the permit held.
1069                    let maybe_batch = {
1070                        // Safety: We never close the semaphore.
1071                        let _permit = semaphore.acquire().await.unwrap();
1072                        input.next_batch().await
1073                    };
1074                    match maybe_batch {
1075                        Ok(Some(batch)) => {
1076                            let _ = sender.send(Ok(batch)).await;
1077                        }
1078                        Ok(None) => break,
1079                        Err(e) => {
1080                            let _ = sender.send(Err(e)).await;
1081                            break;
1082                        }
1083                    }
1084                }
1085            }
1086            .instrument(span),
1087        );
1088    }
1089
1090    /// Scans flat sources (RecordBatch streams) in parallel.
1091    ///
1092    /// # Panics if the input doesn't allow parallel scan.
1093    #[tracing::instrument(
1094        skip(self, sources, semaphore),
1095        fields(
1096            region_id = %self.region_metadata().region_id,
1097            source_count = sources.len()
1098        )
1099    )]
1100    pub(crate) fn create_parallel_flat_sources(
1101        &self,
1102        sources: Vec<BoxedRecordBatchStream>,
1103        semaphore: Arc<Semaphore>,
1104    ) -> Result<Vec<BoxedRecordBatchStream>> {
1105        if sources.len() <= 1 {
1106            return Ok(sources);
1107        }
1108
1109        // Spawn a task for each source.
1110        let sources = sources
1111            .into_iter()
1112            .map(|source| {
1113                let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
1114                self.spawn_flat_scan_task(source, semaphore.clone(), sender);
1115                let stream = Box::pin(ReceiverStream::new(receiver));
1116                Box::pin(stream) as _
1117            })
1118            .collect();
1119        Ok(sources)
1120    }
1121
1122    /// Spawns a task to scan a flat source (RecordBatch stream) asynchronously.
1123    #[tracing::instrument(
1124        skip(self, input, semaphore, sender),
1125        fields(region_id = %self.region_metadata().region_id)
1126    )]
1127    pub(crate) fn spawn_flat_scan_task(
1128        &self,
1129        mut input: BoxedRecordBatchStream,
1130        semaphore: Arc<Semaphore>,
1131        sender: mpsc::Sender<Result<RecordBatch>>,
1132    ) {
1133        let region_id = self.region_metadata().region_id;
1134        let span = tracing::info_span!(
1135            "ScanInput::parallel_scan_task",
1136            region_id = %region_id,
1137            stream_kind = "flat"
1138        );
1139        common_runtime::spawn_global(
1140            async move {
1141                loop {
1142                    // We release the permit before sending result to avoid the task waiting on
1143                    // the channel with the permit held.
1144                    let maybe_batch = {
1145                        // Safety: We never close the semaphore.
1146                        let _permit = semaphore.acquire().await.unwrap();
1147                        input.next().await
1148                    };
1149                    match maybe_batch {
1150                        Some(Ok(batch)) => {
1151                            let _ = sender.send(Ok(batch)).await;
1152                        }
1153                        Some(Err(e)) => {
1154                            let _ = sender.send(Err(e)).await;
1155                            break;
1156                        }
1157                        None => break,
1158                    }
1159                }
1160            }
1161            .instrument(span),
1162        );
1163    }
1164
1165    pub(crate) fn total_rows(&self) -> usize {
1166        let rows_in_files: usize = self.files.iter().map(|f| f.num_rows()).sum();
1167        let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();
1168
1169        let rows = rows_in_files + rows_in_memtables;
1170        #[cfg(feature = "enterprise")]
1171        let rows = rows
1172            + self
1173                .extension_ranges
1174                .iter()
1175                .map(|x| x.num_rows())
1176                .sum::<u64>() as usize;
1177        rows
1178    }
1179
1180    pub(crate) fn predicate_group(&self) -> &PredicateGroup {
1181        &self.predicate
1182    }
1183
1184    /// Returns number of memtables to scan.
1185    pub(crate) fn num_memtables(&self) -> usize {
1186        self.memtables.len()
1187    }
1188
1189    /// Returns number of SST files to scan.
1190    pub(crate) fn num_files(&self) -> usize {
1191        self.files.len()
1192    }
1193
1194    /// Gets the file handle from a row group index.
1195    pub(crate) fn file_from_index(&self, index: RowGroupIndex) -> &FileHandle {
1196        let file_index = index.index - self.num_memtables();
1197        &self.files[file_index]
1198    }
1199
1200    pub fn region_metadata(&self) -> &RegionMetadataRef {
1201        self.mapper.metadata()
1202    }
1203}
1204
#[cfg(feature = "enterprise")]
impl ScanInput {
    /// Replaces the extension ranges of this input.
    #[must_use]
    pub(crate) fn with_extension_ranges(self, extension_ranges: Vec<BoxedExtensionRange>) -> Self {
        Self {
            extension_ranges,
            ..self
        }
    }

    /// Returns all extension ranges.
    // Note: the per-item `#[cfg(feature = "enterprise")]` attributes were
    // redundant because the whole impl block is already feature gated.
    pub(crate) fn extension_ranges(&self) -> &[BoxedExtensionRange] {
        &self.extension_ranges
    }

    /// Get a boxed [ExtensionRange] by the index in all ranges.
    ///
    /// Extension ranges are indexed after memtables and files, so the global
    /// index is offset by both counts.
    pub(crate) fn extension_range(&self, i: usize) -> &BoxedExtensionRange {
        &self.extension_ranges[i - self.num_memtables() - self.num_files()]
    }
}
1226
#[cfg(test)]
impl ScanInput {
    /// Returns SST file ids to scan.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        self.files.iter().map(|handle| handle.file_id()).collect()
    }

    /// Returns index ids of the SST files to scan.
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        self.files.iter().map(|handle| handle.index_id()).collect()
    }
}
1238
1239fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode {
1240    if append_mode {
1241        return PreFilterMode::All;
1242    }
1243
1244    match merge_mode {
1245        MergeMode::LastRow => PreFilterMode::SkipFieldsOnDelete,
1246        MergeMode::LastNonNull => PreFilterMode::SkipFields,
1247    }
1248}
1249
/// Context shared by different streams from a scanner.
/// It contains the input and ranges to scan.
pub struct StreamContext {
    /// Input memtables and files.
    pub input: ScanInput,
    /// Metadata for partition ranges.
    pub(crate) ranges: Vec<RangeMeta>,

    // Metrics:
    /// The start time of the query. Taken from the input's `query_start`,
    /// or the context creation time when the input doesn't carry one.
    pub(crate) query_start: Instant,
}
1262
1263impl StreamContext {
1264    /// Creates a new [StreamContext] for [SeqScan].
1265    pub(crate) fn seq_scan_ctx(input: ScanInput) -> Self {
1266        let query_start = input.query_start.unwrap_or_else(Instant::now);
1267        let ranges = RangeMeta::seq_scan_ranges(&input);
1268        READ_SST_COUNT.observe(input.num_files() as f64);
1269
1270        Self {
1271            input,
1272            ranges,
1273            query_start,
1274        }
1275    }
1276
1277    /// Creates a new [StreamContext] for [UnorderedScan].
1278    pub(crate) fn unordered_scan_ctx(input: ScanInput) -> Self {
1279        let query_start = input.query_start.unwrap_or_else(Instant::now);
1280        let ranges = RangeMeta::unordered_scan_ranges(&input);
1281        READ_SST_COUNT.observe(input.num_files() as f64);
1282
1283        Self {
1284            input,
1285            ranges,
1286            query_start,
1287        }
1288    }
1289
1290    /// Returns true if the index refers to a memtable.
1291    pub(crate) fn is_mem_range_index(&self, index: RowGroupIndex) -> bool {
1292        self.input.num_memtables() > index.index
1293    }
1294
1295    pub(crate) fn is_file_range_index(&self, index: RowGroupIndex) -> bool {
1296        !self.is_mem_range_index(index)
1297            && index.index < self.input.num_files() + self.input.num_memtables()
1298    }
1299
1300    /// Retrieves the partition ranges.
1301    pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
1302        self.ranges
1303            .iter()
1304            .enumerate()
1305            .map(|(idx, range_meta)| range_meta.new_partition_range(idx))
1306            .collect()
1307    }
1308
1309    /// Format the context for explain.
1310    pub(crate) fn format_for_explain(&self, verbose: bool, f: &mut fmt::Formatter) -> fmt::Result {
1311        let (mut num_mem_ranges, mut num_file_ranges, mut num_other_ranges) = (0, 0, 0);
1312        for range_meta in &self.ranges {
1313            for idx in &range_meta.row_group_indices {
1314                if self.is_mem_range_index(*idx) {
1315                    num_mem_ranges += 1;
1316                } else if self.is_file_range_index(*idx) {
1317                    num_file_ranges += 1;
1318                } else {
1319                    num_other_ranges += 1;
1320                }
1321            }
1322        }
1323        if verbose {
1324            write!(f, "{{")?;
1325        }
1326        write!(
1327            f,
1328            r#""partition_count":{{"count":{}, "mem_ranges":{}, "files":{}, "file_ranges":{}"#,
1329            self.ranges.len(),
1330            num_mem_ranges,
1331            self.input.num_files(),
1332            num_file_ranges,
1333        )?;
1334        if num_other_ranges > 0 {
1335            write!(f, r#", "other_ranges":{}"#, num_other_ranges)?;
1336        }
1337        write!(f, "}}")?;
1338
1339        if let Some(selector) = &self.input.series_row_selector {
1340            write!(f, ", \"selector\":\"{}\"", selector)?;
1341        }
1342        if let Some(distribution) = &self.input.distribution {
1343            write!(f, ", \"distribution\":\"{}\"", distribution)?;
1344        }
1345
1346        if verbose {
1347            self.format_verbose_content(f)?;
1348        }
1349
1350        Ok(())
1351    }
1352
1353    fn format_verbose_content(&self, f: &mut fmt::Formatter) -> fmt::Result {
1354        struct FileWrapper<'a> {
1355            file: &'a FileHandle,
1356        }
1357
1358        impl fmt::Debug for FileWrapper<'_> {
1359            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1360                let (start, end) = self.file.time_range();
1361                write!(
1362                    f,
1363                    r#"{{"file_id":"{}","time_range_start":"{}::{}","time_range_end":"{}::{}","rows":{},"size":{},"index_size":{}}}"#,
1364                    self.file.file_id(),
1365                    start.value(),
1366                    start.unit(),
1367                    end.value(),
1368                    end.unit(),
1369                    self.file.num_rows(),
1370                    self.file.size(),
1371                    self.file.index_size()
1372                )
1373            }
1374        }
1375
1376        struct InputWrapper<'a> {
1377            input: &'a ScanInput,
1378        }
1379
1380        #[cfg(feature = "enterprise")]
1381        impl InputWrapper<'_> {
1382            fn format_extension_ranges(&self, f: &mut fmt::Formatter) -> fmt::Result {
1383                if self.input.extension_ranges.is_empty() {
1384                    return Ok(());
1385                }
1386
1387                let mut delimiter = "";
1388                write!(f, ", extension_ranges: [")?;
1389                for range in self.input.extension_ranges() {
1390                    write!(f, "{}{:?}", delimiter, range)?;
1391                    delimiter = ", ";
1392                }
1393                write!(f, "]")?;
1394                Ok(())
1395            }
1396        }
1397
1398        impl fmt::Debug for InputWrapper<'_> {
1399            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1400                let output_schema = self.input.mapper.output_schema();
1401                if !output_schema.is_empty() {
1402                    let names: Vec<_> = output_schema
1403                        .column_schemas()
1404                        .iter()
1405                        .map(|col| &col.name)
1406                        .collect();
1407                    write!(f, ", \"projection\": {:?}", names)?;
1408                }
1409                if let Some(predicate) = &self.input.predicate.predicate()
1410                    && !predicate.exprs().is_empty()
1411                {
1412                    let exprs: Vec<_> = predicate.exprs().iter().map(|e| e.to_string()).collect();
1413                    write!(f, ", \"filters\": {:?}", exprs)?;
1414                }
1415                if !self.input.files.is_empty() {
1416                    write!(f, ", \"files\": ")?;
1417                    f.debug_list()
1418                        .entries(self.input.files.iter().map(|file| FileWrapper { file }))
1419                        .finish()?;
1420                }
1421
1422                #[cfg(feature = "enterprise")]
1423                self.format_extension_ranges(f)?;
1424
1425                Ok(())
1426            }
1427        }
1428
1429        write!(f, "{:?}", InputWrapper { input: &self.input })
1430    }
1431}
1432
/// Predicates to evaluate.
/// It only keeps filters that [SimpleFilterEvaluator] supports.
#[derive(Clone, Default)]
pub struct PredicateGroup {
    /// Simple filters referencing only the time index column
    /// (see `expr_to_filter`). `None` when no such filter exists.
    time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
    /// Predicate that includes request filters and region partition expr (if any).
    predicate_all: Option<Predicate>,
    /// Predicate that only includes request filters.
    predicate_without_region: Option<Predicate>,
    /// Region partition expression restored from metadata.
    region_partition_expr: Option<PartitionExpr>,
}
1445
1446impl PredicateGroup {
1447    /// Creates a new `PredicateGroup` from exprs according to the metadata.
1448    pub fn new(metadata: &RegionMetadata, exprs: &[Expr]) -> Result<Self> {
1449        let mut combined_exprs = exprs.to_vec();
1450        let mut region_partition_expr = None;
1451
1452        if let Some(expr_json) = metadata.partition_expr.as_ref()
1453            && !expr_json.is_empty()
1454            && let Some(expr) = PartitionExpr::from_json_str(expr_json)
1455                .context(InvalidPartitionExprSnafu { expr: expr_json })?
1456        {
1457            let logical_expr = expr
1458                .try_as_logical_expr()
1459                .context(InvalidPartitionExprSnafu {
1460                    expr: expr_json.clone(),
1461                })?;
1462
1463            combined_exprs.push(logical_expr);
1464            region_partition_expr = Some(expr);
1465        }
1466
1467        let mut time_filters = Vec::with_capacity(combined_exprs.len());
1468        // Columns in the expr.
1469        let mut columns = HashSet::new();
1470        for expr in &combined_exprs {
1471            columns.clear();
1472            let Some(filter) = Self::expr_to_filter(expr, metadata, &mut columns) else {
1473                continue;
1474            };
1475            time_filters.push(filter);
1476        }
1477        let time_filters = if time_filters.is_empty() {
1478            None
1479        } else {
1480            Some(Arc::new(time_filters))
1481        };
1482
1483        let predicate_all = if combined_exprs.is_empty() {
1484            None
1485        } else {
1486            Some(Predicate::new(combined_exprs))
1487        };
1488        let predicate_without_region = if exprs.is_empty() {
1489            None
1490        } else {
1491            Some(Predicate::new(exprs.to_vec()))
1492        };
1493
1494        Ok(Self {
1495            time_filters,
1496            predicate_all,
1497            predicate_without_region,
1498            region_partition_expr,
1499        })
1500    }
1501
1502    /// Returns time filters.
1503    pub(crate) fn time_filters(&self) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
1504        self.time_filters.clone()
1505    }
1506
1507    /// Returns predicate of all exprs (including region partition expr if present).
1508    pub(crate) fn predicate(&self) -> Option<&Predicate> {
1509        self.predicate_all.as_ref()
1510    }
1511
1512    /// Returns predicate that excludes region partition expr.
1513    pub(crate) fn predicate_without_region(&self) -> Option<&Predicate> {
1514        self.predicate_without_region.as_ref()
1515    }
1516
1517    /// Returns the region partition expr from metadata, if any.
1518    pub(crate) fn region_partition_expr(&self) -> Option<&PartitionExpr> {
1519        self.region_partition_expr.as_ref()
1520    }
1521
1522    fn expr_to_filter(
1523        expr: &Expr,
1524        metadata: &RegionMetadata,
1525        columns: &mut HashSet<Column>,
1526    ) -> Option<SimpleFilterEvaluator> {
1527        columns.clear();
1528        // `expr_to_columns` won't return error.
1529        // We still ignore these expressions for safety.
1530        expr_to_columns(expr, columns).ok()?;
1531        if columns.len() > 1 {
1532            // Simple filter doesn't support multiple columns.
1533            return None;
1534        }
1535        let column = columns.iter().next()?;
1536        let column_meta = metadata.column_by_name(&column.name)?;
1537        if column_meta.semantic_type == SemanticType::Timestamp {
1538            SimpleFilterEvaluator::try_new(expr)
1539        } else {
1540            None
1541        }
1542    }
1543}