1use std::collections::HashSet;
18use std::fmt;
19use std::num::NonZeroU64;
20use std::sync::Arc;
21use std::time::Instant;
22
23use api::v1::SemanticType;
24use common_error::ext::BoxedError;
25use common_recordbatch::SendableRecordBatchStream;
26use common_recordbatch::filter::SimpleFilterEvaluator;
27use common_telemetry::tracing::Instrument;
28use common_telemetry::{debug, error, tracing, warn};
29use common_time::range::TimestampRange;
30use datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr;
31use datafusion_common::Column;
32use datafusion_expr::Expr;
33use datafusion_expr::utils::expr_to_columns;
34use futures::StreamExt;
35use partition::expr::PartitionExpr;
36use smallvec::SmallVec;
37use snafu::{OptionExt as _, ResultExt};
38use store_api::metadata::{RegionMetadata, RegionMetadataRef};
39use store_api::region_engine::{PartitionRange, RegionScannerRef};
40use store_api::storage::{
41 ColumnId, RegionId, ScanRequest, SequenceRange, TimeSeriesDistribution, TimeSeriesRowSelector,
42};
43use table::predicate::{Predicate, build_time_range_predicate, extract_time_range_from_expr};
44use tokio::sync::{Semaphore, mpsc};
45use tokio_stream::wrappers::ReceiverStream;
46
47use crate::access_layer::AccessLayerRef;
48use crate::cache::CacheStrategy;
49use crate::config::DEFAULT_MAX_CONCURRENT_SCAN_FILES;
50use crate::error::{InvalidPartitionExprSnafu, InvalidRequestSnafu, Result};
51#[cfg(feature = "enterprise")]
52use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
53use crate::memtable::{MemtableRange, RangesOptions};
54use crate::metrics::READ_SST_COUNT;
55use crate::read::compat::{self, CompatBatch, FlatCompatBatch, PrimaryKeyCompatBatch};
56use crate::read::projection::ProjectionMapper;
57use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
58use crate::read::range_cache::ScanRequestFingerprint;
59use crate::read::seq_scan::SeqScan;
60use crate::read::series_scan::SeriesScan;
61use crate::read::stream::ScanBatchStream;
62use crate::read::unordered_scan::UnorderedScan;
63use crate::read::{BoxedRecordBatchStream, RecordBatch};
64use crate::region::options::MergeMode;
65use crate::region::version::VersionRef;
66use crate::sst::file::FileHandle;
67use crate::sst::index::bloom_filter::applier::{
68 BloomFilterIndexApplierBuilder, BloomFilterIndexApplierRef,
69};
70use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
71use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
72use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
73use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
74#[cfg(feature = "vector_index")]
75use crate::sst::index::vector_index::applier::{VectorIndexApplier, VectorIndexApplierRef};
76use crate::sst::parquet::file_range::PreFilterMode;
77use crate::sst::parquet::reader::ReaderMetrics;
78
/// Over-fetch factor applied to the vector search `k` when the scan request
/// also carries filters (see the `vector_index_k` computation in
/// `ScanRegion::scan_input`), so enough candidates survive post-filtering.
#[cfg(feature = "vector_index")]
const VECTOR_INDEX_OVERFETCH_MULTIPLIER: usize = 2;
81
/// Scanner to read a region. The concrete variant is chosen by
/// [`ScanRegion::scanner`] from the request distribution and region options.
pub(crate) enum Scanner {
    /// Sequential scan, backed by [`SeqScan`] (the default variant).
    Seq(SeqScan),
    /// Unordered scan, backed by [`UnorderedScan`]; used for append-mode
    /// regions without a series row selector.
    Unordered(UnorderedScan),
    /// Per-series scan, backed by [`SeriesScan`]; used when the request asks
    /// for a per-series distribution.
    Series(SeriesScan),
}
91
92impl Scanner {
93 #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
95 pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream, BoxedError> {
96 match self {
97 Scanner::Seq(seq_scan) => seq_scan.build_stream(),
98 Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
99 Scanner::Series(series_scan) => series_scan.build_stream().await,
100 }
101 }
102
103 pub(crate) fn scan_batch(&self) -> Result<ScanBatchStream> {
105 match self {
106 Scanner::Seq(x) => x.scan_all_partitions(),
107 Scanner::Unordered(x) => x.scan_all_partitions(),
108 Scanner::Series(x) => x.scan_all_partitions(),
109 }
110 }
111}
112
#[cfg(test)]
impl Scanner {
    /// Returns how many SST files the scan input selected (test helper).
    pub(crate) fn num_files(&self) -> usize {
        match self {
            Scanner::Seq(s) => s.input().num_files(),
            Scanner::Unordered(s) => s.input().num_files(),
            Scanner::Series(s) => s.input().num_files(),
        }
    }

    /// Returns how many memtables the scan input selected (test helper).
    pub(crate) fn num_memtables(&self) -> usize {
        match self {
            Scanner::Seq(s) => s.input().num_memtables(),
            Scanner::Unordered(s) => s.input().num_memtables(),
            Scanner::Series(s) => s.input().num_memtables(),
        }
    }

