1use std::collections::HashSet;
18use std::fmt;
19use std::num::NonZeroU64;
20use std::sync::Arc;
21use std::time::Instant;
22
23use api::v1::SemanticType;
24use common_error::ext::BoxedError;
25use common_recordbatch::SendableRecordBatchStream;
26use common_recordbatch::filter::SimpleFilterEvaluator;
27use common_telemetry::tracing::Instrument;
28use common_telemetry::{debug, error, tracing, warn};
29use common_time::range::TimestampRange;
30use datafusion_common::Column;
31use datafusion_expr::Expr;
32use datafusion_expr::utils::expr_to_columns;
33use futures::StreamExt;
34use partition::expr::PartitionExpr;
35use smallvec::SmallVec;
36use snafu::ResultExt;
37use store_api::metadata::{RegionMetadata, RegionMetadataRef};
38use store_api::region_engine::{PartitionRange, RegionScannerRef};
39use store_api::storage::{
40 RegionId, ScanRequest, SequenceRange, TimeSeriesDistribution, TimeSeriesRowSelector,
41};
42use table::predicate::{Predicate, build_time_range_predicate};
43use tokio::sync::{Semaphore, mpsc};
44use tokio_stream::wrappers::ReceiverStream;
45
46use crate::access_layer::AccessLayerRef;
47use crate::cache::CacheStrategy;
48use crate::config::{DEFAULT_MAX_CONCURRENT_SCAN_FILES, DEFAULT_SCAN_CHANNEL_SIZE};
49use crate::error::{InvalidPartitionExprSnafu, Result};
50#[cfg(feature = "enterprise")]
51use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
52use crate::memtable::{MemtableRange, RangesOptions};
53use crate::metrics::READ_SST_COUNT;
54use crate::read::compat::{self, CompatBatch, FlatCompatBatch, PrimaryKeyCompatBatch};
55use crate::read::projection::ProjectionMapper;
56use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
57use crate::read::seq_scan::SeqScan;
58use crate::read::series_scan::SeriesScan;
59use crate::read::stream::ScanBatchStream;
60use crate::read::unordered_scan::UnorderedScan;
61use crate::read::{Batch, BoxedRecordBatchStream, RecordBatch, Source};
62use crate::region::options::MergeMode;
63use crate::region::version::VersionRef;
64use crate::sst::FormatType;
65use crate::sst::file::FileHandle;
66use crate::sst::index::bloom_filter::applier::{
67 BloomFilterIndexApplierBuilder, BloomFilterIndexApplierRef,
68};
69use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
70use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
71use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
72use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
73use crate::sst::parquet::file_range::PreFilterMode;
74use crate::sst::parquet::reader::ReaderMetrics;
75
/// Channel size used for parallel scan tasks when the region uses the flat SST
/// format (overrides `parallel_scan_channel_size` in `scan_input`).
/// NOTE(review): presumably kept small because flat record batches are large —
/// confirm the intended rationale.
const FLAT_SCAN_CHANNEL_SIZE: usize = 2;
78
/// A scanner over a region, wrapping one of the concrete scan strategies.
pub(crate) enum Scanner {
    /// Sequential scan.
    Seq(SeqScan),
    /// Unordered scan.
    Unordered(UnorderedScan),
    /// Per-series scan.
    Series(SeriesScan),
}
88
89impl Scanner {
90 #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
92 pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream, BoxedError> {
93 match self {
94 Scanner::Seq(seq_scan) => seq_scan.build_stream(),
95 Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
96 Scanner::Series(series_scan) => series_scan.build_stream().await,
97 }
98 }
99
100 pub(crate) fn scan_batch(&self) -> Result<ScanBatchStream> {
102 match self {
103 Scanner::Seq(x) => x.scan_all_partitions(),
104 Scanner::Unordered(x) => x.scan_all_partitions(),
105 Scanner::Series(x) => x.scan_all_partitions(),
106 }
107 }
108}
109
#[cfg(test)]
impl Scanner {
    /// Returns the number of SST files the scan will read.
    pub(crate) fn num_files(&self) -> usize {
        match self {
            Self::Seq(scan) => scan.input().num_files(),
            Self::Unordered(scan) => scan.input().num_files(),
            Self::Series(scan) => scan.input().num_files(),
        }
    }

    /// Returns the number of memtables the scan will read.
    pub(crate) fn num_memtables(&self) -> usize {
        match self {
            Self::Seq(scan) => scan.input().num_memtables(),
            Self::Unordered(scan) => scan.input().num_memtables(),
            Self::Series(scan) => scan.input().num_memtables(),
        }
    }

