mito2/read/
scan_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Scans a region according to the scan request.
16
17use std::collections::HashSet;
18use std::fmt;
19use std::sync::Arc;
20use std::time::Instant;
21
22use api::v1::SemanticType;
23use common_error::ext::BoxedError;
24use common_recordbatch::filter::SimpleFilterEvaluator;
25use common_recordbatch::SendableRecordBatchStream;
26use common_telemetry::{debug, error, tracing, warn};
27use common_time::range::TimestampRange;
28use datafusion_common::Column;
29use datafusion_expr::utils::expr_to_columns;
30use datafusion_expr::Expr;
31use smallvec::SmallVec;
32use store_api::metadata::RegionMetadata;
33use store_api::region_engine::{PartitionRange, RegionScannerRef};
34use store_api::storage::{ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector};
35use table::predicate::{build_time_range_predicate, Predicate};
36use tokio::sync::{mpsc, Semaphore};
37use tokio_stream::wrappers::ReceiverStream;
38
39use crate::access_layer::AccessLayerRef;
40use crate::cache::CacheStrategy;
41use crate::config::DEFAULT_SCAN_CHANNEL_SIZE;
42use crate::error::Result;
43use crate::memtable::MemtableRange;
44use crate::metrics::READ_SST_COUNT;
45use crate::read::compat::{self, CompatBatch};
46use crate::read::projection::ProjectionMapper;
47use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
48use crate::read::seq_scan::SeqScan;
49use crate::read::unordered_scan::UnorderedScan;
50use crate::read::{Batch, Source};
51use crate::region::options::MergeMode;
52use crate::region::version::VersionRef;
53use crate::sst::file::FileHandle;
54use crate::sst::index::bloom_filter::applier::{
55    BloomFilterIndexApplierBuilder, BloomFilterIndexApplierRef,
56};
57use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
58use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
59use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
60use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
61use crate::sst::parquet::reader::ReaderMetrics;
62
/// A scanner scans a region and returns a [SendableRecordBatchStream].
///
/// Built by [ScanRegion::scanner()], which picks the variant from the region options and
/// the scan request.
pub(crate) enum Scanner {
    /// Sequential scan.
    Seq(SeqScan),
    /// Unordered scan. Chosen when the region is in append mode, the request has no series
    /// row selector and requires no distribution other than time-windowed.
    Unordered(UnorderedScan),
}
70
71impl Scanner {
72    /// Returns a [SendableRecordBatchStream] to retrieve scan results from all partitions.
73    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
74    pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream, BoxedError> {
75        match self {
76            Scanner::Seq(seq_scan) => seq_scan.build_stream(),
77            Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
78        }
79    }
80}
81
#[cfg(test)]
impl Scanner {
    /// Borrows the [ScanInput] of whichever scanner variant this is.
    fn shared_input(&self) -> &ScanInput {
        match self {
            Scanner::Unordered(unordered_scan) => unordered_scan.input(),
            Scanner::Seq(seq_scan) => seq_scan.input(),
        }
    }

    /// Counts the SST files the scanner will read.
    pub(crate) fn num_files(&self) -> usize {
        self.shared_input().num_files()
    }

    /// Counts the memtables the scanner will read.
    pub(crate) fn num_memtables(&self) -> usize {
        self.shared_input().num_memtables()
    }