    /// Returns the ids of files the scan input selected (test helper).
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        match self {
            Scanner::Seq(s) => s.input().file_ids(),
            Scanner::Unordered(s) => s.input().file_ids(),
            Scanner::Series(s) => s.input().file_ids(),
        }
    }

    /// Returns the ids of indexes the scan input selected (test helper).
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        match self {
            Scanner::Seq(s) => s.input().index_ids(),
            Scanner::Unordered(s) => s.input().index_ids(),
            Scanner::Series(s) => s.input().index_ids(),
        }
    }

    /// Re-prepares the scanner with the given target partition count
    /// (test helper). Panics if `prepare` fails.
    pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
        use store_api::region_engine::{PrepareRequest, RegionScanner};

        let request = PrepareRequest::default().with_target_partitions(target_partitions);
        match self {
            Scanner::Seq(s) => s.prepare(request).unwrap(),
            Scanner::Unordered(s) => s.prepare(request).unwrap(),
            Scanner::Series(s) => s.prepare(request).unwrap(),
        }
    }
}
162
/// Helper to create a [Scanner] (or a region scanner) for a region from a
/// [ScanRequest] and the region's current version.
#[cfg_attr(doc, aquamarine::aquamarine)]
pub(crate) struct ScanRegion {
    /// Version of the region to scan (metadata, memtables, SSTs, options).
    version: VersionRef,
    /// Access layer used to read SST files.
    access_layer: AccessLayerRef,
    /// The scan request to serve.
    request: ScanRequest,
    /// Cache strategy used while reading data.
    cache_strategy: CacheStrategy,
    /// Maximum number of SST files scanned concurrently; defaults to
    /// `DEFAULT_MAX_CONCURRENT_SCAN_FILES`.
    max_concurrent_scan_files: usize,
    /// Whether to skip building the inverted index applier.
    ignore_inverted_index: bool,
    /// Whether to skip building the full-text index applier.
    ignore_fulltext_index: bool,
    /// Whether to skip building the bloom filter index applier.
    ignore_bloom_filter: bool,
    /// Start time of the query, forwarded to the [ScanInput] as `query_start`.
    start_time: Option<Instant>,
    /// Whether deleted rows should be filtered out (defaults to true).
    filter_deleted: bool,
    /// Optional provider of extension ranges to include in the scan.
    #[cfg(feature = "enterprise")]
    extension_range_provider: Option<BoxedExtensionRangeProvider>,
}
238
239impl ScanRegion {
240 pub(crate) fn new(
242 version: VersionRef,
243 access_layer: AccessLayerRef,
244 request: ScanRequest,
245 cache_strategy: CacheStrategy,
246 ) -> ScanRegion {
247 ScanRegion {
248 version,
249 access_layer,
250 request,
251 cache_strategy,
252 max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
253 ignore_inverted_index: false,
254 ignore_fulltext_index: false,
255 ignore_bloom_filter: false,
256 start_time: None,
257 filter_deleted: true,
258 #[cfg(feature = "enterprise")]
259 extension_range_provider: None,
260 }
261 }
262
263 #[must_use]
265 pub(crate) fn with_max_concurrent_scan_files(
266 mut self,
267 max_concurrent_scan_files: usize,
268 ) -> Self {
269 self.max_concurrent_scan_files = max_concurrent_scan_files;
270 self
271 }
272
273 #[must_use]
275 pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self {
276 self.ignore_inverted_index = ignore;
277 self
278 }
279
280 #[must_use]
282 pub(crate) fn with_ignore_fulltext_index(mut self, ignore: bool) -> Self {
283 self.ignore_fulltext_index = ignore;
284 self
285 }
286
287 #[must_use]
289 pub(crate) fn with_ignore_bloom_filter(mut self, ignore: bool) -> Self {
290 self.ignore_bloom_filter = ignore;
291 self
292 }
293
294 #[must_use]
295 pub(crate) fn with_start_time(mut self, now: Instant) -> Self {
296 self.start_time = Some(now);
297 self
298 }
299
300 pub(crate) fn set_filter_deleted(&mut self, filter_deleted: bool) {
301 self.filter_deleted = filter_deleted;
302 }
303
304 #[cfg(feature = "enterprise")]
305 pub(crate) fn set_extension_range_provider(
306 &mut self,
307 extension_range_provider: BoxedExtensionRangeProvider,
308 ) {
309 self.extension_range_provider = Some(extension_range_provider);
310 }
311
312 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
314 pub(crate) async fn scanner(self) -> Result<Scanner> {
315 if self.use_series_scan() {
316 self.series_scan().await.map(Scanner::Series)
317 } else if self.use_unordered_scan() {
318 self.unordered_scan().await.map(Scanner::Unordered)
321 } else {
322 self.seq_scan().await.map(Scanner::Seq)
323 }
324 }
325
326 #[tracing::instrument(
328 level = tracing::Level::DEBUG,
329 skip_all,
330 fields(region_id = %self.region_id())
331 )]
332 pub(crate) async fn region_scanner(self) -> Result<RegionScannerRef> {
333 if self.use_series_scan() {
334 self.series_scan()
335 .await
336 .map(|scanner| Box::new(scanner) as _)
337 } else if self.use_unordered_scan() {
338 self.unordered_scan()
339 .await
340 .map(|scanner| Box::new(scanner) as _)
341 } else {
342 self.seq_scan().await.map(|scanner| Box::new(scanner) as _)
343 }
344 }
345
346 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
348 pub(crate) async fn seq_scan(self) -> Result<SeqScan> {
349 let input = self.scan_input().await?.with_compaction(false);
350 Ok(SeqScan::new(input))
351 }
352
353 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
355 pub(crate) async fn unordered_scan(self) -> Result<UnorderedScan> {
356 let input = self.scan_input().await?;
357 Ok(UnorderedScan::new(input))
358 }
359
360 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
362 pub(crate) async fn series_scan(self) -> Result<SeriesScan> {
363 let input = self.scan_input().await?;
364 Ok(SeriesScan::new(input))
365 }
366
367 fn use_unordered_scan(&self) -> bool {
369 self.version.options.append_mode
376 && self.request.series_row_selector.is_none()
377 && (self.request.distribution.is_none()
378 || self.request.distribution == Some(TimeSeriesDistribution::TimeWindowed))
379 }
380
381 fn use_series_scan(&self) -> bool {
383 self.request.distribution == Some(TimeSeriesDistribution::PerSeries)
384 }
385
386 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
388 async fn scan_input(self) -> Result<ScanInput> {
389 let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new);
390 let time_range = self.build_time_range_predicate();
391 let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;
392
393 let read_column_ids = match self.request.projection_indices() {
394 Some(p) => self.build_read_column_ids(p, &predicate)?,
395 None => self
396 .version
397 .metadata
398 .column_metadatas
399 .iter()
400 .map(|col| col.column_id)
401 .collect(),
402 };
403
404 let mapper = match self.request.projection_indices() {
406 Some(p) => ProjectionMapper::new_with_read_columns(
407 &self.version.metadata,
408 p.iter().copied(),
409 read_column_ids.clone(),
410 )?,
411 None => ProjectionMapper::all(&self.version.metadata)?,
412 };
413
414 let ssts = &self.version.ssts;
415 let mut files = Vec::new();
416 for level in ssts.levels() {
417 for file in level.files.values() {
418 let exceed_min_sequence = match (sst_min_sequence, file.meta_ref().sequence) {
419 (Some(min_sequence), Some(file_sequence)) => file_sequence > min_sequence,
420 (Some(_), None) => true,
426 (None, _) => true,
427 };
428
429 if exceed_min_sequence && file_in_range(file, &time_range) {
431 files.push(file.clone());
432 }
433 }
437 }
438
439 let memtables = self.version.memtables.list_memtables();
440 let mut mem_range_builders = Vec::new();
442 let filter_mode = pre_filter_mode(
443 self.version.options.append_mode,
444 self.version.options.merge_mode(),
445 );
446
447 for m in memtables {
448 let Some((start, end)) = m.stats().time_range() else {
450 continue;
451 };
452 let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
454 if !memtable_range.intersects(&time_range) {
455 continue;
456 }
457 let ranges_in_memtable = m.ranges(
458 Some(read_column_ids.as_slice()),
459 RangesOptions::default()
460 .with_predicate(predicate.clone())
461 .with_sequence(SequenceRange::new(
462 self.request.memtable_min_sequence,
463 self.request.memtable_max_sequence,
464 ))
465 .with_pre_filter_mode(filter_mode),
466 )?;
467 mem_range_builders.extend(ranges_in_memtable.ranges.into_values().map(|v| {
468 let stats = v.stats().clone();
469 MemRangeBuilder::new(v, stats)
470 }));
471 }
472
473 let region_id = self.region_id();
474 debug!(
475 "Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}",
476 region_id,
477 self.request,
478 time_range,
479 mem_range_builders.len(),
480 files.len(),
481 self.version.options.append_mode,
482 );
483
484 let (non_field_filters, field_filters) = self.partition_by_field_filters();
485 let inverted_index_appliers = [
486 self.build_invereted_index_applier(&non_field_filters),
487 self.build_invereted_index_applier(&field_filters),
488 ];
489 let bloom_filter_appliers = [
490 self.build_bloom_filter_applier(&non_field_filters),
491 self.build_bloom_filter_applier(&field_filters),
492 ];
493 let fulltext_index_appliers = [
494 self.build_fulltext_index_applier(&non_field_filters),
495 self.build_fulltext_index_applier(&field_filters),
496 ];
497 #[cfg(feature = "vector_index")]
498 let vector_index_applier = self.build_vector_index_applier();
499 #[cfg(feature = "vector_index")]
500 let vector_index_k = self.request.vector_search.as_ref().map(|search| {
501 if self.request.filters.is_empty() {
502 search.k
503 } else {
504 search.k.saturating_mul(VECTOR_INDEX_OVERFETCH_MULTIPLIER)
505 }
506 });
507
508 let input = ScanInput::new(self.access_layer, mapper)
509 .with_time_range(Some(time_range))
510 .with_predicate(predicate)
511 .with_memtables(mem_range_builders)
512 .with_files(files)
513 .with_cache(self.cache_strategy)
514 .with_inverted_index_appliers(inverted_index_appliers)
515 .with_bloom_filter_index_appliers(bloom_filter_appliers)
516 .with_fulltext_index_appliers(fulltext_index_appliers)
517 .with_max_concurrent_scan_files(self.max_concurrent_scan_files)
518 .with_start_time(self.start_time)
519 .with_append_mode(self.version.options.append_mode)
520 .with_filter_deleted(self.filter_deleted)
521 .with_merge_mode(self.version.options.merge_mode())
522 .with_series_row_selector(self.request.series_row_selector)
523 .with_distribution(self.request.distribution)
524 .with_explain_flat_format(
525 self.version.options.sst_format == Some(crate::sst::FormatType::Flat),
526 );
527 #[cfg(feature = "vector_index")]
528 let input = input
529 .with_vector_index_applier(vector_index_applier)
530 .with_vector_index_k(vector_index_k);
531
532 #[cfg(feature = "enterprise")]
533 let input = if let Some(provider) = self.extension_range_provider {
534 let ranges = provider
535 .find_extension_ranges(self.version.flushed_sequence, time_range, &self.request)
536 .await?;
537 debug!("Find extension ranges: {ranges:?}");
538 input.with_extension_ranges(ranges)
539 } else {
540 input
541 };
542 Ok(input)
543 }
544
545 fn region_id(&self) -> RegionId {
546 self.version.metadata.region_id
547 }
548
549 fn build_time_range_predicate(&self) -> TimestampRange {
551 let time_index = self.version.metadata.time_index_column();
552 let unit = time_index
553 .column_schema
554 .data_type
555 .as_timestamp()
556 .expect("Time index must have timestamp-compatible type")
557 .unit();
558 build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters)
559 }
560
561 fn build_read_column_ids(
563 &self,
564 projection: &[usize],
565 predicate: &PredicateGroup,
566 ) -> Result<Vec<ColumnId>> {
567 let metadata = &self.version.metadata;
568 let mut read_column_ids = Vec::new();
570 let mut seen = HashSet::new();
571
572 for idx in projection {
573 let column =
574 metadata
575 .column_metadatas
576 .get(*idx)
577 .with_context(|| InvalidRequestSnafu {
578 region_id: metadata.region_id,
579 reason: format!("projection index {} is out of bound", idx),
580 })?;
581 seen.insert(column.column_id);
582 read_column_ids.push(column.column_id);
584 }
585
586 if projection.is_empty() {
587 let time_index = metadata.time_index_column().column_id;
588 if seen.insert(time_index) {
589 read_column_ids.push(time_index);
590 }
591 }
592
593 let mut extra_names = HashSet::new();
594 let mut columns = HashSet::new();
595
596 for expr in &self.request.filters {
597 columns.clear();
598 if expr_to_columns(expr, &mut columns).is_err() {
599 continue;
600 }
601 extra_names.extend(columns.iter().map(|column| column.name.clone()));
602 }
603
604 if let Some(expr) = predicate.region_partition_expr() {
605 expr.collect_column_names(&mut extra_names);
606 }
607
608 if !extra_names.is_empty() {
609 for column in &metadata.column_metadatas {
610 if extra_names.contains(column.column_schema.name.as_str())
611 && !seen.contains(&column.column_id)
612 {
613 read_column_ids.push(column.column_id);
614 }
615 extra_names.remove(column.column_schema.name.as_str());
616 }
617 if !extra_names.is_empty() {
618 warn!(
619 "Some columns in filters are not found in region {}: {:?}",
620 metadata.region_id, extra_names
621 );
622 }
623 }
624 Ok(read_column_ids)
625 }
626
627 fn partition_by_field_filters(&self) -> (Vec<Expr>, Vec<Expr>) {
630 let field_columns = self
631 .version
632 .metadata
633 .field_columns()
634 .map(|col| &col.column_schema.name)
635 .collect::<HashSet<_>>();
636
637 let mut columns = HashSet::new();
638
639 self.request.filters.iter().cloned().partition(|expr| {
640 columns.clear();
641 if expr_to_columns(expr, &mut columns).is_err() {
643 return true;
645 }
646 !columns
648 .iter()
649 .any(|column| field_columns.contains(&column.name))
650 })
651 }
652
653 fn build_invereted_index_applier(&self, filters: &[Expr]) -> Option<InvertedIndexApplierRef> {
655 if self.ignore_inverted_index {
656 return None;
657 }
658
659 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
660 let inverted_index_cache = self.cache_strategy.inverted_index_cache().cloned();
661
662 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
663
664 InvertedIndexApplierBuilder::new(
665 self.access_layer.table_dir().to_string(),
666 self.access_layer.path_type(),
667 self.access_layer.object_store().clone(),
668 self.version.metadata.as_ref(),
669 self.version.metadata.inverted_indexed_column_ids(
670 self.version
671 .options
672 .index_options
673 .inverted_index
674 .ignore_column_ids
675 .iter(),
676 ),
677 self.access_layer.puffin_manager_factory().clone(),
678 )
679 .with_file_cache(file_cache)
680 .with_inverted_index_cache(inverted_index_cache)
681 .with_puffin_metadata_cache(puffin_metadata_cache)
682 .build(filters)
683 .inspect_err(|err| warn!(err; "Failed to build invereted index applier"))
684 .ok()
685 .flatten()
686 .map(Arc::new)
687 }
688
689 fn build_bloom_filter_applier(&self, filters: &[Expr]) -> Option<BloomFilterIndexApplierRef> {
691 if self.ignore_bloom_filter {
692 return None;
693 }
694
695 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
696 let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
697 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
698
699 BloomFilterIndexApplierBuilder::new(
700 self.access_layer.table_dir().to_string(),
701 self.access_layer.path_type(),
702 self.access_layer.object_store().clone(),
703 self.version.metadata.as_ref(),
704 self.access_layer.puffin_manager_factory().clone(),
705 )
706 .with_file_cache(file_cache)
707 .with_bloom_filter_index_cache(bloom_filter_index_cache)
708 .with_puffin_metadata_cache(puffin_metadata_cache)
709 .build(filters)
710 .inspect_err(|err| warn!(err; "Failed to build bloom filter index applier"))
711 .ok()
712 .flatten()
713 .map(Arc::new)
714 }
715
716 fn build_fulltext_index_applier(&self, filters: &[Expr]) -> Option<FulltextIndexApplierRef> {
718 if self.ignore_fulltext_index {
719 return None;
720 }
721
722 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
723 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
724 let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
725 FulltextIndexApplierBuilder::new(
726 self.access_layer.table_dir().to_string(),
727 self.access_layer.path_type(),
728 self.access_layer.object_store().clone(),
729 self.access_layer.puffin_manager_factory().clone(),
730 self.version.metadata.as_ref(),
731 )
732 .with_file_cache(file_cache)
733 .with_puffin_metadata_cache(puffin_metadata_cache)
734 .with_bloom_filter_cache(bloom_filter_index_cache)
735 .build(filters)
736 .inspect_err(|err| warn!(err; "Failed to build fulltext index applier"))
737 .ok()
738 .flatten()
739 .map(Arc::new)
740 }
741
742 #[cfg(feature = "vector_index")]
744 fn build_vector_index_applier(&self) -> Option<VectorIndexApplierRef> {
745 let vector_search = self.request.vector_search.as_ref()?;
746
747 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
748 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
749 let vector_index_cache = self.cache_strategy.vector_index_cache().cloned();
750
751 let applier = VectorIndexApplier::new(
752 self.access_layer.table_dir().to_string(),
753 self.access_layer.path_type(),
754 self.access_layer.object_store().clone(),
755 self.access_layer.puffin_manager_factory().clone(),
756 vector_search.column_id,
757 vector_search.query_vector.clone(),
758 vector_search.metric,
759 )
760 .with_file_cache(file_cache)
761 .with_puffin_metadata_cache(puffin_metadata_cache)
762 .with_vector_index_cache(vector_index_cache);
763
764 Some(Arc::new(applier))
765 }
766}
767
768fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
770 if predicate == &TimestampRange::min_to_max() {
771 return true;
772 }
773 let (start, end) = file.time_range();
775 let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end));
776 file_ts_range.intersects(predicate)
777}
778
/// Common input shared by all scanners built from a [ScanRegion].
pub struct ScanInput {
    /// Access layer used to read SST files.
    access_layer: AccessLayerRef,
    /// Maps read batches to the output (projected) schema.
    pub(crate) mapper: Arc<ProjectionMapper>,
    /// Ids of columns to read from memtables and SSTs.
    pub(crate) read_column_ids: Vec<ColumnId>,
    /// Time range predicate derived from the request filters, if any.
    pub(crate) time_range: Option<TimestampRange>,
    /// Predicates to push down to sources.
    pub(crate) predicate: PredicateGroup,
    /// Region partition expression cached from the predicate group.
    region_partition_expr: Option<PartitionExpr>,
    /// Memtable range builders to scan.
    pub(crate) memtables: Vec<MemRangeBuilder>,
    /// SST files to scan.
    pub(crate) files: Vec<FileHandle>,
    /// Cache strategy used while reading.
    pub(crate) cache_strategy: CacheStrategy,
    /// Whether missing SST files are ignored instead of failing the scan.
    ignore_file_not_found: bool,
    /// Maximum number of SST files scanned concurrently.
    pub(crate) max_concurrent_scan_files: usize,
    /// Inverted index appliers: `[non-field filters, field filters]`.
    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
    /// Bloom filter index appliers: `[non-field filters, field filters]`.
    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    /// Full-text index appliers: `[non-field filters, field filters]`.
    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
    /// Applier for vector search, if the request contains one.
    #[cfg(feature = "vector_index")]
    pub(crate) vector_index_applier: Option<VectorIndexApplierRef>,
    /// Top-k for vector search (possibly over-fetched when filters exist).
    #[cfg(feature = "vector_index")]
    pub(crate) vector_index_k: Option<usize>,
    /// Start time of the query, if tracked.
    pub(crate) query_start: Option<Instant>,
    /// Whether the region is in append mode.
    pub(crate) append_mode: bool,
    /// Whether deleted rows are filtered out.
    pub(crate) filter_deleted: bool,
    /// Merge mode of the region.
    pub(crate) merge_mode: MergeMode,
    /// Optional per-series row selector from the request.
    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
    /// Requested time series distribution, if any.
    pub(crate) distribution: Option<TimeSeriesDistribution>,
    /// Set when the region's SST format is flat (used for explain output,
    /// per the field name).
    explain_flat_format: bool,
    /// Whether this input is used for compaction.
    pub(crate) compaction: bool,
    /// Extra ranges supplied by an extension range provider.
    #[cfg(feature = "enterprise")]
    extension_ranges: Vec<BoxedExtensionRange>,
}
834
835impl ScanInput {
836 #[must_use]
838 pub(crate) fn new(access_layer: AccessLayerRef, mapper: ProjectionMapper) -> ScanInput {
839 ScanInput {
840 access_layer,
841 read_column_ids: mapper.column_ids().to_vec(),
842 mapper: Arc::new(mapper),
843 time_range: None,
844 predicate: PredicateGroup::default(),
845 region_partition_expr: None,
846 memtables: Vec::new(),
847 files: Vec::new(),
848 cache_strategy: CacheStrategy::Disabled,
849 ignore_file_not_found: false,
850 max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
851 inverted_index_appliers: [None, None],
852 bloom_filter_index_appliers: [None, None],
853 fulltext_index_appliers: [None, None],
854 #[cfg(feature = "vector_index")]
855 vector_index_applier: None,
856 #[cfg(feature = "vector_index")]
857 vector_index_k: None,
858 query_start: None,
859 append_mode: false,
860 filter_deleted: true,
861 merge_mode: MergeMode::default(),
862 series_row_selector: None,
863 distribution: None,
864 explain_flat_format: false,
865 compaction: false,
866 #[cfg(feature = "enterprise")]
867 extension_ranges: Vec::new(),
868 }
869 }
870
    /// Sets the time range predicate pushed down to sources.
    #[must_use]
    pub(crate) fn with_time_range(mut self, time_range: Option<TimestampRange>) -> Self {
        self.time_range = time_range;
        self
    }

    /// Sets the predicate group and caches its region partition expression.
    #[must_use]
    pub(crate) fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
        self.region_partition_expr = predicate.region_partition_expr().cloned();
        self.predicate = predicate;
        self
    }

    /// Sets the memtable range builders to scan.
    #[must_use]
    pub(crate) fn with_memtables(mut self, memtables: Vec<MemRangeBuilder>) -> Self {
        self.memtables = memtables;
        self
    }

    /// Sets the SST files to scan.
    #[must_use]
    pub(crate) fn with_files(mut self, files: Vec<FileHandle>) -> Self {
        self.files = files;
        self
    }

    /// Sets the cache strategy used while reading.
    #[must_use]
    pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self {
        self.cache_strategy = cache;
        self
    }

    /// Sets whether missing SST files are ignored instead of failing.
    #[must_use]
    pub(crate) fn with_ignore_file_not_found(mut self, ignore: bool) -> Self {
        self.ignore_file_not_found = ignore;
        self
    }

    /// Sets the maximum number of SST files scanned concurrently.
    #[must_use]
    pub(crate) fn with_max_concurrent_scan_files(
        mut self,
        max_concurrent_scan_files: usize,
    ) -> Self {
        self.max_concurrent_scan_files = max_concurrent_scan_files;
        self
    }

    /// Sets the inverted index appliers (`[non-field, field]` filters).
    #[must_use]
    pub(crate) fn with_inverted_index_appliers(
        mut self,
        appliers: [Option<InvertedIndexApplierRef>; 2],
    ) -> Self {
        self.inverted_index_appliers = appliers;
        self
    }

    /// Sets the bloom filter index appliers (`[non-field, field]` filters).
    #[must_use]
    pub(crate) fn with_bloom_filter_index_appliers(
        mut self,
        appliers: [Option<BloomFilterIndexApplierRef>; 2],
    ) -> Self {
        self.bloom_filter_index_appliers = appliers;
        self
    }

    /// Sets the full-text index appliers (`[non-field, field]` filters).
    #[must_use]
    pub(crate) fn with_fulltext_index_appliers(
        mut self,
        appliers: [Option<FulltextIndexApplierRef>; 2],
    ) -> Self {
        self.fulltext_index_appliers = appliers;
        self
    }

    /// Sets the applier used for vector search.
    #[cfg(feature = "vector_index")]
    #[must_use]
    pub(crate) fn with_vector_index_applier(
        mut self,
        applier: Option<VectorIndexApplierRef>,
    ) -> Self {
        self.vector_index_applier = applier;
        self
    }

    /// Sets the (possibly over-fetched) top-k for vector search.
    #[cfg(feature = "vector_index")]
    #[must_use]
    pub(crate) fn with_vector_index_k(mut self, k: Option<usize>) -> Self {
        self.vector_index_k = k;
        self
    }

    /// Sets the query start time.
    #[must_use]
    pub(crate) fn with_start_time(mut self, now: Option<Instant>) -> Self {
        self.query_start = now;
        self
    }

    /// Sets whether the region is in append mode.
    #[must_use]
    pub(crate) fn with_append_mode(mut self, is_append_mode: bool) -> Self {
        self.append_mode = is_append_mode;
        self
    }

    /// Sets whether deleted rows are filtered out.
    #[must_use]
    pub(crate) fn with_filter_deleted(mut self, filter_deleted: bool) -> Self {
        self.filter_deleted = filter_deleted;
        self
    }

    /// Sets the merge mode of the region.
    #[must_use]
    pub(crate) fn with_merge_mode(mut self, merge_mode: MergeMode) -> Self {
        self.merge_mode = merge_mode;
        self
    }

    /// Sets the requested time series distribution.
    #[must_use]
    pub(crate) fn with_distribution(
        mut self,
        distribution: Option<TimeSeriesDistribution>,
    ) -> Self {
        self.distribution = distribution;
        self
    }

    /// Sets the flag indicating the region uses the flat SST format.
    #[must_use]
    pub(crate) fn with_explain_flat_format(mut self, explain_flat_format: bool) -> Self {
        self.explain_flat_format = explain_flat_format;
        self
    }

    /// Sets the optional per-series row selector.
    #[must_use]
    pub(crate) fn with_series_row_selector(
        mut self,
        series_row_selector: Option<TimeSeriesRowSelector>,
    ) -> Self {
        self.series_row_selector = series_row_selector;
        self
    }

    /// Sets whether this input is used for compaction.
    #[must_use]
    pub(crate) fn with_compaction(mut self, compaction: bool) -> Self {
        self.compaction = compaction;
        self
    }
