1use std::collections::HashSet;
18use std::fmt;
19use std::num::NonZeroU64;
20use std::sync::Arc;
21use std::time::Instant;
22
23use api::v1::SemanticType;
24use common_error::ext::BoxedError;
25use common_recordbatch::SendableRecordBatchStream;
26use common_recordbatch::filter::SimpleFilterEvaluator;
27use common_telemetry::{debug, error, tracing, warn};
28use common_time::range::TimestampRange;
29use datafusion_common::Column;
30use datafusion_expr::Expr;
31use datafusion_expr::utils::expr_to_columns;
32use futures::StreamExt;
33use partition::expr::PartitionExpr;
34use smallvec::SmallVec;
35use snafu::ResultExt;
36use store_api::metadata::{RegionMetadata, RegionMetadataRef};
37use store_api::region_engine::{PartitionRange, RegionScannerRef};
38use store_api::storage::{RegionId, ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector};
39use table::predicate::{Predicate, build_time_range_predicate};
40use tokio::sync::{Semaphore, mpsc};
41use tokio_stream::wrappers::ReceiverStream;
42
43use crate::access_layer::AccessLayerRef;
44use crate::cache::CacheStrategy;
45use crate::config::{DEFAULT_MAX_CONCURRENT_SCAN_FILES, DEFAULT_SCAN_CHANNEL_SIZE};
46use crate::error::{InvalidPartitionExprSnafu, Result};
47#[cfg(feature = "enterprise")]
48use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
49use crate::memtable::MemtableRange;
50use crate::metrics::READ_SST_COUNT;
51use crate::read::compat::{self, CompatBatch, FlatCompatBatch, PrimaryKeyCompatBatch};
52use crate::read::projection::ProjectionMapper;
53use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
54use crate::read::seq_scan::SeqScan;
55use crate::read::series_scan::SeriesScan;
56use crate::read::stream::ScanBatchStream;
57use crate::read::unordered_scan::UnorderedScan;
58use crate::read::{Batch, BoxedRecordBatchStream, RecordBatch, Source};
59use crate::region::options::MergeMode;
60use crate::region::version::VersionRef;
61use crate::sst::file::FileHandle;
62use crate::sst::index::bloom_filter::applier::{
63 BloomFilterIndexApplierBuilder, BloomFilterIndexApplierRef,
64};
65use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
66use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
67use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
68use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
69use crate::sst::parquet::reader::ReaderMetrics;
70
/// Parallel scan channel size used when the flat format is enabled.
///
/// `ScanRegion::scan_input` overrides the configured channel size with this value for flat scans.
const FLAT_SCAN_CHANNEL_SIZE: usize = 2;
73
/// A scanner over a region, wrapping one of the concrete scan implementations.
pub(crate) enum Scanner {
    /// Scan backed by a [`SeqScan`].
    Seq(SeqScan),
    /// Scan backed by an [`UnorderedScan`].
    Unordered(UnorderedScan),
    /// Scan backed by a [`SeriesScan`].
    Series(SeriesScan),
}
83
84impl Scanner {
85 #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
87 pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream, BoxedError> {
88 match self {
89 Scanner::Seq(seq_scan) => seq_scan.build_stream(),
90 Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
91 Scanner::Series(series_scan) => series_scan.build_stream().await,
92 }
93 }
94
95 pub(crate) fn scan_batch(&self) -> Result<ScanBatchStream> {
97 match self {
98 Scanner::Seq(x) => x.scan_all_partitions(),
99 Scanner::Unordered(x) => x.scan_all_partitions(),
100 Scanner::Series(x) => x.scan_all_partitions(),
101 }
102 }
103}
104
#[cfg(test)]
impl Scanner {
    /// Returns the number of files in the input of the wrapped scanner.
    pub(crate) fn num_files(&self) -> usize {
        match self {
            Self::Seq(scanner) => scanner.input().num_files(),
            Self::Unordered(scanner) => scanner.input().num_files(),
            Self::Series(scanner) => scanner.input().num_files(),
        }
    }

    /// Returns the number of memtables in the input of the wrapped scanner.
    pub(crate) fn num_memtables(&self) -> usize {
        match self {
            Self::Seq(scanner) => scanner.input().num_memtables(),
            Self::Unordered(scanner) => scanner.input().num_memtables(),
            Self::Series(scanner) => scanner.input().num_memtables(),
        }
    }

