1use std::collections::HashSet;
18use std::fmt;
19use std::num::NonZeroU64;
20use std::sync::Arc;
21use std::time::Instant;
22
23use api::v1::SemanticType;
24use common_error::ext::BoxedError;
25use common_recordbatch::SendableRecordBatchStream;
26use common_recordbatch::filter::SimpleFilterEvaluator;
27use common_telemetry::tracing::Instrument;
28use common_telemetry::{debug, error, tracing, warn};
29use common_time::range::TimestampRange;
30use datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr;
31use datafusion_common::Column;
32use datafusion_expr::Expr;
33use datafusion_expr::utils::expr_to_columns;
34use datatypes::schema::ext::ArrowSchemaExt;
35use futures::StreamExt;
36use partition::expr::PartitionExpr;
37use smallvec::SmallVec;
38use snafu::ResultExt;
39use store_api::metadata::{RegionMetadata, RegionMetadataRef};
40use store_api::region_engine::{PartitionRange, RegionScannerRef};
41use store_api::storage::{
42 RegionId, ScanRequest, SequenceNumber, SequenceRange, TimeSeriesDistribution,
43 TimeSeriesRowSelector,
44};
45use table::predicate::{Predicate, build_time_range_predicate, extract_time_range_from_expr};
46use tokio::sync::{Semaphore, mpsc};
47use tokio_stream::wrappers::ReceiverStream;
48
49use crate::access_layer::AccessLayerRef;
50use crate::cache::CacheStrategy;
51use crate::config::DEFAULT_MAX_CONCURRENT_SCAN_FILES;
52use crate::error::{InvalidPartitionExprSnafu, Result};
53#[cfg(feature = "enterprise")]
54use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
55use crate::memtable::{MemtableRange, RangesOptions};
56use crate::metrics::READ_SST_COUNT;
57use crate::read::compat::{self, FlatCompatBatch};
58use crate::read::flat_projection::FlatProjectionMapper;
59use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
60use crate::read::range_cache::ScanRequestFingerprint;
61use crate::read::read_columns::{
62 ReadColumns, merge, read_columns_from_predicate, read_columns_from_projection,
63};
64use crate::read::seq_scan::SeqScan;
65use crate::read::series_scan::SeriesScan;
66use crate::read::stream::ScanBatchStream;
67use crate::read::unordered_scan::UnorderedScan;
68use crate::read::{BoxedRecordBatchStream, RecordBatch};
69use crate::region::options::MergeMode;
70use crate::region::version::VersionRef;
71use crate::sst::file::FileHandle;
72use crate::sst::index::bloom_filter::applier::{
73 BloomFilterIndexApplierBuilder, BloomFilterIndexApplierRef,
74};
75use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
76use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
77use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
78use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
79#[cfg(feature = "vector_index")]
80use crate::sst::index::vector_index::applier::{VectorIndexApplier, VectorIndexApplierRef};
81use crate::sst::parquet::file_range::PreFilterMode;
82use crate::sst::parquet::reader::ReaderMetrics;
83
84#[cfg(feature = "vector_index")]
85const VECTOR_INDEX_OVERFETCH_MULTIPLIER: usize = 2;
86
/// A scanner over a region; each variant wraps one scan strategy.
///
/// [`ScanRegion::scanner`] chooses the variant from the scan request and the region options.
pub(crate) enum Scanner {
    /// Sequential scan ([`SeqScan`]).
    Seq(SeqScan),
    /// Unordered scan ([`UnorderedScan`]).
    Unordered(UnorderedScan),
    /// Per-series scan ([`SeriesScan`]).
    Series(SeriesScan),
}
96
97impl Scanner {
98 #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
100 pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream, BoxedError> {
101 match self {
102 Scanner::Seq(seq_scan) => seq_scan.build_stream(),
103 Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
104 Scanner::Series(series_scan) => series_scan.build_stream().await,
105 }
106 }
107
108 pub(crate) fn scan_batch(&self) -> Result<ScanBatchStream> {
110 match self {
111 Scanner::Seq(x) => x.scan_all_partitions(),
112 Scanner::Unordered(x) => x.scan_all_partitions(),
113 Scanner::Series(x) => x.scan_all_partitions(),
114 }
115 }
116}
117
#[cfg(test)]
impl Scanner {
    /// Returns how many SST files the wrapped scanner's input holds.
    pub(crate) fn num_files(&self) -> usize {
        match self {
            Self::Seq(scan) => scan.input().num_files(),
            Self::Unordered(scan) => scan.input().num_files(),
            Self::Series(scan) => scan.input().num_files(),
        }
    }

    /// Returns how many memtable range builders the wrapped scanner's input holds.
    pub(crate) fn num_memtables(&self) -> usize {
        match self {
            Self::Seq(scan) => scan.input().num_memtables(),
            Self::Unordered(scan) => scan.input().num_memtables(),
            Self::Series(scan) => scan.input().num_memtables(),
        }
    }

