1use std::collections::HashSet;
18use std::fmt;
19use std::num::NonZeroU64;
20use std::sync::Arc;
21use std::time::Instant;
22
23use api::v1::SemanticType;
24use common_error::ext::BoxedError;
25use common_recordbatch::SendableRecordBatchStream;
26use common_recordbatch::filter::SimpleFilterEvaluator;
27use common_telemetry::tracing::Instrument;
28use common_telemetry::{debug, error, tracing, warn};
29use common_time::range::TimestampRange;
30use datafusion_common::Column;
31use datafusion_expr::Expr;
32use datafusion_expr::utils::expr_to_columns;
33use futures::StreamExt;
34use partition::expr::PartitionExpr;
35use smallvec::SmallVec;
36use snafu::ResultExt;
37use store_api::metadata::{RegionMetadata, RegionMetadataRef};
38use store_api::region_engine::{PartitionRange, RegionScannerRef};
39use store_api::storage::{
40 RegionId, ScanRequest, SequenceRange, TimeSeriesDistribution, TimeSeriesRowSelector,
41};
42use table::predicate::{Predicate, build_time_range_predicate};
43use tokio::sync::{Semaphore, mpsc};
44use tokio_stream::wrappers::ReceiverStream;
45
46use crate::access_layer::AccessLayerRef;
47use crate::cache::CacheStrategy;
48use crate::config::{DEFAULT_MAX_CONCURRENT_SCAN_FILES, DEFAULT_SCAN_CHANNEL_SIZE};
49use crate::error::{InvalidPartitionExprSnafu, Result};
50#[cfg(feature = "enterprise")]
51use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
52use crate::memtable::{MemtableRange, RangesOptions};
53use crate::metrics::READ_SST_COUNT;
54use crate::read::compat::{self, CompatBatch, FlatCompatBatch, PrimaryKeyCompatBatch};
55use crate::read::projection::ProjectionMapper;
56use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
57use crate::read::seq_scan::SeqScan;
58use crate::read::series_scan::SeriesScan;
59use crate::read::stream::ScanBatchStream;
60use crate::read::unordered_scan::UnorderedScan;
61use crate::read::{Batch, BoxedRecordBatchStream, RecordBatch, Source};
62use crate::region::options::MergeMode;
63use crate::region::version::VersionRef;
64use crate::sst::FormatType;
65use crate::sst::file::FileHandle;
66use crate::sst::index::bloom_filter::applier::{
67 BloomFilterIndexApplierBuilder, BloomFilterIndexApplierRef,
68};
69use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
70use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
71use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
72use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
73#[cfg(feature = "vector_index")]
74use crate::sst::index::vector_index::applier::{VectorIndexApplier, VectorIndexApplierRef};
75use crate::sst::parquet::file_range::PreFilterMode;
76use crate::sst::parquet::reader::ReaderMetrics;
77
/// Channel size used by parallel scan tasks when reading in the flat format.
const FLAT_SCAN_CHANNEL_SIZE: usize = 2;
/// Multiplier applied to a vector search's `k` to over-fetch results when the
/// request also carries filters (see `ScanRegion::scan_input`).
#[cfg(feature = "vector_index")]
const VECTOR_INDEX_OVERFETCH_MULTIPLIER: usize = 2;
82
/// A scanner to scan a region, selected by [ScanRegion] according to the
/// request's distribution hint and the region's options.
pub(crate) enum Scanner {
    /// Sequential scan.
    Seq(SeqScan),
    /// Unordered scan, used for append-mode regions (see `use_unordered_scan`).
    Unordered(UnorderedScan),
    /// Per-series scan, used when the request asks for a
    /// `TimeSeriesDistribution::PerSeries` distribution.
    Series(SeriesScan),
}
92
impl Scanner {
    /// Builds a record batch stream that reads data from the region.
    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
    pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream, BoxedError> {
        match self {
            // `SeqScan` builds its stream synchronously; the other two are async.
            Scanner::Seq(seq_scan) => seq_scan.build_stream(),
            Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
            Scanner::Series(series_scan) => series_scan.build_stream().await,
        }
    }

