1use std::collections::HashSet;
18use std::fmt;
19use std::num::NonZeroU64;
20use std::sync::Arc;
21use std::time::Instant;
22
23use api::v1::SemanticType;
24use common_error::ext::BoxedError;
25use common_recordbatch::filter::SimpleFilterEvaluator;
26use common_recordbatch::SendableRecordBatchStream;
27use common_telemetry::{debug, error, tracing, warn};
28use common_time::range::TimestampRange;
29use datafusion_common::Column;
30use datafusion_expr::utils::expr_to_columns;
31use datafusion_expr::Expr;
32use smallvec::SmallVec;
33use store_api::metadata::{RegionMetadata, RegionMetadataRef};
34use store_api::region_engine::{PartitionRange, RegionScannerRef};
35use store_api::storage::{RegionId, ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector};
36use table::predicate::{build_time_range_predicate, Predicate};
37use tokio::sync::{mpsc, Semaphore};
38use tokio_stream::wrappers::ReceiverStream;
39
40use crate::access_layer::AccessLayerRef;
41use crate::cache::CacheStrategy;
42use crate::config::DEFAULT_SCAN_CHANNEL_SIZE;
43use crate::error::Result;
44#[cfg(feature = "enterprise")]
45use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
46use crate::memtable::MemtableRange;
47use crate::metrics::READ_SST_COUNT;
48use crate::read::compat::{self, CompatBatch};
49use crate::read::projection::ProjectionMapper;
50use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
51use crate::read::seq_scan::SeqScan;
52use crate::read::series_scan::SeriesScan;
53use crate::read::stream::ScanBatchStream;
54use crate::read::unordered_scan::UnorderedScan;
55use crate::read::{Batch, Source};
56use crate::region::options::MergeMode;
57use crate::region::version::VersionRef;
58use crate::sst::file::FileHandle;
59use crate::sst::index::bloom_filter::applier::{
60 BloomFilterIndexApplierBuilder, BloomFilterIndexApplierRef,
61};
62use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
63use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
64use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
65use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
66use crate::sst::parquet::reader::ReaderMetrics;
67
/// A region scanner. `ScanRegion::scanner` picks the variant from the scan request
/// and the region options.
pub(crate) enum Scanner {
    /// Scanner backed by [SeqScan].
    Seq(SeqScan),
    /// Scanner backed by [UnorderedScan].
    Unordered(UnorderedScan),
    /// Scanner backed by [SeriesScan].
    Series(SeriesScan),
}
77
78impl Scanner {
79 #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
81 pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream, BoxedError> {
82 match self {
83 Scanner::Seq(seq_scan) => seq_scan.build_stream(),
84 Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
85 Scanner::Series(series_scan) => series_scan.build_stream().await,
86 }
87 }
88
89 pub(crate) fn scan_batch(&self) -> Result<ScanBatchStream> {
91 match self {
92 Scanner::Seq(x) => x.scan_all_partitions(),
93 Scanner::Unordered(x) => x.scan_all_partitions(),
94 Scanner::Series(x) => x.scan_all_partitions(),
95 }
96 }
97}
98
#[cfg(test)]
impl Scanner {
    /// Returns the [ScanInput] of the wrapped scanner, whichever variant it is.
    fn test_scan_input(&self) -> &ScanInput {
        match self {
            Self::Seq(scan) => scan.input(),
            Self::Unordered(scan) => scan.input(),
            Self::Series(scan) => scan.input(),
        }
    }

    /// Number of SST files the scanner reads.
    pub(crate) fn num_files(&self) -> usize {
        self.test_scan_input().num_files()
    }

    /// Number of memtables the scanner reads.
    pub(crate) fn num_memtables(&self) -> usize {
        self.test_scan_input().num_memtables()
    }