    /// Returns the ids of the SST files in the wrapped scanner's input.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        match self {
            Self::Seq(scan) => scan.input().file_ids(),
            Self::Unordered(scan) => scan.input().file_ids(),
            Self::Series(scan) => scan.input().file_ids(),
        }
    }

    /// Returns the index ids of the SST files in the wrapped scanner's input.
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        match self {
            Self::Seq(scan) => scan.input().index_ids(),
            Self::Unordered(scan) => scan.input().index_ids(),
            Self::Series(scan) => scan.input().index_ids(),
        }
    }

    /// Returns the snapshot sequence recorded in the wrapped scanner's input.
    pub(crate) fn snapshot_sequence(&self) -> Option<SequenceNumber> {
        match self {
            Self::Seq(scan) => scan.input().snapshot_sequence,
            Self::Unordered(scan) => scan.input().snapshot_sequence,
            Self::Series(scan) => scan.input().snapshot_sequence,
        }
    }

    /// Prepares the wrapped scanner with `target_partitions` target partitions.
    ///
    /// # Panics
    /// Panics if the scanner rejects the prepare request.
    pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
        use store_api::region_engine::{PrepareRequest, RegionScanner};

        let request = PrepareRequest::default().with_target_partitions(target_partitions);
        let prepared = match self {
            Self::Seq(scan) => scan.prepare(request),
            Self::Unordered(scan) => scan.prepare(request),
            Self::Series(scan) => scan.prepare(request),
        };
        prepared.unwrap();
    }
}
175
/// Helper that scans a region according to a [`ScanRequest`].
///
/// It selects the memtables and SST files of a region version that the request needs
/// and builds a [`Scanner`] (or a [`RegionScannerRef`]) over them.
#[cfg_attr(doc, aquamarine::aquamarine)]
pub(crate) struct ScanRegion {
    /// Version of the region to scan.
    version: VersionRef,
    /// Access layer used to read SST files and to build index appliers.
    access_layer: AccessLayerRef,
    /// The scan request.
    request: ScanRequest,
    /// Cache strategy forwarded to the scan input and the index appliers.
    cache_strategy: CacheStrategy,
    /// Maximum number of SST files to scan concurrently.
    max_concurrent_scan_files: usize,
    /// Whether to skip building inverted index appliers.
    ignore_inverted_index: bool,
    /// Whether to skip building fulltext index appliers.
    ignore_fulltext_index: bool,
    /// Whether to skip building bloom filter index appliers.
    ignore_bloom_filter: bool,
    /// Start time of the query, if recorded.
    start_time: Option<Instant>,
    /// Whether deleted rows are filtered; forwarded to the scan input.
    filter_deleted: bool,
    /// Provider of extension ranges that are scanned alongside memtables and SSTs.
    #[cfg(feature = "enterprise")]
    extension_range_provider: Option<BoxedExtensionRangeProvider>,
}
251
252impl ScanRegion {
253 pub(crate) fn new(
255 version: VersionRef,
256 access_layer: AccessLayerRef,
257 request: ScanRequest,
258 cache_strategy: CacheStrategy,
259 ) -> ScanRegion {
260 ScanRegion {
261 version,
262 access_layer,
263 request,
264 cache_strategy,
265 max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
266 ignore_inverted_index: false,
267 ignore_fulltext_index: false,
268 ignore_bloom_filter: false,
269 start_time: None,
270 filter_deleted: true,
271 #[cfg(feature = "enterprise")]
272 extension_range_provider: None,
273 }
274 }
275
276 #[must_use]
278 pub(crate) fn with_max_concurrent_scan_files(
279 mut self,
280 max_concurrent_scan_files: usize,
281 ) -> Self {
282 self.max_concurrent_scan_files = max_concurrent_scan_files;
283 self
284 }
285
286 #[must_use]
288 pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self {
289 self.ignore_inverted_index = ignore;
290 self
291 }
292
293 #[must_use]
295 pub(crate) fn with_ignore_fulltext_index(mut self, ignore: bool) -> Self {
296 self.ignore_fulltext_index = ignore;
297 self
298 }
299
300 #[must_use]
302 pub(crate) fn with_ignore_bloom_filter(mut self, ignore: bool) -> Self {
303 self.ignore_bloom_filter = ignore;
304 self
305 }
306
307 #[must_use]
308 pub(crate) fn with_start_time(mut self, now: Instant) -> Self {
309 self.start_time = Some(now);
310 self
311 }
312
313 pub(crate) fn set_filter_deleted(&mut self, filter_deleted: bool) {
314 self.filter_deleted = filter_deleted;
315 }
316
317 #[cfg(feature = "enterprise")]
318 pub(crate) fn set_extension_range_provider(
319 &mut self,
320 extension_range_provider: BoxedExtensionRangeProvider,
321 ) {
322 self.extension_range_provider = Some(extension_range_provider);
323 }
324
325 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
327 pub(crate) async fn scanner(self) -> Result<Scanner> {
328 if self.use_series_scan() {
329 self.series_scan().await.map(Scanner::Series)
330 } else if self.use_unordered_scan() {
331 self.unordered_scan().await.map(Scanner::Unordered)
334 } else {
335 self.seq_scan().await.map(Scanner::Seq)
336 }
337 }
338
339 #[tracing::instrument(
341 level = tracing::Level::DEBUG,
342 skip_all,
343 fields(region_id = %self.region_id())
344 )]
345 pub(crate) async fn region_scanner(self) -> Result<RegionScannerRef> {
346 if self.use_series_scan() {
347 self.series_scan()
348 .await
349 .map(|scanner| Box::new(scanner) as _)
350 } else if self.use_unordered_scan() {
351 self.unordered_scan()
352 .await
353 .map(|scanner| Box::new(scanner) as _)
354 } else {
355 self.seq_scan().await.map(|scanner| Box::new(scanner) as _)
356 }
357 }
358
359 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
361 pub(crate) async fn seq_scan(self) -> Result<SeqScan> {
362 let input = self.scan_input().await?.with_compaction(false);
363 Ok(SeqScan::new(input))
364 }
365
366 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
368 pub(crate) async fn unordered_scan(self) -> Result<UnorderedScan> {
369 let input = self.scan_input().await?;
370 Ok(UnorderedScan::new(input))
371 }
372
373 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
375 pub(crate) async fn series_scan(self) -> Result<SeriesScan> {
376 let input = self.scan_input().await?;
377 Ok(SeriesScan::new(input))
378 }
379
380 fn use_unordered_scan(&self) -> bool {
382 self.version.options.append_mode
389 && self.request.series_row_selector.is_none()
390 && (self.request.distribution.is_none()
391 || self.request.distribution == Some(TimeSeriesDistribution::TimeWindowed))
392 }
393
394 fn use_series_scan(&self) -> bool {
396 self.request.distribution == Some(TimeSeriesDistribution::PerSeries)
397 }
398
399 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
401 async fn scan_input(self) -> Result<ScanInput> {
402 let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new);
403 let time_range = self.build_time_range_predicate();
404 let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;
405
406 let read_cols = match &self.request.projection_input {
407 Some(p) => {
408 let metadata = &self.version.metadata;
411 let from_projection = read_columns_from_projection(p.clone(), metadata)?;
412 let from_predicate = read_columns_from_predicate(&predicate, metadata);
413 merge(from_projection, from_predicate)
414 }
415 None => {
416 let read_col_ids = self
417 .version
418 .metadata
419 .column_metadatas
420 .iter()
421 .map(|col| col.column_id);
422 ReadColumns::from_deduped_column_ids(read_col_ids)
423 }
424 };
425 let read_col_ids = read_cols.column_ids();
426
427 let projection = self
429 .request
430 .projection_indices()
431 .map(|x| x.to_vec())
432 .unwrap_or_else(|| (0..self.version.metadata.column_metadatas.len()).collect());
433 let json_type_hint = self
434 .version
435 .metadata
436 .schema
437 .arrow_schema()
438 .has_json_extension_field()
439 .then_some(&self.request.json_type_hint);
440 let mapper = FlatProjectionMapper::new_with_read_columns(
441 &self.version.metadata,
442 projection,
443 read_cols,
444 json_type_hint,
445 )?;
446
447 let ssts = &self.version.ssts;
448 let mut files = Vec::new();
449 for level in ssts.levels() {
450 for file in level.files.values() {
451 let exceed_min_sequence = match (sst_min_sequence, file.meta_ref().sequence) {
452 (Some(min_sequence), Some(file_sequence)) => file_sequence > min_sequence,
453 (Some(_), None) => true,
459 (None, _) => true,
460 };
461
462 if exceed_min_sequence && file_in_range(file, &time_range) {
464 files.push(file.clone());
465 }
466 }
470 }
471
472 let memtables = self.version.memtables.list_memtables();
473 let mut mem_range_builders = Vec::new();
475 let filter_mode = pre_filter_mode(
476 self.version.options.append_mode,
477 self.version.options.merge_mode(),
478 );
479
480 for m in memtables {
481 let Some((start, end)) = m.stats().time_range() else {
483 continue;
484 };
485 let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
487 if !memtable_range.intersects(&time_range) {
488 continue;
489 }
490 let ranges_in_memtable = m.ranges(
491 Some(&read_col_ids),
492 RangesOptions::default()
493 .with_predicate(predicate.clone())
494 .with_sequence(SequenceRange::new(
495 self.request.memtable_min_sequence,
496 self.request.memtable_max_sequence,
497 ))
498 .with_pre_filter_mode(filter_mode),
499 )?;
500 mem_range_builders.extend(ranges_in_memtable.ranges.into_values().map(|v| {
501 let stats = v.stats().clone();
502 MemRangeBuilder::new(v, stats)
503 }));
504 }
505
506 let region_id = self.region_id();
507 debug!(
508 "Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}",
509 region_id,
510 self.request,
511 time_range,
512 mem_range_builders.len(),
513 files.len(),
514 self.version.options.append_mode,
515 );
516
517 let (non_field_filters, field_filters) = self.partition_by_field_filters();
518 let inverted_index_appliers = [
519 self.build_invereted_index_applier(&non_field_filters),
520 self.build_invereted_index_applier(&field_filters),
521 ];
522 let bloom_filter_appliers = [
523 self.build_bloom_filter_applier(&non_field_filters),
524 self.build_bloom_filter_applier(&field_filters),
525 ];
526 let fulltext_index_appliers = [
527 self.build_fulltext_index_applier(&non_field_filters),
528 self.build_fulltext_index_applier(&field_filters),
529 ];
530 #[cfg(feature = "vector_index")]
531 let vector_index_applier = self.build_vector_index_applier();
532 #[cfg(feature = "vector_index")]
533 let vector_index_k = self.request.vector_search.as_ref().map(|search| {
534 if self.request.filters.is_empty() {
535 search.k
536 } else {
537 search.k.saturating_mul(VECTOR_INDEX_OVERFETCH_MULTIPLIER)
538 }
539 });
540
541 let input = ScanInput::new(self.access_layer, mapper)
542 .with_time_range(Some(time_range))
543 .with_predicate(predicate)
544 .with_memtables(mem_range_builders)
545 .with_files(files)
546 .with_cache(self.cache_strategy)
547 .with_inverted_index_appliers(inverted_index_appliers)
548 .with_bloom_filter_index_appliers(bloom_filter_appliers)
549 .with_fulltext_index_appliers(fulltext_index_appliers)
550 .with_max_concurrent_scan_files(self.max_concurrent_scan_files)
551 .with_start_time(self.start_time)
552 .with_append_mode(self.version.options.append_mode)
553 .with_filter_deleted(self.filter_deleted)
554 .with_merge_mode(self.version.options.merge_mode())
555 .with_series_row_selector(self.request.series_row_selector)
556 .with_distribution(self.request.distribution)
557 .with_explain_flat_format(
558 self.version.options.sst_format == Some(crate::sst::FormatType::Flat),
559 )
560 .with_snapshot_sequence(
561 self.request
562 .snapshot_on_scan
563 .then_some(self.request.memtable_max_sequence)
564 .flatten(),
565 );
566 #[cfg(feature = "vector_index")]
567 let input = input
568 .with_vector_index_applier(vector_index_applier)
569 .with_vector_index_k(vector_index_k);
570
571 #[cfg(feature = "enterprise")]
572 let input = if let Some(provider) = self.extension_range_provider {
573 let ranges = provider
574 .find_extension_ranges(self.version.flushed_sequence, time_range, &self.request)
575 .await?;
576 debug!("Find extension ranges: {ranges:?}");
577 input.with_extension_ranges(ranges)
578 } else {
579 input
580 };
581 Ok(input)
582 }
583
584 fn region_id(&self) -> RegionId {
585 self.version.metadata.region_id
586 }
587
588 fn build_time_range_predicate(&self) -> TimestampRange {
590 let time_index = self.version.metadata.time_index_column();
591 let unit = time_index
592 .column_schema
593 .data_type
594 .as_timestamp()
595 .expect("Time index must have timestamp-compatible type")
596 .unit();
597 build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters)
598 }
599
600 fn partition_by_field_filters(&self) -> (Vec<Expr>, Vec<Expr>) {
603 let field_columns = self
604 .version
605 .metadata
606 .field_columns()
607 .map(|col| &col.column_schema.name)
608 .collect::<HashSet<_>>();
609
610 let mut columns = HashSet::new();
611
612 self.request.filters.iter().cloned().partition(|expr| {
613 columns.clear();
614 if expr_to_columns(expr, &mut columns).is_err() {
616 return true;
618 }
619 !columns
621 .iter()
622 .any(|column| field_columns.contains(&column.name))
623 })
624 }
625
626 fn build_invereted_index_applier(&self, filters: &[Expr]) -> Option<InvertedIndexApplierRef> {
628 if self.ignore_inverted_index {
629 return None;
630 }
631
632 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
633 let inverted_index_cache = self.cache_strategy.inverted_index_cache().cloned();
634
635 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
636
637 InvertedIndexApplierBuilder::new(
638 self.access_layer.table_dir().to_string(),
639 self.access_layer.path_type(),
640 self.access_layer.object_store().clone(),
641 self.version.metadata.as_ref(),
642 self.version.metadata.inverted_indexed_column_ids(
643 self.version
644 .options
645 .index_options
646 .inverted_index
647 .ignore_column_ids
648 .iter(),
649 ),
650 self.access_layer.puffin_manager_factory().clone(),
651 )
652 .with_file_cache(file_cache)
653 .with_inverted_index_cache(inverted_index_cache)
654 .with_puffin_metadata_cache(puffin_metadata_cache)
655 .build(filters)
656 .inspect_err(|err| warn!(err; "Failed to build invereted index applier"))
657 .ok()
658 .flatten()
659 .map(Arc::new)
660 }
661
662 fn build_bloom_filter_applier(&self, filters: &[Expr]) -> Option<BloomFilterIndexApplierRef> {
664 if self.ignore_bloom_filter {
665 return None;
666 }
667
668 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
669 let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
670 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
671
672 BloomFilterIndexApplierBuilder::new(
673 self.access_layer.table_dir().to_string(),
674 self.access_layer.path_type(),
675 self.access_layer.object_store().clone(),
676 self.version.metadata.as_ref(),
677 self.access_layer.puffin_manager_factory().clone(),
678 )
679 .with_file_cache(file_cache)
680 .with_bloom_filter_index_cache(bloom_filter_index_cache)
681 .with_puffin_metadata_cache(puffin_metadata_cache)
682 .build(filters)
683 .inspect_err(|err| warn!(err; "Failed to build bloom filter index applier"))
684 .ok()
685 .flatten()
686 .map(Arc::new)
687 }
688
689 fn build_fulltext_index_applier(&self, filters: &[Expr]) -> Option<FulltextIndexApplierRef> {
691 if self.ignore_fulltext_index {
692 return None;
693 }
694
695 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
696 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
697 let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
698 FulltextIndexApplierBuilder::new(
699 self.access_layer.table_dir().to_string(),
700 self.access_layer.path_type(),
701 self.access_layer.object_store().clone(),
702 self.access_layer.puffin_manager_factory().clone(),
703 self.version.metadata.as_ref(),
704 )
705 .with_file_cache(file_cache)
706 .with_puffin_metadata_cache(puffin_metadata_cache)
707 .with_bloom_filter_cache(bloom_filter_index_cache)
708 .build(filters)
709 .inspect_err(|err| warn!(err; "Failed to build fulltext index applier"))
710 .ok()
711 .flatten()
712 .map(Arc::new)
713 }
714
715 #[cfg(feature = "vector_index")]
717 fn build_vector_index_applier(&self) -> Option<VectorIndexApplierRef> {
718 let vector_search = self.request.vector_search.as_ref()?;
719
720 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
721 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
722 let vector_index_cache = self.cache_strategy.vector_index_cache().cloned();
723
724 let applier = VectorIndexApplier::new(
725 self.access_layer.table_dir().to_string(),
726 self.access_layer.path_type(),
727 self.access_layer.object_store().clone(),
728 self.access_layer.puffin_manager_factory().clone(),
729 vector_search.column_id,
730 vector_search.query_vector.clone(),
731 vector_search.metric,
732 )
733 .with_file_cache(file_cache)
734 .with_puffin_metadata_cache(puffin_metadata_cache)
735 .with_vector_index_cache(vector_index_cache);
736
737 Some(Arc::new(applier))
738 }
739}
740
741fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
743 if predicate == &TimestampRange::min_to_max() {
744 return true;
745 }
746 let (start, end) = file.time_range();
748 let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end));
749 file_ts_range.intersects(predicate)
750}
751
/// Input shared by the region scanners: what to read and how to read it.
pub struct ScanInput {
    /// Access layer used to read SST files.
    access_layer: AccessLayerRef,
    /// Projection mapper built from the projection and the columns to read.
    pub(crate) mapper: Arc<FlatProjectionMapper>,
    /// Columns to read; initialized from the mapper's read columns.
    pub(crate) read_cols: ReadColumns,
    /// Time range to scan, if set.
    pub(crate) time_range: Option<TimestampRange>,
    /// Predicates applied when reading SST files.
    pub(crate) predicate: PredicateGroup,
    /// Region partition expression taken from `predicate` when it is set.
    region_partition_expr: Option<PartitionExpr>,
    /// Builders of the memtable ranges to scan.
    pub(crate) memtables: Vec<MemRangeBuilder>,
    /// SST files to scan.
    pub(crate) files: Vec<FileHandle>,
    /// Cache strategy used when reading SST files.
    pub(crate) cache_strategy: CacheStrategy,
    /// When true, a missing SST file is logged and skipped instead of failing the scan.
    ignore_file_not_found: bool,
    /// Maximum number of SST files to scan concurrently.
    pub(crate) max_concurrent_scan_files: usize,
    /// Inverted index appliers.
    ///
    /// [`ScanRegion`] builds the first from filters without field columns and the second
    /// from filters on field columns.
    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
    /// Bloom filter index appliers, ordered like `inverted_index_appliers`.
    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    /// Fulltext index appliers, ordered like `inverted_index_appliers`.
    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
    /// Vector index applier built from the request's vector search, if any.
    #[cfg(feature = "vector_index")]
    pub(crate) vector_index_applier: Option<VectorIndexApplierRef>,
    /// Number of candidates to request from the vector index, if any.
    #[cfg(feature = "vector_index")]
    pub(crate) vector_index_k: Option<usize>,
    /// Start time of the query, if recorded.
    pub(crate) query_start: Option<Instant>,
    /// Whether the region is in append mode.
    pub(crate) append_mode: bool,
    /// Whether deleted rows are filtered.
    pub(crate) filter_deleted: bool,
    /// Merge mode of the region.
    pub(crate) merge_mode: MergeMode,
    /// Series row selector of the request, if any.
    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
    /// Time series distribution requested by the scan, if any.
    pub(crate) distribution: Option<TimeSeriesDistribution>,
    /// Whether the region's SST format is flat (NOTE(review): presumably only used when
    /// explaining the scan — confirm).
    explain_flat_format: bool,
    /// Snapshot sequence, set when the request asks to snapshot on scan.
    pub(crate) snapshot_sequence: Option<SequenceNumber>,
    /// Whether this input is used by a compaction.
    pub(crate) compaction: bool,
    /// Extension ranges to scan in addition to memtables and files.
    #[cfg(feature = "enterprise")]
    extension_ranges: Vec<BoxedExtensionRange>,
}
809
810impl ScanInput {
811 #[must_use]
813 pub(crate) fn new(access_layer: AccessLayerRef, mapper: FlatProjectionMapper) -> ScanInput {
814 ScanInput {
815 access_layer,
816 read_cols: mapper.read_columns().clone(),
817 mapper: Arc::new(mapper),
818 time_range: None,
819 predicate: PredicateGroup::default(),
820 region_partition_expr: None,
821 memtables: Vec::new(),
822 files: Vec::new(),
823 cache_strategy: CacheStrategy::Disabled,
824 ignore_file_not_found: false,
825 max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
826 inverted_index_appliers: [None, None],
827 bloom_filter_index_appliers: [None, None],
828 fulltext_index_appliers: [None, None],
829 #[cfg(feature = "vector_index")]
830 vector_index_applier: None,
831 #[cfg(feature = "vector_index")]
832 vector_index_k: None,
833 query_start: None,
834 append_mode: false,
835 filter_deleted: true,
836 merge_mode: MergeMode::default(),
837 series_row_selector: None,
838 distribution: None,
839 explain_flat_format: false,
840 snapshot_sequence: None,
841 compaction: false,
842 #[cfg(feature = "enterprise")]
843 extension_ranges: Vec::new(),
844 }
845 }
846
847 #[must_use]
849 pub(crate) fn with_time_range(mut self, time_range: Option<TimestampRange>) -> Self {
850 self.time_range = time_range;
851 self
852 }
853
854 #[must_use]
856 pub(crate) fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
857 self.region_partition_expr = predicate.region_partition_expr().cloned();
858 self.predicate = predicate;
859 self
860 }
861
862 #[must_use]
864 pub(crate) fn with_memtables(mut self, memtables: Vec<MemRangeBuilder>) -> Self {
865 self.memtables = memtables;
866 self
867 }
868
869 #[must_use]
871 pub(crate) fn with_files(mut self, files: Vec<FileHandle>) -> Self {
872 self.files = files;
873 self
874 }
875
876 #[must_use]
878 pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self {
879 self.cache_strategy = cache;
880 self
881 }
882
883 #[must_use]
885 pub(crate) fn with_ignore_file_not_found(mut self, ignore: bool) -> Self {
886 self.ignore_file_not_found = ignore;
887 self
888 }
889
890 #[must_use]
892 pub(crate) fn with_max_concurrent_scan_files(
893 mut self,
894 max_concurrent_scan_files: usize,
895 ) -> Self {
896 self.max_concurrent_scan_files = max_concurrent_scan_files;
897 self
898 }
899
900 #[must_use]
902 pub(crate) fn with_inverted_index_appliers(
903 mut self,
904 appliers: [Option<InvertedIndexApplierRef>; 2],
905 ) -> Self {
906 self.inverted_index_appliers = appliers;
907 self
908 }
909
910 #[must_use]
912 pub(crate) fn with_bloom_filter_index_appliers(
913 mut self,
914 appliers: [Option<BloomFilterIndexApplierRef>; 2],
915 ) -> Self {
916 self.bloom_filter_index_appliers = appliers;
917 self
918 }
919
920 #[must_use]
922 pub(crate) fn with_fulltext_index_appliers(
923 mut self,
924 appliers: [Option<FulltextIndexApplierRef>; 2],
925 ) -> Self {
926 self.fulltext_index_appliers = appliers;
927 self
928 }
929
930 #[cfg(feature = "vector_index")]
932 #[must_use]
933 pub(crate) fn with_vector_index_applier(
934 mut self,
935 applier: Option<VectorIndexApplierRef>,
936 ) -> Self {
937 self.vector_index_applier = applier;
938 self
939 }
940
941 #[cfg(feature = "vector_index")]
943 #[must_use]
944 pub(crate) fn with_vector_index_k(mut self, k: Option<usize>) -> Self {
945 self.vector_index_k = k;
946 self
947 }
948
949 #[must_use]
951 pub(crate) fn with_start_time(mut self, now: Option<Instant>) -> Self {
952 self.query_start = now;
953 self
954 }
955
956 #[must_use]
957 pub(crate) fn with_append_mode(mut self, is_append_mode: bool) -> Self {
958 self.append_mode = is_append_mode;
959 self
960 }
961
962 #[must_use]
964 pub(crate) fn with_filter_deleted(mut self, filter_deleted: bool) -> Self {
965 self.filter_deleted = filter_deleted;
966 self
967 }
968
969 #[must_use]
971 pub(crate) fn with_merge_mode(mut self, merge_mode: MergeMode) -> Self {
972 self.merge_mode = merge_mode;
973 self
974 }
975
976 #[must_use]
978 pub(crate) fn with_distribution(
979 mut self,
980 distribution: Option<TimeSeriesDistribution>,
981 ) -> Self {
982 self.distribution = distribution;
983 self
984 }
985
986 #[must_use]
988 pub(crate) fn with_explain_flat_format(mut self, explain_flat_format: bool) -> Self {
989 self.explain_flat_format = explain_flat_format;
990 self
991 }
992
993 #[must_use]
995 pub(crate) fn with_series_row_selector(
996 mut self,
997 series_row_selector: Option<TimeSeriesRowSelector>,
998 ) -> Self {
999 self.series_row_selector = series_row_selector;
1000 self
1001 }
1002
1003 #[must_use]
1004 pub(crate) fn with_snapshot_sequence(
1005 mut self,
1006 snapshot_sequence: Option<SequenceNumber>,
1007 ) -> Self {
1008 self.snapshot_sequence = snapshot_sequence;
1009 self
1010 }
1011
1012 #[must_use]
1014 pub(crate) fn with_compaction(mut self, compaction: bool) -> Self {
1015 self.compaction = compaction;
1016 self
1017 }
1018
1019 pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> {
1021 let memtable = &self.memtables[index.index];
1022 let mut ranges = SmallVec::new();
1023 memtable.build_ranges(index.row_group_index, &mut ranges);
1024 ranges
1025 }
1026
1027 fn predicate_for_file(&self, file: &FileHandle) -> Option<Predicate> {
1028 if self.should_skip_region_partition(file) {
1029 self.predicate.predicate_without_region().cloned()
1030 } else {
1031 self.predicate.predicate().cloned()
1032 }
1033 }
1034
1035 fn should_skip_region_partition(&self, file: &FileHandle) -> bool {
1036 match (
1037 self.region_partition_expr.as_ref(),
1038 file.meta_ref().partition_expr.as_ref(),
1039 ) {
1040 (Some(region_expr), Some(file_expr)) => region_expr == file_expr,
1041 _ => false,
1042 }
1043 }
1044
1045 #[tracing::instrument(
1047 skip_all,
1048 fields(
1049 region_id = %self.region_metadata().region_id,
1050 file_id = %file.file_id()
1051 )
1052 )]
1053 pub async fn prune_file(
1054 &self,
1055 file: &FileHandle,
1056 pre_filter_mode: PreFilterMode,
1057 reader_metrics: &mut ReaderMetrics,
1058 ) -> Result<FileRangeBuilder> {
1059 let predicate = self.predicate_for_file(file);
1060 let may_build_selective_row_selection = predicate.is_some();
1061 let decode_pk_values = !self.compaction
1062 && self
1063 .mapper
1064 .read_columns()
1065 .column_ids_iter()
1066 .any(|column_id| self.mapper.metadata().primary_key.contains(&column_id));
1067 let reader = self
1068 .access_layer
1069 .read_sst(file.clone())
1070 .predicate(predicate)
1071 .projection(Some(self.read_cols.clone()))
1072 .cache(self.cache_strategy.clone())
1073 .inverted_index_appliers(self.inverted_index_appliers.clone())
1074 .bloom_filter_index_appliers(self.bloom_filter_index_appliers.clone())
1075 .fulltext_index_appliers(self.fulltext_index_appliers.clone());
1076 let reader = if !self.compaction && may_build_selective_row_selection {
1077 reader.deferred_optional_page_index()
1078 } else {
1079 reader
1080 };
1081 #[cfg(feature = "vector_index")]
1082 let reader = {
1083 let mut reader = reader;
1084 reader =
1085 reader.vector_index_applier(self.vector_index_applier.clone(), self.vector_index_k);
1086 reader
1087 };
1088 let res = reader
1089 .expected_metadata(Some(self.mapper.metadata().clone()))
1090 .compaction(self.compaction)
1091 .pre_filter_mode(pre_filter_mode)
1092 .decode_primary_key_values(decode_pk_values)
1093 .build_reader_input(reader_metrics)
1094 .await;
1095 let read_input = match res {
1096 Ok(x) => x,
1097 Err(e) => {
1098 if e.is_object_not_found() && self.ignore_file_not_found {
1099 error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
1100 return Ok(FileRangeBuilder::default());
1101 } else {
1102 return Err(e);
1103 }
1104 }
1105 };
1106
1107 let Some((mut file_range_ctx, selection)) = read_input else {
1108 return Ok(FileRangeBuilder::default());
1109 };
1110
1111 let need_compat = !compat::has_same_columns_and_pk_encoding(
1112 &self.mapper,
1113 file_range_ctx.read_format(),
1114 self.compaction,
1115 );
1116 if need_compat {
1117 let compat = FlatCompatBatch::try_new(
1120 &self.mapper,
1121 file_range_ctx.read_format(),
1122 self.compaction,
1123 )?;
1124 file_range_ctx.set_compat_batch(compat);
1125 }
1126 Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), selection))
1127 }
1128
1129 #[tracing::instrument(
1133 skip(self, sources, semaphore),
1134 fields(
1135 region_id = %self.region_metadata().region_id,
1136 source_count = sources.len()
1137 )
1138 )]
1139 pub(crate) fn create_parallel_flat_sources(
1140 &self,
1141 sources: Vec<BoxedRecordBatchStream>,
1142 semaphore: Arc<Semaphore>,
1143 channel_size: usize,
1144 ) -> Result<Vec<BoxedRecordBatchStream>> {
1145 if sources.len() <= 1 {
1146 return Ok(sources);
1147 }
1148
1149 let sources = sources
1151 .into_iter()
1152 .map(|source| {
1153 let (sender, receiver) = mpsc::channel(channel_size);
1154 self.spawn_flat_scan_task(source, semaphore.clone(), sender);
1155 let stream = Box::pin(ReceiverStream::new(receiver));
1156 Box::pin(stream) as _
1157 })
1158 .collect();
1159 Ok(sources)
1160 }
1161
1162 #[tracing::instrument(
1164 skip(self, input, semaphore, sender),
1165 fields(region_id = %self.region_metadata().region_id)
1166 )]
1167 pub(crate) fn spawn_flat_scan_task(
1168 &self,
1169 mut input: BoxedRecordBatchStream,
1170 semaphore: Arc<Semaphore>,
1171 sender: mpsc::Sender<Result<RecordBatch>>,
1172 ) {
1173 let region_id = self.region_metadata().region_id;
1174 let span = tracing::info_span!(
1175 "ScanInput::parallel_scan_task",
1176 region_id = %region_id,
1177 stream_kind = "flat"
1178 );
1179 common_runtime::spawn_global(
1180 async move {
1181 loop {
1182 let maybe_batch = {
1185 let _permit = semaphore.acquire().await.unwrap();
1187 input.next().await
1188 };
1189 match maybe_batch {
1190 Some(Ok(batch)) => {
1191 let _ = sender.send(Ok(batch)).await;
1192 }
1193 Some(Err(e)) => {
1194 let _ = sender.send(Err(e)).await;
1195 break;
1196 }
1197 None => break,
1198 }
1199 }
1200 }
1201 .instrument(span),
1202 );
1203 }
1204
1205 pub(crate) fn total_rows(&self) -> usize {
1206 let rows_in_files: usize = self.files.iter().map(|f| f.num_rows()).sum();
1207 let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();
1208
1209 let rows = rows_in_files + rows_in_memtables;
1210 #[cfg(feature = "enterprise")]
1211 let rows = rows
1212 + self
1213 .extension_ranges
1214 .iter()
1215 .map(|x| x.num_rows())
1216 .sum::<u64>() as usize;
1217 rows
1218 }
1219
    /// Returns the predicate group (request filters plus region partition filter) of this scan.
    pub(crate) fn predicate_group(&self) -> &PredicateGroup {
        &self.predicate
    }
