mito2/read/scan_region.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Scans a region according to the scan request.

use std::collections::HashSet;
use std::fmt;
use std::num::NonZeroU64;
use std::sync::Arc;
use std::time::Instant;

use api::v1::SemanticType;
use common_error::ext::BoxedError;
use common_recordbatch::SendableRecordBatchStream;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::tracing::Instrument;
use common_telemetry::{debug, error, tracing, warn};
use common_time::range::TimestampRange;
use datafusion_common::Column;
use datafusion_expr::Expr;
use datafusion_expr::utils::expr_to_columns;
use futures::StreamExt;
use partition::expr::PartitionExpr;
use smallvec::SmallVec;
use snafu::{OptionExt as _, ResultExt};
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::region_engine::{PartitionRange, RegionScannerRef};
use store_api::storage::{
    ColumnId, RegionId, ScanRequest, SequenceRange, TimeSeriesDistribution, TimeSeriesRowSelector,
};
use table::predicate::{Predicate, build_time_range_predicate};
use tokio::sync::{Semaphore, mpsc};
use tokio_stream::wrappers::ReceiverStream;

use crate::access_layer::AccessLayerRef;
use crate::cache::CacheStrategy;
use crate::config::{DEFAULT_MAX_CONCURRENT_SCAN_FILES, DEFAULT_SCAN_CHANNEL_SIZE};
use crate::error::{InvalidPartitionExprSnafu, InvalidRequestSnafu, Result};
#[cfg(feature = "enterprise")]
use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
use crate::memtable::{MemtableRange, RangesOptions};
use crate::metrics::READ_SST_COUNT;
use crate::read::compat::{self, CompatBatch, FlatCompatBatch, PrimaryKeyCompatBatch};
use crate::read::projection::ProjectionMapper;
use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
use crate::read::seq_scan::SeqScan;
use crate::read::series_scan::SeriesScan;
use crate::read::stream::ScanBatchStream;
use crate::read::unordered_scan::UnorderedScan;
use crate::read::{Batch, BoxedRecordBatchStream, RecordBatch, Source};
use crate::region::options::MergeMode;
use crate::region::version::VersionRef;
use crate::sst::FormatType;
use crate::sst::file::FileHandle;
use crate::sst::index::bloom_filter::applier::{
    BloomFilterIndexApplierBuilder, BloomFilterIndexApplierRef,
};
use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
#[cfg(feature = "vector_index")]
use crate::sst::index::vector_index::applier::{VectorIndexApplier, VectorIndexApplierRef};
use crate::sst::parquet::file_range::PreFilterMode;
use crate::sst::parquet::reader::ReaderMetrics;

/// Parallel scan channel size for flat format.
const FLAT_SCAN_CHANNEL_SIZE: usize = 2;
/// Over-fetch multiplier applied to the vector search `k` when the request
/// carries additional filters.
#[cfg(feature = "vector_index")]
const VECTOR_INDEX_OVERFETCH_MULTIPLIER: usize = 2;

/// A scanner scans a region and returns a [SendableRecordBatchStream].
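///
/// A minimal usage sketch from a caller's point of view (the `scan_region`
/// value is hypothetical; illustrative only, not a doctest):
///
/// ```ignore
/// let scanner = scan_region.scanner().await?;
/// let mut stream = scanner.scan().await?;
/// while let Some(record_batch) = stream.next().await {
///     let record_batch = record_batch?;
///     // consume the RecordBatch
/// }
/// ```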
pub(crate) enum Scanner {
    /// Sequential scan.
    Seq(SeqScan),
    /// Unordered scan.
    Unordered(UnorderedScan),
    /// Per-series scan.
    Series(SeriesScan),
}

impl Scanner {
    /// Returns a [SendableRecordBatchStream] to retrieve scan results from all partitions.
    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
    pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream, BoxedError> {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.build_stream(),
            Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
            Scanner::Series(series_scan) => series_scan.build_stream().await,
        }
    }

    /// Creates a stream of [`Batch`]es produced by this scanner.
    pub(crate) fn scan_batch(&self) -> Result<ScanBatchStream> {
        match self {
            Scanner::Seq(x) => x.scan_all_partitions(),
            Scanner::Unordered(x) => x.scan_all_partitions(),
            Scanner::Series(x) => x.scan_all_partitions(),
        }
    }
}

#[cfg(test)]
impl Scanner {
    /// Returns number of files to scan.
    pub(crate) fn num_files(&self) -> usize {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.input().num_files(),
            Scanner::Unordered(unordered_scan) => unordered_scan.input().num_files(),
            Scanner::Series(series_scan) => series_scan.input().num_files(),
        }
    }

    /// Returns number of memtables to scan.
    pub(crate) fn num_memtables(&self) -> usize {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.input().num_memtables(),
            Scanner::Unordered(unordered_scan) => unordered_scan.input().num_memtables(),
            Scanner::Series(series_scan) => series_scan.input().num_memtables(),
        }
    }

    /// Returns SST file ids to scan.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.input().file_ids(),
            Scanner::Unordered(unordered_scan) => unordered_scan.input().file_ids(),
            Scanner::Series(series_scan) => series_scan.input().file_ids(),
        }
    }

    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.input().index_ids(),
            Scanner::Unordered(unordered_scan) => unordered_scan.input().index_ids(),
            Scanner::Series(series_scan) => series_scan.input().index_ids(),
        }
    }

    /// Sets the target partitions for the scanner. This controls the parallelism of the scanner.
    pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
        use store_api::region_engine::{PrepareRequest, RegionScanner};

        let request = PrepareRequest::default().with_target_partitions(target_partitions);
        match self {
            Scanner::Seq(seq_scan) => seq_scan.prepare(request).unwrap(),
            Scanner::Unordered(unordered_scan) => unordered_scan.prepare(request).unwrap(),
            Scanner::Series(series_scan) => series_scan.prepare(request).unwrap(),
        }
    }
}

#[cfg_attr(doc, aquamarine::aquamarine)]
/// Helper to scan a region by [ScanRequest].
///
/// [ScanRegion] collects SSTs and memtables to scan without actually reading them. It
/// creates a [Scanner] to actually scan these targets in [Scanner::scan()].
///
/// ```mermaid
/// classDiagram
/// class ScanRegion {
///     -VersionRef version
///     -ScanRequest request
///     ~scanner() Scanner
///     ~seq_scan() SeqScan
/// }
/// class Scanner {
///     <<enumeration>>
///     SeqScan
///     UnorderedScan
///     +scan() SendableRecordBatchStream
/// }
/// class SeqScan {
///     -ScanInput input
///     +build() SendableRecordBatchStream
/// }
/// class UnorderedScan {
///     -ScanInput input
///     +build() SendableRecordBatchStream
/// }
/// class ScanInput {
///     -ProjectionMapper mapper
///     -Option~TimeRange~ time_range
///     -Option~Predicate~ predicate
///     -Vec~MemtableRef~ memtables
///     -Vec~FileHandle~ files
/// }
/// class ProjectionMapper {
///     ~output_schema() SchemaRef
///     ~convert(Batch) RecordBatch
/// }
/// ScanRegion -- Scanner
/// ScanRegion o-- ScanRequest
/// Scanner o-- SeqScan
/// Scanner o-- UnorderedScan
/// SeqScan o-- ScanInput
/// UnorderedScan o-- ScanInput
/// Scanner -- SendableRecordBatchStream
/// ScanInput o-- ProjectionMapper
/// SeqScan -- SendableRecordBatchStream
/// UnorderedScan -- SendableRecordBatchStream
/// ```
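///
/// A minimal construction sketch (the `version`, `access_layer`, `request`,
/// and `cache` values are hypothetical; illustrative only, not a doctest):
///
/// ```ignore
/// let scanner = ScanRegion::new(version, access_layer, request, cache)
///     .with_parallel_scan_channel_size(DEFAULT_SCAN_CHANNEL_SIZE)
///     .with_start_time(Instant::now())
///     .scanner()
///     .await?;
/// ```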
pub(crate) struct ScanRegion {
    /// Version of the region at scan.
    version: VersionRef,
    /// Access layer of the region.
    access_layer: AccessLayerRef,
    /// Scan request.
    request: ScanRequest,
    /// Cache.
    cache_strategy: CacheStrategy,
    /// Capacity of the channel to send data from parallel scan tasks to the main task.
    parallel_scan_channel_size: usize,
    /// Maximum number of SST files to scan concurrently.
    max_concurrent_scan_files: usize,
    /// Whether to ignore inverted index.
    ignore_inverted_index: bool,
    /// Whether to ignore fulltext index.
    ignore_fulltext_index: bool,
    /// Whether to ignore bloom filter.
    ignore_bloom_filter: bool,
    /// Start time of the scan task.
    start_time: Option<Instant>,
    /// Whether to filter out the deleted rows.
    /// Usually true for normal read, and false for scan for compaction.
    filter_deleted: bool,
    #[cfg(feature = "enterprise")]
    extension_range_provider: Option<BoxedExtensionRangeProvider>,
}

impl ScanRegion {
    /// Creates a [ScanRegion].
    pub(crate) fn new(
        version: VersionRef,
        access_layer: AccessLayerRef,
        request: ScanRequest,
        cache_strategy: CacheStrategy,
    ) -> ScanRegion {
        ScanRegion {
            version,
            access_layer,
            request,
            cache_strategy,
            parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
            ignore_inverted_index: false,
            ignore_fulltext_index: false,
            ignore_bloom_filter: false,
            start_time: None,
            filter_deleted: true,
            #[cfg(feature = "enterprise")]
            extension_range_provider: None,
        }
    }

    /// Sets parallel scan task channel size.
    #[must_use]
    pub(crate) fn with_parallel_scan_channel_size(
        mut self,
        parallel_scan_channel_size: usize,
    ) -> Self {
        self.parallel_scan_channel_size = parallel_scan_channel_size;
        self
    }

    /// Sets maximum number of SST files to scan concurrently.
    #[must_use]
    pub(crate) fn with_max_concurrent_scan_files(
        mut self,
        max_concurrent_scan_files: usize,
    ) -> Self {
        self.max_concurrent_scan_files = max_concurrent_scan_files;
        self
    }

    /// Sets whether to ignore inverted index.
    #[must_use]
    pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self {
        self.ignore_inverted_index = ignore;
        self
    }

    /// Sets whether to ignore fulltext index.
    #[must_use]
    pub(crate) fn with_ignore_fulltext_index(mut self, ignore: bool) -> Self {
        self.ignore_fulltext_index = ignore;
        self
    }

    /// Sets whether to ignore bloom filter.
    #[must_use]
    pub(crate) fn with_ignore_bloom_filter(mut self, ignore: bool) -> Self {
        self.ignore_bloom_filter = ignore;
        self
    }

    #[must_use]
    pub(crate) fn with_start_time(mut self, now: Instant) -> Self {
        self.start_time = Some(now);
        self
    }

    pub(crate) fn set_filter_deleted(&mut self, filter_deleted: bool) {
        self.filter_deleted = filter_deleted;
    }

    #[cfg(feature = "enterprise")]
    pub(crate) fn set_extension_range_provider(
        &mut self,
        extension_range_provider: BoxedExtensionRangeProvider,
    ) {
        self.extension_range_provider = Some(extension_range_provider);
    }

    /// Returns a [Scanner] to scan the region.
    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
    pub(crate) async fn scanner(self) -> Result<Scanner> {
        if self.use_series_scan() {
            self.series_scan().await.map(Scanner::Series)
        } else if self.use_unordered_scan() {
            // If the table is append-only and there is no series row selector, we use unordered scan for queries.
            // We still use seq scan in compaction.
            self.unordered_scan().await.map(Scanner::Unordered)
        } else {
            self.seq_scan().await.map(Scanner::Seq)
        }
    }

    /// Returns a [RegionScanner] to scan the region.
    #[tracing::instrument(
        level = tracing::Level::DEBUG,
        skip_all,
        fields(region_id = %self.region_id())
    )]
    pub(crate) async fn region_scanner(self) -> Result<RegionScannerRef> {
        if self.use_series_scan() {
            self.series_scan()
                .await
                .map(|scanner| Box::new(scanner) as _)
        } else if self.use_unordered_scan() {
            self.unordered_scan()
                .await
                .map(|scanner| Box::new(scanner) as _)
        } else {
            self.seq_scan().await.map(|scanner| Box::new(scanner) as _)
        }
    }

    /// Scans sequentially.
    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
    pub(crate) async fn seq_scan(self) -> Result<SeqScan> {
        let input = self.scan_input().await?.with_compaction(false);
        Ok(SeqScan::new(input))
    }

    /// Scans without order.
    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
    pub(crate) async fn unordered_scan(self) -> Result<UnorderedScan> {
        let input = self.scan_input().await?;
        Ok(UnorderedScan::new(input))
    }

    /// Scans by series.
    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
    pub(crate) async fn series_scan(self) -> Result<SeriesScan> {
        let input = self.scan_input().await?;
        Ok(SeriesScan::new(input))
    }

    /// Returns true if the region can use unordered scan for the current request.
    fn use_unordered_scan(&self) -> bool {
        // We use unordered scan when:
        // 1. The region is in append mode.
        // 2. There is no series row selector.
        // 3. The required distribution is None or TimeSeriesDistribution::TimeWindowed.
        //
        // We still use seq scan in compaction.
        self.version.options.append_mode
            && self.request.series_row_selector.is_none()
            && (self.request.distribution.is_none()
                || self.request.distribution == Some(TimeSeriesDistribution::TimeWindowed))
    }

    /// Returns true if the region can use series scan for the current request.
    fn use_series_scan(&self) -> bool {
        self.request.distribution == Some(TimeSeriesDistribution::PerSeries)
    }

    /// Returns true if the region uses the flat format.
    fn use_flat_format(&self) -> bool {
        self.request.force_flat_format
            || self.version.options.sst_format.unwrap_or_default() == FormatType::Flat
    }

    /// Creates a scan input.
    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
    async fn scan_input(mut self) -> Result<ScanInput> {
        let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new);
        let time_range = self.build_time_range_predicate();
        let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;
        let flat_format = self.use_flat_format();

        let read_column_ids = match &self.request.projection {
            Some(p) => self.build_read_column_ids(p, &predicate)?,
            None => self
                .version
                .metadata
                .column_metadatas
                .iter()
                .map(|col| col.column_id)
                .collect(),
        };

        // The mapper always computes projected column ids as the schema of SSTs may change.
        let mapper = match &self.request.projection {
            Some(p) => ProjectionMapper::new_with_read_columns(
                &self.version.metadata,
                p.iter().copied(),
                flat_format,
                read_column_ids.clone(),
            )?,
            None => ProjectionMapper::all(&self.version.metadata, flat_format)?,
        };

        let ssts = &self.version.ssts;
        let mut files = Vec::new();
        for level in ssts.levels() {
            for file in level.files.values() {
                let exceed_min_sequence = match (sst_min_sequence, file.meta_ref().sequence) {
                    (Some(min_sequence), Some(file_sequence)) => file_sequence > min_sequence,
                    // If the file's sequence is None (i.e. zero), the file may have been
                    // generated and added to the region "directly". In that case, its data
                    // should be considered as fresh as the memtable, so we treat its
                    // sequence as greater than `min_sequence`, whatever that value is.
                    // Hence the default "true" in this arm.
                    (Some(_), None) => true,
                    (None, _) => true,
                };

                // Finds SST files in range.
                if exceed_min_sequence && file_in_range(file, &time_range) {
                    files.push(file.clone());
                }
                // There is no need to further prune by the file's sequence here: the
                // requested sequence is usually very new, so unless the timing is
                // exceptional it won't appear in any file, and such batches will be
                // filtered out by the tree reader anyway.
            }
        }

        let memtables = self.version.memtables.list_memtables();
        // Skip empty memtables and memtables out of the time range.
        let mut mem_range_builders = Vec::new();
        let filter_mode = pre_filter_mode(
            self.version.options.append_mode,
            self.version.options.merge_mode(),
        );

        for m in memtables {
            // Check whether the memtable is empty by reading its stats.
            let Some((start, end)) = m.stats().time_range() else {
                continue;
            };
            // The time range of the memtable is inclusive.
            let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
            if !memtable_range.intersects(&time_range) {
                continue;
            }
            let ranges_in_memtable = m.ranges(
                Some(read_column_ids.as_slice()),
                RangesOptions::default()
                    .with_predicate(predicate.clone())
                    .with_sequence(SequenceRange::new(
                        self.request.memtable_min_sequence,
                        self.request.memtable_max_sequence,
                    ))
                    .with_pre_filter_mode(filter_mode),
            )?;
            mem_range_builders.extend(ranges_in_memtable.ranges.into_values().map(|v| {
                let stats = v.stats().clone();
                MemRangeBuilder::new(v, stats)
            }));
        }

        let region_id = self.region_id();
        debug!(
            "Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}, flat_format: {}",
            region_id,
            self.request,
            time_range,
            mem_range_builders.len(),
            files.len(),
            self.version.options.append_mode,
            flat_format,
        );

        let (non_field_filters, field_filters) = self.partition_by_field_filters();
        let inverted_index_appliers = [
            self.build_inverted_index_applier(&non_field_filters),
            self.build_inverted_index_applier(&field_filters),
        ];
        let bloom_filter_appliers = [
            self.build_bloom_filter_applier(&non_field_filters),
            self.build_bloom_filter_applier(&field_filters),
        ];
        let fulltext_index_appliers = [
            self.build_fulltext_index_applier(&non_field_filters),
            self.build_fulltext_index_applier(&field_filters),
        ];
        #[cfg(feature = "vector_index")]
        let vector_index_applier = self.build_vector_index_applier();
        #[cfg(feature = "vector_index")]
        let vector_index_k = self.request.vector_search.as_ref().map(|search| {
            if self.request.filters.is_empty() {
                search.k
            } else {
                search.k.saturating_mul(VECTOR_INDEX_OVERFETCH_MULTIPLIER)
            }
        });

        if flat_format {
            // Batches in the flat format are already large, so we use a small channel size here.
            self.parallel_scan_channel_size = FLAT_SCAN_CHANNEL_SIZE;
        }

        let input = ScanInput::new(self.access_layer, mapper)
            .with_time_range(Some(time_range))
            .with_predicate(predicate)
            .with_memtables(mem_range_builders)
            .with_files(files)
            .with_cache(self.cache_strategy)
            .with_inverted_index_appliers(inverted_index_appliers)
            .with_bloom_filter_index_appliers(bloom_filter_appliers)
            .with_fulltext_index_appliers(fulltext_index_appliers)
            .with_parallel_scan_channel_size(self.parallel_scan_channel_size)
            .with_max_concurrent_scan_files(self.max_concurrent_scan_files)
            .with_start_time(self.start_time)
            .with_append_mode(self.version.options.append_mode)
            .with_filter_deleted(self.filter_deleted)
            .with_merge_mode(self.version.options.merge_mode())
            .with_series_row_selector(self.request.series_row_selector)
            .with_distribution(self.request.distribution)
            .with_flat_format(flat_format);
        #[cfg(feature = "vector_index")]
        let input = input
            .with_vector_index_applier(vector_index_applier)
            .with_vector_index_k(vector_index_k);

        #[cfg(feature = "enterprise")]
        let input = if let Some(provider) = self.extension_range_provider {
            let ranges = provider
                .find_extension_ranges(self.version.flushed_sequence, time_range, &self.request)
                .await?;
            debug!("Found extension ranges: {ranges:?}");
            input.with_extension_ranges(ranges)
        } else {
            input
        };
        Ok(input)
    }

    fn region_id(&self) -> RegionId {
        self.version.metadata.region_id
    }

    /// Builds the time range predicate from the request filters.
    fn build_time_range_predicate(&self) -> TimestampRange {
        let time_index = self.version.metadata.time_index_column();
        let unit = time_index
            .column_schema
            .data_type
            .as_timestamp()
            .expect("Time index must have timestamp-compatible type")
            .unit();
        build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters)
    }

    /// Returns all column ids to read according to the projection and filters.
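    ///
    /// For example, given columns `host (id 1)`, `ts (id 2)` and `cpu (id 3)`
    /// at indices 0..=2, the projection `[0, 1]` with a filter on `cpu` would
    /// yield `[1, 2, 3]`: projected ids in projection order, followed by
    /// columns referenced only by filters.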
    fn build_read_column_ids(
        &self,
        projection: &[usize],
        predicate: &PredicateGroup,
    ) -> Result<Vec<ColumnId>> {
        let metadata = &self.version.metadata;
        // Use a Vec for read_column_ids to preserve the column order.
        let mut read_column_ids = Vec::new();
        let mut seen = HashSet::new();

        for idx in projection {
            let column =
                metadata
                    .column_metadatas
                    .get(*idx)
                    .with_context(|| InvalidRequestSnafu {
                        region_id: metadata.region_id,
                        reason: format!("projection index {} is out of bounds", idx),
                    })?;
            seen.insert(column.column_id);
            // Keep the projection order.
            read_column_ids.push(column.column_id);
        }

        if projection.is_empty() {
            let time_index = metadata.time_index_column().column_id;
            if seen.insert(time_index) {
                read_column_ids.push(time_index);
            }
        }

        let mut extra_names = HashSet::new();
        let mut columns = HashSet::new();

        for expr in &self.request.filters {
            columns.clear();
            if expr_to_columns(expr, &mut columns).is_err() {
                continue;
            }
            extra_names.extend(columns.iter().map(|column| column.name.clone()));
        }

        if let Some(expr) = predicate.region_partition_expr() {
            expr.collect_column_names(&mut extra_names);
        }

        if !extra_names.is_empty() {
            for column in &metadata.column_metadatas {
                if extra_names.contains(column.column_schema.name.as_str())
                    && !seen.contains(&column.column_id)
                {
                    read_column_ids.push(column.column_id);
                }
                extra_names.remove(column.column_schema.name.as_str());
            }
            if !extra_names.is_empty() {
                warn!(
                    "Some columns in filters are not found in region {}: {:?}",
                    metadata.region_id, extra_names
                );
            }
        }
        Ok(read_column_ids)
    }

    /// Partitions filters into two groups: non-field filters and field filters.
    /// Returns `(non_field_filters, field_filters)`.
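    ///
    /// For example, with a tag column `host` and a field column `cpu`, the
    /// filters `host = 'a'` and `cpu > 0.5` would partition into
    /// `([host = 'a'], [cpu > 0.5])`.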
    fn partition_by_field_filters(&self) -> (Vec<Expr>, Vec<Expr>) {
        let field_columns = self
            .version
            .metadata
            .field_columns()
            .map(|col| &col.column_schema.name)
            .collect::<HashSet<_>>();

        let mut columns = HashSet::new();

        self.request.filters.iter().cloned().partition(|expr| {
            columns.clear();
            // `expr_to_columns` is not expected to return an error.
            if expr_to_columns(expr, &mut columns).is_err() {
                // If we can't extract columns, treat the expr as a non-field filter.
                return true;
            }
            // Return true for non-field filters (partition puts the true cases in the first vec).
            !columns
                .iter()
                .any(|column| field_columns.contains(&column.name))
        })
    }

    /// Uses the latest schema to build the inverted index applier.
    fn build_inverted_index_applier(&self, filters: &[Expr]) -> Option<InvertedIndexApplierRef> {
        if self.ignore_inverted_index {
            return None;
        }

        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
        let inverted_index_cache = self.cache_strategy.inverted_index_cache().cloned();

        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();

        InvertedIndexApplierBuilder::new(
            self.access_layer.table_dir().to_string(),
            self.access_layer.path_type(),
            self.access_layer.object_store().clone(),
            self.version.metadata.as_ref(),
            self.version.metadata.inverted_indexed_column_ids(
                self.version
                    .options
                    .index_options
                    .inverted_index
                    .ignore_column_ids
                    .iter(),
            ),
            self.access_layer.puffin_manager_factory().clone(),
        )
        .with_file_cache(file_cache)
        .with_inverted_index_cache(inverted_index_cache)
        .with_puffin_metadata_cache(puffin_metadata_cache)
        .build(filters)
        .inspect_err(|err| warn!(err; "Failed to build inverted index applier"))
        .ok()
        .flatten()
        .map(Arc::new)
    }

    /// Uses the latest schema to build the bloom filter index applier.
    fn build_bloom_filter_applier(&self, filters: &[Expr]) -> Option<BloomFilterIndexApplierRef> {
        if self.ignore_bloom_filter {
            return None;
        }

        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();

        BloomFilterIndexApplierBuilder::new(
            self.access_layer.table_dir().to_string(),
            self.access_layer.path_type(),
            self.access_layer.object_store().clone(),
            self.version.metadata.as_ref(),
            self.access_layer.puffin_manager_factory().clone(),
        )
        .with_file_cache(file_cache)
        .with_bloom_filter_index_cache(bloom_filter_index_cache)
        .with_puffin_metadata_cache(puffin_metadata_cache)
        .build(filters)
        .inspect_err(|err| warn!(err; "Failed to build bloom filter index applier"))
        .ok()
        .flatten()
        .map(Arc::new)
    }

    /// Uses the latest schema to build the fulltext index applier.
    fn build_fulltext_index_applier(&self, filters: &[Expr]) -> Option<FulltextIndexApplierRef> {
        if self.ignore_fulltext_index {
            return None;
        }

        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
        FulltextIndexApplierBuilder::new(
            self.access_layer.table_dir().to_string(),
            self.access_layer.path_type(),
            self.access_layer.object_store().clone(),
            self.access_layer.puffin_manager_factory().clone(),
            self.version.metadata.as_ref(),
        )
        .with_file_cache(file_cache)
        .with_puffin_metadata_cache(puffin_metadata_cache)
        .with_bloom_filter_cache(bloom_filter_index_cache)
        .build(filters)
        .inspect_err(|err| warn!(err; "Failed to build fulltext index applier"))
        .ok()
        .flatten()
        .map(Arc::new)
    }

    /// Builds the vector index applier from the vector search request.
    #[cfg(feature = "vector_index")]
    fn build_vector_index_applier(&self) -> Option<VectorIndexApplierRef> {
        let vector_search = self.request.vector_search.as_ref()?;

        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
        let vector_index_cache = self.cache_strategy.vector_index_cache().cloned();

        let applier = VectorIndexApplier::new(
            self.access_layer.table_dir().to_string(),
            self.access_layer.path_type(),
            self.access_layer.object_store().clone(),
            self.access_layer.puffin_manager_factory().clone(),
            vector_search.column_id,
            vector_search.query_vector.clone(),
            vector_search.metric,
        )
        .with_file_cache(file_cache)
        .with_puffin_metadata_cache(puffin_metadata_cache)
        .with_vector_index_cache(vector_index_cache);

        Some(Arc::new(applier))
    }
}

/// Returns true if the time range of an SST `file` matches the `predicate`.
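///
/// For example, a file whose (inclusive) time range is `[10, 20]` intersects a
/// predicate range of `[15, 30)` and is therefore kept.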
fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
    if predicate == &TimestampRange::min_to_max() {
        return true;
    }
    // The end timestamp of an SST is inclusive.
    let (start, end) = file.time_range();
    let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end));
    file_ts_range.intersects(predicate)
}

/// Common input for different scanners.
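///
/// Typically assembled through the `with_*` builder methods (hypothetical
/// values; illustrative only, not a doctest):
///
/// ```ignore
/// let input = ScanInput::new(access_layer, mapper)
///     .with_time_range(Some(time_range))
///     .with_files(files)
///     .with_cache(cache_strategy)
///     .with_append_mode(true);
/// ```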
pub struct ScanInput {
    /// Region SST access layer.
    access_layer: AccessLayerRef,
    /// Maps projected Batches to RecordBatches.
    pub(crate) mapper: Arc<ProjectionMapper>,
    /// Column ids to read from memtables and SSTs.
    /// Note that this differs from the projected columns in `mapper`: the read
    /// columns may also include non-projected columns needed for filtering.
    pub(crate) read_column_ids: Vec<ColumnId>,
    /// Time range filter for time index.
    time_range: Option<TimestampRange>,
    /// Predicate to push down.
    pub(crate) predicate: PredicateGroup,
    /// Region partition expr applied at read time.
    region_partition_expr: Option<PartitionExpr>,
    /// Memtable range builders for memtables in the time range.
    pub(crate) memtables: Vec<MemRangeBuilder>,
    /// Handles to SST files to scan.
    pub(crate) files: Vec<FileHandle>,
    /// Cache.
    pub(crate) cache_strategy: CacheStrategy,
    /// Whether to ignore file-not-found errors.
    ignore_file_not_found: bool,
    /// Capacity of the channel to send data from parallel scan tasks to the main task.
    pub(crate) parallel_scan_channel_size: usize,
    /// Maximum number of SST files to scan concurrently.
    pub(crate) max_concurrent_scan_files: usize,
    /// Index appliers.
    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
    /// Vector index applier for KNN search.
    #[cfg(feature = "vector_index")]
    pub(crate) vector_index_applier: Option<VectorIndexApplierRef>,
    /// Over-fetched k for vector index scan.
    #[cfg(feature = "vector_index")]
    pub(crate) vector_index_k: Option<usize>,
    /// Start time of the query.
    pub(crate) query_start: Option<Instant>,
    /// The region is using append mode.
    pub(crate) append_mode: bool,
    /// Whether to remove deletion markers.
    pub(crate) filter_deleted: bool,
    /// Mode to merge duplicate rows.
    pub(crate) merge_mode: MergeMode,
    /// Hint to select rows from time series.
    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
    /// Hint for the required distribution of the scanner.
    pub(crate) distribution: Option<TimeSeriesDistribution>,
    /// Whether to use flat format.
    pub(crate) flat_format: bool,
    /// Whether this scan is for compaction.
    pub(crate) compaction: bool,
    #[cfg(feature = "enterprise")]
    extension_ranges: Vec<BoxedExtensionRange>,
}

impl ScanInput {
    /// Creates a new [ScanInput].
    #[must_use]
    pub(crate) fn new(access_layer: AccessLayerRef, mapper: ProjectionMapper) -> ScanInput {
        ScanInput {
            access_layer,
            read_column_ids: mapper.column_ids().to_vec(),
            mapper: Arc::new(mapper),
            time_range: None,
            predicate: PredicateGroup::default(),
            region_partition_expr: None,
            memtables: Vec::new(),
            files: Vec::new(),
            cache_strategy: CacheStrategy::Disabled,
            ignore_file_not_found: false,
            parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
            inverted_index_appliers: [None, None],
            bloom_filter_index_appliers: [None, None],
            fulltext_index_appliers: [None, None],
            #[cfg(feature = "vector_index")]
            vector_index_applier: None,
            #[cfg(feature = "vector_index")]
            vector_index_k: None,
            query_start: None,
            append_mode: false,
            filter_deleted: true,
            merge_mode: MergeMode::default(),
            series_row_selector: None,
            distribution: None,
            flat_format: false,
            compaction: false,
            #[cfg(feature = "enterprise")]
            extension_ranges: Vec::new(),
        }
    }

    /// Sets time range filter for time index.
    #[must_use]
    pub(crate) fn with_time_range(mut self, time_range: Option<TimestampRange>) -> Self {
        self.time_range = time_range;
        self
    }

    /// Sets predicate to push down.
    #[must_use]
    pub(crate) fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
        self.region_partition_expr = predicate.region_partition_expr().cloned();
        self.predicate = predicate;
        self
    }

    /// Sets memtable range builders.
    #[must_use]
    pub(crate) fn with_memtables(mut self, memtables: Vec<MemRangeBuilder>) -> Self {
        self.memtables = memtables;
        self
    }

    /// Sets files to read.
    #[must_use]
    pub(crate) fn with_files(mut self, files: Vec<FileHandle>) -> Self {
        self.files = files;
        self
    }

    /// Sets cache for this query.
    #[must_use]
    pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self {
        self.cache_strategy = cache;
        self
    }

    /// Ignores file not found error.
    #[must_use]
    pub(crate) fn with_ignore_file_not_found(mut self, ignore: bool) -> Self {
        self.ignore_file_not_found = ignore;
        self
    }

    /// Sets scan task channel size.
    #[must_use]
    pub(crate) fn with_parallel_scan_channel_size(
        mut self,
        parallel_scan_channel_size: usize,
    ) -> Self {
        self.parallel_scan_channel_size = parallel_scan_channel_size;
        self
    }

    /// Sets maximum number of SST files to scan concurrently.
    #[must_use]
    pub(crate) fn with_max_concurrent_scan_files(
        mut self,
        max_concurrent_scan_files: usize,
    ) -> Self {
        self.max_concurrent_scan_files = max_concurrent_scan_files;
        self
    }

    /// Sets inverted index appliers.
    #[must_use]
    pub(crate) fn with_inverted_index_appliers(
        mut self,
        appliers: [Option<InvertedIndexApplierRef>; 2],
    ) -> Self {
        self.inverted_index_appliers = appliers;
        self
    }

    /// Sets bloom filter appliers.
    #[must_use]
    pub(crate) fn with_bloom_filter_index_appliers(
        mut self,
        appliers: [Option<BloomFilterIndexApplierRef>; 2],
    ) -> Self {
        self.bloom_filter_index_appliers = appliers;
        self
    }

    /// Sets fulltext index appliers.
    #[must_use]
    pub(crate) fn with_fulltext_index_appliers(
        mut self,
        appliers: [Option<FulltextIndexApplierRef>; 2],
    ) -> Self {
        self.fulltext_index_appliers = appliers;
        self
    }

    /// Sets vector index applier for KNN search.
    #[cfg(feature = "vector_index")]
    #[must_use]
    pub(crate) fn with_vector_index_applier(
        mut self,
        applier: Option<VectorIndexApplierRef>,
    ) -> Self {
        self.vector_index_applier = applier;
        self
    }

    /// Sets over-fetched k for vector index scan.
    #[cfg(feature = "vector_index")]
    #[must_use]
    pub(crate) fn with_vector_index_k(mut self, k: Option<usize>) -> Self {
        self.vector_index_k = k;
        self
    }

    /// Sets start time of the query.
    #[must_use]
    pub(crate) fn with_start_time(mut self, now: Option<Instant>) -> Self {
        self.query_start = now;
        self
    }

    #[must_use]
    pub(crate) fn with_append_mode(mut self, is_append_mode: bool) -> Self {
        self.append_mode = is_append_mode;
        self
    }

    /// Sets whether to remove deletion markers during scan.
    #[must_use]
    pub(crate) fn with_filter_deleted(mut self, filter_deleted: bool) -> Self {
        self.filter_deleted = filter_deleted;
        self
    }

    /// Sets the merge mode.
    #[must_use]
    pub(crate) fn with_merge_mode(mut self, merge_mode: MergeMode) -> Self {
        self.merge_mode = merge_mode;
        self
    }

    /// Sets the distribution hint.
    #[must_use]
    pub(crate) fn with_distribution(
        mut self,
        distribution: Option<TimeSeriesDistribution>,
    ) -> Self {
        self.distribution = distribution;
        self
    }

    /// Sets the time series row selector.
    #[must_use]
    pub(crate) fn with_series_row_selector(
        mut self,
        series_row_selector: Option<TimeSeriesRowSelector>,
    ) -> Self {
        self.series_row_selector = series_row_selector;
        self
    }

    /// Sets whether to use flat format.
    #[must_use]
    pub(crate) fn with_flat_format(mut self, flat_format: bool) -> Self {
        self.flat_format = flat_format;
        self
    }

    /// Sets whether this scan is for compaction.
    #[must_use]
    pub(crate) fn with_compaction(mut self, compaction: bool) -> Self {
        self.compaction = compaction;
        self
    }

    /// Scans sources in parallel.
    ///
    /// # Panics
    ///
    /// Panics if the input doesn't allow parallel scan.
    #[tracing::instrument(
        skip(self, sources, semaphore),
        fields(
            region_id = %self.region_metadata().region_id,
            source_count = sources.len()
        )
    )]
    pub(crate) fn create_parallel_sources(
        &self,
        sources: Vec<Source>,
        semaphore: Arc<Semaphore>,
    ) -> Result<Vec<Source>> {
        if sources.len() <= 1 {
            return Ok(sources);
        }

        // Spawn a task for each source.
        let sources = sources
            .into_iter()
            .map(|source| {
                let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
                self.spawn_scan_task(source, semaphore.clone(), sender);
                let stream = Box::pin(ReceiverStream::new(receiver));
                Source::Stream(stream)
            })
            .collect();
        Ok(sources)
    }

    /// Builds memtable ranges to scan by `index`.
    pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> {
        let memtable = &self.memtables[index.index];
        let mut ranges = SmallVec::new();
        memtable.build_ranges(index.row_group_index, &mut ranges);
        ranges
    }

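    /// Returns the predicate to apply to `file`, dropping the region partition
    /// expression when the file was already written under it.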
    fn predicate_for_file(&self, file: &FileHandle) -> Option<Predicate> {
        if self.should_skip_region_partition(file) {
            self.predicate.predicate_without_region().cloned()
        } else {
            self.predicate.predicate().cloned()
        }
    }

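    /// Returns true if the file's partition expression equals the region's, in
    /// which case re-evaluating the region partition filter is redundant.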
    fn should_skip_region_partition(&self, file: &FileHandle) -> bool {
        match (
            self.region_partition_expr.as_ref(),
            file.meta_ref().partition_expr.as_ref(),
        ) {
            (Some(region_expr), Some(file_expr)) => region_expr == file_expr,
            _ => false,
        }
    }

    /// Prunes a file to scan and returns the builder to build readers.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.region_metadata().region_id,
            file_id = %file.file_id()
        )
    )]
    pub async fn prune_file(
        &self,
        file: &FileHandle,
        reader_metrics: &mut ReaderMetrics,
    ) -> Result<FileRangeBuilder> {
        let predicate = self.predicate_for_file(file);
        let filter_mode = pre_filter_mode(self.append_mode, self.merge_mode);
        let decode_pk_values = !self.compaction && self.mapper.has_tags();
        let reader = self
            .access_layer
            .read_sst(file.clone())
            .predicate(predicate)
            .projection(Some(self.read_column_ids.clone()))
            .cache(self.cache_strategy.clone())
            .inverted_index_appliers(self.inverted_index_appliers.clone())
            .bloom_filter_index_appliers(self.bloom_filter_index_appliers.clone())
            .fulltext_index_appliers(self.fulltext_index_appliers.clone());
        #[cfg(feature = "vector_index")]
        let reader = {
            let mut reader = reader;
            reader =
                reader.vector_index_applier(self.vector_index_applier.clone(), self.vector_index_k);
            reader
        };
        let res = reader
            .expected_metadata(Some(self.mapper.metadata().clone()))
            .flat_format(self.flat_format)
            .compaction(self.compaction)
            .pre_filter_mode(filter_mode)
            .decode_primary_key_values(decode_pk_values)
            .build_reader_input(reader_metrics)
            .await;
        let (mut file_range_ctx, selection) = match res {
            Ok(x) => x,
            Err(e) => {
                if e.is_object_not_found() && self.ignore_file_not_found {
                    error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
                    return Ok(FileRangeBuilder::default());
                } else {
                    return Err(e);
                }
            }
        };

        let need_compat = !compat::has_same_columns_and_pk_encoding(
            self.mapper.metadata(),
            file_range_ctx.read_format().metadata(),
        );
        if need_compat {
            // They have different schemas. We need to adapt the batch first so the
            // mapper can convert it.
            let compat = if let Some(flat_format) = file_range_ctx.read_format().as_flat() {
                let mapper = self.mapper.as_flat().unwrap();
                FlatCompatBatch::try_new(
                    mapper,
                    flat_format.metadata(),
                    flat_format.format_projection(),
                    self.compaction,
                )?
                .map(CompatBatch::Flat)
            } else {
                let compat_batch = PrimaryKeyCompatBatch::new(
                    &self.mapper,
                    file_range_ctx.read_format().metadata().clone(),
                )?;
                Some(CompatBatch::PrimaryKey(compat_batch))
            };
            file_range_ctx.set_compat_batch(compat);
        }
        Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), selection))
    }

    /// Scans the input source in another task and sends batches to the sender.
    #[tracing::instrument(
        skip(self, input, semaphore, sender),
        fields(region_id = %self.region_metadata().region_id)
    )]
    pub(crate) fn spawn_scan_task(
        &self,
        mut input: Source,
        semaphore: Arc<Semaphore>,
        sender: mpsc::Sender<Result<Batch>>,
    ) {
        let region_id = self.region_metadata().region_id;
        let span = tracing::info_span!(
            "ScanInput::parallel_scan_task",
            region_id = %region_id,
            stream_kind = "batch"
        );
        common_runtime::spawn_global(
            async move {
                loop {
                    // We release the permit before sending the result to avoid the task
                    // waiting on the channel with the permit held.
                    let maybe_batch = {
                        // Safety: We never close the semaphore.
                        let _permit = semaphore.acquire().await.unwrap();
                        input.next_batch().await
                    };
                    match maybe_batch {
                        Ok(Some(batch)) => {
                            let _ = sender.send(Ok(batch)).await;
                        }
                        Ok(None) => break,
                        Err(e) => {
                            let _ = sender.send(Err(e)).await;
                            break;
                        }
                    }
                }
            }
            .instrument(span),
        );
    }

    /// Scans flat sources (RecordBatch streams) in parallel.
    ///
    /// # Panics
    ///
    /// Panics if the input doesn't allow parallel scan.
    #[tracing::instrument(
        skip(self, sources, semaphore),
        fields(
            region_id = %self.region_metadata().region_id,
            source_count = sources.len()
        )
    )]
    pub(crate) fn create_parallel_flat_sources(
        &self,
        sources: Vec<BoxedRecordBatchStream>,
        semaphore: Arc<Semaphore>,
    ) -> Result<Vec<BoxedRecordBatchStream>> {
        if sources.len() <= 1 {
            return Ok(sources);
        }

        // Spawn a task for each source.
        let sources = sources
            .into_iter()
            .map(|source| {
                let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
                self.spawn_flat_scan_task(source, semaphore.clone(), sender);
                let stream = Box::pin(ReceiverStream::new(receiver));
                Box::pin(stream) as _
            })
            .collect();
        Ok(sources)
    }

    /// Spawns a task to scan a flat source (RecordBatch stream) asynchronously.
    #[tracing::instrument(
        skip(self, input, semaphore, sender),
        fields(region_id = %self.region_metadata().region_id)
    )]
    pub(crate) fn spawn_flat_scan_task(
        &self,
        mut input: BoxedRecordBatchStream,
        semaphore: Arc<Semaphore>,
        sender: mpsc::Sender<Result<RecordBatch>>,
    ) {
        let region_id = self.region_metadata().region_id;
        let span = tracing::info_span!(
            "ScanInput::parallel_scan_task",
            region_id = %region_id,
            stream_kind = "flat"
        );
        common_runtime::spawn_global(
            async move {
                loop {
                    // We release the permit before sending the result to avoid the task
                    // waiting on the channel with the permit held.
                    let maybe_batch = {
                        // Safety: We never close the semaphore.
                        let _permit = semaphore.acquire().await.unwrap();
                        input.next().await
                    };
                    match maybe_batch {
                        Some(Ok(batch)) => {
                            let _ = sender.send(Ok(batch)).await;
                        }
                        Some(Err(e)) => {
                            let _ = sender.send(Err(e)).await;
                            break;
                        }
                        None => break,
                    }
                }
            }
            .instrument(span),
        );
    }

    pub(crate) fn total_rows(&self) -> usize {
        let rows_in_files: usize = self.files.iter().map(|f| f.num_rows()).sum();
        let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();

        let rows = rows_in_files + rows_in_memtables;
        #[cfg(feature = "enterprise")]
        let rows = rows
            + self
                .extension_ranges
                .iter()
                .map(|x| x.num_rows())
                .sum::<u64>() as usize;
        rows
    }

    pub(crate) fn predicate_group(&self) -> &PredicateGroup {
        &self.predicate
    }

    /// Returns number of memtables to scan.
    pub(crate) fn num_memtables(&self) -> usize {
        self.memtables.len()
    }

    /// Returns number of SST files to scan.
    pub(crate) fn num_files(&self) -> usize {
        self.files.len()
    }

    /// Gets the file handle from a row group index.
    pub(crate) fn file_from_index(&self, index: RowGroupIndex) -> &FileHandle {
        let file_index = index.index - self.num_memtables();
        &self.files[file_index]
    }

    pub fn region_metadata(&self) -> &RegionMetadataRef {
        self.mapper.metadata()
    }
}

#[cfg(feature = "enterprise")]
impl ScanInput {
    #[must_use]
    pub(crate) fn with_extension_ranges(self, extension_ranges: Vec<BoxedExtensionRange>) -> Self {
        Self {
            extension_ranges,
            ..self
        }
    }

    pub(crate) fn extension_ranges(&self) -> &[BoxedExtensionRange] {
        &self.extension_ranges
    }

    /// Gets a boxed [ExtensionRange] by its index among all ranges.
    pub(crate) fn extension_range(&self, i: usize) -> &BoxedExtensionRange {
        &self.extension_ranges[i - self.num_memtables() - self.num_files()]
    }
}

#[cfg(test)]
impl ScanInput {
    /// Returns SST file ids to scan.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        self.files.iter().map(|file| file.file_id()).collect()
    }

    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        self.files.iter().map(|file| file.index_id()).collect()
    }
}

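/// Computes the [PreFilterMode] for a scan from the region's append and merge
/// modes.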
fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode {
    if append_mode {
        return PreFilterMode::All;
    }

    match merge_mode {
        MergeMode::LastRow => PreFilterMode::SkipFieldsOnDelete,
        MergeMode::LastNonNull => PreFilterMode::SkipFields,
    }
}

/// Context shared by different streams from a scanner.
/// It contains the input and ranges to scan.
pub struct StreamContext {
    /// Input memtables and files.
    pub input: ScanInput,
    /// Metadata for partition ranges.
    pub(crate) ranges: Vec<RangeMeta>,

    // Metrics:
    /// The start time of the query.
    pub(crate) query_start: Instant,
}

impl StreamContext {
    /// Creates a new [StreamContext] for [SeqScan].
    pub(crate) fn seq_scan_ctx(input: ScanInput) -> Self {
        let query_start = input.query_start.unwrap_or_else(Instant::now);
        let ranges = RangeMeta::seq_scan_ranges(&input);
        READ_SST_COUNT.observe(input.num_files() as f64);

        Self {
            input,
            ranges,
            query_start,
        }
    }

    /// Creates a new [StreamContext] for [UnorderedScan].
    pub(crate) fn unordered_scan_ctx(input: ScanInput) -> Self {
        let query_start = input.query_start.unwrap_or_else(Instant::now);
        let ranges = RangeMeta::unordered_scan_ranges(&input);
        READ_SST_COUNT.observe(input.num_files() as f64);

        Self {
            input,
            ranges,
            query_start,
        }
    }

    /// Returns true if the index refers to a memtable.
    pub(crate) fn is_mem_range_index(&self, index: RowGroupIndex) -> bool {
        self.input.num_memtables() > index.index
    }

    pub(crate) fn is_file_range_index(&self, index: RowGroupIndex) -> bool {
        !self.is_mem_range_index(index)
            && index.index < self.input.num_files() + self.input.num_memtables()
    }
1464
1465    /// Retrieves the partition ranges.
1466    pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
1467        self.ranges
1468            .iter()
1469            .enumerate()
1470            .map(|(idx, range_meta)| range_meta.new_partition_range(idx))
1471            .collect()
1472    }
1473
1474    /// Format the context for explain.
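    ///
    /// For a scan over one memtable and one file split into three ranges, the
    /// non-verbose output resembles the following (illustrative only):
    ///
    /// ```text
    /// "partition_count":{"count":2, "mem_ranges":1, "files":1, "file_ranges":3}
    /// ```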
    pub(crate) fn format_for_explain(&self, verbose: bool, f: &mut fmt::Formatter) -> fmt::Result {
        let (mut num_mem_ranges, mut num_file_ranges, mut num_other_ranges) = (0, 0, 0);
        for range_meta in &self.ranges {
            for idx in &range_meta.row_group_indices {
                if self.is_mem_range_index(*idx) {
                    num_mem_ranges += 1;
                } else if self.is_file_range_index(*idx) {
                    num_file_ranges += 1;
                } else {
                    num_other_ranges += 1;
                }
            }
        }
        if verbose {
            write!(f, "{{")?;
        }
        write!(
            f,
            r#""partition_count":{{"count":{}, "mem_ranges":{}, "files":{}, "file_ranges":{}"#,
            self.ranges.len(),
            num_mem_ranges,
            self.input.num_files(),
            num_file_ranges,
        )?;
        if num_other_ranges > 0 {
            write!(f, r#", "other_ranges":{}"#, num_other_ranges)?;
        }
        write!(f, "}}")?;

        if let Some(selector) = &self.input.series_row_selector {
            write!(f, ", \"selector\":\"{}\"", selector)?;
        }
        if let Some(distribution) = &self.input.distribution {
            write!(f, ", \"distribution\":\"{}\"", distribution)?;
        }

        if verbose {
            self.format_verbose_content(f)?;
        }

        Ok(())
    }

    fn format_verbose_content(&self, f: &mut fmt::Formatter) -> fmt::Result {
        struct FileWrapper<'a> {
            file: &'a FileHandle,
        }

        impl fmt::Debug for FileWrapper<'_> {
            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                let (start, end) = self.file.time_range();
                write!(
                    f,
                    r#"{{"file_id":"{}","time_range_start":"{}::{}","time_range_end":"{}::{}","rows":{},"size":{},"index_size":{}}}"#,
                    self.file.file_id(),
                    start.value(),
                    start.unit(),
                    end.value(),
                    end.unit(),
                    self.file.num_rows(),
                    self.file.size(),
                    self.file.index_size()
                )
            }
        }

        struct InputWrapper<'a> {
            input: &'a ScanInput,
        }

        #[cfg(feature = "enterprise")]
        impl InputWrapper<'_> {
            fn format_extension_ranges(&self, f: &mut fmt::Formatter) -> fmt::Result {
                if self.input.extension_ranges.is_empty() {
                    return Ok(());
                }

                let mut delimiter = "";
                write!(f, ", extension_ranges: [")?;
                for range in self.input.extension_ranges() {
                    write!(f, "{}{:?}", delimiter, range)?;
                    delimiter = ", ";
                }
                write!(f, "]")?;
                Ok(())
            }
        }

        impl fmt::Debug for InputWrapper<'_> {
            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                let output_schema = self.input.mapper.output_schema();
                if !output_schema.is_empty() {
                    let names: Vec<_> = output_schema
                        .column_schemas()
                        .iter()
                        .map(|col| &col.name)
                        .collect();
                    write!(f, ", \"projection\": {:?}", names)?;
                }
                if let Some(predicate) = self.input.predicate.predicate()
                    && !predicate.exprs().is_empty()
                {
                    let exprs: Vec<_> = predicate.exprs().iter().map(|e| e.to_string()).collect();
                    write!(f, ", \"filters\": {:?}", exprs)?;
                }
                #[cfg(feature = "vector_index")]
                if let Some(vector_index_k) = self.input.vector_index_k {
                    write!(f, ", \"vector_index_k\": {}", vector_index_k)?;
                }
                if !self.input.files.is_empty() {
                    write!(f, ", \"files\": ")?;
                    f.debug_list()
                        .entries(self.input.files.iter().map(|file| FileWrapper { file }))
                        .finish()?;
                }
                write!(f, ", \"flat_format\": {}", self.input.flat_format)?;

                #[cfg(feature = "enterprise")]
                self.format_extension_ranges(f)?;

                Ok(())
            }
        }

        write!(f, "{:?}", InputWrapper { input: &self.input })
    }
}

/// Predicates to evaluate.
#[derive(Clone, Default)]
pub struct PredicateGroup {
    /// Filters on the time index column. It only keeps filters that
    /// [SimpleFilterEvaluator] supports.
    time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
    /// Predicate that includes request filters and region partition expr (if any).
    predicate_all: Option<Predicate>,
    /// Predicate that only includes request filters.
    predicate_without_region: Option<Predicate>,
    /// Region partition expression restored from metadata.
    region_partition_expr: Option<PartitionExpr>,
}

impl PredicateGroup {
    /// Creates a new `PredicateGroup` from exprs according to the metadata.
    pub fn new(metadata: &RegionMetadata, exprs: &[Expr]) -> Result<Self> {
        let mut combined_exprs = exprs.to_vec();
        let mut region_partition_expr = None;

        if let Some(expr_json) = metadata.partition_expr.as_ref()
            && !expr_json.is_empty()
            && let Some(expr) = PartitionExpr::from_json_str(expr_json)
                .context(InvalidPartitionExprSnafu { expr: expr_json })?
        {
            let logical_expr = expr
                .try_as_logical_expr()
                .context(InvalidPartitionExprSnafu {
                    expr: expr_json.clone(),
                })?;

            combined_exprs.push(logical_expr);
            region_partition_expr = Some(expr);
        }

        let mut time_filters = Vec::with_capacity(combined_exprs.len());
        // Columns in the expr.
        let mut columns = HashSet::new();
        for expr in &combined_exprs {
            columns.clear();
            let Some(filter) = Self::expr_to_filter(expr, metadata, &mut columns) else {
                continue;
            };
            time_filters.push(filter);
        }
        let time_filters = if time_filters.is_empty() {
            None
        } else {
            Some(Arc::new(time_filters))
        };

        let predicate_all = if combined_exprs.is_empty() {
            None
        } else {
            Some(Predicate::new(combined_exprs))
        };
        let predicate_without_region = if exprs.is_empty() {
            None
        } else {
            Some(Predicate::new(exprs.to_vec()))
        };

        Ok(Self {
            time_filters,
            predicate_all,
            predicate_without_region,
            region_partition_expr,
        })
    }

    /// Returns time filters.
    pub(crate) fn time_filters(&self) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
        self.time_filters.clone()
    }

    /// Returns predicate of all exprs (including region partition expr if present).
    pub(crate) fn predicate(&self) -> Option<&Predicate> {
        self.predicate_all.as_ref()
    }

    /// Returns predicate that excludes region partition expr.
    pub(crate) fn predicate_without_region(&self) -> Option<&Predicate> {
        self.predicate_without_region.as_ref()
    }

    /// Returns the region partition expr from metadata, if any.
    pub(crate) fn region_partition_expr(&self) -> Option<&PartitionExpr> {
        self.region_partition_expr.as_ref()
    }

    fn expr_to_filter(
        expr: &Expr,
        metadata: &RegionMetadata,
        columns: &mut HashSet<Column>,
    ) -> Option<SimpleFilterEvaluator> {
        columns.clear();
        // `expr_to_columns` shouldn't return an error, but we still skip the
        // expression for safety if it does.
        expr_to_columns(expr, columns).ok()?;
        if columns.len() > 1 {
            // Simple filters don't support multiple columns.
            return None;
        }
        let column = columns.iter().next()?;
        let column_meta = metadata.column_by_name(&column.name)?;
        if column_meta.semantic_type == SemanticType::Timestamp {
            SimpleFilterEvaluator::try_new(expr)
        } else {
            None
        }
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datafusion_expr::{col, lit};
    use store_api::storage::ScanRequest;

    use super::*;
    use crate::memtable::time_partition::TimePartitions;
    use crate::region::options::RegionOptions;
    use crate::region::version::VersionBuilder;
    use crate::sst::FormatType;
    use crate::test_util::memtable_util::{EmptyMemtableBuilder, metadata_with_primary_key};
    use crate::test_util::scheduler_util::SchedulerEnv;

    fn new_version(metadata: RegionMetadataRef) -> VersionRef {
        let mutable = Arc::new(TimePartitions::new(
            metadata.clone(),
            Arc::new(EmptyMemtableBuilder::default()),
            0,
            None,
        ));
        Arc::new(VersionBuilder::new(metadata, mutable).build())
    }

    fn new_version_with_sst_format(
        metadata: RegionMetadataRef,
        sst_format: Option<FormatType>,
    ) -> VersionRef {
        let mutable = Arc::new(TimePartitions::new(
            metadata.clone(),
            Arc::new(EmptyMemtableBuilder::default()),
            0,
            None,
        ));
        let options = RegionOptions {
            sst_format,
            ..Default::default()
        };
        Arc::new(
            VersionBuilder::new(metadata, mutable)
                .options(options)
                .build(),
        )
    }
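
    // A minimal check of `pre_filter_mode`'s mapping: append-only regions
    // pre-filter all columns, while deduplicating merge modes skip field
    // columns accordingly.
    #[test]
    fn test_pre_filter_mode_selection() {
        assert!(matches!(
            pre_filter_mode(true, MergeMode::LastRow),
            PreFilterMode::All
        ));
        assert!(matches!(
            pre_filter_mode(true, MergeMode::LastNonNull),
            PreFilterMode::All
        ));
        assert!(matches!(
            pre_filter_mode(false, MergeMode::LastRow),
            PreFilterMode::SkipFieldsOnDelete
        ));
        assert!(matches!(
            pre_filter_mode(false, MergeMode::LastNonNull),
            PreFilterMode::SkipFields
        ));
    }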

    #[tokio::test]
    async fn test_build_read_column_ids_includes_filters() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let version = new_version(metadata.clone());
        let env = SchedulerEnv::new().await;
        let request = ScanRequest {
            projection: Some(vec![4]),
            filters: vec![
                col("v0").gt(lit(1)),
                col("ts").gt(lit(0)),
                col("k0").eq(lit("foo")),
            ],
            ..Default::default()
        };
        let scan_region = ScanRegion::new(
            version,
            env.access_layer.clone(),
            request,
            CacheStrategy::Disabled,
        );
        let predicate =
            PredicateGroup::new(metadata.as_ref(), &scan_region.request.filters).unwrap();
        let projection = scan_region.request.projection.as_ref().unwrap();
        let read_ids = scan_region
            .build_read_column_ids(projection, &predicate)
            .unwrap();
        assert_eq!(vec![4, 0, 2, 3], read_ids);
    }

    #[tokio::test]
    async fn test_build_read_column_ids_empty_projection() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let version = new_version(metadata.clone());
        let env = SchedulerEnv::new().await;
        let request = ScanRequest {
            projection: Some(vec![]),
            ..Default::default()
        };
        let scan_region = ScanRegion::new(
            version,
            env.access_layer.clone(),
            request,
            CacheStrategy::Disabled,
        );
        let predicate =
            PredicateGroup::new(metadata.as_ref(), &scan_region.request.filters).unwrap();
        let projection = scan_region.request.projection.as_ref().unwrap();
        let read_ids = scan_region
            .build_read_column_ids(projection, &predicate)
            .unwrap();
        // Empty projection should still read the time index column (id 2 in this test schema).
        assert_eq!(vec![2], read_ids);
    }

    #[tokio::test]
    async fn test_build_read_column_ids_keeps_projection_order() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let version = new_version(metadata.clone());
        let env = SchedulerEnv::new().await;
        let request = ScanRequest {
            projection: Some(vec![4, 1]),
            filters: vec![col("v0").gt(lit(1))],
            ..Default::default()
        };
        let scan_region = ScanRegion::new(
            version,
            env.access_layer.clone(),
            request,
            CacheStrategy::Disabled,
        );
        let predicate =
            PredicateGroup::new(metadata.as_ref(), &scan_region.request.filters).unwrap();
        let projection = scan_region.request.projection.as_ref().unwrap();
        let read_ids = scan_region
            .build_read_column_ids(projection, &predicate)
            .unwrap();
        // Projection order preserved, extra columns appended in schema order.
        assert_eq!(vec![4, 1, 3], read_ids);
    }

    #[tokio::test]
    async fn test_use_flat_format_honors_request_override() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let env = SchedulerEnv::new().await;

        let primary_key_version =
            new_version_with_sst_format(metadata.clone(), Some(FormatType::PrimaryKey));
        let request = ScanRequest::default();
        let scan_region = ScanRegion::new(
            primary_key_version.clone(),
            env.access_layer.clone(),
            request,
            CacheStrategy::Disabled,
        );
        assert!(!scan_region.use_flat_format());

        let request = ScanRequest {
            force_flat_format: true,
            ..Default::default()
        };
        let scan_region = ScanRegion::new(
            primary_key_version,
            env.access_layer.clone(),
            request,
            CacheStrategy::Disabled,
        );
        assert!(scan_region.use_flat_format());

        let flat_version = new_version_with_sst_format(metadata, Some(FormatType::Flat));
        let request = ScanRequest::default();
        let scan_region = ScanRegion::new(
            flat_version,
            env.access_layer.clone(),
            request,
            CacheStrategy::Disabled,
        );
        assert!(scan_region.use_flat_format());
    }
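
    // A sketch of `PredicateGroup` behavior, assuming the test metadata carries
    // no region partition expr: only the filter on the time index column becomes
    // a time filter, while both predicates keep the request filters.
    #[test]
    fn test_predicate_group_time_filters() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let exprs = vec![col("ts").gt(lit(0)), col("v0").gt(lit(1))];
        let group = PredicateGroup::new(metadata.as_ref(), &exprs).unwrap();

        // `v0` is a field column, so only `ts > 0` becomes a simple time filter.
        assert_eq!(1, group.time_filters().unwrap().len());
        // Without a partition expr, both predicates contain the two request filters.
        assert_eq!(2, group.predicate().unwrap().exprs().len());
        assert_eq!(2, group.predicate_without_region().unwrap().exprs().len());
        assert!(group.region_partition_expr().is_none());
    }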
}