    /// Returns the ids of SST files selected for the scan.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        match self {
            Self::Seq(scan) => scan.input().file_ids(),
            Self::Unordered(scan) => scan.input().file_ids(),
            Self::Series(scan) => scan.input().file_ids(),
        }
    }

    /// Returns the index ids of SST files selected for the scan.
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        match self {
            Self::Seq(scan) => scan.input().index_ids(),
            Self::Unordered(scan) => scan.input().index_ids(),
            Self::Series(scan) => scan.input().index_ids(),
        }
    }

    /// Reprepares the scanner to output `target_partitions` partitions.
    pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
        use store_api::region_engine::{PrepareRequest, RegionScanner};

        let request = PrepareRequest::default().with_target_partitions(target_partitions);
        match self {
            Self::Seq(scan) => scan.prepare(request).unwrap(),
            Self::Unordered(scan) => scan.prepare(request).unwrap(),
            Self::Series(scan) => scan.prepare(request).unwrap(),
        }
    }
}
159
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Helper that builds a [Scanner] for a region according to a [ScanRequest].
pub(crate) struct ScanRegion {
    /// Version of the region to scan.
    version: VersionRef,
    /// Access layer of the region.
    access_layer: AccessLayerRef,
    /// Scan request.
    request: ScanRequest,
    /// Cache strategy for reads.
    cache_strategy: CacheStrategy,
    /// Capacity of the channel between parallel scan tasks and the consumer.
    parallel_scan_channel_size: usize,
    /// Maximum number of SST files scanned concurrently.
    max_concurrent_scan_files: usize,
    /// Whether to skip building the inverted index applier.
    ignore_inverted_index: bool,
    /// Whether to skip building the fulltext index applier.
    ignore_fulltext_index: bool,
    /// Whether to skip building the bloom filter index applier.
    ignore_bloom_filter: bool,
    /// Start time of the query, if provided by the caller.
    start_time: Option<Instant>,
    /// Whether to filter out deleted rows. Defaults to true.
    filter_deleted: bool,
    /// Provider of extra extension ranges to scan (enterprise only).
    #[cfg(feature = "enterprise")]
    extension_range_provider: Option<BoxedExtensionRangeProvider>,
}
237
238impl ScanRegion {
    /// Creates a [ScanRegion] with default options (all indexes enabled,
    /// deleted rows filtered, default channel/concurrency sizes).
    pub(crate) fn new(
        version: VersionRef,
        access_layer: AccessLayerRef,
        request: ScanRequest,
        cache_strategy: CacheStrategy,
    ) -> ScanRegion {
        ScanRegion {
            version,
            access_layer,
            request,
            cache_strategy,
            parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
            ignore_inverted_index: false,
            ignore_fulltext_index: false,
            ignore_bloom_filter: false,
            start_time: None,
            filter_deleted: true,
            #[cfg(feature = "enterprise")]
            extension_range_provider: None,
        }
    }
262
    /// Sets the capacity of the channel used by parallel scan tasks.
    #[must_use]
    pub(crate) fn with_parallel_scan_channel_size(
        mut self,
        parallel_scan_channel_size: usize,
    ) -> Self {
        self.parallel_scan_channel_size = parallel_scan_channel_size;
        self
    }

    /// Sets the maximum number of SST files scanned concurrently.
    #[must_use]
    pub(crate) fn with_max_concurrent_scan_files(
        mut self,
        max_concurrent_scan_files: usize,
    ) -> Self {
        self.max_concurrent_scan_files = max_concurrent_scan_files;
        self
    }

    /// Sets whether to ignore the inverted index.
    #[must_use]
    pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self {
        self.ignore_inverted_index = ignore;
        self
    }

    /// Sets whether to ignore the fulltext index.
    #[must_use]
    pub(crate) fn with_ignore_fulltext_index(mut self, ignore: bool) -> Self {
        self.ignore_fulltext_index = ignore;
        self
    }

    /// Sets whether to ignore the bloom filter index.
    #[must_use]
    pub(crate) fn with_ignore_bloom_filter(mut self, ignore: bool) -> Self {
        self.ignore_bloom_filter = ignore;
        self
    }

    /// Sets the start time of the query.
    #[must_use]
    pub(crate) fn with_start_time(mut self, now: Instant) -> Self {
        self.start_time = Some(now);
        self
    }

    /// Sets whether deleted rows should be filtered out by the scan.
    pub(crate) fn set_filter_deleted(&mut self, filter_deleted: bool) {
        self.filter_deleted = filter_deleted;
    }

    /// Sets the provider of extension ranges to scan.
    #[cfg(feature = "enterprise")]
    pub(crate) fn set_extension_range_provider(
        &mut self,
        extension_range_provider: BoxedExtensionRangeProvider,
    ) {
        self.extension_range_provider = Some(extension_range_provider);
    }
321
322 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
324 pub(crate) async fn scanner(self) -> Result<Scanner> {
325 if self.use_series_scan() {
326 self.series_scan().await.map(Scanner::Series)
327 } else if self.use_unordered_scan() {
328 self.unordered_scan().await.map(Scanner::Unordered)
331 } else {
332 self.seq_scan().await.map(Scanner::Seq)
333 }
334 }
335
336 #[tracing::instrument(
338 level = tracing::Level::DEBUG,
339 skip_all,
340 fields(region_id = %self.region_id())
341 )]
342 pub(crate) async fn region_scanner(self) -> Result<RegionScannerRef> {
343 if self.use_series_scan() {
344 self.series_scan()
345 .await
346 .map(|scanner| Box::new(scanner) as _)
347 } else if self.use_unordered_scan() {
348 self.unordered_scan()
349 .await
350 .map(|scanner| Box::new(scanner) as _)
351 } else {
352 self.seq_scan().await.map(|scanner| Box::new(scanner) as _)
353 }
354 }
355
356 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
358 pub(crate) async fn seq_scan(self) -> Result<SeqScan> {
359 let input = self.scan_input().await?.with_compaction(false);
360 Ok(SeqScan::new(input))
361 }
362
363 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
365 pub(crate) async fn unordered_scan(self) -> Result<UnorderedScan> {
366 let input = self.scan_input().await?;
367 Ok(UnorderedScan::new(input))
368 }
369
370 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
372 pub(crate) async fn series_scan(self) -> Result<SeriesScan> {
373 let input = self.scan_input().await?;
374 Ok(SeriesScan::new(input))
375 }
376
    /// Returns true when the unordered scan can serve this request: the region
    /// is in append mode, no per-series row selector is requested, and the
    /// distribution is unset or time-windowed.
    fn use_unordered_scan(&self) -> bool {
        self.version.options.append_mode
            && self.request.series_row_selector.is_none()
            && (self.request.distribution.is_none()
                || self.request.distribution == Some(TimeSeriesDistribution::TimeWindowed))
    }

    /// Returns true when the request asks for a per-series distribution.
    fn use_series_scan(&self) -> bool {
        self.request.distribution == Some(TimeSeriesDistribution::PerSeries)
    }

    /// Returns true when the region stores SSTs in the flat format.
    fn use_flat_format(&self) -> bool {
        self.version.options.sst_format.unwrap_or_default() == FormatType::Flat
    }
