1use std::collections::HashSet;
18use std::fmt;
19use std::num::NonZeroU64;
20use std::sync::Arc;
21use std::time::Instant;
22
23use api::v1::SemanticType;
24use common_error::ext::BoxedError;
25use common_recordbatch::SendableRecordBatchStream;
26use common_recordbatch::filter::SimpleFilterEvaluator;
27use common_telemetry::tracing::Instrument;
28use common_telemetry::{debug, error, tracing, warn};
29use common_time::range::TimestampRange;
30use datafusion_common::Column;
31use datafusion_expr::Expr;
32use datafusion_expr::utils::expr_to_columns;
33use futures::StreamExt;
34use partition::expr::PartitionExpr;
35use smallvec::SmallVec;
36use snafu::{OptionExt as _, ResultExt};
37use store_api::metadata::{RegionMetadata, RegionMetadataRef};
38use store_api::region_engine::{PartitionRange, RegionScannerRef};
39use store_api::storage::{
40 ColumnId, RegionId, ScanRequest, SequenceRange, TimeSeriesDistribution, TimeSeriesRowSelector,
41};
42use table::predicate::{Predicate, build_time_range_predicate};
43use tokio::sync::{Semaphore, mpsc};
44use tokio_stream::wrappers::ReceiverStream;
45
46use crate::access_layer::AccessLayerRef;
47use crate::cache::CacheStrategy;
48use crate::config::{DEFAULT_MAX_CONCURRENT_SCAN_FILES, DEFAULT_SCAN_CHANNEL_SIZE};
49use crate::error::{InvalidPartitionExprSnafu, InvalidRequestSnafu, Result};
50#[cfg(feature = "enterprise")]
51use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
52use crate::memtable::{MemtableRange, RangesOptions};
53use crate::metrics::READ_SST_COUNT;
54use crate::read::compat::{self, CompatBatch, FlatCompatBatch, PrimaryKeyCompatBatch};
55use crate::read::projection::ProjectionMapper;
56use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
57use crate::read::seq_scan::SeqScan;
58use crate::read::series_scan::SeriesScan;
59use crate::read::stream::ScanBatchStream;
60use crate::read::unordered_scan::UnorderedScan;
61use crate::read::{Batch, BoxedRecordBatchStream, RecordBatch, Source};
62use crate::region::options::MergeMode;
63use crate::region::version::VersionRef;
64use crate::sst::FormatType;
65use crate::sst::file::FileHandle;
66use crate::sst::index::bloom_filter::applier::{
67 BloomFilterIndexApplierBuilder, BloomFilterIndexApplierRef,
68};
69use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
70use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
71use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
72use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
73#[cfg(feature = "vector_index")]
74use crate::sst::index::vector_index::applier::{VectorIndexApplier, VectorIndexApplierRef};
75use crate::sst::parquet::file_range::PreFilterMode;
76use crate::sst::parquet::reader::ReaderMetrics;
77
/// Channel size used for parallel scan when the region uses the flat SST format.
const FLAT_SCAN_CHANNEL_SIZE: usize = 2;
/// Multiplier applied to the vector search `k` when additional filters are present,
/// so enough candidates survive post-filtering.
#[cfg(feature = "vector_index")]
const VECTOR_INDEX_OVERFETCH_MULTIPLIER: usize = 2;
82
/// A scanner over a region, wrapping one of the concrete scan strategies.
pub(crate) enum Scanner {
    /// Sequential scan (the default strategy).
    Seq(SeqScan),
    /// Unordered scan; chosen for append-mode regions (see `ScanRegion::use_unordered_scan`).
    Unordered(UnorderedScan),
    /// Per-series scan; chosen when the request asks for a per-series distribution.
    Series(SeriesScan),
}
92
93impl Scanner {
94 #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
96 pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream, BoxedError> {
97 match self {
98 Scanner::Seq(seq_scan) => seq_scan.build_stream(),
99 Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
100 Scanner::Series(series_scan) => series_scan.build_stream().await,
101 }
102 }
103
104 pub(crate) fn scan_batch(&self) -> Result<ScanBatchStream> {
106 match self {
107 Scanner::Seq(x) => x.scan_all_partitions(),
108 Scanner::Unordered(x) => x.scan_all_partitions(),
109 Scanner::Series(x) => x.scan_all_partitions(),
110 }
111 }
112}
113
#[cfg(test)]
impl Scanner {
    /// Number of SST files the scanner will read.
    pub(crate) fn num_files(&self) -> usize {
        match self {
            Scanner::Seq(x) => x.input().num_files(),
            Scanner::Unordered(x) => x.input().num_files(),
            Scanner::Series(x) => x.input().num_files(),
        }
    }

    /// Number of memtable ranges the scanner will read.
    pub(crate) fn num_memtables(&self) -> usize {
        match self {
            Scanner::Seq(x) => x.input().num_memtables(),
            Scanner::Unordered(x) => x.input().num_memtables(),
            Scanner::Series(x) => x.input().num_memtables(),
        }
    }