1033
1034 pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> {
1036 let memtable = &self.memtables[index.index];
1037 let mut ranges = SmallVec::new();
1038 memtable.build_ranges(index.row_group_index, &mut ranges);
1039 ranges
1040 }
1041
1042 fn predicate_for_file(&self, file: &FileHandle) -> Option<Predicate> {
1043 if self.should_skip_region_partition(file) {
1044 self.predicate.predicate_without_region().cloned()
1045 } else {
1046 self.predicate.predicate().cloned()
1047 }
1048 }
1049
1050 fn should_skip_region_partition(&self, file: &FileHandle) -> bool {
1051 match (
1052 self.region_partition_expr.as_ref(),
1053 file.meta_ref().partition_expr.as_ref(),
1054 ) {
1055 (Some(region_expr), Some(file_expr)) => region_expr == file_expr,
1056 _ => false,
1057 }
1058 }
1059
    /// Prunes an SST file with predicates and index appliers and returns a
    /// [FileRangeBuilder] for building ranges to read.
    ///
    /// Returns a default (empty) builder when nothing remains to read from
    /// the file, or when the file is missing and `ignore_file_not_found` is
    /// set.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.region_metadata().region_id,
            file_id = %file.file_id()
        )
    )]
    pub async fn prune_file(
        &self,
        file: &FileHandle,
        reader_metrics: &mut ReaderMetrics,
    ) -> Result<FileRangeBuilder> {
        // May drop the region partition expression for this file, see
        // `predicate_for_file`.
        let predicate = self.predicate_for_file(file);
        let filter_mode = pre_filter_mode(self.append_mode, self.merge_mode);
        // Primary key values are only decoded for non-compaction reads that
        // actually project tag columns.
        let decode_pk_values = !self.compaction && self.mapper.has_tags();
        let reader = self
            .access_layer
            .read_sst(file.clone())
            .predicate(predicate)
            .projection(Some(self.read_column_ids.clone()))
            .cache(self.cache_strategy.clone())
            .inverted_index_appliers(self.inverted_index_appliers.clone())
            .bloom_filter_index_appliers(self.bloom_filter_index_appliers.clone())
            .fulltext_index_appliers(self.fulltext_index_appliers.clone());
        #[cfg(feature = "vector_index")]
        let reader = {
            let mut reader = reader;
            reader =
                reader.vector_index_applier(self.vector_index_applier.clone(), self.vector_index_k);
            reader
        };
        let res = reader
            .expected_metadata(Some(self.mapper.metadata().clone()))
            .compaction(self.compaction)
            .pre_filter_mode(filter_mode)
            .decode_primary_key_values(decode_pk_values)
            .build_reader_input(reader_metrics)
            .await;
        let read_input = match res {
            Ok(x) => x,
            Err(e) => {
                // A missing file is tolerated (logged) when configured so;
                // any other error is propagated.
                if e.is_object_not_found() && self.ignore_file_not_found {
                    error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
                    return Ok(FileRangeBuilder::default());
                } else {
                    return Err(e);
                }
            }
        };

        // No reader input: nothing to read from this file.
        let Some((mut file_range_ctx, selection)) = read_input else {
            return Ok(FileRangeBuilder::default());
        };

        // When the file's columns or primary key encoding differ from the
        // expected metadata, attach a compat adapter so batches match the
        // mapper schema.
        let need_compat = !compat::has_same_columns_and_pk_encoding(
            self.mapper.metadata(),
            file_range_ctx.read_format().metadata(),
        );
        if need_compat {
            let compat = if let Some(flat_format) = file_range_ctx.read_format().as_flat() {
                let mapper = self.mapper.as_flat().unwrap();
                FlatCompatBatch::try_new(
                    mapper,
                    flat_format.metadata(),
                    flat_format.format_projection(),
                    self.compaction,
                )?
                .map(CompatBatch::Flat)
            } else {
                let compact_batch = PrimaryKeyCompatBatch::new(
                    &self.mapper,
                    file_range_ctx.read_format().metadata().clone(),
                )?;
                Some(CompatBatch::PrimaryKey(compact_batch))
            };
            file_range_ctx.set_compat_batch(compat);
        }
        Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), selection))
    }