    /// Ids of the SST files the scanner reads.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::FileId> {
        self.test_scan_input().file_ids()
    }

    /// Prepares the scanner with `target_partitions`; panics if preparation fails.
    pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
        use store_api::region_engine::{PrepareRequest, RegionScanner};

        let request = PrepareRequest::default().with_target_partitions(target_partitions);
        let prepared = match self {
            Self::Seq(scan) => scan.prepare(request),
            Self::Unordered(scan) => scan.prepare(request),
            Self::Series(scan) => scan.prepare(request),
        };
        prepared.unwrap();
    }
}
140
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Plans a scan over one region.
///
/// It selects the SST files and memtables relevant to a [ScanRequest], builds the index
/// appliers and predicates, and turns them into a [Scanner] or a [RegionScannerRef].
pub(crate) struct ScanRegion {
    /// Version of the region to scan (SSTs, memtables, metadata and options are read from it).
    version: VersionRef,
    /// Access layer used to read SSTs and to build index appliers.
    access_layer: AccessLayerRef,
    /// The scan request (projection, filters, sequence bounds, selector, distribution).
    request: ScanRequest,
    /// Cache strategy forwarded to the scan input and used by the index appliers.
    cache_strategy: CacheStrategy,
    /// Capacity of the channels created for parallel scan tasks.
    parallel_scan_channel_size: usize,
    /// When set, no inverted index applier is built.
    ignore_inverted_index: bool,
    /// When set, no fulltext index applier is built.
    ignore_fulltext_index: bool,
    /// When set, no bloom filter index applier is built.
    ignore_bloom_filter: bool,
    /// Optional start time of the query, forwarded to the scan input.
    start_time: Option<Instant>,
    /// Forwarded to the scan input (defaults to `true`); presumably controls whether
    /// deleted rows are filtered out — confirm against the readers.
    filter_deleted: bool,
    /// Optional provider of extension ranges (enterprise builds only).
    #[cfg(feature = "enterprise")]
    extension_range_provider: Option<BoxedExtensionRangeProvider>,
}
216
217impl ScanRegion {
218 pub(crate) fn new(
220 version: VersionRef,
221 access_layer: AccessLayerRef,
222 request: ScanRequest,
223 cache_strategy: CacheStrategy,
224 ) -> ScanRegion {
225 ScanRegion {
226 version,
227 access_layer,
228 request,
229 cache_strategy,
230 parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
231 ignore_inverted_index: false,
232 ignore_fulltext_index: false,
233 ignore_bloom_filter: false,
234 start_time: None,
235 filter_deleted: true,
236 #[cfg(feature = "enterprise")]
237 extension_range_provider: None,
238 }
239 }
240
241 #[must_use]
243 pub(crate) fn with_parallel_scan_channel_size(
244 mut self,
245 parallel_scan_channel_size: usize,
246 ) -> Self {
247 self.parallel_scan_channel_size = parallel_scan_channel_size;
248 self
249 }
250
251 #[must_use]
253 pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self {
254 self.ignore_inverted_index = ignore;
255 self
256 }
257
258 #[must_use]
260 pub(crate) fn with_ignore_fulltext_index(mut self, ignore: bool) -> Self {
261 self.ignore_fulltext_index = ignore;
262 self
263 }
264
265 #[must_use]
267 pub(crate) fn with_ignore_bloom_filter(mut self, ignore: bool) -> Self {
268 self.ignore_bloom_filter = ignore;
269 self
270 }
271
272 #[must_use]
273 pub(crate) fn with_start_time(mut self, now: Instant) -> Self {
274 self.start_time = Some(now);
275 self
276 }
277
278 pub(crate) fn set_filter_deleted(&mut self, filter_deleted: bool) {
279 self.filter_deleted = filter_deleted;
280 }
281
282 #[cfg(feature = "enterprise")]
283 pub(crate) fn set_extension_range_provider(
284 &mut self,
285 extension_range_provider: BoxedExtensionRangeProvider,
286 ) {
287 self.extension_range_provider = Some(extension_range_provider);
288 }
289
290 pub(crate) async fn scanner(self) -> Result<Scanner> {
292 if self.use_series_scan() {
293 self.series_scan().await.map(Scanner::Series)
294 } else if self.use_unordered_scan() {
295 self.unordered_scan().await.map(Scanner::Unordered)
298 } else {
299 self.seq_scan().await.map(Scanner::Seq)
300 }
301 }
302
303 #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
305 pub(crate) async fn region_scanner(self) -> Result<RegionScannerRef> {
306 if self.use_series_scan() {
307 self.series_scan()
308 .await
309 .map(|scanner| Box::new(scanner) as _)
310 } else if self.use_unordered_scan() {
311 self.unordered_scan()
312 .await
313 .map(|scanner| Box::new(scanner) as _)
314 } else {
315 self.seq_scan().await.map(|scanner| Box::new(scanner) as _)
316 }
317 }
318
319 pub(crate) async fn seq_scan(self) -> Result<SeqScan> {
321 let input = self.scan_input().await?;
322 Ok(SeqScan::new(input, false))
323 }
324
325 pub(crate) async fn unordered_scan(self) -> Result<UnorderedScan> {
327 let input = self.scan_input().await?;
328 Ok(UnorderedScan::new(input))
329 }
330
331 pub(crate) async fn series_scan(self) -> Result<SeriesScan> {
333 let input = self.scan_input().await?;
334 Ok(SeriesScan::new(input))
335 }
336
337 fn use_unordered_scan(&self) -> bool {
339 self.version.options.append_mode
346 && self.request.series_row_selector.is_none()
347 && (self.request.distribution.is_none()
348 || self.request.distribution == Some(TimeSeriesDistribution::TimeWindowed))
349 }
350
351 fn use_series_scan(&self) -> bool {
353 self.request.distribution == Some(TimeSeriesDistribution::PerSeries)
354 }
355
356 async fn scan_input(mut self) -> Result<ScanInput> {
358 let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new);
359 let time_range = self.build_time_range_predicate();
360
361 let ssts = &self.version.ssts;
362 let mut files = Vec::new();
363 for level in ssts.levels() {
364 for file in level.files.values() {
365 let exceed_min_sequence = match (sst_min_sequence, file.meta_ref().sequence) {
366 (Some(min_sequence), Some(file_sequence)) => file_sequence > min_sequence,
367 (Some(_), None) => true,
373 (None, _) => true,
374 };
375
376 if exceed_min_sequence && file_in_range(file, &time_range) {
378 files.push(file.clone());
379 }
380 }
384 }
385
386 let memtables = self.version.memtables.list_memtables();
387 let memtables: Vec<_> = memtables
389 .into_iter()
390 .filter(|mem| {
391 let Some((start, end)) = mem.stats().time_range() else {
393 return false;
394 };
395 let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
397 memtable_range.intersects(&time_range)
398 })
399 .collect();
400
401 let region_id = self.region_id();
402 debug!(
403 "Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}",
404 region_id,
405 self.request,
406 time_range,
407 memtables.len(),
408 files.len(),
409 self.version.options.append_mode,
410 );
411
412 self.maybe_remove_field_filters();
414
415 let inverted_index_applier = self.build_invereted_index_applier();
416 let bloom_filter_applier = self.build_bloom_filter_applier();
417 let fulltext_index_applier = self.build_fulltext_index_applier();
418 let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters);
419 let mapper = match &self.request.projection {
421 Some(p) => ProjectionMapper::new(&self.version.metadata, p.iter().copied())?,
422 None => ProjectionMapper::all(&self.version.metadata)?,
423 };
424 let memtables = memtables
426 .into_iter()
427 .map(|mem| {
428 mem.ranges(
429 Some(mapper.column_ids()),
430 predicate.clone(),
431 self.request.sequence,
432 )
433 .map(MemRangeBuilder::new)
434 })
435 .collect::<Result<Vec<_>>>()?;
436
437 let input = ScanInput::new(self.access_layer, mapper)
438 .with_time_range(Some(time_range))
439 .with_predicate(predicate)
440 .with_memtables(memtables)
441 .with_files(files)
442 .with_cache(self.cache_strategy)
443 .with_inverted_index_applier(inverted_index_applier)
444 .with_bloom_filter_index_applier(bloom_filter_applier)
445 .with_fulltext_index_applier(fulltext_index_applier)
446 .with_parallel_scan_channel_size(self.parallel_scan_channel_size)
447 .with_start_time(self.start_time)
448 .with_append_mode(self.version.options.append_mode)
449 .with_filter_deleted(self.filter_deleted)
450 .with_merge_mode(self.version.options.merge_mode())
451 .with_series_row_selector(self.request.series_row_selector)
452 .with_distribution(self.request.distribution);
453
454 #[cfg(feature = "enterprise")]
455 let input = if let Some(provider) = self.extension_range_provider {
456 let ranges = provider
457 .find_extension_ranges(time_range, &self.request)
458 .await?;
459 input.with_extension_ranges(ranges)
460 } else {
461 input
462 };
463 Ok(input)
464 }
465
466 fn region_id(&self) -> RegionId {
467 self.version.metadata.region_id
468 }
469
470 fn build_time_range_predicate(&self) -> TimestampRange {
472 let time_index = self.version.metadata.time_index_column();
473 let unit = time_index
474 .column_schema
475 .data_type
476 .as_timestamp()
477 .expect("Time index must have timestamp-compatible type")
478 .unit();
479 build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters)
480 }
481
482 fn maybe_remove_field_filters(&mut self) {
484 if self.version.options.merge_mode() != MergeMode::LastNonNull {
485 return;
486 }
487
488 let field_columns = self
490 .version
491 .metadata
492 .field_columns()
493 .map(|col| &col.column_schema.name)
494 .collect::<HashSet<_>>();
495 let mut columns = HashSet::new();
497
498 self.request.filters.retain(|expr| {
499 columns.clear();
500 if expr_to_columns(expr, &mut columns).is_err() {
502 return false;
503 }
504 for column in &columns {
505 if field_columns.contains(&column.name) {
506 return false;
508 }
509 }
510 true
511 });
512 }
513
514 fn build_invereted_index_applier(&self) -> Option<InvertedIndexApplierRef> {
516 if self.ignore_inverted_index {
517 return None;
518 }
519
520 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
521 let inverted_index_cache = self.cache_strategy.inverted_index_cache().cloned();
522
523 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
524
525 InvertedIndexApplierBuilder::new(
526 self.access_layer.region_dir().to_string(),
527 self.access_layer.object_store().clone(),
528 self.version.metadata.as_ref(),
529 self.version.metadata.inverted_indexed_column_ids(
530 self.version
531 .options
532 .index_options
533 .inverted_index
534 .ignore_column_ids
535 .iter(),
536 ),
537 self.access_layer.puffin_manager_factory().clone(),
538 )
539 .with_file_cache(file_cache)
540 .with_inverted_index_cache(inverted_index_cache)
541 .with_puffin_metadata_cache(puffin_metadata_cache)
542 .build(&self.request.filters)
543 .inspect_err(|err| warn!(err; "Failed to build invereted index applier"))
544 .ok()
545 .flatten()
546 .map(Arc::new)
547 }
548
549 fn build_bloom_filter_applier(&self) -> Option<BloomFilterIndexApplierRef> {
551 if self.ignore_bloom_filter {
552 return None;
553 }
554
555 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
556 let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
557 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
558
559 BloomFilterIndexApplierBuilder::new(
560 self.access_layer.region_dir().to_string(),
561 self.access_layer.object_store().clone(),
562 self.version.metadata.as_ref(),
563 self.access_layer.puffin_manager_factory().clone(),
564 )
565 .with_file_cache(file_cache)
566 .with_bloom_filter_index_cache(bloom_filter_index_cache)
567 .with_puffin_metadata_cache(puffin_metadata_cache)
568 .build(&self.request.filters)
569 .inspect_err(|err| warn!(err; "Failed to build bloom filter index applier"))
570 .ok()
571 .flatten()
572 .map(Arc::new)
573 }
574
575 fn build_fulltext_index_applier(&self) -> Option<FulltextIndexApplierRef> {
577 if self.ignore_fulltext_index {
578 return None;
579 }
580
581 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
582 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
583 let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
584 FulltextIndexApplierBuilder::new(
585 self.access_layer.region_dir().to_string(),
586 self.region_id(),
587 self.access_layer.object_store().clone(),
588 self.access_layer.puffin_manager_factory().clone(),
589 self.version.metadata.as_ref(),
590 )
591 .with_file_cache(file_cache)
592 .with_puffin_metadata_cache(puffin_metadata_cache)
593 .with_bloom_filter_cache(bloom_filter_index_cache)
594 .build(&self.request.filters)
595 .inspect_err(|err| warn!(err; "Failed to build fulltext index applier"))
596 .ok()
597 .flatten()
598 .map(Arc::new)
599 }
600}
601
602fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
604 if predicate == &TimestampRange::min_to_max() {
605 return true;
606 }
607 let (start, end) = file.time_range();
609 let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end));
610 file_ts_range.intersects(predicate)
611}
612
/// Everything a scanner needs to read a region: the sources (memtable range builders
/// and SST files), the projection mapper, predicates and index appliers, and scan options.
pub struct ScanInput {
    /// Access layer used to read SSTs.
    access_layer: AccessLayerRef,
    /// Maps read batches to the projected output.
    pub(crate) mapper: Arc<ProjectionMapper>,
    /// Time range derived from the request filters, if set.
    time_range: Option<TimestampRange>,
    /// Predicates built from the request filters.
    pub(crate) predicate: PredicateGroup,
    /// Range builders of the memtables to read.
    pub(crate) memtables: Vec<MemRangeBuilder>,
    /// SST files to read.
    pub(crate) files: Vec<FileHandle>,
    /// Cache strategy used when reading SSTs.
    pub(crate) cache_strategy: CacheStrategy,
    /// When set, an "object not found" error while pruning a file is logged and the file
    /// is skipped instead of failing the scan.
    ignore_file_not_found: bool,
    /// Capacity of the channels created by `create_parallel_sources`.
    pub(crate) parallel_scan_channel_size: usize,
    /// Optional inverted index applier passed to SST readers.
    inverted_index_applier: Option<InvertedIndexApplierRef>,
    /// Optional bloom filter index applier passed to SST readers.
    bloom_filter_index_applier: Option<BloomFilterIndexApplierRef>,
    /// Optional fulltext index applier passed to SST readers.
    fulltext_index_applier: Option<FulltextIndexApplierRef>,
    /// Start time of the query, if provided by the caller.
    pub(crate) query_start: Option<Instant>,
    /// Whether the region is in append mode.
    pub(crate) append_mode: bool,
    /// Defaults to `true`; presumably whether deleted rows are filtered — confirm.
    pub(crate) filter_deleted: bool,
    /// Merge mode of the region.
    pub(crate) merge_mode: MergeMode,
    /// Optional series row selector from the request.
    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
    /// Optional time series distribution requested by the caller.
    pub(crate) distribution: Option<TimeSeriesDistribution>,
    /// Extension ranges; indexed after memtables and files (see `extension_range`).
    #[cfg(feature = "enterprise")]
    extension_ranges: Vec<BoxedExtensionRange>,
}
652
653impl ScanInput {
654 #[must_use]
656 pub(crate) fn new(access_layer: AccessLayerRef, mapper: ProjectionMapper) -> ScanInput {
657 ScanInput {
658 access_layer,
659 mapper: Arc::new(mapper),
660 time_range: None,
661 predicate: PredicateGroup::default(),
662 memtables: Vec::new(),
663 files: Vec::new(),
664 cache_strategy: CacheStrategy::Disabled,
665 ignore_file_not_found: false,
666 parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
667 inverted_index_applier: None,
668 bloom_filter_index_applier: None,
669 fulltext_index_applier: None,
670 query_start: None,
671 append_mode: false,
672 filter_deleted: true,
673 merge_mode: MergeMode::default(),
674 series_row_selector: None,
675 distribution: None,
676 #[cfg(feature = "enterprise")]
677 extension_ranges: Vec::new(),
678 }
679 }
680
681 #[must_use]
683 pub(crate) fn with_time_range(mut self, time_range: Option<TimestampRange>) -> Self {
684 self.time_range = time_range;
685 self
686 }
687
688 #[must_use]
690 pub(crate) fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
691 self.predicate = predicate;
692 self
693 }
694
695 #[must_use]
697 pub(crate) fn with_memtables(mut self, memtables: Vec<MemRangeBuilder>) -> Self {
698 self.memtables = memtables;
699 self
700 }
701
702 #[must_use]
704 pub(crate) fn with_files(mut self, files: Vec<FileHandle>) -> Self {
705 self.files = files;
706 self
707 }
708
709 #[must_use]
711 pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self {
712 self.cache_strategy = cache;
713 self
714 }
715
716 #[must_use]
718 pub(crate) fn with_ignore_file_not_found(mut self, ignore: bool) -> Self {
719 self.ignore_file_not_found = ignore;
720 self
721 }
722
723 #[must_use]
725 pub(crate) fn with_parallel_scan_channel_size(
726 mut self,
727 parallel_scan_channel_size: usize,
728 ) -> Self {
729 self.parallel_scan_channel_size = parallel_scan_channel_size;
730 self
731 }
732
733 #[must_use]
735 pub(crate) fn with_inverted_index_applier(
736 mut self,
737 applier: Option<InvertedIndexApplierRef>,
738 ) -> Self {
739 self.inverted_index_applier = applier;
740 self
741 }
742
743 #[must_use]
745 pub(crate) fn with_bloom_filter_index_applier(
746 mut self,
747 applier: Option<BloomFilterIndexApplierRef>,
748 ) -> Self {
749 self.bloom_filter_index_applier = applier;
750 self
751 }
752
753 #[must_use]
755 pub(crate) fn with_fulltext_index_applier(
756 mut self,
757 applier: Option<FulltextIndexApplierRef>,
758 ) -> Self {
759 self.fulltext_index_applier = applier;
760 self
761 }
762
763 #[must_use]
765 pub(crate) fn with_start_time(mut self, now: Option<Instant>) -> Self {
766 self.query_start = now;
767 self
768 }
769
770 #[must_use]
771 pub(crate) fn with_append_mode(mut self, is_append_mode: bool) -> Self {
772 self.append_mode = is_append_mode;
773 self
774 }
775
776 #[must_use]
778 pub(crate) fn with_filter_deleted(mut self, filter_deleted: bool) -> Self {
779 self.filter_deleted = filter_deleted;
780 self
781 }
782
783 #[must_use]
785 pub(crate) fn with_merge_mode(mut self, merge_mode: MergeMode) -> Self {
786 self.merge_mode = merge_mode;
787 self
788 }
789
790 #[must_use]
792 pub(crate) fn with_distribution(
793 mut self,
794 distribution: Option<TimeSeriesDistribution>,
795 ) -> Self {
796 self.distribution = distribution;
797 self
798 }
799
800 #[must_use]
802 pub(crate) fn with_series_row_selector(
803 mut self,
804 series_row_selector: Option<TimeSeriesRowSelector>,
805 ) -> Self {
806 self.series_row_selector = series_row_selector;
807 self
808 }
809
810 pub(crate) fn create_parallel_sources(
814 &self,
815 sources: Vec<Source>,
816 semaphore: Arc<Semaphore>,
817 ) -> Result<Vec<Source>> {
818 if sources.len() <= 1 {
819 return Ok(sources);
820 }
821
822 let sources = sources
824 .into_iter()
825 .map(|source| {
826 let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
827 self.spawn_scan_task(source, semaphore.clone(), sender);
828 let stream = Box::pin(ReceiverStream::new(receiver));
829 Source::Stream(stream)
830 })
831 .collect();
832 Ok(sources)
833 }
834
835 pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> {
837 let memtable = &self.memtables[index.index];
838 let mut ranges = SmallVec::new();
839 memtable.build_ranges(index.row_group_index, &mut ranges);
840 ranges
841 }
842
843 pub async fn prune_file(
845 &self,
846 file: &FileHandle,
847 reader_metrics: &mut ReaderMetrics,
848 ) -> Result<FileRangeBuilder> {
849 let res = self
850 .access_layer
851 .read_sst(file.clone())
852 .predicate(self.predicate.predicate().cloned())
853 .projection(Some(self.mapper.column_ids().to_vec()))
854 .cache(self.cache_strategy.clone())
855 .inverted_index_applier(self.inverted_index_applier.clone())
856 .bloom_filter_index_applier(self.bloom_filter_index_applier.clone())
857 .fulltext_index_applier(self.fulltext_index_applier.clone())
858 .expected_metadata(Some(self.mapper.metadata().clone()))
859 .build_reader_input(reader_metrics)
860 .await;
861 let (mut file_range_ctx, selection) = match res {
862 Ok(x) => x,
863 Err(e) => {
864 if e.is_object_not_found() && self.ignore_file_not_found {
865 error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
866 return Ok(FileRangeBuilder::default());
867 } else {
868 return Err(e);
869 }
870 }
871 };
872 if !compat::has_same_columns_and_pk_encoding(
873 self.mapper.metadata(),
874 file_range_ctx.read_format().metadata(),
875 ) {
876 let compat = CompatBatch::new(
879 &self.mapper,
880 file_range_ctx.read_format().metadata().clone(),
881 )?;
882 file_range_ctx.set_compat_batch(Some(compat));
883 }
884 Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), selection))
885 }
886
887 pub(crate) fn spawn_scan_task(
889 &self,
890 mut input: Source,
891 semaphore: Arc<Semaphore>,
892 sender: mpsc::Sender<Result<Batch>>,
893 ) {
894 common_runtime::spawn_global(async move {
895 loop {
896 let maybe_batch = {
899 let _permit = semaphore.acquire().await.unwrap();
901 input.next_batch().await
902 };
903 match maybe_batch {
904 Ok(Some(batch)) => {
905 let _ = sender.send(Ok(batch)).await;
906 }
907 Ok(None) => break,
908 Err(e) => {
909 let _ = sender.send(Err(e)).await;
910 break;
911 }
912 }
913 }
914 });
915 }
916
917 pub(crate) fn total_rows(&self) -> usize {
918 let rows_in_files: usize = self.files.iter().map(|f| f.num_rows()).sum();
919 let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();
920
921 let rows = rows_in_files + rows_in_memtables;
922 #[cfg(feature = "enterprise")]
923 let rows = rows
924 + self
925 .extension_ranges
926 .iter()
927 .map(|x| x.num_rows())
928 .sum::<u64>() as usize;
929 rows
930 }
931
932 pub(crate) fn predicate(&self) -> Option<&Predicate> {
934 self.predicate.predicate()
935 }
936
937 pub(crate) fn num_memtables(&self) -> usize {
939 self.memtables.len()
940 }
941
942 pub(crate) fn num_files(&self) -> usize {
944 self.files.len()
945 }
946
947 pub fn region_metadata(&self) -> &RegionMetadataRef {
948 self.mapper.metadata()
949 }
950}
951
// The whole impl is feature-gated, so its methods need no cfg attributes of their own.
#[cfg(feature = "enterprise")]
impl ScanInput {
    /// Sets the extension ranges to read.
    #[must_use]
    pub(crate) fn with_extension_ranges(mut self, extension_ranges: Vec<BoxedExtensionRange>) -> Self {
        self.extension_ranges = extension_ranges;
        self
    }