1223
    /// Returns the number of memtables to scan.
    ///
    /// Memtable indices come first in the global range index space, so this is also the index
    /// of the first file range.
    pub(crate) fn num_memtables(&self) -> usize {
        self.memtables.len()
    }
1228
    /// Returns the number of SST files to scan.
    pub(crate) fn num_files(&self) -> usize {
        self.files.len()
    }
1233
1234 pub(crate) fn file_from_index(&self, index: RowGroupIndex) -> &FileHandle {
1236 let file_index = index.index - self.num_memtables();
1237 &self.files[file_index]
1238 }
1239
    /// Returns the metadata of the scanned region, as held by the projection mapper.
    pub fn region_metadata(&self) -> &RegionMetadataRef {
        self.mapper.metadata()
    }
1243
1244 fn range_pre_filter_mode(&self, source_count: usize) -> PreFilterMode {
1245 if source_count <= 1 {
1246 return PreFilterMode::All;
1251 }
1252
1253 pre_filter_mode(self.append_mode, self.merge_mode)
1254 }
1255}
1256
#[cfg(feature = "enterprise")]
impl ScanInput {
    /// Replaces the extension ranges of this input and returns the updated input.
    #[must_use]
    pub(crate) fn with_extension_ranges(
        mut self,
        extension_ranges: Vec<BoxedExtensionRange>,
    ) -> Self {
        self.extension_ranges = extension_ranges;
        self
    }