    /// Ids of the SST files the scanner will read.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        match self {
            Scanner::Seq(x) => x.input().file_ids(),
            Scanner::Unordered(x) => x.input().file_ids(),
            Scanner::Series(x) => x.input().file_ids(),
        }
    }

    /// Ids of the indexes the scanner will read.
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        match self {
            Scanner::Seq(x) => x.input().index_ids(),
            Scanner::Unordered(x) => x.input().index_ids(),
            Scanner::Series(x) => x.input().index_ids(),
        }
    }

    /// Re-prepares the scanner with the given target partition count.
    pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
        use store_api::region_engine::{PrepareRequest, RegionScanner};

        let request = PrepareRequest::default().with_target_partitions(target_partitions);
        match self {
            Scanner::Seq(x) => x.prepare(request).unwrap(),
            Scanner::Unordered(x) => x.prepare(request).unwrap(),
            Scanner::Series(x) => x.prepare(request).unwrap(),
        }
    }
}
163
/// Helper that holds everything needed to build a [Scanner] for one region.
#[cfg_attr(doc, aquamarine::aquamarine)]
pub(crate) struct ScanRegion {
    /// Immutable snapshot (memtables + SSTs + options) of the region to scan.
    version: VersionRef,
    /// Access layer used to read SST files.
    access_layer: AccessLayerRef,
    /// The scan request (projection, filters, sequences, distribution, ...).
    request: ScanRequest,
    /// Cache strategy for file/index/metadata caches.
    cache_strategy: CacheStrategy,
    /// Channel size used when scanning sources in parallel.
    parallel_scan_channel_size: usize,
    /// Maximum number of SST files scanned concurrently.
    max_concurrent_scan_files: usize,
    /// Whether to skip building the inverted index applier.
    ignore_inverted_index: bool,
    /// Whether to skip building the full-text index applier.
    ignore_fulltext_index: bool,
    /// Whether to skip building the bloom filter index applier.
    ignore_bloom_filter: bool,
    /// Start time of the query, used for metrics.
    start_time: Option<Instant>,
    /// Whether deleted rows should be filtered out (default: true).
    filter_deleted: bool,
    /// Optional provider of extra (extension) ranges to scan.
    #[cfg(feature = "enterprise")]
    extension_range_provider: Option<BoxedExtensionRangeProvider>,
}
241
242impl ScanRegion {
243 pub(crate) fn new(
245 version: VersionRef,
246 access_layer: AccessLayerRef,
247 request: ScanRequest,
248 cache_strategy: CacheStrategy,
249 ) -> ScanRegion {
250 ScanRegion {
251 version,
252 access_layer,
253 request,
254 cache_strategy,
255 parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
256 max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
257 ignore_inverted_index: false,
258 ignore_fulltext_index: false,
259 ignore_bloom_filter: false,
260 start_time: None,
261 filter_deleted: true,
262 #[cfg(feature = "enterprise")]
263 extension_range_provider: None,
264 }
265 }
266
267 #[must_use]
269 pub(crate) fn with_parallel_scan_channel_size(
270 mut self,
271 parallel_scan_channel_size: usize,
272 ) -> Self {
273 self.parallel_scan_channel_size = parallel_scan_channel_size;
274 self
275 }
276
277 #[must_use]
279 pub(crate) fn with_max_concurrent_scan_files(
280 mut self,
281 max_concurrent_scan_files: usize,
282 ) -> Self {
283 self.max_concurrent_scan_files = max_concurrent_scan_files;
284 self
285 }
286
287 #[must_use]
289 pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self {
290 self.ignore_inverted_index = ignore;
291 self
292 }
293
294 #[must_use]
296 pub(crate) fn with_ignore_fulltext_index(mut self, ignore: bool) -> Self {
297 self.ignore_fulltext_index = ignore;
298 self
299 }
300
301 #[must_use]
303 pub(crate) fn with_ignore_bloom_filter(mut self, ignore: bool) -> Self {
304 self.ignore_bloom_filter = ignore;
305 self
306 }
307
308 #[must_use]
309 pub(crate) fn with_start_time(mut self, now: Instant) -> Self {
310 self.start_time = Some(now);
311 self
312 }
313
314 pub(crate) fn set_filter_deleted(&mut self, filter_deleted: bool) {
315 self.filter_deleted = filter_deleted;
316 }
317
318 #[cfg(feature = "enterprise")]
319 pub(crate) fn set_extension_range_provider(
320 &mut self,
321 extension_range_provider: BoxedExtensionRangeProvider,
322 ) {
323 self.extension_range_provider = Some(extension_range_provider);
324 }
325
326 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
328 pub(crate) async fn scanner(self) -> Result<Scanner> {
329 if self.use_series_scan() {
330 self.series_scan().await.map(Scanner::Series)
331 } else if self.use_unordered_scan() {
332 self.unordered_scan().await.map(Scanner::Unordered)
335 } else {
336 self.seq_scan().await.map(Scanner::Seq)
337 }
338 }
339
340 #[tracing::instrument(
342 level = tracing::Level::DEBUG,
343 skip_all,
344 fields(region_id = %self.region_id())
345 )]
346 pub(crate) async fn region_scanner(self) -> Result<RegionScannerRef> {
347 if self.use_series_scan() {
348 self.series_scan()
349 .await
350 .map(|scanner| Box::new(scanner) as _)
351 } else if self.use_unordered_scan() {
352 self.unordered_scan()
353 .await
354 .map(|scanner| Box::new(scanner) as _)
355 } else {
356 self.seq_scan().await.map(|scanner| Box::new(scanner) as _)
357 }
358 }
359
360 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
362 pub(crate) async fn seq_scan(self) -> Result<SeqScan> {
363 let input = self.scan_input().await?.with_compaction(false);
364 Ok(SeqScan::new(input))
365 }
366
367 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
369 pub(crate) async fn unordered_scan(self) -> Result<UnorderedScan> {
370 let input = self.scan_input().await?;
371 Ok(UnorderedScan::new(input))
372 }
373
374 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
376 pub(crate) async fn series_scan(self) -> Result<SeriesScan> {
377 let input = self.scan_input().await?;
378 Ok(SeriesScan::new(input))
379 }
380
381 fn use_unordered_scan(&self) -> bool {
383 self.version.options.append_mode
390 && self.request.series_row_selector.is_none()
391 && (self.request.distribution.is_none()
392 || self.request.distribution == Some(TimeSeriesDistribution::TimeWindowed))
393 }
394
395 fn use_series_scan(&self) -> bool {
397 self.request.distribution == Some(TimeSeriesDistribution::PerSeries)
398 }
399
400 fn use_flat_format(&self) -> bool {
402 self.version.options.sst_format.unwrap_or_default() == FormatType::Flat
403 }
404
405 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
407 async fn scan_input(mut self) -> Result<ScanInput> {
408 let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new);
409 let time_range = self.build_time_range_predicate();
410 let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;
411 let flat_format = self.use_flat_format();
412
413 let read_column_ids = match &self.request.projection {
414 Some(p) => self.build_read_column_ids(p, &predicate)?,
415 None => self
416 .version
417 .metadata
418 .column_metadatas
419 .iter()
420 .map(|col| col.column_id)
421 .collect(),
422 };
423
424 let mapper = match &self.request.projection {
426 Some(p) => ProjectionMapper::new_with_read_columns(
427 &self.version.metadata,
428 p.iter().copied(),
429 flat_format,
430 read_column_ids.clone(),
431 )?,
432 None => ProjectionMapper::all(&self.version.metadata, flat_format)?,
433 };
434
435 let ssts = &self.version.ssts;
436 let mut files = Vec::new();
437 for level in ssts.levels() {
438 for file in level.files.values() {
439 let exceed_min_sequence = match (sst_min_sequence, file.meta_ref().sequence) {
440 (Some(min_sequence), Some(file_sequence)) => file_sequence > min_sequence,
441 (Some(_), None) => true,
447 (None, _) => true,
448 };
449
450 if exceed_min_sequence && file_in_range(file, &time_range) {
452 files.push(file.clone());
453 }
454 }
458 }
459
460 let memtables = self.version.memtables.list_memtables();
461 let mut mem_range_builders = Vec::new();
463 let filter_mode = pre_filter_mode(
464 self.version.options.append_mode,
465 self.version.options.merge_mode(),
466 );
467
468 for m in memtables {
469 let Some((start, end)) = m.stats().time_range() else {
471 continue;
472 };
473 let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
475 if !memtable_range.intersects(&time_range) {
476 continue;
477 }
478 let ranges_in_memtable = m.ranges(
479 Some(read_column_ids.as_slice()),
480 RangesOptions::default()
481 .with_predicate(predicate.clone())
482 .with_sequence(SequenceRange::new(
483 self.request.memtable_min_sequence,
484 self.request.memtable_max_sequence,
485 ))
486 .with_pre_filter_mode(filter_mode),
487 )?;
488 mem_range_builders.extend(ranges_in_memtable.ranges.into_values().map(|v| {
489 let stats = v.stats().clone();
490 MemRangeBuilder::new(v, stats)
491 }));
492 }
493
494 let region_id = self.region_id();
495 debug!(
496 "Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}, flat_format: {}",
497 region_id,
498 self.request,
499 time_range,
500 mem_range_builders.len(),
501 files.len(),
502 self.version.options.append_mode,
503 flat_format,
504 );
505
506 let (non_field_filters, field_filters) = self.partition_by_field_filters();
507 let inverted_index_appliers = [
508 self.build_invereted_index_applier(&non_field_filters),
509 self.build_invereted_index_applier(&field_filters),
510 ];
511 let bloom_filter_appliers = [
512 self.build_bloom_filter_applier(&non_field_filters),
513 self.build_bloom_filter_applier(&field_filters),
514 ];
515 let fulltext_index_appliers = [
516 self.build_fulltext_index_applier(&non_field_filters),
517 self.build_fulltext_index_applier(&field_filters),
518 ];
519 #[cfg(feature = "vector_index")]
520 let vector_index_applier = self.build_vector_index_applier();
521 #[cfg(feature = "vector_index")]
522 let vector_index_k = self.request.vector_search.as_ref().map(|search| {
523 if self.request.filters.is_empty() {
524 search.k
525 } else {
526 search.k.saturating_mul(VECTOR_INDEX_OVERFETCH_MULTIPLIER)
527 }
528 });
529
530 if flat_format {
531 self.parallel_scan_channel_size = FLAT_SCAN_CHANNEL_SIZE;
533 }
534
535 let input = ScanInput::new(self.access_layer, mapper)
536 .with_time_range(Some(time_range))
537 .with_predicate(predicate)
538 .with_memtables(mem_range_builders)
539 .with_files(files)
540 .with_cache(self.cache_strategy)
541 .with_inverted_index_appliers(inverted_index_appliers)
542 .with_bloom_filter_index_appliers(bloom_filter_appliers)
543 .with_fulltext_index_appliers(fulltext_index_appliers)
544 .with_parallel_scan_channel_size(self.parallel_scan_channel_size)
545 .with_max_concurrent_scan_files(self.max_concurrent_scan_files)
546 .with_start_time(self.start_time)
547 .with_append_mode(self.version.options.append_mode)
548 .with_filter_deleted(self.filter_deleted)
549 .with_merge_mode(self.version.options.merge_mode())
550 .with_series_row_selector(self.request.series_row_selector)
551 .with_distribution(self.request.distribution)
552 .with_flat_format(flat_format);
553 #[cfg(feature = "vector_index")]
554 let input = input
555 .with_vector_index_applier(vector_index_applier)
556 .with_vector_index_k(vector_index_k);
557
558 #[cfg(feature = "enterprise")]
559 let input = if let Some(provider) = self.extension_range_provider {
560 let ranges = provider
561 .find_extension_ranges(self.version.flushed_sequence, time_range, &self.request)
562 .await?;
563 debug!("Find extension ranges: {ranges:?}");
564 input.with_extension_ranges(ranges)
565 } else {
566 input
567 };
568 Ok(input)
569 }
570
571 fn region_id(&self) -> RegionId {
572 self.version.metadata.region_id
573 }
574
575 fn build_time_range_predicate(&self) -> TimestampRange {
577 let time_index = self.version.metadata.time_index_column();
578 let unit = time_index
579 .column_schema
580 .data_type
581 .as_timestamp()
582 .expect("Time index must have timestamp-compatible type")
583 .unit();
584 build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters)
585 }
586
587 fn build_read_column_ids(
589 &self,
590 projection: &[usize],
591 predicate: &PredicateGroup,
592 ) -> Result<Vec<ColumnId>> {
593 let metadata = &self.version.metadata;
594 let mut read_column_ids = Vec::new();
596 let mut seen = HashSet::new();
597
598 for idx in projection {
599 let column =
600 metadata
601 .column_metadatas
602 .get(*idx)
603 .with_context(|| InvalidRequestSnafu {
604 region_id: metadata.region_id,
605 reason: format!("projection index {} is out of bound", idx),
606 })?;
607 seen.insert(column.column_id);
608 read_column_ids.push(column.column_id);
610 }
611
612 if projection.is_empty() {
613 let time_index = metadata.time_index_column().column_id;
614 if seen.insert(time_index) {
615 read_column_ids.push(time_index);
616 }
617 }
618
619 let mut extra_names = HashSet::new();
620 let mut columns = HashSet::new();
621
622 for expr in &self.request.filters {
623 columns.clear();
624 if expr_to_columns(expr, &mut columns).is_err() {
625 continue;
626 }
627 extra_names.extend(columns.iter().map(|column| column.name.clone()));
628 }
629
630 if let Some(expr) = predicate.region_partition_expr() {
631 expr.collect_column_names(&mut extra_names);
632 }
633
634 if !extra_names.is_empty() {
635 for column in &metadata.column_metadatas {
636 if extra_names.contains(column.column_schema.name.as_str())
637 && !seen.contains(&column.column_id)
638 {
639 read_column_ids.push(column.column_id);
640 }
641 extra_names.remove(column.column_schema.name.as_str());
642 }
643 if !extra_names.is_empty() {
644 warn!(
645 "Some columns in filters are not found in region {}: {:?}",
646 metadata.region_id, extra_names
647 );
648 }
649 }
650 Ok(read_column_ids)
651 }
652
653 fn partition_by_field_filters(&self) -> (Vec<Expr>, Vec<Expr>) {
656 let field_columns = self
657 .version
658 .metadata
659 .field_columns()
660 .map(|col| &col.column_schema.name)
661 .collect::<HashSet<_>>();
662
663 let mut columns = HashSet::new();
664
665 self.request.filters.iter().cloned().partition(|expr| {
666 columns.clear();
667 if expr_to_columns(expr, &mut columns).is_err() {
669 return true;
671 }
672 !columns
674 .iter()
675 .any(|column| field_columns.contains(&column.name))
676 })
677 }
678
679 fn build_invereted_index_applier(&self, filters: &[Expr]) -> Option<InvertedIndexApplierRef> {
681 if self.ignore_inverted_index {
682 return None;
683 }
684
685 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
686 let inverted_index_cache = self.cache_strategy.inverted_index_cache().cloned();
687
688 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
689
690 InvertedIndexApplierBuilder::new(
691 self.access_layer.table_dir().to_string(),
692 self.access_layer.path_type(),
693 self.access_layer.object_store().clone(),
694 self.version.metadata.as_ref(),
695 self.version.metadata.inverted_indexed_column_ids(
696 self.version
697 .options
698 .index_options
699 .inverted_index
700 .ignore_column_ids
701 .iter(),
702 ),
703 self.access_layer.puffin_manager_factory().clone(),
704 )
705 .with_file_cache(file_cache)
706 .with_inverted_index_cache(inverted_index_cache)
707 .with_puffin_metadata_cache(puffin_metadata_cache)
708 .build(filters)
709 .inspect_err(|err| warn!(err; "Failed to build invereted index applier"))
710 .ok()
711 .flatten()
712 .map(Arc::new)
713 }
714
715 fn build_bloom_filter_applier(&self, filters: &[Expr]) -> Option<BloomFilterIndexApplierRef> {
717 if self.ignore_bloom_filter {
718 return None;
719 }
720
721 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
722 let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
723 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
724
725 BloomFilterIndexApplierBuilder::new(
726 self.access_layer.table_dir().to_string(),
727 self.access_layer.path_type(),
728 self.access_layer.object_store().clone(),
729 self.version.metadata.as_ref(),
730 self.access_layer.puffin_manager_factory().clone(),
731 )
732 .with_file_cache(file_cache)
733 .with_bloom_filter_index_cache(bloom_filter_index_cache)
734 .with_puffin_metadata_cache(puffin_metadata_cache)
735 .build(filters)
736 .inspect_err(|err| warn!(err; "Failed to build bloom filter index applier"))
737 .ok()
738 .flatten()
739 .map(Arc::new)
740 }
741
742 fn build_fulltext_index_applier(&self, filters: &[Expr]) -> Option<FulltextIndexApplierRef> {
744 if self.ignore_fulltext_index {
745 return None;
746 }
747
748 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
749 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
750 let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
751 FulltextIndexApplierBuilder::new(
752 self.access_layer.table_dir().to_string(),
753 self.access_layer.path_type(),
754 self.access_layer.object_store().clone(),
755 self.access_layer.puffin_manager_factory().clone(),
756 self.version.metadata.as_ref(),
757 )
758 .with_file_cache(file_cache)
759 .with_puffin_metadata_cache(puffin_metadata_cache)
760 .with_bloom_filter_cache(bloom_filter_index_cache)
761 .build(filters)
762 .inspect_err(|err| warn!(err; "Failed to build fulltext index applier"))
763 .ok()
764 .flatten()
765 .map(Arc::new)
766 }
767
768 #[cfg(feature = "vector_index")]
770 fn build_vector_index_applier(&self) -> Option<VectorIndexApplierRef> {
771 let vector_search = self.request.vector_search.as_ref()?;
772
773 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
774 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
775 let vector_index_cache = self.cache_strategy.vector_index_cache().cloned();
776
777 let applier = VectorIndexApplier::new(
778 self.access_layer.table_dir().to_string(),
779 self.access_layer.path_type(),
780 self.access_layer.object_store().clone(),
781 self.access_layer.puffin_manager_factory().clone(),
782 vector_search.column_id,
783 vector_search.query_vector.clone(),
784 vector_search.metric,
785 )
786 .with_file_cache(file_cache)
787 .with_puffin_metadata_cache(puffin_metadata_cache)
788 .with_vector_index_cache(vector_index_cache);
789
790 Some(Arc::new(applier))
791 }
792}
793
794fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
796 if predicate == &TimestampRange::min_to_max() {
797 return true;
798 }
799 let (start, end) = file.time_range();
801 let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end));
802 file_ts_range.intersects(predicate)
803}
804
/// Common input for the scanners: sources to read, projection, predicates and
/// index appliers.
pub struct ScanInput {
    /// Access layer used to read SST files.
    access_layer: AccessLayerRef,
    /// Maps projected columns between storage and output schema.
    pub(crate) mapper: Arc<ProjectionMapper>,
    /// Ids of columns to read from sources.
    pub(crate) read_column_ids: Vec<ColumnId>,
    /// Time range to scan, if bounded.
    time_range: Option<TimestampRange>,
    /// Predicates pushed down to sources.
    pub(crate) predicate: PredicateGroup,
    /// Partition expression of the region, cached from `predicate`.
    region_partition_expr: Option<PartitionExpr>,
    /// Builders for memtable ranges to scan.
    pub(crate) memtables: Vec<MemRangeBuilder>,
    /// SST files to scan.
    pub(crate) files: Vec<FileHandle>,
    /// Cache strategy for readers.
    pub(crate) cache_strategy: CacheStrategy,
    /// Whether a missing SST file is tolerated instead of being an error.
    ignore_file_not_found: bool,
    /// Channel size used for parallel scans.
    pub(crate) parallel_scan_channel_size: usize,
    /// Maximum number of SST files scanned concurrently.
    pub(crate) max_concurrent_scan_files: usize,
    /// Inverted index appliers, one per filter group
    /// (presumably [non-field filters, field filters] — see `ScanRegion::scan_input`).
    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
    /// Bloom filter index appliers, one per filter group.
    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    /// Full-text index appliers, one per filter group.
    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
    /// Vector (ANN) index applier, if the request carries a vector search.
    #[cfg(feature = "vector_index")]
    pub(crate) vector_index_applier: Option<VectorIndexApplierRef>,
    /// Number of vector search candidates to fetch.
    #[cfg(feature = "vector_index")]
    pub(crate) vector_index_k: Option<usize>,
    /// Start time of the query, for metrics.
    pub(crate) query_start: Option<Instant>,
    /// Whether the region is in append mode.
    pub(crate) append_mode: bool,
    /// Whether deleted rows are filtered out.
    pub(crate) filter_deleted: bool,
    /// Merge mode of the region.
    pub(crate) merge_mode: MergeMode,
    /// Optional per-series row selector from the request.
    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
    /// Requested time-series distribution, if any.
    pub(crate) distribution: Option<TimeSeriesDistribution>,
    /// Whether SSTs use the flat format.
    pub(crate) flat_format: bool,
    /// Whether this scan is driven by compaction.
    pub(crate) compaction: bool,
    /// Extra ranges to scan besides memtables and SSTs.
    #[cfg(feature = "enterprise")]
    extension_ranges: Vec<BoxedExtensionRange>,
}
862
863impl ScanInput {
864 #[must_use]
866 pub(crate) fn new(access_layer: AccessLayerRef, mapper: ProjectionMapper) -> ScanInput {
867 ScanInput {
868 access_layer,
869 read_column_ids: mapper.column_ids().to_vec(),
870 mapper: Arc::new(mapper),
871 time_range: None,
872 predicate: PredicateGroup::default(),
873 region_partition_expr: None,
874 memtables: Vec::new(),
875 files: Vec::new(),
876 cache_strategy: CacheStrategy::Disabled,
877 ignore_file_not_found: false,
878 parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
879 max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
880 inverted_index_appliers: [None, None],
881 bloom_filter_index_appliers: [None, None],
882 fulltext_index_appliers: [None, None],
883 #[cfg(feature = "vector_index")]
884 vector_index_applier: None,
885 #[cfg(feature = "vector_index")]
886 vector_index_k: None,
887 query_start: None,
888 append_mode: false,
889 filter_deleted: true,
890 merge_mode: MergeMode::default(),
891 series_row_selector: None,
892 distribution: None,
893 flat_format: false,
894 compaction: false,
895 #[cfg(feature = "enterprise")]
896 extension_ranges: Vec::new(),
897 }
898 }
899
900 #[must_use]
902 pub(crate) fn with_time_range(mut self, time_range: Option<TimestampRange>) -> Self {
903 self.time_range = time_range;
904 self
905 }
906
907 #[must_use]
909 pub(crate) fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
910 self.region_partition_expr = predicate.region_partition_expr().cloned();
911 self.predicate = predicate;
912 self
913 }
914
915 #[must_use]
917 pub(crate) fn with_memtables(mut self, memtables: Vec<MemRangeBuilder>) -> Self {
918 self.memtables = memtables;
919 self
920 }
921
922 #[must_use]
924 pub(crate) fn with_files(mut self, files: Vec<FileHandle>) -> Self {
925 self.files = files;
926 self
927 }
928
929 #[must_use]
931 pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self {
932 self.cache_strategy = cache;
933 self
934 }
935
936 #[must_use]
938 pub(crate) fn with_ignore_file_not_found(mut self, ignore: bool) -> Self {
939 self.ignore_file_not_found = ignore;
940 self
941 }
942
943 #[must_use]
945 pub(crate) fn with_parallel_scan_channel_size(
946 mut self,
947 parallel_scan_channel_size: usize,
948 ) -> Self {
949 self.parallel_scan_channel_size = parallel_scan_channel_size;
950 self
951 }
952
953 #[must_use]
955 pub(crate) fn with_max_concurrent_scan_files(
956 mut self,
957 max_concurrent_scan_files: usize,
958 ) -> Self {
959 self.max_concurrent_scan_files = max_concurrent_scan_files;
960 self
961 }
962
963 #[must_use]
965 pub(crate) fn with_inverted_index_appliers(
966 mut self,
967 appliers: [Option<InvertedIndexApplierRef>; 2],
968 ) -> Self {
969 self.inverted_index_appliers = appliers;
970 self
971 }
972
973 #[must_use]
975 pub(crate) fn with_bloom_filter_index_appliers(
976 mut self,
977 appliers: [Option<BloomFilterIndexApplierRef>; 2],
978 ) -> Self {
979 self.bloom_filter_index_appliers = appliers;
980 self
981 }
982
983 #[must_use]
985 pub(crate) fn with_fulltext_index_appliers(
986 mut self,
987 appliers: [Option<FulltextIndexApplierRef>; 2],
988 ) -> Self {
989 self.fulltext_index_appliers = appliers;
990 self
991 }
992
993 #[cfg(feature = "vector_index")]
995 #[must_use]
996 pub(crate) fn with_vector_index_applier(
997 mut self,
998 applier: Option<VectorIndexApplierRef>,
999 ) -> Self {
1000 self.vector_index_applier = applier;
1001 self
1002 }
1003
1004 #[cfg(feature = "vector_index")]
1006 #[must_use]
1007 pub(crate) fn with_vector_index_k(mut self, k: Option<usize>) -> Self {
1008 self.vector_index_k = k;
1009 self
1010 }
1011
1012 #[must_use]
1014 pub(crate) fn with_start_time(mut self, now: Option<Instant>) -> Self {
1015 self.query_start = now;
1016 self
1017 }
1018
1019 #[must_use]
1020 pub(crate) fn with_append_mode(mut self, is_append_mode: bool) -> Self {
1021 self.append_mode = is_append_mode;
1022 self
1023 }
1024
1025 #[must_use]
1027 pub(crate) fn with_filter_deleted(mut self, filter_deleted: bool) -> Self {
1028 self.filter_deleted = filter_deleted;
1029 self
1030 }
1031
1032 #[must_use]
1034 pub(crate) fn with_merge_mode(mut self, merge_mode: MergeMode) -> Self {
1035 self.merge_mode = merge_mode;
1036 self
1037 }
1038
1039 #[must_use]
1041 pub(crate) fn with_distribution(
1042 mut self,
1043 distribution: Option<TimeSeriesDistribution>,
1044 ) -> Self {
1045 self.distribution = distribution;
1046 self
1047 }
1048
1049 #[must_use]
1051 pub(crate) fn with_series_row_selector(
1052 mut self,
1053 series_row_selector: Option<TimeSeriesRowSelector>,
1054 ) -> Self {
1055 self.series_row_selector = series_row_selector;
1056 self
1057 }
1058
1059 #[must_use]
1061 pub(crate) fn with_flat_format(mut self, flat_format: bool) -> Self {
1062 self.flat_format = flat_format;
1063 self
1064 }
1065
1066 #[must_use]
1068 pub(crate) fn with_compaction(mut self, compaction: bool) -> Self {
1069 self.compaction = compaction;
1070 self
1071 }
1072
1073 #[tracing::instrument(
1077 skip(self, sources, semaphore),
1078 fields(
1079 region_id = %self.region_metadata().region_id,
1080 source_count = sources.len()
1081 )
1082 )]
1083 pub(crate) fn create_parallel_sources(
1084 &self,
1085 sources: Vec<Source>,
1086 semaphore: Arc<Semaphore>,
1087 ) -> Result<Vec<Source>> {
1088 if sources.len() <= 1 {
1089 return Ok(sources);
1090 }
1091
1092 let sources = sources
1094 .into_iter()
1095 .map(|source| {
1096 let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
1097 self.spawn_scan_task(source, semaphore.clone(), sender);
1098 let stream = Box::pin(ReceiverStream::new(receiver));
1099 Source::Stream(stream)
1100 })
1101 .collect();
1102 Ok(sources)
1103 }
1104
1105 pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> {
1107 let memtable = &self.memtables[index.index];
1108 let mut ranges = SmallVec::new();
1109 memtable.build_ranges(index.row_group_index, &mut ranges);
1110 ranges
1111 }
1112
1113 fn predicate_for_file(&self, file: &FileHandle) -> Option<Predicate> {
1114 if self.should_skip_region_partition(file) {
1115 self.predicate.predicate_without_region().cloned()
1116 } else {
1117 self.predicate.predicate().cloned()
1118 }
1119 }
1120
1121 fn should_skip_region_partition(&self, file: &FileHandle) -> bool {
1122 match (
1123 self.region_partition_expr.as_ref(),
1124 file.meta_ref().partition_expr.as_ref(),
1125 ) {
1126 (Some(region_expr), Some(file_expr)) => region_expr == file_expr,
1127 _ => false,
1128 }
1129 }
1130
    /// Prunes a single SST file and builds a [FileRangeBuilder] for the ranges
    /// that survive pruning.
    ///
    /// Builds an SST reader input with the pushed-down predicate, projection,
    /// caches and index appliers; when the file's schema or primary-key
    /// encoding differs from the region's, attaches a compat adapter so the
    /// output matches the expected metadata. Returns a default (empty) builder
    /// if the file is missing and `ignore_file_not_found` is set.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.region_metadata().region_id,
            file_id = %file.file_id()
        )
    )]
    pub async fn prune_file(
        &self,
        file: &FileHandle,
        reader_metrics: &mut ReaderMetrics,
    ) -> Result<FileRangeBuilder> {
        // Region partition predicate may be skipped for this file.
        let predicate = self.predicate_for_file(file);
        let filter_mode = pre_filter_mode(self.append_mode, self.merge_mode);
        // Decoding primary key values is only needed for queries (not
        // compaction) that actually output tag columns.
        let decode_pk_values = !self.compaction && self.mapper.has_tags();
        let reader = self
            .access_layer
            .read_sst(file.clone())
            .predicate(predicate)
            .projection(Some(self.read_column_ids.clone()))
            .cache(self.cache_strategy.clone())
            .inverted_index_appliers(self.inverted_index_appliers.clone())
            .bloom_filter_index_appliers(self.bloom_filter_index_appliers.clone())
            .fulltext_index_appliers(self.fulltext_index_appliers.clone());
        #[cfg(feature = "vector_index")]
        let reader = {
            let mut reader = reader;
            reader =
                reader.vector_index_applier(self.vector_index_applier.clone(), self.vector_index_k);
            reader
        };
        let res = reader
            .expected_metadata(Some(self.mapper.metadata().clone()))
            .flat_format(self.flat_format)
            .compaction(self.compaction)
            .pre_filter_mode(filter_mode)
            .decode_primary_key_values(decode_pk_values)
            .build_reader_input(reader_metrics)
            .await;
        let (mut file_range_ctx, selection) = match res {
            Ok(x) => x,
            Err(e) => {
                // A missing file is tolerated (logged, empty builder returned)
                // only when the input opted in via `ignore_file_not_found`.
                if e.is_object_not_found() && self.ignore_file_not_found {
                    error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
                    return Ok(FileRangeBuilder::default());
                } else {
                    return Err(e);
                }
            }
        };

        // Attach a compat adapter when the file's columns or primary-key
        // encoding differ from the mapper's expected metadata.
        let need_compat = !compat::has_same_columns_and_pk_encoding(
            self.mapper.metadata(),
            file_range_ctx.read_format().metadata(),
        );
        if need_compat {
            let compat = if let Some(flat_format) = file_range_ctx.read_format().as_flat() {
                let mapper = self.mapper.as_flat().unwrap();
                FlatCompatBatch::try_new(
                    mapper,
                    flat_format.metadata(),
                    flat_format.format_projection(),
                    self.compaction,
                )?
                .map(CompatBatch::Flat)
            } else {
                let compact_batch = PrimaryKeyCompatBatch::new(
                    &self.mapper,
                    file_range_ctx.read_format().metadata().clone(),
                )?;
                Some(CompatBatch::PrimaryKey(compact_batch))
            };
            file_range_ctx.set_compat_batch(compat);
        }
        Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), selection))
    }
