use std::collections::HashSet;
use std::fmt;
use std::sync::Arc;
use std::time::Instant;

use api::v1::SemanticType;
use common_error::ext::BoxedError;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::{debug, error, tracing, warn};
use common_time::range::TimestampRange;
use datafusion_common::Column;
use datafusion_expr::utils::expr_to_columns;
use datafusion_expr::Expr;
use smallvec::SmallVec;
use store_api::metadata::RegionMetadata;
use store_api::region_engine::{PartitionRange, RegionScannerRef};
use store_api::storage::{ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector};
use table::predicate::{build_time_range_predicate, Predicate};
use tokio::sync::{mpsc, Semaphore};
use tokio_stream::wrappers::ReceiverStream;

use crate::access_layer::AccessLayerRef;
use crate::cache::CacheStrategy;
use crate::config::DEFAULT_SCAN_CHANNEL_SIZE;
use crate::error::Result;
use crate::memtable::MemtableRange;
use crate::metrics::READ_SST_COUNT;
use crate::read::compat::{self, CompatBatch};
use crate::read::projection::ProjectionMapper;
use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
use crate::read::seq_scan::SeqScan;
use crate::read::unordered_scan::UnorderedScan;
use crate::read::{Batch, Source};
use crate::region::options::MergeMode;
use crate::region::version::VersionRef;
use crate::sst::file::FileHandle;
use crate::sst::index::bloom_filter::applier::{
    BloomFilterIndexApplierBuilder, BloomFilterIndexApplierRef,
};
use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
use crate::sst::parquet::reader::ReaderMetrics;

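/// A scanner scans a region and returns a [SendableRecordBatchStream].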
pub(crate) enum Scanner {
    Seq(SeqScan),
    Unordered(UnorderedScan),
}

impl Scanner {
    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
    pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream, BoxedError> {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.build_stream(),
            Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
        }
    }
}

#[cfg(test)]
impl Scanner {
    pub(crate) fn num_files(&self) -> usize {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.input().num_files(),
            Scanner::Unordered(unordered_scan) => unordered_scan.input().num_files(),
        }
    }

    pub(crate) fn num_memtables(&self) -> usize {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.input().num_memtables(),
            Scanner::Unordered(unordered_scan) => unordered_scan.input().num_memtables(),
        }
    }

    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::FileId> {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.input().file_ids(),
            Scanner::Unordered(unordered_scan) => unordered_scan.input().file_ids(),
        }
    }

    pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
        use store_api::region_engine::{PrepareRequest, RegionScanner};

        let request = PrepareRequest::default().with_target_partitions(target_partitions);
        match self {
            Scanner::Seq(seq_scan) => seq_scan.prepare(request).unwrap(),
            Scanner::Unordered(unordered_scan) => unordered_scan.prepare(request).unwrap(),
        }
    }
}

#[cfg_attr(doc, aquamarine::aquamarine)]
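/// Helper to scan a region by [ScanRequest].
///
/// It collects SSTs and memtables to scan without actually reading them and builds
/// a [Scanner] (or a [RegionScannerRef]) to read those targets.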
pub(crate) struct ScanRegion {
    version: VersionRef,
    access_layer: AccessLayerRef,
    request: ScanRequest,
    cache_strategy: CacheStrategy,
    parallel_scan_channel_size: usize,
    ignore_inverted_index: bool,
    ignore_fulltext_index: bool,
    ignore_bloom_filter: bool,
    start_time: Option<Instant>,
}

impl ScanRegion {
    pub(crate) fn new(
        version: VersionRef,
        access_layer: AccessLayerRef,
        request: ScanRequest,
        cache_strategy: CacheStrategy,
    ) -> ScanRegion {
        ScanRegion {
            version,
            access_layer,
            request,
            cache_strategy,
            parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
            ignore_inverted_index: false,
            ignore_fulltext_index: false,
            ignore_bloom_filter: false,
            start_time: None,
        }
    }

    #[must_use]
    pub(crate) fn with_parallel_scan_channel_size(
        mut self,
        parallel_scan_channel_size: usize,
    ) -> Self {
        self.parallel_scan_channel_size = parallel_scan_channel_size;
        self
    }

    #[must_use]
    pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self {
        self.ignore_inverted_index = ignore;
        self
    }

    #[must_use]
    pub(crate) fn with_ignore_fulltext_index(mut self, ignore: bool) -> Self {
        self.ignore_fulltext_index = ignore;
        self
    }

    #[must_use]
    pub(crate) fn with_ignore_bloom_filter(mut self, ignore: bool) -> Self {
        self.ignore_bloom_filter = ignore;
        self
    }

    #[must_use]
    pub(crate) fn with_start_time(mut self, now: Instant) -> Self {
        self.start_time = Some(now);
        self
    }

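    /// Returns a [Scanner] to scan the region, choosing between an unordered scan
    /// and a sequential scan based on the region options and the request.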
    pub(crate) fn scanner(self) -> Result<Scanner> {
        if self.use_unordered_scan() {
            self.unordered_scan().map(Scanner::Unordered)
        } else {
            self.seq_scan().map(Scanner::Seq)
        }
    }

    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
    pub(crate) fn region_scanner(self) -> Result<RegionScannerRef> {
        if self.use_unordered_scan() {
            self.unordered_scan().map(|scanner| Box::new(scanner) as _)
        } else {
            self.seq_scan().map(|scanner| Box::new(scanner) as _)
        }
    }

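    /// Scans the region sequentially.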
    pub(crate) fn seq_scan(self) -> Result<SeqScan> {
        let input = self.scan_input(true)?;
        Ok(SeqScan::new(input, false))
    }

    pub(crate) fn unordered_scan(self) -> Result<UnorderedScan> {
        let input = self.scan_input(true)?;
        Ok(UnorderedScan::new(input))
    }

    #[cfg(test)]
    pub(crate) fn scan_without_filter_deleted(self) -> Result<SeqScan> {
        let input = self.scan_input(false)?;
        Ok(SeqScan::new(input, false))
    }

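    /// Returns whether the region can use an unordered scan: the region must be in
    /// append mode, the request must not use a series row selector, and the required
    /// distribution must be unspecified or [TimeSeriesDistribution::TimeWindowed].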
    fn use_unordered_scan(&self) -> bool {
        self.version.options.append_mode
            && self.request.series_row_selector.is_none()
            && (self.request.distribution.is_none()
                || self.request.distribution == Some(TimeSeriesDistribution::TimeWindowed))
    }

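    /// Creates a [ScanInput] to scan: prunes SSTs and memtables by the time range
    /// predicate and builds the index appliers and the projection mapper.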
    fn scan_input(mut self, filter_deleted: bool) -> Result<ScanInput> {
        let time_range = self.build_time_range_predicate();

        let ssts = &self.version.ssts;
        let mut files = Vec::new();
        for level in ssts.levels() {
            for file in level.files.values() {
                if file_in_range(file, &time_range) {
                    files.push(file.clone());
                }
            }
        }

        let memtables = self.version.memtables.list_memtables();
        let memtables: Vec<_> = memtables
            .into_iter()
            .filter(|mem| {
                let Some((start, end)) = mem.stats().time_range() else {
                    return false;
                };
                let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
                memtable_range.intersects(&time_range)
            })
            .collect();

        debug!(
            "Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}",
            self.version.metadata.region_id,
            self.request,
            time_range,
            memtables.len(),
            files.len(),
            self.version.options.append_mode,
        );

        self.maybe_remove_field_filters();

        let inverted_index_applier = self.build_inverted_index_applier();
        let bloom_filter_applier = self.build_bloom_filter_applier();
        let fulltext_index_applier = self.build_fulltext_index_applier();
        let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters);
        let mapper = match &self.request.projection {
            Some(p) => ProjectionMapper::new(&self.version.metadata, p.iter().copied())?,
            None => ProjectionMapper::all(&self.version.metadata)?,
        };
        let memtables = memtables
            .into_iter()
            .map(|mem| {
                let ranges = mem.ranges(
                    Some(mapper.column_ids()),
                    predicate.clone(),
                    self.request.sequence,
                );
                MemRangeBuilder::new(ranges)
            })
            .collect();

        let input = ScanInput::new(self.access_layer, mapper)
            .with_time_range(Some(time_range))
            .with_predicate(predicate)
            .with_memtables(memtables)
            .with_files(files)
            .with_cache(self.cache_strategy)
            .with_inverted_index_applier(inverted_index_applier)
            .with_bloom_filter_index_applier(bloom_filter_applier)
            .with_fulltext_index_applier(fulltext_index_applier)
            .with_parallel_scan_channel_size(self.parallel_scan_channel_size)
            .with_start_time(self.start_time)
            .with_append_mode(self.version.options.append_mode)
            .with_filter_deleted(filter_deleted)
            .with_merge_mode(self.version.options.merge_mode())
            .with_series_row_selector(self.request.series_row_selector)
            .with_distribution(self.request.distribution);
        Ok(input)
    }

    fn build_time_range_predicate(&self) -> TimestampRange {
        let time_index = self.version.metadata.time_index_column();
        let unit = time_index
            .column_schema
            .data_type
            .as_timestamp()
            .expect("Time index must have timestamp-compatible type")
            .unit();
        build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters)
    }

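    /// Removes field filters from the request when the merge mode is
    /// [MergeMode::LastNonNull], since null fields may be backfilled from older rows
    /// during merge and pushing such filters down could drop rows too early.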
    fn maybe_remove_field_filters(&mut self) {
        if self.version.options.merge_mode() != MergeMode::LastNonNull {
            return;
        }

        let field_columns = self
            .version
            .metadata
            .field_columns()
            .map(|col| &col.column_schema.name)
            .collect::<HashSet<_>>();
        let mut columns = HashSet::new();

        self.request.filters.retain(|expr| {
            columns.clear();
            if expr_to_columns(expr, &mut columns).is_err() {
                return false;
            }
            for column in &columns {
                if field_columns.contains(&column.name) {
                    return false;
                }
            }
            true
        });
    }

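    /// Builds an inverted index applier from the request filters using the latest
    /// region metadata.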
    fn build_inverted_index_applier(&self) -> Option<InvertedIndexApplierRef> {
        if self.ignore_inverted_index {
            return None;
        }

        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
        let inverted_index_cache = self.cache_strategy.inverted_index_cache().cloned();
        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();

        InvertedIndexApplierBuilder::new(
            self.access_layer.region_dir().to_string(),
            self.access_layer.object_store().clone(),
            self.version.metadata.as_ref(),
            self.version.metadata.inverted_indexed_column_ids(
                self.version
                    .options
                    .index_options
                    .inverted_index
                    .ignore_column_ids
                    .iter(),
            ),
            self.access_layer.puffin_manager_factory().clone(),
        )
        .with_file_cache(file_cache)
        .with_inverted_index_cache(inverted_index_cache)
        .with_puffin_metadata_cache(puffin_metadata_cache)
        .build(&self.request.filters)
        .inspect_err(|err| warn!(err; "Failed to build inverted index applier"))
        .ok()
        .flatten()
        .map(Arc::new)
    }

    fn build_bloom_filter_applier(&self) -> Option<BloomFilterIndexApplierRef> {
        if self.ignore_bloom_filter {
            return None;
        }

        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();

        BloomFilterIndexApplierBuilder::new(
            self.access_layer.region_dir().to_string(),
            self.access_layer.object_store().clone(),
            self.version.metadata.as_ref(),
            self.access_layer.puffin_manager_factory().clone(),
        )
        .with_file_cache(file_cache)
        .with_bloom_filter_index_cache(bloom_filter_index_cache)
        .with_puffin_metadata_cache(puffin_metadata_cache)
        .build(&self.request.filters)
        .inspect_err(|err| warn!(err; "Failed to build bloom filter index applier"))
        .ok()
        .flatten()
        .map(Arc::new)
    }

    fn build_fulltext_index_applier(&self) -> Option<FulltextIndexApplierRef> {
        if self.ignore_fulltext_index {
            return None;
        }

        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
        FulltextIndexApplierBuilder::new(
            self.access_layer.region_dir().to_string(),
            self.version.metadata.region_id,
            self.access_layer.object_store().clone(),
            self.access_layer.puffin_manager_factory().clone(),
            self.version.metadata.as_ref(),
        )
        .with_file_cache(file_cache)
        .with_puffin_metadata_cache(puffin_metadata_cache)
        .with_bloom_filter_cache(bloom_filter_index_cache)
        .build(&self.request.filters)
        .inspect_err(|err| warn!(err; "Failed to build fulltext index applier"))
        .ok()
        .flatten()
        .map(Arc::new)
    }
}

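/// Returns true if the time range of the SST `file` intersects `predicate`.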
fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
    if predicate == &TimestampRange::min_to_max() {
        return true;
    }
    let (start, end) = file.time_range();
    let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end));
    file_ts_range.intersects(predicate)
}

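/// Common input for different scanners.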
pub(crate) struct ScanInput {
    access_layer: AccessLayerRef,
    pub(crate) mapper: Arc<ProjectionMapper>,
    time_range: Option<TimestampRange>,
    pub(crate) predicate: PredicateGroup,
    pub(crate) memtables: Vec<MemRangeBuilder>,
    pub(crate) files: Vec<FileHandle>,
    pub(crate) cache_strategy: CacheStrategy,
    ignore_file_not_found: bool,
    pub(crate) parallel_scan_channel_size: usize,
    inverted_index_applier: Option<InvertedIndexApplierRef>,
    bloom_filter_index_applier: Option<BloomFilterIndexApplierRef>,
    fulltext_index_applier: Option<FulltextIndexApplierRef>,
    pub(crate) query_start: Option<Instant>,
    pub(crate) append_mode: bool,
    pub(crate) filter_deleted: bool,
    pub(crate) merge_mode: MergeMode,
    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
    pub(crate) distribution: Option<TimeSeriesDistribution>,
}

impl ScanInput {
    #[must_use]
    pub(crate) fn new(access_layer: AccessLayerRef, mapper: ProjectionMapper) -> ScanInput {
        ScanInput {
            access_layer,
            mapper: Arc::new(mapper),
            time_range: None,
            predicate: PredicateGroup::default(),
            memtables: Vec::new(),
            files: Vec::new(),
            cache_strategy: CacheStrategy::Disabled,
            ignore_file_not_found: false,
            parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
            inverted_index_applier: None,
            bloom_filter_index_applier: None,
            fulltext_index_applier: None,
            query_start: None,
            append_mode: false,
            filter_deleted: true,
            merge_mode: MergeMode::default(),
            series_row_selector: None,
            distribution: None,
        }
    }

    #[must_use]
    pub(crate) fn with_time_range(mut self, time_range: Option<TimestampRange>) -> Self {
        self.time_range = time_range;
        self
    }

    #[must_use]
    pub(crate) fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
        self.predicate = predicate;
        self
    }

    #[must_use]
    pub(crate) fn with_memtables(mut self, memtables: Vec<MemRangeBuilder>) -> Self {
        self.memtables = memtables;
        self
    }

    #[must_use]
    pub(crate) fn with_files(mut self, files: Vec<FileHandle>) -> Self {
        self.files = files;
        self
    }

    #[must_use]
    pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self {
        self.cache_strategy = cache;
        self
    }

    #[must_use]
    pub(crate) fn with_ignore_file_not_found(mut self, ignore: bool) -> Self {
        self.ignore_file_not_found = ignore;
        self
    }

    #[must_use]
    pub(crate) fn with_parallel_scan_channel_size(
        mut self,
        parallel_scan_channel_size: usize,
    ) -> Self {
        self.parallel_scan_channel_size = parallel_scan_channel_size;
        self
    }

    #[must_use]
    pub(crate) fn with_inverted_index_applier(
        mut self,
        applier: Option<InvertedIndexApplierRef>,
    ) -> Self {
        self.inverted_index_applier = applier;
        self
    }

    #[must_use]
    pub(crate) fn with_bloom_filter_index_applier(
        mut self,
        applier: Option<BloomFilterIndexApplierRef>,
    ) -> Self {
        self.bloom_filter_index_applier = applier;
        self
    }

    #[must_use]
    pub(crate) fn with_fulltext_index_applier(
        mut self,
        applier: Option<FulltextIndexApplierRef>,
    ) -> Self {
        self.fulltext_index_applier = applier;
        self
    }

    #[must_use]
    pub(crate) fn with_start_time(mut self, now: Option<Instant>) -> Self {
        self.query_start = now;
        self
    }

    #[must_use]
    pub(crate) fn with_append_mode(mut self, is_append_mode: bool) -> Self {
        self.append_mode = is_append_mode;
        self
    }

    #[must_use]
    pub(crate) fn with_filter_deleted(mut self, filter_deleted: bool) -> Self {
        self.filter_deleted = filter_deleted;
        self
    }

    #[must_use]
    pub(crate) fn with_merge_mode(mut self, merge_mode: MergeMode) -> Self {
        self.merge_mode = merge_mode;
        self
    }

    #[must_use]
    pub(crate) fn with_distribution(
        mut self,
        distribution: Option<TimeSeriesDistribution>,
    ) -> Self {
        self.distribution = distribution;
        self
    }

    #[must_use]
    pub(crate) fn with_series_row_selector(
        mut self,
        series_row_selector: Option<TimeSeriesRowSelector>,
    ) -> Self {
        self.series_row_selector = series_row_selector;
        self
    }

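    /// Rewraps each source so it is scanned in a background task limited by `semaphore`,
    /// forwarding batches through a bounded channel. Returns the sources unchanged when
    /// there is at most one of them.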
    pub(crate) fn create_parallel_sources(
        &self,
        sources: Vec<Source>,
        semaphore: Arc<Semaphore>,
    ) -> Result<Vec<Source>> {
        if sources.len() <= 1 {
            return Ok(sources);
        }

        let sources = sources
            .into_iter()
            .map(|source| {
                let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
                self.spawn_scan_task(source, semaphore.clone(), sender);
                let stream = Box::pin(ReceiverStream::new(receiver));
                Source::Stream(stream)
            })
            .collect();
        Ok(sources)
    }

    pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> {
        let memtable = &self.memtables[index.index];
        let mut ranges = SmallVec::new();
        memtable.build_ranges(index.row_group_index, &mut ranges);
        ranges
    }

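    /// Prunes a file to scan and returns a [FileRangeBuilder] for the row groups to read.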
    pub(crate) async fn prune_file(
        &self,
        file_index: usize,
        reader_metrics: &mut ReaderMetrics,
    ) -> Result<FileRangeBuilder> {
        let file = &self.files[file_index];
        let res = self
            .access_layer
            .read_sst(file.clone())
            .predicate(self.predicate.predicate().cloned())
            .projection(Some(self.mapper.column_ids().to_vec()))
            .cache(self.cache_strategy.clone())
            .inverted_index_applier(self.inverted_index_applier.clone())
            .bloom_filter_index_applier(self.bloom_filter_index_applier.clone())
            .fulltext_index_applier(self.fulltext_index_applier.clone())
            .expected_metadata(Some(self.mapper.metadata().clone()))
            .build_reader_input(reader_metrics)
            .await;
        let (mut file_range_ctx, row_groups) = match res {
            Ok(x) => x,
            Err(e) => {
                if e.is_object_not_found() && self.ignore_file_not_found {
                    error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
                    return Ok(FileRangeBuilder::default());
                } else {
                    return Err(e);
                }
            }
        };
        if !compat::has_same_columns_and_pk_encoding(
            self.mapper.metadata(),
            file_range_ctx.read_format().metadata(),
        ) {
            let compat = CompatBatch::new(
                &self.mapper,
                file_range_ctx.read_format().metadata().clone(),
            )?;
            file_range_ctx.set_compat_batch(Some(compat));
        }
        Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), row_groups))
    }

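    /// Scans `input` in a background task and sends batches to `sender`, stopping at the
    /// first error or when the source is exhausted.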
    pub(crate) fn spawn_scan_task(
        &self,
        mut input: Source,
        semaphore: Arc<Semaphore>,
        sender: mpsc::Sender<Result<Batch>>,
    ) {
        common_runtime::spawn_global(async move {
            loop {
                let maybe_batch = {
                    let _permit = semaphore.acquire().await.unwrap();
                    input.next_batch().await
                };
                match maybe_batch {
                    Ok(Some(batch)) => {
                        let _ = sender.send(Ok(batch)).await;
                    }
                    Ok(None) => break,
                    Err(e) => {
                        let _ = sender.send(Err(e)).await;
                        break;
                    }
                }
            }
        });
    }

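    /// Returns the total number of rows in the SSTs and memtables to scan.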
    pub(crate) fn total_rows(&self) -> usize {
        let rows_in_files: usize = self.files.iter().map(|f| f.num_rows()).sum();
        let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();
        rows_in_files + rows_in_memtables
    }

    pub(crate) fn predicate(&self) -> Option<&Predicate> {
        self.predicate.predicate()
    }

    pub(crate) fn num_memtables(&self) -> usize {
        self.memtables.len()
    }

    pub(crate) fn num_files(&self) -> usize {
        self.files.len()
    }
}

#[cfg(test)]
impl ScanInput {
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::FileId> {
        self.files.iter().map(|file| file.file_id()).collect()
    }
}

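/// Context shared by streams created from the same scanner.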
pub(crate) struct StreamContext {
    pub(crate) input: ScanInput,
    pub(crate) ranges: Vec<RangeMeta>,

    pub(crate) query_start: Instant,
}

impl StreamContext {
    pub(crate) fn seq_scan_ctx(input: ScanInput, compaction: bool) -> Self {
        let query_start = input.query_start.unwrap_or_else(Instant::now);
        let ranges = RangeMeta::seq_scan_ranges(&input, compaction);
        READ_SST_COUNT.observe(input.num_files() as f64);

        Self {
            input,
            ranges,
            query_start,
        }
    }

    pub(crate) fn unordered_scan_ctx(input: ScanInput) -> Self {
        let query_start = input.query_start.unwrap_or_else(Instant::now);
        let ranges = RangeMeta::unordered_scan_ranges(&input);
        READ_SST_COUNT.observe(input.num_files() as f64);

        Self {
            input,
            ranges,
            query_start,
        }
    }

    pub(crate) fn is_mem_range_index(&self, index: RowGroupIndex) -> bool {
        self.input.num_memtables() > index.index
    }

    pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
        self.ranges
            .iter()
            .enumerate()
            .map(|(idx, range_meta)| range_meta.new_partition_range(idx))
            .collect()
    }

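    /// Formats the context for `EXPLAIN`, optionally with verbose details about the
    /// projection, filters, and files.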
    pub(crate) fn format_for_explain(&self, verbose: bool, f: &mut fmt::Formatter) -> fmt::Result {
        let (mut num_mem_ranges, mut num_file_ranges) = (0, 0);
        for range_meta in &self.ranges {
            for idx in &range_meta.row_group_indices {
                if self.is_mem_range_index(*idx) {
                    num_mem_ranges += 1;
                } else {
                    num_file_ranges += 1;
                }
            }
        }
        write!(
            f,
            "partition_count={} ({} memtable ranges, {} file {} ranges)",
            self.ranges.len(),
            num_mem_ranges,
            self.input.num_files(),
            num_file_ranges,
        )?;
        if let Some(selector) = &self.input.series_row_selector {
            write!(f, ", selector={}", selector)?;
        }
        if let Some(distribution) = &self.input.distribution {
            write!(f, ", distribution={}", distribution)?;
        }

        if verbose {
            self.format_verbose_content(f)?;
        }

        Ok(())
    }

    fn format_verbose_content(&self, f: &mut fmt::Formatter) -> fmt::Result {
        struct FileWrapper<'a> {
            file: &'a FileHandle,
        }

        impl fmt::Debug for FileWrapper<'_> {
            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                write!(
                    f,
                    "[file={}, time_range=({}::{}, {}::{}), rows={}, size={}, index_size={}]",
                    self.file.file_id(),
                    self.file.time_range().0.value(),
                    self.file.time_range().0.unit(),
                    self.file.time_range().1.value(),
                    self.file.time_range().1.unit(),
                    self.file.num_rows(),
                    self.file.size(),
                    self.file.index_size()
                )
            }
        }

        struct InputWrapper<'a> {
            input: &'a ScanInput,
        }

        impl fmt::Debug for InputWrapper<'_> {
            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                let output_schema = self.input.mapper.output_schema();
                if !output_schema.is_empty() {
                    write!(f, ", projection=")?;
                    f.debug_list()
                        .entries(output_schema.column_schemas().iter().map(|col| &col.name))
                        .finish()?;
                }
                if let Some(predicate) = &self.input.predicate.predicate() {
                    if !predicate.exprs().is_empty() {
                        write!(f, ", filters=[")?;
                        for (i, expr) in predicate.exprs().iter().enumerate() {
                            if i == predicate.exprs().len() - 1 {
                                write!(f, "{}]", expr)?;
                            } else {
                                write!(f, "{}, ", expr)?;
                            }
                        }
                    }
                }
                if !self.input.files.is_empty() {
                    write!(f, ", files=")?;
                    f.debug_list()
                        .entries(self.input.files.iter().map(|file| FileWrapper { file }))
                        .finish()?;
                }

                Ok(())
            }
        }

        write!(f, "{:?}", InputWrapper { input: &self.input })
    }
}

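/// Predicates to evaluate during a scan.
///
/// Besides the original [Predicate], it keeps the filters on the time index column
/// that [SimpleFilterEvaluator] can evaluate.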
#[derive(Clone, Default)]
pub struct PredicateGroup {
    time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,

    predicate: Option<Predicate>,
}

impl PredicateGroup {
    pub fn new(metadata: &RegionMetadata, exprs: &[Expr]) -> Self {
        let mut time_filters = Vec::with_capacity(exprs.len());
        let mut columns = HashSet::new();
        for expr in exprs {
            columns.clear();
            let Some(filter) = Self::expr_to_filter(expr, metadata, &mut columns) else {
                continue;
            };
            time_filters.push(filter);
        }
        let time_filters = if time_filters.is_empty() {
            None
        } else {
            Some(Arc::new(time_filters))
        };
        let predicate = Predicate::new(exprs.to_vec());

        Self {
            time_filters,
            predicate: Some(predicate),
        }
    }

    pub(crate) fn time_filters(&self) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
        self.time_filters.clone()
    }

    pub(crate) fn predicate(&self) -> Option<&Predicate> {
        self.predicate.as_ref()
    }

    fn expr_to_filter(
        expr: &Expr,
        metadata: &RegionMetadata,
        columns: &mut HashSet<Column>,
    ) -> Option<SimpleFilterEvaluator> {
        columns.clear();
        expr_to_columns(expr, columns).ok()?;
        if columns.len() > 1 {
            return None;
        }
        let column = columns.iter().next()?;
        let column_meta = metadata.column_by_name(&column.name)?;
        if column_meta.semantic_type == SemanticType::Timestamp {
            SimpleFilterEvaluator::try_new(expr)
        } else {
            None
        }
    }
}