1142
1143 #[tracing::instrument(
1147 skip(self, sources, semaphore),
1148 fields(
1149 region_id = %self.region_metadata().region_id,
1150 source_count = sources.len()
1151 )
1152 )]
1153 pub(crate) fn create_parallel_flat_sources(
1154 &self,
1155 sources: Vec<BoxedRecordBatchStream>,
1156 semaphore: Arc<Semaphore>,
1157 channel_size: usize,
1158 ) -> Result<Vec<BoxedRecordBatchStream>> {
1159 if sources.len() <= 1 {
1160 return Ok(sources);
1161 }
1162
1163 let sources = sources
1165 .into_iter()
1166 .map(|source| {
1167 let (sender, receiver) = mpsc::channel(channel_size);
1168 self.spawn_flat_scan_task(source, semaphore.clone(), sender);
1169 let stream = Box::pin(ReceiverStream::new(receiver));
1170 Box::pin(stream) as _
1171 })
1172 .collect();
1173 Ok(sources)
1174 }
1175
    /// Spawns a detached background task that drains `input` and forwards each
    /// record batch (or the first error) to `sender`.
    ///
    /// The task stops when the stream is exhausted, when it yields an error, or
    /// implicitly when the receiver side of `sender` is dropped (sends are
    /// best-effort; their results are ignored).
    #[tracing::instrument(
        skip(self, input, semaphore, sender),
        fields(region_id = %self.region_metadata().region_id)
    )]
    pub(crate) fn spawn_flat_scan_task(
        &self,
        mut input: BoxedRecordBatchStream,
        semaphore: Arc<Semaphore>,
        sender: mpsc::Sender<Result<RecordBatch>>,
    ) {
        let region_id = self.region_metadata().region_id;
        let span = tracing::info_span!(
            "ScanInput::parallel_scan_task",
            region_id = %region_id,
            stream_kind = "flat"
        );
        common_runtime::spawn_global(
            async move {
                loop {
                    let maybe_batch = {
                        // Hold a permit only while polling the source so the
                        // semaphore bounds concurrent *polling*, not sending.
                        // `unwrap()` is safe as long as the semaphore is never
                        // closed — NOTE(review): confirm no caller closes it.
                        let _permit = semaphore.acquire().await.unwrap();
                        input.next().await
                    };
                    match maybe_batch {
                        Some(Ok(batch)) => {
                            // Ignore send errors: the receiver dropping just
                            // means the consumer no longer wants batches.
                            let _ = sender.send(Ok(batch)).await;
                        }
                        Some(Err(e)) => {
                            // Forward the error then stop scanning this source.
                            let _ = sender.send(Err(e)).await;
                            break;
                        }
                        None => break,
                    }
                }
            }
            .instrument(span),
        );
    }
