1use std::collections::HashSet;
18use std::fmt;
19use std::num::NonZeroU64;
20use std::sync::Arc;
21use std::time::Instant;
22
23use api::v1::SemanticType;
24use common_error::ext::BoxedError;
25use common_recordbatch::SendableRecordBatchStream;
26use common_recordbatch::filter::SimpleFilterEvaluator;
27use common_telemetry::{debug, error, tracing, warn};
28use common_time::range::TimestampRange;
29use datafusion_common::Column;
30use datafusion_expr::Expr;
31use datafusion_expr::utils::expr_to_columns;
32use futures::StreamExt;
33use partition::expr::PartitionExpr;
34use smallvec::SmallVec;
35use snafu::ResultExt;
36use store_api::metadata::{RegionMetadata, RegionMetadataRef};
37use store_api::region_engine::{PartitionRange, RegionScannerRef};
38use store_api::storage::{
39 RegionId, ScanRequest, SequenceRange, TimeSeriesDistribution, TimeSeriesRowSelector,
40};
41use table::predicate::{Predicate, build_time_range_predicate};
42use tokio::sync::{Semaphore, mpsc};
43use tokio_stream::wrappers::ReceiverStream;
44
45use crate::access_layer::AccessLayerRef;
46use crate::cache::CacheStrategy;
47use crate::config::{DEFAULT_MAX_CONCURRENT_SCAN_FILES, DEFAULT_SCAN_CHANNEL_SIZE};
48use crate::error::{InvalidPartitionExprSnafu, Result};
49#[cfg(feature = "enterprise")]
50use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
51use crate::memtable::{MemtableRange, RangesOptions};
52use crate::metrics::READ_SST_COUNT;
53use crate::read::compat::{self, CompatBatch, FlatCompatBatch, PrimaryKeyCompatBatch};
54use crate::read::projection::ProjectionMapper;
55use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
56use crate::read::seq_scan::SeqScan;
57use crate::read::series_scan::SeriesScan;
58use crate::read::stream::ScanBatchStream;
59use crate::read::unordered_scan::UnorderedScan;
60use crate::read::{Batch, BoxedRecordBatchStream, RecordBatch, Source};
61use crate::region::options::MergeMode;
62use crate::region::version::VersionRef;
63use crate::sst::file::FileHandle;
64use crate::sst::index::bloom_filter::applier::{
65 BloomFilterIndexApplierBuilder, BloomFilterIndexApplierRef,
66};
67use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
68use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
69use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
70use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
71use crate::sst::parquet::file_range::PreFilterMode;
72use crate::sst::parquet::reader::ReaderMetrics;
73
/// Channel size that `ScanRegion::scan_input` assigns to the parallel scan
/// channel when the region is scanned in the flat format.
const FLAT_SCAN_CHANNEL_SIZE: usize = 2;
76
/// A scanner over a region, wrapping one of the concrete scan implementations.
pub(crate) enum Scanner {
    /// Scanner backed by [`SeqScan`].
    Seq(SeqScan),
    /// Scanner backed by [`UnorderedScan`].
    Unordered(UnorderedScan),
    /// Scanner backed by [`SeriesScan`].
    Series(SeriesScan),
}
86
87impl Scanner {
88 #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
90 pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream, BoxedError> {
91 match self {
92 Scanner::Seq(seq_scan) => seq_scan.build_stream(),
93 Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
94 Scanner::Series(series_scan) => series_scan.build_stream().await,
95 }
96 }
97
98 pub(crate) fn scan_batch(&self) -> Result<ScanBatchStream> {
100 match self {
101 Scanner::Seq(x) => x.scan_all_partitions(),
102 Scanner::Unordered(x) => x.scan_all_partitions(),
103 Scanner::Series(x) => x.scan_all_partitions(),
104 }
105 }
106}
107
#[cfg(test)]
impl Scanner {
    /// Returns the number of SST files in the input of the wrapped scanner.
    pub(crate) fn num_files(&self) -> usize {
        match self {
            Self::Seq(scanner) => scanner.input().num_files(),
            Self::Unordered(scanner) => scanner.input().num_files(),
            Self::Series(scanner) => scanner.input().num_files(),
        }
    }

    /// Returns the number of memtable range builders in the input of the wrapped scanner.
    pub(crate) fn num_memtables(&self) -> usize {
        match self {
            Self::Seq(scanner) => scanner.input().num_memtables(),
            Self::Unordered(scanner) => scanner.input().num_memtables(),
            Self::Series(scanner) => scanner.input().num_memtables(),
        }
    }