    /// Returns a [ScanBatchStream] that scans all partitions of the scanner.
    pub(crate) fn scan_batch(&self) -> Result<ScanBatchStream> {
        match self {
            Scanner::Seq(x) => x.scan_all_partitions(),
            Scanner::Unordered(x) => x.scan_all_partitions(),
            Scanner::Series(x) => x.scan_all_partitions(),
        }
    }
}
113
#[cfg(test)]
impl Scanner {
    /// Returns the number of SST files the scanner reads (test helper).
    pub(crate) fn num_files(&self) -> usize {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.input().num_files(),
            Scanner::Unordered(unordered_scan) => unordered_scan.input().num_files(),
            Scanner::Series(series_scan) => series_scan.input().num_files(),
        }
    }

    /// Returns the number of memtables the scanner reads (test helper).
    pub(crate) fn num_memtables(&self) -> usize {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.input().num_memtables(),
            Scanner::Unordered(unordered_scan) => unordered_scan.input().num_memtables(),
            Scanner::Series(series_scan) => series_scan.input().num_memtables(),
        }
    }

    /// Returns the ids of SST files the scanner reads (test helper).
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.input().file_ids(),
            Scanner::Unordered(unordered_scan) => unordered_scan.input().file_ids(),
            Scanner::Series(series_scan) => series_scan.input().file_ids(),
        }
    }

    /// Returns the ids of indexes the scanner reads (test helper).
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.input().index_ids(),
            Scanner::Unordered(unordered_scan) => unordered_scan.input().index_ids(),
            Scanner::Series(series_scan) => series_scan.input().index_ids(),
        }
    }

    /// Sets the target number of output partitions via a `PrepareRequest` (test helper).
    pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
        use store_api::region_engine::{PrepareRequest, RegionScanner};

        let request = PrepareRequest::default().with_target_partitions(target_partitions);
        match self {
            Scanner::Seq(seq_scan) => seq_scan.prepare(request).unwrap(),
            Scanner::Unordered(unordered_scan) => unordered_scan.prepare(request).unwrap(),
            Scanner::Series(series_scan) => series_scan.prepare(request).unwrap(),
        }
    }
}
163
/// Helper that creates a scanner ([Scanner] / [RegionScannerRef]) for a region
/// from a [ScanRequest].
#[cfg_attr(doc, aquamarine::aquamarine)]
pub(crate) struct ScanRegion {
    /// Version of the region to scan (metadata, memtables, SSTs and options).
    version: VersionRef,
    /// Access layer used to read SST files of the region.
    access_layer: AccessLayerRef,
    /// The scan request.
    request: ScanRequest,
    /// Cache strategy for the scan.
    cache_strategy: CacheStrategy,
    /// Capacity of the channel used by parallel scan tasks.
    parallel_scan_channel_size: usize,
    /// Maximum number of SST files scanned concurrently.
    max_concurrent_scan_files: usize,
    /// Whether to skip building inverted index appliers.
    ignore_inverted_index: bool,
    /// Whether to skip building full-text index appliers.
    ignore_fulltext_index: bool,
    /// Whether to skip building bloom filter index appliers.
    ignore_bloom_filter: bool,
    /// Start time of the query, forwarded to the [ScanInput].
    start_time: Option<Instant>,
    /// Whether to filter out deleted rows (defaults to `true` in [ScanRegion::new]).
    filter_deleted: bool,
    /// Optional provider of extra extension ranges to scan (enterprise only).
    #[cfg(feature = "enterprise")]
    extension_range_provider: Option<BoxedExtensionRangeProvider>,
}
241
242impl ScanRegion {
    /// Creates a [ScanRegion] with default scan options: default channel size
    /// and file concurrency, all indexes enabled, and deleted rows filtered.
    pub(crate) fn new(
        version: VersionRef,
        access_layer: AccessLayerRef,
        request: ScanRequest,
        cache_strategy: CacheStrategy,
    ) -> ScanRegion {
        ScanRegion {
            version,
            access_layer,
            request,
            cache_strategy,
            parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
            ignore_inverted_index: false,
            ignore_fulltext_index: false,
            ignore_bloom_filter: false,
            start_time: None,
            filter_deleted: true,
            #[cfg(feature = "enterprise")]
            extension_range_provider: None,
        }
    }
266
267 #[must_use]
269 pub(crate) fn with_parallel_scan_channel_size(
270 mut self,
271 parallel_scan_channel_size: usize,
272 ) -> Self {
273 self.parallel_scan_channel_size = parallel_scan_channel_size;
274 self
275 }
276
277 #[must_use]
279 pub(crate) fn with_max_concurrent_scan_files(
280 mut self,
281 max_concurrent_scan_files: usize,
282 ) -> Self {
283 self.max_concurrent_scan_files = max_concurrent_scan_files;
284 self
285 }
286
287 #[must_use]
289 pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self {
290 self.ignore_inverted_index = ignore;
291 self
292 }
293
294 #[must_use]
296 pub(crate) fn with_ignore_fulltext_index(mut self, ignore: bool) -> Self {
297 self.ignore_fulltext_index = ignore;
298 self
299 }
300
301 #[must_use]
303 pub(crate) fn with_ignore_bloom_filter(mut self, ignore: bool) -> Self {
304 self.ignore_bloom_filter = ignore;
305 self
306 }
307
308 #[must_use]
309 pub(crate) fn with_start_time(mut self, now: Instant) -> Self {
310 self.start_time = Some(now);
311 self
312 }
313
    /// Sets whether to filter out deleted rows during the scan.
    pub(crate) fn set_filter_deleted(&mut self, filter_deleted: bool) {
        self.filter_deleted = filter_deleted;
    }
317
    /// Sets the provider of extension ranges to scan (enterprise only).
    #[cfg(feature = "enterprise")]
    pub(crate) fn set_extension_range_provider(
        &mut self,
        extension_range_provider: BoxedExtensionRangeProvider,
    ) {
        self.extension_range_provider = Some(extension_range_provider);
    }
325
326 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
328 pub(crate) async fn scanner(self) -> Result<Scanner> {
329 if self.use_series_scan() {
330 self.series_scan().await.map(Scanner::Series)
331 } else if self.use_unordered_scan() {
332 self.unordered_scan().await.map(Scanner::Unordered)
335 } else {
336 self.seq_scan().await.map(Scanner::Seq)
337 }
338 }
339
340 #[tracing::instrument(
342 level = tracing::Level::DEBUG,
343 skip_all,
344 fields(region_id = %self.region_id())
345 )]
346 pub(crate) async fn region_scanner(self) -> Result<RegionScannerRef> {
347 if self.use_series_scan() {
348 self.series_scan()
349 .await
350 .map(|scanner| Box::new(scanner) as _)
351 } else if self.use_unordered_scan() {
352 self.unordered_scan()
353 .await
354 .map(|scanner| Box::new(scanner) as _)
355 } else {
356 self.seq_scan().await.map(|scanner| Box::new(scanner) as _)
357 }
358 }
359
360 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
362 pub(crate) async fn seq_scan(self) -> Result<SeqScan> {
363 let input = self.scan_input().await?.with_compaction(false);
364 Ok(SeqScan::new(input))
365 }
366
367 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
369 pub(crate) async fn unordered_scan(self) -> Result<UnorderedScan> {
370 let input = self.scan_input().await?;
371 Ok(UnorderedScan::new(input))
372 }
373
374 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
376 pub(crate) async fn series_scan(self) -> Result<SeriesScan> {
377 let input = self.scan_input().await?;
378 Ok(SeriesScan::new(input))
379 }
380
381 fn use_unordered_scan(&self) -> bool {
383 self.version.options.append_mode
390 && self.request.series_row_selector.is_none()
391 && (self.request.distribution.is_none()
392 || self.request.distribution == Some(TimeSeriesDistribution::TimeWindowed))
393 }
394
395 fn use_series_scan(&self) -> bool {
397 self.request.distribution == Some(TimeSeriesDistribution::PerSeries)
398 }
399
400 fn use_flat_format(&self) -> bool {
402 self.version.options.sst_format.unwrap_or_default() == FormatType::Flat
403 }
404
405 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
407 async fn scan_input(mut self) -> Result<ScanInput> {
408 let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new);
409 let time_range = self.build_time_range_predicate();
410 let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;
411 let flat_format = self.use_flat_format();
412
413 let mapper = match &self.request.projection {
415 Some(p) => {
416 ProjectionMapper::new(&self.version.metadata, p.iter().copied(), flat_format)?
417 }
418 None => ProjectionMapper::all(&self.version.metadata, flat_format)?,
419 };
420
421 let ssts = &self.version.ssts;
422 let mut files = Vec::new();
423 for level in ssts.levels() {
424 for file in level.files.values() {
425 let exceed_min_sequence = match (sst_min_sequence, file.meta_ref().sequence) {
426 (Some(min_sequence), Some(file_sequence)) => file_sequence > min_sequence,
427 (Some(_), None) => true,
433 (None, _) => true,
434 };
435
436 if exceed_min_sequence && file_in_range(file, &time_range) {
438 files.push(file.clone());
439 }
440 }
444 }
445
446 let memtables = self.version.memtables.list_memtables();
447 let mut mem_range_builders = Vec::new();
449 let filter_mode = pre_filter_mode(
450 self.version.options.append_mode,
451 self.version.options.merge_mode(),
452 );
453
454 for m in memtables {
455 let Some((start, end)) = m.stats().time_range() else {
457 continue;
458 };
459 let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
461 if !memtable_range.intersects(&time_range) {
462 continue;
463 }
464 let ranges_in_memtable = m.ranges(
465 Some(mapper.column_ids()),
466 RangesOptions::default()
467 .with_predicate(predicate.clone())
468 .with_sequence(SequenceRange::new(
469 self.request.memtable_min_sequence,
470 self.request.memtable_max_sequence,
471 ))
472 .with_pre_filter_mode(filter_mode),
473 )?;
474 mem_range_builders.extend(ranges_in_memtable.ranges.into_values().map(|v| {
475 let stats = v.stats().clone();
476 MemRangeBuilder::new(v, stats)
477 }));
478 }
479
480 let region_id = self.region_id();
481 debug!(
482 "Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}, flat_format: {}",
483 region_id,
484 self.request,
485 time_range,
486 mem_range_builders.len(),
487 files.len(),
488 self.version.options.append_mode,
489 flat_format,
490 );
491
492 let (non_field_filters, field_filters) = self.partition_by_field_filters();
493 let inverted_index_appliers = [
494 self.build_invereted_index_applier(&non_field_filters),
495 self.build_invereted_index_applier(&field_filters),
496 ];
497 let bloom_filter_appliers = [
498 self.build_bloom_filter_applier(&non_field_filters),
499 self.build_bloom_filter_applier(&field_filters),
500 ];
501 let fulltext_index_appliers = [
502 self.build_fulltext_index_applier(&non_field_filters),
503 self.build_fulltext_index_applier(&field_filters),
504 ];
505 #[cfg(feature = "vector_index")]
506 let vector_index_applier = self.build_vector_index_applier();
507 #[cfg(feature = "vector_index")]
508 let vector_index_k = self.request.vector_search.as_ref().map(|search| {
509 if self.request.filters.is_empty() {
510 search.k
511 } else {
512 search.k.saturating_mul(VECTOR_INDEX_OVERFETCH_MULTIPLIER)
513 }
514 });
515 let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;
516
517 if flat_format {
518 self.parallel_scan_channel_size = FLAT_SCAN_CHANNEL_SIZE;
520 }
521
522 let input = ScanInput::new(self.access_layer, mapper)
523 .with_time_range(Some(time_range))
524 .with_predicate(predicate)
525 .with_memtables(mem_range_builders)
526 .with_files(files)
527 .with_cache(self.cache_strategy)
528 .with_inverted_index_appliers(inverted_index_appliers)
529 .with_bloom_filter_index_appliers(bloom_filter_appliers)
530 .with_fulltext_index_appliers(fulltext_index_appliers)
531 .with_parallel_scan_channel_size(self.parallel_scan_channel_size)
532 .with_max_concurrent_scan_files(self.max_concurrent_scan_files)
533 .with_start_time(self.start_time)
534 .with_append_mode(self.version.options.append_mode)
535 .with_filter_deleted(self.filter_deleted)
536 .with_merge_mode(self.version.options.merge_mode())
537 .with_series_row_selector(self.request.series_row_selector)
538 .with_distribution(self.request.distribution)
539 .with_flat_format(flat_format);
540 #[cfg(feature = "vector_index")]
541 let input = input
542 .with_vector_index_applier(vector_index_applier)
543 .with_vector_index_k(vector_index_k);
544
545 #[cfg(feature = "enterprise")]
546 let input = if let Some(provider) = self.extension_range_provider {
547 let ranges = provider
548 .find_extension_ranges(self.version.flushed_sequence, time_range, &self.request)
549 .await?;
550 debug!("Find extension ranges: {ranges:?}");
551 input.with_extension_ranges(ranges)
552 } else {
553 input
554 };
555 Ok(input)
556 }
557
    /// Returns the id of the region to scan.
    fn region_id(&self) -> RegionId {
        self.version.metadata.region_id
    }