400
401 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
403 async fn scan_input(mut self) -> Result<ScanInput> {
404 let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new);
405 let time_range = self.build_time_range_predicate();
406 let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;
407 let flat_format = self.use_flat_format();
408
409 let mapper = match &self.request.projection {
411 Some(p) => {
412 ProjectionMapper::new(&self.version.metadata, p.iter().copied(), flat_format)?
413 }
414 None => ProjectionMapper::all(&self.version.metadata, flat_format)?,
415 };
416
417 let ssts = &self.version.ssts;
418 let mut files = Vec::new();
419 for level in ssts.levels() {
420 for file in level.files.values() {
421 let exceed_min_sequence = match (sst_min_sequence, file.meta_ref().sequence) {
422 (Some(min_sequence), Some(file_sequence)) => file_sequence > min_sequence,
423 (Some(_), None) => true,
429 (None, _) => true,
430 };
431
432 if exceed_min_sequence && file_in_range(file, &time_range) {
434 files.push(file.clone());
435 }
436 }
440 }
441
442 let memtables = self.version.memtables.list_memtables();
443 let mut mem_range_builders = Vec::new();
445 let filter_mode = pre_filter_mode(
446 self.version.options.append_mode,
447 self.version.options.merge_mode(),
448 );
449
450 for m in memtables {
451 let Some((start, end)) = m.stats().time_range() else {
453 continue;
454 };
455 let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
457 if !memtable_range.intersects(&time_range) {
458 continue;
459 }
460 let ranges_in_memtable = m.ranges(
461 Some(mapper.column_ids()),
462 RangesOptions::default()
463 .with_predicate(predicate.clone())
464 .with_sequence(SequenceRange::new(
465 self.request.memtable_min_sequence,
466 self.request.memtable_max_sequence,
467 ))
468 .with_pre_filter_mode(filter_mode),
469 )?;
470 mem_range_builders.extend(ranges_in_memtable.ranges.into_values().map(|v| {
471 let stats = v.stats().clone();
472 MemRangeBuilder::new(v, stats)
473 }));
474 }
475
476 let region_id = self.region_id();
477 debug!(
478 "Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}, flat_format: {}",
479 region_id,
480 self.request,
481 time_range,
482 mem_range_builders.len(),
483 files.len(),
484 self.version.options.append_mode,
485 flat_format,
486 );
487
488 let (non_field_filters, field_filters) = self.partition_by_field_filters();
489 let inverted_index_appliers = [
490 self.build_invereted_index_applier(&non_field_filters),
491 self.build_invereted_index_applier(&field_filters),
492 ];
493 let bloom_filter_appliers = [
494 self.build_bloom_filter_applier(&non_field_filters),
495 self.build_bloom_filter_applier(&field_filters),
496 ];
497 let fulltext_index_appliers = [
498 self.build_fulltext_index_applier(&non_field_filters),
499 self.build_fulltext_index_applier(&field_filters),
500 ];
501 let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;
502
503 if flat_format {
504 self.parallel_scan_channel_size = FLAT_SCAN_CHANNEL_SIZE;
506 }
507
508 let input = ScanInput::new(self.access_layer, mapper)
509 .with_time_range(Some(time_range))
510 .with_predicate(predicate)
511 .with_memtables(mem_range_builders)
512 .with_files(files)
513 .with_cache(self.cache_strategy)
514 .with_inverted_index_appliers(inverted_index_appliers)
515 .with_bloom_filter_index_appliers(bloom_filter_appliers)
516 .with_fulltext_index_appliers(fulltext_index_appliers)
517 .with_parallel_scan_channel_size(self.parallel_scan_channel_size)
518 .with_max_concurrent_scan_files(self.max_concurrent_scan_files)
519 .with_start_time(self.start_time)
520 .with_append_mode(self.version.options.append_mode)
521 .with_filter_deleted(self.filter_deleted)
522 .with_merge_mode(self.version.options.merge_mode())
523 .with_series_row_selector(self.request.series_row_selector)
524 .with_distribution(self.request.distribution)
525 .with_flat_format(flat_format);
526
527 #[cfg(feature = "enterprise")]
528 let input = if let Some(provider) = self.extension_range_provider {
529 let ranges = provider
530 .find_extension_ranges(self.version.flushed_sequence, time_range, &self.request)
531 .await?;
532 debug!("Find extension ranges: {ranges:?}");
533 input.with_extension_ranges(ranges)
534 } else {
535 input
536 };
537 Ok(input)
538 }
539
    /// Returns the id of the region to scan.
    fn region_id(&self) -> RegionId {
        self.version.metadata.region_id
    }

    /// Builds a [TimestampRange] predicate from the request filters on the
    /// time index column.
    fn build_time_range_predicate(&self) -> TimestampRange {
        let time_index = self.version.metadata.time_index_column();
        let unit = time_index
            .column_schema
            .data_type
            .as_timestamp()
            // The time index column always has a timestamp type, so this
            // cannot fail for valid metadata.
            .expect("Time index must have timestamp-compatible type")
            .unit();
        build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters)
    }
555
    /// Partitions the request filters into `(non_field_filters, field_filters)`.
    ///
    /// A filter goes into the second vector only when it references at least
    /// one field column. Filters whose referenced columns cannot be extracted
    /// are conservatively kept in the first (non-field) vector.
    fn partition_by_field_filters(&self) -> (Vec<Expr>, Vec<Expr>) {
        let field_columns = self
            .version
            .metadata
            .field_columns()
            .map(|col| &col.column_schema.name)
            .collect::<HashSet<_>>();

        // Scratch set reused across filters to avoid reallocating.
        let mut columns = HashSet::new();

        self.request.filters.iter().cloned().partition(|expr| {
            columns.clear();
            if expr_to_columns(expr, &mut columns).is_err() {
                // Cannot determine the referenced columns; treat as non-field.
                return true;
            }
            !columns
                .iter()
                .any(|column| field_columns.contains(&column.name))
        })
    }