    /// Returns the ids of the SST files in the input of the wrapped scanner.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        match self {
            Self::Seq(scanner) => scanner.input().file_ids(),
            Self::Unordered(scanner) => scanner.input().file_ids(),
            Self::Series(scanner) => scanner.input().file_ids(),
        }
    }

    /// Prepares the wrapped scanner with the given number of target partitions.
    ///
    /// # Panics
    /// Panics if the scanner fails to prepare.
    pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
        use store_api::region_engine::{PrepareRequest, RegionScanner};

        let prepare_request = PrepareRequest::default().with_target_partitions(target_partitions);
        match self {
            Self::Seq(scanner) => scanner.prepare(prepare_request).unwrap(),
            Self::Unordered(scanner) => scanner.prepare(prepare_request).unwrap(),
            Self::Series(scanner) => scanner.prepare(prepare_request).unwrap(),
        }
    }
}
149
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Builder that turns a [`ScanRequest`] on a region version into a scanner
/// ([`SeqScan`], [`UnorderedScan`] or [`SeriesScan`]).
pub(crate) struct ScanRegion {
    /// Region version to scan; provides metadata, options, memtables and SSTs.
    version: VersionRef,
    /// Access layer handed to the [`ScanInput`] and used to build index appliers.
    access_layer: AccessLayerRef,
    /// The scan request (projection, filters, sequence bounds, distribution, ...).
    request: ScanRequest,
    /// Cache strategy forwarded to the [`ScanInput`] and the index appliers.
    cache_strategy: CacheStrategy,
    /// Channel size for parallel scan tasks; overridden by
    /// `FLAT_SCAN_CHANNEL_SIZE` when `flat_format` is set.
    parallel_scan_channel_size: usize,
    /// Forwarded to the [`ScanInput`] as the maximum number of concurrently scanned files.
    max_concurrent_scan_files: usize,
    /// When `true`, no inverted index applier is built.
    ignore_inverted_index: bool,
    /// When `true`, no fulltext index applier is built.
    ignore_fulltext_index: bool,
    /// When `true`, no bloom filter index applier is built.
    ignore_bloom_filter: bool,
    /// Optional start time forwarded to the [`ScanInput`] as the query start.
    start_time: Option<Instant>,
    /// Forwarded to the [`ScanInput`]; defaults to `true`.
    filter_deleted: bool,
    /// Whether to scan using the flat format; defaults to `false`.
    flat_format: bool,
    /// Optional provider queried for extension ranges when building the input.
    #[cfg(feature = "enterprise")]
    extension_range_provider: Option<BoxedExtensionRangeProvider>,
}
229
230impl ScanRegion {
231 pub(crate) fn new(
233 version: VersionRef,
234 access_layer: AccessLayerRef,
235 request: ScanRequest,
236 cache_strategy: CacheStrategy,
237 ) -> ScanRegion {
238 ScanRegion {
239 version,
240 access_layer,
241 request,
242 cache_strategy,
243 parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
244 max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
245 ignore_inverted_index: false,
246 ignore_fulltext_index: false,
247 ignore_bloom_filter: false,
248 start_time: None,
249 filter_deleted: true,
250 flat_format: false,
251 #[cfg(feature = "enterprise")]
252 extension_range_provider: None,
253 }
254 }
255
256 #[must_use]
258 pub(crate) fn with_parallel_scan_channel_size(
259 mut self,
260 parallel_scan_channel_size: usize,
261 ) -> Self {
262 self.parallel_scan_channel_size = parallel_scan_channel_size;
263 self
264 }
265
266 #[must_use]
268 pub(crate) fn with_max_concurrent_scan_files(
269 mut self,
270 max_concurrent_scan_files: usize,
271 ) -> Self {
272 self.max_concurrent_scan_files = max_concurrent_scan_files;
273 self
274 }
275
276 #[must_use]
278 pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self {
279 self.ignore_inverted_index = ignore;
280 self
281 }
282
283 #[must_use]
285 pub(crate) fn with_ignore_fulltext_index(mut self, ignore: bool) -> Self {
286 self.ignore_fulltext_index = ignore;
287 self
288 }
289
290 #[must_use]
292 pub(crate) fn with_ignore_bloom_filter(mut self, ignore: bool) -> Self {
293 self.ignore_bloom_filter = ignore;
294 self
295 }
296
297 #[must_use]
298 pub(crate) fn with_start_time(mut self, now: Instant) -> Self {
299 self.start_time = Some(now);
300 self
301 }
302
    /// Sets the `filter_deleted` flag that is forwarded to the scan input.
    pub(crate) fn set_filter_deleted(&mut self, filter_deleted: bool) {
        self.filter_deleted = filter_deleted;
    }
306
307 #[must_use]
309 pub(crate) fn with_flat_format(mut self, flat_format: bool) -> Self {
310 self.flat_format = flat_format;
311 self
312 }
313
    /// Installs the provider that is asked for extension ranges when the
    /// scan input is built.
    #[cfg(feature = "enterprise")]
    pub(crate) fn set_extension_range_provider(
        &mut self,
        extension_range_provider: BoxedExtensionRangeProvider,
    ) {
        self.extension_range_provider = Some(extension_range_provider);
    }
321
322 pub(crate) async fn scanner(self) -> Result<Scanner> {
324 if self.use_series_scan() {
325 self.series_scan().await.map(Scanner::Series)
326 } else if self.use_unordered_scan() {
327 self.unordered_scan().await.map(Scanner::Unordered)
330 } else {
331 self.seq_scan().await.map(Scanner::Seq)
332 }
333 }
334
335 #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
337 pub(crate) async fn region_scanner(self) -> Result<RegionScannerRef> {
338 if self.use_series_scan() {
339 self.series_scan()
340 .await
341 .map(|scanner| Box::new(scanner) as _)
342 } else if self.use_unordered_scan() {
343 self.unordered_scan()
344 .await
345 .map(|scanner| Box::new(scanner) as _)
346 } else {
347 self.seq_scan().await.map(|scanner| Box::new(scanner) as _)
348 }
349 }
350
351 pub(crate) async fn seq_scan(self) -> Result<SeqScan> {
353 let input = self.scan_input().await?.with_compaction(false);
354 Ok(SeqScan::new(input))
355 }
356
357 pub(crate) async fn unordered_scan(self) -> Result<UnorderedScan> {
359 let input = self.scan_input().await?;
360 Ok(UnorderedScan::new(input))
361 }
362
363 pub(crate) async fn series_scan(self) -> Result<SeriesScan> {
365 let input = self.scan_input().await?;
366 Ok(SeriesScan::new(input))
367 }
368
369 fn use_unordered_scan(&self) -> bool {
371 self.version.options.append_mode
378 && self.request.series_row_selector.is_none()
379 && (self.request.distribution.is_none()
380 || self.request.distribution == Some(TimeSeriesDistribution::TimeWindowed))
381 }
382
    /// Returns `true` when the request asks for the per-series distribution.
    fn use_series_scan(&self) -> bool {
        self.request.distribution == Some(TimeSeriesDistribution::PerSeries)
    }
387
388 async fn scan_input(mut self) -> Result<ScanInput> {
390 let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new);
391 let time_range = self.build_time_range_predicate();
392 let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;
393
394 let mapper = match &self.request.projection {
396 Some(p) => {
397 ProjectionMapper::new(&self.version.metadata, p.iter().copied(), self.flat_format)?
398 }
399 None => ProjectionMapper::all(&self.version.metadata, self.flat_format)?,
400 };
401
402 let ssts = &self.version.ssts;
403 let mut files = Vec::new();
404 for level in ssts.levels() {
405 for file in level.files.values() {
406 let exceed_min_sequence = match (sst_min_sequence, file.meta_ref().sequence) {
407 (Some(min_sequence), Some(file_sequence)) => file_sequence > min_sequence,
408 (Some(_), None) => true,
414 (None, _) => true,
415 };
416
417 if exceed_min_sequence && file_in_range(file, &time_range) {
419 files.push(file.clone());
420 }
421 }
425 }
426
427 let memtables = self.version.memtables.list_memtables();
428 let mut mem_range_builders = Vec::new();
430 let filter_mode = pre_filter_mode(
431 self.version.options.append_mode,
432 self.version.options.merge_mode(),
433 );
434
435 for m in memtables {
436 let Some((start, end)) = m.stats().time_range() else {
438 continue;
439 };
440 let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
442 if !memtable_range.intersects(&time_range) {
443 continue;
444 }
445 let ranges_in_memtable = m.ranges(
446 Some(mapper.column_ids()),
447 RangesOptions::default()
448 .with_predicate(predicate.clone())
449 .with_sequence(SequenceRange::new(
450 self.request.memtable_min_sequence,
451 self.request.memtable_max_sequence,
452 ))
453 .with_pre_filter_mode(filter_mode),
454 )?;
455 mem_range_builders.extend(ranges_in_memtable.ranges.into_values().map(|v| {
456 let mut stats = ranges_in_memtable.stats.clone();
458 stats.num_ranges = 1;
459 stats.num_rows = v.num_rows();
460 MemRangeBuilder::new(v, stats)
461 }));
462 }
463
464 let region_id = self.region_id();
465 debug!(
466 "Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}",
467 region_id,
468 self.request,
469 time_range,
470 mem_range_builders.len(),
471 files.len(),
472 self.version.options.append_mode,
473 );
474
475 let (non_field_filters, field_filters) = self.partition_by_field_filters();
476 let inverted_index_appliers = [
477 self.build_invereted_index_applier(&non_field_filters),
478 self.build_invereted_index_applier(&field_filters),
479 ];
480 let bloom_filter_appliers = [
481 self.build_bloom_filter_applier(&non_field_filters),
482 self.build_bloom_filter_applier(&field_filters),
483 ];
484 let fulltext_index_appliers = [
485 self.build_fulltext_index_applier(&non_field_filters),
486 self.build_fulltext_index_applier(&field_filters),
487 ];
488 let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;
489
490 if self.flat_format {
491 self.parallel_scan_channel_size = FLAT_SCAN_CHANNEL_SIZE;
493 }
494
495 let input = ScanInput::new(self.access_layer, mapper)
496 .with_time_range(Some(time_range))
497 .with_predicate(predicate)
498 .with_memtables(mem_range_builders)
499 .with_files(files)
500 .with_cache(self.cache_strategy)
501 .with_inverted_index_appliers(inverted_index_appliers)
502 .with_bloom_filter_index_appliers(bloom_filter_appliers)
503 .with_fulltext_index_appliers(fulltext_index_appliers)
504 .with_parallel_scan_channel_size(self.parallel_scan_channel_size)
505 .with_max_concurrent_scan_files(self.max_concurrent_scan_files)
506 .with_start_time(self.start_time)
507 .with_append_mode(self.version.options.append_mode)
508 .with_filter_deleted(self.filter_deleted)
509 .with_merge_mode(self.version.options.merge_mode())
510 .with_series_row_selector(self.request.series_row_selector)
511 .with_distribution(self.request.distribution)
512 .with_flat_format(self.flat_format);
513
514 #[cfg(feature = "enterprise")]
515 let input = if let Some(provider) = self.extension_range_provider {
516 let ranges = provider
517 .find_extension_ranges(self.version.flushed_sequence, time_range, &self.request)
518 .await?;
519 debug!("Find extension ranges: {ranges:?}");
520 input.with_extension_ranges(ranges)
521 } else {
522 input
523 };
524 Ok(input)
525 }
526
    /// Returns the id of the region to scan, taken from the version's metadata.
    fn region_id(&self) -> RegionId {
        self.version.metadata.region_id
    }
