mito2/read/
scan_region.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Scans a region according to the scan request.
16
17use std::collections::HashSet;
18use std::fmt;
19use std::num::NonZeroU64;
20use std::sync::Arc;
21use std::time::Instant;
22
23use api::v1::SemanticType;
24use common_error::ext::BoxedError;
25use common_recordbatch::filter::SimpleFilterEvaluator;
26use common_recordbatch::SendableRecordBatchStream;
27use common_telemetry::{debug, error, tracing, warn};
28use common_time::range::TimestampRange;
29use datafusion_common::Column;
30use datafusion_expr::utils::expr_to_columns;
31use datafusion_expr::Expr;
32use smallvec::SmallVec;
33use store_api::metadata::{RegionMetadata, RegionMetadataRef};
34use store_api::region_engine::{PartitionRange, RegionScannerRef};
35use store_api::storage::{RegionId, ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector};
36use table::predicate::{build_time_range_predicate, Predicate};
37use tokio::sync::{mpsc, Semaphore};
38use tokio_stream::wrappers::ReceiverStream;
39
40use crate::access_layer::AccessLayerRef;
41use crate::cache::CacheStrategy;
42use crate::config::DEFAULT_SCAN_CHANNEL_SIZE;
43use crate::error::Result;
44#[cfg(feature = "enterprise")]
45use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
46use crate::memtable::MemtableRange;
47use crate::metrics::READ_SST_COUNT;
48use crate::read::compat::{self, CompatBatch};
49use crate::read::projection::ProjectionMapper;
50use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
51use crate::read::seq_scan::SeqScan;
52use crate::read::series_scan::SeriesScan;
53use crate::read::stream::ScanBatchStream;
54use crate::read::unordered_scan::UnorderedScan;
55use crate::read::{Batch, Source};
56use crate::region::options::MergeMode;
57use crate::region::version::VersionRef;
58use crate::sst::file::FileHandle;
59use crate::sst::index::bloom_filter::applier::{
60    BloomFilterIndexApplierBuilder, BloomFilterIndexApplierRef,
61};
62use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
63use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
64use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
65use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
66use crate::sst::parquet::reader::ReaderMetrics;
67
68/// A scanner scans a region and returns a [SendableRecordBatchStream].
69pub(crate) enum Scanner {
70    /// Sequential scan.
71    Seq(SeqScan),
72    /// Unordered scan.
73    Unordered(UnorderedScan),
74    /// Per-series scan.
75    Series(SeriesScan),
76}
77
78impl Scanner {
79    /// Returns a [SendableRecordBatchStream] to retrieve scan results from all partitions.
80    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
81    pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream, BoxedError> {
82        match self {
83            Scanner::Seq(seq_scan) => seq_scan.build_stream(),
84            Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
85            Scanner::Series(series_scan) => series_scan.build_stream().await,
86        }
87    }
88
89    /// Create a stream of [`Batch`] by this scanner.
90    pub(crate) fn scan_batch(&self) -> Result<ScanBatchStream> {
91        match self {
92            Scanner::Seq(x) => x.scan_all_partitions(),
93            Scanner::Unordered(x) => x.scan_all_partitions(),
94            Scanner::Series(x) => x.scan_all_partitions(),
95        }
96    }
97}
98
#[cfg(test)]
impl Scanner {
    /// Returns how many SST files the scanner will read.
    pub(crate) fn num_files(&self) -> usize {
        let input = match self {
            Self::Seq(scan) => scan.input(),
            Self::Unordered(scan) => scan.input(),
            Self::Series(scan) => scan.input(),
        };
        input.num_files()
    }

    /// Returns how many memtables the scanner will read.
    pub(crate) fn num_memtables(&self) -> usize {
        let input = match self {
            Self::Seq(scan) => scan.input(),
            Self::Unordered(scan) => scan.input(),
            Self::Series(scan) => scan.input(),
        };
        input.num_memtables()
    }