    /// Lists the ids of the SST files the scanner will read.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::FileId> {
        self.shared_input().file_ids()
    }

    /// Prepares the scanner with `target_partitions`, which controls its parallelism.
    ///
    /// # Panics
    /// Panics if the underlying scanner rejects the prepare request.
    pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
        use store_api::region_engine::{PrepareRequest, RegionScanner};

        let prepare_request = PrepareRequest::default().with_target_partitions(target_partitions);
        let prepared = match self {
            Scanner::Seq(seq_scan) => seq_scan.prepare(prepare_request),
            Scanner::Unordered(unordered_scan) => unordered_scan.prepare(prepare_request),
        };
        prepared.unwrap();
    }
}
119
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Helper to scan a region by [ScanRequest].
///
/// [ScanRegion] collects SSTs and memtables to scan without actually reading them. It
/// creates a [Scanner] to actually scan these targets in [Scanner::scan()].
///
/// ```mermaid
/// classDiagram
/// class ScanRegion {
///     -VersionRef version
///     -ScanRequest request
///     ~scanner() Scanner
///     ~seq_scan() SeqScan
/// }
/// class Scanner {
///     <<enumeration>>
///     SeqScan
///     UnorderedScan
///     +scan() SendableRecordBatchStream
/// }
/// class SeqScan {
///     -ScanInput input
///     +build_stream() SendableRecordBatchStream
/// }
/// class UnorderedScan {
///     -ScanInput input
///     +build_stream() SendableRecordBatchStream
/// }
/// class ScanInput {
///     -ProjectionMapper mapper
///     -Option~TimestampRange~ time_range
///     -PredicateGroup predicate
///     -Vec~MemRangeBuilder~ memtables
///     -Vec~FileHandle~ files
/// }
/// class ProjectionMapper {
///     ~output_schema() SchemaRef
///     ~convert(Batch) RecordBatch
/// }
/// ScanRegion -- Scanner
/// ScanRegion o-- ScanRequest
/// Scanner o-- SeqScan
/// Scanner o-- UnorderedScan
/// SeqScan o-- ScanInput
/// UnorderedScan o-- ScanInput
/// Scanner -- SendableRecordBatchStream
/// ScanInput o-- ProjectionMapper
/// SeqScan -- SendableRecordBatchStream
/// UnorderedScan -- SendableRecordBatchStream
/// ```
pub(crate) struct ScanRegion {
    /// Version of the region at scan. Provides the SSTs, memtables, metadata and options.
    version: VersionRef,
    /// Access layer of the region.
    access_layer: AccessLayerRef,
    /// Scan request.
    request: ScanRequest,
    /// Cache strategy passed to the scan input and used to look up index caches.
    cache_strategy: CacheStrategy,
    /// Capacity of the channel to send data from parallel scan tasks to the main task.
    parallel_scan_channel_size: usize,
    /// Whether to ignore inverted index.
    ignore_inverted_index: bool,
    /// Whether to ignore fulltext index.
    ignore_fulltext_index: bool,
    /// Whether to ignore bloom filter.
    ignore_bloom_filter: bool,
    /// Start time of the scan task, forwarded to the scan input as the query start.
    start_time: Option<Instant>,
}
190
191impl ScanRegion {
192    /// Creates a [ScanRegion].
193    pub(crate) fn new(
194        version: VersionRef,
195        access_layer: AccessLayerRef,
196        request: ScanRequest,
197        cache_strategy: CacheStrategy,
198    ) -> ScanRegion {
199        ScanRegion {
200            version,
201            access_layer,
202            request,
203            cache_strategy,
204            parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
205            ignore_inverted_index: false,
206            ignore_fulltext_index: false,
207            ignore_bloom_filter: false,
208            start_time: None,
209        }
210    }
211
212    /// Sets parallel scan task channel size.
213    #[must_use]
214    pub(crate) fn with_parallel_scan_channel_size(
215        mut self,
216        parallel_scan_channel_size: usize,
217    ) -> Self {
218        self.parallel_scan_channel_size = parallel_scan_channel_size;
219        self
220    }
221
222    /// Sets whether to ignore inverted index.
223    #[must_use]
224    pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self {
225        self.ignore_inverted_index = ignore;
226        self
227    }
228
229    /// Sets whether to ignore fulltext index.
230    #[must_use]
231    pub(crate) fn with_ignore_fulltext_index(mut self, ignore: bool) -> Self {
232        self.ignore_fulltext_index = ignore;
233        self
234    }
235
236    /// Sets whether to ignore bloom filter.
237    #[must_use]
238    pub(crate) fn with_ignore_bloom_filter(mut self, ignore: bool) -> Self {
239        self.ignore_bloom_filter = ignore;
240        self
241    }
242
243    #[must_use]
244    pub(crate) fn with_start_time(mut self, now: Instant) -> Self {
245        self.start_time = Some(now);
246        self
247    }
248
249    /// Returns a [Scanner] to scan the region.
250    pub(crate) fn scanner(self) -> Result<Scanner> {
251        if self.use_unordered_scan() {
252            // If table is append only and there is no series row selector, we use unordered scan in query.
253            // We still use seq scan in compaction.
254            self.unordered_scan().map(Scanner::Unordered)
255        } else {
256            self.seq_scan().map(Scanner::Seq)
257        }
258    }
259
260    /// Returns a [RegionScanner] to scan the region.
261    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
262    pub(crate) fn region_scanner(self) -> Result<RegionScannerRef> {
263        if self.use_unordered_scan() {
264            self.unordered_scan().map(|scanner| Box::new(scanner) as _)
265        } else {
266            self.seq_scan().map(|scanner| Box::new(scanner) as _)
267        }
268    }
269
270    /// Scan sequentially.
271    pub(crate) fn seq_scan(self) -> Result<SeqScan> {
272        let input = self.scan_input(true)?;
273        Ok(SeqScan::new(input, false))
274    }
275
276    /// Unordered scan.
277    pub(crate) fn unordered_scan(self) -> Result<UnorderedScan> {
278        let input = self.scan_input(true)?;
279        Ok(UnorderedScan::new(input))
280    }
281
282    #[cfg(test)]
283    pub(crate) fn scan_without_filter_deleted(self) -> Result<SeqScan> {
284        let input = self.scan_input(false)?;
285        Ok(SeqScan::new(input, false))
286    }
287
288    /// Returns true if the region can use unordered scan for current request.
289    fn use_unordered_scan(&self) -> bool {
290        // We use unordered scan when:
291        // 1. The region is in append mode.
292        // 2. There is no series row selector.
293        // 3. The required distribution is None or TimeSeriesDistribution::TimeWindowed.
294        //
295        // We still use seq scan in compaction.
296        self.version.options.append_mode
297            && self.request.series_row_selector.is_none()
298            && (self.request.distribution.is_none()
299                || self.request.distribution == Some(TimeSeriesDistribution::TimeWindowed))
300    }
301
302    /// Creates a scan input.
303    fn scan_input(mut self, filter_deleted: bool) -> Result<ScanInput> {
304        let time_range = self.build_time_range_predicate();
305
306        let ssts = &self.version.ssts;
307        let mut files = Vec::new();
308        for level in ssts.levels() {
309            for file in level.files.values() {
310                // Finds SST files in range.
311                if file_in_range(file, &time_range) {
312                    files.push(file.clone());
313                }
314                // There is no need to check and prune for file's sequence here as the sequence number is usually very new,
315                // unless the timing is too good, or the sequence number wouldn't be in file.
316                // and the batch will be filtered out by tree reader anyway.
317            }
318        }
319
320        let memtables = self.version.memtables.list_memtables();
321        // Skip empty memtables and memtables out of time range.
322        let memtables: Vec<_> = memtables
323            .into_iter()
324            .filter(|mem| {
325                // check if memtable is empty by reading stats.
326                let Some((start, end)) = mem.stats().time_range() else {
327                    return false;
328                };
329                // The time range of the memtable is inclusive.
330                let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
331                memtable_range.intersects(&time_range)
332            })
333            .collect();
334
335        debug!(
336            "Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}",
337            self.version.metadata.region_id,
338            self.request,
339            time_range,
340            memtables.len(),
341            files.len(),
342            self.version.options.append_mode,
343        );
344
345        // Remove field filters for LastNonNull mode after logging the request.
346        self.maybe_remove_field_filters();
347
348        let inverted_index_applier = self.build_invereted_index_applier();
349        let bloom_filter_applier = self.build_bloom_filter_applier();
350        let fulltext_index_applier = self.build_fulltext_index_applier();
351        let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters);
352        // The mapper always computes projected column ids as the schema of SSTs may change.
353        let mapper = match &self.request.projection {
354            Some(p) => ProjectionMapper::new(&self.version.metadata, p.iter().copied())?,
355            None => ProjectionMapper::all(&self.version.metadata)?,
356        };
357        // Get memtable ranges to scan.
358        let memtables = memtables
359            .into_iter()
360            .map(|mem| {
361                let ranges = mem.ranges(
362                    Some(mapper.column_ids()),
363                    predicate.clone(),
364                    self.request.sequence,
365                );
366                MemRangeBuilder::new(ranges)
367            })
368            .collect();
369
370        let input = ScanInput::new(self.access_layer, mapper)
371            .with_time_range(Some(time_range))
372            .with_predicate(predicate)
373            .with_memtables(memtables)
374            .with_files(files)
375            .with_cache(self.cache_strategy)
376            .with_inverted_index_applier(inverted_index_applier)
377            .with_bloom_filter_index_applier(bloom_filter_applier)
378            .with_fulltext_index_applier(fulltext_index_applier)
379            .with_parallel_scan_channel_size(self.parallel_scan_channel_size)
380            .with_start_time(self.start_time)
381            .with_append_mode(self.version.options.append_mode)
382            .with_filter_deleted(filter_deleted)
383            .with_merge_mode(self.version.options.merge_mode())
384            .with_series_row_selector(self.request.series_row_selector)
385            .with_distribution(self.request.distribution);
386        Ok(input)
387    }
388
389    /// Build time range predicate from filters.
390    fn build_time_range_predicate(&self) -> TimestampRange {
391        let time_index = self.version.metadata.time_index_column();
392        let unit = time_index
393            .column_schema
394            .data_type
395            .as_timestamp()
396            .expect("Time index must have timestamp-compatible type")
397            .unit();
398        build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters)
399    }
400
401    /// Remove field filters if the merge mode is [MergeMode::LastNonNull].
402    fn maybe_remove_field_filters(&mut self) {
403        if self.version.options.merge_mode() != MergeMode::LastNonNull {
404            return;
405        }
406
407        // TODO(yingwen): We can ignore field filters only when there are multiple sources in the same time window.
408        let field_columns = self
409            .version
410            .metadata
411            .field_columns()
412            .map(|col| &col.column_schema.name)
413            .collect::<HashSet<_>>();
414        // Columns in the expr.
415        let mut columns = HashSet::new();
416
417        self.request.filters.retain(|expr| {
418            columns.clear();
419            // `expr_to_columns` won't return error.
420            if expr_to_columns(expr, &mut columns).is_err() {
421                return false;
422            }
423            for column in &columns {
424                if field_columns.contains(&column.name) {
425                    // This expr uses the field column.
426                    return false;
427                }
428            }
429            true
430        });
431    }
432
433    /// Use the latest schema to build the inverted index applier.
434    fn build_invereted_index_applier(&self) -> Option<InvertedIndexApplierRef> {
435        if self.ignore_inverted_index {
436            return None;
437        }
438
439        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
440        let inverted_index_cache = self.cache_strategy.inverted_index_cache().cloned();
441
442        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
443
444        InvertedIndexApplierBuilder::new(
445            self.access_layer.region_dir().to_string(),
446            self.access_layer.object_store().clone(),
447            self.version.metadata.as_ref(),
448            self.version.metadata.inverted_indexed_column_ids(
449                self.version
450                    .options
451                    .index_options
452                    .inverted_index
453                    .ignore_column_ids
454                    .iter(),
455            ),
456            self.access_layer.puffin_manager_factory().clone(),
457        )
458        .with_file_cache(file_cache)
459        .with_inverted_index_cache(inverted_index_cache)
460        .with_puffin_metadata_cache(puffin_metadata_cache)
461        .build(&self.request.filters)
462        .inspect_err(|err| warn!(err; "Failed to build invereted index applier"))
463        .ok()
464        .flatten()
465        .map(Arc::new)
466    }
467
468    /// Use the latest schema to build the bloom filter index applier.
469    fn build_bloom_filter_applier(&self) -> Option<BloomFilterIndexApplierRef> {
470        if self.ignore_bloom_filter {
471            return None;
472        }
473
474        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
475        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
476        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
477
478        BloomFilterIndexApplierBuilder::new(
479            self.access_layer.region_dir().to_string(),
480            self.access_layer.object_store().clone(),
481            self.version.metadata.as_ref(),
482            self.access_layer.puffin_manager_factory().clone(),
483        )
484        .with_file_cache(file_cache)
485        .with_bloom_filter_index_cache(bloom_filter_index_cache)
486        .with_puffin_metadata_cache(puffin_metadata_cache)
487        .build(&self.request.filters)
488        .inspect_err(|err| warn!(err; "Failed to build bloom filter index applier"))
489        .ok()
490        .flatten()
491        .map(Arc::new)
492    }
493
494    /// Use the latest schema to build the fulltext index applier.
495    fn build_fulltext_index_applier(&self) -> Option<FulltextIndexApplierRef> {
496        if self.ignore_fulltext_index {
497            return None;
498        }
499
500        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
501        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
502        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
503        FulltextIndexApplierBuilder::new(
504            self.access_layer.region_dir().to_string(),
505            self.version.metadata.region_id,
506            self.access_layer.object_store().clone(),
507            self.access_layer.puffin_manager_factory().clone(),
508            self.version.metadata.as_ref(),
509        )
510        .with_file_cache(file_cache)
511        .with_puffin_metadata_cache(puffin_metadata_cache)
512        .with_bloom_filter_cache(bloom_filter_index_cache)
513        .build(&self.request.filters)
514        .inspect_err(|err| warn!(err; "Failed to build fulltext index applier"))
515        .ok()
516        .flatten()
517        .map(Arc::new)
518    }
519}
520
521/// Returns true if the time range of a SST `file` matches the `predicate`.
522fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
523    if predicate == &TimestampRange::min_to_max() {
524        return true;
525    }
526    // end timestamp of a SST is inclusive.
527    let (start, end) = file.time_range();
528    let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end));
529    file_ts_range.intersects(predicate)
530}
531
/// Common input for different scanners.
pub(crate) struct ScanInput {
    /// Region SST access layer.
    access_layer: AccessLayerRef,
    /// Maps projected Batches to RecordBatches.
    pub(crate) mapper: Arc<ProjectionMapper>,
    /// Time range filter for time index.
    time_range: Option<TimestampRange>,
    /// Predicate to push down.
    pub(crate) predicate: PredicateGroup,
    /// Memtable range builders for memtables in the time range.
    pub(crate) memtables: Vec<MemRangeBuilder>,
    /// Handles to SST files to scan.
    pub(crate) files: Vec<FileHandle>,
    /// Cache strategy used when reading SSTs.
    pub(crate) cache_strategy: CacheStrategy,
    /// Ignores file not found error.
    ignore_file_not_found: bool,
    /// Capacity of the channel to send data from parallel scan tasks to the main task.
    pub(crate) parallel_scan_channel_size: usize,
    /// Inverted index applier passed to the SST reader, if any.
    inverted_index_applier: Option<InvertedIndexApplierRef>,
    /// Bloom filter index applier passed to the SST reader, if any.
    bloom_filter_index_applier: Option<BloomFilterIndexApplierRef>,
    /// Fulltext index applier passed to the SST reader, if any.
    fulltext_index_applier: Option<FulltextIndexApplierRef>,
    /// Start time of the query.
    pub(crate) query_start: Option<Instant>,
    /// The region is using append mode.
    pub(crate) append_mode: bool,
    /// Whether to remove deletion markers.
    pub(crate) filter_deleted: bool,
    /// Mode to merge duplicate rows.
    pub(crate) merge_mode: MergeMode,
    /// Hint to select rows from time series.
    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
    /// Hint for the required distribution of the scanner.
    pub(crate) distribution: Option<TimeSeriesDistribution>,
}
569
570impl ScanInput {
571    /// Creates a new [ScanInput].
572    #[must_use]
573    pub(crate) fn new(access_layer: AccessLayerRef, mapper: ProjectionMapper) -> ScanInput {
574        ScanInput {
575            access_layer,
576            mapper: Arc::new(mapper),
577            time_range: None,
578            predicate: PredicateGroup::default(),
579            memtables: Vec::new(),
580            files: Vec::new(),
581            cache_strategy: CacheStrategy::Disabled,
582            ignore_file_not_found: false,
583            parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
584            inverted_index_applier: None,
585            bloom_filter_index_applier: None,
586            fulltext_index_applier: None,
587            query_start: None,
588            append_mode: false,
589            filter_deleted: true,
590            merge_mode: MergeMode::default(),
591            series_row_selector: None,
592            distribution: None,
593        }
594    }
595
596    /// Sets time range filter for time index.
597    #[must_use]
598    pub(crate) fn with_time_range(mut self, time_range: Option<TimestampRange>) -> Self {
599        self.time_range = time_range;
600        self
601    }
602
603    /// Sets predicate to push down.
604    #[must_use]
605    pub(crate) fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
606        self.predicate = predicate;
607        self
608    }
609
610    /// Sets memtable range builders.
611    #[must_use]
612    pub(crate) fn with_memtables(mut self, memtables: Vec<MemRangeBuilder>) -> Self {
613        self.memtables = memtables;
614        self
615    }
616
617    /// Sets files to read.
618    #[must_use]
619    pub(crate) fn with_files(mut self, files: Vec<FileHandle>) -> Self {
620        self.files = files;
621        self
622    }
623
624    /// Sets cache for this query.
625    #[must_use]
626    pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self {
627        self.cache_strategy = cache;
628        self
629    }
630
631    /// Ignores file not found error.
632    #[must_use]
633    pub(crate) fn with_ignore_file_not_found(mut self, ignore: bool) -> Self {
634        self.ignore_file_not_found = ignore;
635        self
636    }
637
638    /// Sets scan task channel size.
639    #[must_use]
640    pub(crate) fn with_parallel_scan_channel_size(
641        mut self,
642        parallel_scan_channel_size: usize,
643    ) -> Self {
644        self.parallel_scan_channel_size = parallel_scan_channel_size;
645        self
646    }
647
648    /// Sets invereted index applier.
649    #[must_use]
650    pub(crate) fn with_inverted_index_applier(
651        mut self,
652        applier: Option<InvertedIndexApplierRef>,
653    ) -> Self {
654        self.inverted_index_applier = applier;
655        self
656    }
657
658    /// Sets bloom filter applier.
659    #[must_use]
660    pub(crate) fn with_bloom_filter_index_applier(
661        mut self,
662        applier: Option<BloomFilterIndexApplierRef>,
663    ) -> Self {
664        self.bloom_filter_index_applier = applier;
665        self
666    }
667
668    /// Sets fulltext index applier.
669    #[must_use]
670    pub(crate) fn with_fulltext_index_applier(
671        mut self,
672        applier: Option<FulltextIndexApplierRef>,
673    ) -> Self {
674        self.fulltext_index_applier = applier;
675        self
676    }
677
678    /// Sets start time of the query.
679    #[must_use]
680    pub(crate) fn with_start_time(mut self, now: Option<Instant>) -> Self {
681        self.query_start = now;
682        self
683    }
684
685    #[must_use]
686    pub(crate) fn with_append_mode(mut self, is_append_mode: bool) -> Self {
687        self.append_mode = is_append_mode;
688        self
689    }
690
691    /// Sets whether to remove deletion markers during scan.
692    #[must_use]
693    pub(crate) fn with_filter_deleted(mut self, filter_deleted: bool) -> Self {
694        self.filter_deleted = filter_deleted;
695        self
696    }
697
698    /// Sets the merge mode.
699    #[must_use]
700    pub(crate) fn with_merge_mode(mut self, merge_mode: MergeMode) -> Self {
701        self.merge_mode = merge_mode;
702        self
703    }
704
705    /// Sets the distribution hint.
706    #[must_use]
707    pub(crate) fn with_distribution(
708        mut self,
709        distribution: Option<TimeSeriesDistribution>,
710    ) -> Self {
711        self.distribution = distribution;
712        self
713    }
714
715    /// Sets the time series row selector.
716    #[must_use]
717    pub(crate) fn with_series_row_selector(
718        mut self,
719        series_row_selector: Option<TimeSeriesRowSelector>,
720    ) -> Self {
721        self.series_row_selector = series_row_selector;
722        self
723    }
724
725    /// Scans sources in parallel.
726    ///
727    /// # Panics if the input doesn't allow parallel scan.
728    pub(crate) fn create_parallel_sources(
729        &self,
730        sources: Vec<Source>,
731        semaphore: Arc<Semaphore>,
732    ) -> Result<Vec<Source>> {
733        if sources.len() <= 1 {
734            return Ok(sources);
735        }
736
737        // Spawn a task for each source.
738        let sources = sources
739            .into_iter()
740            .map(|source| {
741                let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
742                self.spawn_scan_task(source, semaphore.clone(), sender);
743                let stream = Box::pin(ReceiverStream::new(receiver));
744                Source::Stream(stream)
745            })
746            .collect();
747        Ok(sources)
748    }
749
750    /// Builds memtable ranges to scan by `index`.
751    pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> {
752        let memtable = &self.memtables[index.index];
753        let mut ranges = SmallVec::new();
754        memtable.build_ranges(index.row_group_index, &mut ranges);
755        ranges
756    }
757
758    /// Prunes a file to scan and returns the builder to build readers.
759    pub(crate) async fn prune_file(
760        &self,
761        file_index: usize,
762        reader_metrics: &mut ReaderMetrics,
763    ) -> Result<FileRangeBuilder> {
764        let file = &self.files[file_index];
765        let res = self
766            .access_layer
767            .read_sst(file.clone())
768            .predicate(self.predicate.predicate().cloned())
769            .projection(Some(self.mapper.column_ids().to_vec()))
770            .cache(self.cache_strategy.clone())
771            .inverted_index_applier(self.inverted_index_applier.clone())
772            .bloom_filter_index_applier(self.bloom_filter_index_applier.clone())
773            .fulltext_index_applier(self.fulltext_index_applier.clone())
774            .expected_metadata(Some(self.mapper.metadata().clone()))
775            .build_reader_input(reader_metrics)
776            .await;
777        let (mut file_range_ctx, row_groups) = match res {
778            Ok(x) => x,
779            Err(e) => {
780                if e.is_object_not_found() && self.ignore_file_not_found {
781                    error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
782                    return Ok(FileRangeBuilder::default());
783                } else {
784                    return Err(e);
785                }
786            }
787        };
788        if !compat::has_same_columns_and_pk_encoding(
789            self.mapper.metadata(),
790            file_range_ctx.read_format().metadata(),
791        ) {
792            // They have different schema. We need to adapt the batch first so the
793            // mapper can convert it.
794            let compat = CompatBatch::new(
795                &self.mapper,
796                file_range_ctx.read_format().metadata().clone(),
797            )?;
798            file_range_ctx.set_compat_batch(Some(compat));
799        }
800        Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), row_groups))
801    }
802
803    /// Scans the input source in another task and sends batches to the sender.
804    pub(crate) fn spawn_scan_task(
805        &self,
806        mut input: Source,
807        semaphore: Arc<Semaphore>,
808        sender: mpsc::Sender<Result<Batch>>,
809    ) {
810        common_runtime::spawn_global(async move {
811            loop {
812                // We release the permit before sending result to avoid the task waiting on
813                // the channel with the permit held.
814                let maybe_batch = {
815                    // Safety: We never close the semaphore.
816                    let _permit = semaphore.acquire().await.unwrap();
817                    input.next_batch().await
818                };
819                match maybe_batch {
820                    Ok(Some(batch)) => {
821                        let _ = sender.send(Ok(batch)).await;
822                    }
823                    Ok(None) => break,
824                    Err(e) => {
825                        let _ = sender.send(Err(e)).await;
826                        break;
827                    }
828                }
829            }
830        });
831    }
832
833    pub(crate) fn total_rows(&self) -> usize {
834        let rows_in_files: usize = self.files.iter().map(|f| f.num_rows()).sum();
835        let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();
836        rows_in_files + rows_in_memtables
837    }
838
839    /// Returns table predicate of all exprs.
840    pub(crate) fn predicate(&self) -> Option<&Predicate> {
841        self.predicate.predicate()
842    }
843
844    /// Returns number of memtables to scan.
845    pub(crate) fn num_memtables(&self) -> usize {
846        self.memtables.len()
847    }
848
849    /// Returns number of SST files to scan.
850    pub(crate) fn num_files(&self) -> usize {
851        self.files.len()
852    }
853}
854
#[cfg(test)]
impl ScanInput {
    /// Collects the id of every SST file in this input, in the order of `files`.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::FileId> {
        let mut ids = Vec::with_capacity(self.files.len());
        for file in self.files.iter() {
            ids.push(file.file_id());
        }
        ids
    }
}
862
/// Context shared by different streams from a scanner.
/// It contains the input and ranges to scan.
///
/// Built by [StreamContext::seq_scan_ctx] or [StreamContext::unordered_scan_ctx], which
/// derive `ranges` from `input` according to the scan mode.
pub(crate) struct StreamContext {
    /// Input memtables and files.
    pub(crate) input: ScanInput,
    /// Metadata for partition ranges.
    /// The position of a range in this vector is used as its partition range index
    /// (see [StreamContext::partition_ranges]).
    pub(crate) ranges: Vec<RangeMeta>,