    /// Returns all extension ranges of this input.
    pub(crate) fn extension_ranges(&self) -> &[BoxedExtensionRange] {
        self.extension_ranges.as_slice()
    }

    /// Returns the extension range at global range index `i`.
    ///
    /// Extension ranges come after all memtables and files in the global index space.
    ///
    /// # Panics
    /// Panics if `i` does not address an extension range.
    pub(crate) fn extension_range(&self, i: usize) -> &BoxedExtensionRange {
        let first_extension_index = self.num_memtables() + self.num_files();
        &self.extension_ranges[i - first_extension_index]
    }
}
1278
#[cfg(test)]
impl ScanInput {
    /// Returns the region file id of every SST file in this input, in file order.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        let mut ids = Vec::with_capacity(self.files.len());
        for handle in &self.files {
            ids.push(handle.file_id());
        }
        ids
    }

    /// Returns the index id of every SST file in this input, in file order.
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        let mut ids = Vec::with_capacity(self.files.len());
        for handle in &self.files {
            ids.push(handle.index_id());
        }
        ids
    }
}
1290
1291fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode {
1292 if append_mode {
1293 return PreFilterMode::All;
1294 }
1295
1296 match merge_mode {
1297 MergeMode::LastRow => PreFilterMode::SkipFields,
1298 MergeMode::LastNonNull => PreFilterMode::SkipFields,
1299 }
1300}
1301
1302pub(crate) fn build_scan_fingerprint(input: &ScanInput) -> Option<ScanRequestFingerprint> {
1305 let eligible = !input.compaction
1306 && !input.files.is_empty()
1307 && matches!(input.cache_strategy, CacheStrategy::EnableAll(_));
1308
1309 if !eligible {
1310 return None;
1311 }
1312
1313 let metadata = input.region_metadata();
1314 let tag_names: HashSet<&str> = metadata
1315 .column_metadatas
1316 .iter()
1317 .filter(|col| col.semantic_type == SemanticType::Tag)
1318 .map(|col| col.column_schema.name.as_str())
1319 .collect();
1320
1321 let time_index = metadata.time_index_column();
1322 let time_index_name = time_index.column_schema.name.clone();
1323 let ts_col_unit = time_index
1324 .column_schema
1325 .data_type
1326 .as_timestamp()
1327 .expect("Time index must have timestamp-compatible type")
1328 .unit();
1329
1330 let exprs = input
1331 .predicate_group()
1332 .predicate_without_region()
1333 .map(|predicate| predicate.exprs())
1334 .unwrap_or_default();
1335
1336 let mut filters = Vec::new();
1337 let mut time_filters = Vec::new();
1338 let mut has_tag_filter = false;
1339 let mut columns = HashSet::new();
1340
1341 for expr in exprs {
1342 columns.clear();
1343 let is_time_only = match expr_to_columns(expr, &mut columns) {
1344 Ok(()) if !columns.is_empty() => {
1345 has_tag_filter |= columns
1346 .iter()
1347 .any(|col| tag_names.contains(col.name.as_str()));
1348 columns.iter().all(|col| col.name == time_index_name)
1349 }
1350 _ => false,
1351 };
1352
1353 if is_time_only
1364 && extract_time_range_from_expr(&time_index_name, ts_col_unit, expr).is_some()
1365 {
1366 time_filters.push(expr.to_string());
1367 } else {
1368 filters.push(expr.to_string());
1369 }
1370 }
1371
1372 if !has_tag_filter {
1373 return None;
1375 }
1376
1377 filters.sort_unstable();
1379 time_filters.sort_unstable();
1380 let read_columns = input.read_cols.clone();
1381 Some(
1382 crate::read::range_cache::ScanRequestFingerprintBuilder {
1383 read_column_types: read_columns
1384 .column_ids_iter()
1385 .map(|id| {
1386 metadata
1387 .column_by_id(id)
1388 .map(|col| col.column_schema.data_type.clone())
1389 })
1390 .collect(),
1391 read_columns,
1392 filters,
1393 time_filters,
1394 series_row_selector: input.series_row_selector,
1395 append_mode: input.append_mode,
1396 filter_deleted: input.filter_deleted,
1397 merge_mode: input.merge_mode,
1398 partition_expr_version: metadata.partition_expr_version,
1399 }
1400 .build(),
1401 )
1402}
1403
/// State shared by the streams of a region scan, built from a [`ScanInput`].
pub struct StreamContext {
    /// Input of the scan.
    pub input: ScanInput,
    /// Metadata of the ranges to scan. A `PartitionRange::identifier` indexes into this list.
    pub(crate) ranges: Vec<RangeMeta>,
    /// Fingerprint of the scan request, or `None` when the scan is not eligible for one.
    // NOTE(review): `dead_code` is presumably allowed because the fingerprint is not read
    // everywhere yet; confirm and remove the attribute once it is used.
    #[allow(dead_code)]
    pub(crate) scan_fingerprint: Option<ScanRequestFingerprint>,