1210
1211 #[tracing::instrument(
1213 skip(self, input, semaphore, sender),
1214 fields(region_id = %self.region_metadata().region_id)
1215 )]
1216 pub(crate) fn spawn_scan_task(
1217 &self,
1218 mut input: Source,
1219 semaphore: Arc<Semaphore>,
1220 sender: mpsc::Sender<Result<Batch>>,
1221 ) {
1222 let region_id = self.region_metadata().region_id;
1223 let span = tracing::info_span!(
1224 "ScanInput::parallel_scan_task",
1225 region_id = %region_id,
1226 stream_kind = "batch"
1227 );
1228 common_runtime::spawn_global(
1229 async move {
1230 loop {
1231 let maybe_batch = {
1234 let _permit = semaphore.acquire().await.unwrap();
1236 input.next_batch().await
1237 };
1238 match maybe_batch {
1239 Ok(Some(batch)) => {
1240 let _ = sender.send(Ok(batch)).await;
1241 }
1242 Ok(None) => break,
1243 Err(e) => {
1244 let _ = sender.send(Err(e)).await;
1245 break;
1246 }
1247 }
1248 }
1249 }
1250 .instrument(span),
1251 );
1252 }
1253
1254 #[tracing::instrument(
1258 skip(self, sources, semaphore),
1259 fields(
1260 region_id = %self.region_metadata().region_id,
1261 source_count = sources.len()
1262 )
1263 )]
1264 pub(crate) fn create_parallel_flat_sources(
1265 &self,
1266 sources: Vec<BoxedRecordBatchStream>,
1267 semaphore: Arc<Semaphore>,
1268 ) -> Result<Vec<BoxedRecordBatchStream>> {
1269 if sources.len() <= 1 {
1270 return Ok(sources);
1271 }
1272
1273 let sources = sources
1275 .into_iter()
1276 .map(|source| {
1277 let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
1278 self.spawn_flat_scan_task(source, semaphore.clone(), sender);
1279 let stream = Box::pin(ReceiverStream::new(receiver));
1280 Box::pin(stream) as _
1281 })
1282 .collect();
1283 Ok(sources)
1284 }
1285
1286 #[tracing::instrument(
1288 skip(self, input, semaphore, sender),
1289 fields(region_id = %self.region_metadata().region_id)
1290 )]
1291 pub(crate) fn spawn_flat_scan_task(
1292 &self,
1293 mut input: BoxedRecordBatchStream,
1294 semaphore: Arc<Semaphore>,
1295 sender: mpsc::Sender<Result<RecordBatch>>,
1296 ) {
1297 let region_id = self.region_metadata().region_id;
1298 let span = tracing::info_span!(
1299 "ScanInput::parallel_scan_task",
1300 region_id = %region_id,
1301 stream_kind = "flat"
1302 );
1303 common_runtime::spawn_global(
1304 async move {
1305 loop {
1306 let maybe_batch = {
1309 let _permit = semaphore.acquire().await.unwrap();
1311 input.next().await
1312 };
1313 match maybe_batch {
1314 Some(Ok(batch)) => {
1315 let _ = sender.send(Ok(batch)).await;
1316 }
1317 Some(Err(e)) => {
1318 let _ = sender.send(Err(e)).await;
1319 break;
1320 }
1321 None => break,
1322 }
1323 }
1324 }
1325 .instrument(span),
1326 );
1327 }
1328
1329 pub(crate) fn total_rows(&self) -> usize {
1330 let rows_in_files: usize = self.files.iter().map(|f| f.num_rows()).sum();
1331 let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();
1332
1333 let rows = rows_in_files + rows_in_memtables;
1334 #[cfg(feature = "enterprise")]
1335 let rows = rows
1336 + self
1337 .extension_ranges
1338 .iter()
1339 .map(|x| x.num_rows())
1340 .sum::<u64>() as usize;
1341 rows
1342 }
1343
    /// Returns the predicate group of this scan.
    pub(crate) fn predicate_group(&self) -> &PredicateGroup {
        &self.predicate
    }

    /// Returns the number of memtables to scan.
    pub(crate) fn num_memtables(&self) -> usize {
        self.memtables.len()
    }

    /// Returns the number of SST files to scan.
    pub(crate) fn num_files(&self) -> usize {
        self.files.len()
    }

    /// Returns the file handle for a [RowGroupIndex].
    ///
    /// `index.index` is a global source index in which memtables come first
    /// and files follow; panics if the index does not refer to a file.
    pub(crate) fn file_from_index(&self, index: RowGroupIndex) -> &FileHandle {
        let file_index = index.index - self.num_memtables();
        &self.files[file_index]
    }

    /// Returns the metadata of the region to scan.
    pub fn region_metadata(&self) -> &RegionMetadataRef {
        self.mapper.metadata()
    }
