1use std::collections::HashSet;
18use std::fmt;
19use std::num::NonZeroU64;
20use std::sync::Arc;
21use std::time::Instant;
22
23use api::v1::SemanticType;
24use common_error::ext::BoxedError;
25use common_recordbatch::SendableRecordBatchStream;
26use common_recordbatch::filter::SimpleFilterEvaluator;
27use common_telemetry::{debug, error, tracing, warn};
28use common_time::range::TimestampRange;
29use datafusion_common::Column;
30use datafusion_expr::Expr;
31use datafusion_expr::utils::expr_to_columns;
32use futures::StreamExt;
33use partition::expr::PartitionExpr;
34use smallvec::SmallVec;
35use snafu::ResultExt;
36use store_api::metadata::{RegionMetadata, RegionMetadataRef};
37use store_api::region_engine::{PartitionRange, RegionScannerRef};
38use store_api::storage::{
39 RegionId, ScanRequest, SequenceRange, TimeSeriesDistribution, TimeSeriesRowSelector,
40};
41use table::predicate::{Predicate, build_time_range_predicate};
42use tokio::sync::{Semaphore, mpsc};
43use tokio_stream::wrappers::ReceiverStream;
44
45use crate::access_layer::AccessLayerRef;
46use crate::cache::CacheStrategy;
47use crate::config::{DEFAULT_MAX_CONCURRENT_SCAN_FILES, DEFAULT_SCAN_CHANNEL_SIZE};
48use crate::error::{InvalidPartitionExprSnafu, Result};
49#[cfg(feature = "enterprise")]
50use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
51use crate::memtable::{MemtableRange, RangesOptions};
52use crate::metrics::READ_SST_COUNT;
53use crate::read::compat::{self, CompatBatch, FlatCompatBatch, PrimaryKeyCompatBatch};
54use crate::read::projection::ProjectionMapper;
55use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
56use crate::read::seq_scan::SeqScan;
57use crate::read::series_scan::SeriesScan;
58use crate::read::stream::ScanBatchStream;
59use crate::read::unordered_scan::UnorderedScan;
60use crate::read::{Batch, BoxedRecordBatchStream, RecordBatch, Source};
61use crate::region::options::MergeMode;
62use crate::region::version::VersionRef;
63use crate::sst::FormatType;
64use crate::sst::file::FileHandle;
65use crate::sst::index::bloom_filter::applier::{
66 BloomFilterIndexApplierBuilder, BloomFilterIndexApplierRef,
67};
68use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
69use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
70use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
71use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
72use crate::sst::parquet::file_range::PreFilterMode;
73use crate::sst::parquet::reader::ReaderMetrics;
74
/// Channel size used for parallel scan tasks when the region uses the flat SST format.
///
/// `ScanRegion::scan_input` overrides the configured channel size with this value
/// whenever the flat format is in use.
const FLAT_SCAN_CHANNEL_SIZE: usize = 2;
77
/// A scanner over a region, wrapping one of the concrete scan implementations.
pub(crate) enum Scanner {
    /// Scanner backed by [SeqScan].
    Seq(SeqScan),
    /// Scanner backed by [UnorderedScan].
    Unordered(UnorderedScan),
    /// Scanner backed by [SeriesScan].
    Series(SeriesScan),
}
87
88impl Scanner {
89 #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
91 pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream, BoxedError> {
92 match self {
93 Scanner::Seq(seq_scan) => seq_scan.build_stream(),
94 Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
95 Scanner::Series(series_scan) => series_scan.build_stream().await,
96 }
97 }
98
99 pub(crate) fn scan_batch(&self) -> Result<ScanBatchStream> {
101 match self {
102 Scanner::Seq(x) => x.scan_all_partitions(),
103 Scanner::Unordered(x) => x.scan_all_partitions(),
104 Scanner::Series(x) => x.scan_all_partitions(),
105 }
106 }
107}
108
#[cfg(test)]
impl Scanner {
    /// Returns the number of SST files held by the scanner input.
    pub(crate) fn num_files(&self) -> usize {
        match self {
            Self::Seq(scanner) => scanner.input().num_files(),
            Self::Unordered(scanner) => scanner.input().num_files(),
            Self::Series(scanner) => scanner.input().num_files(),
        }
    }

    /// Returns the number of memtable range builders held by the scanner input.
    pub(crate) fn num_memtables(&self) -> usize {
        match self {
            Self::Seq(scanner) => scanner.input().num_memtables(),
            Self::Unordered(scanner) => scanner.input().num_memtables(),
            Self::Series(scanner) => scanner.input().num_memtables(),
        }
    }