    /// Start time of the query.
    pub(crate) query_start: Instant,
}
1420
impl StreamContext {
    /// Creates the context for a sequential scan.
    ///
    /// Steps:
    /// - uses `input.query_start` when set, otherwise `Instant::now()`;
    /// - builds the ranges with `RangeMeta::seq_scan_ranges`;
    /// - records the number of SST files in `READ_SST_COUNT`;
    /// - computes the optional scan fingerprint.
    pub(crate) fn seq_scan_ctx(input: ScanInput) -> Self {
        let query_start = input.query_start.unwrap_or_else(Instant::now);
        let ranges = RangeMeta::seq_scan_ranges(&input);
        READ_SST_COUNT.observe(input.num_files() as f64);
        let scan_fingerprint = build_scan_fingerprint(&input);

        Self {
            input,
            ranges,
            scan_fingerprint,
            query_start,
        }
    }

    /// Creates the context for an unordered scan.
    ///
    /// Same as `seq_scan_ctx`, except that the ranges come from
    /// `RangeMeta::unordered_scan_ranges`.
    pub(crate) fn unordered_scan_ctx(input: ScanInput) -> Self {
        let query_start = input.query_start.unwrap_or_else(Instant::now);
        let ranges = RangeMeta::unordered_scan_ranges(&input);
        READ_SST_COUNT.observe(input.num_files() as f64);
        let scan_fingerprint = build_scan_fingerprint(&input);

        Self {
            input,
            ranges,
            scan_fingerprint,
            query_start,
        }
    }

