mito2/read/
scan_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Scans a region according to the scan request.
16
17use std::collections::HashSet;
18use std::fmt;
19use std::num::NonZeroU64;
20use std::sync::Arc;
21use std::time::Instant;
22
23use api::v1::SemanticType;
24use common_error::ext::BoxedError;
25use common_recordbatch::filter::SimpleFilterEvaluator;
26use common_recordbatch::SendableRecordBatchStream;
27use common_telemetry::{debug, error, tracing, warn};
28use common_time::range::TimestampRange;
29use datafusion_common::Column;
30use datafusion_expr::utils::expr_to_columns;
31use datafusion_expr::Expr;
32use smallvec::SmallVec;
33use store_api::metadata::RegionMetadata;
34use store_api::region_engine::{PartitionRange, RegionScannerRef};
35use store_api::storage::{ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector};
36use table::predicate::{build_time_range_predicate, Predicate};
37use tokio::sync::{mpsc, Semaphore};
38use tokio_stream::wrappers::ReceiverStream;
39
40use crate::access_layer::AccessLayerRef;
41use crate::cache::CacheStrategy;
42use crate::config::DEFAULT_SCAN_CHANNEL_SIZE;
43use crate::error::Result;
44use crate::memtable::MemtableRange;
45use crate::metrics::READ_SST_COUNT;
46use crate::read::compat::{self, CompatBatch};
47use crate::read::projection::ProjectionMapper;
48use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
49use crate::read::seq_scan::SeqScan;
50use crate::read::series_scan::SeriesScan;
51use crate::read::unordered_scan::UnorderedScan;
52use crate::read::{Batch, Source};
53use crate::region::options::MergeMode;
54use crate::region::version::VersionRef;
55use crate::sst::file::FileHandle;
56use crate::sst::index::bloom_filter::applier::{
57    BloomFilterIndexApplierBuilder, BloomFilterIndexApplierRef,
58};
59use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
60use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
61use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
62use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
63use crate::sst::parquet::reader::ReaderMetrics;
64
65/// A scanner scans a region and returns a [SendableRecordBatchStream].
66pub(crate) enum Scanner {
67    /// Sequential scan.
68    Seq(SeqScan),
69    /// Unordered scan.
70    Unordered(UnorderedScan),
71    /// Per-series scan.
72    Series(SeriesScan),
73}
74
75impl Scanner {
76    /// Returns a [SendableRecordBatchStream] to retrieve scan results from all partitions.
77    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
78    pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream, BoxedError> {
79        match self {
80            Scanner::Seq(seq_scan) => seq_scan.build_stream(),
81            Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
82            Scanner::Series(series_scan) => series_scan.build_stream().await,
83        }
84    }
85}
86
87#[cfg(test)]
88impl Scanner {
89    /// Returns number of files to scan.
90    pub(crate) fn num_files(&self) -> usize {
91        match self {
92            Scanner::Seq(seq_scan) => seq_scan.input().num_files(),
93            Scanner::Unordered(unordered_scan) => unordered_scan.input().num_files(),
94            Scanner::Series(series_scan) => series_scan.input().num_files(),
95        }
96    }
97
98    /// Returns number of memtables to scan.
99    pub(crate) fn num_memtables(&self) -> usize {
100        match self {
101            Scanner::Seq(seq_scan) => seq_scan.input().num_memtables(),
102            Scanner::Unordered(unordered_scan) => unordered_scan.input().num_memtables(),
103            Scanner::Series(series_scan) => series_scan.input().num_memtables(),
104        }
105    }
106
107    /// Returns SST file ids to scan.
108    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::FileId> {
109        match self {
110            Scanner::Seq(seq_scan) => seq_scan.input().file_ids(),
111            Scanner::Unordered(unordered_scan) => unordered_scan.input().file_ids(),
112            Scanner::Series(series_scan) => series_scan.input().file_ids(),
113        }
114    }
115
116    /// Sets the target partitions for the scanner. It can controls the parallelism of the scanner.
117    pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
118        use store_api::region_engine::{PrepareRequest, RegionScanner};
119
120        let request = PrepareRequest::default().with_target_partitions(target_partitions);
121        match self {
122            Scanner::Seq(seq_scan) => seq_scan.prepare(request).unwrap(),
123            Scanner::Unordered(unordered_scan) => unordered_scan.prepare(request).unwrap(),
124            Scanner::Series(series_scan) => series_scan.prepare(request).unwrap(),
125        }
126    }
127}
128
129#[cfg_attr(doc, aquamarine::aquamarine)]
130/// Helper to scans a region by [ScanRequest].
131///
132/// [ScanRegion] collects SSTs and memtables to scan without actually reading them. It
133/// creates a [Scanner] to actually scan these targets in [Scanner::scan()].
134///
135/// ```mermaid
136/// classDiagram
137/// class ScanRegion {
138///     -VersionRef version
139///     -ScanRequest request
140///     ~scanner() Scanner
141///     ~seq_scan() SeqScan
142/// }
143/// class Scanner {
144///     <<enumeration>>
145///     SeqScan
146///     UnorderedScan
147///     +scan() SendableRecordBatchStream
148/// }
149/// class SeqScan {
150///     -ScanInput input
151///     +build() SendableRecordBatchStream
152/// }
153/// class UnorderedScan {
154///     -ScanInput input
155///     +build() SendableRecordBatchStream
156/// }
157/// class ScanInput {
158///     -ProjectionMapper mapper
159///     -Option~TimeRange~ time_range
160///     -Option~Predicate~ predicate
161///     -Vec~MemtableRef~ memtables
162///     -Vec~FileHandle~ files
163/// }
164/// class ProjectionMapper {
165///     ~output_schema() SchemaRef
166///     ~convert(Batch) RecordBatch
167/// }
168/// ScanRegion -- Scanner
169/// ScanRegion o-- ScanRequest
170/// Scanner o-- SeqScan
171/// Scanner o-- UnorderedScan
172/// SeqScan o-- ScanInput
173/// UnorderedScan o-- ScanInput
174/// Scanner -- SendableRecordBatchStream
175/// ScanInput o-- ProjectionMapper
176/// SeqScan -- SendableRecordBatchStream
177/// UnorderedScan -- SendableRecordBatchStream
178/// ```
179pub(crate) struct ScanRegion {
180    /// Version of the region at scan.
181    version: VersionRef,
182    /// Access layer of the region.
183    access_layer: AccessLayerRef,
184    /// Scan request.
185    request: ScanRequest,
186    /// Cache.
187    cache_strategy: CacheStrategy,
188    /// Capacity of the channel to send data from parallel scan tasks to the main task.
189    parallel_scan_channel_size: usize,
190    /// Whether to ignore inverted index.
191    ignore_inverted_index: bool,
192    /// Whether to ignore fulltext index.
193    ignore_fulltext_index: bool,
194    /// Whether to ignore bloom filter.
195    ignore_bloom_filter: bool,
196    /// Start time of the scan task.
197    start_time: Option<Instant>,
198}
199
200impl ScanRegion {
201    /// Creates a [ScanRegion].
202    pub(crate) fn new(
203        version: VersionRef,
204        access_layer: AccessLayerRef,
205        request: ScanRequest,
206        cache_strategy: CacheStrategy,
207    ) -> ScanRegion {
208        ScanRegion {
209            version,
210            access_layer,
211            request,
212            cache_strategy,
213            parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
214            ignore_inverted_index: false,
215            ignore_fulltext_index: false,
216            ignore_bloom_filter: false,
217            start_time: None,
218        }
219    }
220
221    /// Sets parallel scan task channel size.
222    #[must_use]
223    pub(crate) fn with_parallel_scan_channel_size(
224        mut self,
225        parallel_scan_channel_size: usize,
226    ) -> Self {
227        self.parallel_scan_channel_size = parallel_scan_channel_size;
228        self
229    }
230
231    /// Sets whether to ignore inverted index.
232    #[must_use]
233    pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self {
234        self.ignore_inverted_index = ignore;
235        self
236    }
237
238    /// Sets whether to ignore fulltext index.
239    #[must_use]
240    pub(crate) fn with_ignore_fulltext_index(mut self, ignore: bool) -> Self {
241        self.ignore_fulltext_index = ignore;
242        self
243    }
244
245    /// Sets whether to ignore bloom filter.
246    #[must_use]
247    pub(crate) fn with_ignore_bloom_filter(mut self, ignore: bool) -> Self {
248        self.ignore_bloom_filter = ignore;
249        self
250    }
251
252    #[must_use]
253    pub(crate) fn with_start_time(mut self, now: Instant) -> Self {
254        self.start_time = Some(now);
255        self
256    }
257
258    /// Returns a [Scanner] to scan the region.
259    pub(crate) fn scanner(self) -> Result<Scanner> {
260        if self.use_series_scan() {
261            self.series_scan().map(Scanner::Series)
262        } else if self.use_unordered_scan() {
263            // If table is append only and there is no series row selector, we use unordered scan in query.
264            // We still use seq scan in compaction.
265            self.unordered_scan().map(Scanner::Unordered)
266        } else {
267            self.seq_scan().map(Scanner::Seq)
268        }
269    }
270
271    /// Returns a [RegionScanner] to scan the region.
272    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
273    pub(crate) fn region_scanner(self) -> Result<RegionScannerRef> {
274        if self.use_series_scan() {
275            self.series_scan().map(|scanner| Box::new(scanner) as _)
276        } else if self.use_unordered_scan() {
277            self.unordered_scan().map(|scanner| Box::new(scanner) as _)
278        } else {
279            self.seq_scan().map(|scanner| Box::new(scanner) as _)
280        }
281    }
282
283    /// Scan sequentially.
284    pub(crate) fn seq_scan(self) -> Result<SeqScan> {
285        let input = self.scan_input(true)?;
286        Ok(SeqScan::new(input, false))
287    }
288
289    /// Unordered scan.
290    pub(crate) fn unordered_scan(self) -> Result<UnorderedScan> {
291        let input = self.scan_input(true)?;
292        Ok(UnorderedScan::new(input))
293    }
294
295    /// Scans by series.
296    pub(crate) fn series_scan(self) -> Result<SeriesScan> {
297        let input = self.scan_input(true)?;
298        Ok(SeriesScan::new(input))
299    }
300
301    #[cfg(test)]
302    pub(crate) fn scan_without_filter_deleted(self) -> Result<SeqScan> {
303        let input = self.scan_input(false)?;
304        Ok(SeqScan::new(input, false))
305    }
306
307    /// Returns true if the region can use unordered scan for current request.
308    fn use_unordered_scan(&self) -> bool {
309        // We use unordered scan when:
310        // 1. The region is in append mode.
311        // 2. There is no series row selector.
312        // 3. The required distribution is None or TimeSeriesDistribution::TimeWindowed.
313        //
314        // We still use seq scan in compaction.
315        self.version.options.append_mode
316            && self.request.series_row_selector.is_none()
317            && (self.request.distribution.is_none()
318                || self.request.distribution == Some(TimeSeriesDistribution::TimeWindowed))
319    }
320
321    /// Returns true if the region can use series scan for current request.
322    fn use_series_scan(&self) -> bool {
323        self.request.distribution == Some(TimeSeriesDistribution::PerSeries)
324    }
325
326    /// Creates a scan input.
327    fn scan_input(mut self, filter_deleted: bool) -> Result<ScanInput> {
328        let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new);
329        let time_range = self.build_time_range_predicate();
330
331        let ssts = &self.version.ssts;
332        let mut files = Vec::new();
333        for level in ssts.levels() {
334            for file in level.files.values() {
335                let exceed_min_sequence = match (sst_min_sequence, file.meta_ref().sequence) {
336                    (Some(min_sequence), Some(file_sequence)) => file_sequence > min_sequence,
337                    // If the file's sequence is None (or actually is zero), it could mean the file
338                    // is generated and added to the region "directly". In this case, its data should
339                    // be considered as fresh as the memtable. So its sequence is treated greater than
340                    // the min_sequence, whatever the value of min_sequence is. Hence the default
341                    // "true" in this arm.
342                    (Some(_), None) => true,
343                    (None, _) => true,
344                };
345
346                // Finds SST files in range.
347                if exceed_min_sequence && file_in_range(file, &time_range) {
348                    files.push(file.clone());
349                }
350                // There is no need to check and prune for file's sequence here as the sequence number is usually very new,
351                // unless the timing is too good, or the sequence number wouldn't be in file.
352                // and the batch will be filtered out by tree reader anyway.
353            }
354        }
355
356        let memtables = self.version.memtables.list_memtables();
357        // Skip empty memtables and memtables out of time range.
358        let memtables: Vec<_> = memtables
359            .into_iter()
360            .filter(|mem| {
361                // check if memtable is empty by reading stats.
362                let Some((start, end)) = mem.stats().time_range() else {
363                    return false;
364                };
365                // The time range of the memtable is inclusive.
366                let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
367                memtable_range.intersects(&time_range)
368            })
369            .collect();
370
371        debug!(
372            "Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}",
373            self.version.metadata.region_id,
374            self.request,
375            time_range,
376            memtables.len(),
377            files.len(),
378            self.version.options.append_mode,
379        );
380
381        // Remove field filters for LastNonNull mode after logging the request.
382        self.maybe_remove_field_filters();
383
384        let inverted_index_applier = self.build_invereted_index_applier();
385        let bloom_filter_applier = self.build_bloom_filter_applier();
386        let fulltext_index_applier = self.build_fulltext_index_applier();
387        let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters);
388        // The mapper always computes projected column ids as the schema of SSTs may change.
389        let mapper = match &self.request.projection {
390            Some(p) => ProjectionMapper::new(&self.version.metadata, p.iter().copied())?,
391            None => ProjectionMapper::all(&self.version.metadata)?,
392        };
393        // Get memtable ranges to scan.
394        let memtables = memtables
395            .into_iter()
396            .map(|mem| {
397                mem.ranges(
398                    Some(mapper.column_ids()),
399                    predicate.clone(),
400                    self.request.sequence,
401                )
402                .map(MemRangeBuilder::new)
403            })
404            .collect::<Result<Vec<_>>>()?;
405
406        let input = ScanInput::new(self.access_layer, mapper)
407            .with_time_range(Some(time_range))
408            .with_predicate(predicate)
409            .with_memtables(memtables)
410            .with_files(files)
411            .with_cache(self.cache_strategy)
412            .with_inverted_index_applier(inverted_index_applier)
413            .with_bloom_filter_index_applier(bloom_filter_applier)
414            .with_fulltext_index_applier(fulltext_index_applier)
415            .with_parallel_scan_channel_size(self.parallel_scan_channel_size)
416            .with_start_time(self.start_time)
417            .with_append_mode(self.version.options.append_mode)
418            .with_filter_deleted(filter_deleted)
419            .with_merge_mode(self.version.options.merge_mode())
420            .with_series_row_selector(self.request.series_row_selector)
421            .with_distribution(self.request.distribution);
422        Ok(input)
423    }
424
425    /// Build time range predicate from filters.
426    fn build_time_range_predicate(&self) -> TimestampRange {
427        let time_index = self.version.metadata.time_index_column();
428        let unit = time_index
429            .column_schema
430            .data_type
431            .as_timestamp()
432            .expect("Time index must have timestamp-compatible type")
433            .unit();
434        build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters)
435    }
436
437    /// Remove field filters if the merge mode is [MergeMode::LastNonNull].
438    fn maybe_remove_field_filters(&mut self) {
439        if self.version.options.merge_mode() != MergeMode::LastNonNull {
440            return;
441        }
442
443        // TODO(yingwen): We can ignore field filters only when there are multiple sources in the same time window.
444        let field_columns = self
445            .version
446            .metadata
447            .field_columns()
448            .map(|col| &col.column_schema.name)
449            .collect::<HashSet<_>>();
450        // Columns in the expr.
451        let mut columns = HashSet::new();
452
453        self.request.filters.retain(|expr| {
454            columns.clear();
455            // `expr_to_columns` won't return error.
456            if expr_to_columns(expr, &mut columns).is_err() {
457                return false;
458            }
459            for column in &columns {
460                if field_columns.contains(&column.name) {
461                    // This expr uses the field column.
462                    return false;
463                }
464            }
465            true
466        });
467    }
468
469    /// Use the latest schema to build the inverted index applier.
470    fn build_invereted_index_applier(&self) -> Option<InvertedIndexApplierRef> {
471        if self.ignore_inverted_index {
472            return None;
473        }
474
475        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
476        let inverted_index_cache = self.cache_strategy.inverted_index_cache().cloned();
477
478        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
479
480        InvertedIndexApplierBuilder::new(
481            self.access_layer.region_dir().to_string(),
482            self.access_layer.object_store().clone(),
483            self.version.metadata.as_ref(),
484            self.version.metadata.inverted_indexed_column_ids(
485                self.version
486                    .options
487                    .index_options
488                    .inverted_index
489                    .ignore_column_ids
490                    .iter(),
491            ),
492            self.access_layer.puffin_manager_factory().clone(),
493        )
494        .with_file_cache(file_cache)
495        .with_inverted_index_cache(inverted_index_cache)
496        .with_puffin_metadata_cache(puffin_metadata_cache)
497        .build(&self.request.filters)
498        .inspect_err(|err| warn!(err; "Failed to build invereted index applier"))
499        .ok()
500        .flatten()
501        .map(Arc::new)
502    }
503
504    /// Use the latest schema to build the bloom filter index applier.
505    fn build_bloom_filter_applier(&self) -> Option<BloomFilterIndexApplierRef> {
506        if self.ignore_bloom_filter {
507            return None;
508        }
509
510        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
511        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
512        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
513
514        BloomFilterIndexApplierBuilder::new(
515            self.access_layer.region_dir().to_string(),
516            self.access_layer.object_store().clone(),
517            self.version.metadata.as_ref(),
518            self.access_layer.puffin_manager_factory().clone(),
519        )
520        .with_file_cache(file_cache)
521        .with_bloom_filter_index_cache(bloom_filter_index_cache)
522        .with_puffin_metadata_cache(puffin_metadata_cache)
523        .build(&self.request.filters)
524        .inspect_err(|err| warn!(err; "Failed to build bloom filter index applier"))
525        .ok()
526        .flatten()
527        .map(Arc::new)
528    }
529
530    /// Use the latest schema to build the fulltext index applier.
531    fn build_fulltext_index_applier(&self) -> Option<FulltextIndexApplierRef> {
532        if self.ignore_fulltext_index {
533            return None;
534        }
535
536        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
537        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
538        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
539        FulltextIndexApplierBuilder::new(
540            self.access_layer.region_dir().to_string(),
541            self.version.metadata.region_id,
542            self.access_layer.object_store().clone(),
543            self.access_layer.puffin_manager_factory().clone(),
544            self.version.metadata.as_ref(),
545        )
546        .with_file_cache(file_cache)
547        .with_puffin_metadata_cache(puffin_metadata_cache)
548        .with_bloom_filter_cache(bloom_filter_index_cache)
549        .build(&self.request.filters)
550        .inspect_err(|err| warn!(err; "Failed to build fulltext index applier"))
551        .ok()
552        .flatten()
553        .map(Arc::new)
554    }
555}
556
557/// Returns true if the time range of a SST `file` matches the `predicate`.
558fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
559    if predicate == &TimestampRange::min_to_max() {
560        return true;
561    }
562    // end timestamp of a SST is inclusive.
563    let (start, end) = file.time_range();
564    let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end));
565    file_ts_range.intersects(predicate)
566}
567
568/// Common input for different scanners.
569pub(crate) struct ScanInput {
570    /// Region SST access layer.
571    access_layer: AccessLayerRef,
572    /// Maps projected Batches to RecordBatches.
573    pub(crate) mapper: Arc<ProjectionMapper>,
574    /// Time range filter for time index.
575    time_range: Option<TimestampRange>,
576    /// Predicate to push down.
577    pub(crate) predicate: PredicateGroup,
578    /// Memtable range builders for memtables in the time range..
579    pub(crate) memtables: Vec<MemRangeBuilder>,
580    /// Handles to SST files to scan.
581    pub(crate) files: Vec<FileHandle>,
582    /// Cache.
583    pub(crate) cache_strategy: CacheStrategy,
584    /// Ignores file not found error.
585    ignore_file_not_found: bool,
586    /// Capacity of the channel to send data from parallel scan tasks to the main task.
587    pub(crate) parallel_scan_channel_size: usize,
588    /// Index appliers.
589    inverted_index_applier: Option<InvertedIndexApplierRef>,
590    bloom_filter_index_applier: Option<BloomFilterIndexApplierRef>,
591    fulltext_index_applier: Option<FulltextIndexApplierRef>,
592    /// Start time of the query.
593    pub(crate) query_start: Option<Instant>,
594    /// The region is using append mode.
595    pub(crate) append_mode: bool,
596    /// Whether to remove deletion markers.
597    pub(crate) filter_deleted: bool,
598    /// Mode to merge duplicate rows.
599    pub(crate) merge_mode: MergeMode,
600    /// Hint to select rows from time series.
601    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
602    /// Hint for the required distribution of the scanner.
603    pub(crate) distribution: Option<TimeSeriesDistribution>,
604}
605
606impl ScanInput {
607    /// Creates a new [ScanInput].
608    #[must_use]
609    pub(crate) fn new(access_layer: AccessLayerRef, mapper: ProjectionMapper) -> ScanInput {
610        ScanInput {
611            access_layer,
612            mapper: Arc::new(mapper),
613            time_range: None,
614            predicate: PredicateGroup::default(),
615            memtables: Vec::new(),
616            files: Vec::new(),
617            cache_strategy: CacheStrategy::Disabled,
618            ignore_file_not_found: false,
619            parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
620            inverted_index_applier: None,
621            bloom_filter_index_applier: None,
622            fulltext_index_applier: None,
623            query_start: None,
624            append_mode: false,
625            filter_deleted: true,
626            merge_mode: MergeMode::default(),
627            series_row_selector: None,
628            distribution: None,
629        }
630    }
631
632    /// Sets time range filter for time index.
633    #[must_use]
634    pub(crate) fn with_time_range(mut self, time_range: Option<TimestampRange>) -> Self {
635        self.time_range = time_range;
636        self
637    }
638
639    /// Sets predicate to push down.
640    #[must_use]
641    pub(crate) fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
642        self.predicate = predicate;
643        self
644    }
645
646    /// Sets memtable range builders.
647    #[must_use]
648    pub(crate) fn with_memtables(mut self, memtables: Vec<MemRangeBuilder>) -> Self {
649        self.memtables = memtables;
650        self
651    }
652
653    /// Sets files to read.
654    #[must_use]
655    pub(crate) fn with_files(mut self, files: Vec<FileHandle>) -> Self {
656        self.files = files;
657        self
658    }
659
660    /// Sets cache for this query.
661    #[must_use]
662    pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self {
663        self.cache_strategy = cache;
664        self
665    }
666
667    /// Ignores file not found error.
668    #[must_use]
669    pub(crate) fn with_ignore_file_not_found(mut self, ignore: bool) -> Self {
670        self.ignore_file_not_found = ignore;
671        self
672    }
673
674    /// Sets scan task channel size.
675    #[must_use]
676    pub(crate) fn with_parallel_scan_channel_size(
677        mut self,
678        parallel_scan_channel_size: usize,
679    ) -> Self {
680        self.parallel_scan_channel_size = parallel_scan_channel_size;
681        self
682    }
683
684    /// Sets invereted index applier.
685    #[must_use]
686    pub(crate) fn with_inverted_index_applier(
687        mut self,
688        applier: Option<InvertedIndexApplierRef>,
689    ) -> Self {
690        self.inverted_index_applier = applier;
691        self
692    }
693
694    /// Sets bloom filter applier.
695    #[must_use]
696    pub(crate) fn with_bloom_filter_index_applier(
697        mut self,
698        applier: Option<BloomFilterIndexApplierRef>,
699    ) -> Self {
700        self.bloom_filter_index_applier = applier;
701        self
702    }
703
704    /// Sets fulltext index applier.
705    #[must_use]
706    pub(crate) fn with_fulltext_index_applier(
707        mut self,
708        applier: Option<FulltextIndexApplierRef>,
709    ) -> Self {
710        self.fulltext_index_applier = applier;
711        self
712    }
713
714    /// Sets start time of the query.
715    #[must_use]
716    pub(crate) fn with_start_time(mut self, now: Option<Instant>) -> Self {
717        self.query_start = now;
718        self
719    }
720
721    #[must_use]
722    pub(crate) fn with_append_mode(mut self, is_append_mode: bool) -> Self {
723        self.append_mode = is_append_mode;
724        self
725    }
726
727    /// Sets whether to remove deletion markers during scan.
728    #[must_use]
729    pub(crate) fn with_filter_deleted(mut self, filter_deleted: bool) -> Self {
730        self.filter_deleted = filter_deleted;
731        self
732    }
733
734    /// Sets the merge mode.
735    #[must_use]
736    pub(crate) fn with_merge_mode(mut self, merge_mode: MergeMode) -> Self {
737        self.merge_mode = merge_mode;
738        self
739    }
740
741    /// Sets the distribution hint.
742    #[must_use]
743    pub(crate) fn with_distribution(
744        mut self,
745        distribution: Option<TimeSeriesDistribution>,
746    ) -> Self {
747        self.distribution = distribution;
748        self
749    }
750
751    /// Sets the time series row selector.
752    #[must_use]
753    pub(crate) fn with_series_row_selector(
754        mut self,
755        series_row_selector: Option<TimeSeriesRowSelector>,
756    ) -> Self {
757        self.series_row_selector = series_row_selector;
758        self
759    }
760
761    /// Scans sources in parallel.
762    ///
763    /// # Panics if the input doesn't allow parallel scan.
764    pub(crate) fn create_parallel_sources(
765        &self,
766        sources: Vec<Source>,
767        semaphore: Arc<Semaphore>,
768    ) -> Result<Vec<Source>> {
769        if sources.len() <= 1 {
770            return Ok(sources);
771        }
772
773        // Spawn a task for each source.
774        let sources = sources
775            .into_iter()
776            .map(|source| {
777                let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
778                self.spawn_scan_task(source, semaphore.clone(), sender);
779                let stream = Box::pin(ReceiverStream::new(receiver));
780                Source::Stream(stream)
781            })
782            .collect();
783        Ok(sources)
784    }
785
786    /// Builds memtable ranges to scan by `index`.
787    pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> {
788        let memtable = &self.memtables[index.index];
789        let mut ranges = SmallVec::new();
790        memtable.build_ranges(index.row_group_index, &mut ranges);
791        ranges
792    }
793
794    /// Prunes a file to scan and returns the builder to build readers.
795    pub(crate) async fn prune_file(
796        &self,
797        file_index: usize,
798        reader_metrics: &mut ReaderMetrics,
799    ) -> Result<FileRangeBuilder> {
800        let file = &self.files[file_index];
801        let res = self
802            .access_layer
803            .read_sst(file.clone())
804            .predicate(self.predicate.predicate().cloned())
805            .projection(Some(self.mapper.column_ids().to_vec()))
806            .cache(self.cache_strategy.clone())
807            .inverted_index_applier(self.inverted_index_applier.clone())
808            .bloom_filter_index_applier(self.bloom_filter_index_applier.clone())
809            .fulltext_index_applier(self.fulltext_index_applier.clone())
810            .expected_metadata(Some(self.mapper.metadata().clone()))
811            .build_reader_input(reader_metrics)
812            .await;
813        let (mut file_range_ctx, selection) = match res {
814            Ok(x) => x,
815            Err(e) => {
816                if e.is_object_not_found() && self.ignore_file_not_found {
817                    error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
818                    return Ok(FileRangeBuilder::default());
819                } else {
820                    return Err(e);
821                }
822            }
823        };
824        if !compat::has_same_columns_and_pk_encoding(
825            self.mapper.metadata(),
826            file_range_ctx.read_format().metadata(),
827        ) {
828            // They have different schema. We need to adapt the batch first so the
829            // mapper can convert it.
830            let compat = CompatBatch::new(
831                &self.mapper,
832                file_range_ctx.read_format().metadata().clone(),
833            )?;
834            file_range_ctx.set_compat_batch(Some(compat));
835        }
836        Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), selection))
837    }
838
839    /// Scans the input source in another task and sends batches to the sender.
840    pub(crate) fn spawn_scan_task(
841        &self,
842        mut input: Source,
843        semaphore: Arc<Semaphore>,
844        sender: mpsc::Sender<Result<Batch>>,
845    ) {
846        common_runtime::spawn_global(async move {
847            loop {
848                // We release the permit before sending result to avoid the task waiting on
849                // the channel with the permit held.
850                let maybe_batch = {
851                    // Safety: We never close the semaphore.
852                    let _permit = semaphore.acquire().await.unwrap();
853                    input.next_batch().await
854                };
855                match maybe_batch {
856                    Ok(Some(batch)) => {
857                        let _ = sender.send(Ok(batch)).await;
858                    }
859                    Ok(None) => break,
860                    Err(e) => {
861                        let _ = sender.send(Err(e)).await;
862                        break;
863                    }
864                }
865            }
866        });
867    }
868
869    pub(crate) fn total_rows(&self) -> usize {
870        let rows_in_files: usize = self.files.iter().map(|f| f.num_rows()).sum();
871        let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();
872        rows_in_files + rows_in_memtables
873    }
874
875    /// Returns table predicate of all exprs.
876    pub(crate) fn predicate(&self) -> Option<&Predicate> {
877        self.predicate.predicate()
878    }
879
880    /// Returns number of memtables to scan.
881    pub(crate) fn num_memtables(&self) -> usize {
882        self.memtables.len()
883    }
884
885    /// Returns number of SST files to scan.
886    pub(crate) fn num_files(&self) -> usize {
887        self.files.len()
888    }
889}
890
#[cfg(test)]
impl ScanInput {
    /// Returns the ids of the SST files to scan, in the order the files are stored.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::FileId> {
        let mut ids = Vec::with_capacity(self.files.len());
        for handle in self.files.iter() {
            ids.push(handle.file_id());
        }
        ids
    }
}
898
/// Context shared by different streams from a scanner.
/// It contains the input and ranges to scan.
///
/// It is built by `seq_scan_ctx` (for [SeqScan]) or `unordered_scan_ctx`, which differ only
/// in how `ranges` is computed from the input.
pub(crate) struct StreamContext {
    /// Input memtables and files.
    pub(crate) input: ScanInput,
    /// Metadata for partition ranges. A range's position in this vector is the index
    /// passed to `RangeMeta::new_partition_range` by `partition_ranges()`.
    pub(crate) ranges: Vec<RangeMeta>,