    /// Returns the ids of the files in the input of the wrapped scanner.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        match self {
            Self::Seq(scanner) => scanner.input().file_ids(),
            Self::Unordered(scanner) => scanner.input().file_ids(),
            Self::Series(scanner) => scanner.input().file_ids(),
        }
    }

    /// Prepares the wrapped scanner with the given number of target partitions.
    ///
    /// # Panics
    ///
    /// Panics if the scanner fails to prepare.
    pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
        use store_api::region_engine::{PrepareRequest, RegionScanner};

        let request = PrepareRequest::default().with_target_partitions(target_partitions);
        let prepared = match self {
            Self::Seq(scanner) => scanner.prepare(request),
            Self::Unordered(scanner) => scanner.prepare(request),
            Self::Series(scanner) => scanner.prepare(request),
        };
        prepared.unwrap()
    }
}
146
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Helper that turns a [`ScanRequest`] on a region version into a scanner.
///
/// Collects the SST files and memtable ranges to read, builds the index appliers and
/// produces a [`ScanInput`] consumed by [`SeqScan`], [`UnorderedScan`] or [`SeriesScan`].
pub(crate) struct ScanRegion {
    /// Version of the region to scan (metadata, options, SSTs and memtables).
    version: VersionRef,
    /// Access layer handed to the [`ScanInput`] and to the index applier builders.
    access_layer: AccessLayerRef,
    /// The scan request.
    request: ScanRequest,
    /// Cache strategy forwarded to the [`ScanInput`] and the index applier builders.
    cache_strategy: CacheStrategy,
    /// Channel size for parallel scan tasks; replaced by `FLAT_SCAN_CHANNEL_SIZE` for flat scans.
    parallel_scan_channel_size: usize,
    /// Forwarded to the [`ScanInput`]; presumably caps the number of files scanned concurrently.
    max_concurrent_scan_files: usize,
    /// Skips building the inverted index applier when true.
    ignore_inverted_index: bool,
    /// Skips building the fulltext index applier when true.
    ignore_fulltext_index: bool,
    /// Skips building the bloom filter index applier when true.
    ignore_bloom_filter: bool,
    /// Start time of the scan, forwarded to the [`ScanInput`] as the query start.
    start_time: Option<Instant>,
    /// Forwarded to the [`ScanInput`]; defaults to true.
    filter_deleted: bool,
    /// Whether to scan using the flat format.
    flat_format: bool,
    /// Optional provider that is asked for extension ranges when building the input.
    #[cfg(feature = "enterprise")]
    extension_range_provider: Option<BoxedExtensionRangeProvider>,
}
226
227impl ScanRegion {
228 pub(crate) fn new(
230 version: VersionRef,
231 access_layer: AccessLayerRef,
232 request: ScanRequest,
233 cache_strategy: CacheStrategy,
234 ) -> ScanRegion {
235 ScanRegion {
236 version,
237 access_layer,
238 request,
239 cache_strategy,
240 parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
241 max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
242 ignore_inverted_index: false,
243 ignore_fulltext_index: false,
244 ignore_bloom_filter: false,
245 start_time: None,
246 filter_deleted: true,
247 flat_format: false,
248 #[cfg(feature = "enterprise")]
249 extension_range_provider: None,
250 }
251 }
252
253 #[must_use]
255 pub(crate) fn with_parallel_scan_channel_size(
256 mut self,
257 parallel_scan_channel_size: usize,
258 ) -> Self {
259 self.parallel_scan_channel_size = parallel_scan_channel_size;
260 self
261 }
262
263 #[must_use]
265 pub(crate) fn with_max_concurrent_scan_files(
266 mut self,
267 max_concurrent_scan_files: usize,
268 ) -> Self {
269 self.max_concurrent_scan_files = max_concurrent_scan_files;
270 self
271 }
272
273 #[must_use]
275 pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self {
276 self.ignore_inverted_index = ignore;
277 self
278 }
279
280 #[must_use]
282 pub(crate) fn with_ignore_fulltext_index(mut self, ignore: bool) -> Self {
283 self.ignore_fulltext_index = ignore;
284 self
285 }
286
287 #[must_use]
289 pub(crate) fn with_ignore_bloom_filter(mut self, ignore: bool) -> Self {
290 self.ignore_bloom_filter = ignore;
291 self
292 }
293
294 #[must_use]
295 pub(crate) fn with_start_time(mut self, now: Instant) -> Self {
296 self.start_time = Some(now);
297 self
298 }
299
/// Sets the `filter_deleted` flag that is forwarded to the scan input.
pub(crate) fn set_filter_deleted(&mut self, filter_deleted: bool) {
    self.filter_deleted = filter_deleted;
}
303
304 #[must_use]
306 pub(crate) fn with_flat_format(mut self, flat_format: bool) -> Self {
307 self.flat_format = flat_format;
308 self
309 }
310
/// Sets the provider that is queried for extension ranges when the scan input is built.
#[cfg(feature = "enterprise")]
pub(crate) fn set_extension_range_provider(
    &mut self,
    extension_range_provider: BoxedExtensionRangeProvider,
) {
    self.extension_range_provider = Some(extension_range_provider);
}
318
319 pub(crate) async fn scanner(self) -> Result<Scanner> {
321 if self.use_series_scan() {
322 self.series_scan().await.map(Scanner::Series)
323 } else if self.use_unordered_scan() {
324 self.unordered_scan().await.map(Scanner::Unordered)
327 } else {
328 self.seq_scan().await.map(Scanner::Seq)
329 }
330 }
331
332 #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
334 pub(crate) async fn region_scanner(self) -> Result<RegionScannerRef> {
335 if self.use_series_scan() {
336 self.series_scan()
337 .await
338 .map(|scanner| Box::new(scanner) as _)
339 } else if self.use_unordered_scan() {
340 self.unordered_scan()
341 .await
342 .map(|scanner| Box::new(scanner) as _)
343 } else {
344 self.seq_scan().await.map(|scanner| Box::new(scanner) as _)
345 }
346 }
347
348 pub(crate) async fn seq_scan(self) -> Result<SeqScan> {
350 let input = self.scan_input().await?.with_compaction(false);
351 Ok(SeqScan::new(input))
352 }
353
354 pub(crate) async fn unordered_scan(self) -> Result<UnorderedScan> {
356 let input = self.scan_input().await?;
357 Ok(UnorderedScan::new(input))
358 }
359
360 pub(crate) async fn series_scan(self) -> Result<SeriesScan> {
362 let input = self.scan_input().await?;
363 Ok(SeriesScan::new(input))
364 }
365
366 fn use_unordered_scan(&self) -> bool {
368 self.version.options.append_mode
375 && self.request.series_row_selector.is_none()
376 && (self.request.distribution.is_none()
377 || self.request.distribution == Some(TimeSeriesDistribution::TimeWindowed))
378 }
379
380 fn use_series_scan(&self) -> bool {
382 self.request.distribution == Some(TimeSeriesDistribution::PerSeries)
383 }
384
/// Builds the [`ScanInput`] for the request.
///
/// Selects the SST files and memtable ranges that may contain matching rows, builds the
/// index appliers and the predicate, and forwards the scan settings to the input.
///
/// # Errors
///
/// Returns an error if the predicate or projection mapper cannot be built, if listing
/// memtable ranges fails or (enterprise only) if the extension range provider fails.
async fn scan_input(mut self) -> Result<ScanInput> {
    // A min sequence of zero is treated as absent.
    let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new);
    let time_range = self.build_time_range_predicate();
    // Predicate built from the original filters; it is used for the memtable ranges below.
    let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;

    // Without a projection, the mapper covers all columns.
    let mapper = match &self.request.projection {
        Some(p) => {
            ProjectionMapper::new(&self.version.metadata, p.iter().copied(), self.flat_format)?
        }
        None => ProjectionMapper::all(&self.version.metadata, self.flat_format)?,
    };

    let ssts = &self.version.ssts;
    let mut files = Vec::new();
    for level in ssts.levels() {
        for file in level.files.values() {
            let exceed_min_sequence = match (sst_min_sequence, file.meta_ref().sequence) {
                (Some(min_sequence), Some(file_sequence)) => file_sequence > min_sequence,
                // A file without a recorded sequence is always kept.
                (Some(_), None) => true,
                // No min sequence requested: keep every file.
                (None, _) => true,
            };

            // Keeps files that pass the sequence check and overlap the time range.
            if exceed_min_sequence && file_in_range(file, &time_range) {
                files.push(file.clone());
            }
        }
    }

    let memtables = self.version.memtables.list_memtables();
    let mut mem_range_builders = Vec::new();

    for m in memtables {
        // Skips memtables whose stats report no time range.
        let Some((start, end)) = m.stats().time_range() else {
            continue;
        };
        let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
        // Skips memtables that do not overlap the requested time range.
        if !memtable_range.intersects(&time_range) {
            continue;
        }
        let ranges_in_memtable = m.ranges(
            Some(mapper.column_ids()),
            predicate.clone(),
            self.request.sequence,
            false,
        )?;
        mem_range_builders.extend(ranges_in_memtable.ranges.into_values().map(|v| {
            // Each builder holds a single range, so the cloned memtable stats are adjusted
            // to describe only that range.
            let mut stats = ranges_in_memtable.stats.clone();
            stats.num_ranges = 1;
            stats.num_rows = v.num_rows();
            MemRangeBuilder::new(v, stats)
        }));
    }

    let region_id = self.region_id();
    debug!(
        "Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}",
        region_id,
        self.request,
        time_range,
        mem_range_builders.len(),
        files.len(),
        self.version.options.append_mode,
    );

    // Field filters are removed after the request is logged and before the index appliers
    // and the final predicate are built, so those only see the remaining filters.
    // NOTE(review): the memtable ranges above were built with the predicate created from
    // the unmodified filters — confirm this is intended.
    self.maybe_remove_field_filters();

    let inverted_index_applier = self.build_invereted_index_applier();
    let bloom_filter_applier = self.build_bloom_filter_applier();
    let fulltext_index_applier = self.build_fulltext_index_applier();
    let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;

    if self.flat_format {
        // Flat scans use a dedicated channel size instead of the configured one.
        self.parallel_scan_channel_size = FLAT_SCAN_CHANNEL_SIZE;
    }

    let input = ScanInput::new(self.access_layer, mapper)
        .with_time_range(Some(time_range))
        .with_predicate(predicate)
        .with_memtables(mem_range_builders)
        .with_files(files)
        .with_cache(self.cache_strategy)
        .with_inverted_index_applier(inverted_index_applier)
        .with_bloom_filter_index_applier(bloom_filter_applier)
        .with_fulltext_index_applier(fulltext_index_applier)
        .with_parallel_scan_channel_size(self.parallel_scan_channel_size)
        .with_max_concurrent_scan_files(self.max_concurrent_scan_files)
        .with_start_time(self.start_time)
        .with_append_mode(self.version.options.append_mode)
        .with_filter_deleted(self.filter_deleted)
        .with_merge_mode(self.version.options.merge_mode())
        .with_series_row_selector(self.request.series_row_selector)
        .with_distribution(self.request.distribution)
        .with_flat_format(self.flat_format);

    // Asks the extension range provider, if any, for additional ranges to scan.
    #[cfg(feature = "enterprise")]
    let input = if let Some(provider) = self.extension_range_provider {
        let ranges = provider
            .find_extension_ranges(self.version.flushed_sequence, time_range, &self.request)
            .await?;
        debug!("Find extension ranges: {ranges:?}");
        input.with_extension_ranges(ranges)
    } else {
        input
    };
    Ok(input)
}
508
/// Returns the id of the region to scan, taken from the version's metadata.
fn region_id(&self) -> RegionId {
    self.version.metadata.region_id
}
512
513 fn build_time_range_predicate(&self) -> TimestampRange {
515 let time_index = self.version.metadata.time_index_column();
516 let unit = time_index
517 .column_schema
518 .data_type
519 .as_timestamp()
520 .expect("Time index must have timestamp-compatible type")
521 .unit();
522 build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters)
523 }
524
525 fn maybe_remove_field_filters(&mut self) {
527 if self.version.options.merge_mode() != MergeMode::LastNonNull {
528 return;
529 }
530
531 let field_columns = self
533 .version
534 .metadata
535 .field_columns()
536 .map(|col| &col.column_schema.name)
537 .collect::<HashSet<_>>();
538 let mut columns = HashSet::new();
540
541 self.request.filters.retain(|expr| {
542 columns.clear();
543 if expr_to_columns(expr, &mut columns).is_err() {
545 return false;
546 }
547 for column in &columns {
548 if field_columns.contains(&column.name) {
549 return false;
551 }
552 }
553 true
554 });
555 }
556
557 fn build_invereted_index_applier(&self) -> Option<InvertedIndexApplierRef> {
559 if self.ignore_inverted_index {
560 return None;
561 }
562
563 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
564 let inverted_index_cache = self.cache_strategy.inverted_index_cache().cloned();
565
566 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
567
568 InvertedIndexApplierBuilder::new(
569 self.access_layer.table_dir().to_string(),
570 self.access_layer.path_type(),
571 self.access_layer.object_store().clone(),
572 self.version.metadata.as_ref(),
573 self.version.metadata.inverted_indexed_column_ids(
574 self.version
575 .options
576 .index_options
577 .inverted_index
578 .ignore_column_ids
579 .iter(),
580 ),
581 self.access_layer.puffin_manager_factory().clone(),
582 )
583 .with_file_cache(file_cache)
584 .with_inverted_index_cache(inverted_index_cache)
585 .with_puffin_metadata_cache(puffin_metadata_cache)
586 .build(&self.request.filters)
587 .inspect_err(|err| warn!(err; "Failed to build invereted index applier"))
588 .ok()
589 .flatten()
590 .map(Arc::new)
591 }
592
593 fn build_bloom_filter_applier(&self) -> Option<BloomFilterIndexApplierRef> {
595 if self.ignore_bloom_filter {
596 return None;
597 }
598
599 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
600 let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
601 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
602
603 BloomFilterIndexApplierBuilder::new(
604 self.access_layer.table_dir().to_string(),
605 self.access_layer.path_type(),
606 self.access_layer.object_store().clone(),
607 self.version.metadata.as_ref(),
608 self.access_layer.puffin_manager_factory().clone(),
609 )
610 .with_file_cache(file_cache)
611 .with_bloom_filter_index_cache(bloom_filter_index_cache)
612 .with_puffin_metadata_cache(puffin_metadata_cache)
613 .build(&self.request.filters)
614 .inspect_err(|err| warn!(err; "Failed to build bloom filter index applier"))
615 .ok()
616 .flatten()
617 .map(Arc::new)
618 }
619
620 fn build_fulltext_index_applier(&self) -> Option<FulltextIndexApplierRef> {
622 if self.ignore_fulltext_index {
623 return None;
624 }
625
626 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
627 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
628 let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
629 FulltextIndexApplierBuilder::new(
630 self.access_layer.table_dir().to_string(),
631 self.access_layer.path_type(),
632 self.access_layer.object_store().clone(),
633 self.access_layer.puffin_manager_factory().clone(),
634 self.version.metadata.as_ref(),
635 )
636 .with_file_cache(file_cache)
637 .with_puffin_metadata_cache(puffin_metadata_cache)
638 .with_bloom_filter_cache(bloom_filter_index_cache)
639 .build(&self.request.filters)
640 .inspect_err(|err| warn!(err; "Failed to build fulltext index applier"))
641 .ok()
642 .flatten()
643 .map(Arc::new)
644 }
645}
646
647fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
649 if predicate == &TimestampRange::min_to_max() {
650 return true;
651 }
652 let (start, end) = file.time_range();
654 let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end));
655 file_ts_range.intersects(predicate)
656}
657
/// Common input shared by the scanners: what to read and how to read it.
pub struct ScanInput {
    /// Access layer used to read SST files.
    access_layer: AccessLayerRef,
    /// Projection mapper; also provides the expected region metadata.
    pub(crate) mapper: Arc<ProjectionMapper>,
    /// Time range of the scan, if any.
    time_range: Option<TimestampRange>,
    /// Predicates applied to the scan.
    pub(crate) predicate: PredicateGroup,
    /// Region partition expression copied from the predicate group; compared against
    /// each file's partition expression to choose the predicate for that file.
    region_partition_expr: Option<PartitionExpr>,
    /// Builders of the memtable ranges to scan.
    pub(crate) memtables: Vec<MemRangeBuilder>,
    /// Handles of the SST files to scan.
    pub(crate) files: Vec<FileHandle>,
    /// Cache strategy passed to SST readers.
    pub(crate) cache_strategy: CacheStrategy,
    /// When true, `prune_file` logs a missing file and returns an empty builder
    /// instead of an error.
    ignore_file_not_found: bool,
    /// Capacity of the channels created for parallel scan tasks.
    pub(crate) parallel_scan_channel_size: usize,
    /// Presumably the maximum number of files scanned concurrently — not enforced in this type.
    pub(crate) max_concurrent_scan_files: usize,
    /// Optional inverted index applier passed to SST readers.
    inverted_index_applier: Option<InvertedIndexApplierRef>,
    /// Optional bloom filter index applier passed to SST readers.
    bloom_filter_index_applier: Option<BloomFilterIndexApplierRef>,
    /// Optional fulltext index applier passed to SST readers.
    fulltext_index_applier: Option<FulltextIndexApplierRef>,
    /// Start time of the query, if known.
    pub(crate) query_start: Option<Instant>,
    /// Whether the region is in append mode.
    pub(crate) append_mode: bool,
    /// Defaults to true; presumably controls whether deleted rows are filtered out.
    pub(crate) filter_deleted: bool,
    /// Merge mode of the region.
    pub(crate) merge_mode: MergeMode,
    /// Optional series row selector from the request.
    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
    /// Optional time series distribution from the request.
    pub(crate) distribution: Option<TimeSeriesDistribution>,
    /// Whether to read using the flat format.
    pub(crate) flat_format: bool,
    /// Compaction flag passed to SST readers and to the flat compat batch.
    pub(crate) compaction: bool,
    /// Extension ranges to scan in addition to memtables and files.
    #[cfg(feature = "enterprise")]
    extension_ranges: Vec<BoxedExtensionRange>,
}
705
706impl ScanInput {
707 #[must_use]
709 pub(crate) fn new(access_layer: AccessLayerRef, mapper: ProjectionMapper) -> ScanInput {
710 ScanInput {
711 access_layer,
712 mapper: Arc::new(mapper),
713 time_range: None,
714 predicate: PredicateGroup::default(),
715 region_partition_expr: None,
716 memtables: Vec::new(),
717 files: Vec::new(),
718 cache_strategy: CacheStrategy::Disabled,
719 ignore_file_not_found: false,
720 parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
721 max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
722 inverted_index_applier: None,
723 bloom_filter_index_applier: None,
724 fulltext_index_applier: None,
725 query_start: None,
726 append_mode: false,
727 filter_deleted: true,
728 merge_mode: MergeMode::default(),
729 series_row_selector: None,
730 distribution: None,
731 flat_format: false,
732 compaction: false,
733 #[cfg(feature = "enterprise")]
734 extension_ranges: Vec::new(),
735 }
736 }
737
738 #[must_use]
740 pub(crate) fn with_time_range(mut self, time_range: Option<TimestampRange>) -> Self {
741 self.time_range = time_range;
742 self
743 }
744
745 #[must_use]
747 pub(crate) fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
748 self.region_partition_expr = predicate.region_partition_expr().cloned();
749 self.predicate = predicate;
750 self
751 }
752
753 #[must_use]
755 pub(crate) fn with_memtables(mut self, memtables: Vec<MemRangeBuilder>) -> Self {
756 self.memtables = memtables;
757 self
758 }
759
760 #[must_use]
762 pub(crate) fn with_files(mut self, files: Vec<FileHandle>) -> Self {
763 self.files = files;
764 self
765 }
766
767 #[must_use]
769 pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self {
770 self.cache_strategy = cache;
771 self
772 }
773
774 #[must_use]
776 pub(crate) fn with_ignore_file_not_found(mut self, ignore: bool) -> Self {
777 self.ignore_file_not_found = ignore;
778 self
779 }
780
781 #[must_use]
783 pub(crate) fn with_parallel_scan_channel_size(
784 mut self,
785 parallel_scan_channel_size: usize,
786 ) -> Self {
787 self.parallel_scan_channel_size = parallel_scan_channel_size;
788 self
789 }
790
791 #[must_use]
793 pub(crate) fn with_max_concurrent_scan_files(
794 mut self,
795 max_concurrent_scan_files: usize,
796 ) -> Self {
797 self.max_concurrent_scan_files = max_concurrent_scan_files;
798 self
799 }
800
801 #[must_use]
803 pub(crate) fn with_inverted_index_applier(
804 mut self,
805 applier: Option<InvertedIndexApplierRef>,
806 ) -> Self {
807 self.inverted_index_applier = applier;
808 self
809 }
810
811 #[must_use]
813 pub(crate) fn with_bloom_filter_index_applier(
814 mut self,
815 applier: Option<BloomFilterIndexApplierRef>,
816 ) -> Self {
817 self.bloom_filter_index_applier = applier;
818 self
819 }
820
821 #[must_use]
823 pub(crate) fn with_fulltext_index_applier(
824 mut self,
825 applier: Option<FulltextIndexApplierRef>,
826 ) -> Self {
827 self.fulltext_index_applier = applier;
828 self
829 }
830
831 #[must_use]
833 pub(crate) fn with_start_time(mut self, now: Option<Instant>) -> Self {
834 self.query_start = now;
835 self
836 }
837
838 #[must_use]
839 pub(crate) fn with_append_mode(mut self, is_append_mode: bool) -> Self {
840 self.append_mode = is_append_mode;
841 self
842 }
843
844 #[must_use]
846 pub(crate) fn with_filter_deleted(mut self, filter_deleted: bool) -> Self {
847 self.filter_deleted = filter_deleted;
848 self
849 }
850
851 #[must_use]
853 pub(crate) fn with_merge_mode(mut self, merge_mode: MergeMode) -> Self {
854 self.merge_mode = merge_mode;
855 self
856 }
857
858 #[must_use]
860 pub(crate) fn with_distribution(
861 mut self,
862 distribution: Option<TimeSeriesDistribution>,
863 ) -> Self {
864 self.distribution = distribution;
865 self
866 }
867
868 #[must_use]
870 pub(crate) fn with_series_row_selector(
871 mut self,
872 series_row_selector: Option<TimeSeriesRowSelector>,
873 ) -> Self {
874 self.series_row_selector = series_row_selector;
875 self
876 }
877
878 #[must_use]
880 pub(crate) fn with_flat_format(mut self, flat_format: bool) -> Self {
881 self.flat_format = flat_format;
882 self
883 }
884
885 #[must_use]
887 pub(crate) fn with_compaction(mut self, compaction: bool) -> Self {
888 self.compaction = compaction;
889 self
890 }
891
892 pub(crate) fn create_parallel_sources(
896 &self,
897 sources: Vec<Source>,
898 semaphore: Arc<Semaphore>,
899 ) -> Result<Vec<Source>> {
900 if sources.len() <= 1 {
901 return Ok(sources);
902 }
903
904 let sources = sources
906 .into_iter()
907 .map(|source| {
908 let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
909 self.spawn_scan_task(source, semaphore.clone(), sender);
910 let stream = Box::pin(ReceiverStream::new(receiver));
911 Source::Stream(stream)
912 })
913 .collect();
914 Ok(sources)
915 }
916
917 pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> {
919 let memtable = &self.memtables[index.index];
920 let mut ranges = SmallVec::new();
921 memtable.build_ranges(index.row_group_index, &mut ranges);
922 ranges
923 }
924
925 fn predicate_for_file(&self, file: &FileHandle) -> Option<Predicate> {
926 if self.should_skip_region_partition(file) {
927 self.predicate.predicate_without_region().cloned()
928 } else {
929 self.predicate.predicate().cloned()
930 }
931 }
932
933 fn should_skip_region_partition(&self, file: &FileHandle) -> bool {
934 match (
935 self.region_partition_expr.as_ref(),
936 file.meta_ref().partition_expr.as_ref(),
937 ) {
938 (Some(region_expr), Some(file_expr)) => region_expr == file_expr,
939 _ => false,
940 }
941 }
942
943 pub async fn prune_file(
945 &self,
946 file: &FileHandle,
947 reader_metrics: &mut ReaderMetrics,
948 ) -> Result<FileRangeBuilder> {
949 let predicate = self.predicate_for_file(file);
950 let res = self
951 .access_layer
952 .read_sst(file.clone())
953 .predicate(predicate)
954 .projection(Some(self.mapper.column_ids().to_vec()))
955 .cache(self.cache_strategy.clone())
956 .inverted_index_applier(self.inverted_index_applier.clone())
957 .bloom_filter_index_applier(self.bloom_filter_index_applier.clone())
958 .fulltext_index_applier(self.fulltext_index_applier.clone())
959 .expected_metadata(Some(self.mapper.metadata().clone()))
960 .flat_format(self.flat_format)
961 .compaction(self.compaction)
962 .build_reader_input(reader_metrics)
963 .await;
964 let (mut file_range_ctx, selection) = match res {
965 Ok(x) => x,
966 Err(e) => {
967 if e.is_object_not_found() && self.ignore_file_not_found {
968 error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
969 return Ok(FileRangeBuilder::default());
970 } else {
971 return Err(e);
972 }
973 }
974 };
975
976 let need_compat = !compat::has_same_columns_and_pk_encoding(
977 self.mapper.metadata(),
978 file_range_ctx.read_format().metadata(),
979 );
980 if need_compat {
981 let compat = if let Some(flat_format) = file_range_ctx.read_format().as_flat() {
984 let mapper = self.mapper.as_flat().unwrap();
985 FlatCompatBatch::try_new(
986 mapper,
987 flat_format.metadata(),
988 flat_format.format_projection(),
989 self.compaction,
990 )?
991 .map(CompatBatch::Flat)
992 } else {
993 let compact_batch = PrimaryKeyCompatBatch::new(
994 &self.mapper,
995 file_range_ctx.read_format().metadata().clone(),
996 )?;
997 Some(CompatBatch::PrimaryKey(compact_batch))
998 };
999 file_range_ctx.set_compat_batch(compat);
1000 }
1001 Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), selection))
1002 }
1003
1004 pub(crate) fn spawn_scan_task(
1006 &self,
1007 mut input: Source,
1008 semaphore: Arc<Semaphore>,
1009 sender: mpsc::Sender<Result<Batch>>,
1010 ) {
1011 common_runtime::spawn_global(async move {
1012 loop {
1013 let maybe_batch = {
1016 let _permit = semaphore.acquire().await.unwrap();
1018 input.next_batch().await
1019 };
1020 match maybe_batch {
1021 Ok(Some(batch)) => {
1022 let _ = sender.send(Ok(batch)).await;
1023 }
1024 Ok(None) => break,
1025 Err(e) => {
1026 let _ = sender.send(Err(e)).await;
1027 break;
1028 }
1029 }
1030 }
1031 });
1032 }
1033
1034 pub(crate) fn create_parallel_flat_sources(
1038 &self,
1039 sources: Vec<BoxedRecordBatchStream>,
1040 semaphore: Arc<Semaphore>,
1041 ) -> Result<Vec<BoxedRecordBatchStream>> {
1042 if sources.len() <= 1 {
1043 return Ok(sources);
1044 }
1045
1046 let sources = sources
1048 .into_iter()
1049 .map(|source| {
1050 let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
1051 self.spawn_flat_scan_task(source, semaphore.clone(), sender);
1052 let stream = Box::pin(ReceiverStream::new(receiver));
1053 Box::pin(stream) as _
1054 })
1055 .collect();
1056 Ok(sources)
1057 }
1058
1059 pub(crate) fn spawn_flat_scan_task(
1061 &self,
1062 mut input: BoxedRecordBatchStream,
1063 semaphore: Arc<Semaphore>,
1064 sender: mpsc::Sender<Result<RecordBatch>>,
1065 ) {
1066 common_runtime::spawn_global(async move {
1067 loop {
1068 let maybe_batch = {
1071 let _permit = semaphore.acquire().await.unwrap();
1073 input.next().await
1074 };
1075 match maybe_batch {
1076 Some(Ok(batch)) => {
1077 let _ = sender.send(Ok(batch)).await;
1078 }
1079 Some(Err(e)) => {
1080 let _ = sender.send(Err(e)).await;
1081 break;
1082 }
1083 None => break,
1084 }
1085 }
1086 });
1087 }
1088
1089 pub(crate) fn total_rows(&self) -> usize {
1090 let rows_in_files: usize = self.files.iter().map(|f| f.num_rows()).sum();
1091 let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();
1092
1093 let rows = rows_in_files + rows_in_memtables;
1094 #[cfg(feature = "enterprise")]
1095 let rows = rows
1096 + self
1097 .extension_ranges
1098 .iter()
1099 .map(|x| x.num_rows())
1100 .sum::<u64>() as usize;
1101 rows
1102 }
1103
/// Returns the predicate of the predicate group, if any.
pub(crate) fn predicate(&self) -> Option<&Predicate> {
    self.predicate.predicate()
}
1108
/// Returns the number of memtable range builders to scan.
pub(crate) fn num_memtables(&self) -> usize {
    self.memtables.len()
}
1113
/// Returns the number of SST files to scan.
pub(crate) fn num_files(&self) -> usize {
    self.files.len()
}
1118
/// Returns the region metadata held by the projection mapper.
pub fn region_metadata(&self) -> &RegionMetadataRef {
    self.mapper.metadata()
}
1122}
1123
#[cfg(feature = "enterprise")]
impl ScanInput {
    /// Sets the extension ranges to scan.
    #[must_use]
    pub(crate) fn with_extension_ranges(
        mut self,
        extension_ranges: Vec<BoxedExtensionRange>,
    ) -> Self {
        self.extension_ranges = extension_ranges;
        self
    }