    /// Returns the ids of the SST files held by the scanner input.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        match self {
            Self::Seq(scanner) => scanner.input().file_ids(),
            Self::Unordered(scanner) => scanner.input().file_ids(),
            Self::Series(scanner) => scanner.input().file_ids(),
        }
    }

    /// Prepares the wrapped scanner with the given number of target partitions.
    ///
    /// # Panics
    /// Panics if the scanner fails to prepare the request.
    pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
        use store_api::region_engine::{PrepareRequest, RegionScanner};

        let prepare_request = PrepareRequest::default().with_target_partitions(target_partitions);
        match self {
            Self::Seq(scanner) => scanner.prepare(prepare_request).unwrap(),
            Self::Unordered(scanner) => scanner.prepare(prepare_request).unwrap(),
            Self::Series(scanner) => scanner.prepare(prepare_request).unwrap(),
        }
    }
}
150
/// Helper that collects everything needed to scan one region and builds a scanner from it.
///
/// It is configured with the `with_*` / `set_*` methods and consumed by `scanner()`,
/// `region_scanner()` or one of the concrete `*_scan()` constructors.
#[cfg_attr(doc, aquamarine::aquamarine)]
pub(crate) struct ScanRegion {
    /// Region version to scan; provides the metadata, options, SSTs and memtables.
    version: VersionRef,
    /// Access layer handed to the [ScanInput] and used to build index appliers.
    access_layer: AccessLayerRef,
    /// The scan request (projection, filters, sequence bounds, distribution, ...).
    request: ScanRequest,
    /// Cache strategy handed to the [ScanInput] and the index applier builders.
    cache_strategy: CacheStrategy,
    /// Channel size for parallel scan tasks; replaced by `FLAT_SCAN_CHANNEL_SIZE`
    /// when the flat format is used.
    parallel_scan_channel_size: usize,
    /// Value forwarded to [ScanInput::with_max_concurrent_scan_files].
    max_concurrent_scan_files: usize,
    /// When `true`, no inverted index applier is built.
    ignore_inverted_index: bool,
    /// When `true`, no fulltext index applier is built.
    ignore_fulltext_index: bool,
    /// When `true`, no bloom filter index applier is built.
    ignore_bloom_filter: bool,
    /// Optional start time of the query, forwarded to the [ScanInput].
    start_time: Option<Instant>,
    /// Value forwarded to [ScanInput::with_filter_deleted]; defaults to `true`.
    filter_deleted: bool,
    /// Optional provider queried for extension ranges while building the input.
    #[cfg(feature = "enterprise")]
    extension_range_provider: Option<BoxedExtensionRangeProvider>,
}
228
229impl ScanRegion {
230 pub(crate) fn new(
232 version: VersionRef,
233 access_layer: AccessLayerRef,
234 request: ScanRequest,
235 cache_strategy: CacheStrategy,
236 ) -> ScanRegion {
237 ScanRegion {
238 version,
239 access_layer,
240 request,
241 cache_strategy,
242 parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
243 max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
244 ignore_inverted_index: false,
245 ignore_fulltext_index: false,
246 ignore_bloom_filter: false,
247 start_time: None,
248 filter_deleted: true,
249 #[cfg(feature = "enterprise")]
250 extension_range_provider: None,
251 }
252 }
253
254 #[must_use]
256 pub(crate) fn with_parallel_scan_channel_size(
257 mut self,
258 parallel_scan_channel_size: usize,
259 ) -> Self {
260 self.parallel_scan_channel_size = parallel_scan_channel_size;
261 self
262 }
263
264 #[must_use]
266 pub(crate) fn with_max_concurrent_scan_files(
267 mut self,
268 max_concurrent_scan_files: usize,
269 ) -> Self {
270 self.max_concurrent_scan_files = max_concurrent_scan_files;
271 self
272 }
273
274 #[must_use]
276 pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self {
277 self.ignore_inverted_index = ignore;
278 self
279 }
280
281 #[must_use]
283 pub(crate) fn with_ignore_fulltext_index(mut self, ignore: bool) -> Self {
284 self.ignore_fulltext_index = ignore;
285 self
286 }
287
288 #[must_use]
290 pub(crate) fn with_ignore_bloom_filter(mut self, ignore: bool) -> Self {
291 self.ignore_bloom_filter = ignore;
292 self
293 }
294
295 #[must_use]
296 pub(crate) fn with_start_time(mut self, now: Instant) -> Self {
297 self.start_time = Some(now);
298 self
299 }
300
    /// Sets the `filter_deleted` flag that is later forwarded to the [ScanInput].
    pub(crate) fn set_filter_deleted(&mut self, filter_deleted: bool) {
        self.filter_deleted = filter_deleted;
    }
304
    /// Installs the provider that `scan_input` queries for extension ranges.
    ///
    /// Replaces any previously installed provider.
    #[cfg(feature = "enterprise")]
    pub(crate) fn set_extension_range_provider(
        &mut self,
        extension_range_provider: BoxedExtensionRangeProvider,
    ) {
        self.extension_range_provider = Some(extension_range_provider);
    }
312
313 pub(crate) async fn scanner(self) -> Result<Scanner> {
315 if self.use_series_scan() {
316 self.series_scan().await.map(Scanner::Series)
317 } else if self.use_unordered_scan() {
318 self.unordered_scan().await.map(Scanner::Unordered)
321 } else {
322 self.seq_scan().await.map(Scanner::Seq)
323 }
324 }
325
326 #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
328 pub(crate) async fn region_scanner(self) -> Result<RegionScannerRef> {
329 if self.use_series_scan() {
330 self.series_scan()
331 .await
332 .map(|scanner| Box::new(scanner) as _)
333 } else if self.use_unordered_scan() {
334 self.unordered_scan()
335 .await
336 .map(|scanner| Box::new(scanner) as _)
337 } else {
338 self.seq_scan().await.map(|scanner| Box::new(scanner) as _)
339 }
340 }
341
342 pub(crate) async fn seq_scan(self) -> Result<SeqScan> {
344 let input = self.scan_input().await?.with_compaction(false);
345 Ok(SeqScan::new(input))
346 }
347
348 pub(crate) async fn unordered_scan(self) -> Result<UnorderedScan> {
350 let input = self.scan_input().await?;
351 Ok(UnorderedScan::new(input))
352 }
353
354 pub(crate) async fn series_scan(self) -> Result<SeriesScan> {
356 let input = self.scan_input().await?;
357 Ok(SeriesScan::new(input))
358 }
359
360 fn use_unordered_scan(&self) -> bool {
362 self.version.options.append_mode
369 && self.request.series_row_selector.is_none()
370 && (self.request.distribution.is_none()
371 || self.request.distribution == Some(TimeSeriesDistribution::TimeWindowed))
372 }
373
374 fn use_series_scan(&self) -> bool {
376 self.request.distribution == Some(TimeSeriesDistribution::PerSeries)
377 }
378
379 fn use_flat_format(&self) -> bool {
381 self.version.options.sst_format.unwrap_or_default() == FormatType::Flat
382 }
383
384 async fn scan_input(mut self) -> Result<ScanInput> {
386 let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new);
387 let time_range = self.build_time_range_predicate();
388 let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;
389 let flat_format = self.use_flat_format();
390
391 let mapper = match &self.request.projection {
393 Some(p) => {
394 ProjectionMapper::new(&self.version.metadata, p.iter().copied(), flat_format)?
395 }
396 None => ProjectionMapper::all(&self.version.metadata, flat_format)?,
397 };
398
399 let ssts = &self.version.ssts;
400 let mut files = Vec::new();
401 for level in ssts.levels() {
402 for file in level.files.values() {
403 let exceed_min_sequence = match (sst_min_sequence, file.meta_ref().sequence) {
404 (Some(min_sequence), Some(file_sequence)) => file_sequence > min_sequence,
405 (Some(_), None) => true,
411 (None, _) => true,
412 };
413
414 if exceed_min_sequence && file_in_range(file, &time_range) {
416 files.push(file.clone());
417 }
418 }
422 }
423
424 let memtables = self.version.memtables.list_memtables();
425 let mut mem_range_builders = Vec::new();
427 let filter_mode = pre_filter_mode(
428 self.version.options.append_mode,
429 self.version.options.merge_mode(),
430 );
431
432 for m in memtables {
433 let Some((start, end)) = m.stats().time_range() else {
435 continue;
436 };
437 let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
439 if !memtable_range.intersects(&time_range) {
440 continue;
441 }
442 let ranges_in_memtable = m.ranges(
443 Some(mapper.column_ids()),
444 RangesOptions::default()
445 .with_predicate(predicate.clone())
446 .with_sequence(SequenceRange::new(
447 self.request.memtable_min_sequence,
448 self.request.memtable_max_sequence,
449 ))
450 .with_pre_filter_mode(filter_mode),
451 )?;
452 mem_range_builders.extend(ranges_in_memtable.ranges.into_values().map(|v| {
453 let mut stats = ranges_in_memtable.stats.clone();
455 stats.num_ranges = 1;
456 stats.num_rows = v.num_rows();
457 MemRangeBuilder::new(v, stats)
458 }));
459 }
460
461 let region_id = self.region_id();
462 debug!(
463 "Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}, flat_format: {}",
464 region_id,
465 self.request,
466 time_range,
467 mem_range_builders.len(),
468 files.len(),
469 self.version.options.append_mode,
470 flat_format,
471 );
472
473 let (non_field_filters, field_filters) = self.partition_by_field_filters();
474 let inverted_index_appliers = [
475 self.build_invereted_index_applier(&non_field_filters),
476 self.build_invereted_index_applier(&field_filters),
477 ];
478 let bloom_filter_appliers = [
479 self.build_bloom_filter_applier(&non_field_filters),
480 self.build_bloom_filter_applier(&field_filters),
481 ];
482 let fulltext_index_appliers = [
483 self.build_fulltext_index_applier(&non_field_filters),
484 self.build_fulltext_index_applier(&field_filters),
485 ];
486 let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;
487
488 if flat_format {
489 self.parallel_scan_channel_size = FLAT_SCAN_CHANNEL_SIZE;
491 }
492
493 let input = ScanInput::new(self.access_layer, mapper)
494 .with_time_range(Some(time_range))
495 .with_predicate(predicate)
496 .with_memtables(mem_range_builders)
497 .with_files(files)
498 .with_cache(self.cache_strategy)
499 .with_inverted_index_appliers(inverted_index_appliers)
500 .with_bloom_filter_index_appliers(bloom_filter_appliers)
501 .with_fulltext_index_appliers(fulltext_index_appliers)
502 .with_parallel_scan_channel_size(self.parallel_scan_channel_size)
503 .with_max_concurrent_scan_files(self.max_concurrent_scan_files)
504 .with_start_time(self.start_time)
505 .with_append_mode(self.version.options.append_mode)
506 .with_filter_deleted(self.filter_deleted)
507 .with_merge_mode(self.version.options.merge_mode())
508 .with_series_row_selector(self.request.series_row_selector)
509 .with_distribution(self.request.distribution)
510 .with_flat_format(flat_format);
511
512 #[cfg(feature = "enterprise")]
513 let input = if let Some(provider) = self.extension_range_provider {
514 let ranges = provider
515 .find_extension_ranges(self.version.flushed_sequence, time_range, &self.request)
516 .await?;
517 debug!("Find extension ranges: {ranges:?}");
518 input.with_extension_ranges(ranges)
519 } else {
520 input
521 };
522 Ok(input)
523 }
524
525 fn region_id(&self) -> RegionId {
526 self.version.metadata.region_id
527 }
528
529 fn build_time_range_predicate(&self) -> TimestampRange {
531 let time_index = self.version.metadata.time_index_column();
532 let unit = time_index
533 .column_schema
534 .data_type
535 .as_timestamp()
536 .expect("Time index must have timestamp-compatible type")
537 .unit();
538 build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters)
539 }
540
541 fn partition_by_field_filters(&self) -> (Vec<Expr>, Vec<Expr>) {
544 let field_columns = self
545 .version
546 .metadata
547 .field_columns()
548 .map(|col| &col.column_schema.name)
549 .collect::<HashSet<_>>();
550
551 let mut columns = HashSet::new();
552
553 self.request.filters.iter().cloned().partition(|expr| {
554 columns.clear();
555 if expr_to_columns(expr, &mut columns).is_err() {
557 return true;
559 }
560 !columns
562 .iter()
563 .any(|column| field_columns.contains(&column.name))
564 })
565 }
566
567 fn build_invereted_index_applier(&self, filters: &[Expr]) -> Option<InvertedIndexApplierRef> {
569 if self.ignore_inverted_index {
570 return None;
571 }
572
573 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
574 let inverted_index_cache = self.cache_strategy.inverted_index_cache().cloned();
575
576 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
577
578 InvertedIndexApplierBuilder::new(
579 self.access_layer.table_dir().to_string(),
580 self.access_layer.path_type(),
581 self.access_layer.object_store().clone(),
582 self.version.metadata.as_ref(),
583 self.version.metadata.inverted_indexed_column_ids(
584 self.version
585 .options
586 .index_options
587 .inverted_index
588 .ignore_column_ids
589 .iter(),
590 ),
591 self.access_layer.puffin_manager_factory().clone(),
592 )
593 .with_file_cache(file_cache)
594 .with_inverted_index_cache(inverted_index_cache)
595 .with_puffin_metadata_cache(puffin_metadata_cache)
596 .build(filters)
597 .inspect_err(|err| warn!(err; "Failed to build invereted index applier"))
598 .ok()
599 .flatten()
600 .map(Arc::new)
601 }
602
603 fn build_bloom_filter_applier(&self, filters: &[Expr]) -> Option<BloomFilterIndexApplierRef> {
605 if self.ignore_bloom_filter {
606 return None;
607 }
608
609 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
610 let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
611 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
612
613 BloomFilterIndexApplierBuilder::new(
614 self.access_layer.table_dir().to_string(),
615 self.access_layer.path_type(),
616 self.access_layer.object_store().clone(),
617 self.version.metadata.as_ref(),
618 self.access_layer.puffin_manager_factory().clone(),
619 )
620 .with_file_cache(file_cache)
621 .with_bloom_filter_index_cache(bloom_filter_index_cache)
622 .with_puffin_metadata_cache(puffin_metadata_cache)
623 .build(filters)
624 .inspect_err(|err| warn!(err; "Failed to build bloom filter index applier"))
625 .ok()
626 .flatten()
627 .map(Arc::new)
628 }
629
630 fn build_fulltext_index_applier(&self, filters: &[Expr]) -> Option<FulltextIndexApplierRef> {
632 if self.ignore_fulltext_index {
633 return None;
634 }
635
636 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
637 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
638 let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
639 FulltextIndexApplierBuilder::new(
640 self.access_layer.table_dir().to_string(),
641 self.access_layer.path_type(),
642 self.access_layer.object_store().clone(),
643 self.access_layer.puffin_manager_factory().clone(),
644 self.version.metadata.as_ref(),
645 )
646 .with_file_cache(file_cache)
647 .with_puffin_metadata_cache(puffin_metadata_cache)
648 .with_bloom_filter_cache(bloom_filter_index_cache)
649 .build(filters)
650 .inspect_err(|err| warn!(err; "Failed to build fulltext index applier"))
651 .ok()
652 .flatten()
653 .map(Arc::new)
654 }
655}
656
657fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
659 if predicate == &TimestampRange::min_to_max() {
660 return true;
661 }
662 let (start, end) = file.time_range();
664 let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end));
665 file_ts_range.intersects(predicate)
666}
667
/// Common input shared by the scanners: what to read and how to read it.
pub struct ScanInput {
    /// Access layer used to read SST files.
    access_layer: AccessLayerRef,
    /// Projection mapper; supplies the projected column ids and the expected metadata.
    pub(crate) mapper: Arc<ProjectionMapper>,
    /// Time range of the scan, if any.
    time_range: Option<TimestampRange>,
    /// Predicates applied while reading.
    pub(crate) predicate: PredicateGroup,
    /// Region partition expression copied from the predicate group in `with_predicate`.
    region_partition_expr: Option<PartitionExpr>,
    /// Builders of the memtable ranges to scan.
    pub(crate) memtables: Vec<MemRangeBuilder>,
    /// Handles of the SST files to scan.
    pub(crate) files: Vec<FileHandle>,
    /// Cache strategy passed to SST readers.
    pub(crate) cache_strategy: CacheStrategy,
    /// When `true`, `prune_file` returns an empty builder instead of an error
    /// if the file is not found.
    ignore_file_not_found: bool,
    /// Capacity of the channel created for each parallel scan task.
    pub(crate) parallel_scan_channel_size: usize,
    /// Maximum number of files to scan concurrently (not consumed in this part of the file).
    pub(crate) max_concurrent_scan_files: usize,
    /// Inverted index appliers; `ScanRegion::scan_input` fills index 0 from non-field
    /// filters and index 1 from field filters.
    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
    /// Bloom filter index appliers, laid out like `inverted_index_appliers`.
    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    /// Fulltext index appliers, laid out like `inverted_index_appliers`.
    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
    /// Start time of the query, if recorded.
    pub(crate) query_start: Option<Instant>,
    /// Whether the region is in append mode.
    pub(crate) append_mode: bool,
    /// Flag set through `with_filter_deleted`; defaults to `true`.
    pub(crate) filter_deleted: bool,
    /// Merge mode of the region.
    pub(crate) merge_mode: MergeMode,
    /// Optional series row selector of the request.
    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
    /// Optional time series distribution hint of the request.
    pub(crate) distribution: Option<TimeSeriesDistribution>,
    /// Whether the flat format is used when reading.
    pub(crate) flat_format: bool,
    /// Compaction flag passed to the SST reader and the flat compat batch.
    pub(crate) compaction: bool,
    /// Extension ranges attached to this input.
    #[cfg(feature = "enterprise")]
    extension_ranges: Vec<BoxedExtensionRange>,
}
715
716impl ScanInput {
717 #[must_use]
719 pub(crate) fn new(access_layer: AccessLayerRef, mapper: ProjectionMapper) -> ScanInput {
720 ScanInput {
721 access_layer,
722 mapper: Arc::new(mapper),
723 time_range: None,
724 predicate: PredicateGroup::default(),
725 region_partition_expr: None,
726 memtables: Vec::new(),
727 files: Vec::new(),
728 cache_strategy: CacheStrategy::Disabled,
729 ignore_file_not_found: false,
730 parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
731 max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
732 inverted_index_appliers: [None, None],
733 bloom_filter_index_appliers: [None, None],
734 fulltext_index_appliers: [None, None],
735 query_start: None,
736 append_mode: false,
737 filter_deleted: true,
738 merge_mode: MergeMode::default(),
739 series_row_selector: None,
740 distribution: None,
741 flat_format: false,
742 compaction: false,
743 #[cfg(feature = "enterprise")]
744 extension_ranges: Vec::new(),
745 }
746 }
747
748 #[must_use]
750 pub(crate) fn with_time_range(mut self, time_range: Option<TimestampRange>) -> Self {
751 self.time_range = time_range;
752 self
753 }
754
755 #[must_use]
757 pub(crate) fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
758 self.region_partition_expr = predicate.region_partition_expr().cloned();
759 self.predicate = predicate;
760 self
761 }
762
763 #[must_use]
765 pub(crate) fn with_memtables(mut self, memtables: Vec<MemRangeBuilder>) -> Self {
766 self.memtables = memtables;
767 self
768 }
769
770 #[must_use]
772 pub(crate) fn with_files(mut self, files: Vec<FileHandle>) -> Self {
773 self.files = files;
774 self
775 }
776
777 #[must_use]
779 pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self {
780 self.cache_strategy = cache;
781 self
782 }
783
784 #[must_use]
786 pub(crate) fn with_ignore_file_not_found(mut self, ignore: bool) -> Self {
787 self.ignore_file_not_found = ignore;
788 self
789 }
790
791 #[must_use]
793 pub(crate) fn with_parallel_scan_channel_size(
794 mut self,
795 parallel_scan_channel_size: usize,
796 ) -> Self {
797 self.parallel_scan_channel_size = parallel_scan_channel_size;
798 self
799 }
800
801 #[must_use]
803 pub(crate) fn with_max_concurrent_scan_files(
804 mut self,
805 max_concurrent_scan_files: usize,
806 ) -> Self {
807 self.max_concurrent_scan_files = max_concurrent_scan_files;
808 self
809 }
810
811 #[must_use]
813 pub(crate) fn with_inverted_index_appliers(
814 mut self,
815 appliers: [Option<InvertedIndexApplierRef>; 2],
816 ) -> Self {
817 self.inverted_index_appliers = appliers;
818 self
819 }
820
821 #[must_use]
823 pub(crate) fn with_bloom_filter_index_appliers(
824 mut self,
825 appliers: [Option<BloomFilterIndexApplierRef>; 2],
826 ) -> Self {
827 self.bloom_filter_index_appliers = appliers;
828 self
829 }
830
831 #[must_use]
833 pub(crate) fn with_fulltext_index_appliers(
834 mut self,
835 appliers: [Option<FulltextIndexApplierRef>; 2],
836 ) -> Self {
837 self.fulltext_index_appliers = appliers;
838 self
839 }
840
841 #[must_use]
843 pub(crate) fn with_start_time(mut self, now: Option<Instant>) -> Self {
844 self.query_start = now;
845 self
846 }
847
848 #[must_use]
849 pub(crate) fn with_append_mode(mut self, is_append_mode: bool) -> Self {
850 self.append_mode = is_append_mode;
851 self
852 }
853
854 #[must_use]
856 pub(crate) fn with_filter_deleted(mut self, filter_deleted: bool) -> Self {
857 self.filter_deleted = filter_deleted;
858 self
859 }
860
861 #[must_use]
863 pub(crate) fn with_merge_mode(mut self, merge_mode: MergeMode) -> Self {
864 self.merge_mode = merge_mode;
865 self
866 }
867
868 #[must_use]
870 pub(crate) fn with_distribution(
871 mut self,
872 distribution: Option<TimeSeriesDistribution>,
873 ) -> Self {
874 self.distribution = distribution;
875 self
876 }
877
878 #[must_use]
880 pub(crate) fn with_series_row_selector(
881 mut self,
882 series_row_selector: Option<TimeSeriesRowSelector>,
883 ) -> Self {
884 self.series_row_selector = series_row_selector;
885 self
886 }
887
888 #[must_use]
890 pub(crate) fn with_flat_format(mut self, flat_format: bool) -> Self {
891 self.flat_format = flat_format;
892 self
893 }
894
895 #[must_use]
897 pub(crate) fn with_compaction(mut self, compaction: bool) -> Self {
898 self.compaction = compaction;
899 self
900 }
901
902 pub(crate) fn create_parallel_sources(
906 &self,
907 sources: Vec<Source>,
908 semaphore: Arc<Semaphore>,
909 ) -> Result<Vec<Source>> {
910 if sources.len() <= 1 {
911 return Ok(sources);
912 }
913
914 let sources = sources
916 .into_iter()
917 .map(|source| {
918 let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
919 self.spawn_scan_task(source, semaphore.clone(), sender);
920 let stream = Box::pin(ReceiverStream::new(receiver));
921 Source::Stream(stream)
922 })
923 .collect();
924 Ok(sources)
925 }
926
927 pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> {
929 let memtable = &self.memtables[index.index];
930 let mut ranges = SmallVec::new();
931 memtable.build_ranges(index.row_group_index, &mut ranges);
932 ranges
933 }
934
935 fn predicate_for_file(&self, file: &FileHandle) -> Option<Predicate> {
936 if self.should_skip_region_partition(file) {
937 self.predicate.predicate_without_region().cloned()
938 } else {
939 self.predicate.predicate().cloned()
940 }
941 }
942
943 fn should_skip_region_partition(&self, file: &FileHandle) -> bool {
944 match (
945 self.region_partition_expr.as_ref(),
946 file.meta_ref().partition_expr.as_ref(),
947 ) {
948 (Some(region_expr), Some(file_expr)) => region_expr == file_expr,
949 _ => false,
950 }
951 }
952
    /// Builds the [FileRangeBuilder] for `file`.
    ///
    /// Configures an SST reader through the access layer with the predicate, projection,
    /// cache and index appliers of this input, then builds the reader input. Reader
    /// metrics are reported through `reader_metrics`.
    ///
    /// # Errors
    /// Returns the reader error, except that an "object not found" error yields an empty
    /// builder when `ignore_file_not_found` is set. Also fails if the compat batch
    /// cannot be created.
    pub async fn prune_file(
        &self,
        file: &FileHandle,
        reader_metrics: &mut ReaderMetrics,
    ) -> Result<FileRangeBuilder> {
        let predicate = self.predicate_for_file(file);
        let filter_mode = pre_filter_mode(self.append_mode, self.merge_mode);
        let res = self
            .access_layer
            .read_sst(file.clone())
            .predicate(predicate)
            .projection(Some(self.mapper.column_ids().to_vec()))
            .cache(self.cache_strategy.clone())
            .inverted_index_appliers(self.inverted_index_appliers.clone())
            .bloom_filter_index_appliers(self.bloom_filter_index_appliers.clone())
            .fulltext_index_appliers(self.fulltext_index_appliers.clone())
            .expected_metadata(Some(self.mapper.metadata().clone()))
            .flat_format(self.flat_format)
            .compaction(self.compaction)
            .pre_filter_mode(filter_mode)
            .build_reader_input(reader_metrics)
            .await;
        let (mut file_range_ctx, selection) = match res {
            Ok(x) => x,
            Err(e) => {
                // A missing file is tolerated only when explicitly allowed.
                if e.is_object_not_found() && self.ignore_file_not_found {
                    error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
                    return Ok(FileRangeBuilder::default());
                } else {
                    return Err(e);
                }
            }
        };

        // A compat adapter is needed when the file's metadata differs from the expected
        // one in columns or primary key encoding.
        let need_compat = !compat::has_same_columns_and_pk_encoding(
            self.mapper.metadata(),
            file_range_ctx.read_format().metadata(),
        );
        if need_compat {
            let compat = if let Some(flat_format) = file_range_ctx.read_format().as_flat() {
                // NOTE(review): assumes the mapper is a flat mapper whenever the read
                // format is flat; otherwise this `unwrap` panics — confirm.
                let mapper = self.mapper.as_flat().unwrap();
                FlatCompatBatch::try_new(
                    mapper,
                    flat_format.metadata(),
                    flat_format.format_projection(),
                    self.compaction,
                )?
                .map(CompatBatch::Flat)
            } else {
                let compact_batch = PrimaryKeyCompatBatch::new(
                    &self.mapper,
                    file_range_ctx.read_format().metadata().clone(),
                )?;
                Some(CompatBatch::PrimaryKey(compact_batch))
            };
            file_range_ctx.set_compat_batch(compat);
        }
        Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), selection))
    }