1367}
1368
#[cfg(feature = "enterprise")]
impl ScanInput {
    /// Replaces the extension ranges of this input.
    #[must_use]
    pub(crate) fn with_extension_ranges(self, extension_ranges: Vec<BoxedExtensionRange>) -> Self {
        Self {
            extension_ranges,
            ..self
        }
    }

    /// Returns all extension ranges.
    // Fix: dropped the redundant `#[cfg(feature = "enterprise")]` attributes
    // on the methods below — the whole impl block is already feature-gated.
    pub(crate) fn extension_ranges(&self) -> &[BoxedExtensionRange] {
        &self.extension_ranges
    }

    /// Returns the extension range for the global source index `i`.
    ///
    /// Extension ranges are indexed after memtables and files; panics if `i`
    /// does not refer to an extension range.
    pub(crate) fn extension_range(&self, i: usize) -> &BoxedExtensionRange {
        &self.extension_ranges[i - self.num_memtables() - self.num_files()]
    }
}
1390
#[cfg(test)]
impl ScanInput {
    /// Returns the ids of all SST files to scan (test helper).
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        let mut ids = Vec::with_capacity(self.files.len());
        for file in &self.files {
            ids.push(file.file_id());
        }
        ids
    }

    /// Returns the index ids of all SST files to scan (test helper).
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        let mut ids = Vec::with_capacity(self.files.len());
        for file in &self.files {
            ids.push(file.index_id());
        }
        ids
    }
}
1402
1403fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode {
1404 if append_mode {
1405 return PreFilterMode::All;
1406 }
1407
1408 match merge_mode {
1409 MergeMode::LastRow => PreFilterMode::SkipFieldsOnDelete,
1410 MergeMode::LastNonNull => PreFilterMode::SkipFields,
1411 }
1412}
1413
/// Context shared by scan streams of a query.
pub struct StreamContext {
    /// Input of the scan.
    pub input: ScanInput,
    /// Metadata of the ranges to scan, derived from the input.
    pub(crate) ranges: Vec<RangeMeta>,