561
    /// Extracts the [TimestampRange] over the time index column from the
    /// request's filters.
    fn build_time_range_predicate(&self) -> TimestampRange {
        let time_index = self.version.metadata.time_index_column();
        let unit = time_index
            .column_schema
            .data_type
            .as_timestamp()
            // Invariant: the time index column always has a timestamp-compatible type.
            .expect("Time index must have timestamp-compatible type")
            .unit();
        build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters)
    }
573
    /// Splits the request's filters into `(non_field_filters, field_filters)`.
    ///
    /// A filter lands in the first vector when it references no field column
    /// (or when its column set cannot be extracted); otherwise in the second.
    fn partition_by_field_filters(&self) -> (Vec<Expr>, Vec<Expr>) {
        let field_columns = self
            .version
            .metadata
            .field_columns()
            .map(|col| &col.column_schema.name)
            .collect::<HashSet<_>>();

        // Scratch set reused across filters to collect each expr's columns.
        let mut columns = HashSet::new();

        self.request.filters.iter().cloned().partition(|expr| {
            columns.clear();
            if expr_to_columns(expr, &mut columns).is_err() {
                // Treat filters whose columns cannot be resolved as non-field filters.
                return true;
            }
            !columns
                .iter()
                .any(|column| field_columns.contains(&column.name))
        })
    }
599
600 fn build_invereted_index_applier(&self, filters: &[Expr]) -> Option<InvertedIndexApplierRef> {
602 if self.ignore_inverted_index {
603 return None;
604 }
605
606 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
607 let inverted_index_cache = self.cache_strategy.inverted_index_cache().cloned();
608
609 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
610
611 InvertedIndexApplierBuilder::new(
612 self.access_layer.table_dir().to_string(),
613 self.access_layer.path_type(),
614 self.access_layer.object_store().clone(),
615 self.version.metadata.as_ref(),
616 self.version.metadata.inverted_indexed_column_ids(
617 self.version
618 .options
619 .index_options
620 .inverted_index
621 .ignore_column_ids
622 .iter(),
623 ),
624 self.access_layer.puffin_manager_factory().clone(),
625 )
626 .with_file_cache(file_cache)
627 .with_inverted_index_cache(inverted_index_cache)
628 .with_puffin_metadata_cache(puffin_metadata_cache)
629 .build(filters)
630 .inspect_err(|err| warn!(err; "Failed to build invereted index applier"))
631 .ok()
632 .flatten()
633 .map(Arc::new)
634 }
635
    /// Builds a bloom filter index applier for `filters`, or `None` when the
    /// bloom filter is ignored or nothing applies. Build failures are logged
    /// and swallowed so the scan proceeds without the index.
    fn build_bloom_filter_applier(&self, filters: &[Expr]) -> Option<BloomFilterIndexApplierRef> {
        if self.ignore_bloom_filter {
            return None;
        }

        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();

        BloomFilterIndexApplierBuilder::new(
            self.access_layer.table_dir().to_string(),
            self.access_layer.path_type(),
            self.access_layer.object_store().clone(),
            self.version.metadata.as_ref(),
            self.access_layer.puffin_manager_factory().clone(),
        )
        .with_file_cache(file_cache)
        .with_bloom_filter_index_cache(bloom_filter_index_cache)
        .with_puffin_metadata_cache(puffin_metadata_cache)
        .build(filters)
        .inspect_err(|err| warn!(err; "Failed to build bloom filter index applier"))
        .ok()
        .flatten()
        .map(Arc::new)
    }
