1use std::collections::HashSet;
18use std::fmt;
19use std::num::NonZeroU64;
20use std::sync::Arc;
21use std::time::Instant;
22
23use api::v1::SemanticType;
24use common_error::ext::BoxedError;
25use common_recordbatch::filter::SimpleFilterEvaluator;
26use common_recordbatch::SendableRecordBatchStream;
27use common_telemetry::{debug, error, tracing, warn};
28use common_time::range::TimestampRange;
29use datafusion_common::Column;
30use datafusion_expr::utils::expr_to_columns;
31use datafusion_expr::Expr;
32use futures::StreamExt;
33use smallvec::SmallVec;
34use store_api::metadata::{RegionMetadata, RegionMetadataRef};
35use store_api::region_engine::{PartitionRange, RegionScannerRef};
36use store_api::storage::{RegionId, ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector};
37use table::predicate::{build_time_range_predicate, Predicate};
38use tokio::sync::{mpsc, Semaphore};
39use tokio_stream::wrappers::ReceiverStream;
40
41use crate::access_layer::AccessLayerRef;
42use crate::cache::CacheStrategy;
43use crate::config::{DEFAULT_MAX_CONCURRENT_SCAN_FILES, DEFAULT_SCAN_CHANNEL_SIZE};
44use crate::error::Result;
45#[cfg(feature = "enterprise")]
46use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
47use crate::memtable::MemtableRange;
48use crate::metrics::READ_SST_COUNT;
49use crate::read::compat::{self, CompatBatch, FlatCompatBatch, PrimaryKeyCompatBatch};
50use crate::read::projection::ProjectionMapper;
51use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
52use crate::read::seq_scan::SeqScan;
53use crate::read::series_scan::SeriesScan;
54use crate::read::stream::ScanBatchStream;
55use crate::read::unordered_scan::UnorderedScan;
56use crate::read::{Batch, BoxedRecordBatchStream, RecordBatch, Source};
57use crate::region::options::MergeMode;
58use crate::region::version::VersionRef;
59use crate::sst::file::FileHandle;
60use crate::sst::index::bloom_filter::applier::{
61 BloomFilterIndexApplierBuilder, BloomFilterIndexApplierRef,
62};
63use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
64use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
65use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
66use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
67use crate::sst::parquet::reader::ReaderMetrics;
68
/// Channel capacity used by parallel scan tasks when the scan reads data in the flat format.
///
/// `ScanRegion::scan_input` overwrites the configured `parallel_scan_channel_size` with this
/// value whenever flat format is enabled.
const FLAT_SCAN_CHANNEL_SIZE: usize = 2;
71
/// A region scanner, wrapping one of the concrete scan strategies.
///
/// The variant is selected by `ScanRegion::scanner`.
pub(crate) enum Scanner {
    /// Sequential scan; used when neither the series scan nor the unordered scan applies.
    Seq(SeqScan),
    /// Unordered scan; used for append-mode regions without a series row selector whose
    /// requested distribution is absent or time-windowed.
    Unordered(UnorderedScan),
    /// Per-series scan; used when the request asks for `TimeSeriesDistribution::PerSeries`.
    Series(SeriesScan),
}
81
82impl Scanner {
83 #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
85 pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream, BoxedError> {
86 match self {
87 Scanner::Seq(seq_scan) => seq_scan.build_stream(),
88 Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
89 Scanner::Series(series_scan) => series_scan.build_stream().await,
90 }
91 }
92
93 pub(crate) fn scan_batch(&self) -> Result<ScanBatchStream> {
95 match self {
96 Scanner::Seq(x) => x.scan_all_partitions(),
97 Scanner::Unordered(x) => x.scan_all_partitions(),
98 Scanner::Series(x) => x.scan_all_partitions(),
99 }
100 }
101}
102
#[cfg(test)]
impl Scanner {
    /// Returns the scan input of whichever scanner variant is wrapped.
    fn test_input(&self) -> &ScanInput {
        match self {
            Self::Seq(inner) => inner.input(),
            Self::Unordered(inner) => inner.input(),
            Self::Series(inner) => inner.input(),
        }
    }

    /// Returns the number of SST files the scanner reads.
    pub(crate) fn num_files(&self) -> usize {
        self.test_input().num_files()
    }

    /// Returns the number of memtable range builders the scanner reads.
    pub(crate) fn num_memtables(&self) -> usize {
        self.test_input().num_memtables()
    }