    /// Start time of the query (falls back to context creation time).
    pub(crate) query_start: Instant,
}
1426
1427impl StreamContext {
1428 pub(crate) fn seq_scan_ctx(input: ScanInput) -> Self {
1430 let query_start = input.query_start.unwrap_or_else(Instant::now);
1431 let ranges = RangeMeta::seq_scan_ranges(&input);
1432 READ_SST_COUNT.observe(input.num_files() as f64);
1433
1434 Self {
1435 input,
1436 ranges,
1437 query_start,
1438 }
1439 }
1440
1441 pub(crate) fn unordered_scan_ctx(input: ScanInput) -> Self {
1443 let query_start = input.query_start.unwrap_or_else(Instant::now);
1444 let ranges = RangeMeta::unordered_scan_ranges(&input);
1445 READ_SST_COUNT.observe(input.num_files() as f64);
1446
1447 Self {
1448 input,
1449 ranges,
1450 query_start,
1451 }
1452 }
1453
1454 pub(crate) fn is_mem_range_index(&self, index: RowGroupIndex) -> bool {
1456 self.input.num_memtables() > index.index
1457 }
1458
1459 pub(crate) fn is_file_range_index(&self, index: RowGroupIndex) -> bool {
1460 !self.is_mem_range_index(index)
1461 && index.index < self.input.num_files() + self.input.num_memtables()
1462 }
1463
1464 pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
1466 self.ranges
1467 .iter()
1468 .enumerate()
1469 .map(|(idx, range_meta)| range_meta.new_partition_range(idx))
1470 .collect()
1471 }
1472
    /// Writes a summary of this context for `EXPLAIN` output.
    ///
    /// Emits range/file counts and scan options as JSON-like fragments; with
    /// `verbose`, wraps the output in `{ ... }` and appends projection,
    /// filter and file details via [Self::format_verbose_content].
    pub(crate) fn format_for_explain(&self, verbose: bool, f: &mut fmt::Formatter) -> fmt::Result {
        let (mut num_mem_ranges, mut num_file_ranges, mut num_other_ranges) = (0, 0, 0);
        // Classify every row group index as memtable / file / other
        // (anything beyond memtables and files, e.g. extension sources).
        for range_meta in &self.ranges {
            for idx in &range_meta.row_group_indices {
                if self.is_mem_range_index(*idx) {
                    num_mem_ranges += 1;
                } else if self.is_file_range_index(*idx) {
                    num_file_ranges += 1;
                } else {
                    num_other_ranges += 1;
                }
            }
        }
        if verbose {
            write!(f, "{{")?;
        }
        write!(
            f,
            r#""partition_count":{{"count":{}, "mem_ranges":{}, "files":{}, "file_ranges":{}"#,
            self.ranges.len(),
            num_mem_ranges,
            self.input.num_files(),
            num_file_ranges,
        )?;
        // Only mention "other" ranges when there are any.
        if num_other_ranges > 0 {
            write!(f, r#", "other_ranges":{}"#, num_other_ranges)?;
        }
        write!(f, "}}")?;

        if let Some(selector) = &self.input.series_row_selector {
            write!(f, ", \"selector\":\"{}\"", selector)?;
        }
        if let Some(distribution) = &self.input.distribution {
            write!(f, ", \"distribution\":\"{}\"", distribution)?;
        }

        if verbose {
            self.format_verbose_content(f)?;
        }

        Ok(())
    }
1516
    /// Writes verbose details (projection, filters, files, format flags) for
    /// [Self::format_for_explain] in verbose mode.
    fn format_verbose_content(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Debug adapter that prints one file as a JSON-like object.
        struct FileWrapper<'a> {
            file: &'a FileHandle,
        }

        impl fmt::Debug for FileWrapper<'_> {
            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                let (start, end) = self.file.time_range();
                write!(
                    f,
                    r#"{{"file_id":"{}","time_range_start":"{}::{}","time_range_end":"{}::{}","rows":{},"size":{},"index_size":{}}}"#,
                    self.file.file_id(),
                    start.value(),
                    start.unit(),
                    end.value(),
                    end.unit(),
                    self.file.num_rows(),
                    self.file.size(),
                    self.file.index_size()
                )
            }
        }

        // Debug adapter that prints the whole scan input.
        struct InputWrapper<'a> {
            input: &'a ScanInput,
        }

        #[cfg(feature = "enterprise")]
        impl InputWrapper<'_> {
            // Appends extension ranges (if any) to the verbose output.
            fn format_extension_ranges(&self, f: &mut fmt::Formatter) -> fmt::Result {
                if self.input.extension_ranges.is_empty() {
                    return Ok(());
                }

                let mut delimiter = "";
                write!(f, ", extension_ranges: [")?;
                for range in self.input.extension_ranges() {
                    write!(f, "{}{:?}", delimiter, range)?;
                    delimiter = ", ";
                }
                write!(f, "]")?;
                Ok(())
            }
        }

        impl fmt::Debug for InputWrapper<'_> {
            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                // Each section is emitted only when non-empty so the output
                // stays compact.
                let output_schema = self.input.mapper.output_schema();
                if !output_schema.is_empty() {
                    let names: Vec<_> = output_schema
                        .column_schemas()
                        .iter()
                        .map(|col| &col.name)
                        .collect();
                    write!(f, ", \"projection\": {:?}", names)?;
                }
                if let Some(predicate) = &self.input.predicate.predicate()
                    && !predicate.exprs().is_empty()
                {
                    let exprs: Vec<_> = predicate.exprs().iter().map(|e| e.to_string()).collect();
                    write!(f, ", \"filters\": {:?}", exprs)?;
                }
                if !self.input.files.is_empty() {
                    write!(f, ", \"files\": ")?;
                    f.debug_list()
                        .entries(self.input.files.iter().map(|file| FileWrapper { file }))
                        .finish()?;
                }
                write!(f, ", \"flat_format\": {}", self.input.flat_format)?;

                #[cfg(feature = "enterprise")]
                self.format_extension_ranges(f)?;

                Ok(())
            }
        }

        write!(f, "{:?}", InputWrapper { input: &self.input })
    }