    /// Returns all extension ranges.
    #[cfg(feature = "enterprise")]
    pub(crate) fn extension_ranges(&self) -> &[BoxedExtensionRange] {
        self.extension_ranges.as_slice()
    }

    /// Returns the extension range addressed by the range index `i`.
    ///
    /// Extension ranges are indexed after all memtables and files, so the number of
    /// both is subtracted from `i`.
    ///
    /// # Panics
    ///
    /// Panics if `i` does not address an extension range.
    #[cfg(feature = "enterprise")]
    pub(crate) fn extension_range(&self, i: usize) -> &BoxedExtensionRange {
        let preceding = self.num_memtables() + self.num_files();
        &self.extension_ranges[i - preceding]
    }
}
1145
#[cfg(test)]
impl ScanInput {
    /// Returns the ids of the SST files to scan, in the order they are stored.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        let mut ids = Vec::with_capacity(self.files.len());
        for handle in &self.files {
            ids.push(handle.file_id());
        }
        ids
    }
}
1153
/// Context shared by the streams of a scanner.
pub struct StreamContext {
    /// Input of the scan.
    pub input: ScanInput,
    /// Metadata of the ranges to scan, computed from the input.
    pub(crate) ranges: Vec<RangeMeta>,

    /// Start time of the query; the time the context was created when the input has none.
    pub(crate) query_start: Instant,
}
1166
1167impl StreamContext {
1168 pub(crate) fn seq_scan_ctx(input: ScanInput) -> Self {
1170 let query_start = input.query_start.unwrap_or_else(Instant::now);
1171 let ranges = RangeMeta::seq_scan_ranges(&input);
1172 READ_SST_COUNT.observe(input.num_files() as f64);
1173
1174 Self {
1175 input,
1176 ranges,
1177 query_start,
1178 }
1179 }
1180
1181 pub(crate) fn unordered_scan_ctx(input: ScanInput) -> Self {
1183 let query_start = input.query_start.unwrap_or_else(Instant::now);
1184 let ranges = RangeMeta::unordered_scan_ranges(&input);
1185 READ_SST_COUNT.observe(input.num_files() as f64);
1186
1187 Self {
1188 input,
1189 ranges,
1190 query_start,
1191 }
1192 }
1193
1194 pub(crate) fn is_mem_range_index(&self, index: RowGroupIndex) -> bool {
1196 self.input.num_memtables() > index.index
1197 }
1198
1199 pub(crate) fn is_file_range_index(&self, index: RowGroupIndex) -> bool {
1200 !self.is_mem_range_index(index)
1201 && index.index < self.input.num_files() + self.input.num_memtables()
1202 }
1203
1204 pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
1206 self.ranges
1207 .iter()
1208 .enumerate()
1209 .map(|(idx, range_meta)| range_meta.new_partition_range(idx))
1210 .collect()
1211 }
1212
1213 pub(crate) fn format_for_explain(&self, verbose: bool, f: &mut fmt::Formatter) -> fmt::Result {
1215 let (mut num_mem_ranges, mut num_file_ranges, mut num_other_ranges) = (0, 0, 0);
1216 for range_meta in &self.ranges {
1217 for idx in &range_meta.row_group_indices {
1218 if self.is_mem_range_index(*idx) {
1219 num_mem_ranges += 1;
1220 } else if self.is_file_range_index(*idx) {
1221 num_file_ranges += 1;
1222 } else {
1223 num_other_ranges += 1;
1224 }
1225 }
1226 }
1227 if verbose {
1228 write!(f, "{{")?;
1229 }
1230 write!(
1231 f,
1232 r#""partition_count":{{"count":{}, "mem_ranges":{}, "files":{}, "file_ranges":{}"#,
1233 self.ranges.len(),
1234 num_mem_ranges,
1235 self.input.num_files(),
1236 num_file_ranges,
1237 )?;
1238 if num_other_ranges > 0 {
1239 write!(f, r#", "other_ranges":{}"#, num_other_ranges)?;
1240 }
1241 write!(f, "}}")?;
1242
1243 if let Some(selector) = &self.input.series_row_selector {
1244 write!(f, ", \"selector\":\"{}\"", selector)?;
1245 }
1246 if let Some(distribution) = &self.input.distribution {
1247 write!(f, ", \"distribution\":\"{}\"", distribution)?;
1248 }
1249
1250 if verbose {
1251 self.format_verbose_content(f)?;
1252 }
1253
1254 Ok(())
1255 }
1256
1257 fn format_verbose_content(&self, f: &mut fmt::Formatter) -> fmt::Result {
1258 struct FileWrapper<'a> {
1259 file: &'a FileHandle,
1260 }
1261
1262 impl fmt::Debug for FileWrapper<'_> {
1263 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1264 let (start, end) = self.file.time_range();
1265 write!(
1266 f,
1267 r#"{{"file_id":"{}","time_range_start":"{}::{}","time_range_end":"{}::{}","rows":{},"size":{},"index_size":{}}}"#,
1268 self.file.file_id(),
1269 start.value(),
1270 start.unit(),
1271 end.value(),
1272 end.unit(),
1273 self.file.num_rows(),
1274 self.file.size(),
1275 self.file.index_size()
1276 )
1277 }
1278 }
1279
1280 struct InputWrapper<'a> {
1281 input: &'a ScanInput,
1282 }
1283
1284 #[cfg(feature = "enterprise")]
1285 impl InputWrapper<'_> {
1286 fn format_extension_ranges(&self, f: &mut fmt::Formatter) -> fmt::Result {
1287 if self.input.extension_ranges.is_empty() {
1288 return Ok(());
1289 }
1290
1291 let mut delimiter = "";
1292 write!(f, ", extension_ranges: [")?;
1293 for range in self.input.extension_ranges() {
1294 write!(f, "{}{:?}", delimiter, range)?;
1295 delimiter = ", ";
1296 }
1297 write!(f, "]")?;
1298 Ok(())
1299 }
1300 }
1301
1302 impl fmt::Debug for InputWrapper<'_> {
1303 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1304 let output_schema = self.input.mapper.output_schema();
1305 if !output_schema.is_empty() {
1306 let names: Vec<_> = output_schema
1307 .column_schemas()
1308 .iter()
1309 .map(|col| &col.name)
1310 .collect();
1311 write!(f, ", \"projection\": {:?}", names)?;
1312 }
1313 if let Some(predicate) = &self.input.predicate.predicate()
1314 && !predicate.exprs().is_empty()
1315 {
1316 let exprs: Vec<_> = predicate.exprs().iter().map(|e| e.to_string()).collect();
1317 write!(f, ", \"filters\": {:?}", exprs)?;
1318 }
1319 if !self.input.files.is_empty() {
1320 write!(f, ", \"files\": ")?;
1321 f.debug_list()
1322 .entries(self.input.files.iter().map(|file| FileWrapper { file }))
1323 .finish()?;
1324 }
1325
1326 #[cfg(feature = "enterprise")]
1327 self.format_extension_ranges(f)?;
1328
1329 Ok(())
1330 }
1331 }
1332
1333 write!(f, "{:?}", InputWrapper { input: &self.input })
1334 }
1335}
1336
1337#[derive(Clone, Default)]
1340pub struct PredicateGroup {
1341 time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
1342 predicate_all: Option<Predicate>,
1344 predicate_without_region: Option<Predicate>,
1346 region_partition_expr: Option<PartitionExpr>,
1348}
1349
1350impl PredicateGroup {
1351 pub fn new(metadata: &RegionMetadata, exprs: &[Expr]) -> Result<Self> {
1353 let mut combined_exprs = exprs.to_vec();
1354 let mut region_partition_expr = None;
1355
1356 if let Some(expr_json) = metadata.partition_expr.as_ref()
1357 && !expr_json.is_empty()
1358 && let Some(expr) = PartitionExpr::from_json_str(expr_json)
1359 .context(InvalidPartitionExprSnafu { expr: expr_json })?
1360 {
1361 let logical_expr = expr
1362 .try_as_logical_expr()
1363 .context(InvalidPartitionExprSnafu {
1364 expr: expr_json.clone(),
1365 })?;
1366
1367 combined_exprs.push(logical_expr);
1368 region_partition_expr = Some(expr);
1369 }
1370
1371 let mut time_filters = Vec::with_capacity(combined_exprs.len());
1372 let mut columns = HashSet::new();
1374 for expr in &combined_exprs {
1375 columns.clear();
1376 let Some(filter) = Self::expr_to_filter(expr, metadata, &mut columns) else {
1377 continue;
1378 };
1379 time_filters.push(filter);
1380 }
1381 let time_filters = if time_filters.is_empty() {
1382 None
1383 } else {
1384 Some(Arc::new(time_filters))
1385 };
1386
1387 let predicate_all = if combined_exprs.is_empty() {
1388 None
1389 } else {
1390 Some(Predicate::new(combined_exprs))
1391 };
1392 let predicate_without_region = if exprs.is_empty() {
1393 None
1394 } else {
1395 Some(Predicate::new(exprs.to_vec()))
1396 };
1397
1398 Ok(Self {
1399 time_filters,
1400 predicate_all,
1401 predicate_without_region,
1402 region_partition_expr,
1403 })
1404 }
1405
1406 pub(crate) fn time_filters(&self) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
1408 self.time_filters.clone()
1409 }
1410
1411 pub(crate) fn predicate(&self) -> Option<&Predicate> {
1413 self.predicate_all.as_ref()
1414 }
1415
1416 pub(crate) fn predicate_without_region(&self) -> Option<&Predicate> {
1418 self.predicate_without_region.as_ref()
1419 }
1420
1421 pub(crate) fn region_partition_expr(&self) -> Option<&PartitionExpr> {
1423 self.region_partition_expr.as_ref()
1424 }
1425
1426 fn expr_to_filter(
1427 expr: &Expr,
1428 metadata: &RegionMetadata,
1429 columns: &mut HashSet<Column>,
1430 ) -> Option<SimpleFilterEvaluator> {
1431 columns.clear();
1432 expr_to_columns(expr, columns).ok()?;
1435 if columns.len() > 1 {
1436 return None;
1438 }
1439 let column = columns.iter().next()?;
1440 let column_meta = metadata.column_by_name(&column.name)?;
1441 if column_meta.semantic_type == SemanticType::Timestamp {
1442 SimpleFilterEvaluator::try_new(expr)
1443 } else {
1444 None
1445 }
1446 }
1447}