1218
1219 pub(crate) fn total_rows(&self) -> usize {
1220 let rows_in_files: usize = self.files.iter().map(|f| f.num_rows()).sum();
1221 let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();
1222
1223 let rows = rows_in_files + rows_in_memtables;
1224 #[cfg(feature = "enterprise")]
1225 let rows = rows
1226 + self
1227 .extension_ranges
1228 .iter()
1229 .map(|x| x.num_rows())
1230 .sum::<u64>() as usize;
1231 rows
1232 }
1233
    /// Returns the predicate group of this scan input.
    pub(crate) fn predicate_group(&self) -> &PredicateGroup {
        &self.predicate
    }
1237
    /// Returns the number of memtables to scan.
    pub(crate) fn num_memtables(&self) -> usize {
        self.memtables.len()
    }
1242
    /// Returns the number of SST files to scan.
    pub(crate) fn num_files(&self) -> usize {
        self.files.len()
    }
1247
    /// Returns the file handle for a global row-group index.
    ///
    /// Global indices are laid out memtables-first, then files, so the file's
    /// local position is `index.index - num_memtables()`. Callers must pass a
    /// file index (see `StreamContext::is_file_range_index`); a memtable index
    /// would underflow the subtraction and panic.
    pub(crate) fn file_from_index(&self, index: RowGroupIndex) -> &FileHandle {
        let file_index = index.index - self.num_memtables();
        &self.files[file_index]
    }
1253
    /// Returns the metadata of the region to scan.
    pub fn region_metadata(&self) -> &RegionMetadataRef {
        self.mapper.metadata()
    }
1257}
1258
#[cfg(feature = "enterprise")]
impl ScanInput {
    /// Replaces the extension ranges of this input.
    ///
    /// Note: the per-method `#[cfg(feature = "enterprise")]` attributes that
    /// previously decorated the accessors below were removed — the whole
    /// `impl` block is already gated on that feature, so they were redundant.
    #[must_use]
    pub(crate) fn with_extension_ranges(self, extension_ranges: Vec<BoxedExtensionRange>) -> Self {
        Self {
            extension_ranges,
            ..self
        }
    }

    /// Returns all extension ranges of this input.
    pub(crate) fn extension_ranges(&self) -> &[BoxedExtensionRange] {
        &self.extension_ranges
    }

