use std::collections::HashSet;
use std::fmt;
use std::num::NonZeroU64;
use std::sync::Arc;
use std::time::Instant;

use api::v1::SemanticType;
use common_error::ext::BoxedError;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::{debug, error, tracing, warn};
use common_time::range::TimestampRange;
use datafusion_common::Column;
use datafusion_expr::utils::expr_to_columns;
use datafusion_expr::Expr;
use smallvec::SmallVec;
use store_api::metadata::RegionMetadata;
use store_api::region_engine::{PartitionRange, RegionScannerRef};
use store_api::storage::{ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector};
use table::predicate::{build_time_range_predicate, Predicate};
use tokio::sync::{mpsc, Semaphore};
use tokio_stream::wrappers::ReceiverStream;

use crate::access_layer::AccessLayerRef;
use crate::cache::CacheStrategy;
use crate::config::DEFAULT_SCAN_CHANNEL_SIZE;
use crate::error::Result;
use crate::memtable::MemtableRange;
use crate::metrics::READ_SST_COUNT;
use crate::read::compat::{self, CompatBatch};
use crate::read::projection::ProjectionMapper;
use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
use crate::read::seq_scan::SeqScan;
use crate::read::series_scan::SeriesScan;
use crate::read::unordered_scan::UnorderedScan;
use crate::read::{Batch, Source};
use crate::region::options::MergeMode;
use crate::region::version::VersionRef;
use crate::sst::file::FileHandle;
use crate::sst::index::bloom_filter::applier::{
    BloomFilterIndexApplierBuilder, BloomFilterIndexApplierRef,
};
use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
use crate::sst::parquet::reader::ReaderMetrics;

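/// A scanner scans a region and returns a [SendableRecordBatchStream].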
pub(crate) enum Scanner {
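    /// Sequential scan.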
    Seq(SeqScan),
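    /// Unordered scan.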
    Unordered(UnorderedScan),
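    /// Per-series scan.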
    Series(SeriesScan),
}

impl Scanner {
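    /// Returns a [SendableRecordBatchStream] to retrieve scan results from all partitions.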
    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
    pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream, BoxedError> {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.build_stream(),
            Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
            Scanner::Series(series_scan) => series_scan.build_stream().await,
        }
    }
}

#[cfg(test)]
impl Scanner {
    pub(crate) fn num_files(&self) -> usize {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.input().num_files(),
            Scanner::Unordered(unordered_scan) => unordered_scan.input().num_files(),
            Scanner::Series(series_scan) => series_scan.input().num_files(),
        }
    }

    pub(crate) fn num_memtables(&self) -> usize {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.input().num_memtables(),
            Scanner::Unordered(unordered_scan) => unordered_scan.input().num_memtables(),
            Scanner::Series(series_scan) => series_scan.input().num_memtables(),
        }
    }

    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::FileId> {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.input().file_ids(),
            Scanner::Unordered(unordered_scan) => unordered_scan.input().file_ids(),
            Scanner::Series(series_scan) => series_scan.input().file_ids(),
        }
    }

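    /// Sets the target partitions for the scanner to control its parallelism.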
    pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
        use store_api::region_engine::{PrepareRequest, RegionScanner};

        let request = PrepareRequest::default().with_target_partitions(target_partitions);
        match self {
            Scanner::Seq(seq_scan) => seq_scan.prepare(request).unwrap(),
            Scanner::Unordered(unordered_scan) => unordered_scan.prepare(request).unwrap(),
            Scanner::Series(series_scan) => series_scan.prepare(request).unwrap(),
        }
    }
}

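/// Helper to scan a region by [ScanRequest].
///
/// [ScanRegion] collects the memtables and SSTs to scan without actually reading them
/// and builds a [Scanner] to perform the read.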
#[cfg_attr(doc, aquamarine::aquamarine)]
pub(crate) struct ScanRegion {
    version: VersionRef,
    access_layer: AccessLayerRef,
    request: ScanRequest,
    cache_strategy: CacheStrategy,
    parallel_scan_channel_size: usize,
    ignore_inverted_index: bool,
    ignore_fulltext_index: bool,
    ignore_bloom_filter: bool,
    start_time: Option<Instant>,
}

impl ScanRegion {
    pub(crate) fn new(
        version: VersionRef,
        access_layer: AccessLayerRef,
        request: ScanRequest,
        cache_strategy: CacheStrategy,
    ) -> ScanRegion {
        ScanRegion {
            version,
            access_layer,
            request,
            cache_strategy,
            parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
            ignore_inverted_index: false,
            ignore_fulltext_index: false,
            ignore_bloom_filter: false,
            start_time: None,
        }
    }

    #[must_use]
    pub(crate) fn with_parallel_scan_channel_size(
        mut self,
        parallel_scan_channel_size: usize,
    ) -> Self {
        self.parallel_scan_channel_size = parallel_scan_channel_size;
        self
    }

    #[must_use]
    pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self {
        self.ignore_inverted_index = ignore;
        self
    }

    #[must_use]
    pub(crate) fn with_ignore_fulltext_index(mut self, ignore: bool) -> Self {
        self.ignore_fulltext_index = ignore;
        self
    }

    #[must_use]
    pub(crate) fn with_ignore_bloom_filter(mut self, ignore: bool) -> Self {
        self.ignore_bloom_filter = ignore;
        self
    }

    #[must_use]
    pub(crate) fn with_start_time(mut self, now: Instant) -> Self {
        self.start_time = Some(now);
        self
    }

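    /// Returns a [Scanner] to scan the region, picking a sequential, unordered,
    /// or per-series scan based on the request and the region options.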
    pub(crate) fn scanner(self) -> Result<Scanner> {
        if self.use_series_scan() {
            self.series_scan().map(Scanner::Series)
        } else if self.use_unordered_scan() {
            self.unordered_scan().map(Scanner::Unordered)
        } else {
            self.seq_scan().map(Scanner::Seq)
        }
    }

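    /// Returns a [RegionScannerRef] to scan the region.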
    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
    pub(crate) fn region_scanner(self) -> Result<RegionScannerRef> {
        if self.use_series_scan() {
            self.series_scan().map(|scanner| Box::new(scanner) as _)
        } else if self.use_unordered_scan() {
            self.unordered_scan().map(|scanner| Box::new(scanner) as _)
        } else {
            self.seq_scan().map(|scanner| Box::new(scanner) as _)
        }
    }

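    /// Scans the region sequentially.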
    pub(crate) fn seq_scan(self) -> Result<SeqScan> {
        let input = self.scan_input(true)?;
        Ok(SeqScan::new(input, false))
    }

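    /// Scans the region without keeping the results ordered.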
    pub(crate) fn unordered_scan(self) -> Result<UnorderedScan> {
        let input = self.scan_input(true)?;
        Ok(UnorderedScan::new(input))
    }

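    /// Scans the region and distributes rows by time series.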
    pub(crate) fn series_scan(self) -> Result<SeriesScan> {
        let input = self.scan_input(true)?;
        Ok(SeriesScan::new(input))
    }

    #[cfg(test)]
    pub(crate) fn scan_without_filter_deleted(self) -> Result<SeqScan> {
        let input = self.scan_input(false)?;
        Ok(SeqScan::new(input, false))
    }

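    /// Returns true if the region can use an unordered scan for the current request:
    /// the region is in append mode, there is no series row selector, and the request
    /// does not ask for a per-series distribution.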
    fn use_unordered_scan(&self) -> bool {
        self.version.options.append_mode
            && self.request.series_row_selector.is_none()
            && (self.request.distribution.is_none()
                || self.request.distribution == Some(TimeSeriesDistribution::TimeWindowed))
    }

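    /// Returns true if the request asks for a per-series distribution.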
    fn use_series_scan(&self) -> bool {
        self.request.distribution == Some(TimeSeriesDistribution::PerSeries)
    }

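    /// Creates a scan input that collects the memtables and SSTs overlapping the
    /// time range and sequence constraints of the request.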
    fn scan_input(mut self, filter_deleted: bool) -> Result<ScanInput> {
        let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new);
        let time_range = self.build_time_range_predicate();

        let ssts = &self.version.ssts;
        let mut files = Vec::new();
        for level in ssts.levels() {
            for file in level.files.values() {
                let exceed_min_sequence = match (sst_min_sequence, file.meta_ref().sequence) {
                    (Some(min_sequence), Some(file_sequence)) => file_sequence > min_sequence,
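                    // The file has no max sequence recorded; keep it to be safe.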
                    (Some(_), None) => true,
                    (None, _) => true,
                };

                if exceed_min_sequence && file_in_range(file, &time_range) {
                    files.push(file.clone());
                }
            }
        }

        let memtables = self.version.memtables.list_memtables();
        let memtables: Vec<_> = memtables
            .into_iter()
            .filter(|mem| {
                let Some((start, end)) = mem.stats().time_range() else {
                    return false;
                };
                let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
                memtable_range.intersects(&time_range)
            })
            .collect();

        debug!(
            "Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}",
            self.version.metadata.region_id,
            self.request,
            time_range,
            memtables.len(),
            files.len(),
            self.version.options.append_mode,
        );

        self.maybe_remove_field_filters();

        let inverted_index_applier = self.build_inverted_index_applier();
        let bloom_filter_applier = self.build_bloom_filter_applier();
        let fulltext_index_applier = self.build_fulltext_index_applier();
        let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters);
        let mapper = match &self.request.projection {
            Some(p) => ProjectionMapper::new(&self.version.metadata, p.iter().copied())?,
            None => ProjectionMapper::all(&self.version.metadata)?,
        };
        let memtables = memtables
            .into_iter()
            .map(|mem| {
                mem.ranges(
                    Some(mapper.column_ids()),
                    predicate.clone(),
                    self.request.sequence,
                )
                .map(MemRangeBuilder::new)
            })
            .collect::<Result<Vec<_>>>()?;

        let input = ScanInput::new(self.access_layer, mapper)
            .with_time_range(Some(time_range))
            .with_predicate(predicate)
            .with_memtables(memtables)
            .with_files(files)
            .with_cache(self.cache_strategy)
            .with_inverted_index_applier(inverted_index_applier)
            .with_bloom_filter_index_applier(bloom_filter_applier)
            .with_fulltext_index_applier(fulltext_index_applier)
            .with_parallel_scan_channel_size(self.parallel_scan_channel_size)
            .with_start_time(self.start_time)
            .with_append_mode(self.version.options.append_mode)
            .with_filter_deleted(filter_deleted)
            .with_merge_mode(self.version.options.merge_mode())
            .with_series_row_selector(self.request.series_row_selector)
            .with_distribution(self.request.distribution);
        Ok(input)
    }

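    /// Builds a time range predicate from the request filters and the time index column.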
    fn build_time_range_predicate(&self) -> TimestampRange {
        let time_index = self.version.metadata.time_index_column();
        let unit = time_index
            .column_schema
            .data_type
            .as_timestamp()
            .expect("Time index must have timestamp-compatible type")
            .unit();
        build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters)
    }

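    /// Removes field column filters from the request when the merge mode is
    /// [MergeMode::LastNonNull], because a field value may be filled from another
    /// row during merge and evaluating such filters before merging could drop rows
    /// incorrectly.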
    fn maybe_remove_field_filters(&mut self) {
        if self.version.options.merge_mode() != MergeMode::LastNonNull {
            return;
        }

        let field_columns = self
            .version
            .metadata
            .field_columns()
            .map(|col| &col.column_schema.name)
            .collect::<HashSet<_>>();
        let mut columns = HashSet::new();

        self.request.filters.retain(|expr| {
            columns.clear();
            if expr_to_columns(expr, &mut columns).is_err() {
                return false;
            }
            for column in &columns {
                if field_columns.contains(&column.name) {
                    return false;
                }
            }
            true
        });
    }

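    /// Builds an inverted index applier for the request filters, if enabled.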
    fn build_inverted_index_applier(&self) -> Option<InvertedIndexApplierRef> {
        if self.ignore_inverted_index {
            return None;
        }

        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
        let inverted_index_cache = self.cache_strategy.inverted_index_cache().cloned();

        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();

        InvertedIndexApplierBuilder::new(
            self.access_layer.region_dir().to_string(),
            self.access_layer.object_store().clone(),
            self.version.metadata.as_ref(),
            self.version.metadata.inverted_indexed_column_ids(
                self.version
                    .options
                    .index_options
                    .inverted_index
                    .ignore_column_ids
                    .iter(),
            ),
            self.access_layer.puffin_manager_factory().clone(),
        )
        .with_file_cache(file_cache)
        .with_inverted_index_cache(inverted_index_cache)
        .with_puffin_metadata_cache(puffin_metadata_cache)
        .build(&self.request.filters)
        .inspect_err(|err| warn!(err; "Failed to build inverted index applier"))
        .ok()
        .flatten()
        .map(Arc::new)
    }

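    /// Builds a bloom filter index applier for the request filters, if enabled.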
    fn build_bloom_filter_applier(&self) -> Option<BloomFilterIndexApplierRef> {
        if self.ignore_bloom_filter {
            return None;
        }

        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();

        BloomFilterIndexApplierBuilder::new(
            self.access_layer.region_dir().to_string(),
            self.access_layer.object_store().clone(),
            self.version.metadata.as_ref(),
            self.access_layer.puffin_manager_factory().clone(),
        )
        .with_file_cache(file_cache)
        .with_bloom_filter_index_cache(bloom_filter_index_cache)
        .with_puffin_metadata_cache(puffin_metadata_cache)
        .build(&self.request.filters)
        .inspect_err(|err| warn!(err; "Failed to build bloom filter index applier"))
        .ok()
        .flatten()
        .map(Arc::new)
    }

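    /// Builds a full-text index applier for the request filters, if enabled.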
    fn build_fulltext_index_applier(&self) -> Option<FulltextIndexApplierRef> {
        if self.ignore_fulltext_index {
            return None;
        }

        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
        FulltextIndexApplierBuilder::new(
            self.access_layer.region_dir().to_string(),
            self.version.metadata.region_id,
            self.access_layer.object_store().clone(),
            self.access_layer.puffin_manager_factory().clone(),
            self.version.metadata.as_ref(),
        )
        .with_file_cache(file_cache)
        .with_puffin_metadata_cache(puffin_metadata_cache)
        .with_bloom_filter_cache(bloom_filter_index_cache)
        .build(&self.request.filters)
        .inspect_err(|err| warn!(err; "Failed to build fulltext index applier"))
        .ok()
        .flatten()
        .map(Arc::new)
    }
}

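/// Returns true if the time range of the SST `file` intersects with the `predicate` range.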
fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
    if predicate == &TimestampRange::min_to_max() {
        return true;
    }
    let (start, end) = file.time_range();
    let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end));
    file_ts_range.intersects(predicate)
}

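/// Common input shared by different scanners: the projection mapper, memtable
/// ranges, file handles, caches, and scan options.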
pub(crate) struct ScanInput {
    access_layer: AccessLayerRef,
    pub(crate) mapper: Arc<ProjectionMapper>,
    time_range: Option<TimestampRange>,
    pub(crate) predicate: PredicateGroup,
    pub(crate) memtables: Vec<MemRangeBuilder>,
    pub(crate) files: Vec<FileHandle>,
    pub(crate) cache_strategy: CacheStrategy,
    ignore_file_not_found: bool,
    pub(crate) parallel_scan_channel_size: usize,
    inverted_index_applier: Option<InvertedIndexApplierRef>,
    bloom_filter_index_applier: Option<BloomFilterIndexApplierRef>,
    fulltext_index_applier: Option<FulltextIndexApplierRef>,
    pub(crate) query_start: Option<Instant>,
    pub(crate) append_mode: bool,
    pub(crate) filter_deleted: bool,
    pub(crate) merge_mode: MergeMode,
    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
    pub(crate) distribution: Option<TimeSeriesDistribution>,
}

impl ScanInput {
    #[must_use]
    pub(crate) fn new(access_layer: AccessLayerRef, mapper: ProjectionMapper) -> ScanInput {
        ScanInput {
            access_layer,
            mapper: Arc::new(mapper),
            time_range: None,
            predicate: PredicateGroup::default(),
            memtables: Vec::new(),
            files: Vec::new(),
            cache_strategy: CacheStrategy::Disabled,
            ignore_file_not_found: false,
            parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
            inverted_index_applier: None,
            bloom_filter_index_applier: None,
            fulltext_index_applier: None,
            query_start: None,
            append_mode: false,
            filter_deleted: true,
            merge_mode: MergeMode::default(),
            series_row_selector: None,
            distribution: None,
        }
    }

    #[must_use]
    pub(crate) fn with_time_range(mut self, time_range: Option<TimestampRange>) -> Self {
        self.time_range = time_range;
        self
    }

    #[must_use]
    pub(crate) fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
        self.predicate = predicate;
        self
    }

    #[must_use]
    pub(crate) fn with_memtables(mut self, memtables: Vec<MemRangeBuilder>) -> Self {
        self.memtables = memtables;
        self
    }

    #[must_use]
    pub(crate) fn with_files(mut self, files: Vec<FileHandle>) -> Self {
        self.files = files;
        self
    }

    #[must_use]
    pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self {
        self.cache_strategy = cache;
        self
    }

    #[must_use]
    pub(crate) fn with_ignore_file_not_found(mut self, ignore: bool) -> Self {
        self.ignore_file_not_found = ignore;
        self
    }

    #[must_use]
    pub(crate) fn with_parallel_scan_channel_size(
        mut self,
        parallel_scan_channel_size: usize,
    ) -> Self {
        self.parallel_scan_channel_size = parallel_scan_channel_size;
        self
    }

    #[must_use]
    pub(crate) fn with_inverted_index_applier(
        mut self,
        applier: Option<InvertedIndexApplierRef>,
    ) -> Self {
        self.inverted_index_applier = applier;
        self
    }

    #[must_use]
    pub(crate) fn with_bloom_filter_index_applier(
        mut self,
        applier: Option<BloomFilterIndexApplierRef>,
    ) -> Self {
        self.bloom_filter_index_applier = applier;
        self
    }

    #[must_use]
    pub(crate) fn with_fulltext_index_applier(
        mut self,
        applier: Option<FulltextIndexApplierRef>,
    ) -> Self {
        self.fulltext_index_applier = applier;
        self
    }

    #[must_use]
    pub(crate) fn with_start_time(mut self, now: Option<Instant>) -> Self {
        self.query_start = now;
        self
    }

    #[must_use]
    pub(crate) fn with_append_mode(mut self, is_append_mode: bool) -> Self {
        self.append_mode = is_append_mode;
        self
    }

    #[must_use]
    pub(crate) fn with_filter_deleted(mut self, filter_deleted: bool) -> Self {
        self.filter_deleted = filter_deleted;
        self
    }

    #[must_use]
    pub(crate) fn with_merge_mode(mut self, merge_mode: MergeMode) -> Self {
        self.merge_mode = merge_mode;
        self
    }

    #[must_use]
    pub(crate) fn with_distribution(
        mut self,
        distribution: Option<TimeSeriesDistribution>,
    ) -> Self {
        self.distribution = distribution;
        self
    }

    #[must_use]
    pub(crate) fn with_series_row_selector(
        mut self,
        series_row_selector: Option<TimeSeriesRowSelector>,
    ) -> Self {
        self.series_row_selector = series_row_selector;
        self
    }

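    /// Wraps each source in a spawned task so the sources are scanned in parallel,
    /// bounded by the semaphore. Returns the sources unchanged if there is at most
    /// one of them.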
    pub(crate) fn create_parallel_sources(
        &self,
        sources: Vec<Source>,
        semaphore: Arc<Semaphore>,
    ) -> Result<Vec<Source>> {
        if sources.len() <= 1 {
            return Ok(sources);
        }

        let sources = sources
            .into_iter()
            .map(|source| {
                let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
                self.spawn_scan_task(source, semaphore.clone(), sender);
                let stream = Box::pin(ReceiverStream::new(receiver));
                Source::Stream(stream)
            })
            .collect();
        Ok(sources)
    }

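    /// Builds the memtable ranges to scan for the given row group index.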
    pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> {
        let memtable = &self.memtables[index.index];
        let mut ranges = SmallVec::new();
        memtable.build_ranges(index.row_group_index, &mut ranges);
        ranges
    }

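    /// Prunes the file at `file_index` with the predicate and index appliers and
    /// returns a builder for its file ranges. Missing files are tolerated when
    /// `ignore_file_not_found` is set.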
    pub(crate) async fn prune_file(
        &self,
        file_index: usize,
        reader_metrics: &mut ReaderMetrics,
    ) -> Result<FileRangeBuilder> {
        let file = &self.files[file_index];
        let res = self
            .access_layer
            .read_sst(file.clone())
            .predicate(self.predicate.predicate().cloned())
            .projection(Some(self.mapper.column_ids().to_vec()))
            .cache(self.cache_strategy.clone())
            .inverted_index_applier(self.inverted_index_applier.clone())
            .bloom_filter_index_applier(self.bloom_filter_index_applier.clone())
            .fulltext_index_applier(self.fulltext_index_applier.clone())
            .expected_metadata(Some(self.mapper.metadata().clone()))
            .build_reader_input(reader_metrics)
            .await;
        let (mut file_range_ctx, selection) = match res {
            Ok(x) => x,
            Err(e) => {
                if e.is_object_not_found() && self.ignore_file_not_found {
                    error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
                    return Ok(FileRangeBuilder::default());
                } else {
                    return Err(e);
                }
            }
        };
        if !compat::has_same_columns_and_pk_encoding(
            self.mapper.metadata(),
            file_range_ctx.read_format().metadata(),
        ) {
828 let compat = CompatBatch::new(
831 &self.mapper,
832 file_range_ctx.read_format().metadata().clone(),
833 )?;
834 file_range_ctx.set_compat_batch(Some(compat));
835 }
836 Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), selection))
837 }
838
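    /// Spawns a task that scans the `input` source asynchronously and sends batches
    /// to `sender`, acquiring a semaphore permit before reading each batch.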
    pub(crate) fn spawn_scan_task(
        &self,
        mut input: Source,
        semaphore: Arc<Semaphore>,
        sender: mpsc::Sender<Result<Batch>>,
    ) {
        common_runtime::spawn_global(async move {
            loop {
                let maybe_batch = {
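                    // Safety: the semaphore is never closed, so `acquire()` cannot fail.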
                    let _permit = semaphore.acquire().await.unwrap();
                    input.next_batch().await
                };
                match maybe_batch {
                    Ok(Some(batch)) => {
                        let _ = sender.send(Ok(batch)).await;
                    }
                    Ok(None) => break,
                    Err(e) => {
                        let _ = sender.send(Err(e)).await;
                        break;
                    }
                }
            }
        });
    }

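    /// Returns the total number of rows in all memtables and SSTs to scan, before
    /// any filtering.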
    pub(crate) fn total_rows(&self) -> usize {
        let rows_in_files: usize = self.files.iter().map(|f| f.num_rows()).sum();
        let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();
        rows_in_files + rows_in_memtables
    }

    pub(crate) fn predicate(&self) -> Option<&Predicate> {
        self.predicate.predicate()
    }

    pub(crate) fn num_memtables(&self) -> usize {
        self.memtables.len()
    }

    pub(crate) fn num_files(&self) -> usize {
        self.files.len()
    }
}

#[cfg(test)]
impl ScanInput {
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::FileId> {
        self.files.iter().map(|file| file.file_id()).collect()
    }
}

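/// Context shared by the streams created from a scanner. It holds the scan input
/// and the ranges to scan.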
pub(crate) struct StreamContext {
    pub(crate) input: ScanInput,
    pub(crate) ranges: Vec<RangeMeta>,

    pub(crate) query_start: Instant,
}

impl StreamContext {
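    /// Creates a new [StreamContext] for a [SeqScan].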
    pub(crate) fn seq_scan_ctx(input: ScanInput, compaction: bool) -> Self {
        let query_start = input.query_start.unwrap_or_else(Instant::now);
        let ranges = RangeMeta::seq_scan_ranges(&input, compaction);
        READ_SST_COUNT.observe(input.num_files() as f64);

        Self {
            input,
            ranges,
            query_start,
        }
    }

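    /// Creates a new [StreamContext] for an [UnorderedScan].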
    pub(crate) fn unordered_scan_ctx(input: ScanInput) -> Self {
        let query_start = input.query_start.unwrap_or_else(Instant::now);
        let ranges = RangeMeta::unordered_scan_ranges(&input);
        READ_SST_COUNT.observe(input.num_files() as f64);

        Self {
            input,
            ranges,
            query_start,
        }
    }

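    /// Returns true if the given row group index points to a memtable range
    /// (memtable ranges come before file ranges).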
    pub(crate) fn is_mem_range_index(&self, index: RowGroupIndex) -> bool {
        self.input.num_memtables() > index.index
    }

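    /// Returns the [PartitionRange]s to scan.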
    pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
        self.ranges
            .iter()
            .enumerate()
            .map(|(idx, range_meta)| range_meta.new_partition_range(idx))
            .collect()
    }

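    /// Formats the context for EXPLAIN output.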
    pub(crate) fn format_for_explain(&self, verbose: bool, f: &mut fmt::Formatter) -> fmt::Result {
        let (mut num_mem_ranges, mut num_file_ranges) = (0, 0);
        for range_meta in &self.ranges {
            for idx in &range_meta.row_group_indices {
                if self.is_mem_range_index(*idx) {
                    num_mem_ranges += 1;
                } else {
                    num_file_ranges += 1;
                }
            }
        }
        write!(
            f,
            "partition_count={} ({} memtable ranges, {} file {} ranges)",
            self.ranges.len(),
            num_mem_ranges,
            self.input.num_files(),
            num_file_ranges,
        )?;
        if let Some(selector) = &self.input.series_row_selector {
            write!(f, ", selector={}", selector)?;
        }
        if let Some(distribution) = &self.input.distribution {
            write!(f, ", distribution={}", distribution)?;
        }

        if verbose {
            self.format_verbose_content(f)?;
        }

        Ok(())
    }

    fn format_verbose_content(&self, f: &mut fmt::Formatter) -> fmt::Result {
        struct FileWrapper<'a> {
            file: &'a FileHandle,
        }

        impl fmt::Debug for FileWrapper<'_> {
            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                write!(
                    f,
                    "[file={}, time_range=({}::{}, {}::{}), rows={}, size={}, index_size={}]",
                    self.file.file_id(),
                    self.file.time_range().0.value(),
                    self.file.time_range().0.unit(),
                    self.file.time_range().1.value(),
                    self.file.time_range().1.unit(),
                    self.file.num_rows(),
                    self.file.size(),
                    self.file.index_size()
                )
            }
        }

        struct InputWrapper<'a> {
            input: &'a ScanInput,
        }

        impl fmt::Debug for InputWrapper<'_> {
            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                let output_schema = self.input.mapper.output_schema();
                if !output_schema.is_empty() {
                    write!(f, ", projection=")?;
                    f.debug_list()
                        .entries(output_schema.column_schemas().iter().map(|col| &col.name))
                        .finish()?;
                }
                if let Some(predicate) = &self.input.predicate.predicate() {
                    if !predicate.exprs().is_empty() {
                        write!(f, ", filters=[")?;
                        for (i, expr) in predicate.exprs().iter().enumerate() {
                            if i == predicate.exprs().len() - 1 {
                                write!(f, "{}]", expr)?;
                            } else {
                                write!(f, "{}, ", expr)?;
                            }
                        }
                    }
                }
                if !self.input.files.is_empty() {
                    write!(f, ", files=")?;
                    f.debug_list()
                        .entries(self.input.files.iter().map(|file| FileWrapper { file }))
                        .finish()?;
                }

                Ok(())
            }
        }

        write!(f, "{:?}", InputWrapper { input: &self.input })
    }
}

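/// Predicates to evaluate during the scan: the original [Predicate] built from all
/// filters plus the simple filters extracted for the time index column.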
#[derive(Clone, Default)]
pub struct PredicateGroup {
    time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,

    predicate: Option<Predicate>,
}

impl PredicateGroup {
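    /// Creates a new [PredicateGroup] from the exprs according to the region metadata.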
    pub fn new(metadata: &RegionMetadata, exprs: &[Expr]) -> Self {
        let mut time_filters = Vec::with_capacity(exprs.len());
        let mut columns = HashSet::new();
        for expr in exprs {
            columns.clear();
            let Some(filter) = Self::expr_to_filter(expr, metadata, &mut columns) else {
                continue;
            };
            time_filters.push(filter);
        }
        let time_filters = if time_filters.is_empty() {
            None
        } else {
            Some(Arc::new(time_filters))
        };
        let predicate = Predicate::new(exprs.to_vec());

        Self {
            time_filters,
            predicate: Some(predicate),
        }
    }

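    /// Returns the filters on the time index column.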
    pub(crate) fn time_filters(&self) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
        self.time_filters.clone()
    }

    pub(crate) fn predicate(&self) -> Option<&Predicate> {
        self.predicate.as_ref()
    }

    fn expr_to_filter(
        expr: &Expr,
        metadata: &RegionMetadata,
        columns: &mut HashSet<Column>,
    ) -> Option<SimpleFilterEvaluator> {
        columns.clear();
        expr_to_columns(expr, columns).ok()?;
        if columns.len() > 1 {
            return None;
        }
        let column = columns.iter().next()?;
        let column_meta = metadata.column_by_name(&column.name)?;
        if column_meta.semantic_type == SemanticType::Timestamp {
            SimpleFilterEvaluator::try_new(expr)
        } else {
            None
        }
    }
1117}