    /// Returns the ids of the SST files the scanner reads.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        self.test_input().file_ids()
    }

    /// Prepares the wrapped scanner with `target_partitions`, panicking if preparation fails.
    pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
        use store_api::region_engine::{PrepareRequest, RegionScanner};

        let request = PrepareRequest::default().with_target_partitions(target_partitions);
        match self {
            Self::Seq(inner) => inner.prepare(request).unwrap(),
            Self::Unordered(inner) => inner.prepare(request).unwrap(),
            Self::Series(inner) => inner.prepare(request).unwrap(),
        }
    }
}
144
/// Plans a scan over one region version and builds the matching [Scanner].
///
/// Built with `ScanRegion::new` plus the `with_*`/`set_*` methods, then consumed by
/// `scanner`, `region_scanner`, `seq_scan`, `unordered_scan` or `series_scan`.
#[cfg_attr(doc, aquamarine::aquamarine)]
pub(crate) struct ScanRegion {
    /// Version of the region (memtables, SSTs, metadata and options) to scan.
    version: VersionRef,
    /// Access layer used to read SSTs and to build index appliers.
    access_layer: AccessLayerRef,
    /// The scan request (projection, filters, sequence bounds, distribution, ...).
    request: ScanRequest,
    /// Cache strategy passed to the scan input and to index appliers.
    cache_strategy: CacheStrategy,
    /// Capacity of each parallel scan channel; replaced by `FLAT_SCAN_CHANNEL_SIZE` when
    /// `flat_format` is set.
    parallel_scan_channel_size: usize,
    /// Maximum number of files to scan concurrently.
    max_concurrent_scan_files: usize,
    /// Whether to skip building the inverted index applier.
    ignore_inverted_index: bool,
    /// Whether to skip building the fulltext index applier.
    ignore_fulltext_index: bool,
    /// Whether to skip building the bloom filter index applier.
    ignore_bloom_filter: bool,
    /// Start time of the query, if known.
    start_time: Option<Instant>,
    /// Whether the scan filters out deleted rows.
    filter_deleted: bool,
    /// Whether to read data in the flat format.
    flat_format: bool,
    /// Provider that supplies extra (extension) ranges to scan.
    #[cfg(feature = "enterprise")]
    extension_range_provider: Option<BoxedExtensionRangeProvider>,
}
224
225impl ScanRegion {
226 pub(crate) fn new(
228 version: VersionRef,
229 access_layer: AccessLayerRef,
230 request: ScanRequest,
231 cache_strategy: CacheStrategy,
232 ) -> ScanRegion {
233 ScanRegion {
234 version,
235 access_layer,
236 request,
237 cache_strategy,
238 parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
239 max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
240 ignore_inverted_index: false,
241 ignore_fulltext_index: false,
242 ignore_bloom_filter: false,
243 start_time: None,
244 filter_deleted: true,
245 flat_format: false,
246 #[cfg(feature = "enterprise")]
247 extension_range_provider: None,
248 }
249 }
250
251 #[must_use]
253 pub(crate) fn with_parallel_scan_channel_size(
254 mut self,
255 parallel_scan_channel_size: usize,
256 ) -> Self {
257 self.parallel_scan_channel_size = parallel_scan_channel_size;
258 self
259 }
260
261 #[must_use]
263 pub(crate) fn with_max_concurrent_scan_files(
264 mut self,
265 max_concurrent_scan_files: usize,
266 ) -> Self {
267 self.max_concurrent_scan_files = max_concurrent_scan_files;
268 self
269 }
270
271 #[must_use]
273 pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self {
274 self.ignore_inverted_index = ignore;
275 self
276 }
277
278 #[must_use]
280 pub(crate) fn with_ignore_fulltext_index(mut self, ignore: bool) -> Self {
281 self.ignore_fulltext_index = ignore;
282 self
283 }
284
285 #[must_use]
287 pub(crate) fn with_ignore_bloom_filter(mut self, ignore: bool) -> Self {
288 self.ignore_bloom_filter = ignore;
289 self
290 }
291
292 #[must_use]
293 pub(crate) fn with_start_time(mut self, now: Instant) -> Self {
294 self.start_time = Some(now);
295 self
296 }
297
298 pub(crate) fn set_filter_deleted(&mut self, filter_deleted: bool) {
299 self.filter_deleted = filter_deleted;
300 }
301
302 #[must_use]
304 pub(crate) fn with_flat_format(mut self, flat_format: bool) -> Self {
305 self.flat_format = flat_format;
306 self
307 }
308
309 #[cfg(feature = "enterprise")]
310 pub(crate) fn set_extension_range_provider(
311 &mut self,
312 extension_range_provider: BoxedExtensionRangeProvider,
313 ) {
314 self.extension_range_provider = Some(extension_range_provider);
315 }
316
317 pub(crate) async fn scanner(self) -> Result<Scanner> {
319 if self.use_series_scan() {
320 self.series_scan().await.map(Scanner::Series)
321 } else if self.use_unordered_scan() {
322 self.unordered_scan().await.map(Scanner::Unordered)
325 } else {
326 self.seq_scan().await.map(Scanner::Seq)
327 }
328 }
329
330 #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
332 pub(crate) async fn region_scanner(self) -> Result<RegionScannerRef> {
333 if self.use_series_scan() {
334 self.series_scan()
335 .await
336 .map(|scanner| Box::new(scanner) as _)
337 } else if self.use_unordered_scan() {
338 self.unordered_scan()
339 .await
340 .map(|scanner| Box::new(scanner) as _)
341 } else {
342 self.seq_scan().await.map(|scanner| Box::new(scanner) as _)
343 }
344 }
345
346 pub(crate) async fn seq_scan(self) -> Result<SeqScan> {
348 let input = self.scan_input().await?;
349 Ok(SeqScan::new(input, false))
350 }
351
352 pub(crate) async fn unordered_scan(self) -> Result<UnorderedScan> {
354 let input = self.scan_input().await?;
355 Ok(UnorderedScan::new(input))
356 }
357
358 pub(crate) async fn series_scan(self) -> Result<SeriesScan> {
360 let input = self.scan_input().await?;
361 Ok(SeriesScan::new(input))
362 }
363
364 fn use_unordered_scan(&self) -> bool {
366 self.version.options.append_mode
373 && self.request.series_row_selector.is_none()
374 && (self.request.distribution.is_none()
375 || self.request.distribution == Some(TimeSeriesDistribution::TimeWindowed))
376 }
377
378 fn use_series_scan(&self) -> bool {
380 self.request.distribution == Some(TimeSeriesDistribution::PerSeries)
381 }
382
383 async fn scan_input(mut self) -> Result<ScanInput> {
385 let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new);
386 let time_range = self.build_time_range_predicate();
387 let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters);
388
389 let mapper = match &self.request.projection {
391 Some(p) => {
392 ProjectionMapper::new(&self.version.metadata, p.iter().copied(), self.flat_format)?
393 }
394 None => ProjectionMapper::all(&self.version.metadata, self.flat_format)?,
395 };
396
397 let ssts = &self.version.ssts;
398 let mut files = Vec::new();
399 for level in ssts.levels() {
400 for file in level.files.values() {
401 let exceed_min_sequence = match (sst_min_sequence, file.meta_ref().sequence) {
402 (Some(min_sequence), Some(file_sequence)) => file_sequence > min_sequence,
403 (Some(_), None) => true,
409 (None, _) => true,
410 };
411
412 if exceed_min_sequence && file_in_range(file, &time_range) {
414 files.push(file.clone());
415 }
416 }
420 }
421
422 let memtables = self.version.memtables.list_memtables();
423 let mut mem_range_builders = Vec::new();
425
426 for m in memtables {
427 let Some((start, end)) = m.stats().time_range() else {
429 continue;
430 };
431 let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
433 if !memtable_range.intersects(&time_range) {
434 continue;
435 }
436 let ranges_in_memtable = m.ranges(
437 Some(mapper.column_ids()),
438 predicate.clone(),
439 self.request.sequence,
440 )?;
441 mem_range_builders.extend(ranges_in_memtable.ranges.into_values().map(|v| {
442 let mut stats = ranges_in_memtable.stats.clone();
444 stats.num_ranges = 1;
445 stats.num_rows = v.num_rows();
446 MemRangeBuilder::new(v, stats)
447 }));
448 }
449
450 let region_id = self.region_id();
451 debug!(
452 "Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}",
453 region_id,
454 self.request,
455 time_range,
456 mem_range_builders.len(),
457 files.len(),
458 self.version.options.append_mode,
459 );
460
461 self.maybe_remove_field_filters();
463
464 let inverted_index_applier = self.build_invereted_index_applier();
465 let bloom_filter_applier = self.build_bloom_filter_applier();
466 let fulltext_index_applier = self.build_fulltext_index_applier();
467 let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters);
468
469 if self.flat_format {
470 self.parallel_scan_channel_size = FLAT_SCAN_CHANNEL_SIZE;
472 }
473
474 let input = ScanInput::new(self.access_layer, mapper)
475 .with_time_range(Some(time_range))
476 .with_predicate(predicate)
477 .with_memtables(mem_range_builders)
478 .with_files(files)
479 .with_cache(self.cache_strategy)
480 .with_inverted_index_applier(inverted_index_applier)
481 .with_bloom_filter_index_applier(bloom_filter_applier)
482 .with_fulltext_index_applier(fulltext_index_applier)
483 .with_parallel_scan_channel_size(self.parallel_scan_channel_size)
484 .with_max_concurrent_scan_files(self.max_concurrent_scan_files)
485 .with_start_time(self.start_time)
486 .with_append_mode(self.version.options.append_mode)
487 .with_filter_deleted(self.filter_deleted)
488 .with_merge_mode(self.version.options.merge_mode())
489 .with_series_row_selector(self.request.series_row_selector)
490 .with_distribution(self.request.distribution)
491 .with_flat_format(self.flat_format);
492
493 #[cfg(feature = "enterprise")]
494 let input = if let Some(provider) = self.extension_range_provider {
495 let ranges = provider
496 .find_extension_ranges(self.version.flushed_sequence, time_range, &self.request)
497 .await?;
498 debug!("Find extension ranges: {ranges:?}");
499 input.with_extension_ranges(ranges)
500 } else {
501 input
502 };
503 Ok(input)
504 }
505
506 fn region_id(&self) -> RegionId {
507 self.version.metadata.region_id
508 }
509
510 fn build_time_range_predicate(&self) -> TimestampRange {
512 let time_index = self.version.metadata.time_index_column();
513 let unit = time_index
514 .column_schema
515 .data_type
516 .as_timestamp()
517 .expect("Time index must have timestamp-compatible type")
518 .unit();
519 build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters)
520 }
521
522 fn maybe_remove_field_filters(&mut self) {
524 if self.version.options.merge_mode() != MergeMode::LastNonNull {
525 return;
526 }
527
528 let field_columns = self
530 .version
531 .metadata
532 .field_columns()
533 .map(|col| &col.column_schema.name)
534 .collect::<HashSet<_>>();
535 let mut columns = HashSet::new();
537
538 self.request.filters.retain(|expr| {
539 columns.clear();
540 if expr_to_columns(expr, &mut columns).is_err() {
542 return false;
543 }
544 for column in &columns {
545 if field_columns.contains(&column.name) {
546 return false;
548 }
549 }
550 true
551 });
552 }
553
554 fn build_invereted_index_applier(&self) -> Option<InvertedIndexApplierRef> {
556 if self.ignore_inverted_index {
557 return None;
558 }
559
560 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
561 let inverted_index_cache = self.cache_strategy.inverted_index_cache().cloned();
562
563 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
564
565 InvertedIndexApplierBuilder::new(
566 self.access_layer.table_dir().to_string(),
567 self.access_layer.path_type(),
568 self.access_layer.object_store().clone(),
569 self.version.metadata.as_ref(),
570 self.version.metadata.inverted_indexed_column_ids(
571 self.version
572 .options
573 .index_options
574 .inverted_index
575 .ignore_column_ids
576 .iter(),
577 ),
578 self.access_layer.puffin_manager_factory().clone(),
579 )
580 .with_file_cache(file_cache)
581 .with_inverted_index_cache(inverted_index_cache)
582 .with_puffin_metadata_cache(puffin_metadata_cache)
583 .build(&self.request.filters)
584 .inspect_err(|err| warn!(err; "Failed to build invereted index applier"))
585 .ok()
586 .flatten()
587 .map(Arc::new)
588 }
589
590 fn build_bloom_filter_applier(&self) -> Option<BloomFilterIndexApplierRef> {
592 if self.ignore_bloom_filter {
593 return None;
594 }
595
596 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
597 let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
598 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
599
600 BloomFilterIndexApplierBuilder::new(
601 self.access_layer.table_dir().to_string(),
602 self.access_layer.path_type(),
603 self.access_layer.object_store().clone(),
604 self.version.metadata.as_ref(),
605 self.access_layer.puffin_manager_factory().clone(),
606 )
607 .with_file_cache(file_cache)
608 .with_bloom_filter_index_cache(bloom_filter_index_cache)
609 .with_puffin_metadata_cache(puffin_metadata_cache)
610 .build(&self.request.filters)
611 .inspect_err(|err| warn!(err; "Failed to build bloom filter index applier"))
612 .ok()
613 .flatten()
614 .map(Arc::new)
615 }
616
617 fn build_fulltext_index_applier(&self) -> Option<FulltextIndexApplierRef> {
619 if self.ignore_fulltext_index {
620 return None;
621 }
622
623 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
624 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
625 let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
626 FulltextIndexApplierBuilder::new(
627 self.access_layer.table_dir().to_string(),
628 self.access_layer.path_type(),
629 self.access_layer.object_store().clone(),
630 self.access_layer.puffin_manager_factory().clone(),
631 self.version.metadata.as_ref(),
632 )
633 .with_file_cache(file_cache)
634 .with_puffin_metadata_cache(puffin_metadata_cache)
635 .with_bloom_filter_cache(bloom_filter_index_cache)
636 .build(&self.request.filters)
637 .inspect_err(|err| warn!(err; "Failed to build fulltext index applier"))
638 .ok()
639 .flatten()
640 .map(Arc::new)
641 }
642}
643
644fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
646 if predicate == &TimestampRange::min_to_max() {
647 return true;
648 }
649 let (start, end) = file.time_range();
651 let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end));
652 file_ts_range.intersects(predicate)
653}
654
/// Everything a scanner needs to read a region: sources (memtables, files), projection,
/// predicates, index appliers and scan options.
pub struct ScanInput {
    /// Access layer used to read SST files.
    access_layer: AccessLayerRef,
    /// Projection mapper; its metadata is the metadata the scan expects.
    pub(crate) mapper: Arc<ProjectionMapper>,
    /// Time range extracted from the filters, if any.
    time_range: Option<TimestampRange>,
    /// Predicates of the scan.
    pub(crate) predicate: PredicateGroup,
    /// Builders of memtable ranges to read.
    pub(crate) memtables: Vec<MemRangeBuilder>,
    /// SST files to read.
    pub(crate) files: Vec<FileHandle>,
    /// Cache strategy passed to SST readers.
    pub(crate) cache_strategy: CacheStrategy,
    /// When set, `prune_file` logs and skips files that do not exist instead of failing.
    ignore_file_not_found: bool,
    /// Capacity of the channel of each parallel scan task.
    pub(crate) parallel_scan_channel_size: usize,
    /// Maximum number of files to scan concurrently.
    pub(crate) max_concurrent_scan_files: usize,
    /// Inverted index applier passed to SST readers.
    inverted_index_applier: Option<InvertedIndexApplierRef>,
    /// Bloom filter index applier passed to SST readers.
    bloom_filter_index_applier: Option<BloomFilterIndexApplierRef>,
    /// Fulltext index applier passed to SST readers.
    fulltext_index_applier: Option<FulltextIndexApplierRef>,
    /// Start time of the query, if known.
    pub(crate) query_start: Option<Instant>,
    /// Whether the region is in append mode.
    pub(crate) append_mode: bool,
    /// Whether to filter out deleted rows.
    pub(crate) filter_deleted: bool,
    /// Merge mode of the region.
    pub(crate) merge_mode: MergeMode,
    /// Series row selector requested by the scan, if any.
    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
    /// Distribution requested by the scan, if any.
    pub(crate) distribution: Option<TimeSeriesDistribution>,
    /// Whether to read data in the flat format.
    pub(crate) flat_format: bool,
    /// Extension ranges to scan after memtables and files.
    #[cfg(feature = "enterprise")]
    extension_ranges: Vec<BoxedExtensionRange>,
}
698
699impl ScanInput {
700 #[must_use]
702 pub(crate) fn new(access_layer: AccessLayerRef, mapper: ProjectionMapper) -> ScanInput {
703 ScanInput {
704 access_layer,
705 mapper: Arc::new(mapper),
706 time_range: None,
707 predicate: PredicateGroup::default(),
708 memtables: Vec::new(),
709 files: Vec::new(),
710 cache_strategy: CacheStrategy::Disabled,
711 ignore_file_not_found: false,
712 parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
713 max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
714 inverted_index_applier: None,
715 bloom_filter_index_applier: None,
716 fulltext_index_applier: None,
717 query_start: None,
718 append_mode: false,
719 filter_deleted: true,
720 merge_mode: MergeMode::default(),
721 series_row_selector: None,
722 distribution: None,
723 flat_format: false,
724 #[cfg(feature = "enterprise")]
725 extension_ranges: Vec::new(),
726 }
727 }
728
729 #[must_use]
731 pub(crate) fn with_time_range(mut self, time_range: Option<TimestampRange>) -> Self {
732 self.time_range = time_range;
733 self
734 }
735
736 #[must_use]
738 pub(crate) fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
739 self.predicate = predicate;
740 self
741 }
742
743 #[must_use]
745 pub(crate) fn with_memtables(mut self, memtables: Vec<MemRangeBuilder>) -> Self {
746 self.memtables = memtables;
747 self
748 }
749
750 #[must_use]
752 pub(crate) fn with_files(mut self, files: Vec<FileHandle>) -> Self {
753 self.files = files;
754 self
755 }
756
757 #[must_use]
759 pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self {
760 self.cache_strategy = cache;
761 self
762 }
763
764 #[must_use]
766 pub(crate) fn with_ignore_file_not_found(mut self, ignore: bool) -> Self {
767 self.ignore_file_not_found = ignore;
768 self
769 }
770
771 #[must_use]
773 pub(crate) fn with_parallel_scan_channel_size(
774 mut self,
775 parallel_scan_channel_size: usize,
776 ) -> Self {
777 self.parallel_scan_channel_size = parallel_scan_channel_size;
778 self
779 }
780
781 #[must_use]
783 pub(crate) fn with_max_concurrent_scan_files(
784 mut self,
785 max_concurrent_scan_files: usize,
786 ) -> Self {
787 self.max_concurrent_scan_files = max_concurrent_scan_files;
788 self
789 }
790
791 #[must_use]
793 pub(crate) fn with_inverted_index_applier(
794 mut self,
795 applier: Option<InvertedIndexApplierRef>,
796 ) -> Self {
797 self.inverted_index_applier = applier;
798 self
799 }
800
801 #[must_use]
803 pub(crate) fn with_bloom_filter_index_applier(
804 mut self,
805 applier: Option<BloomFilterIndexApplierRef>,
806 ) -> Self {
807 self.bloom_filter_index_applier = applier;
808 self
809 }
810
811 #[must_use]
813 pub(crate) fn with_fulltext_index_applier(
814 mut self,
815 applier: Option<FulltextIndexApplierRef>,
816 ) -> Self {
817 self.fulltext_index_applier = applier;
818 self
819 }
820
821 #[must_use]
823 pub(crate) fn with_start_time(mut self, now: Option<Instant>) -> Self {
824 self.query_start = now;
825 self
826 }
827
828 #[must_use]
829 pub(crate) fn with_append_mode(mut self, is_append_mode: bool) -> Self {
830 self.append_mode = is_append_mode;
831 self
832 }
833
834 #[must_use]
836 pub(crate) fn with_filter_deleted(mut self, filter_deleted: bool) -> Self {
837 self.filter_deleted = filter_deleted;
838 self
839 }
840
841 #[must_use]
843 pub(crate) fn with_merge_mode(mut self, merge_mode: MergeMode) -> Self {
844 self.merge_mode = merge_mode;
845 self
846 }
847
848 #[must_use]
850 pub(crate) fn with_distribution(
851 mut self,
852 distribution: Option<TimeSeriesDistribution>,
853 ) -> Self {
854 self.distribution = distribution;
855 self
856 }
857
858 #[must_use]
860 pub(crate) fn with_series_row_selector(
861 mut self,
862 series_row_selector: Option<TimeSeriesRowSelector>,
863 ) -> Self {
864 self.series_row_selector = series_row_selector;
865 self
866 }
867
868 #[must_use]
870 pub(crate) fn with_flat_format(mut self, flat_format: bool) -> Self {
871 self.flat_format = flat_format;
872 self
873 }
874
875 pub(crate) fn create_parallel_sources(
879 &self,
880 sources: Vec<Source>,
881 semaphore: Arc<Semaphore>,
882 ) -> Result<Vec<Source>> {
883 if sources.len() <= 1 {
884 return Ok(sources);
885 }
886
887 let sources = sources
889 .into_iter()
890 .map(|source| {
891 let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
892 self.spawn_scan_task(source, semaphore.clone(), sender);
893 let stream = Box::pin(ReceiverStream::new(receiver));
894 Source::Stream(stream)
895 })
896 .collect();
897 Ok(sources)
898 }
899
900 pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> {
902 let memtable = &self.memtables[index.index];
903 let mut ranges = SmallVec::new();
904 memtable.build_ranges(index.row_group_index, &mut ranges);
905 ranges
906 }
907
908 pub async fn prune_file(
910 &self,
911 file: &FileHandle,
912 reader_metrics: &mut ReaderMetrics,
913 ) -> Result<FileRangeBuilder> {
914 let res = self
915 .access_layer
916 .read_sst(file.clone())
917 .predicate(self.predicate.predicate().cloned())
918 .projection(Some(self.mapper.column_ids().to_vec()))
919 .cache(self.cache_strategy.clone())
920 .inverted_index_applier(self.inverted_index_applier.clone())
921 .bloom_filter_index_applier(self.bloom_filter_index_applier.clone())
922 .fulltext_index_applier(self.fulltext_index_applier.clone())
923 .expected_metadata(Some(self.mapper.metadata().clone()))
924 .flat_format(self.flat_format)
925 .build_reader_input(reader_metrics)
926 .await;
927 let (mut file_range_ctx, selection) = match res {
928 Ok(x) => x,
929 Err(e) => {
930 if e.is_object_not_found() && self.ignore_file_not_found {
931 error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
932 return Ok(FileRangeBuilder::default());
933 } else {
934 return Err(e);
935 }
936 }
937 };
938
939 let need_compat = !compat::has_same_columns_and_pk_encoding(
940 self.mapper.metadata(),
941 file_range_ctx.read_format().metadata(),
942 );
943 if need_compat {
944 let compat = if let Some(flat_format) = file_range_ctx.read_format().as_flat() {
947 let mapper = self.mapper.as_flat().unwrap();
948 Some(CompatBatch::Flat(FlatCompatBatch::try_new(
949 mapper,
950 flat_format.metadata(),
951 flat_format.format_projection(),
952 )?))
953 } else {
954 let compact_batch = PrimaryKeyCompatBatch::new(
955 &self.mapper,
956 file_range_ctx.read_format().metadata().clone(),
957 )?;
958 Some(CompatBatch::PrimaryKey(compact_batch))
959 };
960 file_range_ctx.set_compat_batch(compat);
961 }
962 Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), selection))
963 }
964
965 pub(crate) fn spawn_scan_task(
967 &self,
968 mut input: Source,
969 semaphore: Arc<Semaphore>,
970 sender: mpsc::Sender<Result<Batch>>,
971 ) {
972 common_runtime::spawn_global(async move {
973 loop {
974 let maybe_batch = {
977 let _permit = semaphore.acquire().await.unwrap();
979 input.next_batch().await
980 };
981 match maybe_batch {
982 Ok(Some(batch)) => {
983 let _ = sender.send(Ok(batch)).await;
984 }
985 Ok(None) => break,
986 Err(e) => {
987 let _ = sender.send(Err(e)).await;
988 break;
989 }
990 }
991 }
992 });
993 }
994
995 pub(crate) fn create_parallel_flat_sources(
999 &self,
1000 sources: Vec<BoxedRecordBatchStream>,
1001 semaphore: Arc<Semaphore>,
1002 ) -> Result<Vec<BoxedRecordBatchStream>> {
1003 if sources.len() <= 1 {
1004 return Ok(sources);
1005 }
1006
1007 let sources = sources
1009 .into_iter()
1010 .map(|source| {
1011 let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
1012 self.spawn_flat_scan_task(source, semaphore.clone(), sender);
1013 let stream = Box::pin(ReceiverStream::new(receiver));
1014 Box::pin(stream) as _
1015 })
1016 .collect();
1017 Ok(sources)
1018 }
1019
1020 pub(crate) fn spawn_flat_scan_task(
1022 &self,
1023 mut input: BoxedRecordBatchStream,
1024 semaphore: Arc<Semaphore>,
1025 sender: mpsc::Sender<Result<RecordBatch>>,
1026 ) {
1027 common_runtime::spawn_global(async move {
1028 loop {
1029 let maybe_batch = {
1032 let _permit = semaphore.acquire().await.unwrap();
1034 input.next().await
1035 };
1036 match maybe_batch {
1037 Some(Ok(batch)) => {
1038 let _ = sender.send(Ok(batch)).await;
1039 }
1040 Some(Err(e)) => {
1041 let _ = sender.send(Err(e)).await;
1042 break;
1043 }
1044 None => break,
1045 }
1046 }
1047 });
1048 }
1049
1050 pub(crate) fn total_rows(&self) -> usize {
1051 let rows_in_files: usize = self.files.iter().map(|f| f.num_rows()).sum();
1052 let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();
1053
1054 let rows = rows_in_files + rows_in_memtables;
1055 #[cfg(feature = "enterprise")]
1056 let rows = rows
1057 + self
1058 .extension_ranges
1059 .iter()
1060 .map(|x| x.num_rows())
1061 .sum::<u64>() as usize;
1062 rows
1063 }
1064
1065 pub(crate) fn predicate(&self) -> Option<&Predicate> {
1067 self.predicate.predicate()
1068 }
1069
1070 pub(crate) fn num_memtables(&self) -> usize {
1072 self.memtables.len()
1073 }
1074
1075 pub(crate) fn num_files(&self) -> usize {
1077 self.files.len()
1078 }
1079
1080 pub fn region_metadata(&self) -> &RegionMetadataRef {
1081 self.mapper.metadata()
1082 }
1083}
1084
#[cfg(feature = "enterprise")]
impl ScanInput {
    /// Sets the extension ranges to scan in addition to memtables and files.
    #[must_use]
    pub(crate) fn with_extension_ranges(self, extension_ranges: Vec<BoxedExtensionRange>) -> Self {
        Self {
            extension_ranges,
            ..self
        }
    }