    /// All extension ranges of this input.
    pub(crate) fn extension_ranges(&self) -> &[BoxedExtensionRange] {
        self.extension_ranges.as_slice()
    }

    /// Extension range for the global range index `i`.
    ///
    /// Extension ranges are indexed after all memtables and files.
    ///
    /// # Panics
    /// Panics if `i` does not refer to an extension range.
    pub(crate) fn extension_range(&self, i: usize) -> &BoxedExtensionRange {
        let first_extension = self.num_memtables() + self.num_files();
        &self.extension_ranges[i - first_extension]
    }
}
973
#[cfg(test)]
impl ScanInput {
    /// Ids of the SST files in this input, in the order they are stored.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::FileId> {
        let mut ids = Vec::with_capacity(self.files.len());
        ids.extend(self.files.iter().map(FileHandle::file_id));
        ids
    }
}
981
/// Shared context of a scan: the input and the ranges planned over it.
pub struct StreamContext {
    /// Input of the scan.
    pub input: ScanInput,
    /// Ranges planned by `RangeMeta::seq_scan_ranges` or `RangeMeta::unordered_scan_ranges`.
    pub(crate) ranges: Vec<RangeMeta>,

    /// Start time of the query: `input.query_start`, or the time the context was created.
    pub(crate) query_start: Instant,
}
994
995impl StreamContext {
996 pub(crate) fn seq_scan_ctx(input: ScanInput, compaction: bool) -> Self {
998 let query_start = input.query_start.unwrap_or_else(Instant::now);
999 let ranges = RangeMeta::seq_scan_ranges(&input, compaction);
1000 READ_SST_COUNT.observe(input.num_files() as f64);
1001
1002 Self {
1003 input,
1004 ranges,
1005 query_start,
1006 }
1007 }
1008
1009 pub(crate) fn unordered_scan_ctx(input: ScanInput) -> Self {
1011 let query_start = input.query_start.unwrap_or_else(Instant::now);
1012 let ranges = RangeMeta::unordered_scan_ranges(&input);
1013 READ_SST_COUNT.observe(input.num_files() as f64);
1014
1015 Self {
1016 input,
1017 ranges,
1018 query_start,
1019 }
1020 }
1021
1022 pub(crate) fn is_mem_range_index(&self, index: RowGroupIndex) -> bool {
1024 self.input.num_memtables() > index.index
1025 }
1026
1027 pub(crate) fn is_file_range_index(&self, index: RowGroupIndex) -> bool {
1028 !self.is_mem_range_index(index)
1029 && index.index < self.input.num_files() + self.input.num_memtables()
1030 }
1031
1032 pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
1034 self.ranges
1035 .iter()
1036 .enumerate()
1037 .map(|(idx, range_meta)| range_meta.new_partition_range(idx))
1038 .collect()
1039 }
1040
1041 pub(crate) fn format_for_explain(&self, verbose: bool, f: &mut fmt::Formatter) -> fmt::Result {
1043 let (mut num_mem_ranges, mut num_file_ranges) = (0, 0);
1044 for range_meta in &self.ranges {
1045 for idx in &range_meta.row_group_indices {
1046 if self.is_mem_range_index(*idx) {
1047 num_mem_ranges += 1;
1048 } else {
1049 num_file_ranges += 1;
1050 }
1051 }
1052 }
1053 if verbose {
1054 write!(f, "{{")?;
1055 }
1056 write!(
1057 f,
1058 "\"partition_count\":{{\"count\":{}, \"mem_ranges\":{}, \"files\":{}, \"file_ranges\":{}}}",
1059 self.ranges.len(),
1060 num_mem_ranges,
1061 self.input.num_files(),
1062 num_file_ranges,
1063 )?;
1064 if let Some(selector) = &self.input.series_row_selector {
1065 write!(f, ", \"selector\":\"{}\"", selector)?;
1066 }
1067 if let Some(distribution) = &self.input.distribution {
1068 write!(f, ", \"distribution\":\"{}\"", distribution)?;
1069 }
1070
1071 if verbose {
1072 self.format_verbose_content(f)?;
1073 }
1074
1075 Ok(())
1076 }
1077
1078 fn format_verbose_content(&self, f: &mut fmt::Formatter) -> fmt::Result {
1079 struct FileWrapper<'a> {
1080 file: &'a FileHandle,
1081 }
1082
1083 impl fmt::Debug for FileWrapper<'_> {
1084 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1085 let (start, end) = self.file.time_range();
1086 write!(
1087 f,
1088 r#"{{"file_id":"{}","time_range_start":"{}::{}","time_range_end":"{}::{}","rows":{},"size":{},"index_size":{}}}"#,
1089 self.file.file_id(),
1090 start.value(),
1091 start.unit(),
1092 end.value(),
1093 end.unit(),
1094 self.file.num_rows(),
1095 self.file.size(),
1096 self.file.index_size()
1097 )
1098 }
1099 }
1100
1101 struct InputWrapper<'a> {
1102 input: &'a ScanInput,
1103 }
1104
1105 impl fmt::Debug for InputWrapper<'_> {
1106 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1107 let output_schema = self.input.mapper.output_schema();
1108 if !output_schema.is_empty() {
1109 let names: Vec<_> = output_schema
1110 .column_schemas()
1111 .iter()
1112 .map(|col| &col.name)
1113 .collect();
1114 write!(f, ", \"projection\": {:?}", names)?;
1115 }
1116 if let Some(predicate) = &self.input.predicate.predicate() {
1117 if !predicate.exprs().is_empty() {
1118 let exprs: Vec<_> =
1119 predicate.exprs().iter().map(|e| e.to_string()).collect();
1120 write!(f, ", \"filters\": {:?}", exprs)?;
1121 }
1122 }
1123 if !self.input.files.is_empty() {
1124 write!(f, ", \"files\": ")?;
1125 f.debug_list()
1126 .entries(self.input.files.iter().map(|file| FileWrapper { file }))
1127 .finish()?;
1128 }
1129
1130 Ok(())
1131 }
1132 }
1133
1134 write!(f, "{:?}", InputWrapper { input: &self.input })
1135 }
1136}
1137
/// Predicates derived from the filter expressions of a scan request.
#[derive(Clone, Default)]
pub struct PredicateGroup {
    /// Evaluators for filters that reference only the time index column; `None` when
    /// there are none.
    time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,