    // Metrics:
    /// The start time of the query.
    /// Taken from the input's `query_start` when set, otherwise the instant at which the
    /// context was constructed.
    pub(crate) query_start: Instant,
}
875
876impl StreamContext {
877    /// Creates a new [StreamContext] for [SeqScan].
878    pub(crate) fn seq_scan_ctx(input: ScanInput, compaction: bool) -> Self {
879        let query_start = input.query_start.unwrap_or_else(Instant::now);
880        let ranges = RangeMeta::seq_scan_ranges(&input, compaction);
881        READ_SST_COUNT.observe(input.num_files() as f64);
882
883        Self {
884            input,
885            ranges,
886            query_start,
887        }
888    }
889
890    /// Creates a new [StreamContext] for [UnorderedScan].
891    pub(crate) fn unordered_scan_ctx(input: ScanInput) -> Self {
892        let query_start = input.query_start.unwrap_or_else(Instant::now);
893        let ranges = RangeMeta::unordered_scan_ranges(&input);
894        READ_SST_COUNT.observe(input.num_files() as f64);
895
896        Self {
897            input,
898            ranges,
899            query_start,
900        }
901    }
902
903    /// Returns true if the index refers to a memtable.
904    pub(crate) fn is_mem_range_index(&self, index: RowGroupIndex) -> bool {
905        self.input.num_memtables() > index.index
906    }
907
908    /// Retrieves the partition ranges.
909    pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
910        self.ranges
911            .iter()
912            .enumerate()
913            .map(|(idx, range_meta)| range_meta.new_partition_range(idx))
914            .collect()
915    }
916
917    /// Format the context for explain.
918    pub(crate) fn format_for_explain(&self, verbose: bool, f: &mut fmt::Formatter) -> fmt::Result {
919        let (mut num_mem_ranges, mut num_file_ranges) = (0, 0);
920        for range_meta in &self.ranges {
921            for idx in &range_meta.row_group_indices {
922                if self.is_mem_range_index(*idx) {
923                    num_mem_ranges += 1;
924                } else {
925                    num_file_ranges += 1;
926                }
927            }
928        }
929        write!(
930            f,
931            "partition_count={} ({} memtable ranges, {} file {} ranges)",
932            self.ranges.len(),
933            num_mem_ranges,
934            self.input.num_files(),
935            num_file_ranges,
936        )?;
937        if let Some(selector) = &self.input.series_row_selector {
938            write!(f, ", selector={}", selector)?;
939        }
940        if let Some(distribution) = &self.input.distribution {
941            write!(f, ", distribution={}", distribution)?;
942        }
943
944        if verbose {
945            self.format_verbose_content(f)?;
946        }
947
948        Ok(())
949    }
950
951    fn format_verbose_content(&self, f: &mut fmt::Formatter) -> fmt::Result {
952        struct FileWrapper<'a> {
953            file: &'a FileHandle,
954        }
955
956        impl fmt::Debug for FileWrapper<'_> {
957            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
958                write!(
959                    f,
960                    "[file={}, time_range=({}::{}, {}::{}), rows={}, size={}, index_size={}]",
961                    self.file.file_id(),
962                    self.file.time_range().0.value(),
963                    self.file.time_range().0.unit(),
964                    self.file.time_range().1.value(),
965                    self.file.time_range().1.unit(),
966                    self.file.num_rows(),
967                    self.file.size(),
968                    self.file.index_size()
969                )
970            }
971        }
972
973        struct InputWrapper<'a> {
974            input: &'a ScanInput,
975        }
976
977        impl fmt::Debug for InputWrapper<'_> {
978            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
979                let output_schema = self.input.mapper.output_schema();
980                if !output_schema.is_empty() {
981                    write!(f, ", projection=")?;
982                    f.debug_list()
983                        .entries(output_schema.column_schemas().iter().map(|col| &col.name))
984                        .finish()?;
985                }
986                if let Some(predicate) = &self.input.predicate.predicate() {
987                    if !predicate.exprs().is_empty() {
988                        write!(f, ", filters=[")?;
989                        for (i, expr) in predicate.exprs().iter().enumerate() {
990                            if i == predicate.exprs().len() - 1 {
991                                write!(f, "{}]", expr)?;
992                            } else {
993                                write!(f, "{}, ", expr)?;
994                            }
995                        }
996                    }
997                }
998                if !self.input.files.is_empty() {
999                    write!(f, ", files=")?;
1000                    f.debug_list()
1001                        .entries(self.input.files.iter().map(|file| FileWrapper { file }))
1002                        .finish()?;
1003                }
1004
1005                Ok(())
1006            }
1007        }
1008
1009        write!(f, "{:?}", InputWrapper { input: &self.input })
1010    }
1011}
1012
/// Predicates to evaluate.
///
/// Holds two views of the filter exprs:
/// - `time_filters` keeps only exprs that [SimpleFilterEvaluator] supports. Each one must
///   reference exactly one column, and that column's semantic type must be
///   [SemanticType::Timestamp].
/// - `predicate` is a table [Predicate] built from all exprs.
#[derive(Clone, Default)]
pub struct PredicateGroup {
    /// Simple filters on the timestamp column, or `None` when no expr qualifies.
    time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,