662
    /// Builds a full-text index applier for `filters`, or `None` when the
    /// full-text index is ignored or nothing applies. Build failures are
    /// logged and swallowed so the scan proceeds without the index.
    fn build_fulltext_index_applier(&self, filters: &[Expr]) -> Option<FulltextIndexApplierRef> {
        if self.ignore_fulltext_index {
            return None;
        }

        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
        FulltextIndexApplierBuilder::new(
            self.access_layer.table_dir().to_string(),
            self.access_layer.path_type(),
            self.access_layer.object_store().clone(),
            self.access_layer.puffin_manager_factory().clone(),
            self.version.metadata.as_ref(),
        )
        .with_file_cache(file_cache)
        .with_puffin_metadata_cache(puffin_metadata_cache)
        .with_bloom_filter_cache(bloom_filter_index_cache)
        .build(filters)
        .inspect_err(|err| warn!(err; "Failed to build fulltext index applier"))
        .ok()
        .flatten()
        .map(Arc::new)
    }
688
    /// Builds a vector index applier from the request's vector search, or
    /// `None` when the request carries no vector search.
    #[cfg(feature = "vector_index")]
    fn build_vector_index_applier(&self) -> Option<VectorIndexApplierRef> {
        let vector_search = self.request.vector_search.as_ref()?;

        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
        let vector_index_cache = self.cache_strategy.vector_index_cache().cloned();

        let applier = VectorIndexApplier::new(
            self.access_layer.table_dir().to_string(),
            self.access_layer.path_type(),
            self.access_layer.object_store().clone(),
            self.access_layer.puffin_manager_factory().clone(),
            vector_search.column_id,
            vector_search.query_vector.clone(),
            vector_search.metric,
        )
        .with_file_cache(file_cache)
        .with_puffin_metadata_cache(puffin_metadata_cache)
        .with_vector_index_cache(vector_index_cache);

        Some(Arc::new(applier))
    }