    /// Returns the ids of the SST files the scanner will read.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::FileId> {
        let input = match self {
            Self::Seq(scan) => scan.input(),
            Self::Unordered(scan) => scan.input(),
            Self::Series(scan) => scan.input(),
        };
        input.file_ids()
    }

    /// Sets the target number of partitions, which controls the scanner's parallelism.
    ///
    /// # Panics
    ///
    /// Panics if the underlying scanner rejects the prepare request.
    pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
        use store_api::region_engine::{PrepareRequest, RegionScanner};

        // All variants implement `RegionScanner`, so dispatch through a trait object once.
        let scanner: &mut dyn RegionScanner = match self {
            Self::Seq(scan) => scan,
            Self::Unordered(scan) => scan,
            Self::Series(scan) => scan,
        };
        scanner
            .prepare(PrepareRequest::default().with_target_partitions(target_partitions))
            .unwrap();
    }
}
140
141#[cfg_attr(doc, aquamarine::aquamarine)]
142/// Helper to scans a region by [ScanRequest].
143///
144/// [ScanRegion] collects SSTs and memtables to scan without actually reading them. It
145/// creates a [Scanner] to actually scan these targets in [Scanner::scan()].
146///
147/// ```mermaid
148/// classDiagram
149/// class ScanRegion {
150///     -VersionRef version
151///     -ScanRequest request
152///     ~scanner() Scanner
153///     ~seq_scan() SeqScan
154/// }
155/// class Scanner {
156///     <<enumeration>>
157///     SeqScan
158///     UnorderedScan
159///     +scan() SendableRecordBatchStream
160/// }
161/// class SeqScan {
162///     -ScanInput input
163///     +build() SendableRecordBatchStream
164/// }
165/// class UnorderedScan {
166///     -ScanInput input
167///     +build() SendableRecordBatchStream
168/// }
169/// class ScanInput {
170///     -ProjectionMapper mapper
171///     -Option~TimeRange~ time_range
172///     -Option~Predicate~ predicate
173///     -Vec~MemtableRef~ memtables
174///     -Vec~FileHandle~ files
175/// }
176/// class ProjectionMapper {
177///     ~output_schema() SchemaRef
178///     ~convert(Batch) RecordBatch
179/// }
180/// ScanRegion -- Scanner
181/// ScanRegion o-- ScanRequest
182/// Scanner o-- SeqScan
183/// Scanner o-- UnorderedScan
184/// SeqScan o-- ScanInput
185/// UnorderedScan o-- ScanInput
186/// Scanner -- SendableRecordBatchStream
187/// ScanInput o-- ProjectionMapper
188/// SeqScan -- SendableRecordBatchStream
189/// UnorderedScan -- SendableRecordBatchStream
190/// ```
191pub(crate) struct ScanRegion {
192    /// Version of the region at scan.
193    version: VersionRef,
194    /// Access layer of the region.
195    access_layer: AccessLayerRef,
196    /// Scan request.
197    request: ScanRequest,
198    /// Cache.
199    cache_strategy: CacheStrategy,
200    /// Capacity of the channel to send data from parallel scan tasks to the main task.
201    parallel_scan_channel_size: usize,
202    /// Whether to ignore inverted index.
203    ignore_inverted_index: bool,
204    /// Whether to ignore fulltext index.
205    ignore_fulltext_index: bool,
206    /// Whether to ignore bloom filter.
207    ignore_bloom_filter: bool,
208    /// Start time of the scan task.
209    start_time: Option<Instant>,
210    /// Whether to filter out the deleted rows.
211    /// Usually true for normal read, and false for scan for compaction.
212    filter_deleted: bool,
213    #[cfg(feature = "enterprise")]
214    extension_range_provider: Option<BoxedExtensionRangeProvider>,
215}
216
217impl ScanRegion {
218    /// Creates a [ScanRegion].
219    pub(crate) fn new(
220        version: VersionRef,
221        access_layer: AccessLayerRef,
222        request: ScanRequest,
223        cache_strategy: CacheStrategy,
224    ) -> ScanRegion {
225        ScanRegion {
226            version,
227            access_layer,
228            request,
229            cache_strategy,
230            parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
231            ignore_inverted_index: false,
232            ignore_fulltext_index: false,
233            ignore_bloom_filter: false,
234            start_time: None,
235            filter_deleted: true,
236            #[cfg(feature = "enterprise")]
237            extension_range_provider: None,
238        }
239    }
240
241    /// Sets parallel scan task channel size.
242    #[must_use]
243    pub(crate) fn with_parallel_scan_channel_size(
244        mut self,
245        parallel_scan_channel_size: usize,
246    ) -> Self {
247        self.parallel_scan_channel_size = parallel_scan_channel_size;
248        self
249    }
250
251    /// Sets whether to ignore inverted index.
252    #[must_use]
253    pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self {
254        self.ignore_inverted_index = ignore;
255        self
256    }
257
258    /// Sets whether to ignore fulltext index.
259    #[must_use]
260    pub(crate) fn with_ignore_fulltext_index(mut self, ignore: bool) -> Self {
261        self.ignore_fulltext_index = ignore;
262        self
263    }
264
265    /// Sets whether to ignore bloom filter.
266    #[must_use]
267    pub(crate) fn with_ignore_bloom_filter(mut self, ignore: bool) -> Self {
268        self.ignore_bloom_filter = ignore;
269        self
270    }
271
272    #[must_use]
273    pub(crate) fn with_start_time(mut self, now: Instant) -> Self {
274        self.start_time = Some(now);
275        self
276    }
277
278    pub(crate) fn set_filter_deleted(&mut self, filter_deleted: bool) {
279        self.filter_deleted = filter_deleted;
280    }
281
282    #[cfg(feature = "enterprise")]
283    pub(crate) fn set_extension_range_provider(
284        &mut self,
285        extension_range_provider: BoxedExtensionRangeProvider,
286    ) {
287        self.extension_range_provider = Some(extension_range_provider);
288    }
289
290    /// Returns a [Scanner] to scan the region.
291    pub(crate) async fn scanner(self) -> Result<Scanner> {
292        if self.use_series_scan() {
293            self.series_scan().await.map(Scanner::Series)
294        } else if self.use_unordered_scan() {
295            // If table is append only and there is no series row selector, we use unordered scan in query.
296            // We still use seq scan in compaction.
297            self.unordered_scan().await.map(Scanner::Unordered)
298        } else {
299            self.seq_scan().await.map(Scanner::Seq)
300        }
301    }
302
303    /// Returns a [RegionScanner] to scan the region.
304    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
305    pub(crate) async fn region_scanner(self) -> Result<RegionScannerRef> {
306        if self.use_series_scan() {
307            self.series_scan()
308                .await
309                .map(|scanner| Box::new(scanner) as _)
310        } else if self.use_unordered_scan() {
311            self.unordered_scan()
312                .await
313                .map(|scanner| Box::new(scanner) as _)
314        } else {
315            self.seq_scan().await.map(|scanner| Box::new(scanner) as _)
316        }
317    }
318
319    /// Scan sequentially.
320    pub(crate) async fn seq_scan(self) -> Result<SeqScan> {
321        let input = self.scan_input().await?;
322        Ok(SeqScan::new(input, false))
323    }
324
325    /// Unordered scan.
326    pub(crate) async fn unordered_scan(self) -> Result<UnorderedScan> {
327        let input = self.scan_input().await?;
328        Ok(UnorderedScan::new(input))
329    }
330
331    /// Scans by series.
332    pub(crate) async fn series_scan(self) -> Result<SeriesScan> {
333        let input = self.scan_input().await?;
334        Ok(SeriesScan::new(input))
335    }
336
337    /// Returns true if the region can use unordered scan for current request.
338    fn use_unordered_scan(&self) -> bool {
339        // We use unordered scan when:
340        // 1. The region is in append mode.
341        // 2. There is no series row selector.
342        // 3. The required distribution is None or TimeSeriesDistribution::TimeWindowed.
343        //
344        // We still use seq scan in compaction.
345        self.version.options.append_mode
346            && self.request.series_row_selector.is_none()
347            && (self.request.distribution.is_none()
348                || self.request.distribution == Some(TimeSeriesDistribution::TimeWindowed))
349    }
350
351    /// Returns true if the region can use series scan for current request.
352    fn use_series_scan(&self) -> bool {
353        self.request.distribution == Some(TimeSeriesDistribution::PerSeries)
354    }
355
356    /// Creates a scan input.
357    async fn scan_input(mut self) -> Result<ScanInput> {
358        let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new);
359        let time_range = self.build_time_range_predicate();
360
361        let ssts = &self.version.ssts;
362        let mut files = Vec::new();
363        for level in ssts.levels() {
364            for file in level.files.values() {
365                let exceed_min_sequence = match (sst_min_sequence, file.meta_ref().sequence) {
366                    (Some(min_sequence), Some(file_sequence)) => file_sequence > min_sequence,
367                    // If the file's sequence is None (or actually is zero), it could mean the file
368                    // is generated and added to the region "directly". In this case, its data should
369                    // be considered as fresh as the memtable. So its sequence is treated greater than
370                    // the min_sequence, whatever the value of min_sequence is. Hence the default
371                    // "true" in this arm.
372                    (Some(_), None) => true,
373                    (None, _) => true,
374                };
375
376                // Finds SST files in range.
377                if exceed_min_sequence && file_in_range(file, &time_range) {
378                    files.push(file.clone());
379                }
380                // There is no need to check and prune for file's sequence here as the sequence number is usually very new,
381                // unless the timing is too good, or the sequence number wouldn't be in file.
382                // and the batch will be filtered out by tree reader anyway.
383            }
384        }
385
386        let memtables = self.version.memtables.list_memtables();
387        // Skip empty memtables and memtables out of time range.
388        let memtables: Vec<_> = memtables
389            .into_iter()
390            .filter(|mem| {
391                // check if memtable is empty by reading stats.
392                let Some((start, end)) = mem.stats().time_range() else {
393                    return false;
394                };
395                // The time range of the memtable is inclusive.
396                let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
397                memtable_range.intersects(&time_range)
398            })
399            .collect();
400
401        let region_id = self.region_id();
402        debug!(
403            "Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}",
404            region_id,
405            self.request,
406            time_range,
407            memtables.len(),
408            files.len(),
409            self.version.options.append_mode,
410        );
411
412        // Remove field filters for LastNonNull mode after logging the request.
413        self.maybe_remove_field_filters();
414
415        let inverted_index_applier = self.build_invereted_index_applier();
416        let bloom_filter_applier = self.build_bloom_filter_applier();
417        let fulltext_index_applier = self.build_fulltext_index_applier();
418        let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters);
419        // The mapper always computes projected column ids as the schema of SSTs may change.
420        let mapper = match &self.request.projection {
421            Some(p) => ProjectionMapper::new(&self.version.metadata, p.iter().copied())?,
422            None => ProjectionMapper::all(&self.version.metadata)?,
423        };
424        // Get memtable ranges to scan.
425        let memtables = memtables
426            .into_iter()
427            .map(|mem| {
428                mem.ranges(
429                    Some(mapper.column_ids()),
430                    predicate.clone(),
431                    self.request.sequence,
432                )
433                .map(MemRangeBuilder::new)
434            })
435            .collect::<Result<Vec<_>>>()?;
436
437        let input = ScanInput::new(self.access_layer, mapper)
438            .with_time_range(Some(time_range))
439            .with_predicate(predicate)
440            .with_memtables(memtables)
441            .with_files(files)
442            .with_cache(self.cache_strategy)
443            .with_inverted_index_applier(inverted_index_applier)
444            .with_bloom_filter_index_applier(bloom_filter_applier)
445            .with_fulltext_index_applier(fulltext_index_applier)
446            .with_parallel_scan_channel_size(self.parallel_scan_channel_size)
447            .with_start_time(self.start_time)
448            .with_append_mode(self.version.options.append_mode)
449            .with_filter_deleted(self.filter_deleted)
450            .with_merge_mode(self.version.options.merge_mode())
451            .with_series_row_selector(self.request.series_row_selector)
452            .with_distribution(self.request.distribution);
453
454        #[cfg(feature = "enterprise")]
455        let input = if let Some(provider) = self.extension_range_provider {
456            let ranges = provider
457                .find_extension_ranges(time_range, &self.request)
458                .await?;
459            input.with_extension_ranges(ranges)
460        } else {
461            input
462        };
463        Ok(input)
464    }
465
466    fn region_id(&self) -> RegionId {
467        self.version.metadata.region_id
468    }
469
470    /// Build time range predicate from filters.
471    fn build_time_range_predicate(&self) -> TimestampRange {
472        let time_index = self.version.metadata.time_index_column();
473        let unit = time_index
474            .column_schema
475            .data_type
476            .as_timestamp()
477            .expect("Time index must have timestamp-compatible type")
478            .unit();
479        build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters)
480    }
481
482    /// Remove field filters if the merge mode is [MergeMode::LastNonNull].
483    fn maybe_remove_field_filters(&mut self) {
484        if self.version.options.merge_mode() != MergeMode::LastNonNull {
485            return;
486        }
487
488        // TODO(yingwen): We can ignore field filters only when there are multiple sources in the same time window.
489        let field_columns = self
490            .version
491            .metadata
492            .field_columns()
493            .map(|col| &col.column_schema.name)
494            .collect::<HashSet<_>>();
495        // Columns in the expr.
496        let mut columns = HashSet::new();
497
498        self.request.filters.retain(|expr| {
499            columns.clear();
500            // `expr_to_columns` won't return error.
501            if expr_to_columns(expr, &mut columns).is_err() {
502                return false;
503            }
504            for column in &columns {
505                if field_columns.contains(&column.name) {
506                    // This expr uses the field column.
507                    return false;
508                }
509            }
510            true
511        });
512    }
513
514    /// Use the latest schema to build the inverted index applier.
515    fn build_invereted_index_applier(&self) -> Option<InvertedIndexApplierRef> {
516        if self.ignore_inverted_index {
517            return None;
518        }
519
520        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
521        let inverted_index_cache = self.cache_strategy.inverted_index_cache().cloned();
522
523        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
524
525        InvertedIndexApplierBuilder::new(
526            self.access_layer.region_dir().to_string(),
527            self.access_layer.object_store().clone(),
528            self.version.metadata.as_ref(),
529            self.version.metadata.inverted_indexed_column_ids(
530                self.version
531                    .options
532                    .index_options
533                    .inverted_index
534                    .ignore_column_ids
535                    .iter(),
536            ),
537            self.access_layer.puffin_manager_factory().clone(),
538        )
539        .with_file_cache(file_cache)
540        .with_inverted_index_cache(inverted_index_cache)
541        .with_puffin_metadata_cache(puffin_metadata_cache)
542        .build(&self.request.filters)
543        .inspect_err(|err| warn!(err; "Failed to build invereted index applier"))
544        .ok()
545        .flatten()
546        .map(Arc::new)
547    }
548
549    /// Use the latest schema to build the bloom filter index applier.
550    fn build_bloom_filter_applier(&self) -> Option<BloomFilterIndexApplierRef> {
551        if self.ignore_bloom_filter {
552            return None;
553        }
554
555        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
556        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
557        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
558
559        BloomFilterIndexApplierBuilder::new(
560            self.access_layer.region_dir().to_string(),
561            self.access_layer.object_store().clone(),
562            self.version.metadata.as_ref(),
563            self.access_layer.puffin_manager_factory().clone(),
564        )
565        .with_file_cache(file_cache)
566        .with_bloom_filter_index_cache(bloom_filter_index_cache)
567        .with_puffin_metadata_cache(puffin_metadata_cache)
568        .build(&self.request.filters)
569        .inspect_err(|err| warn!(err; "Failed to build bloom filter index applier"))
570        .ok()
571        .flatten()
572        .map(Arc::new)
573    }
574
575    /// Use the latest schema to build the fulltext index applier.
576    fn build_fulltext_index_applier(&self) -> Option<FulltextIndexApplierRef> {
577        if self.ignore_fulltext_index {
578            return None;
579        }
580
581        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
582        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
583        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
584        FulltextIndexApplierBuilder::new(
585            self.access_layer.region_dir().to_string(),
586            self.region_id(),
587            self.access_layer.object_store().clone(),
588            self.access_layer.puffin_manager_factory().clone(),
589            self.version.metadata.as_ref(),
590        )
591        .with_file_cache(file_cache)
592        .with_puffin_metadata_cache(puffin_metadata_cache)
593        .with_bloom_filter_cache(bloom_filter_index_cache)
594        .build(&self.request.filters)
595        .inspect_err(|err| warn!(err; "Failed to build fulltext index applier"))
596        .ok()
597        .flatten()
598        .map(Arc::new)
599    }
600}
601
602/// Returns true if the time range of a SST `file` matches the `predicate`.
603fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
604    if predicate == &TimestampRange::min_to_max() {
605        return true;
606    }
607    // end timestamp of a SST is inclusive.
608    let (start, end) = file.time_range();
609    let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end));
610    file_ts_range.intersects(predicate)
611}
612
613/// Common input for different scanners.
614pub struct ScanInput {
615    /// Region SST access layer.
616    access_layer: AccessLayerRef,
617    /// Maps projected Batches to RecordBatches.
618    pub(crate) mapper: Arc<ProjectionMapper>,
619    /// Time range filter for time index.
620    time_range: Option<TimestampRange>,
621    /// Predicate to push down.
622    pub(crate) predicate: PredicateGroup,
623    /// Memtable range builders for memtables in the time range..
624    pub(crate) memtables: Vec<MemRangeBuilder>,
625    /// Handles to SST files to scan.
626    pub(crate) files: Vec<FileHandle>,
627    /// Cache.
628    pub(crate) cache_strategy: CacheStrategy,
629    /// Ignores file not found error.
630    ignore_file_not_found: bool,
631    /// Capacity of the channel to send data from parallel scan tasks to the main task.
632    pub(crate) parallel_scan_channel_size: usize,
633    /// Index appliers.
634    inverted_index_applier: Option<InvertedIndexApplierRef>,
635    bloom_filter_index_applier: Option<BloomFilterIndexApplierRef>,
636    fulltext_index_applier: Option<FulltextIndexApplierRef>,
637    /// Start time of the query.
638    pub(crate) query_start: Option<Instant>,
639    /// The region is using append mode.
640    pub(crate) append_mode: bool,
641    /// Whether to remove deletion markers.
642    pub(crate) filter_deleted: bool,
643    /// Mode to merge duplicate rows.
644    pub(crate) merge_mode: MergeMode,
645    /// Hint to select rows from time series.
646    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
647    /// Hint for the required distribution of the scanner.
648    pub(crate) distribution: Option<TimeSeriesDistribution>,
649    #[cfg(feature = "enterprise")]
650    extension_ranges: Vec<BoxedExtensionRange>,
651}
652
653impl ScanInput {
654    /// Creates a new [ScanInput].
655    #[must_use]
656    pub(crate) fn new(access_layer: AccessLayerRef, mapper: ProjectionMapper) -> ScanInput {
657        ScanInput {
658            access_layer,
659            mapper: Arc::new(mapper),
660            time_range: None,
661            predicate: PredicateGroup::default(),
662            memtables: Vec::new(),
663            files: Vec::new(),
664            cache_strategy: CacheStrategy::Disabled,
665            ignore_file_not_found: false,
666            parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
667            inverted_index_applier: None,
668            bloom_filter_index_applier: None,
669            fulltext_index_applier: None,
670            query_start: None,
671            append_mode: false,
672            filter_deleted: true,
673            merge_mode: MergeMode::default(),
674            series_row_selector: None,
675            distribution: None,
676            #[cfg(feature = "enterprise")]
677            extension_ranges: Vec::new(),
678        }
679    }
680
681    /// Sets time range filter for time index.
682    #[must_use]
683    pub(crate) fn with_time_range(mut self, time_range: Option<TimestampRange>) -> Self {
684        self.time_range = time_range;
685        self
686    }
687
688    /// Sets predicate to push down.
689    #[must_use]
690    pub(crate) fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
691        self.predicate = predicate;
692        self
693    }
694
695    /// Sets memtable range builders.
696    #[must_use]
697    pub(crate) fn with_memtables(mut self, memtables: Vec<MemRangeBuilder>) -> Self {
698        self.memtables = memtables;
699        self
700    }
701
702    /// Sets files to read.
703    #[must_use]
704    pub(crate) fn with_files(mut self, files: Vec<FileHandle>) -> Self {
705        self.files = files;
706        self
707    }
708
709    /// Sets cache for this query.
710    #[must_use]
711    pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self {
712        self.cache_strategy = cache;
713        self
714    }
715
716    /// Ignores file not found error.
717    #[must_use]
718    pub(crate) fn with_ignore_file_not_found(mut self, ignore: bool) -> Self {
719        self.ignore_file_not_found = ignore;
720        self
721    }
722
723    /// Sets scan task channel size.
724    #[must_use]
725    pub(crate) fn with_parallel_scan_channel_size(
726        mut self,
727        parallel_scan_channel_size: usize,
728    ) -> Self {
729        self.parallel_scan_channel_size = parallel_scan_channel_size;
730        self
731    }
732
733    /// Sets invereted index applier.
734    #[must_use]
735    pub(crate) fn with_inverted_index_applier(
736        mut self,
737        applier: Option<InvertedIndexApplierRef>,
738    ) -> Self {
739        self.inverted_index_applier = applier;
740        self
741    }
742
743    /// Sets bloom filter applier.
744    #[must_use]
745    pub(crate) fn with_bloom_filter_index_applier(
746        mut self,
747        applier: Option<BloomFilterIndexApplierRef>,
748    ) -> Self {
749        self.bloom_filter_index_applier = applier;
750        self
751    }
752
753    /// Sets fulltext index applier.
754    #[must_use]
755    pub(crate) fn with_fulltext_index_applier(
756        mut self,
757        applier: Option<FulltextIndexApplierRef>,
758    ) -> Self {
759        self.fulltext_index_applier = applier;
760        self
761    }
762
763    /// Sets start time of the query.
764    #[must_use]
765    pub(crate) fn with_start_time(mut self, now: Option<Instant>) -> Self {
766        self.query_start = now;
767        self
768    }
769
770    #[must_use]
771    pub(crate) fn with_append_mode(mut self, is_append_mode: bool) -> Self {
772        self.append_mode = is_append_mode;
773        self
774    }
775
776    /// Sets whether to remove deletion markers during scan.
777    #[must_use]
778    pub(crate) fn with_filter_deleted(mut self, filter_deleted: bool) -> Self {
779        self.filter_deleted = filter_deleted;
780        self
781    }
782
783    /// Sets the merge mode.
784    #[must_use]
785    pub(crate) fn with_merge_mode(mut self, merge_mode: MergeMode) -> Self {
786        self.merge_mode = merge_mode;
787        self
788    }
789
790    /// Sets the distribution hint.
791    #[must_use]
792    pub(crate) fn with_distribution(
793        mut self,
794        distribution: Option<TimeSeriesDistribution>,
795    ) -> Self {
796        self.distribution = distribution;
797        self
798    }
799
800    /// Sets the time series row selector.
801    #[must_use]
802    pub(crate) fn with_series_row_selector(
803        mut self,
804        series_row_selector: Option<TimeSeriesRowSelector>,
805    ) -> Self {
806        self.series_row_selector = series_row_selector;
807        self
808    }
809
810    /// Scans sources in parallel.
811    ///
812    /// # Panics if the input doesn't allow parallel scan.
813    pub(crate) fn create_parallel_sources(
814        &self,
815        sources: Vec<Source>,
816        semaphore: Arc<Semaphore>,
817    ) -> Result<Vec<Source>> {
818        if sources.len() <= 1 {
819            return Ok(sources);
820        }
821
822        // Spawn a task for each source.
823        let sources = sources
824            .into_iter()
825            .map(|source| {
826                let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
827                self.spawn_scan_task(source, semaphore.clone(), sender);
828                let stream = Box::pin(ReceiverStream::new(receiver));
829                Source::Stream(stream)
830            })
831            .collect();
832        Ok(sources)
833    }
834
835    /// Builds memtable ranges to scan by `index`.
836    pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> {
837        let memtable = &self.memtables[index.index];
838        let mut ranges = SmallVec::new();
839        memtable.build_ranges(index.row_group_index, &mut ranges);
840        ranges
841    }
842
843    /// Prunes a file to scan and returns the builder to build readers.
844    pub async fn prune_file(
845        &self,
846        file: &FileHandle,
847        reader_metrics: &mut ReaderMetrics,
848    ) -> Result<FileRangeBuilder> {
849        let res = self
850            .access_layer
851            .read_sst(file.clone())
852            .predicate(self.predicate.predicate().cloned())
853            .projection(Some(self.mapper.column_ids().to_vec()))
854            .cache(self.cache_strategy.clone())
855            .inverted_index_applier(self.inverted_index_applier.clone())
856            .bloom_filter_index_applier(self.bloom_filter_index_applier.clone())
857            .fulltext_index_applier(self.fulltext_index_applier.clone())
858            .expected_metadata(Some(self.mapper.metadata().clone()))
859            .build_reader_input(reader_metrics)
860            .await;
861        let (mut file_range_ctx, selection) = match res {
862            Ok(x) => x,
863            Err(e) => {
864                if e.is_object_not_found() && self.ignore_file_not_found {
865                    error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
866                    return Ok(FileRangeBuilder::default());
867                } else {
868                    return Err(e);
869                }
870            }
871        };
872        if !compat::has_same_columns_and_pk_encoding(
873            self.mapper.metadata(),
874            file_range_ctx.read_format().metadata(),
875        ) {
876            // They have different schema. We need to adapt the batch first so the
877            // mapper can convert it.
878            let compat = CompatBatch::new(
879                &self.mapper,
880                file_range_ctx.read_format().metadata().clone(),
881            )?;
882            file_range_ctx.set_compat_batch(Some(compat));
883        }
884        Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), selection))
885    }
886
887    /// Scans the input source in another task and sends batches to the sender.
888    pub(crate) fn spawn_scan_task(
889        &self,
890        mut input: Source,
891        semaphore: Arc<Semaphore>,
892        sender: mpsc::Sender<Result<Batch>>,
893    ) {
894        common_runtime::spawn_global(async move {
895            loop {
896                // We release the permit before sending result to avoid the task waiting on
897                // the channel with the permit held.
898                let maybe_batch = {
899                    // Safety: We never close the semaphore.
900                    let _permit = semaphore.acquire().await.unwrap();
901                    input.next_batch().await
902                };
903                match maybe_batch {
904                    Ok(Some(batch)) => {
905                        let _ = sender.send(Ok(batch)).await;
906                    }
907                    Ok(None) => break,
908                    Err(e) => {
909                        let _ = sender.send(Err(e)).await;
910                        break;
911                    }
912                }
913            }
914        });
915    }
916
917    pub(crate) fn total_rows(&self) -> usize {
918        let rows_in_files: usize = self.files.iter().map(|f| f.num_rows()).sum();
919        let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();
920
921        let rows = rows_in_files + rows_in_memtables;
922        #[cfg(feature = "enterprise")]
923        let rows = rows
924            + self
925                .extension_ranges
926                .iter()
927                .map(|x| x.num_rows())
928                .sum::<u64>() as usize;
929        rows
930    }
931
932    /// Returns table predicate of all exprs.
933    pub(crate) fn predicate(&self) -> Option<&Predicate> {
934        self.predicate.predicate()
935    }
936
937    /// Returns number of memtables to scan.
938    pub(crate) fn num_memtables(&self) -> usize {
939        self.memtables.len()
940    }
941
942    /// Returns number of SST files to scan.
943    pub(crate) fn num_files(&self) -> usize {
944        self.files.len()
945    }
946
947    pub fn region_metadata(&self) -> &RegionMetadataRef {
948        self.mapper.metadata()
949    }
950}
951
#[cfg(feature = "enterprise")]
impl ScanInput {
    /// Sets the extension ranges to scan in addition to memtables and SST files.
    #[must_use]
    pub(crate) fn with_extension_ranges(self, extension_ranges: Vec<BoxedExtensionRange>) -> Self {
        Self {
            extension_ranges,
            ..self
        }
    }