1596}
1597
/// Predicates of a scan request, split by how they are applied.
#[derive(Clone, Default)]
pub struct PredicateGroup {
    // Simple filters on the time index column only.
    time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
    // All filters, including the region partition expr (if any).
    predicate_all: Option<Predicate>,
    // Request filters only, without the region partition expr.
    predicate_without_region: Option<Predicate>,
    // Partition expr parsed from the region metadata, if present.
    region_partition_expr: Option<PartitionExpr>,
}
1610
1611impl PredicateGroup {
1612 pub fn new(metadata: &RegionMetadata, exprs: &[Expr]) -> Result<Self> {
1614 let mut combined_exprs = exprs.to_vec();
1615 let mut region_partition_expr = None;
1616
1617 if let Some(expr_json) = metadata.partition_expr.as_ref()
1618 && !expr_json.is_empty()
1619 && let Some(expr) = PartitionExpr::from_json_str(expr_json)
1620 .context(InvalidPartitionExprSnafu { expr: expr_json })?
1621 {
1622 let logical_expr = expr
1623 .try_as_logical_expr()
1624 .context(InvalidPartitionExprSnafu {
1625 expr: expr_json.clone(),
1626 })?;
1627
1628 combined_exprs.push(logical_expr);
1629 region_partition_expr = Some(expr);
1630 }
1631
1632 let mut time_filters = Vec::with_capacity(combined_exprs.len());
1633 let mut columns = HashSet::new();
1635 for expr in &combined_exprs {
1636 columns.clear();
1637 let Some(filter) = Self::expr_to_filter(expr, metadata, &mut columns) else {
1638 continue;
1639 };
1640 time_filters.push(filter);
1641 }
1642 let time_filters = if time_filters.is_empty() {
1643 None
1644 } else {
1645 Some(Arc::new(time_filters))
1646 };
1647
1648 let predicate_all = if combined_exprs.is_empty() {
1649 None
1650 } else {
1651 Some(Predicate::new(combined_exprs))
1652 };
1653 let predicate_without_region = if exprs.is_empty() {
1654 None
1655 } else {
1656 Some(Predicate::new(exprs.to_vec()))
1657 };
1658
1659 Ok(Self {
1660 time_filters,
1661 predicate_all,
1662 predicate_without_region,
1663 region_partition_expr,
1664 })
1665 }
1666
    /// Returns the simple filters on the time index column, if any.
    pub(crate) fn time_filters(&self) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
        self.time_filters.clone()
    }

    /// Returns the predicate containing all filters, including the region
    /// partition expr.
    pub(crate) fn predicate(&self) -> Option<&Predicate> {
        self.predicate_all.as_ref()
    }

    /// Returns the predicate built from request filters only, without the
    /// region partition expr.
    pub(crate) fn predicate_without_region(&self) -> Option<&Predicate> {
        self.predicate_without_region.as_ref()
    }

    /// Returns the partition expr of the region, if any.
    pub(crate) fn region_partition_expr(&self) -> Option<&PartitionExpr> {
        self.region_partition_expr.as_ref()
    }