530
531 fn build_time_range_predicate(&self) -> TimestampRange {
533 let time_index = self.version.metadata.time_index_column();
534 let unit = time_index
535 .column_schema
536 .data_type
537 .as_timestamp()
538 .expect("Time index must have timestamp-compatible type")
539 .unit();
540 build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters)
541 }
542
543 fn partition_by_field_filters(&self) -> (Vec<Expr>, Vec<Expr>) {
546 let field_columns = self
547 .version
548 .metadata
549 .field_columns()
550 .map(|col| &col.column_schema.name)
551 .collect::<HashSet<_>>();
552
553 let mut columns = HashSet::new();
554
555 self.request.filters.iter().cloned().partition(|expr| {
556 columns.clear();
557 if expr_to_columns(expr, &mut columns).is_err() {
559 return true;
561 }
562 !columns
564 .iter()
565 .any(|column| field_columns.contains(&column.name))
566 })
567 }
568
569 fn build_invereted_index_applier(&self, filters: &[Expr]) -> Option<InvertedIndexApplierRef> {
571 if self.ignore_inverted_index {
572 return None;
573 }
574
575 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
576 let inverted_index_cache = self.cache_strategy.inverted_index_cache().cloned();
577
578 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
579
580 InvertedIndexApplierBuilder::new(
581 self.access_layer.table_dir().to_string(),
582 self.access_layer.path_type(),
583 self.access_layer.object_store().clone(),
584 self.version.metadata.as_ref(),
585 self.version.metadata.inverted_indexed_column_ids(
586 self.version
587 .options
588 .index_options
589 .inverted_index
590 .ignore_column_ids
591 .iter(),
592 ),
593 self.access_layer.puffin_manager_factory().clone(),
594 )
595 .with_file_cache(file_cache)
596 .with_inverted_index_cache(inverted_index_cache)
597 .with_puffin_metadata_cache(puffin_metadata_cache)
598 .build(filters)
599 .inspect_err(|err| warn!(err; "Failed to build invereted index applier"))
600 .ok()
601 .flatten()
602 .map(Arc::new)
603 }
604
605 fn build_bloom_filter_applier(&self, filters: &[Expr]) -> Option<BloomFilterIndexApplierRef> {
607 if self.ignore_bloom_filter {
608 return None;
609 }
610
611 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
612 let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
613 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
614
615 BloomFilterIndexApplierBuilder::new(
616 self.access_layer.table_dir().to_string(),
617 self.access_layer.path_type(),
618 self.access_layer.object_store().clone(),
619 self.version.metadata.as_ref(),
620 self.access_layer.puffin_manager_factory().clone(),
621 )
622 .with_file_cache(file_cache)
623 .with_bloom_filter_index_cache(bloom_filter_index_cache)
624 .with_puffin_metadata_cache(puffin_metadata_cache)
625 .build(filters)
626 .inspect_err(|err| warn!(err; "Failed to build bloom filter index applier"))
627 .ok()
628 .flatten()
629 .map(Arc::new)
630 }
631
632 fn build_fulltext_index_applier(&self, filters: &[Expr]) -> Option<FulltextIndexApplierRef> {
634 if self.ignore_fulltext_index {
635 return None;
636 }
637
638 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
639 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
640 let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
641 FulltextIndexApplierBuilder::new(
642 self.access_layer.table_dir().to_string(),
643 self.access_layer.path_type(),
644 self.access_layer.object_store().clone(),
645 self.access_layer.puffin_manager_factory().clone(),
646 self.version.metadata.as_ref(),
647 )
648 .with_file_cache(file_cache)
649 .with_puffin_metadata_cache(puffin_metadata_cache)
650 .with_bloom_filter_cache(bloom_filter_index_cache)
651 .build(filters)
652 .inspect_err(|err| warn!(err; "Failed to build fulltext index applier"))
653 .ok()
654 .flatten()
655 .map(Arc::new)
656 }
657}
658
659fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
661 if predicate == &TimestampRange::min_to_max() {
662 return true;
663 }
664 let (start, end) = file.time_range();
666 let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end));
667 file_ts_range.intersects(predicate)
668}
669
/// Common input shared by the scanners: what to read (memtables, files) and
/// how to read it (projection, predicates, index appliers, scan options).
pub struct ScanInput {
    /// Access layer used to read SST files.
    access_layer: AccessLayerRef,
    /// Projection mapper; supplies the projected column ids and the expected metadata.
    pub(crate) mapper: Arc<ProjectionMapper>,
    /// Time range of the scan, if set.
    time_range: Option<TimestampRange>,
    /// Predicate group applied to the scan.
    pub(crate) predicate: PredicateGroup,
    /// Region partition expression copied from `predicate` by `with_predicate`.
    region_partition_expr: Option<PartitionExpr>,
    /// Builders of the memtable ranges to scan.
    pub(crate) memtables: Vec<MemRangeBuilder>,
    /// Handles of the SST files to scan.
    pub(crate) files: Vec<FileHandle>,
    /// Cache strategy passed to the SST reader.
    pub(crate) cache_strategy: CacheStrategy,
    /// When `true`, `prune_file` returns an empty builder for a missing file
    /// instead of an error.
    ignore_file_not_found: bool,
    /// Capacity of the channel created for each parallel scan task.
    pub(crate) parallel_scan_channel_size: usize,
    /// Maximum number of files to scan concurrently.
    /// NOTE(review): not read in the visible part of this file — confirm the consumer.
    pub(crate) max_concurrent_scan_files: usize,
    /// Inverted index appliers. `ScanRegion::scan_input` fills slot 0 from
    /// filters without field columns and slot 1 from filters with field columns.
    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
    /// Bloom filter index appliers, laid out like `inverted_index_appliers`.
    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    /// Fulltext index appliers, laid out like `inverted_index_appliers`.
    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
    /// Start time of the query, if known.
    pub(crate) query_start: Option<Instant>,
    /// Whether the region is in append mode.
    pub(crate) append_mode: bool,
    /// Whether deleted rows are filtered; defaults to `true`.
    pub(crate) filter_deleted: bool,
    /// Merge mode of the region.
    pub(crate) merge_mode: MergeMode,
    /// Optional row selector per time series.
    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
    /// Optional distribution hint of the time series.
    pub(crate) distribution: Option<TimeSeriesDistribution>,
    /// Whether to read in the flat format.
    pub(crate) flat_format: bool,
    /// Compaction flag passed to the SST reader and to the flat compat batch.
    pub(crate) compaction: bool,
    /// Extension ranges appended after the memtable and file ranges.
    #[cfg(feature = "enterprise")]
    extension_ranges: Vec<BoxedExtensionRange>,
}
717
718impl ScanInput {
719 #[must_use]
721 pub(crate) fn new(access_layer: AccessLayerRef, mapper: ProjectionMapper) -> ScanInput {
722 ScanInput {
723 access_layer,
724 mapper: Arc::new(mapper),
725 time_range: None,
726 predicate: PredicateGroup::default(),
727 region_partition_expr: None,
728 memtables: Vec::new(),
729 files: Vec::new(),
730 cache_strategy: CacheStrategy::Disabled,
731 ignore_file_not_found: false,
732 parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
733 max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
734 inverted_index_appliers: [None, None],
735 bloom_filter_index_appliers: [None, None],
736 fulltext_index_appliers: [None, None],
737 query_start: None,
738 append_mode: false,
739 filter_deleted: true,
740 merge_mode: MergeMode::default(),
741 series_row_selector: None,
742 distribution: None,
743 flat_format: false,
744 compaction: false,
745 #[cfg(feature = "enterprise")]
746 extension_ranges: Vec::new(),
747 }
748 }
749
750 #[must_use]
752 pub(crate) fn with_time_range(mut self, time_range: Option<TimestampRange>) -> Self {
753 self.time_range = time_range;
754 self
755 }
756
757 #[must_use]
759 pub(crate) fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
760 self.region_partition_expr = predicate.region_partition_expr().cloned();
761 self.predicate = predicate;
762 self
763 }
764
765 #[must_use]
767 pub(crate) fn with_memtables(mut self, memtables: Vec<MemRangeBuilder>) -> Self {
768 self.memtables = memtables;
769 self
770 }
771
772 #[must_use]
774 pub(crate) fn with_files(mut self, files: Vec<FileHandle>) -> Self {
775 self.files = files;
776 self
777 }
778
779 #[must_use]
781 pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self {
782 self.cache_strategy = cache;
783 self
784 }
785
786 #[must_use]
788 pub(crate) fn with_ignore_file_not_found(mut self, ignore: bool) -> Self {
789 self.ignore_file_not_found = ignore;
790 self
791 }
792
793 #[must_use]
795 pub(crate) fn with_parallel_scan_channel_size(
796 mut self,
797 parallel_scan_channel_size: usize,
798 ) -> Self {
799 self.parallel_scan_channel_size = parallel_scan_channel_size;
800 self
801 }
802
803 #[must_use]
805 pub(crate) fn with_max_concurrent_scan_files(
806 mut self,
807 max_concurrent_scan_files: usize,
808 ) -> Self {
809 self.max_concurrent_scan_files = max_concurrent_scan_files;
810 self
811 }
812
813 #[must_use]
815 pub(crate) fn with_inverted_index_appliers(
816 mut self,
817 appliers: [Option<InvertedIndexApplierRef>; 2],
818 ) -> Self {
819 self.inverted_index_appliers = appliers;
820 self
821 }
822
823 #[must_use]
825 pub(crate) fn with_bloom_filter_index_appliers(
826 mut self,
827 appliers: [Option<BloomFilterIndexApplierRef>; 2],
828 ) -> Self {
829 self.bloom_filter_index_appliers = appliers;
830 self
831 }
832
833 #[must_use]
835 pub(crate) fn with_fulltext_index_appliers(
836 mut self,
837 appliers: [Option<FulltextIndexApplierRef>; 2],
838 ) -> Self {
839 self.fulltext_index_appliers = appliers;
840 self
841 }
842
843 #[must_use]
845 pub(crate) fn with_start_time(mut self, now: Option<Instant>) -> Self {
846 self.query_start = now;
847 self
848 }
849
850 #[must_use]
851 pub(crate) fn with_append_mode(mut self, is_append_mode: bool) -> Self {
852 self.append_mode = is_append_mode;
853 self
854 }
855
856 #[must_use]
858 pub(crate) fn with_filter_deleted(mut self, filter_deleted: bool) -> Self {
859 self.filter_deleted = filter_deleted;
860 self
861 }
862
863 #[must_use]
865 pub(crate) fn with_merge_mode(mut self, merge_mode: MergeMode) -> Self {
866 self.merge_mode = merge_mode;
867 self
868 }
869
870 #[must_use]
872 pub(crate) fn with_distribution(
873 mut self,
874 distribution: Option<TimeSeriesDistribution>,
875 ) -> Self {
876 self.distribution = distribution;
877 self
878 }
879
880 #[must_use]
882 pub(crate) fn with_series_row_selector(
883 mut self,
884 series_row_selector: Option<TimeSeriesRowSelector>,
885 ) -> Self {
886 self.series_row_selector = series_row_selector;
887 self
888 }
889
890 #[must_use]
892 pub(crate) fn with_flat_format(mut self, flat_format: bool) -> Self {
893 self.flat_format = flat_format;
894 self
895 }
896
897 #[must_use]
899 pub(crate) fn with_compaction(mut self, compaction: bool) -> Self {
900 self.compaction = compaction;
901 self
902 }
903
904 pub(crate) fn create_parallel_sources(
908 &self,
909 sources: Vec<Source>,
910 semaphore: Arc<Semaphore>,
911 ) -> Result<Vec<Source>> {
912 if sources.len() <= 1 {
913 return Ok(sources);
914 }
915
916 let sources = sources
918 .into_iter()
919 .map(|source| {
920 let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
921 self.spawn_scan_task(source, semaphore.clone(), sender);
922 let stream = Box::pin(ReceiverStream::new(receiver));
923 Source::Stream(stream)
924 })
925 .collect();
926 Ok(sources)
927 }
928
929 pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> {
931 let memtable = &self.memtables[index.index];
932 let mut ranges = SmallVec::new();
933 memtable.build_ranges(index.row_group_index, &mut ranges);
934 ranges
935 }
936
937 fn predicate_for_file(&self, file: &FileHandle) -> Option<Predicate> {
938 if self.should_skip_region_partition(file) {
939 self.predicate.predicate_without_region().cloned()
940 } else {
941 self.predicate.predicate().cloned()
942 }
943 }
944
945 fn should_skip_region_partition(&self, file: &FileHandle) -> bool {
946 match (
947 self.region_partition_expr.as_ref(),
948 file.meta_ref().partition_expr.as_ref(),
949 ) {
950 (Some(region_expr), Some(file_expr)) => region_expr == file_expr,
951 _ => false,
952 }
953 }
954
955 pub async fn prune_file(
957 &self,
958 file: &FileHandle,
959 reader_metrics: &mut ReaderMetrics,
960 ) -> Result<FileRangeBuilder> {
961 let predicate = self.predicate_for_file(file);
962 let filter_mode = pre_filter_mode(self.append_mode, self.merge_mode);
963 let res = self
964 .access_layer
965 .read_sst(file.clone())
966 .predicate(predicate)
967 .projection(Some(self.mapper.column_ids().to_vec()))
968 .cache(self.cache_strategy.clone())
969 .inverted_index_appliers(self.inverted_index_appliers.clone())
970 .bloom_filter_index_appliers(self.bloom_filter_index_appliers.clone())
971 .fulltext_index_appliers(self.fulltext_index_appliers.clone())
972 .expected_metadata(Some(self.mapper.metadata().clone()))
973 .flat_format(self.flat_format)
974 .compaction(self.compaction)
975 .pre_filter_mode(filter_mode)
976 .build_reader_input(reader_metrics)
977 .await;
978 let (mut file_range_ctx, selection) = match res {
979 Ok(x) => x,
980 Err(e) => {
981 if e.is_object_not_found() && self.ignore_file_not_found {
982 error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
983 return Ok(FileRangeBuilder::default());
984 } else {
985 return Err(e);
986 }
987 }
988 };
989
990 let need_compat = !compat::has_same_columns_and_pk_encoding(
991 self.mapper.metadata(),
992 file_range_ctx.read_format().metadata(),
993 );
994 if need_compat {
995 let compat = if let Some(flat_format) = file_range_ctx.read_format().as_flat() {
998 let mapper = self.mapper.as_flat().unwrap();
999 FlatCompatBatch::try_new(
1000 mapper,
1001 flat_format.metadata(),
1002 flat_format.format_projection(),
1003 self.compaction,
1004 )?
1005 .map(CompatBatch::Flat)
1006 } else {
1007 let compact_batch = PrimaryKeyCompatBatch::new(
1008 &self.mapper,
1009 file_range_ctx.read_format().metadata().clone(),
1010 )?;
1011 Some(CompatBatch::PrimaryKey(compact_batch))
1012 };
1013 file_range_ctx.set_compat_batch(compat);
1014 }
1015 Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), selection))
1016 }
1017
1018 pub(crate) fn spawn_scan_task(
1020 &self,
1021 mut input: Source,
1022 semaphore: Arc<Semaphore>,
1023 sender: mpsc::Sender<Result<Batch>>,
1024 ) {
1025 common_runtime::spawn_global(async move {
1026 loop {
1027 let maybe_batch = {
1030 let _permit = semaphore.acquire().await.unwrap();
1032 input.next_batch().await
1033 };
1034 match maybe_batch {
1035 Ok(Some(batch)) => {
1036 let _ = sender.send(Ok(batch)).await;
1037 }
1038 Ok(None) => break,
1039 Err(e) => {
1040 let _ = sender.send(Err(e)).await;
1041 break;
1042 }
1043 }
1044 }
1045 });
1046 }
1047
1048 pub(crate) fn create_parallel_flat_sources(
1052 &self,
1053 sources: Vec<BoxedRecordBatchStream>,
1054 semaphore: Arc<Semaphore>,
1055 ) -> Result<Vec<BoxedRecordBatchStream>> {
1056 if sources.len() <= 1 {
1057 return Ok(sources);
1058 }
1059
1060 let sources = sources
1062 .into_iter()
1063 .map(|source| {
1064 let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
1065 self.spawn_flat_scan_task(source, semaphore.clone(), sender);
1066 let stream = Box::pin(ReceiverStream::new(receiver));
1067 Box::pin(stream) as _
1068 })
1069 .collect();
1070 Ok(sources)
1071 }
1072
1073 pub(crate) fn spawn_flat_scan_task(
1075 &self,
1076 mut input: BoxedRecordBatchStream,
1077 semaphore: Arc<Semaphore>,
1078 sender: mpsc::Sender<Result<RecordBatch>>,
1079 ) {
1080 common_runtime::spawn_global(async move {
1081 loop {
1082 let maybe_batch = {
1085 let _permit = semaphore.acquire().await.unwrap();
1087 input.next().await
1088 };
1089 match maybe_batch {
1090 Some(Ok(batch)) => {
1091 let _ = sender.send(Ok(batch)).await;
1092 }
1093 Some(Err(e)) => {
1094 let _ = sender.send(Err(e)).await;
1095 break;
1096 }
1097 None => break,
1098 }
1099 }
1100 });
1101 }
1102
1103 pub(crate) fn total_rows(&self) -> usize {
1104 let rows_in_files: usize = self.files.iter().map(|f| f.num_rows()).sum();
1105 let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();
1106
1107 let rows = rows_in_files + rows_in_memtables;
1108 #[cfg(feature = "enterprise")]
1109 let rows = rows
1110 + self
1111 .extension_ranges
1112 .iter()
1113 .map(|x| x.num_rows())
1114 .sum::<u64>() as usize;
1115 rows
1116 }
1117
    /// Returns the predicate group of the scan.
    pub(crate) fn predicate_group(&self) -> &PredicateGroup {
        &self.predicate
    }