    /// Predicate over all filter expressions (always `Some` when built by `new`).
    predicate: Option<Predicate>,
}
1148
1149impl PredicateGroup {
1150 pub fn new(metadata: &RegionMetadata, exprs: &[Expr]) -> Self {
1152 let mut time_filters = Vec::with_capacity(exprs.len());
1153 let mut columns = HashSet::new();
1155 for expr in exprs {
1156 columns.clear();
1157 let Some(filter) = Self::expr_to_filter(expr, metadata, &mut columns) else {
1158 continue;
1159 };
1160 time_filters.push(filter);
1161 }
1162 let time_filters = if time_filters.is_empty() {
1163 None
1164 } else {
1165 Some(Arc::new(time_filters))
1166 };
1167 let predicate = Predicate::new(exprs.to_vec());
1168
1169 Self {
1170 time_filters,
1171 predicate: Some(predicate),
1172 }
1173 }
1174
1175 pub(crate) fn time_filters(&self) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
1177 self.time_filters.clone()
1178 }
1179
1180 pub(crate) fn predicate(&self) -> Option<&Predicate> {
1182 self.predicate.as_ref()
1183 }
1184
1185 fn expr_to_filter(
1186 expr: &Expr,
1187 metadata: &RegionMetadata,
1188 columns: &mut HashSet<Column>,
1189 ) -> Option<SimpleFilterEvaluator> {
1190 columns.clear();
1191 expr_to_columns(expr, columns).ok()?;
1194 if columns.len() > 1 {
1195 return None;
1197 }
1198 let column = columns.iter().next()?;
1199 let column_meta = metadata.column_by_name(&column.name)?;
1200 if column_meta.semantic_type == SemanticType::Timestamp {
1201 SimpleFilterEvaluator::try_new(expr)
1202 } else {
1203 None
1204 }
1205 }
1206}