    /// Returns the extension range for a global range index `i`.
    ///
    /// Global indices are laid out as memtables, then files, then extension
    /// ranges, hence the double subtraction. Passing a memtable or file index
    /// would underflow and panic.
    pub(crate) fn extension_range(&self, i: usize) -> &BoxedExtensionRange {
        &self.extension_ranges[i - self.num_memtables() - self.num_files()]
    }
}
1280
#[cfg(test)]
impl ScanInput {
    /// Collects the id of every SST file selected for the scan (test helper).
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        let mut ids = Vec::with_capacity(self.files.len());
        for file in &self.files {
            ids.push(file.file_id());
        }
        ids
    }

    /// Collects the index id of every SST file selected for the scan (test helper).
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        let mut ids = Vec::with_capacity(self.files.len());
        for file in &self.files {
            ids.push(file.index_id());
        }
        ids
    }
}
1292
1293fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode {
1294 if append_mode {
1295 return PreFilterMode::All;
1296 }
1297
1298 match merge_mode {
1299 MergeMode::LastRow => PreFilterMode::SkipFields,
1300 MergeMode::LastNonNull => PreFilterMode::SkipFields,
1301 }
1302}
1303
/// Builds a fingerprint identifying this scan for the range result cache.
///
/// Returns `None` when the scan is not eligible: compaction scans, scans with
/// no SST files, scans whose cache strategy is not `EnableAll`, or scans with
/// no filter on a tag column.
///
/// Filters are split into time-index-only filters (`time_filters`) and all
/// other filters; both lists are sorted so that logically identical requests
/// produce the same fingerprint regardless of filter order.
pub(crate) fn build_scan_fingerprint(input: &ScanInput) -> Option<ScanRequestFingerprint> {
    let eligible = !input.compaction
        && !input.files.is_empty()
        && matches!(input.cache_strategy, CacheStrategy::EnableAll(_));

    if !eligible {
        return None;
    }

    let metadata = input.region_metadata();
    // Names of all tag (primary key) columns, used to detect tag filters.
    let tag_names: HashSet<&str> = metadata
        .column_metadatas
        .iter()
        .filter(|col| col.semantic_type == SemanticType::Tag)
        .map(|col| col.column_schema.name.as_str())
        .collect();

    let time_index = metadata.time_index_column();
    let time_index_name = time_index.column_schema.name.clone();
    let ts_col_unit = time_index
        .column_schema
        .data_type
        .as_timestamp()
        .expect("Time index must have timestamp-compatible type")
        .unit();

    // Use the predicate WITHOUT the region partition expression so regions
    // that only differ by partition expr can share cache entries.
    let exprs = input
        .predicate_group()
        .predicate_without_region()
        .map(|predicate| predicate.exprs())
        .unwrap_or_default();

    let mut filters = Vec::new();
    let mut time_filters = Vec::new();
    let mut has_tag_filter = false;
    // Scratch set reused across expressions.
    let mut columns = HashSet::new();

    for expr in exprs {
        columns.clear();
        let is_time_only = match expr_to_columns(expr, &mut columns) {
            Ok(()) if !columns.is_empty() => {
                has_tag_filter |= columns
                    .iter()
                    .any(|col| tag_names.contains(col.name.as_str()));
                // "Time only" means every referenced column is the time index.
                columns.iter().all(|col| col.name == time_index_name)
            }
            // Column extraction failed or the expression references no
            // columns: treat it as a generic (non-time) filter.
            _ => false,
        };

        // A filter counts as a time filter only if it also yields a concrete
        // time range; otherwise it falls back into the generic filter list.
        if is_time_only
            && extract_time_range_from_expr(&time_index_name, ts_col_unit, expr).is_some()
        {
            time_filters.push(expr.to_string());
        } else {
            filters.push(expr.to_string());
        }
    }

    // Without a tag filter the cache key would be too broad to be useful.
    if !has_tag_filter {
        return None;
    }

    // Sort for order-insensitive fingerprints.
    filters.sort_unstable();
    time_filters.sort_unstable();

    Some(
        crate::read::range_cache::ScanRequestFingerprintBuilder {
            read_column_ids: input.read_column_ids.clone(),
            read_column_types: input
                .read_column_ids
                .iter()
                .map(|id| {
                    metadata
                        .column_by_id(*id)
                        .map(|col| col.column_schema.data_type.clone())
                })
                .collect(),
            filters,
            time_filters,
            series_row_selector: input.series_row_selector,
            append_mode: input.append_mode,
            filter_deleted: input.filter_deleted,
            merge_mode: input.merge_mode,
            partition_expr_version: metadata.partition_expr_version,
        }
        .build(),
    )
}
1401
/// Context shared by the streams created for one scan request.
pub struct StreamContext {
    /// The scan input (files, memtables, predicates, projection mapper).
    pub input: ScanInput,
    /// Metadata of all ranges to read.
    pub(crate) ranges: Vec<RangeMeta>,
    /// Fingerprint of this scan for the range result cache; `None` when the
    /// scan is not eligible for caching (see `build_scan_fingerprint`).
    #[allow(dead_code)]
    pub(crate) scan_fingerprint: Option<ScanRequestFingerprint>,

    /// Start time of the query; falls back to `Instant::now()` at context
    /// creation when the input carries no start time.
    pub(crate) query_start: Instant,
}
1418
impl StreamContext {
    /// Creates a context for a sequential (ordered) scan.
    pub(crate) fn seq_scan_ctx(input: ScanInput) -> Self {
        let query_start = input.query_start.unwrap_or_else(Instant::now);
        let ranges = RangeMeta::seq_scan_ranges(&input);
        READ_SST_COUNT.observe(input.num_files() as f64);
        let scan_fingerprint = build_scan_fingerprint(&input);

        Self {
            input,
            ranges,
            scan_fingerprint,
            query_start,
        }
    }

    /// Creates a context for an unordered scan.
    pub(crate) fn unordered_scan_ctx(input: ScanInput) -> Self {
        let query_start = input.query_start.unwrap_or_else(Instant::now);
        let ranges = RangeMeta::unordered_scan_ranges(&input);
        READ_SST_COUNT.observe(input.num_files() as f64);
        let scan_fingerprint = build_scan_fingerprint(&input);

        Self {
            input,
            ranges,
            scan_fingerprint,
            query_start,
        }
    }

    /// Returns true if the global index refers to a memtable range.
    /// Global indices are laid out memtables-first, then files.
    pub(crate) fn is_mem_range_index(&self, index: RowGroupIndex) -> bool {
        self.input.num_memtables() > index.index
    }

    /// Returns true if the global index refers to a file range
    /// (i.e. it is past the memtables but before any other range kind).
    pub(crate) fn is_file_range_index(&self, index: RowGroupIndex) -> bool {
        !self.is_mem_range_index(index)
            && index.index < self.input.num_files() + self.input.num_memtables()
    }