    // Metrics:
    /// The start time of the query. Taken from the input's `query_start` when it is set,
    /// otherwise the time at which this context was created.
    pub(crate) query_start: Instant,
}
911
912impl StreamContext {
913    /// Creates a new [StreamContext] for [SeqScan].
914    pub(crate) fn seq_scan_ctx(input: ScanInput, compaction: bool) -> Self {
915        let query_start = input.query_start.unwrap_or_else(Instant::now);
916        let ranges = RangeMeta::seq_scan_ranges(&input, compaction);
917        READ_SST_COUNT.observe(input.num_files() as f64);
918
919        Self {
920            input,
921            ranges,
922            query_start,
923        }
924    }
925
926    /// Creates a new [StreamContext] for [UnorderedScan].
927    pub(crate) fn unordered_scan_ctx(input: ScanInput) -> Self {
928        let query_start = input.query_start.unwrap_or_else(Instant::now);
929        let ranges = RangeMeta::unordered_scan_ranges(&input);
930        READ_SST_COUNT.observe(input.num_files() as f64);
931
932        Self {
933            input,
934            ranges,
935            query_start,
936        }
937    }
938
939    /// Returns true if the index refers to a memtable.
940    pub(crate) fn is_mem_range_index(&self, index: RowGroupIndex) -> bool {
941        self.input.num_memtables() > index.index
942    }
943
944    /// Retrieves the partition ranges.
945    pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
946        self.ranges
947            .iter()
948            .enumerate()
949            .map(|(idx, range_meta)| range_meta.new_partition_range(idx))
950            .collect()
951    }
952
953    /// Format the context for explain.
954    pub(crate) fn format_for_explain(&self, verbose: bool, f: &mut fmt::Formatter) -> fmt::Result {
955        let (mut num_mem_ranges, mut num_file_ranges) = (0, 0);
956        for range_meta in &self.ranges {
957            for idx in &range_meta.row_group_indices {
958                if self.is_mem_range_index(*idx) {
959                    num_mem_ranges += 1;
960                } else {
961                    num_file_ranges += 1;
962                }
963            }
964        }
965        write!(
966            f,
967            "partition_count={} ({} memtable ranges, {} file {} ranges)",
968            self.ranges.len(),
969            num_mem_ranges,
970            self.input.num_files(),
971            num_file_ranges,
972        )?;
973        if let Some(selector) = &self.input.series_row_selector {
974            write!(f, ", selector={}", selector)?;
975        }
976        if let Some(distribution) = &self.input.distribution {
977            write!(f, ", distribution={}", distribution)?;
978        }
979
980        if verbose {
981            self.format_verbose_content(f)?;
982        }
983
984        Ok(())
985    }
986
987    fn format_verbose_content(&self, f: &mut fmt::Formatter) -> fmt::Result {
988        struct FileWrapper<'a> {
989            file: &'a FileHandle,
990        }
991
992        impl fmt::Debug for FileWrapper<'_> {
993            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
994                write!(
995                    f,
996                    "[file={}, time_range=({}::{}, {}::{}), rows={}, size={}, index_size={}]",
997                    self.file.file_id(),
998                    self.file.time_range().0.value(),
999                    self.file.time_range().0.unit(),
1000                    self.file.time_range().1.value(),
1001                    self.file.time_range().1.unit(),
1002                    self.file.num_rows(),
1003                    self.file.size(),
1004                    self.file.index_size()
1005                )
1006            }
1007        }
1008
1009        struct InputWrapper<'a> {
1010            input: &'a ScanInput,
1011        }
1012
1013        impl fmt::Debug for InputWrapper<'_> {
1014            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1015                let output_schema = self.input.mapper.output_schema();
1016                if !output_schema.is_empty() {
1017                    write!(f, ", projection=")?;
1018                    f.debug_list()
1019                        .entries(output_schema.column_schemas().iter().map(|col| &col.name))
1020                        .finish()?;
1021                }
1022                if let Some(predicate) = &self.input.predicate.predicate() {
1023                    if !predicate.exprs().is_empty() {
1024                        write!(f, ", filters=[")?;
1025                        for (i, expr) in predicate.exprs().iter().enumerate() {
1026                            if i == predicate.exprs().len() - 1 {
1027                                write!(f, "{}]", expr)?;
1028                            } else {
1029                                write!(f, "{}, ", expr)?;
1030                            }
1031                        }
1032                    }
1033                }
1034                if !self.input.files.is_empty() {
1035                    write!(f, ", files=")?;
1036                    f.debug_list()
1037                        .entries(self.input.files.iter().map(|file| FileWrapper { file }))
1038                        .finish()?;
1039                }
1040
1041                Ok(())
1042            }
1043        }
1044
1045        write!(f, "{:?}", InputWrapper { input: &self.input })
1046    }
1047}
1048
/// Predicates to evaluate.
///
/// `time_filters` only keeps filters that [SimpleFilterEvaluator] supports and that
/// reference nothing but the timestamp column; `predicate` keeps every expr.
#[derive(Clone, Default)]
pub struct PredicateGroup {
    /// Filters on the timestamp column that [SimpleFilterEvaluator] can evaluate.
    /// `None` when no expr qualifies.
    time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,