    /// Returns all extension ranges of this input.
    pub(crate) fn extension_ranges(&self) -> &[BoxedExtensionRange] {
        &self.extension_ranges
    }

    /// Returns the extension range addressed by the global range index `i`.
    ///
    /// Global indices list memtables first, then files, then extension ranges, so the
    /// extension range offset is `i - num_memtables - num_files`.
    ///
    /// # Panics
    /// Panics if `i` does not address an extension range.
    pub(crate) fn extension_range(&self, i: usize) -> &BoxedExtensionRange {
        let offset = i
            .checked_sub(self.num_memtables() + self.num_files())
            .expect("range index must address an extension range");
        &self.extension_ranges[offset]
    }
}
1106
#[cfg(test)]
impl ScanInput {
    /// Collects the ids of the SST files in this input, in the order of `files`.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        let mut ids = Vec::with_capacity(self.files.len());
        for file in &self.files {
            ids.push(file.file_id());
        }
        ids
    }
}
1114
/// Context shared by the streams of one scan: the input, the ranges to scan and the query
/// start time.
pub struct StreamContext {
    /// Input memtables, files and scan options.
    pub input: ScanInput,
    /// Metadata of the ranges to scan; each range's position in this vector is passed to
    /// `RangeMeta::new_partition_range` by `partition_ranges`.
    pub(crate) ranges: Vec<RangeMeta>,