581
582 fn build_invereted_index_applier(&self, filters: &[Expr]) -> Option<InvertedIndexApplierRef> {
584 if self.ignore_inverted_index {
585 return None;
586 }
587
588 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
589 let inverted_index_cache = self.cache_strategy.inverted_index_cache().cloned();
590
591 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
592
593 InvertedIndexApplierBuilder::new(
594 self.access_layer.table_dir().to_string(),
595 self.access_layer.path_type(),
596 self.access_layer.object_store().clone(),
597 self.version.metadata.as_ref(),
598 self.version.metadata.inverted_indexed_column_ids(
599 self.version
600 .options
601 .index_options
602 .inverted_index
603 .ignore_column_ids
604 .iter(),
605 ),
606 self.access_layer.puffin_manager_factory().clone(),
607 )
608 .with_file_cache(file_cache)
609 .with_inverted_index_cache(inverted_index_cache)
610 .with_puffin_metadata_cache(puffin_metadata_cache)
611 .build(filters)
612 .inspect_err(|err| warn!(err; "Failed to build invereted index applier"))
613 .ok()
614 .flatten()
615 .map(Arc::new)
616 }
617
618 fn build_bloom_filter_applier(&self, filters: &[Expr]) -> Option<BloomFilterIndexApplierRef> {
620 if self.ignore_bloom_filter {
621 return None;
622 }
623
624 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
625 let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
626 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
627
628 BloomFilterIndexApplierBuilder::new(
629 self.access_layer.table_dir().to_string(),
630 self.access_layer.path_type(),
631 self.access_layer.object_store().clone(),
632 self.version.metadata.as_ref(),
633 self.access_layer.puffin_manager_factory().clone(),
634 )
635 .with_file_cache(file_cache)
636 .with_bloom_filter_index_cache(bloom_filter_index_cache)
637 .with_puffin_metadata_cache(puffin_metadata_cache)
638 .build(filters)
639 .inspect_err(|err| warn!(err; "Failed to build bloom filter index applier"))
640 .ok()
641 .flatten()
642 .map(Arc::new)
643 }
644
    /// Builds a fulltext index applier for `filters`, or `None` when the
    /// index is ignored or building fails (failures are logged and swallowed).
    fn build_fulltext_index_applier(&self, filters: &[Expr]) -> Option<FulltextIndexApplierRef> {
        if self.ignore_fulltext_index {
            return None;
        }

        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
        FulltextIndexApplierBuilder::new(
            self.access_layer.table_dir().to_string(),
            self.access_layer.path_type(),
            self.access_layer.object_store().clone(),
            self.access_layer.puffin_manager_factory().clone(),
            self.version.metadata.as_ref(),
        )
        .with_file_cache(file_cache)
        .with_puffin_metadata_cache(puffin_metadata_cache)
        .with_bloom_filter_cache(bloom_filter_index_cache)
        .build(filters)
        .inspect_err(|err| warn!(err; "Failed to build fulltext index applier"))
        .ok()
        .flatten()
        .map(Arc::new)
    }
670}
671
672fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
674 if predicate == &TimestampRange::min_to_max() {
675 return true;
676 }
677 let (start, end) = file.time_range();
679 let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end));
680 file_ts_range.intersects(predicate)
681}
682
/// Common input shared by the different scanners (seq, unordered, series).
pub struct ScanInput {
    /// Region SST access layer.
    access_layer: AccessLayerRef,
    /// Projection mapper shared by all partition streams.
    pub(crate) mapper: Arc<ProjectionMapper>,
    /// Time range predicate used to prune files and memtables.
    time_range: Option<TimestampRange>,
    /// Predicates pushed down to the scan.
    pub(crate) predicate: PredicateGroup,
    /// Partition expr of the region, extracted from the predicate group.
    region_partition_expr: Option<PartitionExpr>,
    /// Builders of the memtable ranges to scan.
    pub(crate) memtables: Vec<MemRangeBuilder>,
    /// Handles of the SST files to scan.
    pub(crate) files: Vec<FileHandle>,
    /// Cache strategy for reads.
    pub(crate) cache_strategy: CacheStrategy,
    /// Whether to silently skip SST files that no longer exist.
    ignore_file_not_found: bool,
    /// Capacity of the channel between parallel scan tasks and the consumer.
    pub(crate) parallel_scan_channel_size: usize,
    /// Maximum number of SST files scanned concurrently.
    pub(crate) max_concurrent_scan_files: usize,
    /// Inverted index appliers for `[non_field_filters, field_filters]`.
    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
    /// Bloom filter index appliers for `[non_field_filters, field_filters]`.
    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    /// Fulltext index appliers for `[non_field_filters, field_filters]`.
    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
    /// Start time of the query, if provided by the caller.
    pub(crate) query_start: Option<Instant>,
    /// Whether the region is in append mode.
    pub(crate) append_mode: bool,
    /// Whether to filter out deleted rows.
    pub(crate) filter_deleted: bool,
    /// Merge mode of the region.
    pub(crate) merge_mode: MergeMode,
    /// Optional per-series row selector from the request.
    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
    /// Optional time-series distribution hint from the request.
    pub(crate) distribution: Option<TimeSeriesDistribution>,
    /// Whether SSTs use the flat format.
    pub(crate) flat_format: bool,
    /// Whether this scan serves a compaction.
    pub(crate) compaction: bool,
    /// Extra ranges to scan besides memtables and files (enterprise only).
    #[cfg(feature = "enterprise")]
    extension_ranges: Vec<BoxedExtensionRange>,
}
730
731impl ScanInput {
    /// Creates a new [ScanInput] with default options: no predicates, no
    /// sources, caching disabled, deleted rows filtered.
    #[must_use]
    pub(crate) fn new(access_layer: AccessLayerRef, mapper: ProjectionMapper) -> ScanInput {
        ScanInput {
            access_layer,
            mapper: Arc::new(mapper),
            time_range: None,
            predicate: PredicateGroup::default(),
            region_partition_expr: None,
            memtables: Vec::new(),
            files: Vec::new(),
            cache_strategy: CacheStrategy::Disabled,
            ignore_file_not_found: false,
            parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
            inverted_index_appliers: [None, None],
            bloom_filter_index_appliers: [None, None],
            fulltext_index_appliers: [None, None],
            query_start: None,
            append_mode: false,
            filter_deleted: true,
            merge_mode: MergeMode::default(),
            series_row_selector: None,
            distribution: None,
            flat_format: false,
            compaction: false,
            #[cfg(feature = "enterprise")]
            extension_ranges: Vec::new(),
        }
    }
762
    /// Sets the time range to scan.
    #[must_use]
    pub(crate) fn with_time_range(mut self, time_range: Option<TimestampRange>) -> Self {
        self.time_range = time_range;
        self
    }

    /// Sets the predicate group and caches the region partition expr it
    /// carries (used to skip redundant partition filtering per file).
    #[must_use]
    pub(crate) fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
        self.region_partition_expr = predicate.region_partition_expr().cloned();
        self.predicate = predicate;
        self
    }

    /// Sets the memtable range builders to scan.
    #[must_use]
    pub(crate) fn with_memtables(mut self, memtables: Vec<MemRangeBuilder>) -> Self {
        self.memtables = memtables;
        self
    }

    /// Sets the SST files to scan.
    #[must_use]
    pub(crate) fn with_files(mut self, files: Vec<FileHandle>) -> Self {
        self.files = files;
        self
    }

    /// Sets the cache strategy for reads.
    #[must_use]
    pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self {
        self.cache_strategy = cache;
        self
    }

    /// Sets whether missing SST files are silently skipped.
    #[must_use]
    pub(crate) fn with_ignore_file_not_found(mut self, ignore: bool) -> Self {
        self.ignore_file_not_found = ignore;
        self
    }

    /// Sets the capacity of the channel used by parallel scan tasks.
    #[must_use]
    pub(crate) fn with_parallel_scan_channel_size(
        mut self,
        parallel_scan_channel_size: usize,
    ) -> Self {
        self.parallel_scan_channel_size = parallel_scan_channel_size;
        self
    }

    /// Sets the maximum number of SST files scanned concurrently.
    #[must_use]
    pub(crate) fn with_max_concurrent_scan_files(
        mut self,
        max_concurrent_scan_files: usize,
    ) -> Self {
        self.max_concurrent_scan_files = max_concurrent_scan_files;
        self
    }

    /// Sets the inverted index appliers (`[non_field, field]`).
    #[must_use]
    pub(crate) fn with_inverted_index_appliers(
        mut self,
        appliers: [Option<InvertedIndexApplierRef>; 2],
    ) -> Self {
        self.inverted_index_appliers = appliers;
        self
    }

    /// Sets the bloom filter index appliers (`[non_field, field]`).
    #[must_use]
    pub(crate) fn with_bloom_filter_index_appliers(
        mut self,
        appliers: [Option<BloomFilterIndexApplierRef>; 2],
    ) -> Self {
        self.bloom_filter_index_appliers = appliers;
        self
    }

    /// Sets the fulltext index appliers (`[non_field, field]`).
    #[must_use]
    pub(crate) fn with_fulltext_index_appliers(
        mut self,
        appliers: [Option<FulltextIndexApplierRef>; 2],
    ) -> Self {
        self.fulltext_index_appliers = appliers;
        self
    }

    /// Sets the start time of the query.
    #[must_use]
    pub(crate) fn with_start_time(mut self, now: Option<Instant>) -> Self {
        self.query_start = now;
        self
    }

    /// Sets whether the region is in append mode.
    #[must_use]
    pub(crate) fn with_append_mode(mut self, is_append_mode: bool) -> Self {
        self.append_mode = is_append_mode;
        self
    }

    /// Sets whether to filter out deleted rows.
    #[must_use]
    pub(crate) fn with_filter_deleted(mut self, filter_deleted: bool) -> Self {
        self.filter_deleted = filter_deleted;
        self
    }

    /// Sets the merge mode of the region.
    #[must_use]
    pub(crate) fn with_merge_mode(mut self, merge_mode: MergeMode) -> Self {
        self.merge_mode = merge_mode;
        self
    }

    /// Sets the time-series distribution hint.
    #[must_use]
    pub(crate) fn with_distribution(
        mut self,
        distribution: Option<TimeSeriesDistribution>,
    ) -> Self {
        self.distribution = distribution;
        self
    }

    /// Sets the per-series row selector.
    #[must_use]
    pub(crate) fn with_series_row_selector(
        mut self,
        series_row_selector: Option<TimeSeriesRowSelector>,
    ) -> Self {
        self.series_row_selector = series_row_selector;
        self
    }

    /// Sets whether SSTs use the flat format.
    #[must_use]
    pub(crate) fn with_flat_format(mut self, flat_format: bool) -> Self {
        self.flat_format = flat_format;
        self
    }

    /// Sets whether this scan serves a compaction.
    #[must_use]
    pub(crate) fn with_compaction(mut self, compaction: bool) -> Self {
        self.compaction = compaction;
        self
    }