1121
    /// Returns the number of memtable range builders to scan.
    pub(crate) fn num_memtables(&self) -> usize {
        self.memtables.len()
    }
1126
    /// Returns the number of SST files to scan.
    pub(crate) fn num_files(&self) -> usize {
        self.files.len()
    }
1131
    /// Returns the region metadata held by the projection mapper.
    pub fn region_metadata(&self) -> &RegionMetadataRef {
        self.mapper.metadata()
    }
1135}
1136
#[cfg(feature = "enterprise")]
impl ScanInput {
    /// Replaces the extension ranges of the input.
    #[must_use]
    pub(crate) fn with_extension_ranges(
        mut self,
        extension_ranges: Vec<BoxedExtensionRange>,
    ) -> Self {
        self.extension_ranges = extension_ranges;
        self
    }

    /// Returns all extension ranges.
    #[cfg(feature = "enterprise")]
    pub(crate) fn extension_ranges(&self) -> &[BoxedExtensionRange] {
        self.extension_ranges.as_slice()
    }

    /// Returns the extension range at global range index `i`.
    ///
    /// Extension ranges are indexed after the memtables and the files, so the
    /// counts of both are subtracted from `i`.
    ///
    /// # Panics
    /// Panics if `i` does not address an extension range.
    #[cfg(feature = "enterprise")]
    pub(crate) fn extension_range(&self, i: usize) -> &BoxedExtensionRange {
        let preceding = self.num_memtables() + self.num_files();
        &self.extension_ranges[i - preceding]
    }
}
1158
#[cfg(test)]
impl ScanInput {
    /// Returns the ids of the SST files to scan, in the order of `files`.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        let mut ids = Vec::with_capacity(self.files.len());
        for file in &self.files {
            ids.push(file.file_id());
        }
        ids
    }
}
1166
1167fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode {
1168 if append_mode {
1169 return PreFilterMode::All;
1170 }
1171
1172 match merge_mode {
1173 MergeMode::LastRow => PreFilterMode::SkipFieldsOnDelete,
1174 MergeMode::LastNonNull => PreFilterMode::SkipFields,
1175 }
1176}
1177
/// Context shared by the streams of a scanner: the scan input, the ranges
/// derived from it and the query start time.
pub struct StreamContext {
    /// Input of the scan.
    pub input: ScanInput,
    /// Metadata of the ranges computed from `input`.
    pub(crate) ranges: Vec<RangeMeta>,