    /// Table predicate for all logical exprs to evaluate.
    /// Parquet reader uses it to prune row groups.
    predicate: Option<Predicate>,
}
1059
1060impl PredicateGroup {
1061    /// Creates a new `PredicateGroup` from exprs according to the metadata.
1062    pub fn new(metadata: &RegionMetadata, exprs: &[Expr]) -> Self {
1063        let mut time_filters = Vec::with_capacity(exprs.len());
1064        // Columns in the expr.
1065        let mut columns = HashSet::new();
1066        for expr in exprs {
1067            columns.clear();
1068            let Some(filter) = Self::expr_to_filter(expr, metadata, &mut columns) else {
1069                continue;
1070            };
1071            time_filters.push(filter);
1072        }
1073        let time_filters = if time_filters.is_empty() {
1074            None
1075        } else {
1076            Some(Arc::new(time_filters))
1077        };
1078        let predicate = Predicate::new(exprs.to_vec());
1079
1080        Self {
1081            time_filters,
1082            predicate: Some(predicate),
1083        }
1084    }
1085
1086    /// Returns time filters.
1087    pub(crate) fn time_filters(&self) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
1088        self.time_filters.clone()
1089    }
1090
1091    /// Returns predicate of all exprs.
1092    pub(crate) fn predicate(&self) -> Option<&Predicate> {
1093        self.predicate.as_ref()
1094    }
1095
1096    fn expr_to_filter(
1097        expr: &Expr,
1098        metadata: &RegionMetadata,
1099        columns: &mut HashSet<Column>,
1100    ) -> Option<SimpleFilterEvaluator> {
1101        columns.clear();
1102        // `expr_to_columns` won't return error.
1103        // We still ignore these expressions for safety.
1104        expr_to_columns(expr, columns).ok()?;
1105        if columns.len() > 1 {
1106            // Simple filter doesn't support multiple columns.
1107            return None;
1108        }
1109        let column = columns.iter().next()?;
1110        let column_meta = metadata.column_by_name(&column.name)?;
1111        if column_meta.semantic_type == SemanticType::Timestamp {
1112            SimpleFilterEvaluator::try_new(expr)
1113        } else {
1114            None
1115        }
1116    }
1117}