1015
1016 pub(crate) fn spawn_scan_task(
1018 &self,
1019 mut input: Source,
1020 semaphore: Arc<Semaphore>,
1021 sender: mpsc::Sender<Result<Batch>>,
1022 ) {
1023 common_runtime::spawn_global(async move {
1024 loop {
1025 let maybe_batch = {
1028 let _permit = semaphore.acquire().await.unwrap();
1030 input.next_batch().await
1031 };
1032 match maybe_batch {
1033 Ok(Some(batch)) => {
1034 let _ = sender.send(Ok(batch)).await;
1035 }
1036 Ok(None) => break,
1037 Err(e) => {
1038 let _ = sender.send(Err(e)).await;
1039 break;
1040 }
1041 }
1042 }
1043 });
1044 }
1045
1046 pub(crate) fn create_parallel_flat_sources(
1050 &self,
1051 sources: Vec<BoxedRecordBatchStream>,
1052 semaphore: Arc<Semaphore>,
1053 ) -> Result<Vec<BoxedRecordBatchStream>> {
1054 if sources.len() <= 1 {
1055 return Ok(sources);
1056 }
1057
1058 let sources = sources
1060 .into_iter()
1061 .map(|source| {
1062 let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
1063 self.spawn_flat_scan_task(source, semaphore.clone(), sender);
1064 let stream = Box::pin(ReceiverStream::new(receiver));
1065 Box::pin(stream) as _
1066 })
1067 .collect();
1068 Ok(sources)
1069 }
1070
1071 pub(crate) fn spawn_flat_scan_task(
1073 &self,
1074 mut input: BoxedRecordBatchStream,
1075 semaphore: Arc<Semaphore>,
1076 sender: mpsc::Sender<Result<RecordBatch>>,
1077 ) {
1078 common_runtime::spawn_global(async move {
1079 loop {
1080 let maybe_batch = {
1083 let _permit = semaphore.acquire().await.unwrap();
1085 input.next().await
1086 };
1087 match maybe_batch {
1088 Some(Ok(batch)) => {
1089 let _ = sender.send(Ok(batch)).await;
1090 }
1091 Some(Err(e)) => {
1092 let _ = sender.send(Err(e)).await;
1093 break;
1094 }
1095 None => break,
1096 }
1097 }
1098 });
1099 }
1100
1101 pub(crate) fn total_rows(&self) -> usize {
1102 let rows_in_files: usize = self.files.iter().map(|f| f.num_rows()).sum();
1103 let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();
1104
1105 let rows = rows_in_files + rows_in_memtables;
1106 #[cfg(feature = "enterprise")]
1107 let rows = rows
1108 + self
1109 .extension_ranges
1110 .iter()
1111 .map(|x| x.num_rows())
1112 .sum::<u64>() as usize;
1113 rows
1114 }
1115
    /// Returns the predicate group of this input.
    pub(crate) fn predicate_group(&self) -> &PredicateGroup {
        &self.predicate
    }