    /// Start time of the query.
    pub(crate) query_start: Instant,
}
1127
1128impl StreamContext {
1129 pub(crate) fn seq_scan_ctx(input: ScanInput, compaction: bool) -> Self {
1131 let query_start = input.query_start.unwrap_or_else(Instant::now);
1132 let ranges = RangeMeta::seq_scan_ranges(&input, compaction);
1133 READ_SST_COUNT.observe(input.num_files() as f64);
1134
1135 Self {
1136 input,
1137 ranges,
1138 query_start,
1139 }
1140 }
1141
1142 pub(crate) fn unordered_scan_ctx(input: ScanInput) -> Self {
1144 let query_start = input.query_start.unwrap_or_else(Instant::now);
1145 let ranges = RangeMeta::unordered_scan_ranges(&input);
1146 READ_SST_COUNT.observe(input.num_files() as f64);
1147
1148 Self {
1149 input,
1150 ranges,
1151 query_start,
1152 }
1153 }
1154
1155 pub(crate) fn is_mem_range_index(&self, index: RowGroupIndex) -> bool {
1157 self.input.num_memtables() > index.index
1158 }
1159
1160 pub(crate) fn is_file_range_index(&self, index: RowGroupIndex) -> bool {
1161 !self.is_mem_range_index(index)
1162 && index.index < self.input.num_files() + self.input.num_memtables()
1163 }
1164
1165 pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
1167 self.ranges
1168 .iter()
1169 .enumerate()
1170 .map(|(idx, range_meta)| range_meta.new_partition_range(idx))
1171 .collect()
1172 }
1173
1174 pub(crate) fn format_for_explain(&self, verbose: bool, f: &mut fmt::Formatter) -> fmt::Result {
1176 let (mut num_mem_ranges, mut num_file_ranges, mut num_other_ranges) = (0, 0, 0);
1177 for range_meta in &self.ranges {
1178 for idx in &range_meta.row_group_indices {
1179 if self.is_mem_range_index(*idx) {
1180 num_mem_ranges += 1;
1181 } else if self.is_file_range_index(*idx) {
1182 num_file_ranges += 1;
1183 } else {
1184 num_other_ranges += 1;
1185 }
1186 }
1187 }
1188 if verbose {
1189 write!(f, "{{")?;
1190 }
1191 write!(
1192 f,
1193 r#""partition_count":{{"count":{}, "mem_ranges":{}, "files":{}, "file_ranges":{}"#,
1194 self.ranges.len(),
1195 num_mem_ranges,
1196 self.input.num_files(),
1197 num_file_ranges,
1198 )?;
1199 if num_other_ranges > 0 {
1200 write!(f, r#", "other_ranges":{}"#, num_other_ranges)?;
1201 }
1202 write!(f, "}}")?;
1203
1204 if let Some(selector) = &self.input.series_row_selector {
1205 write!(f, ", \"selector\":\"{}\"", selector)?;
1206 }
1207 if let Some(distribution) = &self.input.distribution {
1208 write!(f, ", \"distribution\":\"{}\"", distribution)?;
1209 }
1210
1211 if verbose {
1212 self.format_verbose_content(f)?;
1213 }
1214
1215 Ok(())
1216 }
1217
1218 fn format_verbose_content(&self, f: &mut fmt::Formatter) -> fmt::Result {
1219 struct FileWrapper<'a> {
1220 file: &'a FileHandle,
1221 }
1222
1223 impl fmt::Debug for FileWrapper<'_> {
1224 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1225 let (start, end) = self.file.time_range();
1226 write!(
1227 f,
1228 r#"{{"file_id":"{}","time_range_start":"{}::{}","time_range_end":"{}::{}","rows":{},"size":{},"index_size":{}}}"#,
1229 self.file.file_id(),
1230 start.value(),
1231 start.unit(),
1232 end.value(),
1233 end.unit(),
1234 self.file.num_rows(),
1235 self.file.size(),
1236 self.file.index_size()
1237 )
1238 }
1239 }
1240
1241 struct InputWrapper<'a> {
1242 input: &'a ScanInput,
1243 }
1244
1245 #[cfg(feature = "enterprise")]
1246 impl InputWrapper<'_> {
1247 fn format_extension_ranges(&self, f: &mut fmt::Formatter) -> fmt::Result {
1248 if self.input.extension_ranges.is_empty() {
1249 return Ok(());
1250 }
1251
1252 let mut delimiter = "";
1253 write!(f, ", extension_ranges: [")?;
1254 for range in self.input.extension_ranges() {
1255 write!(f, "{}{:?}", delimiter, range)?;
1256 delimiter = ", ";
1257 }
1258 write!(f, "]")?;
1259 Ok(())
1260 }
1261 }
1262
1263 impl fmt::Debug for InputWrapper<'_> {
1264 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1265 let output_schema = self.input.mapper.output_schema();
1266 if !output_schema.is_empty() {
1267 let names: Vec<_> = output_schema
1268 .column_schemas()
1269 .iter()
1270 .map(|col| &col.name)
1271 .collect();
1272 write!(f, ", \"projection\": {:?}", names)?;
1273 }
1274 if let Some(predicate) = &self.input.predicate.predicate() {
1275 if !predicate.exprs().is_empty() {
1276 let exprs: Vec<_> =
1277 predicate.exprs().iter().map(|e| e.to_string()).collect();
1278 write!(f, ", \"filters\": {:?}", exprs)?;
1279 }
1280 }
1281 if !self.input.files.is_empty() {
1282 write!(f, ", \"files\": ")?;
1283 f.debug_list()
1284 .entries(self.input.files.iter().map(|file| FileWrapper { file }))
1285 .finish()?;
1286 }
1287
1288 #[cfg(feature = "enterprise")]
1289 self.format_extension_ranges(f)?;
1290
1291 Ok(())
1292 }
1293 }
1294
1295 write!(f, "{:?}", InputWrapper { input: &self.input })
1296 }
1297}
1298
/// The predicate of a scan, plus the subset of its filters that apply to the time index.
#[derive(Clone, Default)]
pub struct PredicateGroup {
    /// Evaluators for filters that reference only the time index column; `None` when there
    /// are no such filters.
    time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,