713}
714
715fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
717 if predicate == &TimestampRange::min_to_max() {
718 return true;
719 }
720 let (start, end) = file.time_range();
722 let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end));
723 file_ts_range.intersects(predicate)
724}
725
/// Common input shared by all scanner implementations.
pub struct ScanInput {
    /// Access layer used to read SST files.
    access_layer: AccessLayerRef,
    /// Mapper that projects read batches to the output schema.
    pub(crate) mapper: Arc<ProjectionMapper>,
    /// Time range predicate extracted from the request's filters.
    time_range: Option<TimestampRange>,
    /// Predicates pushed down to the scan.
    pub(crate) predicate: PredicateGroup,
    /// Region partition expression taken from the predicate group.
    region_partition_expr: Option<PartitionExpr>,
    /// Builders of memtable ranges to scan.
    pub(crate) memtables: Vec<MemRangeBuilder>,
    /// Handles of SST files to scan.
    pub(crate) files: Vec<FileHandle>,
    /// Cache strategy for the scan.
    pub(crate) cache_strategy: CacheStrategy,
    /// Whether a missing SST file aborts the scan or is skipped with an error log.
    ignore_file_not_found: bool,
    /// Capacity of the channel used by parallel scan tasks.
    pub(crate) parallel_scan_channel_size: usize,
    /// Maximum number of SST files scanned concurrently.
    pub(crate) max_concurrent_scan_files: usize,
    /// Inverted index appliers, ordered as `[non-field filters, field filters]`.
    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
    /// Bloom filter index appliers, ordered as `[non-field filters, field filters]`.
    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    /// Full-text index appliers, ordered as `[non-field filters, field filters]`.
    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
    /// Applier built from the request's vector search.
    #[cfg(feature = "vector_index")]
    pub(crate) vector_index_applier: Option<VectorIndexApplierRef>,
    /// Number of vector index results to fetch (may over-fetch when filters exist).
    #[cfg(feature = "vector_index")]
    pub(crate) vector_index_k: Option<usize>,
    /// Start time of the query.
    pub(crate) query_start: Option<Instant>,
    /// Whether the region is in append mode.
    pub(crate) append_mode: bool,
    /// Whether to remove deleted rows while reading.
    pub(crate) filter_deleted: bool,
    /// Mode used to merge duplicate rows.
    pub(crate) merge_mode: MergeMode,
    /// Hint to select rows within each time series.
    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
    /// Hint for the distribution of returned rows.
    pub(crate) distribution: Option<TimeSeriesDistribution>,
    /// Whether to read SSTs in the flat format.
    pub(crate) flat_format: bool,
    /// Whether this input is built for compaction.
    pub(crate) compaction: bool,
    /// Extra ranges to scan, supplied by the enterprise extension.
    #[cfg(feature = "enterprise")]
    extension_ranges: Vec<BoxedExtensionRange>,
}
779
780impl ScanInput {
    /// Creates a new [ScanInput] with default options: no predicate, no
    /// memtables or files, caching disabled, and deleted rows filtered.
    #[must_use]
    pub(crate) fn new(access_layer: AccessLayerRef, mapper: ProjectionMapper) -> ScanInput {
        ScanInput {
            access_layer,
            mapper: Arc::new(mapper),
            time_range: None,
            predicate: PredicateGroup::default(),
            region_partition_expr: None,
            memtables: Vec::new(),
            files: Vec::new(),
            cache_strategy: CacheStrategy::Disabled,
            ignore_file_not_found: false,
            parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
            inverted_index_appliers: [None, None],
            bloom_filter_index_appliers: [None, None],
            fulltext_index_appliers: [None, None],
            #[cfg(feature = "vector_index")]
            vector_index_applier: None,
            #[cfg(feature = "vector_index")]
            vector_index_k: None,
            query_start: None,
            append_mode: false,
            filter_deleted: true,
            merge_mode: MergeMode::default(),
            series_row_selector: None,
            distribution: None,
            flat_format: false,
            compaction: false,
            #[cfg(feature = "enterprise")]
            extension_ranges: Vec::new(),
        }
    }
815
816 #[must_use]
818 pub(crate) fn with_time_range(mut self, time_range: Option<TimestampRange>) -> Self {
819 self.time_range = time_range;
820 self
821 }
822
823 #[must_use]
825 pub(crate) fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
826 self.region_partition_expr = predicate.region_partition_expr().cloned();
827 self.predicate = predicate;
828 self
829 }
830
831 #[must_use]
833 pub(crate) fn with_memtables(mut self, memtables: Vec<MemRangeBuilder>) -> Self {
834 self.memtables = memtables;
835 self
836 }
837
838 #[must_use]
840 pub(crate) fn with_files(mut self, files: Vec<FileHandle>) -> Self {
841 self.files = files;
842 self
843 }
844
845 #[must_use]
847 pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self {
848 self.cache_strategy = cache;
849 self
850 }
851
852 #[must_use]
854 pub(crate) fn with_ignore_file_not_found(mut self, ignore: bool) -> Self {
855 self.ignore_file_not_found = ignore;
856 self
857 }
858
859 #[must_use]
861 pub(crate) fn with_parallel_scan_channel_size(
862 mut self,
863 parallel_scan_channel_size: usize,
864 ) -> Self {
865 self.parallel_scan_channel_size = parallel_scan_channel_size;
866 self
867 }
868
869 #[must_use]
871 pub(crate) fn with_max_concurrent_scan_files(
872 mut self,
873 max_concurrent_scan_files: usize,
874 ) -> Self {
875 self.max_concurrent_scan_files = max_concurrent_scan_files;
876 self
877 }
878
879 #[must_use]
881 pub(crate) fn with_inverted_index_appliers(
882 mut self,
883 appliers: [Option<InvertedIndexApplierRef>; 2],
884 ) -> Self {
885 self.inverted_index_appliers = appliers;
886 self
887 }
888
889 #[must_use]
891 pub(crate) fn with_bloom_filter_index_appliers(
892 mut self,
893 appliers: [Option<BloomFilterIndexApplierRef>; 2],
894 ) -> Self {
895 self.bloom_filter_index_appliers = appliers;
896 self
897 }
898
899 #[must_use]
901 pub(crate) fn with_fulltext_index_appliers(
902 mut self,
903 appliers: [Option<FulltextIndexApplierRef>; 2],
904 ) -> Self {
905 self.fulltext_index_appliers = appliers;
906 self
907 }
908
909 #[cfg(feature = "vector_index")]
911 #[must_use]
912 pub(crate) fn with_vector_index_applier(
913 mut self,
914 applier: Option<VectorIndexApplierRef>,
915 ) -> Self {
916 self.vector_index_applier = applier;
917 self
918 }
919
920 #[cfg(feature = "vector_index")]
922 #[must_use]
923 pub(crate) fn with_vector_index_k(mut self, k: Option<usize>) -> Self {
924 self.vector_index_k = k;
925 self
926 }
927
928 #[must_use]
930 pub(crate) fn with_start_time(mut self, now: Option<Instant>) -> Self {
931 self.query_start = now;
932 self
933 }
934
935 #[must_use]
936 pub(crate) fn with_append_mode(mut self, is_append_mode: bool) -> Self {
937 self.append_mode = is_append_mode;
938 self
939 }
940
941 #[must_use]
943 pub(crate) fn with_filter_deleted(mut self, filter_deleted: bool) -> Self {
944 self.filter_deleted = filter_deleted;
945 self
946 }
947
948 #[must_use]
950 pub(crate) fn with_merge_mode(mut self, merge_mode: MergeMode) -> Self {
951 self.merge_mode = merge_mode;
952 self
953 }
954
955 #[must_use]
957 pub(crate) fn with_distribution(
958 mut self,
959 distribution: Option<TimeSeriesDistribution>,
960 ) -> Self {
961 self.distribution = distribution;
962 self
963 }
964
965 #[must_use]
967 pub(crate) fn with_series_row_selector(
968 mut self,
969 series_row_selector: Option<TimeSeriesRowSelector>,
970 ) -> Self {
971 self.series_row_selector = series_row_selector;
972 self
973 }
974
975 #[must_use]
977 pub(crate) fn with_flat_format(mut self, flat_format: bool) -> Self {
978 self.flat_format = flat_format;
979 self
980 }
981
982 #[must_use]
984 pub(crate) fn with_compaction(mut self, compaction: bool) -> Self {
985 self.compaction = compaction;
986 self
987 }
988
989 #[tracing::instrument(
993 skip(self, sources, semaphore),
994 fields(
995 region_id = %self.region_metadata().region_id,
996 source_count = sources.len()
997 )
998 )]
999 pub(crate) fn create_parallel_sources(
1000 &self,
1001 sources: Vec<Source>,
1002 semaphore: Arc<Semaphore>,
1003 ) -> Result<Vec<Source>> {
1004 if sources.len() <= 1 {
1005 return Ok(sources);
1006 }
1007
1008 let sources = sources
1010 .into_iter()
1011 .map(|source| {
1012 let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
1013 self.spawn_scan_task(source, semaphore.clone(), sender);
1014 let stream = Box::pin(ReceiverStream::new(receiver));
1015 Source::Stream(stream)
1016 })
1017 .collect();
1018 Ok(sources)
1019 }
1020
1021 pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> {
1023 let memtable = &self.memtables[index.index];
1024 let mut ranges = SmallVec::new();
1025 memtable.build_ranges(index.row_group_index, &mut ranges);
1026 ranges
1027 }
1028
1029 fn predicate_for_file(&self, file: &FileHandle) -> Option<Predicate> {
1030 if self.should_skip_region_partition(file) {
1031 self.predicate.predicate_without_region().cloned()
1032 } else {
1033 self.predicate.predicate().cloned()
1034 }
1035 }
1036
1037 fn should_skip_region_partition(&self, file: &FileHandle) -> bool {
1038 match (
1039 self.region_partition_expr.as_ref(),
1040 file.meta_ref().partition_expr.as_ref(),
1041 ) {
1042 (Some(region_expr), Some(file_expr)) => region_expr == file_expr,
1043 _ => false,
1044 }
1045 }
1046
    /// Prunes `file` and returns a [FileRangeBuilder] describing what to read.
    ///
    /// Builds the SST reader input with the pushed-down predicate and index
    /// appliers. When the file's schema or primary key encoding differs from
    /// the mapper's, a compat adapter is attached to the range context.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.region_metadata().region_id,
            file_id = %file.file_id()
        )
    )]
    pub async fn prune_file(
        &self,
        file: &FileHandle,
        reader_metrics: &mut ReaderMetrics,
    ) -> Result<FileRangeBuilder> {
        let predicate = self.predicate_for_file(file);
        let filter_mode = pre_filter_mode(self.append_mode, self.merge_mode);
        // Primary key values are only decoded outside compaction and when the
        // projection contains tag columns.
        let decode_pk_values = !self.compaction && self.mapper.has_tags();
        let reader = self
            .access_layer
            .read_sst(file.clone())
            .predicate(predicate)
            .projection(Some(self.mapper.column_ids().to_vec()))
            .cache(self.cache_strategy.clone())
            .inverted_index_appliers(self.inverted_index_appliers.clone())
            .bloom_filter_index_appliers(self.bloom_filter_index_appliers.clone())
            .fulltext_index_appliers(self.fulltext_index_appliers.clone());
        #[cfg(feature = "vector_index")]
        let reader = {
            let mut reader = reader;
            reader =
                reader.vector_index_applier(self.vector_index_applier.clone(), self.vector_index_k);
            reader
        };
        let res = reader
            .expected_metadata(Some(self.mapper.metadata().clone()))
            .flat_format(self.flat_format)
            .compaction(self.compaction)
            .pre_filter_mode(filter_mode)
            .decode_primary_key_values(decode_pk_values)
            .build_reader_input(reader_metrics)
            .await;
        let (mut file_range_ctx, selection) = match res {
            Ok(x) => x,
            Err(e) => {
                // Tolerate a missing file only when `ignore_file_not_found` is set;
                // the file is then skipped with an empty range builder.
                if e.is_object_not_found() && self.ignore_file_not_found {
                    error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
                    return Ok(FileRangeBuilder::default());
                } else {
                    return Err(e);
                }
            }
        };

        // Attach a compat adapter when the file's columns or primary key
        // encoding differ from what the mapper expects.
        let need_compat = !compat::has_same_columns_and_pk_encoding(
            self.mapper.metadata(),
            file_range_ctx.read_format().metadata(),
        );
        if need_compat {
            let compat = if let Some(flat_format) = file_range_ctx.read_format().as_flat() {
                let mapper = self.mapper.as_flat().unwrap();
                FlatCompatBatch::try_new(
                    mapper,
                    flat_format.metadata(),
                    flat_format.format_projection(),
                    self.compaction,
                )?
                .map(CompatBatch::Flat)
            } else {
                let compact_batch = PrimaryKeyCompatBatch::new(
                    &self.mapper,
                    file_range_ctx.read_format().metadata().clone(),
                )?;
                Some(CompatBatch::PrimaryKey(compact_batch))
            };
            file_range_ctx.set_compat_batch(compat);
        }
        Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), selection))
    }