    /// Returns all extension ranges to scan.
    pub(crate) fn extension_ranges(&self) -> &[BoxedExtensionRange] {
        &self.extension_ranges
    }

    /// Gets a boxed [ExtensionRange] by its index in all ranges.
    ///
    /// Range indices place memtables first, then SST files, then extension ranges.
    /// `i` is therefore shifted by `num_memtables() + num_files()` before indexing
    /// into the extension ranges.
    ///
    /// # Panics
    /// Panics if `i` does not refer to an extension range, that is if
    /// `i < num_memtables() + num_files()` or `i` is past the last extension range.
    pub(crate) fn extension_range(&self, i: usize) -> &BoxedExtensionRange {
        &self.extension_ranges[i - self.num_memtables() - self.num_files()]
    }
}
973
#[cfg(test)]
impl ScanInput {
    /// Returns the ids of the SST files to scan, in the order they are stored.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::FileId> {
        let mut ids = Vec::with_capacity(self.files.len());
        ids.extend(self.files.iter().map(FileHandle::file_id));
        ids
    }
}
981
982/// Context shared by different streams from a scanner.
983/// It contains the input and ranges to scan.
984pub struct StreamContext {
985    /// Input memtables and files.
986    pub input: ScanInput,
987    /// Metadata for partition ranges.
988    pub(crate) ranges: Vec<RangeMeta>,
989
990    // Metrics:
991    /// The start time of the query.
992    pub(crate) query_start: Instant,
993}
994
995impl StreamContext {
996    /// Creates a new [StreamContext] for [SeqScan].
997    pub(crate) fn seq_scan_ctx(input: ScanInput, compaction: bool) -> Self {
998        let query_start = input.query_start.unwrap_or_else(Instant::now);
999        let ranges = RangeMeta::seq_scan_ranges(&input, compaction);
1000        READ_SST_COUNT.observe(input.num_files() as f64);
1001
1002        Self {
1003            input,
1004            ranges,
1005            query_start,
1006        }
1007    }
1008
1009    /// Creates a new [StreamContext] for [UnorderedScan].
1010    pub(crate) fn unordered_scan_ctx(input: ScanInput) -> Self {
1011        let query_start = input.query_start.unwrap_or_else(Instant::now);
1012        let ranges = RangeMeta::unordered_scan_ranges(&input);
1013        READ_SST_COUNT.observe(input.num_files() as f64);
1014
1015        Self {
1016            input,
1017            ranges,
1018            query_start,
1019        }
1020    }
1021
1022    /// Returns true if the index refers to a memtable.
1023    pub(crate) fn is_mem_range_index(&self, index: RowGroupIndex) -> bool {
1024        self.input.num_memtables() > index.index
1025    }
1026
1027    pub(crate) fn is_file_range_index(&self, index: RowGroupIndex) -> bool {
1028        !self.is_mem_range_index(index)
1029            && index.index < self.input.num_files() + self.input.num_memtables()
1030    }
1031
1032    /// Retrieves the partition ranges.
1033    pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
1034        self.ranges
1035            .iter()
1036            .enumerate()
1037            .map(|(idx, range_meta)| range_meta.new_partition_range(idx))
1038            .collect()
1039    }
1040
1041    /// Format the context for explain.
1042    pub(crate) fn format_for_explain(&self, verbose: bool, f: &mut fmt::Formatter) -> fmt::Result {
1043        let (mut num_mem_ranges, mut num_file_ranges) = (0, 0);
1044        for range_meta in &self.ranges {
1045            for idx in &range_meta.row_group_indices {
1046                if self.is_mem_range_index(*idx) {
1047                    num_mem_ranges += 1;
1048                } else {
1049                    num_file_ranges += 1;
1050                }
1051            }
1052        }
1053        if verbose {
1054            write!(f, "{{")?;
1055        }
1056        write!(
1057            f,
1058            "\"partition_count\":{{\"count\":{}, \"mem_ranges\":{}, \"files\":{}, \"file_ranges\":{}}}",
1059            self.ranges.len(),
1060            num_mem_ranges,
1061            self.input.num_files(),
1062            num_file_ranges,
1063        )?;
1064        if let Some(selector) = &self.input.series_row_selector {
1065            write!(f, ", \"selector\":\"{}\"", selector)?;
1066        }
1067        if let Some(distribution) = &self.input.distribution {
1068            write!(f, ", \"distribution\":\"{}\"", distribution)?;
1069        }
1070
1071        if verbose {
1072            self.format_verbose_content(f)?;
1073        }
1074
1075        Ok(())
1076    }
1077
1078    fn format_verbose_content(&self, f: &mut fmt::Formatter) -> fmt::Result {
1079        struct FileWrapper<'a> {
1080            file: &'a FileHandle,
1081        }
1082
1083        impl fmt::Debug for FileWrapper<'_> {
1084            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1085                let (start, end) = self.file.time_range();
1086                write!(
1087                    f,
1088                    r#"{{"file_id":"{}","time_range_start":"{}::{}","time_range_end":"{}::{}","rows":{},"size":{},"index_size":{}}}"#,
1089                    self.file.file_id(),
1090                    start.value(),
1091                    start.unit(),
1092                    end.value(),
1093                    end.unit(),
1094                    self.file.num_rows(),
1095                    self.file.size(),
1096                    self.file.index_size()
1097                )
1098            }
1099        }
1100
1101        struct InputWrapper<'a> {
1102            input: &'a ScanInput,
1103        }
1104
1105        impl fmt::Debug for InputWrapper<'_> {
1106            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1107                let output_schema = self.input.mapper.output_schema();
1108                if !output_schema.is_empty() {
1109                    let names: Vec<_> = output_schema
1110                        .column_schemas()
1111                        .iter()
1112                        .map(|col| &col.name)
1113                        .collect();
1114                    write!(f, ", \"projection\": {:?}", names)?;
1115                }
1116                if let Some(predicate) = &self.input.predicate.predicate() {
1117                    if !predicate.exprs().is_empty() {
1118                        let exprs: Vec<_> =
1119                            predicate.exprs().iter().map(|e| e.to_string()).collect();
1120                        write!(f, ", \"filters\": {:?}", exprs)?;
1121                    }
1122                }
1123                if !self.input.files.is_empty() {
1124                    write!(f, ", \"files\": ")?;
1125                    f.debug_list()
1126                        .entries(self.input.files.iter().map(|file| FileWrapper { file }))
1127                        .finish()?;
1128                }
1129
1130                Ok(())
1131            }
1132        }
1133
1134        write!(f, "{:?}", InputWrapper { input: &self.input })
1135    }
1136}
1137
1138/// Predicates to evaluate.
1139/// It only keeps filters that [SimpleFilterEvaluator] supports.
1140#[derive(Clone, Default)]
1141pub struct PredicateGroup {
1142    time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
1143
1144    /// Table predicate for all logical exprs to evaluate.
1145    /// Parquet reader uses it to prune row groups.
1146    predicate: Option<Predicate>,
1147}
1148
1149impl PredicateGroup {
1150    /// Creates a new `PredicateGroup` from exprs according to the metadata.
1151    pub fn new(metadata: &RegionMetadata, exprs: &[Expr]) -> Self {
1152        let mut time_filters = Vec::with_capacity(exprs.len());
1153        // Columns in the expr.
1154        let mut columns = HashSet::new();
1155        for expr in exprs {
1156            columns.clear();
1157            let Some(filter) = Self::expr_to_filter(expr, metadata, &mut columns) else {
1158                continue;
1159            };
1160            time_filters.push(filter);
1161        }
1162        let time_filters = if time_filters.is_empty() {
1163            None
1164        } else {
1165            Some(Arc::new(time_filters))
1166        };
1167        let predicate = Predicate::new(exprs.to_vec());
1168
1169        Self {
1170            time_filters,
1171            predicate: Some(predicate),
1172        }
1173    }
1174
1175    /// Returns time filters.
1176    pub(crate) fn time_filters(&self) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
1177        self.time_filters.clone()
1178    }
1179
1180    /// Returns predicate of all exprs.
1181    pub(crate) fn predicate(&self) -> Option<&Predicate> {
1182        self.predicate.as_ref()
1183    }
1184
1185    fn expr_to_filter(
1186        expr: &Expr,
1187        metadata: &RegionMetadata,
1188        columns: &mut HashSet<Column>,
1189    ) -> Option<SimpleFilterEvaluator> {
1190        columns.clear();
1191        // `expr_to_columns` won't return error.
1192        // We still ignore these expressions for safety.
1193        expr_to_columns(expr, columns).ok()?;
1194        if columns.len() > 1 {
1195            // Simple filter doesn't support multiple columns.
1196            return None;
1197        }
1198        let column = columns.iter().next()?;
1199        let column_meta = metadata.column_by_name(&column.name)?;
1200        if column_meta.semantic_type == SemanticType::Timestamp {
1201            SimpleFilterEvaluator::try_new(expr)
1202        } else {
1203            None
1204        }
1205    }
1206}