    /// Predicate built from all filter exprs; `None` only for the `Default` value.
    predicate: Option<Predicate>,
}
1309
1310impl PredicateGroup {
1311 pub fn new(metadata: &RegionMetadata, exprs: &[Expr]) -> Self {
1313 let mut time_filters = Vec::with_capacity(exprs.len());
1314 let mut columns = HashSet::new();
1316 for expr in exprs {
1317 columns.clear();
1318 let Some(filter) = Self::expr_to_filter(expr, metadata, &mut columns) else {
1319 continue;
1320 };
1321 time_filters.push(filter);
1322 }
1323 let time_filters = if time_filters.is_empty() {
1324 None
1325 } else {
1326 Some(Arc::new(time_filters))
1327 };
1328 let predicate = Predicate::new(exprs.to_vec());
1329
1330 Self {
1331 time_filters,
1332 predicate: Some(predicate),
1333 }
1334 }
1335
1336 pub(crate) fn time_filters(&self) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
1338 self.time_filters.clone()
1339 }
1340
1341 pub(crate) fn predicate(&self) -> Option<&Predicate> {
1343 self.predicate.as_ref()
1344 }
1345
1346 fn expr_to_filter(
1347 expr: &Expr,
1348 metadata: &RegionMetadata,
1349 columns: &mut HashSet<Column>,
1350 ) -> Option<SimpleFilterEvaluator> {
1351 columns.clear();
1352 expr_to_columns(expr, columns).ok()?;
1355 if columns.len() > 1 {
1356 return None;
1358 }
1359 let column = columns.iter().next()?;
1360 let column_meta = metadata.column_by_name(&column.name)?;
1361 if column_meta.semantic_type == SemanticType::Timestamp {
1362 SimpleFilterEvaluator::try_new(expr)
1363 } else {
1364 None
1365 }
1366 }
1367}