    /// Returns true if `index` refers to a memtable range, i.e. it is below the memtable count.
    pub(crate) fn is_mem_range_index(&self, index: RowGroupIndex) -> bool {
        self.input.num_memtables() > index.index
    }

    /// Returns true if `index` refers to an SST file range, i.e. it lies in
    /// `[num_memtables, num_memtables + num_files)`.
    pub(crate) fn is_file_range_index(&self, index: RowGroupIndex) -> bool {
        !self.is_mem_range_index(index)
            && index.index < self.input.num_files() + self.input.num_memtables()
    }

    /// Returns the pre-filter mode for `part_range`, based on how many sources its range has.
    ///
    /// Panics if `part_range.identifier` is not a valid index into `self.ranges`.
    pub(crate) fn range_pre_filter_mode(&self, part_range: &PartitionRange) -> PreFilterMode {
        let range_meta = &self.ranges[part_range.identifier];
        let source_count = range_meta.indices.len();

        self.input.range_pre_filter_mode(source_count)
    }

    /// Returns one `PartitionRange` per range meta. Each range's position in `self.ranges` is
    /// used as its identifier.
    pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
        self.ranges
            .iter()
            .enumerate()
            .map(|(idx, range_meta)| range_meta.new_partition_range(idx))
            .collect()
    }

    /// Writes the scan summary used by EXPLAIN.
    ///
    /// Always writes the partition and range counts, plus the selector and distribution when
    /// they are set. In verbose mode it also appends the input details (projection, filters,
    /// files, ...).
    ///
    /// NOTE(review): in verbose mode this writes an opening `{` that is not closed in this
    /// function; presumably the caller writes the closing brace. Confirm.
    pub(crate) fn format_for_explain(&self, verbose: bool, f: &mut fmt::Formatter) -> fmt::Result {
        // Classify every row group index of every range as memtable, file, or other
        // (for example, extension ranges).
        let (mut num_mem_ranges, mut num_file_ranges, mut num_other_ranges) = (0, 0, 0);
        for range_meta in &self.ranges {
            for idx in &range_meta.row_group_indices {
                if self.is_mem_range_index(*idx) {
                    num_mem_ranges += 1;
                } else if self.is_file_range_index(*idx) {
                    num_file_ranges += 1;
                } else {
                    num_other_ranges += 1;
                }
            }
        }
        if verbose {
            write!(f, "{{")?;
        }
        write!(
            f,
            r#""partition_count":{{"count":{}, "mem_ranges":{}, "files":{}, "file_ranges":{}"#,
            self.ranges.len(),
            num_mem_ranges,
            self.input.num_files(),
            num_file_ranges,
        )?;
        if num_other_ranges > 0 {
            write!(f, r#", "other_ranges":{}"#, num_other_ranges)?;
        }
        // Closes the "partition_count" object.
        write!(f, "}}")?;

        if let Some(selector) = &self.input.series_row_selector {
            write!(f, ", \"selector\":\"{}\"", selector)?;
        }
        if let Some(distribution) = &self.input.distribution {
            write!(f, ", \"distribution\":\"{}\"", distribution)?;
        }

        if verbose {
            self.format_verbose_content(f)?;
        }

        Ok(())
    }

    /// Writes the verbose EXPLAIN details of the scan input. Every item starts with `, `.
    ///
    /// Covers:
    /// - projection;
    /// - static and dynamic filters;
    /// - the vector index `k` (feature `vector_index`);
    /// - files;
    /// - the flat-format flag;
    /// - extension ranges (feature `enterprise`).
    fn format_verbose_content(&self, f: &mut fmt::Formatter) -> fmt::Result {
        /// Debug-formats a file as a single JSON-like object.
        struct FileWrapper<'a> {
            file: &'a FileHandle,
        }

        impl fmt::Debug for FileWrapper<'_> {
            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                let (start, end) = self.file.time_range();
                write!(
                    f,
                    r#"{{"file_id":"{}","time_range_start":"{}::{}","time_range_end":"{}::{}","rows":{},"size":{},"index_size":{}}}"#,
                    self.file.file_id(),
                    start.value(),
                    start.unit(),
                    end.value(),
                    end.unit(),
                    self.file.num_rows(),
                    self.file.size(),
                    self.file.index_size()
                )
            }
        }

        /// Debug-formats the verbose details of a `ScanInput`.
        struct InputWrapper<'a> {
            input: &'a ScanInput,
        }

        #[cfg(feature = "enterprise")]
        impl InputWrapper<'_> {
            /// Writes `, extension_ranges: [...]`. Writes nothing when there are no
            /// extension ranges.
            fn format_extension_ranges(&self, f: &mut fmt::Formatter) -> fmt::Result {
                if self.input.extension_ranges.is_empty() {
                    return Ok(());
                }

                let mut delimiter = "";
                write!(f, ", extension_ranges: [")?;
                for range in self.input.extension_ranges() {
                    write!(f, "{}{:?}", delimiter, range)?;
                    delimiter = ", ";
                }
                write!(f, "]")?;
                Ok(())
            }
        }

        impl fmt::Debug for InputWrapper<'_> {
            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                let output_schema = self.input.mapper.output_schema();
                if !output_schema.is_empty() {
                    let names: Vec<_> = output_schema
                        .column_schemas()
                        .iter()
                        .map(|col| &col.name)
                        .collect();
                    write!(f, ", \"projection\": {:?}", names)?;
                }
                if let Some(predicate) = &self.input.predicate.predicate() {
                    if !predicate.exprs().is_empty() {
                        let exprs: Vec<_> =
                            predicate.exprs().iter().map(|e| e.to_string()).collect();
                        write!(f, ", \"filters\": {:?}", exprs)?;
                    }
                    if !predicate.dyn_filters().is_empty() {
                        let dyn_filters: Vec<_> = predicate
                            .dyn_filters()
                            .iter()
                            .map(|f| format!("{}", f))
                            .collect();
                        write!(f, ", \"dyn_filters\": {:?}", dyn_filters)?;
                    }
                }
                #[cfg(feature = "vector_index")]
                if let Some(vector_index_k) = self.input.vector_index_k {
                    write!(f, ", \"vector_index_k\": {}", vector_index_k)?;
                }
                if !self.input.files.is_empty() {
                    write!(f, ", \"files\": ")?;
                    f.debug_list()
                        .entries(self.input.files.iter().map(|file| FileWrapper { file }))
                        .finish()?;
                }
                write!(f, ", \"flat_format\": {}", self.input.explain_flat_format)?;
                #[cfg(feature = "enterprise")]
                self.format_extension_ranges(f)?;

                Ok(())
            }
        }

        write!(f, "{:?}", InputWrapper { input: &self.input })
    }

    /// Adds the dynamic filters among `filter_exprs` to the scan's predicate group.
    ///
    /// Returns one flag per input expression, in input order. A flag is `true` if the
    /// expression is a `DynamicFilterPhysicalExpr` (and was therefore added), `false`
    /// otherwise.
    pub(crate) fn add_dyn_filter_to_predicate(
        self: &Arc<Self>,
        filter_exprs: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
    ) -> Vec<bool> {
        let mut supported = Vec::with_capacity(filter_exprs.len());
        let filter_expr = filter_exprs
            .into_iter()
            .filter_map(|expr| {
                // Upcast to `Any` so the concrete expression type can be checked.
                if let Ok(dyn_filter) = (expr as Arc<dyn std::any::Any + Send + Sync + 'static>)
                    .downcast::<datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr>()
                {
                    supported.push(true);
                    Some(dyn_filter)
                } else {
                    supported.push(false);
                    None
                }
            })
            .collect();
        self.input.predicate.add_dyn_filters(filter_expr);
        supported
    }
}
1639
/// Predicates of a scan: the request filters, optionally combined with the region's
/// partition expression.
#[derive(Clone, Default)]
pub struct PredicateGroup {
    /// Simple filters on the time index column, taken from the request filters and the
    /// partition expression. `None` when there are no such filters.
    time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
    /// Request filters plus the region partition expression (when present).
    predicate_all: Predicate,
    /// Request filters only, without the region partition expression.
    predicate_without_region: Predicate,
    /// Partition expression parsed from the region metadata, if any.
    region_partition_expr: Option<PartitionExpr>,
}
1652
1653impl PredicateGroup {
1654 pub fn new(metadata: &RegionMetadata, exprs: &[Expr]) -> Result<Self> {
1656 let mut combined_exprs = exprs.to_vec();
1657 let mut region_partition_expr = None;
1658
1659 if let Some(expr_json) = metadata.partition_expr.as_ref()
1660 && !expr_json.is_empty()
1661 && let Some(expr) = PartitionExpr::from_json_str(expr_json)
1662 .context(InvalidPartitionExprSnafu { expr: expr_json })?
1663 {
1664 let logical_expr = expr
1665 .try_as_logical_expr()
1666 .context(InvalidPartitionExprSnafu {
1667 expr: expr_json.clone(),
1668 })?;
1669
1670 combined_exprs.push(logical_expr);
1671 region_partition_expr = Some(expr);
1672 }
1673
1674 let mut time_filters = Vec::with_capacity(combined_exprs.len());
1675 let mut columns = HashSet::new();
1677 for expr in &combined_exprs {
1678 columns.clear();
1679 let Some(filter) = Self::expr_to_filter(expr, metadata, &mut columns) else {
1680 continue;
1681 };
1682 time_filters.push(filter);
1683 }
1684 let time_filters = if time_filters.is_empty() {
1685 None
1686 } else {
1687 Some(Arc::new(time_filters))
1688 };
1689
1690 let predicate_all = Predicate::new(combined_exprs);
1691 let predicate_without_region = Predicate::new(exprs.to_vec());
1692
1693 Ok(Self {
1694 time_filters,
1695 predicate_all,
1696 predicate_without_region,
1697 region_partition_expr,
1698 })
1699 }
1700
1701 pub(crate) fn time_filters(&self) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
1703 self.time_filters.clone()
1704 }
1705
1706 pub(crate) fn predicate(&self) -> Option<&Predicate> {
1708 if self.predicate_all.is_empty() {
1709 None
1710 } else {
1711 Some(&self.predicate_all)
1712 }
1713 }
1714
1715 pub(crate) fn predicate_without_region(&self) -> Option<&Predicate> {
1717 if self.predicate_without_region.is_empty() {
1718 None
1719 } else {
1720 Some(&self.predicate_without_region)
1721 }
1722 }
1723
1724 pub(crate) fn add_dyn_filters(&self, dyn_filters: Vec<Arc<DynamicFilterPhysicalExpr>>) {
1726 self.predicate_all.add_dyn_filters(dyn_filters.clone());
1727 self.predicate_without_region.add_dyn_filters(dyn_filters);
1728 }
1729
1730 pub(crate) fn region_partition_expr(&self) -> Option<&PartitionExpr> {
1732 self.region_partition_expr.as_ref()
1733 }
1734
1735 fn expr_to_filter(
1736 expr: &Expr,
1737 metadata: &RegionMetadata,
1738 columns: &mut HashSet<Column>,
1739 ) -> Option<SimpleFilterEvaluator> {
1740 columns.clear();
1741 expr_to_columns(expr, columns).ok()?;
1744 if columns.len() > 1 {
1745 return None;
1747 }
1748 let column = columns.iter().next()?;
1749 let column_meta = metadata.column_by_name(&column.name)?;
1750 if column_meta.semantic_type == SemanticType::Timestamp {
1751 SimpleFilterEvaluator::try_new(expr)
1752 } else {
1753 None
1754 }
1755 }
1756}
1757
/// Tests for scan fingerprint construction, predicate groups and pre-filter mode selection.
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datafusion::physical_plan::expressions::lit as physical_lit;
    use datafusion_common::ScalarValue;
    use datafusion_expr::{col, lit};
    use datatypes::value::Value;
    use partition::expr::col as partition_col;
    use store_api::metadata::RegionMetadataBuilder;
    use store_api::storage::{TimeSeriesDistribution, TimeSeriesRowSelector};

    use super::*;
    use crate::cache::CacheManager;
    use crate::read::range_cache::ScanRequestFingerprintBuilder;
    use crate::test_util::memtable_util::metadata_with_primary_key;
    use crate::test_util::scheduler_util::SchedulerEnv;

    /// Builds a `ScanInput` over `metadata` with the following setup:
    /// - projects column ids `[0, 2, 3]`;
    /// - uses `filters` as the request predicate;
    /// - enables the full cache (`CacheStrategy::EnableAll`);
    /// - holds a single default `FileHandle`.
    async fn new_scan_input(metadata: RegionMetadataRef, filters: Vec<Expr>) -> ScanInput {
        let env = SchedulerEnv::new().await;
        let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3]).unwrap();
        let predicate = PredicateGroup::new(metadata.as_ref(), &filters).unwrap();
        let file = FileHandle::new(
            crate::sst::file::FileMeta::default(),
            Arc::new(crate::sst::file_purger::NoopFilePurger),
        );

        ScanInput::new(env.access_layer.clone(), mapper)
            .with_predicate(predicate)
            .with_cache(CacheStrategy::EnableAll(Arc::new(
                CacheManager::builder()
                    .range_result_cache_size(1024)
                    .build(),
            )))
            .with_files(vec![file])
    }

    /// Builds a millisecond timestamp literal (no timezone) with value `val`.
    fn ts_lit(val: i64) -> datafusion_expr::Expr {
        lit(ScalarValue::TimestampMillisecond(Some(val), None))
    }

    /// An eligible scan puts the time-only filter into `time_filters`, puts the other filters
    /// into `filters`, and carries over the scan options.
    #[tokio::test]
    async fn test_build_scan_fingerprint_for_eligible_scan() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let input = new_scan_input(
            metadata.clone(),
            vec![
                col("ts").gt_eq(ts_lit(1000)),
                col("k0").eq(lit("foo")),
                col("v0").gt(lit(1)),
            ],
        )
        .await
        .with_distribution(Some(TimeSeriesDistribution::PerSeries))
        .with_series_row_selector(Some(TimeSeriesRowSelector::LastRow))
        .with_merge_mode(MergeMode::LastNonNull)
        .with_filter_deleted(false);

        let fingerprint = build_scan_fingerprint(&input).unwrap();

        let expected = ScanRequestFingerprintBuilder {
            read_columns: input.read_cols,
            read_column_types: vec![
                metadata
                    .column_by_id(0)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(2)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(3)
                    .map(|col| col.column_schema.data_type.clone()),
            ],
            filters: vec![
                col("k0").eq(lit("foo")).to_string(),
                col("v0").gt(lit(1)).to_string(),
            ],
            time_filters: vec![col("ts").gt_eq(ts_lit(1000)).to_string()],
            series_row_selector: Some(TimeSeriesRowSelector::LastRow),
            append_mode: false,
            filter_deleted: false,
            merge_mode: MergeMode::LastNonNull,
            partition_expr_version: 0,
        }
        .build();
        assert_eq!(expected, fingerprint);
    }

    /// Without any filter on a tag column, no fingerprint is produced.
    #[tokio::test]
    async fn test_build_scan_fingerprint_requires_tag_filter() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let input = new_scan_input(
            metadata,
            vec![col("ts").gt_eq(lit(1000)), col("v0").gt(lit(1))],
        )
        .await;

        assert!(build_scan_fingerprint(&input).is_none());
    }

    /// Each of these makes a scan ineligible: no `with_cache` call, compaction, or no files.
    #[tokio::test]
    async fn test_build_scan_fingerprint_respects_scan_eligibility() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let filters = vec![col("k0").eq(lit("foo"))];

        let disabled = ScanInput::new(
            SchedulerEnv::new().await.access_layer.clone(),
            FlatProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap(),
        )
        .with_predicate(PredicateGroup::new(metadata.as_ref(), &filters).unwrap());
        assert!(build_scan_fingerprint(&disabled).is_none());

        let compaction = new_scan_input(metadata.clone(), filters.clone())
            .await
            .with_compaction(true);
        assert!(build_scan_fingerprint(&compaction).is_none());

        let no_files = new_scan_input(metadata, filters).await.with_files(vec![]);
        assert!(build_scan_fingerprint(&no_files).is_none());
    }

    /// The fingerprint carries the region's partition expression version, and the partition
    /// expression itself is not added to the fingerprint filters.
    #[tokio::test]
    async fn test_build_scan_fingerprint_tracks_schema_and_partition_expr_changes() {
        let base = metadata_with_primary_key(vec![0, 1], false);
        let mut builder = RegionMetadataBuilder::from_existing(base);
        let partition_expr = partition_col("k0")
            .gt_eq(Value::String("foo".into()))
            .as_json_str()
            .unwrap();
        builder.partition_expr_json(Some(partition_expr));
        let metadata = Arc::new(builder.build_without_validation().unwrap());

        let input = new_scan_input(metadata.clone(), vec![col("k0").eq(lit("foo"))]).await;
        let fingerprint = build_scan_fingerprint(&input).unwrap();

        let expected = ScanRequestFingerprintBuilder {
            read_columns: input.read_cols,
            read_column_types: vec![
                metadata
                    .column_by_id(0)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(2)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(3)
                    .map(|col| col.column_schema.data_type.clone()),
            ],
            filters: vec![col("k0").eq(lit("foo")).to_string()],
            time_filters: vec![],
            series_row_selector: None,
            append_mode: false,
            filter_deleted: true,
            merge_mode: MergeMode::LastRow,
            partition_expr_version: metadata.partition_expr_version,
        }
        .build();
        assert_eq!(expected, fingerprint);
        assert_ne!(0, metadata.partition_expr_version);
    }

    /// Adding dynamic filters to an empty predicate group makes both predicates non-empty.
    #[test]
    fn test_update_dyn_filters_with_empty_base_predicates() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let predicate_group = PredicateGroup::new(metadata.as_ref(), &[]).unwrap();
        assert!(predicate_group.predicate().is_none());
        assert!(predicate_group.predicate_without_region().is_none());

        let dyn_filter = Arc::new(DynamicFilterPhysicalExpr::new(vec![], physical_lit(false)));
        predicate_group.add_dyn_filters(vec![dyn_filter]);

        let predicate_all = predicate_group.predicate().unwrap();
        assert!(predicate_all.exprs().is_empty());
        assert_eq!(1, predicate_all.dyn_filters().len());

        let predicate_without_region = predicate_group.predicate_without_region().unwrap();
        assert!(predicate_without_region.exprs().is_empty());
        assert_eq!(1, predicate_without_region.dyn_filters().len());
    }

    /// Cases are `(append_mode, merge_mode, source_count, expected_mode)`.
    #[tokio::test]
    async fn test_range_pre_filter_mode() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let cases = [
            (true, MergeMode::LastRow, 1, PreFilterMode::All),
            (false, MergeMode::LastNonNull, 1, PreFilterMode::All),
            (false, MergeMode::LastRow, 2, PreFilterMode::SkipFields),
            (true, MergeMode::LastRow, 2, PreFilterMode::All),
        ];

        for (append_mode, merge_mode, source_count, expected_mode) in cases {
            let input = new_scan_input(metadata.clone(), vec![])
                .await
                .with_append_mode(append_mode)
                .with_merge_mode(merge_mode);

            assert_eq!(expected_mode, input.range_pre_filter_mode(source_count));
        }
    }
}