1126
    /// Spawns a global task that reads `input` and forwards [Batch]es through `sender`.
    ///
    /// The task holds a semaphore permit only while fetching each batch, so
    /// the number of sources reading concurrently is bounded by the semaphore.
    #[tracing::instrument(
        skip(self, input, semaphore, sender),
        fields(region_id = %self.region_metadata().region_id)
    )]
    pub(crate) fn spawn_scan_task(
        &self,
        mut input: Source,
        semaphore: Arc<Semaphore>,
        sender: mpsc::Sender<Result<Batch>>,
    ) {
        let region_id = self.region_metadata().region_id;
        let span = tracing::info_span!(
            "ScanInput::parallel_scan_task",
            region_id = %region_id,
            stream_kind = "batch"
        );
        common_runtime::spawn_global(
            async move {
                loop {
                    // Scope the permit to the fetch only, releasing it before
                    // sending downstream.
                    let maybe_batch = {
                        // `acquire()` only errors when the semaphore is closed,
                        // which does not happen here.
                        let _permit = semaphore.acquire().await.unwrap();
                        input.next_batch().await
                    };
                    match maybe_batch {
                        Ok(Some(batch)) => {
                            // A send error means the receiver is gone; ignore it.
                            let _ = sender.send(Ok(batch)).await;
                        }
                        // Source exhausted.
                        Ok(None) => break,
                        Err(e) => {
                            let _ = sender.send(Err(e)).await;
                            break;
                        }
                    }
                }
            }
            .instrument(span),
        );
    }
1169
1170 #[tracing::instrument(
1174 skip(self, sources, semaphore),
1175 fields(
1176 region_id = %self.region_metadata().region_id,
1177 source_count = sources.len()
1178 )
1179 )]
1180 pub(crate) fn create_parallel_flat_sources(
1181 &self,
1182 sources: Vec<BoxedRecordBatchStream>,
1183 semaphore: Arc<Semaphore>,
1184 ) -> Result<Vec<BoxedRecordBatchStream>> {
1185 if sources.len() <= 1 {
1186 return Ok(sources);
1187 }
1188
1189 let sources = sources
1191 .into_iter()
1192 .map(|source| {
1193 let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
1194 self.spawn_flat_scan_task(source, semaphore.clone(), sender);
1195 let stream = Box::pin(ReceiverStream::new(receiver));
1196 Box::pin(stream) as _
1197 })
1198 .collect();
1199 Ok(sources)
1200 }
1201
1202 #[tracing::instrument(
1204 skip(self, input, semaphore, sender),
1205 fields(region_id = %self.region_metadata().region_id)
1206 )]
1207 pub(crate) fn spawn_flat_scan_task(
1208 &self,
1209 mut input: BoxedRecordBatchStream,
1210 semaphore: Arc<Semaphore>,
1211 sender: mpsc::Sender<Result<RecordBatch>>,
1212 ) {
1213 let region_id = self.region_metadata().region_id;
1214 let span = tracing::info_span!(
1215 "ScanInput::parallel_scan_task",
1216 region_id = %region_id,
1217 stream_kind = "flat"
1218 );
1219 common_runtime::spawn_global(
1220 async move {
1221 loop {
1222 let maybe_batch = {
1225 let _permit = semaphore.acquire().await.unwrap();
1227 input.next().await
1228 };
1229 match maybe_batch {
1230 Some(Ok(batch)) => {
1231 let _ = sender.send(Ok(batch)).await;
1232 }
1233 Some(Err(e)) => {
1234 let _ = sender.send(Err(e)).await;
1235 break;
1236 }
1237 None => break,
1238 }
1239 }
1240 }
1241 .instrument(span),
1242 );
1243 }
1244
1245 pub(crate) fn total_rows(&self) -> usize {
1246 let rows_in_files: usize = self.files.iter().map(|f| f.num_rows()).sum();
1247 let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();
1248
1249 let rows = rows_in_files + rows_in_memtables;
1250 #[cfg(feature = "enterprise")]
1251 let rows = rows
1252 + self
1253 .extension_ranges
1254 .iter()
1255 .map(|x| x.num_rows())
1256 .sum::<u64>() as usize;
1257 rows
1258 }
1259
    /// Returns the [PredicateGroup] built for this scan.
    pub(crate) fn predicate_group(&self) -> &PredicateGroup {
        &self.predicate
    }