    /// Creates one `PartitionRange` per range meta, identified by its position.
    pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
        self.ranges
            .iter()
            .enumerate()
            .map(|(idx, range_meta)| range_meta.new_partition_range(idx))
            .collect()
    }

    /// Formats the context for `EXPLAIN` output.
    ///
    /// Emits a JSON-like summary of range counts; `verbose` additionally wraps
    /// the output in braces and appends projection/filter/file details.
    pub(crate) fn format_for_explain(&self, verbose: bool, f: &mut fmt::Formatter) -> fmt::Result {
        // Classify every row group index into memtable / file / other ranges.
        let (mut num_mem_ranges, mut num_file_ranges, mut num_other_ranges) = (0, 0, 0);
        for range_meta in &self.ranges {
            for idx in &range_meta.row_group_indices {
                if self.is_mem_range_index(*idx) {
                    num_mem_ranges += 1;
                } else if self.is_file_range_index(*idx) {
                    num_file_ranges += 1;
                } else {
                    num_other_ranges += 1;
                }
            }
        }
        if verbose {
            write!(f, "{{")?;
        }
        write!(
            f,
            r#""partition_count":{{"count":{}, "mem_ranges":{}, "files":{}, "file_ranges":{}"#,
            self.ranges.len(),
            num_mem_ranges,
            self.input.num_files(),
            num_file_ranges,
        )?;
        if num_other_ranges > 0 {
            write!(f, r#", "other_ranges":{}"#, num_other_ranges)?;
        }
        write!(f, "}}")?;

        if let Some(selector) = &self.input.series_row_selector {
            write!(f, ", \"selector\":\"{}\"", selector)?;
        }
        if let Some(distribution) = &self.input.distribution {
            write!(f, ", \"distribution\":\"{}\"", distribution)?;
        }

        if verbose {
            self.format_verbose_content(f)?;
        }

        Ok(())
    }

    /// Appends verbose detail (projection, filters, files, extension ranges)
    /// to the `EXPLAIN` output via local `Debug` wrapper types.
    fn format_verbose_content(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Wrapper to render one file handle as a JSON-like object.
        struct FileWrapper<'a> {
            file: &'a FileHandle,
        }

        impl fmt::Debug for FileWrapper<'_> {
            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                let (start, end) = self.file.time_range();
                write!(
                    f,
                    r#"{{"file_id":"{}","time_range_start":"{}::{}","time_range_end":"{}::{}","rows":{},"size":{},"index_size":{}}}"#,
                    self.file.file_id(),
                    start.value(),
                    start.unit(),
                    end.value(),
                    end.unit(),
                    self.file.num_rows(),
                    self.file.size(),
                    self.file.index_size()
                )
            }
        }

        // Wrapper to render the whole scan input.
        struct InputWrapper<'a> {
            input: &'a ScanInput,
        }

        #[cfg(feature = "enterprise")]
        impl InputWrapper<'_> {
            fn format_extension_ranges(&self, f: &mut fmt::Formatter) -> fmt::Result {
                if self.input.extension_ranges.is_empty() {
                    return Ok(());
                }

                let mut delimiter = "";
                write!(f, ", extension_ranges: [")?;
                for range in self.input.extension_ranges() {
                    write!(f, "{}{:?}", delimiter, range)?;
                    delimiter = ", ";
                }
                write!(f, "]")?;
                Ok(())
            }
        }

        impl fmt::Debug for InputWrapper<'_> {
            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                let output_schema = self.input.mapper.output_schema();
                if !output_schema.is_empty() {
                    let names: Vec<_> = output_schema
                        .column_schemas()
                        .iter()
                        .map(|col| &col.name)
                        .collect();
                    write!(f, ", \"projection\": {:?}", names)?;
                }
                if let Some(predicate) = &self.input.predicate.predicate() {
                    if !predicate.exprs().is_empty() {
                        let exprs: Vec<_> =
                            predicate.exprs().iter().map(|e| e.to_string()).collect();
                        write!(f, ", \"filters\": {:?}", exprs)?;
                    }
                    if !predicate.dyn_filters().is_empty() {
                        let dyn_filters: Vec<_> = predicate
                            .dyn_filters()
                            .iter()
                            .map(|f| format!("{}", f))
                            .collect();
                        write!(f, ", \"dyn_filters\": {:?}", dyn_filters)?;
                    }
                }
                #[cfg(feature = "vector_index")]
                if let Some(vector_index_k) = self.input.vector_index_k {
                    write!(f, ", \"vector_index_k\": {}", vector_index_k)?;
                }
                if !self.input.files.is_empty() {
                    write!(f, ", \"files\": ")?;
                    f.debug_list()
                        .entries(self.input.files.iter().map(|file| FileWrapper { file }))
                        .finish()?;
                }
                write!(f, ", \"flat_format\": {}", self.input.explain_flat_format)?;
                #[cfg(feature = "enterprise")]
                self.format_extension_ranges(f)?;

                Ok(())
            }
        }

        write!(f, "{:?}", InputWrapper { input: &self.input })
    }

    /// Adds the dynamic-filter expressions among `filter_exprs` to the
    /// predicate group.
    ///
    /// Returns one bool per input expression, in order, indicating whether it
    /// was a `DynamicFilterPhysicalExpr` and therefore accepted.
    pub(crate) fn add_dyn_filter_to_predicate(
        self: &Arc<Self>,
        filter_exprs: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
    ) -> Vec<bool> {
        let mut supported = Vec::with_capacity(filter_exprs.len());
        let filter_expr = filter_exprs
            .into_iter()
            .filter_map(|expr| {
                // Downcast through `Any`; only dynamic filters are kept.
                if let Ok(dyn_filter) = (expr as Arc<dyn std::any::Any + Send + Sync + 'static>)
                    .downcast::<datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr>()
                {
                    supported.push(true);
                    Some(dyn_filter)
                } else {
                    supported.push(false);
                    None
                }
            })
            .collect();
        self.input.predicate.add_dyn_filters(filter_expr);
        supported
    }
}
1630
/// Predicates of a scan request, split along several axes.
#[derive(Clone, Default)]
pub struct PredicateGroup {
    /// Simple filter evaluators built from expressions that reference only the
    /// time index column; `None` when there are none.
    time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
    /// Predicate combining the request filters and the region's partition
    /// expression.
    predicate_all: Predicate,
    /// Predicate built only from the request filters.
    predicate_without_region: Predicate,
    /// The region's partition expression, if its metadata carries one.
    region_partition_expr: Option<PartitionExpr>,
}
1643
1644impl PredicateGroup {
1645 pub fn new(metadata: &RegionMetadata, exprs: &[Expr]) -> Result<Self> {
1647 let mut combined_exprs = exprs.to_vec();
1648 let mut region_partition_expr = None;
1649
1650 if let Some(expr_json) = metadata.partition_expr.as_ref()
1651 && !expr_json.is_empty()
1652 && let Some(expr) = PartitionExpr::from_json_str(expr_json)
1653 .context(InvalidPartitionExprSnafu { expr: expr_json })?
1654 {
1655 let logical_expr = expr
1656 .try_as_logical_expr()
1657 .context(InvalidPartitionExprSnafu {
1658 expr: expr_json.clone(),
1659 })?;
1660
1661 combined_exprs.push(logical_expr);
1662 region_partition_expr = Some(expr);
1663 }
1664
1665 let mut time_filters = Vec::with_capacity(combined_exprs.len());
1666 let mut columns = HashSet::new();
1668 for expr in &combined_exprs {
1669 columns.clear();
1670 let Some(filter) = Self::expr_to_filter(expr, metadata, &mut columns) else {
1671 continue;
1672 };
1673 time_filters.push(filter);
1674 }
1675 let time_filters = if time_filters.is_empty() {
1676 None
1677 } else {
1678 Some(Arc::new(time_filters))
1679 };
1680
1681 let predicate_all = Predicate::new(combined_exprs);
1682 let predicate_without_region = Predicate::new(exprs.to_vec());
1683
1684 Ok(Self {
1685 time_filters,
1686 predicate_all,
1687 predicate_without_region,
1688 region_partition_expr,
1689 })
1690 }
1691
1692 pub(crate) fn time_filters(&self) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
1694 self.time_filters.clone()
1695 }
1696
1697 pub(crate) fn predicate(&self) -> Option<&Predicate> {
1699 if self.predicate_all.is_empty() {
1700 None
1701 } else {
1702 Some(&self.predicate_all)
1703 }
1704 }
1705
1706 pub(crate) fn predicate_without_region(&self) -> Option<&Predicate> {
1708 if self.predicate_without_region.is_empty() {
1709 None
1710 } else {
1711 Some(&self.predicate_without_region)
1712 }
1713 }
1714
1715 pub(crate) fn add_dyn_filters(&self, dyn_filters: Vec<Arc<DynamicFilterPhysicalExpr>>) {
1717 self.predicate_all.add_dyn_filters(dyn_filters.clone());
1718 self.predicate_without_region.add_dyn_filters(dyn_filters);
1719 }
1720
1721 pub(crate) fn region_partition_expr(&self) -> Option<&PartitionExpr> {
1723 self.region_partition_expr.as_ref()
1724 }
1725
1726 fn expr_to_filter(
1727 expr: &Expr,
1728 metadata: &RegionMetadata,
1729 columns: &mut HashSet<Column>,
1730 ) -> Option<SimpleFilterEvaluator> {
1731 columns.clear();
1732 expr_to_columns(expr, columns).ok()?;
1735 if columns.len() > 1 {
1736 return None;
1738 }
1739 let column = columns.iter().next()?;
1740 let column_meta = metadata.column_by_name(&column.name)?;
1741 if column_meta.semantic_type == SemanticType::Timestamp {
1742 SimpleFilterEvaluator::try_new(expr)
1743 } else {
1744 None
1745 }
1746 }
1747}
1748
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datafusion::physical_plan::expressions::lit as physical_lit;
    use datafusion_common::ScalarValue;
    use datafusion_expr::{col, lit};
    use datatypes::value::Value;
    use partition::expr::col as partition_col;
    use store_api::metadata::RegionMetadataBuilder;
    use store_api::storage::{
        ProjectionInput, ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector,
    };

    use super::*;
    use crate::cache::CacheManager;
    use crate::memtable::time_partition::TimePartitions;
    use crate::read::range_cache::ScanRequestFingerprintBuilder;
    use crate::region::version::VersionBuilder;
    use crate::test_util::memtable_util::{EmptyMemtableBuilder, metadata_with_primary_key};
    use crate::test_util::scheduler_util::SchedulerEnv;

    /// Builds a version with an empty mutable memtable for the given metadata.
    fn new_version(metadata: RegionMetadataRef) -> VersionRef {
        let mutable = Arc::new(TimePartitions::new(
            metadata.clone(),
            Arc::new(EmptyMemtableBuilder::default()),
            0,
            None,
        ));
        Arc::new(VersionBuilder::new(metadata, mutable).build())
    }

    /// Builds a `ScanInput` with the given filters, one default file, and a
    /// cache strategy that makes the scan fingerprint-eligible.
    async fn new_scan_input(metadata: RegionMetadataRef, filters: Vec<Expr>) -> ScanInput {
        let env = SchedulerEnv::new().await;
        let mapper = ProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap();
        let predicate = PredicateGroup::new(metadata.as_ref(), &filters).unwrap();
        let file = FileHandle::new(
            crate::sst::file::FileMeta::default(),
            Arc::new(crate::sst::file_purger::NoopFilePurger),
        );

        ScanInput::new(env.access_layer.clone(), mapper)
            .with_predicate(predicate)
            .with_cache(CacheStrategy::EnableAll(Arc::new(
                CacheManager::builder()
                    .range_result_cache_size(1024)
                    .build(),
            )))
            .with_files(vec![file])
    }

    // Columns referenced by filters must be added to the read column ids even
    // when the projection does not include them.
    #[tokio::test]
    async fn test_build_read_column_ids_includes_filters() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let version = new_version(metadata.clone());
        let env = SchedulerEnv::new().await;
        let request = ScanRequest {
            projection_input: Some(vec![4].into()),
            filters: vec![
                col("v0").gt(lit(1)),
                col("ts").gt(lit(0)),
                col("k0").eq(lit("foo")),
            ],
            ..Default::default()
        };
        let scan_region = ScanRegion::new(
            version,
            env.access_layer.clone(),
            request,
            CacheStrategy::Disabled,
        );
        let predicate =
            PredicateGroup::new(metadata.as_ref(), &scan_region.request.filters).unwrap();
        let projection = &scan_region.request.projection_indices().unwrap();
        let read_ids = scan_region
            .build_read_column_ids(projection, &predicate)
            .unwrap();
        assert_eq!(vec![4, 0, 2, 3], read_ids);
    }

    // An empty projection with no filters still reads the time index column.
    #[tokio::test]
    async fn test_build_read_column_ids_empty_projection() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let version = new_version(metadata.clone());
        let env = SchedulerEnv::new().await;
        let request = ScanRequest {
            projection_input: Some(ProjectionInput::default()),
            ..Default::default()
        };
        let scan_region = ScanRegion::new(
            version,
            env.access_layer.clone(),
            request,
            CacheStrategy::Disabled,
        );
        let predicate =
            PredicateGroup::new(metadata.as_ref(), &scan_region.request.filters).unwrap();
        let projection = &scan_region.request.projection_indices().unwrap();
        let read_ids = scan_region
            .build_read_column_ids(projection, &predicate)
            .unwrap();
        assert_eq!(vec![2], read_ids);
    }

    // Projected columns keep their requested order; filter columns follow.
    #[tokio::test]
    async fn test_build_read_column_ids_keeps_projection_order() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let version = new_version(metadata.clone());
        let env = SchedulerEnv::new().await;
        let request = ScanRequest {
            projection_input: Some(vec![4, 1].into()),
            filters: vec![col("v0").gt(lit(1))],
            ..Default::default()
        };
        let scan_region = ScanRegion::new(
            version,
            env.access_layer.clone(),
            request,
            CacheStrategy::Disabled,
        );
        let predicate =
            PredicateGroup::new(metadata.as_ref(), &scan_region.request.filters).unwrap();
        let projection = &scan_region.request.projection_indices().unwrap();
        let read_ids = scan_region
            .build_read_column_ids(projection, &predicate)
            .unwrap();
        assert_eq!(vec![4, 1, 3], read_ids);
    }

    /// Millisecond timestamp literal helper for filter expressions.
    fn ts_lit(val: i64) -> datafusion_expr::Expr {
        lit(ScalarValue::TimestampMillisecond(Some(val), None))
    }

    // A tag-filtered, cached, non-compaction scan must produce a fingerprint
    // with filters split into time filters and generic filters.
    #[tokio::test]
    async fn test_build_scan_fingerprint_for_eligible_scan() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let input = new_scan_input(
            metadata.clone(),
            vec![
                col("ts").gt_eq(ts_lit(1000)),
                col("k0").eq(lit("foo")),
                col("v0").gt(lit(1)),
            ],
        )
        .await
        .with_distribution(Some(TimeSeriesDistribution::PerSeries))
        .with_series_row_selector(Some(TimeSeriesRowSelector::LastRow))
        .with_merge_mode(MergeMode::LastNonNull)
        .with_filter_deleted(false);

        let fingerprint = build_scan_fingerprint(&input).unwrap();

        let expected = ScanRequestFingerprintBuilder {
            read_column_ids: input.read_column_ids.clone(),
            read_column_types: vec![
                metadata
                    .column_by_id(0)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(2)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(3)
                    .map(|col| col.column_schema.data_type.clone()),
            ],
            filters: vec![
                col("k0").eq(lit("foo")).to_string(),
                col("v0").gt(lit(1)).to_string(),
            ],
            time_filters: vec![col("ts").gt_eq(ts_lit(1000)).to_string()],
            series_row_selector: Some(TimeSeriesRowSelector::LastRow),
            append_mode: false,
            filter_deleted: false,
            merge_mode: MergeMode::LastNonNull,
            partition_expr_version: 0,
        }
        .build();
        assert_eq!(expected, fingerprint);
    }

    // Scans without any tag filter are not fingerprinted.
    #[tokio::test]
    async fn test_build_scan_fingerprint_requires_tag_filter() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let input = new_scan_input(
            metadata,
            vec![col("ts").gt_eq(lit(1000)), col("v0").gt(lit(1))],
        )
        .await;

        assert!(build_scan_fingerprint(&input).is_none());
    }

    // Cache disabled, compaction scans, and file-less scans are all ineligible.
    #[tokio::test]
    async fn test_build_scan_fingerprint_respects_scan_eligibility() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let filters = vec![col("k0").eq(lit("foo"))];

        let disabled = ScanInput::new(
            SchedulerEnv::new().await.access_layer.clone(),
            ProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap(),
        )
        .with_predicate(PredicateGroup::new(metadata.as_ref(), &filters).unwrap());
        assert!(build_scan_fingerprint(&disabled).is_none());

        let compaction = new_scan_input(metadata.clone(), filters.clone())
            .await
            .with_compaction(true);
        assert!(build_scan_fingerprint(&compaction).is_none());

        let no_files = new_scan_input(metadata, filters).await.with_files(vec![]);
        assert!(build_scan_fingerprint(&no_files).is_none());
    }

    // The fingerprint must change with the partition expression version so
    // repartitioned regions do not reuse stale cache entries.
    #[tokio::test]
    async fn test_build_scan_fingerprint_tracks_schema_and_partition_expr_changes() {
        let base = metadata_with_primary_key(vec![0, 1], false);
        let mut builder = RegionMetadataBuilder::from_existing(base);
        let partition_expr = partition_col("k0")
            .gt_eq(Value::String("foo".into()))
            .as_json_str()
            .unwrap();
        builder.partition_expr_json(Some(partition_expr));
        let metadata = Arc::new(builder.build_without_validation().unwrap());

        let input = new_scan_input(metadata.clone(), vec![col("k0").eq(lit("foo"))]).await;
        let fingerprint = build_scan_fingerprint(&input).unwrap();

        let expected = ScanRequestFingerprintBuilder {
            read_column_ids: input.read_column_ids.clone(),
            read_column_types: vec![
                metadata
                    .column_by_id(0)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(2)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(3)
                    .map(|col| col.column_schema.data_type.clone()),
            ],
            filters: vec![col("k0").eq(lit("foo")).to_string()],
            time_filters: vec![],
            series_row_selector: None,
            append_mode: false,
            filter_deleted: true,
            merge_mode: MergeMode::LastRow,
            partition_expr_version: metadata.partition_expr_version,
        }
        .build();
        assert_eq!(expected, fingerprint);
        assert_ne!(0, metadata.partition_expr_version);
    }

    // Adding dynamic filters to an empty group makes both predicates non-empty
    // while keeping their static expression lists empty.
    #[test]
    fn test_update_dyn_filters_with_empty_base_predicates() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let predicate_group = PredicateGroup::new(metadata.as_ref(), &[]).unwrap();
        assert!(predicate_group.predicate().is_none());
        assert!(predicate_group.predicate_without_region().is_none());

        let dyn_filter = Arc::new(DynamicFilterPhysicalExpr::new(vec![], physical_lit(false)));
        predicate_group.add_dyn_filters(vec![dyn_filter]);

        let predicate_all = predicate_group.predicate().unwrap();
        assert!(predicate_all.exprs().is_empty());
        assert_eq!(1, predicate_all.dyn_filters().len());

        let predicate_without_region = predicate_group.predicate_without_region().unwrap();
        assert!(predicate_without_region.exprs().is_empty());
        assert_eq!(1, predicate_without_region.dyn_filters().len());
    }
}