1119
    /// Returns the number of memtable range builders in this input.
    pub(crate) fn num_memtables(&self) -> usize {
        self.memtables.len()
    }
1124
    /// Returns the number of SST files in this input.
    pub(crate) fn num_files(&self) -> usize {
        self.files.len()
    }
1129
    /// Returns the region metadata held by the projection mapper.
    pub fn region_metadata(&self) -> &RegionMetadataRef {
        self.mapper.metadata()
    }
1133}
1134
#[cfg(feature = "enterprise")]
impl ScanInput {
    /// Sets the extension ranges of this input.
    #[must_use]
    pub(crate) fn with_extension_ranges(
        mut self,
        extension_ranges: Vec<BoxedExtensionRange>,
    ) -> Self {
        self.extension_ranges = extension_ranges;
        self
    }

    /// Returns all extension ranges of this input.
    pub(crate) fn extension_ranges(&self) -> &[BoxedExtensionRange] {
        self.extension_ranges.as_slice()
    }

    /// Returns the extension range addressed by the range index `i`.
    ///
    /// The index is offset by the number of memtables and files before it is used
    /// as a position in the extension ranges.
    ///
    /// # Panics
    /// Panics if the resulting position is out of bounds.
    pub(crate) fn extension_range(&self, i: usize) -> &BoxedExtensionRange {
        let offset = self.num_memtables() + self.num_files();
        &self.extension_ranges[i - offset]
    }
}
1156
#[cfg(test)]
impl ScanInput {
    /// Returns the ids of the SST files to scan, in the order they are stored.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        let mut ids = Vec::with_capacity(self.files.len());
        for handle in &self.files {
            ids.push(handle.file_id());
        }
        ids
    }
}
1164
1165fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode {
1166 if append_mode {
1167 return PreFilterMode::All;
1168 }
1169
1170 match merge_mode {
1171 MergeMode::LastRow => PreFilterMode::SkipFieldsOnDelete,
1172 MergeMode::LastNonNull => PreFilterMode::SkipFields,
1173 }
1174}
1175
/// Context shared while streaming scan results: the scan input plus the ranges
/// derived from it.
pub struct StreamContext {
    /// Input of the scan (memtables, files and read options).
    pub input: ScanInput,
    /// Metadata of the ranges to scan, computed from `input` by the constructors.
    pub(crate) ranges: Vec<RangeMeta>,