1263
    /// Returns the number of memtables to scan.
    pub(crate) fn num_memtables(&self) -> usize {
        self.memtables.len()
    }
1268
    /// Returns the number of SST files to scan.
    pub(crate) fn num_files(&self) -> usize {
        self.files.len()
    }
1273
    /// Returns the file handle for a [RowGroupIndex] that refers to a file.
    ///
    /// Indices are laid out as memtables first, then files, so the position
    /// inside `files` is `index.index - num_memtables()`. The caller must
    /// ensure `index` actually refers to a file; otherwise the subtraction
    /// underflows or the slice indexing panics.
    pub(crate) fn file_from_index(&self, index: RowGroupIndex) -> &FileHandle {
        let file_index = index.index - self.num_memtables();
        &self.files[file_index]
    }
1279
    /// Returns the metadata of the region to scan (from the projection
    /// mapper).
    pub fn region_metadata(&self) -> &RegionMetadataRef {
        self.mapper.metadata()
    }
1283}
1284
#[cfg(feature = "enterprise")]
impl ScanInput {
    /// Replaces the extension ranges of this input.
    #[must_use]
    pub(crate) fn with_extension_ranges(self, extension_ranges: Vec<BoxedExtensionRange>) -> Self {
        Self {
            extension_ranges,
            ..self
        }
    }

    /// Returns all extension ranges of this input.
    ///
    /// Note: the per-method `#[cfg(feature = "enterprise")]` attributes were
    /// removed as redundant — the whole impl block is already gated.
    pub(crate) fn extension_ranges(&self) -> &[BoxedExtensionRange] {
        &self.extension_ranges
    }

    /// Returns the extension range for a global range index.
    ///
    /// Indices are laid out as memtables, then files, then extension ranges,
    /// so the position inside `extension_ranges` is `i` minus the memtable
    /// and file counts. Panics if `i` does not refer to an extension range.
    pub(crate) fn extension_range(&self, i: usize) -> &BoxedExtensionRange {
        &self.extension_ranges[i - self.num_memtables() - self.num_files()]
    }
}
1306
#[cfg(test)]
impl ScanInput {
    /// Test helper: collects the file id of every SST file in this input.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        let mut ids = Vec::with_capacity(self.files.len());
        for file in &self.files {
            ids.push(file.file_id());
        }
        ids
    }

    /// Test helper: collects the index id of every SST file in this input.
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        let mut ids = Vec::with_capacity(self.files.len());
        for file in &self.files {
            ids.push(file.index_id());
        }
        ids
    }
}
1318
1319fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode {
1320 if append_mode {
1321 return PreFilterMode::All;
1322 }
1323
1324 match merge_mode {
1325 MergeMode::LastRow => PreFilterMode::SkipFieldsOnDelete,
1326 MergeMode::LastNonNull => PreFilterMode::SkipFields,
1327 }
1328}
1329
/// Context shared by the streams of a scan.
pub struct StreamContext {
    /// The input of the scan.
    pub input: ScanInput,
    /// Metadata of the ranges to scan.
    pub(crate) ranges: Vec<RangeMeta>,