916
917 #[tracing::instrument(
921 skip(self, sources, semaphore),
922 fields(
923 region_id = %self.region_metadata().region_id,
924 source_count = sources.len()
925 )
926 )]
927 pub(crate) fn create_parallel_sources(
928 &self,
929 sources: Vec<Source>,
930 semaphore: Arc<Semaphore>,
931 ) -> Result<Vec<Source>> {
932 if sources.len() <= 1 {
933 return Ok(sources);
934 }
935
936 let sources = sources
938 .into_iter()
939 .map(|source| {
940 let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
941 self.spawn_scan_task(source, semaphore.clone(), sender);
942 let stream = Box::pin(ReceiverStream::new(receiver));
943 Source::Stream(stream)
944 })
945 .collect();
946 Ok(sources)
947 }
948
    /// Builds the memtable ranges for the row group `index`.
    /// `index.index` must refer to a memtable; memtables occupy the leading
    /// indices (before files) in the global range numbering.
    pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> {
        let memtable = &self.memtables[index.index];
        let mut ranges = SmallVec::new();
        memtable.build_ranges(index.row_group_index, &mut ranges);
        ranges
    }
956
957 fn predicate_for_file(&self, file: &FileHandle) -> Option<Predicate> {
958 if self.should_skip_region_partition(file) {
959 self.predicate.predicate_without_region().cloned()
960 } else {
961 self.predicate.predicate().cloned()
962 }
963 }
964
965 fn should_skip_region_partition(&self, file: &FileHandle) -> bool {
966 match (
967 self.region_partition_expr.as_ref(),
968 file.meta_ref().partition_expr.as_ref(),
969 ) {
970 (Some(region_expr), Some(file_expr)) => region_expr == file_expr,
971 _ => false,
972 }
973 }
974
    /// Prunes a file to scan and returns the builder to build readers.
    ///
    /// Pushes the predicate, projection and index appliers down to the SST
    /// reader. When the file's schema or primary-key encoding differs from
    /// the expected metadata, attaches a compat adapter so output batches
    /// match the mapper's schema.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.region_metadata().region_id,
            file_id = %file.file_id()
        )
    )]
    pub async fn prune_file(
        &self,
        file: &FileHandle,
        reader_metrics: &mut ReaderMetrics,
    ) -> Result<FileRangeBuilder> {
        let predicate = self.predicate_for_file(file);
        let filter_mode = pre_filter_mode(self.append_mode, self.merge_mode);
        // Primary key values are only decoded for user scans that project tags.
        let decode_pk_values = !self.compaction && self.mapper.has_tags();
        let res = self
            .access_layer
            .read_sst(file.clone())
            .predicate(predicate)
            .projection(Some(self.mapper.column_ids().to_vec()))
            .cache(self.cache_strategy.clone())
            .inverted_index_appliers(self.inverted_index_appliers.clone())
            .bloom_filter_index_appliers(self.bloom_filter_index_appliers.clone())
            .fulltext_index_appliers(self.fulltext_index_appliers.clone())
            .expected_metadata(Some(self.mapper.metadata().clone()))
            .flat_format(self.flat_format)
            .compaction(self.compaction)
            .pre_filter_mode(filter_mode)
            .decode_primary_key_values(decode_pk_values)
            .build_reader_input(reader_metrics)
            .await;
        let (mut file_range_ctx, selection) = match res {
            Ok(x) => x,
            Err(e) => {
                // The file may have been purged concurrently; optionally
                // treat it as empty instead of failing the scan.
                if e.is_object_not_found() && self.ignore_file_not_found {
                    error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
                    return Ok(FileRangeBuilder::default());
                } else {
                    return Err(e);
                }
            }
        };

        let need_compat = !compat::has_same_columns_and_pk_encoding(
            self.mapper.metadata(),
            file_range_ctx.read_format().metadata(),
        );
        if need_compat {
            // Pick the compat adapter that matches the file's read format.
            let compat = if let Some(flat_format) = file_range_ctx.read_format().as_flat() {
                let mapper = self.mapper.as_flat().unwrap();
                FlatCompatBatch::try_new(
                    mapper,
                    flat_format.metadata(),
                    flat_format.format_projection(),
                    self.compaction,
                )?
                .map(CompatBatch::Flat)
            } else {
                let compact_batch = PrimaryKeyCompatBatch::new(
                    &self.mapper,
                    file_range_ctx.read_format().metadata().clone(),
                )?;
                Some(CompatBatch::PrimaryKey(compact_batch))
            };
            file_range_ctx.set_compat_batch(compat);
        }
        Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), selection))
    }