    /// Start time of the query; falls back to the context creation time when
    /// the input carries none.
    pub(crate) query_start: Instant,
}
1190
1191impl StreamContext {
1192 pub(crate) fn seq_scan_ctx(input: ScanInput) -> Self {
1194 let query_start = input.query_start.unwrap_or_else(Instant::now);
1195 let ranges = RangeMeta::seq_scan_ranges(&input);
1196 READ_SST_COUNT.observe(input.num_files() as f64);
1197
1198 Self {
1199 input,
1200 ranges,
1201 query_start,
1202 }
1203 }
1204
1205 pub(crate) fn unordered_scan_ctx(input: ScanInput) -> Self {
1207 let query_start = input.query_start.unwrap_or_else(Instant::now);
1208 let ranges = RangeMeta::unordered_scan_ranges(&input);
1209 READ_SST_COUNT.observe(input.num_files() as f64);
1210
1211 Self {
1212 input,
1213 ranges,
1214 query_start,
1215 }
1216 }
1217
1218 pub(crate) fn is_mem_range_index(&self, index: RowGroupIndex) -> bool {
1220 self.input.num_memtables() > index.index
1221 }
1222
1223 pub(crate) fn is_file_range_index(&self, index: RowGroupIndex) -> bool {
1224 !self.is_mem_range_index(index)
1225 && index.index < self.input.num_files() + self.input.num_memtables()
1226 }
1227
1228 pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
1230 self.ranges
1231 .iter()
1232 .enumerate()
1233 .map(|(idx, range_meta)| range_meta.new_partition_range(idx))
1234 .collect()
1235 }
1236
1237 pub(crate) fn format_for_explain(&self, verbose: bool, f: &mut fmt::Formatter) -> fmt::Result {
1239 let (mut num_mem_ranges, mut num_file_ranges, mut num_other_ranges) = (0, 0, 0);
1240 for range_meta in &self.ranges {
1241 for idx in &range_meta.row_group_indices {
1242 if self.is_mem_range_index(*idx) {
1243 num_mem_ranges += 1;
1244 } else if self.is_file_range_index(*idx) {
1245 num_file_ranges += 1;
1246 } else {
1247 num_other_ranges += 1;
1248 }
1249 }
1250 }
1251 if verbose {
1252 write!(f, "{{")?;
1253 }
1254 write!(
1255 f,
1256 r#""partition_count":{{"count":{}, "mem_ranges":{}, "files":{}, "file_ranges":{}"#,
1257 self.ranges.len(),
1258 num_mem_ranges,
1259 self.input.num_files(),
1260 num_file_ranges,
1261 )?;
1262 if num_other_ranges > 0 {
1263 write!(f, r#", "other_ranges":{}"#, num_other_ranges)?;
1264 }
1265 write!(f, "}}")?;
1266
1267 if let Some(selector) = &self.input.series_row_selector {
1268 write!(f, ", \"selector\":\"{}\"", selector)?;
1269 }
1270 if let Some(distribution) = &self.input.distribution {
1271 write!(f, ", \"distribution\":\"{}\"", distribution)?;
1272 }
1273
1274 if verbose {
1275 self.format_verbose_content(f)?;
1276 }
1277
1278 Ok(())
1279 }
1280
1281 fn format_verbose_content(&self, f: &mut fmt::Formatter) -> fmt::Result {
1282 struct FileWrapper<'a> {
1283 file: &'a FileHandle,
1284 }
1285
1286 impl fmt::Debug for FileWrapper<'_> {
1287 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1288 let (start, end) = self.file.time_range();
1289 write!(
1290 f,
1291 r#"{{"file_id":"{}","time_range_start":"{}::{}","time_range_end":"{}::{}","rows":{},"size":{},"index_size":{}}}"#,
1292 self.file.file_id(),
1293 start.value(),
1294 start.unit(),
1295 end.value(),
1296 end.unit(),
1297 self.file.num_rows(),
1298 self.file.size(),
1299 self.file.index_size()
1300 )
1301 }
1302 }
1303
1304 struct InputWrapper<'a> {
1305 input: &'a ScanInput,
1306 }
1307
1308 #[cfg(feature = "enterprise")]
1309 impl InputWrapper<'_> {
1310 fn format_extension_ranges(&self, f: &mut fmt::Formatter) -> fmt::Result {
1311 if self.input.extension_ranges.is_empty() {
1312 return Ok(());
1313 }
1314
1315 let mut delimiter = "";
1316 write!(f, ", extension_ranges: [")?;
1317 for range in self.input.extension_ranges() {
1318 write!(f, "{}{:?}", delimiter, range)?;
1319 delimiter = ", ";
1320 }
1321 write!(f, "]")?;
1322 Ok(())
1323 }
1324 }
1325
1326 impl fmt::Debug for InputWrapper<'_> {
1327 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1328 let output_schema = self.input.mapper.output_schema();
1329 if !output_schema.is_empty() {
1330 let names: Vec<_> = output_schema
1331 .column_schemas()
1332 .iter()
1333 .map(|col| &col.name)
1334 .collect();
1335 write!(f, ", \"projection\": {:?}", names)?;
1336 }
1337 if let Some(predicate) = &self.input.predicate.predicate()
1338 && !predicate.exprs().is_empty()
1339 {
1340 let exprs: Vec<_> = predicate.exprs().iter().map(|e| e.to_string()).collect();
1341 write!(f, ", \"filters\": {:?}", exprs)?;
1342 }
1343 if !self.input.files.is_empty() {
1344 write!(f, ", \"files\": ")?;
1345 f.debug_list()
1346 .entries(self.input.files.iter().map(|file| FileWrapper { file }))
1347 .finish()?;
1348 }
1349
1350 #[cfg(feature = "enterprise")]
1351 self.format_extension_ranges(f)?;
1352
1353 Ok(())
1354 }
1355 }
1356
1357 write!(f, "{:?}", InputWrapper { input: &self.input })
1358 }
1359}
1360
/// Predicates derived from the filter expressions of a scan and, if present, from the
/// partition expression of the region.
#[derive(Clone, Default)]
pub struct PredicateGroup {
    /// Simple filters built from the expressions that reference exactly one column, where
    /// that column has the `Timestamp` semantic type. `None` if no expression qualifies.
    time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
    /// Predicate over all expressions: the caller's expressions plus the logical form of
    /// the region partition expression, if any. `None` if there are no expressions at all.
    predicate_all: Option<Predicate>,
    /// Predicate over the caller's expressions only, without the region partition
    /// expression. `None` if the caller passed no expressions.
    predicate_without_region: Option<Predicate>,
    /// Partition expression parsed from the region metadata, if the metadata has a
    /// non-empty one that parses to an expression.
    region_partition_expr: Option<PartitionExpr>,
}
1373
1374impl PredicateGroup {
1375 pub fn new(metadata: &RegionMetadata, exprs: &[Expr]) -> Result<Self> {
1377 let mut combined_exprs = exprs.to_vec();
1378 let mut region_partition_expr = None;
1379
1380 if let Some(expr_json) = metadata.partition_expr.as_ref()
1381 && !expr_json.is_empty()
1382 && let Some(expr) = PartitionExpr::from_json_str(expr_json)
1383 .context(InvalidPartitionExprSnafu { expr: expr_json })?
1384 {
1385 let logical_expr = expr
1386 .try_as_logical_expr()
1387 .context(InvalidPartitionExprSnafu {
1388 expr: expr_json.clone(),
1389 })?;
1390
1391 combined_exprs.push(logical_expr);
1392 region_partition_expr = Some(expr);
1393 }
1394
1395 let mut time_filters = Vec::with_capacity(combined_exprs.len());
1396 let mut columns = HashSet::new();
1398 for expr in &combined_exprs {
1399 columns.clear();
1400 let Some(filter) = Self::expr_to_filter(expr, metadata, &mut columns) else {
1401 continue;
1402 };
1403 time_filters.push(filter);
1404 }
1405 let time_filters = if time_filters.is_empty() {
1406 None
1407 } else {
1408 Some(Arc::new(time_filters))
1409 };
1410
1411 let predicate_all = if combined_exprs.is_empty() {
1412 None
1413 } else {
1414 Some(Predicate::new(combined_exprs))
1415 };
1416 let predicate_without_region = if exprs.is_empty() {
1417 None
1418 } else {
1419 Some(Predicate::new(exprs.to_vec()))
1420 };
1421
1422 Ok(Self {
1423 time_filters,
1424 predicate_all,
1425 predicate_without_region,
1426 region_partition_expr,
1427 })
1428 }
1429
1430 pub(crate) fn time_filters(&self) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
1432 self.time_filters.clone()
1433 }
1434
1435 pub(crate) fn predicate(&self) -> Option<&Predicate> {
1437 self.predicate_all.as_ref()
1438 }
1439
1440 pub(crate) fn predicate_without_region(&self) -> Option<&Predicate> {
1442 self.predicate_without_region.as_ref()
1443 }
1444
1445 pub(crate) fn region_partition_expr(&self) -> Option<&PartitionExpr> {
1447 self.region_partition_expr.as_ref()
1448 }
1449
1450 fn expr_to_filter(
1451 expr: &Expr,
1452 metadata: &RegionMetadata,
1453 columns: &mut HashSet<Column>,
1454 ) -> Option<SimpleFilterEvaluator> {
1455 columns.clear();
1456 expr_to_columns(expr, columns).ok()?;
1459 if columns.len() > 1 {
1460 return None;
1462 }
1463 let column = columns.iter().next()?;
1464 let column_meta = metadata.column_by_name(&column.name)?;
1465 if column_meta.semantic_type == SemanticType::Timestamp {
1466 SimpleFilterEvaluator::try_new(expr)
1467 } else {
1468 None
1469 }
1470 }
1471}