    /// Start time of the query, falling back to the time this context was
    /// created when the input carries none.
    pub(crate) query_start: Instant,
}
1342
1343impl StreamContext {
1344 pub(crate) fn seq_scan_ctx(input: ScanInput) -> Self {
1346 let query_start = input.query_start.unwrap_or_else(Instant::now);
1347 let ranges = RangeMeta::seq_scan_ranges(&input);
1348 READ_SST_COUNT.observe(input.num_files() as f64);
1349
1350 Self {
1351 input,
1352 ranges,
1353 query_start,
1354 }
1355 }
1356
1357 pub(crate) fn unordered_scan_ctx(input: ScanInput) -> Self {
1359 let query_start = input.query_start.unwrap_or_else(Instant::now);
1360 let ranges = RangeMeta::unordered_scan_ranges(&input);
1361 READ_SST_COUNT.observe(input.num_files() as f64);
1362
1363 Self {
1364 input,
1365 ranges,
1366 query_start,
1367 }
1368 }
1369
1370 pub(crate) fn is_mem_range_index(&self, index: RowGroupIndex) -> bool {
1372 self.input.num_memtables() > index.index
1373 }
1374
1375 pub(crate) fn is_file_range_index(&self, index: RowGroupIndex) -> bool {
1376 !self.is_mem_range_index(index)
1377 && index.index < self.input.num_files() + self.input.num_memtables()
1378 }
1379
1380 pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
1382 self.ranges
1383 .iter()
1384 .enumerate()
1385 .map(|(idx, range_meta)| range_meta.new_partition_range(idx))
1386 .collect()
1387 }
1388
1389 pub(crate) fn format_for_explain(&self, verbose: bool, f: &mut fmt::Formatter) -> fmt::Result {
1391 let (mut num_mem_ranges, mut num_file_ranges, mut num_other_ranges) = (0, 0, 0);
1392 for range_meta in &self.ranges {
1393 for idx in &range_meta.row_group_indices {
1394 if self.is_mem_range_index(*idx) {
1395 num_mem_ranges += 1;
1396 } else if self.is_file_range_index(*idx) {
1397 num_file_ranges += 1;
1398 } else {
1399 num_other_ranges += 1;
1400 }
1401 }
1402 }
1403 if verbose {
1404 write!(f, "{{")?;
1405 }
1406 write!(
1407 f,
1408 r#""partition_count":{{"count":{}, "mem_ranges":{}, "files":{}, "file_ranges":{}"#,
1409 self.ranges.len(),
1410 num_mem_ranges,
1411 self.input.num_files(),
1412 num_file_ranges,
1413 )?;
1414 if num_other_ranges > 0 {
1415 write!(f, r#", "other_ranges":{}"#, num_other_ranges)?;
1416 }
1417 write!(f, "}}")?;
1418
1419 if let Some(selector) = &self.input.series_row_selector {
1420 write!(f, ", \"selector\":\"{}\"", selector)?;
1421 }
1422 if let Some(distribution) = &self.input.distribution {
1423 write!(f, ", \"distribution\":\"{}\"", distribution)?;
1424 }
1425
1426 if verbose {
1427 self.format_verbose_content(f)?;
1428 }
1429
1430 Ok(())
1431 }
1432
1433 fn format_verbose_content(&self, f: &mut fmt::Formatter) -> fmt::Result {
1434 struct FileWrapper<'a> {
1435 file: &'a FileHandle,
1436 }
1437
1438 impl fmt::Debug for FileWrapper<'_> {
1439 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1440 let (start, end) = self.file.time_range();
1441 write!(
1442 f,
1443 r#"{{"file_id":"{}","time_range_start":"{}::{}","time_range_end":"{}::{}","rows":{},"size":{},"index_size":{}}}"#,
1444 self.file.file_id(),
1445 start.value(),
1446 start.unit(),
1447 end.value(),
1448 end.unit(),
1449 self.file.num_rows(),
1450 self.file.size(),
1451 self.file.index_size()
1452 )
1453 }
1454 }
1455
1456 struct InputWrapper<'a> {
1457 input: &'a ScanInput,
1458 }
1459
1460 #[cfg(feature = "enterprise")]
1461 impl InputWrapper<'_> {
1462 fn format_extension_ranges(&self, f: &mut fmt::Formatter) -> fmt::Result {
1463 if self.input.extension_ranges.is_empty() {
1464 return Ok(());
1465 }
1466
1467 let mut delimiter = "";
1468 write!(f, ", extension_ranges: [")?;
1469 for range in self.input.extension_ranges() {
1470 write!(f, "{}{:?}", delimiter, range)?;
1471 delimiter = ", ";
1472 }
1473 write!(f, "]")?;
1474 Ok(())
1475 }
1476 }
1477
1478 impl fmt::Debug for InputWrapper<'_> {
1479 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1480 let output_schema = self.input.mapper.output_schema();
1481 if !output_schema.is_empty() {
1482 let names: Vec<_> = output_schema
1483 .column_schemas()
1484 .iter()
1485 .map(|col| &col.name)
1486 .collect();
1487 write!(f, ", \"projection\": {:?}", names)?;
1488 }
1489 if let Some(predicate) = &self.input.predicate.predicate()
1490 && !predicate.exprs().is_empty()
1491 {
1492 let exprs: Vec<_> = predicate.exprs().iter().map(|e| e.to_string()).collect();
1493 write!(f, ", \"filters\": {:?}", exprs)?;
1494 }
1495 if !self.input.files.is_empty() {
1496 write!(f, ", \"files\": ")?;
1497 f.debug_list()
1498 .entries(self.input.files.iter().map(|file| FileWrapper { file }))
1499 .finish()?;
1500 }
1501 write!(f, ", \"flat_format\": {}", self.input.flat_format)?;
1502
1503 #[cfg(feature = "enterprise")]
1504 self.format_extension_ranges(f)?;
1505
1506 Ok(())
1507 }
1508 }
1509
1510 write!(f, "{:?}", InputWrapper { input: &self.input })
1511 }
1512}
1513
/// Predicates of a scan request, pre-processed into several forms.
#[derive(Clone, Default)]
pub struct PredicateGroup {
    /// Simple filter evaluators built from expressions on the time index
    /// column, if any.
    time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
    /// Predicate over all expressions, including the region's partition
    /// expression when present.
    predicate_all: Option<Predicate>,
    /// Predicate built only from the request expressions, without the
    /// region's partition expression.
    predicate_without_region: Option<Predicate>,
    /// The region's partition expression, if it has one.
    region_partition_expr: Option<PartitionExpr>,
}
1526
1527impl PredicateGroup {
1528 pub fn new(metadata: &RegionMetadata, exprs: &[Expr]) -> Result<Self> {
1530 let mut combined_exprs = exprs.to_vec();
1531 let mut region_partition_expr = None;
1532
1533 if let Some(expr_json) = metadata.partition_expr.as_ref()
1534 && !expr_json.is_empty()
1535 && let Some(expr) = PartitionExpr::from_json_str(expr_json)
1536 .context(InvalidPartitionExprSnafu { expr: expr_json })?
1537 {
1538 let logical_expr = expr
1539 .try_as_logical_expr()
1540 .context(InvalidPartitionExprSnafu {
1541 expr: expr_json.clone(),
1542 })?;
1543
1544 combined_exprs.push(logical_expr);
1545 region_partition_expr = Some(expr);
1546 }
1547
1548 let mut time_filters = Vec::with_capacity(combined_exprs.len());
1549 let mut columns = HashSet::new();
1551 for expr in &combined_exprs {
1552 columns.clear();
1553 let Some(filter) = Self::expr_to_filter(expr, metadata, &mut columns) else {
1554 continue;
1555 };
1556 time_filters.push(filter);
1557 }
1558 let time_filters = if time_filters.is_empty() {
1559 None
1560 } else {
1561 Some(Arc::new(time_filters))
1562 };
1563
1564 let predicate_all = if combined_exprs.is_empty() {
1565 None
1566 } else {
1567 Some(Predicate::new(combined_exprs))
1568 };
1569 let predicate_without_region = if exprs.is_empty() {
1570 None
1571 } else {
1572 Some(Predicate::new(exprs.to_vec()))
1573 };
1574
1575 Ok(Self {
1576 time_filters,
1577 predicate_all,
1578 predicate_without_region,
1579 region_partition_expr,
1580 })
1581 }
1582
1583 pub(crate) fn time_filters(&self) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
1585 self.time_filters.clone()
1586 }
1587
1588 pub(crate) fn predicate(&self) -> Option<&Predicate> {
1590 self.predicate_all.as_ref()
1591 }
1592
1593 pub(crate) fn predicate_without_region(&self) -> Option<&Predicate> {
1595 self.predicate_without_region.as_ref()
1596 }
1597
1598 pub(crate) fn region_partition_expr(&self) -> Option<&PartitionExpr> {
1600 self.region_partition_expr.as_ref()
1601 }
1602
1603 fn expr_to_filter(
1604 expr: &Expr,
1605 metadata: &RegionMetadata,
1606 columns: &mut HashSet<Column>,
1607 ) -> Option<SimpleFilterEvaluator> {
1608 columns.clear();
1609 expr_to_columns(expr, columns).ok()?;
1612 if columns.len() > 1 {
1613 return None;
1615 }
1616 let column = columns.iter().next()?;
1617 let column_meta = metadata.column_by_name(&column.name)?;
1618 if column_meta.semantic_type == SemanticType::Timestamp {
1619 SimpleFilterEvaluator::try_new(expr)
1620 } else {
1621 None
1622 }
1623 }
1624}