1046
    /// Spawns a task to read batches from `input` and forward them to
    /// `sender`. The task stops when the source is exhausted or errors.
    #[tracing::instrument(
        skip(self, input, semaphore, sender),
        fields(region_id = %self.region_metadata().region_id)
    )]
    pub(crate) fn spawn_scan_task(
        &self,
        mut input: Source,
        semaphore: Arc<Semaphore>,
        sender: mpsc::Sender<Result<Batch>>,
    ) {
        let region_id = self.region_metadata().region_id;
        let span = tracing::info_span!(
            "ScanInput::parallel_scan_task",
            region_id = %region_id,
            stream_kind = "batch"
        );
        common_runtime::spawn_global(
            async move {
                loop {
                    // Hold a permit only while fetching the next batch so the
                    // semaphore bounds concurrent reads, not pending sends.
                    let maybe_batch = {
                        // `acquire` only fails when the semaphore is closed.
                        let _permit = semaphore.acquire().await.unwrap();
                        input.next_batch().await
                    };
                    match maybe_batch {
                        Ok(Some(batch)) => {
                            // Receiver may be dropped; ignore send errors.
                            let _ = sender.send(Ok(batch)).await;
                        }
                        Ok(None) => break,
                        Err(e) => {
                            let _ = sender.send(Err(e)).await;
                            break;
                        }
                    }
                }
            }
            .instrument(span),
        );
    }
1089
1090 #[tracing::instrument(
1094 skip(self, sources, semaphore),
1095 fields(
1096 region_id = %self.region_metadata().region_id,
1097 source_count = sources.len()
1098 )
1099 )]
1100 pub(crate) fn create_parallel_flat_sources(
1101 &self,
1102 sources: Vec<BoxedRecordBatchStream>,
1103 semaphore: Arc<Semaphore>,
1104 ) -> Result<Vec<BoxedRecordBatchStream>> {
1105 if sources.len() <= 1 {
1106 return Ok(sources);
1107 }
1108
1109 let sources = sources
1111 .into_iter()
1112 .map(|source| {
1113 let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
1114 self.spawn_flat_scan_task(source, semaphore.clone(), sender);
1115 let stream = Box::pin(ReceiverStream::new(receiver));
1116 Box::pin(stream) as _
1117 })
1118 .collect();
1119 Ok(sources)
1120 }
1121
    /// Spawns a task to read record batches from `input` and forward them to
    /// `sender`. The task stops when the stream ends or errors.
    #[tracing::instrument(
        skip(self, input, semaphore, sender),
        fields(region_id = %self.region_metadata().region_id)
    )]
    pub(crate) fn spawn_flat_scan_task(
        &self,
        mut input: BoxedRecordBatchStream,
        semaphore: Arc<Semaphore>,
        sender: mpsc::Sender<Result<RecordBatch>>,
    ) {
        let region_id = self.region_metadata().region_id;
        let span = tracing::info_span!(
            "ScanInput::parallel_scan_task",
            region_id = %region_id,
            stream_kind = "flat"
        );
        common_runtime::spawn_global(
            async move {
                loop {
                    // Hold a permit only while fetching the next batch so the
                    // semaphore bounds concurrent reads, not pending sends.
                    let maybe_batch = {
                        // `acquire` only fails when the semaphore is closed.
                        let _permit = semaphore.acquire().await.unwrap();
                        input.next().await
                    };
                    match maybe_batch {
                        Some(Ok(batch)) => {
                            // Receiver may be dropped; ignore send errors.
                            let _ = sender.send(Ok(batch)).await;
                        }
                        Some(Err(e)) => {
                            let _ = sender.send(Err(e)).await;
                            break;
                        }
                        None => break,
                    }
                }
            }
            .instrument(span),
        );
    }