    /// Start time of the query; falls back to the context's creation time when the
    /// input carries none.
    pub(crate) query_start: Instant,
}
1188
1189impl StreamContext {
1190 pub(crate) fn seq_scan_ctx(input: ScanInput) -> Self {
1192 let query_start = input.query_start.unwrap_or_else(Instant::now);
1193 let ranges = RangeMeta::seq_scan_ranges(&input);
1194 READ_SST_COUNT.observe(input.num_files() as f64);
1195
1196 Self {
1197 input,
1198 ranges,
1199 query_start,
1200 }
1201 }
1202
1203 pub(crate) fn unordered_scan_ctx(input: ScanInput) -> Self {
1205 let query_start = input.query_start.unwrap_or_else(Instant::now);
1206 let ranges = RangeMeta::unordered_scan_ranges(&input);
1207 READ_SST_COUNT.observe(input.num_files() as f64);
1208
1209 Self {
1210 input,
1211 ranges,
1212 query_start,
1213 }
1214 }
1215
1216 pub(crate) fn is_mem_range_index(&self, index: RowGroupIndex) -> bool {
1218 self.input.num_memtables() > index.index
1219 }
1220
1221 pub(crate) fn is_file_range_index(&self, index: RowGroupIndex) -> bool {
1222 !self.is_mem_range_index(index)
1223 && index.index < self.input.num_files() + self.input.num_memtables()
1224 }
1225
1226 pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
1228 self.ranges
1229 .iter()
1230 .enumerate()
1231 .map(|(idx, range_meta)| range_meta.new_partition_range(idx))
1232 .collect()
1233 }
1234
1235 pub(crate) fn format_for_explain(&self, verbose: bool, f: &mut fmt::Formatter) -> fmt::Result {
1237 let (mut num_mem_ranges, mut num_file_ranges, mut num_other_ranges) = (0, 0, 0);
1238 for range_meta in &self.ranges {
1239 for idx in &range_meta.row_group_indices {
1240 if self.is_mem_range_index(*idx) {
1241 num_mem_ranges += 1;
1242 } else if self.is_file_range_index(*idx) {
1243 num_file_ranges += 1;
1244 } else {
1245 num_other_ranges += 1;
1246 }
1247 }
1248 }
1249 if verbose {
1250 write!(f, "{{")?;
1251 }
1252 write!(
1253 f,
1254 r#""partition_count":{{"count":{}, "mem_ranges":{}, "files":{}, "file_ranges":{}"#,
1255 self.ranges.len(),
1256 num_mem_ranges,
1257 self.input.num_files(),
1258 num_file_ranges,
1259 )?;
1260 if num_other_ranges > 0 {
1261 write!(f, r#", "other_ranges":{}"#, num_other_ranges)?;
1262 }
1263 write!(f, "}}")?;
1264
1265 if let Some(selector) = &self.input.series_row_selector {
1266 write!(f, ", \"selector\":\"{}\"", selector)?;
1267 }
1268 if let Some(distribution) = &self.input.distribution {
1269 write!(f, ", \"distribution\":\"{}\"", distribution)?;
1270 }
1271
1272 if verbose {
1273 self.format_verbose_content(f)?;
1274 }
1275
1276 Ok(())
1277 }
1278
1279 fn format_verbose_content(&self, f: &mut fmt::Formatter) -> fmt::Result {
1280 struct FileWrapper<'a> {
1281 file: &'a FileHandle,
1282 }
1283
1284 impl fmt::Debug for FileWrapper<'_> {
1285 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1286 let (start, end) = self.file.time_range();
1287 write!(
1288 f,
1289 r#"{{"file_id":"{}","time_range_start":"{}::{}","time_range_end":"{}::{}","rows":{},"size":{},"index_size":{}}}"#,
1290 self.file.file_id(),
1291 start.value(),
1292 start.unit(),
1293 end.value(),
1294 end.unit(),
1295 self.file.num_rows(),
1296 self.file.size(),
1297 self.file.index_size()
1298 )
1299 }
1300 }
1301
1302 struct InputWrapper<'a> {
1303 input: &'a ScanInput,
1304 }
1305
1306 #[cfg(feature = "enterprise")]
1307 impl InputWrapper<'_> {
1308 fn format_extension_ranges(&self, f: &mut fmt::Formatter) -> fmt::Result {
1309 if self.input.extension_ranges.is_empty() {
1310 return Ok(());
1311 }
1312
1313 let mut delimiter = "";
1314 write!(f, ", extension_ranges: [")?;
1315 for range in self.input.extension_ranges() {
1316 write!(f, "{}{:?}", delimiter, range)?;
1317 delimiter = ", ";
1318 }
1319 write!(f, "]")?;
1320 Ok(())
1321 }
1322 }
1323
1324 impl fmt::Debug for InputWrapper<'_> {
1325 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1326 let output_schema = self.input.mapper.output_schema();
1327 if !output_schema.is_empty() {
1328 let names: Vec<_> = output_schema
1329 .column_schemas()
1330 .iter()
1331 .map(|col| &col.name)
1332 .collect();
1333 write!(f, ", \"projection\": {:?}", names)?;
1334 }
1335 if let Some(predicate) = &self.input.predicate.predicate()
1336 && !predicate.exprs().is_empty()
1337 {
1338 let exprs: Vec<_> = predicate.exprs().iter().map(|e| e.to_string()).collect();
1339 write!(f, ", \"filters\": {:?}", exprs)?;
1340 }
1341 if !self.input.files.is_empty() {
1342 write!(f, ", \"files\": ")?;
1343 f.debug_list()
1344 .entries(self.input.files.iter().map(|file| FileWrapper { file }))
1345 .finish()?;
1346 }
1347
1348 #[cfg(feature = "enterprise")]
1349 self.format_extension_ranges(f)?;
1350
1351 Ok(())
1352 }
1353 }
1354
1355 write!(f, "{:?}", InputWrapper { input: &self.input })
1356 }
1357}
1358
/// Predicates to evaluate during a scan, grouped by how they were derived
/// from the query filters and the region's partition expression.
#[derive(Clone, Default)]
pub struct PredicateGroup {
    /// Simple filter evaluators built from the expressions that reference only
    /// the timestamp column. `None` when no expression qualifies.
    time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
    /// Predicate over the caller's expressions plus the region partition
    /// expression (when the region has one). `None` when there are no expressions.
    predicate_all: Option<Predicate>,
    /// Predicate over the caller's expressions only, without the region
    /// partition expression. `None` when the caller passed no expressions.
    predicate_without_region: Option<Predicate>,
    /// Partition expression parsed from the region metadata, if any.
    region_partition_expr: Option<PartitionExpr>,
}
1371
1372impl PredicateGroup {
1373 pub fn new(metadata: &RegionMetadata, exprs: &[Expr]) -> Result<Self> {
1375 let mut combined_exprs = exprs.to_vec();
1376 let mut region_partition_expr = None;
1377
1378 if let Some(expr_json) = metadata.partition_expr.as_ref()
1379 && !expr_json.is_empty()
1380 && let Some(expr) = PartitionExpr::from_json_str(expr_json)
1381 .context(InvalidPartitionExprSnafu { expr: expr_json })?
1382 {
1383 let logical_expr = expr
1384 .try_as_logical_expr()
1385 .context(InvalidPartitionExprSnafu {
1386 expr: expr_json.clone(),
1387 })?;
1388
1389 combined_exprs.push(logical_expr);
1390 region_partition_expr = Some(expr);
1391 }
1392
1393 let mut time_filters = Vec::with_capacity(combined_exprs.len());
1394 let mut columns = HashSet::new();
1396 for expr in &combined_exprs {
1397 columns.clear();
1398 let Some(filter) = Self::expr_to_filter(expr, metadata, &mut columns) else {
1399 continue;
1400 };
1401 time_filters.push(filter);
1402 }
1403 let time_filters = if time_filters.is_empty() {
1404 None
1405 } else {
1406 Some(Arc::new(time_filters))
1407 };
1408
1409 let predicate_all = if combined_exprs.is_empty() {
1410 None
1411 } else {
1412 Some(Predicate::new(combined_exprs))
1413 };
1414 let predicate_without_region = if exprs.is_empty() {
1415 None
1416 } else {
1417 Some(Predicate::new(exprs.to_vec()))
1418 };
1419
1420 Ok(Self {
1421 time_filters,
1422 predicate_all,
1423 predicate_without_region,
1424 region_partition_expr,
1425 })
1426 }
1427
1428 pub(crate) fn time_filters(&self) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
1430 self.time_filters.clone()
1431 }
1432
1433 pub(crate) fn predicate(&self) -> Option<&Predicate> {
1435 self.predicate_all.as_ref()
1436 }
1437
1438 pub(crate) fn predicate_without_region(&self) -> Option<&Predicate> {
1440 self.predicate_without_region.as_ref()
1441 }
1442
1443 pub(crate) fn region_partition_expr(&self) -> Option<&PartitionExpr> {
1445 self.region_partition_expr.as_ref()
1446 }
1447
1448 fn expr_to_filter(
1449 expr: &Expr,
1450 metadata: &RegionMetadata,
1451 columns: &mut HashSet<Column>,
1452 ) -> Option<SimpleFilterEvaluator> {
1453 columns.clear();
1454 expr_to_columns(expr, columns).ok()?;
1457 if columns.len() > 1 {
1458 return None;
1460 }
1461 let column = columns.iter().next()?;
1462 let column_meta = metadata.column_by_name(&column.name)?;
1463 if column_meta.semantic_type == SemanticType::Timestamp {
1464 SimpleFilterEvaluator::try_new(expr)
1465 } else {
1466 None
1467 }
1468 }
1469}