use std::collections::HashSet;
use std::fmt;
use std::num::NonZeroU64;
use std::sync::Arc;
use std::time::Instant;

use api::v1::SemanticType;
use common_error::ext::BoxedError;
use common_recordbatch::SendableRecordBatchStream;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_telemetry::{debug, error, tracing, warn};
use common_time::range::TimestampRange;
use datafusion_common::Column;
use datafusion_expr::Expr;
use datafusion_expr::utils::expr_to_columns;
use futures::StreamExt;
use partition::expr::PartitionExpr;
use smallvec::SmallVec;
use snafu::ResultExt;
use store_api::metadata::{RegionMetadata, RegionMetadataRef};
use store_api::region_engine::{PartitionRange, RegionScannerRef};
use store_api::storage::{
    RegionId, ScanRequest, SequenceRange, TimeSeriesDistribution, TimeSeriesRowSelector,
};
use table::predicate::{Predicate, build_time_range_predicate};
use tokio::sync::{Semaphore, mpsc};
use tokio_stream::wrappers::ReceiverStream;

use crate::access_layer::AccessLayerRef;
use crate::cache::CacheStrategy;
use crate::config::{DEFAULT_MAX_CONCURRENT_SCAN_FILES, DEFAULT_SCAN_CHANNEL_SIZE};
use crate::error::{InvalidPartitionExprSnafu, Result};
#[cfg(feature = "enterprise")]
use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
use crate::memtable::{MemtableRange, RangesOptions};
use crate::metrics::READ_SST_COUNT;
use crate::read::compat::{self, CompatBatch, FlatCompatBatch, PrimaryKeyCompatBatch};
use crate::read::projection::ProjectionMapper;
use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
use crate::read::seq_scan::SeqScan;
use crate::read::series_scan::SeriesScan;
use crate::read::stream::ScanBatchStream;
use crate::read::unordered_scan::UnorderedScan;
use crate::read::{Batch, BoxedRecordBatchStream, RecordBatch, Source};
use crate::region::options::MergeMode;
use crate::region::version::VersionRef;
use crate::sst::FormatType;
use crate::sst::file::FileHandle;
use crate::sst::index::bloom_filter::applier::{
    BloomFilterIndexApplierBuilder, BloomFilterIndexApplierRef,
};
use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
use crate::sst::parquet::file_range::PreFilterMode;
use crate::sst::parquet::reader::ReaderMetrics;

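/// Parallel scan channel size to use when the region stores SSTs in the flat format.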
const FLAT_SCAN_CHANNEL_SIZE: usize = 2;

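/// A scanner over a region. Each variant scans with a different strategy:
/// sequential merge ([SeqScan]), unordered ([UnorderedScan]), or per-series
/// ([SeriesScan]).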
pub(crate) enum Scanner {
    Seq(SeqScan),
    Unordered(UnorderedScan),
    Series(SeriesScan),
}

impl Scanner {
    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
    pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream, BoxedError> {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.build_stream(),
            Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
            Scanner::Series(series_scan) => series_scan.build_stream().await,
        }
    }

    pub(crate) fn scan_batch(&self) -> Result<ScanBatchStream> {
        match self {
            Scanner::Seq(x) => x.scan_all_partitions(),
            Scanner::Unordered(x) => x.scan_all_partitions(),
            Scanner::Series(x) => x.scan_all_partitions(),
        }
    }
}

#[cfg(test)]
impl Scanner {
    pub(crate) fn num_files(&self) -> usize {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.input().num_files(),
            Scanner::Unordered(unordered_scan) => unordered_scan.input().num_files(),
            Scanner::Series(series_scan) => series_scan.input().num_files(),
        }
    }

    pub(crate) fn num_memtables(&self) -> usize {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.input().num_memtables(),
            Scanner::Unordered(unordered_scan) => unordered_scan.input().num_memtables(),
            Scanner::Series(series_scan) => series_scan.input().num_memtables(),
        }
    }

    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.input().file_ids(),
            Scanner::Unordered(unordered_scan) => unordered_scan.input().file_ids(),
            Scanner::Series(series_scan) => series_scan.input().file_ids(),
        }
    }

    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.input().index_ids(),
            Scanner::Unordered(unordered_scan) => unordered_scan.input().index_ids(),
            Scanner::Series(series_scan) => series_scan.input().index_ids(),
        }
    }

    pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
        use store_api::region_engine::{PrepareRequest, RegionScanner};

        let request = PrepareRequest::default().with_target_partitions(target_partitions);
        match self {
            Scanner::Seq(seq_scan) => seq_scan.prepare(request).unwrap(),
            Scanner::Unordered(unordered_scan) => unordered_scan.prepare(request).unwrap(),
            Scanner::Series(series_scan) => series_scan.prepare(request).unwrap(),
        }
    }
}

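/// Helper that builds a scanner for a region from a [ScanRequest]. It selects the
/// scan strategy, prunes memtables and SST files, and assembles the [ScanInput].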
#[cfg_attr(doc, aquamarine::aquamarine)]
pub(crate) struct ScanRegion {
    version: VersionRef,
    access_layer: AccessLayerRef,
    request: ScanRequest,
    cache_strategy: CacheStrategy,
    parallel_scan_channel_size: usize,
    max_concurrent_scan_files: usize,
    ignore_inverted_index: bool,
    ignore_fulltext_index: bool,
    ignore_bloom_filter: bool,
    start_time: Option<Instant>,
    filter_deleted: bool,
    #[cfg(feature = "enterprise")]
    extension_range_provider: Option<BoxedExtensionRangeProvider>,
}

impl ScanRegion {
    pub(crate) fn new(
        version: VersionRef,
        access_layer: AccessLayerRef,
        request: ScanRequest,
        cache_strategy: CacheStrategy,
    ) -> ScanRegion {
        ScanRegion {
            version,
            access_layer,
            request,
            cache_strategy,
            parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
            ignore_inverted_index: false,
            ignore_fulltext_index: false,
            ignore_bloom_filter: false,
            start_time: None,
            filter_deleted: true,
            #[cfg(feature = "enterprise")]
            extension_range_provider: None,
        }
    }

    #[must_use]
    pub(crate) fn with_parallel_scan_channel_size(
        mut self,
        parallel_scan_channel_size: usize,
    ) -> Self {
        self.parallel_scan_channel_size = parallel_scan_channel_size;
        self
    }

    #[must_use]
    pub(crate) fn with_max_concurrent_scan_files(
        mut self,
        max_concurrent_scan_files: usize,
    ) -> Self {
        self.max_concurrent_scan_files = max_concurrent_scan_files;
        self
    }

    #[must_use]
    pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self {
        self.ignore_inverted_index = ignore;
        self
    }

    #[must_use]
    pub(crate) fn with_ignore_fulltext_index(mut self, ignore: bool) -> Self {
        self.ignore_fulltext_index = ignore;
        self
    }

    #[must_use]
    pub(crate) fn with_ignore_bloom_filter(mut self, ignore: bool) -> Self {
        self.ignore_bloom_filter = ignore;
        self
    }

    #[must_use]
    pub(crate) fn with_start_time(mut self, now: Instant) -> Self {
        self.start_time = Some(now);
        self
    }

    pub(crate) fn set_filter_deleted(&mut self, filter_deleted: bool) {
        self.filter_deleted = filter_deleted;
    }

    #[cfg(feature = "enterprise")]
    pub(crate) fn set_extension_range_provider(
        &mut self,
        extension_range_provider: BoxedExtensionRangeProvider,
    ) {
        self.extension_range_provider = Some(extension_range_provider);
    }

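    /// Returns a [Scanner] for the request, choosing series, unordered, or
    /// sequential scan based on the request and region options.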
    pub(crate) async fn scanner(self) -> Result<Scanner> {
        if self.use_series_scan() {
            self.series_scan().await.map(Scanner::Series)
        } else if self.use_unordered_scan() {
            self.unordered_scan().await.map(Scanner::Unordered)
        } else {
            self.seq_scan().await.map(Scanner::Seq)
        }
    }

    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
    pub(crate) async fn region_scanner(self) -> Result<RegionScannerRef> {
        if self.use_series_scan() {
            self.series_scan()
                .await
                .map(|scanner| Box::new(scanner) as _)
        } else if self.use_unordered_scan() {
            self.unordered_scan()
                .await
                .map(|scanner| Box::new(scanner) as _)
        } else {
            self.seq_scan().await.map(|scanner| Box::new(scanner) as _)
        }
    }

    pub(crate) async fn seq_scan(self) -> Result<SeqScan> {
        let input = self.scan_input().await?.with_compaction(false);
        Ok(SeqScan::new(input))
    }

    pub(crate) async fn unordered_scan(self) -> Result<UnorderedScan> {
        let input = self.scan_input().await?;
        Ok(UnorderedScan::new(input))
    }

    pub(crate) async fn series_scan(self) -> Result<SeriesScan> {
        let input = self.scan_input().await?;
        Ok(SeriesScan::new(input))
    }

    fn use_unordered_scan(&self) -> bool {
        self.version.options.append_mode
            && self.request.series_row_selector.is_none()
            && (self.request.distribution.is_none()
                || self.request.distribution == Some(TimeSeriesDistribution::TimeWindowed))
    }

    fn use_series_scan(&self) -> bool {
        self.request.distribution == Some(TimeSeriesDistribution::PerSeries)
    }

    fn use_flat_format(&self) -> bool {
        self.version.options.sst_format.unwrap_or_default() == FormatType::Flat
    }

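    /// Builds the [ScanInput]: prunes SST files by sequence and time range, collects
    /// intersecting memtable ranges, and prepares the projection, predicates, and
    /// index appliers.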
    async fn scan_input(mut self) -> Result<ScanInput> {
        let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new);
        let time_range = self.build_time_range_predicate();
        let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;
        let flat_format = self.use_flat_format();

        let mapper = match &self.request.projection {
            Some(p) => {
                ProjectionMapper::new(&self.version.metadata, p.iter().copied(), flat_format)?
            }
            None => ProjectionMapper::all(&self.version.metadata, flat_format)?,
        };

        let ssts = &self.version.ssts;
        let mut files = Vec::new();
        for level in ssts.levels() {
            for file in level.files.values() {
                let exceed_min_sequence = match (sst_min_sequence, file.meta_ref().sequence) {
                    (Some(min_sequence), Some(file_sequence)) => file_sequence > min_sequence,
                    (Some(_), None) => true,
                    (None, _) => true,
                };

                if exceed_min_sequence && file_in_range(file, &time_range) {
                    files.push(file.clone());
                }
            }
        }

        let memtables = self.version.memtables.list_memtables();
        let mut mem_range_builders = Vec::new();
        let filter_mode = pre_filter_mode(
            self.version.options.append_mode,
            self.version.options.merge_mode(),
        );

        for m in memtables {
            let Some((start, end)) = m.stats().time_range() else {
                continue;
            };
            let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
            if !memtable_range.intersects(&time_range) {
                continue;
            }
            let ranges_in_memtable = m.ranges(
                Some(mapper.column_ids()),
                RangesOptions::default()
                    .with_predicate(predicate.clone())
                    .with_sequence(SequenceRange::new(
                        self.request.memtable_min_sequence,
                        self.request.memtable_max_sequence,
                    ))
                    .with_pre_filter_mode(filter_mode),
            )?;
            mem_range_builders.extend(ranges_in_memtable.ranges.into_values().map(|v| {
                let mut stats = ranges_in_memtable.stats.clone();
                stats.num_ranges = 1;
                stats.num_rows = v.num_rows();
                MemRangeBuilder::new(v, stats)
            }));
        }

        let region_id = self.region_id();
        debug!(
            "Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}, flat_format: {}",
            region_id,
            self.request,
            time_range,
            mem_range_builders.len(),
            files.len(),
            self.version.options.append_mode,
            flat_format,
        );

        let (non_field_filters, field_filters) = self.partition_by_field_filters();
        let inverted_index_appliers = [
            self.build_inverted_index_applier(&non_field_filters),
            self.build_inverted_index_applier(&field_filters),
        ];
        let bloom_filter_appliers = [
            self.build_bloom_filter_applier(&non_field_filters),
            self.build_bloom_filter_applier(&field_filters),
        ];
        let fulltext_index_appliers = [
            self.build_fulltext_index_applier(&non_field_filters),
            self.build_fulltext_index_applier(&field_filters),
        ];
        let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;

        if flat_format {
            self.parallel_scan_channel_size = FLAT_SCAN_CHANNEL_SIZE;
        }

        let input = ScanInput::new(self.access_layer, mapper)
            .with_time_range(Some(time_range))
            .with_predicate(predicate)
            .with_memtables(mem_range_builders)
            .with_files(files)
            .with_cache(self.cache_strategy)
            .with_inverted_index_appliers(inverted_index_appliers)
            .with_bloom_filter_index_appliers(bloom_filter_appliers)
            .with_fulltext_index_appliers(fulltext_index_appliers)
            .with_parallel_scan_channel_size(self.parallel_scan_channel_size)
            .with_max_concurrent_scan_files(self.max_concurrent_scan_files)
            .with_start_time(self.start_time)
            .with_append_mode(self.version.options.append_mode)
            .with_filter_deleted(self.filter_deleted)
            .with_merge_mode(self.version.options.merge_mode())
            .with_series_row_selector(self.request.series_row_selector)
            .with_distribution(self.request.distribution)
            .with_flat_format(flat_format);

        #[cfg(feature = "enterprise")]
        let input = if let Some(provider) = self.extension_range_provider {
            let ranges = provider
                .find_extension_ranges(self.version.flushed_sequence, time_range, &self.request)
                .await?;
            debug!("Found extension ranges: {ranges:?}");
            input.with_extension_ranges(ranges)
        } else {
            input
        };
        Ok(input)
    }

    fn region_id(&self) -> RegionId {
        self.version.metadata.region_id
    }

    fn build_time_range_predicate(&self) -> TimestampRange {
        let time_index = self.version.metadata.time_index_column();
        let unit = time_index
            .column_schema
            .data_type
            .as_timestamp()
            .expect("Time index must have timestamp-compatible type")
            .unit();
        build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters)
    }

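    /// Splits the request filters into (filters that reference no field column,
    /// filters that reference field columns). Filters whose columns cannot be
    /// extracted fall into the first group.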
    fn partition_by_field_filters(&self) -> (Vec<Expr>, Vec<Expr>) {
        let field_columns = self
            .version
            .metadata
            .field_columns()
            .map(|col| &col.column_schema.name)
            .collect::<HashSet<_>>();

        let mut columns = HashSet::new();

        self.request.filters.iter().cloned().partition(|expr| {
            columns.clear();
            if expr_to_columns(expr, &mut columns).is_err() {
                return true;
            }
            !columns
                .iter()
                .any(|column| field_columns.contains(&column.name))
        })
    }

    fn build_inverted_index_applier(&self, filters: &[Expr]) -> Option<InvertedIndexApplierRef> {
        if self.ignore_inverted_index {
            return None;
        }

        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
        let inverted_index_cache = self.cache_strategy.inverted_index_cache().cloned();

        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();

        InvertedIndexApplierBuilder::new(
            self.access_layer.table_dir().to_string(),
            self.access_layer.path_type(),
            self.access_layer.object_store().clone(),
            self.version.metadata.as_ref(),
            self.version.metadata.inverted_indexed_column_ids(
                self.version
                    .options
                    .index_options
                    .inverted_index
                    .ignore_column_ids
                    .iter(),
            ),
            self.access_layer.puffin_manager_factory().clone(),
        )
        .with_file_cache(file_cache)
        .with_inverted_index_cache(inverted_index_cache)
        .with_puffin_metadata_cache(puffin_metadata_cache)
        .build(filters)
        .inspect_err(|err| warn!(err; "Failed to build inverted index applier"))
        .ok()
        .flatten()
        .map(Arc::new)
    }

    fn build_bloom_filter_applier(&self, filters: &[Expr]) -> Option<BloomFilterIndexApplierRef> {
        if self.ignore_bloom_filter {
            return None;
        }

        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();

        BloomFilterIndexApplierBuilder::new(
            self.access_layer.table_dir().to_string(),
            self.access_layer.path_type(),
            self.access_layer.object_store().clone(),
            self.version.metadata.as_ref(),
            self.access_layer.puffin_manager_factory().clone(),
        )
        .with_file_cache(file_cache)
        .with_bloom_filter_index_cache(bloom_filter_index_cache)
        .with_puffin_metadata_cache(puffin_metadata_cache)
        .build(filters)
        .inspect_err(|err| warn!(err; "Failed to build bloom filter index applier"))
        .ok()
        .flatten()
        .map(Arc::new)
    }

    fn build_fulltext_index_applier(&self, filters: &[Expr]) -> Option<FulltextIndexApplierRef> {
        if self.ignore_fulltext_index {
            return None;
        }

        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
        FulltextIndexApplierBuilder::new(
            self.access_layer.table_dir().to_string(),
            self.access_layer.path_type(),
            self.access_layer.object_store().clone(),
            self.access_layer.puffin_manager_factory().clone(),
            self.version.metadata.as_ref(),
        )
        .with_file_cache(file_cache)
        .with_puffin_metadata_cache(puffin_metadata_cache)
        .with_bloom_filter_cache(bloom_filter_index_cache)
        .build(filters)
        .inspect_err(|err| warn!(err; "Failed to build fulltext index applier"))
        .ok()
        .flatten()
        .map(Arc::new)
    }
}

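/// Returns true if the time range of the SST file intersects with the predicate range.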
fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
    if predicate == &TimestampRange::min_to_max() {
        return true;
    }
    let (start, end) = file.time_range();
    let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end));
    file_ts_range.intersects(predicate)
}

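/// Common input shared by the scanners: projection mapper, predicates, pruned
/// memtable ranges and SST files, caches, index appliers, and scan options.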
pub struct ScanInput {
    access_layer: AccessLayerRef,
    pub(crate) mapper: Arc<ProjectionMapper>,
    time_range: Option<TimestampRange>,
    pub(crate) predicate: PredicateGroup,
    region_partition_expr: Option<PartitionExpr>,
    pub(crate) memtables: Vec<MemRangeBuilder>,
    pub(crate) files: Vec<FileHandle>,
    pub(crate) cache_strategy: CacheStrategy,
    ignore_file_not_found: bool,
    pub(crate) parallel_scan_channel_size: usize,
    pub(crate) max_concurrent_scan_files: usize,
    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
    pub(crate) query_start: Option<Instant>,
    pub(crate) append_mode: bool,
    pub(crate) filter_deleted: bool,
    pub(crate) merge_mode: MergeMode,
    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
    pub(crate) distribution: Option<TimeSeriesDistribution>,
    pub(crate) flat_format: bool,
    pub(crate) compaction: bool,
    #[cfg(feature = "enterprise")]
    extension_ranges: Vec<BoxedExtensionRange>,
}

impl ScanInput {
    #[must_use]
    pub(crate) fn new(access_layer: AccessLayerRef, mapper: ProjectionMapper) -> ScanInput {
        ScanInput {
            access_layer,
            mapper: Arc::new(mapper),
            time_range: None,
            predicate: PredicateGroup::default(),
            region_partition_expr: None,
            memtables: Vec::new(),
            files: Vec::new(),
            cache_strategy: CacheStrategy::Disabled,
            ignore_file_not_found: false,
            parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
            inverted_index_appliers: [None, None],
            bloom_filter_index_appliers: [None, None],
            fulltext_index_appliers: [None, None],
            query_start: None,
            append_mode: false,
            filter_deleted: true,
            merge_mode: MergeMode::default(),
            series_row_selector: None,
            distribution: None,
            flat_format: false,
            compaction: false,
            #[cfg(feature = "enterprise")]
            extension_ranges: Vec::new(),
        }
    }

    #[must_use]
    pub(crate) fn with_time_range(mut self, time_range: Option<TimestampRange>) -> Self {
        self.time_range = time_range;
        self
    }

    #[must_use]
    pub(crate) fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
        self.region_partition_expr = predicate.region_partition_expr().cloned();
        self.predicate = predicate;
        self
    }

    #[must_use]
    pub(crate) fn with_memtables(mut self, memtables: Vec<MemRangeBuilder>) -> Self {
        self.memtables = memtables;
        self
    }

    #[must_use]
    pub(crate) fn with_files(mut self, files: Vec<FileHandle>) -> Self {
        self.files = files;
        self
    }

    #[must_use]
    pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self {
        self.cache_strategy = cache;
        self
    }

    #[must_use]
    pub(crate) fn with_ignore_file_not_found(mut self, ignore: bool) -> Self {
        self.ignore_file_not_found = ignore;
        self
    }

    #[must_use]
    pub(crate) fn with_parallel_scan_channel_size(
        mut self,
        parallel_scan_channel_size: usize,
    ) -> Self {
        self.parallel_scan_channel_size = parallel_scan_channel_size;
        self
    }

    #[must_use]
    pub(crate) fn with_max_concurrent_scan_files(
        mut self,
        max_concurrent_scan_files: usize,
    ) -> Self {
        self.max_concurrent_scan_files = max_concurrent_scan_files;
        self
    }

    #[must_use]
    pub(crate) fn with_inverted_index_appliers(
        mut self,
        appliers: [Option<InvertedIndexApplierRef>; 2],
    ) -> Self {
        self.inverted_index_appliers = appliers;
        self
    }

    #[must_use]
    pub(crate) fn with_bloom_filter_index_appliers(
        mut self,
        appliers: [Option<BloomFilterIndexApplierRef>; 2],
    ) -> Self {
        self.bloom_filter_index_appliers = appliers;
        self
    }

    #[must_use]
    pub(crate) fn with_fulltext_index_appliers(
        mut self,
        appliers: [Option<FulltextIndexApplierRef>; 2],
    ) -> Self {
        self.fulltext_index_appliers = appliers;
        self
    }

    #[must_use]
    pub(crate) fn with_start_time(mut self, now: Option<Instant>) -> Self {
        self.query_start = now;
        self
    }

    #[must_use]
    pub(crate) fn with_append_mode(mut self, is_append_mode: bool) -> Self {
        self.append_mode = is_append_mode;
        self
    }

    #[must_use]
    pub(crate) fn with_filter_deleted(mut self, filter_deleted: bool) -> Self {
        self.filter_deleted = filter_deleted;
        self
    }

    #[must_use]
    pub(crate) fn with_merge_mode(mut self, merge_mode: MergeMode) -> Self {
        self.merge_mode = merge_mode;
        self
    }

    #[must_use]
    pub(crate) fn with_distribution(
        mut self,
        distribution: Option<TimeSeriesDistribution>,
    ) -> Self {
        self.distribution = distribution;
        self
    }

    #[must_use]
    pub(crate) fn with_series_row_selector(
        mut self,
        series_row_selector: Option<TimeSeriesRowSelector>,
    ) -> Self {
        self.series_row_selector = series_row_selector;
        self
    }

    #[must_use]
    pub(crate) fn with_flat_format(mut self, flat_format: bool) -> Self {
        self.flat_format = flat_format;
        self
    }

    #[must_use]
    pub(crate) fn with_compaction(mut self, compaction: bool) -> Self {
        self.compaction = compaction;
        self
    }

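    /// Wraps each source into a background scan task that forwards batches through a
    /// bounded channel so the sources can be read in parallel. Returns the sources
    /// unchanged when there is at most one.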
    pub(crate) fn create_parallel_sources(
        &self,
        sources: Vec<Source>,
        semaphore: Arc<Semaphore>,
    ) -> Result<Vec<Source>> {
        if sources.len() <= 1 {
            return Ok(sources);
        }

        let sources = sources
            .into_iter()
            .map(|source| {
                let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
                self.spawn_scan_task(source, semaphore.clone(), sender);
                let stream = Box::pin(ReceiverStream::new(receiver));
                Source::Stream(stream)
            })
            .collect();
        Ok(sources)
    }

    pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> {
        let memtable = &self.memtables[index.index];
        let mut ranges = SmallVec::new();
        memtable.build_ranges(index.row_group_index, &mut ranges);
        ranges
    }

    fn predicate_for_file(&self, file: &FileHandle) -> Option<Predicate> {
        if self.should_skip_region_partition(file) {
            self.predicate.predicate_without_region().cloned()
        } else {
            self.predicate.predicate().cloned()
        }
    }

    fn should_skip_region_partition(&self, file: &FileHandle) -> bool {
        match (
            self.region_partition_expr.as_ref(),
            file.meta_ref().partition_expr.as_ref(),
        ) {
            (Some(region_expr), Some(file_expr)) => region_expr == file_expr,
            _ => false,
        }
    }

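    /// Prunes a single SST file and returns a [FileRangeBuilder] for it. Missing
    /// files are tolerated when `ignore_file_not_found` is set, and a compat adapter
    /// is attached when the file schema differs from the region metadata.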
    pub async fn prune_file(
        &self,
        file: &FileHandle,
        reader_metrics: &mut ReaderMetrics,
    ) -> Result<FileRangeBuilder> {
        let predicate = self.predicate_for_file(file);
        let filter_mode = pre_filter_mode(self.append_mode, self.merge_mode);
        let decode_pk_values = !self.compaction && self.mapper.has_tags();
        let res = self
            .access_layer
            .read_sst(file.clone())
            .predicate(predicate)
            .projection(Some(self.mapper.column_ids().to_vec()))
            .cache(self.cache_strategy.clone())
            .inverted_index_appliers(self.inverted_index_appliers.clone())
            .bloom_filter_index_appliers(self.bloom_filter_index_appliers.clone())
            .fulltext_index_appliers(self.fulltext_index_appliers.clone())
            .expected_metadata(Some(self.mapper.metadata().clone()))
            .flat_format(self.flat_format)
            .compaction(self.compaction)
            .pre_filter_mode(filter_mode)
            .decode_primary_key_values(decode_pk_values)
            .build_reader_input(reader_metrics)
            .await;
        let (mut file_range_ctx, selection) = match res {
            Ok(x) => x,
            Err(e) => {
                if e.is_object_not_found() && self.ignore_file_not_found {
                    error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
                    return Ok(FileRangeBuilder::default());
                } else {
                    return Err(e);
                }
            }
        };

        let need_compat = !compat::has_same_columns_and_pk_encoding(
            self.mapper.metadata(),
            file_range_ctx.read_format().metadata(),
        );
        if need_compat {
            let compat = if let Some(flat_format) = file_range_ctx.read_format().as_flat() {
                let mapper = self.mapper.as_flat().unwrap();
                FlatCompatBatch::try_new(
                    mapper,
                    flat_format.metadata(),
                    flat_format.format_projection(),
                    self.compaction,
                )?
                .map(CompatBatch::Flat)
            } else {
                let compat_batch = PrimaryKeyCompatBatch::new(
                    &self.mapper,
                    file_range_ctx.read_format().metadata().clone(),
                )?;
                Some(CompatBatch::PrimaryKey(compat_batch))
            };
            file_range_ctx.set_compat_batch(compat);
        }
        Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), selection))
    }

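    /// Spawns a task that scans the source and sends each batch to the sender. The
    /// semaphore bounds how many scan tasks read concurrently.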
    pub(crate) fn spawn_scan_task(
        &self,
        mut input: Source,
        semaphore: Arc<Semaphore>,
        sender: mpsc::Sender<Result<Batch>>,
    ) {
        common_runtime::spawn_global(async move {
            loop {
                let maybe_batch = {
                    let _permit = semaphore.acquire().await.unwrap();
                    input.next_batch().await
                };
                match maybe_batch {
                    Ok(Some(batch)) => {
                        let _ = sender.send(Ok(batch)).await;
                    }
                    Ok(None) => break,
                    Err(e) => {
                        let _ = sender.send(Err(e)).await;
                        break;
                    }
                }
            }
        });
    }

    pub(crate) fn create_parallel_flat_sources(
        &self,
        sources: Vec<BoxedRecordBatchStream>,
        semaphore: Arc<Semaphore>,
    ) -> Result<Vec<BoxedRecordBatchStream>> {
        if sources.len() <= 1 {
            return Ok(sources);
        }

        let sources = sources
            .into_iter()
            .map(|source| {
                let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
                self.spawn_flat_scan_task(source, semaphore.clone(), sender);
                let stream = Box::pin(ReceiverStream::new(receiver));
                Box::pin(stream) as _
            })
            .collect();
        Ok(sources)
    }

    pub(crate) fn spawn_flat_scan_task(
        &self,
        mut input: BoxedRecordBatchStream,
        semaphore: Arc<Semaphore>,
        sender: mpsc::Sender<Result<RecordBatch>>,
    ) {
        common_runtime::spawn_global(async move {
            loop {
                let maybe_batch = {
                    let _permit = semaphore.acquire().await.unwrap();
                    input.next().await
                };
                match maybe_batch {
                    Some(Ok(batch)) => {
                        let _ = sender.send(Ok(batch)).await;
                    }
                    Some(Err(e)) => {
                        let _ = sender.send(Err(e)).await;
                        break;
                    }
                    None => break,
                }
            }
        });
    }

    pub(crate) fn total_rows(&self) -> usize {
        let rows_in_files: usize = self.files.iter().map(|f| f.num_rows()).sum();
        let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();

        let rows = rows_in_files + rows_in_memtables;
        #[cfg(feature = "enterprise")]
        let rows = rows
            + self
                .extension_ranges
                .iter()
                .map(|x| x.num_rows())
                .sum::<u64>() as usize;
        rows
    }

    pub(crate) fn predicate_group(&self) -> &PredicateGroup {
        &self.predicate
    }

    pub(crate) fn num_memtables(&self) -> usize {
        self.memtables.len()
    }

    pub(crate) fn num_files(&self) -> usize {
        self.files.len()
    }

    pub(crate) fn file_from_index(&self, index: RowGroupIndex) -> &FileHandle {
        let file_index = index.index - self.num_memtables();
        &self.files[file_index]
    }

    pub fn region_metadata(&self) -> &RegionMetadataRef {
        self.mapper.metadata()
    }
}

#[cfg(feature = "enterprise")]
impl ScanInput {
    #[must_use]
    pub(crate) fn with_extension_ranges(self, extension_ranges: Vec<BoxedExtensionRange>) -> Self {
        Self {
            extension_ranges,
            ..self
        }
    }

    #[cfg(feature = "enterprise")]
    pub(crate) fn extension_ranges(&self) -> &[BoxedExtensionRange] {
        &self.extension_ranges
    }

    #[cfg(feature = "enterprise")]
    pub(crate) fn extension_range(&self, i: usize) -> &BoxedExtensionRange {
        &self.extension_ranges[i - self.num_memtables() - self.num_files()]
    }
}

#[cfg(test)]
impl ScanInput {
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        self.files.iter().map(|file| file.file_id()).collect()
    }

    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        self.files.iter().map(|file| file.index_id()).collect()
    }
}

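/// Returns the [PreFilterMode] to use: `All` in append mode, otherwise
/// `SkipFieldsOnDelete` for `LastRow` and `SkipFields` for `LastNonNull` merge mode.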
fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode {
    if append_mode {
        return PreFilterMode::All;
    }

    match merge_mode {
        MergeMode::LastRow => PreFilterMode::SkipFieldsOnDelete,
        MergeMode::LastNonNull => PreFilterMode::SkipFields,
    }
}

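/// Context shared by the streams of a scan. It owns the [ScanInput], the derived
/// scan ranges, and the query start time.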
pub struct StreamContext {
    pub input: ScanInput,
    pub(crate) ranges: Vec<RangeMeta>,

    pub(crate) query_start: Instant,
}

impl StreamContext {
    pub(crate) fn seq_scan_ctx(input: ScanInput) -> Self {
        let query_start = input.query_start.unwrap_or_else(Instant::now);
        let ranges = RangeMeta::seq_scan_ranges(&input);
        READ_SST_COUNT.observe(input.num_files() as f64);

        Self {
            input,
            ranges,
            query_start,
        }
    }

    pub(crate) fn unordered_scan_ctx(input: ScanInput) -> Self {
        let query_start = input.query_start.unwrap_or_else(Instant::now);
        let ranges = RangeMeta::unordered_scan_ranges(&input);
        READ_SST_COUNT.observe(input.num_files() as f64);

        Self {
            input,
            ranges,
            query_start,
        }
    }

    pub(crate) fn is_mem_range_index(&self, index: RowGroupIndex) -> bool {
        self.input.num_memtables() > index.index
    }

    pub(crate) fn is_file_range_index(&self, index: RowGroupIndex) -> bool {
        !self.is_mem_range_index(index)
            && index.index < self.input.num_files() + self.input.num_memtables()
    }

    pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
        self.ranges
            .iter()
            .enumerate()
            .map(|(idx, range_meta)| range_meta.new_partition_range(idx))
            .collect()
    }

    pub(crate) fn format_for_explain(&self, verbose: bool, f: &mut fmt::Formatter) -> fmt::Result {
        let (mut num_mem_ranges, mut num_file_ranges, mut num_other_ranges) = (0, 0, 0);
        for range_meta in &self.ranges {
            for idx in &range_meta.row_group_indices {
                if self.is_mem_range_index(*idx) {
                    num_mem_ranges += 1;
                } else if self.is_file_range_index(*idx) {
                    num_file_ranges += 1;
                } else {
                    num_other_ranges += 1;
                }
            }
        }
        if verbose {
            write!(f, "{{")?;
        }
        write!(
            f,
            r#""partition_count":{{"count":{}, "mem_ranges":{}, "files":{}, "file_ranges":{}"#,
            self.ranges.len(),
            num_mem_ranges,
            self.input.num_files(),
            num_file_ranges,
        )?;
        if num_other_ranges > 0 {
            write!(f, r#", "other_ranges":{}"#, num_other_ranges)?;
        }
        write!(f, "}}")?;

        if let Some(selector) = &self.input.series_row_selector {
            write!(f, ", \"selector\":\"{}\"", selector)?;
        }
        if let Some(distribution) = &self.input.distribution {
            write!(f, ", \"distribution\":\"{}\"", distribution)?;
        }

        if verbose {
            self.format_verbose_content(f)?;
        }

        Ok(())
    }

    fn format_verbose_content(&self, f: &mut fmt::Formatter) -> fmt::Result {
        struct FileWrapper<'a> {
            file: &'a FileHandle,
        }

        impl fmt::Debug for FileWrapper<'_> {
            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                let (start, end) = self.file.time_range();
                write!(
                    f,
                    r#"{{"file_id":"{}","time_range_start":"{}::{}","time_range_end":"{}::{}","rows":{},"size":{},"index_size":{}}}"#,
                    self.file.file_id(),
                    start.value(),
                    start.unit(),
                    end.value(),
                    end.unit(),
                    self.file.num_rows(),
                    self.file.size(),
                    self.file.index_size()
                )
            }
        }

        struct InputWrapper<'a> {
            input: &'a ScanInput,
        }

        #[cfg(feature = "enterprise")]
        impl InputWrapper<'_> {
            fn format_extension_ranges(&self, f: &mut fmt::Formatter) -> fmt::Result {
                if self.input.extension_ranges.is_empty() {
                    return Ok(());
                }

                let mut delimiter = "";
                write!(f, ", extension_ranges: [")?;
                for range in self.input.extension_ranges() {
                    write!(f, "{}{:?}", delimiter, range)?;
                    delimiter = ", ";
                }
                write!(f, "]")?;
                Ok(())
            }
        }

        impl fmt::Debug for InputWrapper<'_> {
            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                let output_schema = self.input.mapper.output_schema();
                if !output_schema.is_empty() {
                    let names: Vec<_> = output_schema
                        .column_schemas()
                        .iter()
                        .map(|col| &col.name)
                        .collect();
                    write!(f, ", \"projection\": {:?}", names)?;
                }
                if let Some(predicate) = &self.input.predicate.predicate()
                    && !predicate.exprs().is_empty()
                {
                    let exprs: Vec<_> = predicate.exprs().iter().map(|e| e.to_string()).collect();
                    write!(f, ", \"filters\": {:?}", exprs)?;
                }
                if !self.input.files.is_empty() {
                    write!(f, ", \"files\": ")?;
                    f.debug_list()
                        .entries(self.input.files.iter().map(|file| FileWrapper { file }))
                        .finish()?;
                }

                #[cfg(feature = "enterprise")]
                self.format_extension_ranges(f)?;

                Ok(())
            }
        }

        write!(f, "{:?}", InputWrapper { input: &self.input })
    }
}

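/// Predicates grouped for pruning: simple filters on the time index, the full
/// predicate (including the region partition expression when present), and the
/// predicate without that partition expression.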
#[derive(Clone, Default)]
pub struct PredicateGroup {
    time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
    predicate_all: Option<Predicate>,
    predicate_without_region: Option<Predicate>,
    region_partition_expr: Option<PartitionExpr>,
}

impl PredicateGroup {
    pub fn new(metadata: &RegionMetadata, exprs: &[Expr]) -> Result<Self> {
        let mut combined_exprs = exprs.to_vec();
        let mut region_partition_expr = None;

        if let Some(expr_json) = metadata.partition_expr.as_ref()
            && !expr_json.is_empty()
            && let Some(expr) = PartitionExpr::from_json_str(expr_json)
                .context(InvalidPartitionExprSnafu { expr: expr_json })?
        {
            let logical_expr = expr
                .try_as_logical_expr()
                .context(InvalidPartitionExprSnafu {
                    expr: expr_json.clone(),
                })?;

            combined_exprs.push(logical_expr);
            region_partition_expr = Some(expr);
        }

        let mut time_filters = Vec::with_capacity(combined_exprs.len());
        let mut columns = HashSet::new();
        for expr in &combined_exprs {
            columns.clear();
            let Some(filter) = Self::expr_to_filter(expr, metadata, &mut columns) else {
                continue;
            };
            time_filters.push(filter);
        }
        let time_filters = if time_filters.is_empty() {
            None
        } else {
            Some(Arc::new(time_filters))
        };

        let predicate_all = if combined_exprs.is_empty() {
            None
        } else {
            Some(Predicate::new(combined_exprs))
        };
        let predicate_without_region = if exprs.is_empty() {
            None
        } else {
            Some(Predicate::new(exprs.to_vec()))
        };

        Ok(Self {
            time_filters,
            predicate_all,
            predicate_without_region,
            region_partition_expr,
        })
    }

    pub(crate) fn time_filters(&self) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
        self.time_filters.clone()
    }

    pub(crate) fn predicate(&self) -> Option<&Predicate> {
        self.predicate_all.as_ref()
    }

    pub(crate) fn predicate_without_region(&self) -> Option<&Predicate> {
        self.predicate_without_region.as_ref()
    }

    pub(crate) fn region_partition_expr(&self) -> Option<&PartitionExpr> {
        self.region_partition_expr.as_ref()
    }

    fn expr_to_filter(
        expr: &Expr,
        metadata: &RegionMetadata,
        columns: &mut HashSet<Column>,
    ) -> Option<SimpleFilterEvaluator> {
        columns.clear();
        expr_to_columns(expr, columns).ok()?;
        if columns.len() > 1 {
            return None;
        }
        let column = columns.iter().next()?;
        let column_meta = metadata.column_by_name(&column.name)?;
        if column_meta.semantic_type == SemanticType::Timestamp {
            SimpleFilterEvaluator::try_new(expr)
        } else {
            None
        }
    }
}