1164
    /// Returns the total number of rows this scan may read: rows in selected
    /// files and memtables, plus extension ranges when enabled.
    pub(crate) fn total_rows(&self) -> usize {
        let rows_in_files: usize = self.files.iter().map(|f| f.num_rows()).sum();
        let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();

        let rows = rows_in_files + rows_in_memtables;
        #[cfg(feature = "enterprise")]
        let rows = rows
            + self
                .extension_ranges
                .iter()
                .map(|x| x.num_rows())
                .sum::<u64>() as usize;
        rows
    }

    /// Returns the predicate group of the scan.
    pub(crate) fn predicate_group(&self) -> &PredicateGroup {
        &self.predicate
    }

    /// Returns the number of memtables to scan.
    pub(crate) fn num_memtables(&self) -> usize {
        self.memtables.len()
    }

    /// Returns the number of SST files to scan.
    pub(crate) fn num_files(&self) -> usize {
        self.files.len()
    }

    /// Returns the file handle for the global range `index`. Memtables occupy
    /// indices `0..num_memtables()`; files follow.
    pub(crate) fn file_from_index(&self, index: RowGroupIndex) -> &FileHandle {
        let file_index = index.index - self.num_memtables();
        &self.files[file_index]
    }

    /// Returns the region metadata (taken from the projection mapper).
    pub fn region_metadata(&self) -> &RegionMetadataRef {
        self.mapper.metadata()
    }
1203}
1204
#[cfg(feature = "enterprise")]
impl ScanInput {
    /// Sets the extension ranges to scan.
    #[must_use]
    pub(crate) fn with_extension_ranges(self, extension_ranges: Vec<BoxedExtensionRange>) -> Self {
        Self {
            extension_ranges,
            ..self
        }
    }

    /// Returns the extension ranges to scan.
    // The redundant per-method `#[cfg(feature = "enterprise")]` attributes
    // were removed: the whole impl block is already feature-gated.
    pub(crate) fn extension_ranges(&self) -> &[BoxedExtensionRange] {
        &self.extension_ranges
    }

    /// Returns the extension range for the global range index `i`. Extension
    /// ranges are indexed after memtables and files.
    pub(crate) fn extension_range(&self, i: usize) -> &BoxedExtensionRange {
        &self.extension_ranges[i - self.num_memtables() - self.num_files()]
    }
}
1226
#[cfg(test)]
impl ScanInput {
    /// Returns the ids of all SST files in the input (test helper).
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        let mut ids = Vec::with_capacity(self.files.len());
        for handle in &self.files {
            ids.push(handle.file_id());
        }
        ids
    }

    /// Returns the index ids of all SST files in the input (test helper).
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        let mut ids = Vec::with_capacity(self.files.len());
        for handle in &self.files {
            ids.push(handle.index_id());
        }
        ids
    }
}
1238
1239fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode {
1240 if append_mode {
1241 return PreFilterMode::All;
1242 }
1243
1244 match merge_mode {
1245 MergeMode::LastRow => PreFilterMode::SkipFieldsOnDelete,
1246 MergeMode::LastNonNull => PreFilterMode::SkipFields,
1247 }
1248}
1249
/// Context shared by the streams of a region scan.
pub struct StreamContext {
    /// Input of the scan (sources, predicates, projection mapper, options).
    pub input: ScanInput,
    /// Metadata of the ranges to read, derived from `input`.
    pub(crate) ranges: Vec<RangeMeta>,

    /// Time when the query started.
    pub(crate) query_start: Instant,
}
1262
impl StreamContext {
    /// Creates a context for a [SeqScan] from the scan `input`.
    pub(crate) fn seq_scan_ctx(input: ScanInput) -> Self {
        // Fall back to "now" when the caller didn't record a start time.
        let query_start = input.query_start.unwrap_or_else(Instant::now);
        let ranges = RangeMeta::seq_scan_ranges(&input);
        READ_SST_COUNT.observe(input.num_files() as f64);

        Self {
            input,
            ranges,
            query_start,
        }
    }

    /// Creates a context for an [UnorderedScan] from the scan `input`.
    pub(crate) fn unordered_scan_ctx(input: ScanInput) -> Self {
        let query_start = input.query_start.unwrap_or_else(Instant::now);
        let ranges = RangeMeta::unordered_scan_ranges(&input);
        READ_SST_COUNT.observe(input.num_files() as f64);

        Self {
            input,
            ranges,
            query_start,
        }
    }

    /// Returns true if `index` refers to a memtable range.
    ///
    /// Global range indices are laid out memtables-first, then files.
    pub(crate) fn is_mem_range_index(&self, index: RowGroupIndex) -> bool {
        self.input.num_memtables() > index.index
    }

    /// Returns true if `index` refers to a file range: past the memtables but
    /// within `num_memtables + num_files`. Anything beyond that is an "other"
    /// range (e.g. an extension range).
    pub(crate) fn is_file_range_index(&self, index: RowGroupIndex) -> bool {
        !self.is_mem_range_index(index)
            && index.index < self.input.num_files() + self.input.num_memtables()
    }