1686
1687 fn expr_to_filter(
1688 expr: &Expr,
1689 metadata: &RegionMetadata,
1690 columns: &mut HashSet<Column>,
1691 ) -> Option<SimpleFilterEvaluator> {
1692 columns.clear();
1693 expr_to_columns(expr, columns).ok()?;
1696 if columns.len() > 1 {
1697 return None;
1699 }
1700 let column = columns.iter().next()?;
1701 let column_meta = metadata.column_by_name(&column.name)?;
1702 if column_meta.semantic_type == SemanticType::Timestamp {
1703 SimpleFilterEvaluator::try_new(expr)
1704 } else {
1705 None
1706 }
1707 }
1708}
1709
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datafusion_expr::{col, lit};
    use store_api::storage::ScanRequest;

    use super::*;
    use crate::memtable::time_partition::TimePartitions;
    use crate::region::version::VersionBuilder;
    use crate::test_util::memtable_util::{EmptyMemtableBuilder, metadata_with_primary_key};
    use crate::test_util::scheduler_util::SchedulerEnv;

    /// Builds a region version backed by an empty mutable memtable.
    fn new_version(metadata: RegionMetadataRef) -> VersionRef {
        let mutable = Arc::new(TimePartitions::new(
            metadata.clone(),
            Arc::new(EmptyMemtableBuilder::default()),
            0,
            None,
        ));
        Arc::new(VersionBuilder::new(metadata, mutable).build())
    }

    #[tokio::test]
    async fn test_build_read_column_ids_includes_filters() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let version = new_version(metadata.clone());
        let env = SchedulerEnv::new().await;
        // Project one column but filter on three others; the read column ids
        // must include the filtered columns as well.
        let request = ScanRequest {
            projection: Some(vec![4]),
            filters: vec![
                col("v0").gt(lit(1)),
                col("ts").gt(lit(0)),
                col("k0").eq(lit("foo")),
            ],
            ..Default::default()
        };
        let scan_region = ScanRegion::new(
            version,
            env.access_layer.clone(),
            request,
            CacheStrategy::Disabled,
        );
        let predicate =
            PredicateGroup::new(metadata.as_ref(), &scan_region.request.filters).unwrap();
        let projection = scan_region.request.projection.as_ref().unwrap();
        let read_ids = scan_region
            .build_read_column_ids(projection, &predicate)
            .unwrap();
        // Projected column first, then the columns referenced by filters.
        assert_eq!(vec![4, 0, 2, 3], read_ids);
    }

    #[tokio::test]
    async fn test_build_read_column_ids_empty_projection() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let version = new_version(metadata.clone());
        let env = SchedulerEnv::new().await;
        // No projected columns and no filters.
        let request = ScanRequest {
            projection: Some(vec![]),
            ..Default::default()
        };
        let scan_region = ScanRegion::new(
            version,
            env.access_layer.clone(),
            request,
            CacheStrategy::Disabled,
        );
        let predicate =
            PredicateGroup::new(metadata.as_ref(), &scan_region.request.filters).unwrap();
        let projection = scan_region.request.projection.as_ref().unwrap();
        let read_ids = scan_region
            .build_read_column_ids(projection, &predicate)
            .unwrap();
        // An empty projection still yields one read column (id 2).
        assert_eq!(vec![2], read_ids);
    }

    #[tokio::test]
    async fn test_build_read_column_ids_keeps_projection_order() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let version = new_version(metadata.clone());
        let env = SchedulerEnv::new().await;
        let request = ScanRequest {
            projection: Some(vec![4, 1]),
            filters: vec![col("v0").gt(lit(1))],
            ..Default::default()
        };
        let scan_region = ScanRegion::new(
            version,
            env.access_layer.clone(),
            request,
            CacheStrategy::Disabled,
        );
        let predicate =
            PredicateGroup::new(metadata.as_ref(), &scan_region.request.filters).unwrap();
        let projection = scan_region.request.projection.as_ref().unwrap();
        let read_ids = scan_region
            .build_read_column_ids(projection, &predicate)
            .unwrap();
        // Projection order [4, 1] is preserved; the filter column is appended.
        assert_eq!(vec![4, 1, 3], read_ids);
    }
}