    /// Table predicate for all logical exprs to evaluate.
    /// Parquet reader uses it to prune row groups.
    /// Always `Some` when built by [PredicateGroup::new]; `None` for the `Default` value.
    predicate: Option<Predicate>,
}
1023
1024impl PredicateGroup {
1025    /// Creates a new `PredicateGroup` from exprs according to the metadata.
1026    pub fn new(metadata: &RegionMetadata, exprs: &[Expr]) -> Self {
1027        let mut time_filters = Vec::with_capacity(exprs.len());
1028        // Columns in the expr.
1029        let mut columns = HashSet::new();
1030        for expr in exprs {
1031            columns.clear();
1032            let Some(filter) = Self::expr_to_filter(expr, metadata, &mut columns) else {
1033                continue;
1034            };
1035            time_filters.push(filter);
1036        }
1037        let time_filters = if time_filters.is_empty() {
1038            None
1039        } else {
1040            Some(Arc::new(time_filters))
1041        };
1042        let predicate = Predicate::new(exprs.to_vec());
1043
1044        Self {
1045            time_filters,
1046            predicate: Some(predicate),
1047        }
1048    }
1049
1050    /// Returns time filters.
1051    pub(crate) fn time_filters(&self) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
1052        self.time_filters.clone()
1053    }
1054
1055    /// Returns predicate of all exprs.
1056    pub(crate) fn predicate(&self) -> Option<&Predicate> {
1057        self.predicate.as_ref()
1058    }
1059
1060    fn expr_to_filter(
1061        expr: &Expr,
1062        metadata: &RegionMetadata,
1063        columns: &mut HashSet<Column>,
1064    ) -> Option<SimpleFilterEvaluator> {
1065        columns.clear();
1066        // `expr_to_columns` won't return error.
1067        // We still ignore these expressions for safety.
1068        expr_to_columns(expr, columns).ok()?;
1069        if columns.len() > 1 {
1070            // Simple filter doesn't support multiple columns.
1071            return None;
1072        }
1073        let column = columns.iter().next()?;
1074        let column_meta = metadata.column_by_name(&column.name)?;
1075        if column_meta.semantic_type == SemanticType::Timestamp {
1076            SimpleFilterEvaluator::try_new(expr)
1077        } else {
1078            None
1079        }
1080    }
1081}