    /// Creates one [PartitionRange] per range meta, using its position in
    /// `self.ranges` as the identifier.
    pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
        self.ranges
            .iter()
            .enumerate()
            .map(|(idx, range_meta)| range_meta.new_partition_range(idx))
            .collect()
    }

    /// Formats the context for `EXPLAIN` output.
    ///
    /// Writes a JSON-like summary of range counts; when `verbose` is set, the
    /// output is wrapped in braces and projection/filter/file details are
    /// appended via [Self::format_verbose_content].
    pub(crate) fn format_for_explain(&self, verbose: bool, f: &mut fmt::Formatter) -> fmt::Result {
        // Classify every row-group index as memtable, file, or other.
        let (mut num_mem_ranges, mut num_file_ranges, mut num_other_ranges) = (0, 0, 0);
        for range_meta in &self.ranges {
            for idx in &range_meta.row_group_indices {
                if self.is_mem_range_index(*idx) {
                    num_mem_ranges += 1;
                } else if self.is_file_range_index(*idx) {
                    num_file_ranges += 1;
                } else {
                    num_other_ranges += 1;
                }
            }
        }
        if verbose {
            write!(f, "{{")?;
        }
        write!(
            f,
            r#""partition_count":{{"count":{}, "mem_ranges":{}, "files":{}, "file_ranges":{}"#,
            self.ranges.len(),
            num_mem_ranges,
            self.input.num_files(),
            num_file_ranges,
        )?;
        // "other_ranges" is only emitted when present to keep output compact.
        if num_other_ranges > 0 {
            write!(f, r#", "other_ranges":{}"#, num_other_ranges)?;
        }
        write!(f, "}}")?;

        if let Some(selector) = &self.input.series_row_selector {
            write!(f, ", \"selector\":\"{}\"", selector)?;
        }
        if let Some(distribution) = &self.input.distribution {
            write!(f, ", \"distribution\":\"{}\"", distribution)?;
        }

        if verbose {
            self.format_verbose_content(f)?;
        }

        Ok(())
    }

    /// Writes the verbose part of the explain output: projection, filters,
    /// per-file details and (with the `enterprise` feature) extension ranges.
    fn format_verbose_content(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Debug-formats a file handle as a JSON-like object.
        struct FileWrapper<'a> {
            file: &'a FileHandle,
        }

        impl fmt::Debug for FileWrapper<'_> {
            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                let (start, end) = self.file.time_range();
                write!(
                    f,
                    r#"{{"file_id":"{}","time_range_start":"{}::{}","time_range_end":"{}::{}","rows":{},"size":{},"index_size":{}}}"#,
                    self.file.file_id(),
                    start.value(),
                    start.unit(),
                    end.value(),
                    end.unit(),
                    self.file.num_rows(),
                    self.file.size(),
                    self.file.index_size()
                )
            }
        }

        // Debug-formats the whole scan input (projection, filters, files).
        struct InputWrapper<'a> {
            input: &'a ScanInput,
        }

        #[cfg(feature = "enterprise")]
        impl InputWrapper<'_> {
            // Appends the extension ranges, if any, to the explain output.
            fn format_extension_ranges(&self, f: &mut fmt::Formatter) -> fmt::Result {
                if self.input.extension_ranges.is_empty() {
                    return Ok(());
                }

                let mut delimiter = "";
                write!(f, ", extension_ranges: [")?;
                for range in self.input.extension_ranges() {
                    write!(f, "{}{:?}", delimiter, range)?;
                    delimiter = ", ";
                }
                write!(f, "]")?;
                Ok(())
            }
        }

        impl fmt::Debug for InputWrapper<'_> {
            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                let output_schema = self.input.mapper.output_schema();
                if !output_schema.is_empty() {
                    let names: Vec<_> = output_schema
                        .column_schemas()
                        .iter()
                        .map(|col| &col.name)
                        .collect();
                    write!(f, ", \"projection\": {:?}", names)?;
                }
                if let Some(predicate) = &self.input.predicate.predicate()
                    && !predicate.exprs().is_empty()
                {
                    let exprs: Vec<_> = predicate.exprs().iter().map(|e| e.to_string()).collect();
                    write!(f, ", \"filters\": {:?}", exprs)?;
                }
                if !self.input.files.is_empty() {
                    write!(f, ", \"files\": ")?;
                    f.debug_list()
                        .entries(self.input.files.iter().map(|file| FileWrapper { file }))
                        .finish()?;
                }

                #[cfg(feature = "enterprise")]
                self.format_extension_ranges(f)?;

                Ok(())
            }
        }

        write!(f, "{:?}", InputWrapper { input: &self.input })
    }
}
1432
/// Predicates of a scan request, grouped in several forms.
#[derive(Clone, Default)]
pub struct PredicateGroup {
    /// Filters that only reference the time index column, if any.
    time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
    /// Predicate over all exprs, including the region partition expr.
    predicate_all: Option<Predicate>,
    /// Predicate over the request exprs only (partition expr excluded).
    predicate_without_region: Option<Predicate>,
    /// Partition expression of the region, parsed from region metadata.
    region_partition_expr: Option<PartitionExpr>,
}
1445
1446impl PredicateGroup {
1447 pub fn new(metadata: &RegionMetadata, exprs: &[Expr]) -> Result<Self> {
1449 let mut combined_exprs = exprs.to_vec();
1450 let mut region_partition_expr = None;
1451
1452 if let Some(expr_json) = metadata.partition_expr.as_ref()
1453 && !expr_json.is_empty()
1454 && let Some(expr) = PartitionExpr::from_json_str(expr_json)
1455 .context(InvalidPartitionExprSnafu { expr: expr_json })?
1456 {
1457 let logical_expr = expr
1458 .try_as_logical_expr()
1459 .context(InvalidPartitionExprSnafu {
1460 expr: expr_json.clone(),
1461 })?;
1462
1463 combined_exprs.push(logical_expr);
1464 region_partition_expr = Some(expr);
1465 }
1466
1467 let mut time_filters = Vec::with_capacity(combined_exprs.len());
1468 let mut columns = HashSet::new();
1470 for expr in &combined_exprs {
1471 columns.clear();
1472 let Some(filter) = Self::expr_to_filter(expr, metadata, &mut columns) else {
1473 continue;
1474 };
1475 time_filters.push(filter);
1476 }
1477 let time_filters = if time_filters.is_empty() {
1478 None
1479 } else {
1480 Some(Arc::new(time_filters))
1481 };
1482
1483 let predicate_all = if combined_exprs.is_empty() {
1484 None
1485 } else {
1486 Some(Predicate::new(combined_exprs))
1487 };
1488 let predicate_without_region = if exprs.is_empty() {
1489 None
1490 } else {
1491 Some(Predicate::new(exprs.to_vec()))
1492 };
1493
1494 Ok(Self {
1495 time_filters,
1496 predicate_all,
1497 predicate_without_region,
1498 region_partition_expr,
1499 })
1500 }
1501
1502 pub(crate) fn time_filters(&self) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
1504 self.time_filters.clone()
1505 }
1506
1507 pub(crate) fn predicate(&self) -> Option<&Predicate> {
1509 self.predicate_all.as_ref()
1510 }
1511
1512 pub(crate) fn predicate_without_region(&self) -> Option<&Predicate> {
1514 self.predicate_without_region.as_ref()
1515 }
1516
1517 pub(crate) fn region_partition_expr(&self) -> Option<&PartitionExpr> {
1519 self.region_partition_expr.as_ref()
1520 }
1521
1522 fn expr_to_filter(
1523 expr: &Expr,
1524 metadata: &RegionMetadata,
1525 columns: &mut HashSet<Column>,
1526 ) -> Option<SimpleFilterEvaluator> {
1527 columns.clear();
1528 expr_to_columns(expr, columns).ok()?;
1531 if columns.len() > 1 {
1532 return None;
1534 }
1535 let column = columns.iter().next()?;
1536 let column_meta = metadata.column_by_name(&column.name)?;
1537 if column_meta.semantic_type == SemanticType::Timestamp {
1538 SimpleFilterEvaluator::try_new(expr)
1539 } else {
1540 None
1541 }
1542 }
1543}