1use std::collections::HashSet;
18use std::fmt;
19use std::num::NonZeroU64;
20use std::sync::Arc;
21use std::time::Instant;
22
23use api::v1::SemanticType;
24use common_error::ext::BoxedError;
25use common_recordbatch::SendableRecordBatchStream;
26use common_recordbatch::filter::SimpleFilterEvaluator;
27use common_telemetry::tracing::Instrument;
28use common_telemetry::{debug, error, tracing, warn};
29use common_time::range::TimestampRange;
30use datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr;
31use datafusion_common::Column;
32use datafusion_expr::Expr;
33use datafusion_expr::utils::expr_to_columns;
34use datatypes::schema::ext::ArrowSchemaExt;
35use futures::StreamExt;
36use itertools::Itertools;
37use partition::expr::PartitionExpr;
38use smallvec::SmallVec;
39use snafu::ResultExt;
40use store_api::metadata::{RegionMetadata, RegionMetadataRef};
41use store_api::region_engine::{PartitionRange, RegionScannerRef};
42use store_api::storage::{
43 RegionId, ScanRequest, SequenceNumber, SequenceRange, TimeSeriesDistribution,
44 TimeSeriesRowSelector,
45};
46use table::predicate::{Predicate, build_time_range_predicate, extract_time_range_from_expr};
47use tokio::sync::{Semaphore, mpsc};
48use tokio_stream::wrappers::ReceiverStream;
49
50use crate::access_layer::AccessLayerRef;
51use crate::cache::CacheStrategy;
52use crate::config::DEFAULT_MAX_CONCURRENT_SCAN_FILES;
53use crate::error::{InvalidPartitionExprSnafu, Result};
54#[cfg(feature = "enterprise")]
55use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
56use crate::memtable::{MemtableRange, RangesOptions};
57use crate::metrics::READ_SST_COUNT;
58use crate::read::compat::{self, FlatCompatBatch};
59use crate::read::flat_projection::FlatProjectionMapper;
60use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
61use crate::read::range_cache::{ScanRequestFingerprint, implied_time_range_from_exprs};
62use crate::read::read_columns::{
63 ReadColumns, merge, read_columns_from_predicate, read_columns_from_projection,
64};
65use crate::read::seq_scan::SeqScan;
66use crate::read::series_scan::SeriesScan;
67use crate::read::stream::ScanBatchStream;
68use crate::read::unordered_scan::UnorderedScan;
69use crate::read::{BoxedRecordBatchStream, RecordBatch};
70use crate::region::options::MergeMode;
71use crate::region::version::VersionRef;
72use crate::sst::file::FileHandle;
73use crate::sst::index::bloom_filter::applier::{
74 BloomFilterIndexApplierBuilder, BloomFilterIndexApplierRef,
75};
76use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
77use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
78use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
79use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
80#[cfg(feature = "vector_index")]
81use crate::sst::index::vector_index::applier::{VectorIndexApplier, VectorIndexApplierRef};
82use crate::sst::parquet::file_range::PreFilterMode;
83use crate::sst::parquet::reader::ReaderMetrics;
84
/// Factor applied to the vector search `k` when the scan request also has filters.
///
/// Presumably this over-fetches candidates so that enough of them remain after the other
/// filters are applied; `ScanRegion::scan_input` only multiplies `k` when filters exist.
#[cfg(feature = "vector_index")]
const VECTOR_INDEX_OVERFETCH_MULTIPLIER: usize = 2;
87
/// A region scanner that dispatches to one of the concrete scan strategies.
///
/// `ScanRegion::scanner` picks the variant from the scan request and the region options.
pub(crate) enum Scanner {
    /// Sequential scan, used when neither the series scan nor the unordered scan applies.
    Seq(SeqScan),
    /// Unordered scan, used for append-mode regions when the request has no series row
    /// selector and either no distribution or a time-windowed distribution.
    Unordered(UnorderedScan),
    /// Series scan, used when the request asks for a per-series distribution.
    Series(SeriesScan),
}
97
98impl Scanner {
99 #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
101 pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream, BoxedError> {
102 match self {
103 Scanner::Seq(seq_scan) => seq_scan.build_stream(),
104 Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
105 Scanner::Series(series_scan) => series_scan.build_stream().await,
106 }
107 }
108
109 pub(crate) fn scan_batch(&self) -> Result<ScanBatchStream> {
111 match self {
112 Scanner::Seq(x) => x.scan_all_partitions(),
113 Scanner::Unordered(x) => x.scan_all_partitions(),
114 Scanner::Series(x) => x.scan_all_partitions(),
115 }
116 }
117}
118
#[cfg(test)]
impl Scanner {
    /// Returns the scan input of whichever scanner this is.
    fn input_for_test(&self) -> &ScanInput {
        match self {
            Self::Seq(inner) => inner.input(),
            Self::Unordered(inner) => inner.input(),
            Self::Series(inner) => inner.input(),
        }
    }

    /// Returns the number of SST files the scanner reads.
    pub(crate) fn num_files(&self) -> usize {
        self.input_for_test().num_files()
    }

    /// Returns the number of memtable ranges the scanner reads.
    pub(crate) fn num_memtables(&self) -> usize {
        self.input_for_test().num_memtables()
    }

    /// Returns the ids of the SST files the scanner reads.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        self.input_for_test().file_ids()
    }

    /// Returns the index ids of the SST files the scanner reads.
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        self.input_for_test().index_ids()
    }

    /// Returns the snapshot sequence recorded in the scan input, if any.
    pub(crate) fn snapshot_sequence(&self) -> Option<SequenceNumber> {
        self.input_for_test().snapshot_sequence
    }

    /// Prepares the scanner with `target_partitions` partitions.
    ///
    /// # Panics
    /// Panics if the scanner rejects the prepare request.
    pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
        use store_api::region_engine::{PrepareRequest, RegionScanner};

        let request = PrepareRequest::default().with_target_partitions(target_partitions);
        let outcome = match self {
            Self::Seq(inner) => inner.prepare(request),
            Self::Unordered(inner) => inner.prepare(request),
            Self::Series(inner) => inner.prepare(request),
        };
        outcome.unwrap();
    }
}
176
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Helper to build a `Scanner` (or a `RegionScannerRef`) for a region.
///
/// It combines the region's version with a `ScanRequest` to decide which SST files and
/// memtables to read and which index appliers to use.
pub(crate) struct ScanRegion {
    /// Version of the region to scan.
    version: VersionRef,
    /// Access layer used to read SST files and their indexes.
    access_layer: AccessLayerRef,
    /// The scan request.
    request: ScanRequest,
    /// Cache strategy used by the scan and by the index appliers.
    cache_strategy: CacheStrategy,
    /// Maximum number of SST files to scan concurrently.
    max_concurrent_scan_files: usize,
    /// Whether to skip building inverted index appliers.
    ignore_inverted_index: bool,
    /// Whether to skip building fulltext index appliers.
    ignore_fulltext_index: bool,
    /// Whether to skip building bloom filter index appliers.
    ignore_bloom_filter: bool,
    /// Start time of the query, forwarded to the `ScanInput`.
    start_time: Option<Instant>,
    /// Forwarded to `ScanInput::with_filter_deleted`; presumably controls whether deleted
    /// rows are filtered out. Defaults to `true`.
    filter_deleted: bool,
    /// Provider asked for extension ranges when the scan input is built.
    #[cfg(feature = "enterprise")]
    extension_range_provider: Option<BoxedExtensionRangeProvider>,
}
252
253impl ScanRegion {
254 pub(crate) fn new(
256 version: VersionRef,
257 access_layer: AccessLayerRef,
258 request: ScanRequest,
259 cache_strategy: CacheStrategy,
260 ) -> ScanRegion {
261 ScanRegion {
262 version,
263 access_layer,
264 request,
265 cache_strategy,
266 max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
267 ignore_inverted_index: false,
268 ignore_fulltext_index: false,
269 ignore_bloom_filter: false,
270 start_time: None,
271 filter_deleted: true,
272 #[cfg(feature = "enterprise")]
273 extension_range_provider: None,
274 }
275 }
276
277 #[must_use]
279 pub(crate) fn with_max_concurrent_scan_files(
280 mut self,
281 max_concurrent_scan_files: usize,
282 ) -> Self {
283 self.max_concurrent_scan_files = max_concurrent_scan_files;
284 self
285 }
286
287 #[must_use]
289 pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self {
290 self.ignore_inverted_index = ignore;
291 self
292 }
293
294 #[must_use]
296 pub(crate) fn with_ignore_fulltext_index(mut self, ignore: bool) -> Self {
297 self.ignore_fulltext_index = ignore;
298 self
299 }
300
301 #[must_use]
303 pub(crate) fn with_ignore_bloom_filter(mut self, ignore: bool) -> Self {
304 self.ignore_bloom_filter = ignore;
305 self
306 }
307
308 #[must_use]
309 pub(crate) fn with_start_time(mut self, now: Instant) -> Self {
310 self.start_time = Some(now);
311 self
312 }
313
314 pub(crate) fn set_filter_deleted(&mut self, filter_deleted: bool) {
315 self.filter_deleted = filter_deleted;
316 }
317
318 #[cfg(feature = "enterprise")]
319 pub(crate) fn set_extension_range_provider(
320 &mut self,
321 extension_range_provider: BoxedExtensionRangeProvider,
322 ) {
323 self.extension_range_provider = Some(extension_range_provider);
324 }
325
326 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
328 pub(crate) async fn scanner(self) -> Result<Scanner> {
329 if self.use_series_scan() {
330 self.series_scan().await.map(Scanner::Series)
331 } else if self.use_unordered_scan() {
332 self.unordered_scan().await.map(Scanner::Unordered)
335 } else {
336 self.seq_scan().await.map(Scanner::Seq)
337 }
338 }
339
340 #[tracing::instrument(
342 level = tracing::Level::DEBUG,
343 skip_all,
344 fields(region_id = %self.region_id())
345 )]
346 pub(crate) async fn region_scanner(self) -> Result<RegionScannerRef> {
347 if self.use_series_scan() {
348 self.series_scan()
349 .await
350 .map(|scanner| Box::new(scanner) as _)
351 } else if self.use_unordered_scan() {
352 self.unordered_scan()
353 .await
354 .map(|scanner| Box::new(scanner) as _)
355 } else {
356 self.seq_scan().await.map(|scanner| Box::new(scanner) as _)
357 }
358 }
359
360 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
362 pub(crate) async fn seq_scan(self) -> Result<SeqScan> {
363 let input = self.scan_input().await?.with_compaction(false);
364 Ok(SeqScan::new(input))
365 }
366
367 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
369 pub(crate) async fn unordered_scan(self) -> Result<UnorderedScan> {
370 let input = self.scan_input().await?;
371 Ok(UnorderedScan::new(input))
372 }
373
374 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
376 pub(crate) async fn series_scan(self) -> Result<SeriesScan> {
377 let input = self.scan_input().await?;
378 Ok(SeriesScan::new(input))
379 }
380
381 fn use_unordered_scan(&self) -> bool {
383 self.version.options.append_mode
390 && self.request.series_row_selector.is_none()
391 && (self.request.distribution.is_none()
392 || self.request.distribution == Some(TimeSeriesDistribution::TimeWindowed))
393 }
394
395 fn use_series_scan(&self) -> bool {
397 self.request.distribution == Some(TimeSeriesDistribution::PerSeries)
398 }
399
    /// Builds the `ScanInput` shared by all scanner kinds, consuming `self`.
    ///
    /// Steps:
    /// 1. Derives the time range and the `PredicateGroup` from the request filters.
    /// 2. Determines the columns to read. With a projection input, these are the projected
    ///    columns merged with the columns the predicate references. Without one, every
    ///    column of the region is read.
    /// 3. Selects SST files (unless `skip_sst_files` is set). A file is kept when its time
    ///    range intersects the time range and, if a non-zero `sst_min_sequence` is set, its
    ///    sequence is greater than it; files without a sequence are always kept.
    /// 4. Collects memtable ranges from memtables whose time range intersects the time range.
    /// 5. Builds index appliers separately for non-field filters and field filters.
    ///
    /// # Errors
    /// Returns an error if building the predicate group, the read columns, the projection
    /// mapper or the memtable ranges fails, or (with the `enterprise` feature) if looking
    /// up extension ranges fails.
    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
    async fn scan_input(self) -> Result<ScanInput> {
        // A minimum sequence of zero maps to `None`, i.e. no lower bound on SST sequences.
        let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new);
        let time_range = self.build_time_range_predicate();
        let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;

        let read_cols = match &self.request.projection_input {
            Some(p) => {
                let metadata = &self.version.metadata;
                // Read the projected columns plus the columns the predicate needs.
                let from_projection = read_columns_from_projection(p.clone(), metadata)?;
                let from_predicate = read_columns_from_predicate(&predicate, metadata);
                merge(from_projection, from_predicate)
            }
            None => {
                // No projection input: read every column of the region.
                let read_col_ids = self
                    .version
                    .metadata
                    .column_metadatas
                    .iter()
                    .map(|col| col.column_id);
                ReadColumns::from_deduped_column_ids(read_col_ids)
            }
        };
        let read_col_ids = read_cols.column_ids();

        // Without explicit projection indices, every column is projected.
        let projection = self
            .request
            .projection_indices()
            .map(|x| x.to_vec())
            .unwrap_or_else(|| (0..self.version.metadata.column_metadatas.len()).collect());
        // The JSON type hint is only passed when the schema has a JSON extension field.
        let json_type_hint = self
            .version
            .metadata
            .schema
            .arrow_schema()
            .has_json_extension_field()
            .then_some(&self.request.json_type_hint)
            .inspect(|json_type_hint| {
                debug!(
                    "Concretized JSON type: {{{}}}",
                    json_type_hint
                        .iter()
                        .map(|(k, v)| format!("{}: {}", k, v))
                        .join(", ")
                );
            });
        let mapper = FlatProjectionMapper::new_with_read_columns(
            &self.version.metadata,
            projection,
            read_cols,
            json_type_hint,
        )?;

        let ssts = &self.version.ssts;
        let mut files = Vec::new();
        if !self.request.skip_sst_files {
            for level in ssts.levels() {
                for file in level.files.values() {
                    // Files without a sequence, or scans without a minimum sequence, are
                    // never excluded by the sequence check.
                    let exceed_min_sequence = match (sst_min_sequence, file.meta_ref().sequence) {
                        (Some(min_sequence), Some(file_sequence)) => file_sequence > min_sequence,
                        (Some(_), None) => true,
                        (None, _) => true,
                    };

                    if exceed_min_sequence && file_in_range(file, &time_range) {
                        files.push(file.clone());
                    }
                }
            }
        }

        let memtables = self.version.memtables.list_memtables();
        let mut mem_range_builders = Vec::new();
        let filter_mode = pre_filter_mode(
            self.version.options.append_mode,
            self.version.options.merge_mode(),
        );

        for m in memtables {
            // Memtables that report no time range are skipped (presumably they are empty).
            let Some((start, end)) = m.stats().time_range() else {
                continue;
            };
            let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
            // Skip memtables outside the queried time range.
            if !memtable_range.intersects(&time_range) {
                continue;
            }
            let ranges_in_memtable = m.ranges(
                Some(&read_col_ids),
                RangesOptions::default()
                    .with_predicate(predicate.clone())
                    .with_sequence(SequenceRange::new(
                        self.request.memtable_min_sequence,
                        self.request.memtable_max_sequence,
                    ))
                    .with_pre_filter_mode(filter_mode),
            )?;
            mem_range_builders.extend(ranges_in_memtable.ranges.into_values().map(|v| {
                let stats = v.stats().clone();
                MemRangeBuilder::new(v, stats)
            }));
        }

        let region_id = self.region_id();
        debug!(
            "Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}",
            region_id,
            self.request,
            time_range,
            mem_range_builders.len(),
            files.len(),
            self.version.options.append_mode,
        );

        // Each applier pair holds the applier built from non-field filters at index 0 and
        // the applier built from field filters at index 1.
        let (non_field_filters, field_filters) = self.partition_by_field_filters();
        let inverted_index_appliers = [
            self.build_invereted_index_applier(&non_field_filters),
            self.build_invereted_index_applier(&field_filters),
        ];
        let bloom_filter_appliers = [
            self.build_bloom_filter_applier(&non_field_filters),
            self.build_bloom_filter_applier(&field_filters),
        ];
        let fulltext_index_appliers = [
            self.build_fulltext_index_applier(&non_field_filters),
            self.build_fulltext_index_applier(&field_filters),
        ];
        #[cfg(feature = "vector_index")]
        let vector_index_applier = self.build_vector_index_applier();
        // With other filters present, ask the vector index for more candidates than `k`.
        #[cfg(feature = "vector_index")]
        let vector_index_k = self.request.vector_search.as_ref().map(|search| {
            if self.request.filters.is_empty() {
                search.k
            } else {
                search.k.saturating_mul(VECTOR_INDEX_OVERFETCH_MULTIPLIER)
            }
        });

        let input = ScanInput::new(self.access_layer, mapper)
            .with_time_range(Some(time_range))
            .with_predicate(predicate)
            .with_memtables(mem_range_builders)
            .with_files(files)
            .with_cache(self.cache_strategy)
            .with_inverted_index_appliers(inverted_index_appliers)
            .with_bloom_filter_index_appliers(bloom_filter_appliers)
            .with_fulltext_index_appliers(fulltext_index_appliers)
            .with_max_concurrent_scan_files(self.max_concurrent_scan_files)
            .with_start_time(self.start_time)
            .with_append_mode(self.version.options.append_mode)
            .with_filter_deleted(self.filter_deleted)
            .with_merge_mode(self.version.options.merge_mode())
            .with_series_row_selector(self.request.series_row_selector)
            .with_distribution(self.request.distribution)
            .with_explain_flat_format(
                self.version.options.sst_format == Some(crate::sst::FormatType::Flat),
            )
            // A snapshot sequence is only recorded when the request asks to snapshot on scan.
            .with_snapshot_sequence(
                self.request
                    .snapshot_on_scan
                    .then_some(self.request.memtable_max_sequence)
                    .flatten(),
            );
        #[cfg(feature = "vector_index")]
        let input = input
            .with_vector_index_applier(vector_index_applier)
            .with_vector_index_k(vector_index_k);

        // Extension ranges are not looked up when SST files are skipped.
        #[cfg(feature = "enterprise")]
        let input = if !self.request.skip_sst_files
            && let Some(provider) = self.extension_range_provider
        {
            let ranges = provider
                .find_extension_ranges(self.version.flushed_sequence, time_range, &self.request)
                .await?;
            debug!("Find extension ranges: {ranges:?}");
            input.with_extension_ranges(ranges)
        } else {
            input
        };
        Ok(input)
    }
597
598 fn region_id(&self) -> RegionId {
599 self.version.metadata.region_id
600 }
601
602 fn build_time_range_predicate(&self) -> TimestampRange {
604 let time_index = self.version.metadata.time_index_column();
605 let unit = time_index
606 .column_schema
607 .data_type
608 .as_timestamp()
609 .expect("Time index must have timestamp-compatible type")
610 .unit();
611 build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters)
612 }
613
614 fn partition_by_field_filters(&self) -> (Vec<Expr>, Vec<Expr>) {
617 let field_columns = self
618 .version
619 .metadata
620 .field_columns()
621 .map(|col| &col.column_schema.name)
622 .collect::<HashSet<_>>();
623
624 let mut columns = HashSet::new();
625
626 self.request.filters.iter().cloned().partition(|expr| {
627 columns.clear();
628 if expr_to_columns(expr, &mut columns).is_err() {
630 return true;
632 }
633 !columns
635 .iter()
636 .any(|column| field_columns.contains(&column.name))
637 })
638 }
639
640 fn build_invereted_index_applier(&self, filters: &[Expr]) -> Option<InvertedIndexApplierRef> {
642 if self.ignore_inverted_index {
643 return None;
644 }
645
646 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
647 let inverted_index_cache = self.cache_strategy.inverted_index_cache().cloned();
648
649 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
650
651 InvertedIndexApplierBuilder::new(
652 self.access_layer.table_dir().to_string(),
653 self.access_layer.path_type(),
654 self.access_layer.object_store().clone(),
655 self.version.metadata.as_ref(),
656 self.version.metadata.inverted_indexed_column_ids(
657 self.version
658 .options
659 .index_options
660 .inverted_index
661 .ignore_column_ids
662 .iter(),
663 ),
664 self.access_layer.puffin_manager_factory().clone(),
665 )
666 .with_file_cache(file_cache)
667 .with_inverted_index_cache(inverted_index_cache)
668 .with_puffin_metadata_cache(puffin_metadata_cache)
669 .build(filters)
670 .inspect_err(|err| warn!(err; "Failed to build invereted index applier"))
671 .ok()
672 .flatten()
673 .map(Arc::new)
674 }
675
676 fn build_bloom_filter_applier(&self, filters: &[Expr]) -> Option<BloomFilterIndexApplierRef> {
678 if self.ignore_bloom_filter {
679 return None;
680 }
681
682 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
683 let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
684 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
685
686 BloomFilterIndexApplierBuilder::new(
687 self.access_layer.table_dir().to_string(),
688 self.access_layer.path_type(),
689 self.access_layer.object_store().clone(),
690 self.version.metadata.as_ref(),
691 self.access_layer.puffin_manager_factory().clone(),
692 )
693 .with_file_cache(file_cache)
694 .with_bloom_filter_index_cache(bloom_filter_index_cache)
695 .with_puffin_metadata_cache(puffin_metadata_cache)
696 .build(filters)
697 .inspect_err(|err| warn!(err; "Failed to build bloom filter index applier"))
698 .ok()
699 .flatten()
700 .map(Arc::new)
701 }
702
703 fn build_fulltext_index_applier(&self, filters: &[Expr]) -> Option<FulltextIndexApplierRef> {
705 if self.ignore_fulltext_index {
706 return None;
707 }
708
709 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
710 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
711 let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
712 FulltextIndexApplierBuilder::new(
713 self.access_layer.table_dir().to_string(),
714 self.access_layer.path_type(),
715 self.access_layer.object_store().clone(),
716 self.access_layer.puffin_manager_factory().clone(),
717 self.version.metadata.as_ref(),
718 )
719 .with_file_cache(file_cache)
720 .with_puffin_metadata_cache(puffin_metadata_cache)
721 .with_bloom_filter_cache(bloom_filter_index_cache)
722 .build(filters)
723 .inspect_err(|err| warn!(err; "Failed to build fulltext index applier"))
724 .ok()
725 .flatten()
726 .map(Arc::new)
727 }
728
729 #[cfg(feature = "vector_index")]
731 fn build_vector_index_applier(&self) -> Option<VectorIndexApplierRef> {
732 let vector_search = self.request.vector_search.as_ref()?;
733
734 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
735 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
736 let vector_index_cache = self.cache_strategy.vector_index_cache().cloned();
737
738 let applier = VectorIndexApplier::new(
739 self.access_layer.table_dir().to_string(),
740 self.access_layer.path_type(),
741 self.access_layer.object_store().clone(),
742 self.access_layer.puffin_manager_factory().clone(),
743 vector_search.column_id,
744 vector_search.query_vector.clone(),
745 vector_search.metric,
746 )
747 .with_file_cache(file_cache)
748 .with_puffin_metadata_cache(puffin_metadata_cache)
749 .with_vector_index_cache(vector_index_cache);
750
751 Some(Arc::new(applier))
752 }
753}
754
755fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
757 if predicate == &TimestampRange::min_to_max() {
758 return true;
759 }
760 let (start, end) = file.time_range();
762 let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end));
763 file_ts_range.intersects(predicate)
764}
765
/// Everything a region scanner needs to know about what to read and how to read it.
///
/// Built by `ScanRegion::scan_input` (or directly through `ScanInput::new` and the `with_*`
/// builder methods).
pub struct ScanInput {
    /// Access layer used to read SST files.
    access_layer: AccessLayerRef,
    /// Projection mapper for the scan output.
    pub(crate) mapper: Arc<FlatProjectionMapper>,
    /// Columns to read; initialized from the mapper's read columns.
    pub(crate) read_cols: ReadColumns,
    /// Time range derived from the request filters, if any.
    pub(crate) time_range: Option<TimestampRange>,
    /// Predicates of the scan.
    pub(crate) predicate: PredicateGroup,
    /// Region partition expression, copied from the predicate group in `with_predicate`.
    /// Compared against each file's partition expression to choose the file predicate.
    region_partition_expr: Option<PartitionExpr>,
    /// Memtable range builders to scan.
    pub(crate) memtables: Vec<MemRangeBuilder>,
    /// SST files to scan.
    pub(crate) files: Vec<FileHandle>,
    /// Cache strategy used when reading SST files.
    pub(crate) cache_strategy: CacheStrategy,
    /// When set, `prune_file` logs and skips files that do not exist instead of failing.
    ignore_file_not_found: bool,
    /// Maximum number of SST files to scan concurrently.
    pub(crate) max_concurrent_scan_files: usize,
    /// Inverted index appliers. `ScanRegion` stores the non-field-filter applier at
    /// index 0 and the field-filter applier at index 1.
    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
    /// Bloom filter index appliers, laid out like `inverted_index_appliers`.
    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    /// Fulltext index appliers, laid out like `inverted_index_appliers`.
    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
    /// Vector index applier for the request's vector search, if any.
    #[cfg(feature = "vector_index")]
    pub(crate) vector_index_applier: Option<VectorIndexApplierRef>,
    /// Number of candidates to request from the vector index, if any.
    #[cfg(feature = "vector_index")]
    pub(crate) vector_index_k: Option<usize>,
    /// Start time of the query.
    pub(crate) query_start: Option<Instant>,
    /// Whether the region is in append mode.
    pub(crate) append_mode: bool,
    /// Presumably whether deleted rows are filtered out; defaults to `true`.
    pub(crate) filter_deleted: bool,
    /// Merge mode of the region.
    pub(crate) merge_mode: MergeMode,
    /// Row selector requested per time series, if any.
    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
    /// Requested time series distribution, if any.
    pub(crate) distribution: Option<TimeSeriesDistribution>,
    /// Set by `ScanRegion` when the region's SST format is the flat format.
    explain_flat_format: bool,
    /// Snapshot sequence of the scan, if one was requested.
    pub(crate) snapshot_sequence: Option<SequenceNumber>,
    /// Whether the scan runs for compaction. When set, `prune_file` neither decodes
    /// primary key values nor defers optional page index loading.
    pub(crate) compaction: bool,
    /// Extension ranges to scan.
    #[cfg(feature = "enterprise")]
    extension_ranges: Vec<BoxedExtensionRange>,
}
823
824impl ScanInput {
825 #[must_use]
827 pub(crate) fn new(access_layer: AccessLayerRef, mapper: FlatProjectionMapper) -> ScanInput {
828 ScanInput {
829 access_layer,
830 read_cols: mapper.read_columns().clone(),
831 mapper: Arc::new(mapper),
832 time_range: None,
833 predicate: PredicateGroup::default(),
834 region_partition_expr: None,
835 memtables: Vec::new(),
836 files: Vec::new(),
837 cache_strategy: CacheStrategy::Disabled,
838 ignore_file_not_found: false,
839 max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
840 inverted_index_appliers: [None, None],
841 bloom_filter_index_appliers: [None, None],
842 fulltext_index_appliers: [None, None],
843 #[cfg(feature = "vector_index")]
844 vector_index_applier: None,
845 #[cfg(feature = "vector_index")]
846 vector_index_k: None,
847 query_start: None,
848 append_mode: false,
849 filter_deleted: true,
850 merge_mode: MergeMode::default(),
851 series_row_selector: None,
852 distribution: None,
853 explain_flat_format: false,
854 snapshot_sequence: None,
855 compaction: false,
856 #[cfg(feature = "enterprise")]
857 extension_ranges: Vec::new(),
858 }
859 }
860
861 #[must_use]
863 pub(crate) fn with_time_range(mut self, time_range: Option<TimestampRange>) -> Self {
864 self.time_range = time_range;
865 self
866 }
867
868 #[must_use]
870 pub(crate) fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
871 self.region_partition_expr = predicate.region_partition_expr().cloned();
872 self.predicate = predicate;
873 self
874 }
875
876 #[must_use]
878 pub(crate) fn with_memtables(mut self, memtables: Vec<MemRangeBuilder>) -> Self {
879 self.memtables = memtables;
880 self
881 }
882
883 #[must_use]
885 pub(crate) fn with_files(mut self, files: Vec<FileHandle>) -> Self {
886 self.files = files;
887 self
888 }
889
890 #[must_use]
892 pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self {
893 self.cache_strategy = cache;
894 self
895 }
896
897 #[must_use]
899 pub(crate) fn with_ignore_file_not_found(mut self, ignore: bool) -> Self {
900 self.ignore_file_not_found = ignore;
901 self
902 }
903
904 #[must_use]
906 pub(crate) fn with_max_concurrent_scan_files(
907 mut self,
908 max_concurrent_scan_files: usize,
909 ) -> Self {
910 self.max_concurrent_scan_files = max_concurrent_scan_files;
911 self
912 }
913
914 #[must_use]
916 pub(crate) fn with_inverted_index_appliers(
917 mut self,
918 appliers: [Option<InvertedIndexApplierRef>; 2],
919 ) -> Self {
920 self.inverted_index_appliers = appliers;
921 self
922 }
923
924 #[must_use]
926 pub(crate) fn with_bloom_filter_index_appliers(
927 mut self,
928 appliers: [Option<BloomFilterIndexApplierRef>; 2],
929 ) -> Self {
930 self.bloom_filter_index_appliers = appliers;
931 self
932 }
933
934 #[must_use]
936 pub(crate) fn with_fulltext_index_appliers(
937 mut self,
938 appliers: [Option<FulltextIndexApplierRef>; 2],
939 ) -> Self {
940 self.fulltext_index_appliers = appliers;
941 self
942 }
943
944 #[cfg(feature = "vector_index")]
946 #[must_use]
947 pub(crate) fn with_vector_index_applier(
948 mut self,
949 applier: Option<VectorIndexApplierRef>,
950 ) -> Self {
951 self.vector_index_applier = applier;
952 self
953 }
954
955 #[cfg(feature = "vector_index")]
957 #[must_use]
958 pub(crate) fn with_vector_index_k(mut self, k: Option<usize>) -> Self {
959 self.vector_index_k = k;
960 self
961 }
962
963 #[must_use]
965 pub(crate) fn with_start_time(mut self, now: Option<Instant>) -> Self {
966 self.query_start = now;
967 self
968 }
969
970 #[must_use]
971 pub(crate) fn with_append_mode(mut self, is_append_mode: bool) -> Self {
972 self.append_mode = is_append_mode;
973 self
974 }
975
976 #[must_use]
978 pub(crate) fn with_filter_deleted(mut self, filter_deleted: bool) -> Self {
979 self.filter_deleted = filter_deleted;
980 self
981 }
982
983 #[must_use]
985 pub(crate) fn with_merge_mode(mut self, merge_mode: MergeMode) -> Self {
986 self.merge_mode = merge_mode;
987 self
988 }
989
990 #[must_use]
992 pub(crate) fn with_distribution(
993 mut self,
994 distribution: Option<TimeSeriesDistribution>,
995 ) -> Self {
996 self.distribution = distribution;
997 self
998 }
999
1000 #[must_use]
1002 pub(crate) fn with_explain_flat_format(mut self, explain_flat_format: bool) -> Self {
1003 self.explain_flat_format = explain_flat_format;
1004 self
1005 }
1006
1007 #[must_use]
1009 pub(crate) fn with_series_row_selector(
1010 mut self,
1011 series_row_selector: Option<TimeSeriesRowSelector>,
1012 ) -> Self {
1013 self.series_row_selector = series_row_selector;
1014 self
1015 }
1016
1017 #[must_use]
1018 pub(crate) fn with_snapshot_sequence(
1019 mut self,
1020 snapshot_sequence: Option<SequenceNumber>,
1021 ) -> Self {
1022 self.snapshot_sequence = snapshot_sequence;
1023 self
1024 }
1025
1026 #[must_use]
1028 pub(crate) fn with_compaction(mut self, compaction: bool) -> Self {
1029 self.compaction = compaction;
1030 self
1031 }
1032
1033 pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> {
1035 let memtable = &self.memtables[index.index];
1036 let mut ranges = SmallVec::new();
1037 memtable.build_ranges(index.row_group_index, &mut ranges);
1038 ranges
1039 }
1040
1041 fn predicate_for_file(&self, file: &FileHandle) -> Option<Predicate> {
1042 if self.should_skip_region_partition(file) {
1043 self.predicate.predicate_without_region().cloned()
1044 } else {
1045 self.predicate.predicate().cloned()
1046 }
1047 }
1048
1049 fn should_skip_region_partition(&self, file: &FileHandle) -> bool {
1050 match (
1051 self.region_partition_expr.as_ref(),
1052 file.meta_ref().partition_expr.as_ref(),
1053 ) {
1054 (Some(region_expr), Some(file_expr)) => region_expr == file_expr,
1055 _ => false,
1056 }
1057 }
1058
1059 #[tracing::instrument(
1061 skip_all,
1062 fields(
1063 region_id = %self.region_metadata().region_id,
1064 file_id = %file.file_id()
1065 )
1066 )]
1067 pub async fn prune_file(
1068 &self,
1069 file: &FileHandle,
1070 pre_filter_mode: PreFilterMode,
1071 reader_metrics: &mut ReaderMetrics,
1072 ) -> Result<FileRangeBuilder> {
1073 let predicate = self.predicate_for_file(file);
1074 let may_build_selective_row_selection = predicate.is_some();
1075 let decode_pk_values = !self.compaction
1076 && self
1077 .mapper
1078 .read_columns()
1079 .column_ids_iter()
1080 .any(|column_id| self.mapper.metadata().primary_key.contains(&column_id));
1081 let reader = self
1082 .access_layer
1083 .read_sst(file.clone())
1084 .predicate(predicate)
1085 .projection(Some(self.read_cols.clone()))
1086 .cache(self.cache_strategy.clone())
1087 .inverted_index_appliers(self.inverted_index_appliers.clone())
1088 .bloom_filter_index_appliers(self.bloom_filter_index_appliers.clone())
1089 .fulltext_index_appliers(self.fulltext_index_appliers.clone());
1090 let reader = if !self.compaction && may_build_selective_row_selection {
1091 reader.deferred_optional_page_index()
1092 } else {
1093 reader
1094 };
1095 #[cfg(feature = "vector_index")]
1096 let reader = {
1097 let mut reader = reader;
1098 reader =
1099 reader.vector_index_applier(self.vector_index_applier.clone(), self.vector_index_k);
1100 reader
1101 };
1102 let res = reader
1103 .expected_metadata(Some(self.mapper.metadata().clone()))
1104 .compaction(self.compaction)
1105 .pre_filter_mode(pre_filter_mode)
1106 .decode_primary_key_values(decode_pk_values)
1107 .build_reader_input(reader_metrics)
1108 .await;
1109 let read_input = match res {
1110 Ok(x) => x,
1111 Err(e) => {
1112 if e.is_object_not_found() && self.ignore_file_not_found {
1113 error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
1114 return Ok(FileRangeBuilder::default());
1115 } else {
1116 return Err(e);
1117 }
1118 }
1119 };
1120
1121 let Some((mut file_range_ctx, selection)) = read_input else {
1122 return Ok(FileRangeBuilder::default());
1123 };
1124
1125 let need_compat = !compat::has_same_columns_and_pk_encoding(
1126 &self.mapper,
1127 file_range_ctx.read_format(),
1128 self.compaction,
1129 );
1130 if need_compat {
1131 let compat = FlatCompatBatch::try_new(
1134 &self.mapper,
1135 file_range_ctx.read_format(),
1136 self.compaction,
1137 )?;
1138 file_range_ctx.set_compat_batch(compat);
1139 }
1140 Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), selection))
1141 }
1142
1143 #[tracing::instrument(
1147 skip(self, sources, semaphore),
1148 fields(
1149 region_id = %self.region_metadata().region_id,
1150 source_count = sources.len()
1151 )
1152 )]
1153 pub(crate) fn create_parallel_flat_sources(
1154 &self,
1155 sources: Vec<BoxedRecordBatchStream>,
1156 semaphore: Arc<Semaphore>,
1157 channel_size: usize,
1158 ) -> Result<Vec<BoxedRecordBatchStream>> {
1159 if sources.len() <= 1 {
1160 return Ok(sources);
1161 }
1162
1163 let sources = sources
1165 .into_iter()
1166 .map(|source| {
1167 let (sender, receiver) = mpsc::channel(channel_size);
1168 self.spawn_flat_scan_task(source, semaphore.clone(), sender);
1169 let stream = Box::pin(ReceiverStream::new(receiver));
1170 Box::pin(stream) as _
1171 })
1172 .collect();
1173 Ok(sources)
1174 }
1175
1176 #[tracing::instrument(
1178 skip(self, input, semaphore, sender),
1179 fields(region_id = %self.region_metadata().region_id)
1180 )]
1181 pub(crate) fn spawn_flat_scan_task(
1182 &self,
1183 mut input: BoxedRecordBatchStream,
1184 semaphore: Arc<Semaphore>,
1185 sender: mpsc::Sender<Result<RecordBatch>>,
1186 ) {
1187 let region_id = self.region_metadata().region_id;
1188 let span = tracing::info_span!(
1189 "ScanInput::parallel_scan_task",
1190 region_id = %region_id,
1191 stream_kind = "flat"
1192 );
1193 common_runtime::spawn_global(
1194 async move {
1195 loop {
1196 let maybe_batch = {
1199 let _permit = semaphore.acquire().await.unwrap();
1201 input.next().await
1202 };
1203 match maybe_batch {
1204 Some(Ok(batch)) => {
1205 let _ = sender.send(Ok(batch)).await;
1206 }
1207 Some(Err(e)) => {
1208 let _ = sender.send(Err(e)).await;
1209 break;
1210 }
1211 None => break,
1212 }
1213 }
1214 }
1215 .instrument(span),
1216 );
1217 }
1218
1219 pub(crate) fn total_rows(&self) -> usize {
1220 let rows_in_files: usize = self.files.iter().map(|f| f.num_rows()).sum();
1221 let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();
1222
1223 let rows = rows_in_files + rows_in_memtables;
1224 #[cfg(feature = "enterprise")]
1225 let rows = rows
1226 + self
1227 .extension_ranges
1228 .iter()
1229 .map(|x| x.num_rows())
1230 .sum::<u64>() as usize;
1231 rows
1232 }
1233
1234 pub(crate) fn predicate_group(&self) -> &PredicateGroup {
1235 &self.predicate
1236 }
1237
1238 pub(crate) fn num_memtables(&self) -> usize {
1240 self.memtables.len()
1241 }
1242
1243 pub(crate) fn num_files(&self) -> usize {
1245 self.files.len()
1246 }
1247
1248 pub(crate) fn file_from_index(&self, index: RowGroupIndex) -> &FileHandle {
1250 let file_index = index.index - self.num_memtables();
1251 &self.files[file_index]
1252 }
1253
1254 pub fn region_metadata(&self) -> &RegionMetadataRef {
1255 self.mapper.metadata()
1256 }
1257
1258 fn range_pre_filter_mode(&self, source_count: usize) -> PreFilterMode {
1259 if source_count <= 1 {
1260 return PreFilterMode::All;
1265 }
1266
1267 pre_filter_mode(self.append_mode, self.merge_mode)
1268 }
1269}
1270
#[cfg(feature = "enterprise")]
impl ScanInput {
    /// Returns this input with its extension ranges replaced by `extension_ranges`.
    #[must_use]
    pub(crate) fn with_extension_ranges(mut self, extension_ranges: Vec<BoxedExtensionRange>) -> Self {
        self.extension_ranges = extension_ranges;
        self
    }

    /// Borrows all extension ranges attached to this input.
    // The whole impl is already gated on `enterprise`, so the per-method cfg attributes were
    // redundant and have been dropped.
    pub(crate) fn extension_ranges(&self) -> &[BoxedExtensionRange] {
        &self.extension_ranges
    }

    /// Returns the extension range at global range index `i`.
    ///
    /// Global indices list memtables first, then SST files, then extension ranges. The first
    /// two counts are therefore skipped.
    pub(crate) fn extension_range(&self, i: usize) -> &BoxedExtensionRange {
        let leading = self.num_memtables() + self.num_files();
        &self.extension_ranges[i - leading]
    }
}
1292
#[cfg(test)]
impl ScanInput {
    /// Lists the region file id of every SST file in this input, in file order.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        let mut ids = Vec::with_capacity(self.files.len());
        for handle in self.files.iter() {
            ids.push(handle.file_id());
        }
        ids
    }

    /// Lists the index id of every SST file in this input, in file order.
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        let mut ids = Vec::with_capacity(self.files.len());
        for handle in self.files.iter() {
            ids.push(handle.index_id());
        }
        ids
    }
}
1304
1305fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode {
1306 if append_mode {
1307 return PreFilterMode::All;
1308 }
1309
1310 match merge_mode {
1311 MergeMode::LastRow => PreFilterMode::SkipFields,
1312 MergeMode::LastNonNull => PreFilterMode::SkipFields,
1313 }
1314}
1315
1316pub(crate) struct ScanFingerprintBundle {
1320 pub(crate) fingerprint: ScanRequestFingerprint,
1321 pub(crate) implied_time_range: Option<TimestampRange>,
1326}
1327
1328pub(crate) fn build_scan_fingerprint(input: &ScanInput) -> Option<ScanFingerprintBundle> {
1331 let eligible = !input.compaction
1332 && !input.files.is_empty()
1333 && matches!(input.cache_strategy, CacheStrategy::EnableAll(_));
1334
1335 if !eligible {
1336 return None;
1337 }
1338
1339 let metadata = input.region_metadata();
1340 let tag_names: HashSet<&str> = metadata
1341 .column_metadatas
1342 .iter()
1343 .filter(|col| col.semantic_type == SemanticType::Tag)
1344 .map(|col| col.column_schema.name.as_str())
1345 .collect();
1346
1347 let time_index = metadata.time_index_column();
1348 let time_index_name = time_index.column_schema.name.clone();
1349 let ts_col_unit = time_index
1350 .column_schema
1351 .data_type
1352 .as_timestamp()
1353 .expect("Time index must have timestamp-compatible type")
1354 .unit();
1355
1356 let exprs = input
1357 .predicate_group()
1358 .predicate_without_region()
1359 .map(|predicate| predicate.exprs())
1360 .unwrap_or_default();
1361
1362 let mut filters = Vec::new();
1363 let mut time_only_exprs: Vec<&Expr> = Vec::new();
1364 let mut has_tag_filter = false;
1365 let mut columns = HashSet::new();
1366
1367 for expr in exprs {
1368 columns.clear();
1369 let is_time_only = match expr_to_columns(expr, &mut columns) {
1370 Ok(()) if !columns.is_empty() => {
1371 has_tag_filter |= columns
1372 .iter()
1373 .any(|col| tag_names.contains(col.name.as_str()));
1374 columns.iter().all(|col| col.name == time_index_name)
1375 }
1376 _ => false,
1377 };
1378
1379 if is_time_only
1387 && extract_time_range_from_expr(&time_index_name, ts_col_unit, expr).is_some()
1388 {
1389 time_only_exprs.push(expr);
1390 } else {
1391 filters.push(expr.to_string());
1392 }
1393 }
1394
1395 if !has_tag_filter {
1396 return None;
1398 }
1399
1400 let implied_time_range =
1401 implied_time_range_from_exprs(&time_index_name, ts_col_unit, &time_only_exprs);
1402 let mut time_filters: Vec<String> = time_only_exprs.iter().map(|e| e.to_string()).collect();
1403
1404 filters.sort_unstable();
1406 time_filters.sort_unstable();
1407 let read_columns = input.read_cols.clone();
1408 let fingerprint = crate::read::range_cache::ScanRequestFingerprintBuilder {
1409 read_column_types: read_columns
1410 .column_ids_iter()
1411 .map(|id| {
1412 metadata
1413 .column_by_id(id)
1414 .map(|col| col.column_schema.data_type.clone())
1415 })
1416 .collect(),
1417 read_columns,
1418 filters,
1419 time_filters,
1420 series_row_selector: input.series_row_selector,
1421 append_mode: input.append_mode,
1422 filter_deleted: input.filter_deleted,
1423 merge_mode: input.merge_mode,
1424 partition_expr_version: metadata.partition_expr_version,
1425 }
1426 .build();
1427
1428 Some(ScanFingerprintBundle {
1429 fingerprint,
1430 implied_time_range,
1431 })
1432}
1433
1434pub struct StreamContext {
1437 pub input: ScanInput,
1439 pub(crate) ranges: Vec<RangeMeta>,
1441 #[allow(dead_code)]
1444 pub(crate) scan_fingerprint: Option<ScanRequestFingerprint>,
1445 pub(crate) scan_implied_time_range: Option<TimestampRange>,
1452
1453 pub(crate) query_start: Instant,
1456}
1457
1458impl StreamContext {
1459 pub(crate) fn seq_scan_ctx(input: ScanInput) -> Self {
1461 let query_start = input.query_start.unwrap_or_else(Instant::now);
1462 let ranges = RangeMeta::seq_scan_ranges(&input);
1463 READ_SST_COUNT.observe(input.num_files() as f64);
1464 let (scan_fingerprint, scan_implied_time_range) = match build_scan_fingerprint(&input) {
1465 Some(b) => (Some(b.fingerprint), b.implied_time_range),
1466 None => (None, None),
1467 };
1468
1469 Self {
1470 input,
1471 ranges,
1472 scan_fingerprint,
1473 scan_implied_time_range,
1474 query_start,
1475 }
1476 }
1477
1478 pub(crate) fn unordered_scan_ctx(input: ScanInput) -> Self {
1480 let query_start = input.query_start.unwrap_or_else(Instant::now);
1481 let ranges = RangeMeta::unordered_scan_ranges(&input);
1482 READ_SST_COUNT.observe(input.num_files() as f64);
1483 let (scan_fingerprint, scan_implied_time_range) = match build_scan_fingerprint(&input) {
1484 Some(b) => (Some(b.fingerprint), b.implied_time_range),
1485 None => (None, None),
1486 };
1487
1488 Self {
1489 input,
1490 ranges,
1491 scan_fingerprint,
1492 scan_implied_time_range,
1493 query_start,
1494 }
1495 }
1496
1497 pub(crate) fn is_mem_range_index(&self, index: RowGroupIndex) -> bool {
1499 self.input.num_memtables() > index.index
1500 }
1501
1502 pub(crate) fn is_file_range_index(&self, index: RowGroupIndex) -> bool {
1503 !self.is_mem_range_index(index)
1504 && index.index < self.input.num_files() + self.input.num_memtables()
1505 }
1506
1507 pub(crate) fn range_pre_filter_mode(&self, part_range: &PartitionRange) -> PreFilterMode {
1508 let range_meta = &self.ranges[part_range.identifier];
1509 let source_count = range_meta.indices.len();
1510
1511 self.input.range_pre_filter_mode(source_count)
1512 }
1513
1514 pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
1516 self.ranges
1517 .iter()
1518 .enumerate()
1519 .map(|(idx, range_meta)| range_meta.new_partition_range(idx))
1520 .collect()
1521 }
1522
1523 pub(crate) fn format_for_explain(&self, verbose: bool, f: &mut fmt::Formatter) -> fmt::Result {
1525 let (mut num_mem_ranges, mut num_file_ranges, mut num_other_ranges) = (0, 0, 0);
1526 for range_meta in &self.ranges {
1527 for idx in &range_meta.row_group_indices {
1528 if self.is_mem_range_index(*idx) {
1529 num_mem_ranges += 1;
1530 } else if self.is_file_range_index(*idx) {
1531 num_file_ranges += 1;
1532 } else {
1533 num_other_ranges += 1;
1534 }
1535 }
1536 }
1537 if verbose {
1538 write!(f, "{{")?;
1539 }
1540 write!(
1541 f,
1542 r#""partition_count":{{"count":{}, "mem_ranges":{}, "files":{}, "file_ranges":{}"#,
1543 self.ranges.len(),
1544 num_mem_ranges,
1545 self.input.num_files(),
1546 num_file_ranges,
1547 )?;
1548 if num_other_ranges > 0 {
1549 write!(f, r#", "other_ranges":{}"#, num_other_ranges)?;
1550 }
1551 write!(f, "}}")?;
1552
1553 if let Some(selector) = &self.input.series_row_selector {
1554 write!(f, ", \"selector\":\"{}\"", selector)?;
1555 }
1556 if let Some(distribution) = &self.input.distribution {
1557 write!(f, ", \"distribution\":\"{}\"", distribution)?;
1558 }
1559
1560 if verbose {
1561 self.format_verbose_content(f)?;
1562 }
1563
1564 Ok(())
1565 }
1566
1567 fn format_verbose_content(&self, f: &mut fmt::Formatter) -> fmt::Result {
1568 struct FileWrapper<'a> {
1569 file: &'a FileHandle,
1570 }
1571
1572 impl fmt::Debug for FileWrapper<'_> {
1573 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1574 let (start, end) = self.file.time_range();
1575 write!(
1576 f,
1577 r#"{{"file_id":"{}","time_range_start":"{}::{}","time_range_end":"{}::{}","rows":{},"size":{},"index_size":{}}}"#,
1578 self.file.file_id(),
1579 start.value(),
1580 start.unit(),
1581 end.value(),
1582 end.unit(),
1583 self.file.num_rows(),
1584 self.file.size(),
1585 self.file.index_size()
1586 )
1587 }
1588 }
1589
1590 struct InputWrapper<'a> {
1591 input: &'a ScanInput,
1592 }
1593
1594 #[cfg(feature = "enterprise")]
1595 impl InputWrapper<'_> {
1596 fn format_extension_ranges(&self, f: &mut fmt::Formatter) -> fmt::Result {
1597 if self.input.extension_ranges.is_empty() {
1598 return Ok(());
1599 }
1600
1601 let mut delimiter = "";
1602 write!(f, ", extension_ranges: [")?;
1603 for range in self.input.extension_ranges() {
1604 write!(f, "{}{:?}", delimiter, range)?;
1605 delimiter = ", ";
1606 }
1607 write!(f, "]")?;
1608 Ok(())
1609 }
1610 }
1611
1612 impl fmt::Debug for InputWrapper<'_> {
1613 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1614 let output_schema = self.input.mapper.output_schema();
1615 if !output_schema.is_empty() {
1616 let names: Vec<_> = output_schema
1617 .column_schemas()
1618 .iter()
1619 .map(|col| &col.name)
1620 .collect();
1621 write!(f, ", \"projection\": {:?}", names)?;
1622 }
1623 if let Some(predicate) = &self.input.predicate.predicate() {
1624 if !predicate.exprs().is_empty() {
1625 let exprs: Vec<_> =
1626 predicate.exprs().iter().map(|e| e.to_string()).collect();
1627 write!(f, ", \"filters\": {:?}", exprs)?;
1628 }
1629 if !predicate.dyn_filters().is_empty() {
1630 let dyn_filters: Vec<_> = predicate
1631 .dyn_filters()
1632 .iter()
1633 .map(|f| format!("{}", f))
1634 .collect();
1635 write!(f, ", \"dyn_filters\": {:?}", dyn_filters)?;
1636 }
1637 }
1638 #[cfg(feature = "vector_index")]
1639 if let Some(vector_index_k) = self.input.vector_index_k {
1640 write!(f, ", \"vector_index_k\": {}", vector_index_k)?;
1641 }
1642 if !self.input.files.is_empty() {
1643 write!(f, ", \"files\": ")?;
1644 f.debug_list()
1645 .entries(self.input.files.iter().map(|file| FileWrapper { file }))
1646 .finish()?;
1647 }
1648 write!(f, ", \"flat_format\": {}", self.input.explain_flat_format)?;
1649 #[cfg(feature = "enterprise")]
1650 self.format_extension_ranges(f)?;
1651
1652 Ok(())
1653 }
1654 }
1655
1656 write!(f, "{:?}", InputWrapper { input: &self.input })
1657 }
1658
1659 pub(crate) fn add_dyn_filter_to_predicate(
1662 self: &Arc<Self>,
1663 filter_exprs: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
1664 ) -> Vec<bool> {
1665 let mut supported = Vec::with_capacity(filter_exprs.len());
1666 let filter_expr = filter_exprs
1667 .into_iter()
1668 .filter_map(|expr| {
1669 if let Ok(dyn_filter) = (expr as Arc<dyn std::any::Any + Send + Sync + 'static>)
1670 .downcast::<datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr>()
1671 {
1672 supported.push(true);
1673 Some(dyn_filter)
1674 } else {
1675 supported.push(false);
1676 None
1677 }
1678 })
1679 .collect();
1680 self.input.predicate.add_dyn_filters(filter_expr);
1681 supported
1682 }
1683}
1684
1685#[derive(Clone, Default)]
1688pub struct PredicateGroup {
1689 time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
1690 predicate_all: Predicate,
1692 predicate_without_region: Predicate,
1694 region_partition_expr: Option<PartitionExpr>,
1696}
1697
1698impl PredicateGroup {
1699 pub fn new(metadata: &RegionMetadata, exprs: &[Expr]) -> Result<Self> {
1701 let mut combined_exprs = exprs.to_vec();
1702 let mut region_partition_expr = None;
1703
1704 if let Some(expr_json) = metadata.partition_expr.as_ref()
1705 && !expr_json.is_empty()
1706 && let Some(expr) = PartitionExpr::from_json_str(expr_json)
1707 .context(InvalidPartitionExprSnafu { expr: expr_json })?
1708 {
1709 let logical_expr = expr
1710 .try_as_logical_expr()
1711 .context(InvalidPartitionExprSnafu {
1712 expr: expr_json.clone(),
1713 })?;
1714
1715 combined_exprs.push(logical_expr);
1716 region_partition_expr = Some(expr);
1717 }
1718
1719 let mut time_filters = Vec::with_capacity(combined_exprs.len());
1720 let mut columns = HashSet::new();
1722 for expr in &combined_exprs {
1723 columns.clear();
1724 let Some(filter) = Self::expr_to_filter(expr, metadata, &mut columns) else {
1725 continue;
1726 };
1727 time_filters.push(filter);
1728 }
1729 let time_filters = if time_filters.is_empty() {
1730 None
1731 } else {
1732 Some(Arc::new(time_filters))
1733 };
1734
1735 let predicate_all = Predicate::new(combined_exprs);
1736 let predicate_without_region = Predicate::new(exprs.to_vec());
1737
1738 Ok(Self {
1739 time_filters,
1740 predicate_all,
1741 predicate_without_region,
1742 region_partition_expr,
1743 })
1744 }
1745
1746 pub(crate) fn time_filters(&self) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
1748 self.time_filters.clone()
1749 }
1750
1751 pub(crate) fn predicate(&self) -> Option<&Predicate> {
1753 if self.predicate_all.is_empty() {
1754 None
1755 } else {
1756 Some(&self.predicate_all)
1757 }
1758 }
1759
1760 pub(crate) fn predicate_without_region(&self) -> Option<&Predicate> {
1762 if self.predicate_without_region.is_empty() {
1763 None
1764 } else {
1765 Some(&self.predicate_without_region)
1766 }
1767 }
1768
1769 pub(crate) fn add_dyn_filters(&self, dyn_filters: Vec<Arc<DynamicFilterPhysicalExpr>>) {
1771 self.predicate_all.add_dyn_filters(dyn_filters.clone());
1772 self.predicate_without_region.add_dyn_filters(dyn_filters);
1773 }
1774
1775 pub(crate) fn region_partition_expr(&self) -> Option<&PartitionExpr> {
1777 self.region_partition_expr.as_ref()
1778 }
1779
1780 fn expr_to_filter(
1781 expr: &Expr,
1782 metadata: &RegionMetadata,
1783 columns: &mut HashSet<Column>,
1784 ) -> Option<SimpleFilterEvaluator> {
1785 columns.clear();
1786 expr_to_columns(expr, columns).ok()?;
1789 if columns.len() > 1 {
1790 return None;
1792 }
1793 let column = columns.iter().next()?;
1794 let column_meta = metadata.column_by_name(&column.name)?;
1795 if column_meta.semantic_type == SemanticType::Timestamp {
1796 SimpleFilterEvaluator::try_new(expr)
1797 } else {
1798 None
1799 }
1800 }
1801}
1802
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datafusion::physical_plan::expressions::lit as physical_lit;
    use datafusion_common::ScalarValue;
    use datafusion_expr::{col, lit};
    use datatypes::value::Value;
    use partition::expr::col as partition_col;
    use store_api::metadata::RegionMetadataBuilder;
    use store_api::storage::{TimeSeriesDistribution, TimeSeriesRowSelector};

    use super::*;
    use crate::cache::CacheManager;
    use crate::read::range_cache::ScanRequestFingerprintBuilder;
    use crate::test_util::memtable_util::metadata_with_primary_key;
    use crate::test_util::scheduler_util::SchedulerEnv;

    /// Creates a placeholder SST handle with default metadata and a no-op purger.
    fn new_file_handle() -> FileHandle {
        FileHandle::new(
            crate::sst::file::FileMeta::default(),
            Arc::new(crate::sst::file_purger::NoopFilePurger),
        )
    }

    /// Builds a non-compaction scan input that satisfies the file and cache requirements of
    /// `build_scan_fingerprint`.
    ///
    /// It has one SST file and an `EnableAll` cache, and projects columns 0, 2 and 3.
    async fn new_scan_input(metadata: RegionMetadataRef, filters: Vec<Expr>) -> ScanInput {
        let env = SchedulerEnv::new().await;
        let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3]).unwrap();
        let predicate = PredicateGroup::new(metadata.as_ref(), &filters).unwrap();

        ScanInput::new(env.access_layer.clone(), mapper)
            .with_predicate(predicate)
            .with_cache(CacheStrategy::EnableAll(Arc::new(
                CacheManager::builder()
                    .range_result_cache_size(1024)
                    .build(),
            )))
            .with_files(vec![new_file_handle()])
    }

    /// Millisecond timestamp literal without a timezone.
    fn ts_lit(val: i64) -> datafusion_expr::Expr {
        lit(ScalarValue::TimestampMillisecond(Some(val), None))
    }

    #[tokio::test]
    async fn test_build_scan_fingerprint_for_eligible_scan() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let input = new_scan_input(
            metadata.clone(),
            vec![
                col("ts").gt_eq(ts_lit(1000)),
                col("k0").eq(lit("foo")),
                col("v0").gt(lit(1)),
            ],
        )
        .await
        .with_distribution(Some(TimeSeriesDistribution::PerSeries))
        .with_series_row_selector(Some(TimeSeriesRowSelector::LastRow))
        .with_merge_mode(MergeMode::LastNonNull)
        .with_filter_deleted(false);

        let fingerprint = build_scan_fingerprint(&input).unwrap();

        let expected = ScanRequestFingerprintBuilder {
            read_columns: input.read_cols,
            read_column_types: vec![
                metadata
                    .column_by_id(0)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(2)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(3)
                    .map(|col| col.column_schema.data_type.clone()),
            ],
            filters: vec![
                col("k0").eq(lit("foo")).to_string(),
                col("v0").gt(lit(1)).to_string(),
            ],
            time_filters: vec![col("ts").gt_eq(ts_lit(1000)).to_string()],
            series_row_selector: Some(TimeSeriesRowSelector::LastRow),
            append_mode: false,
            filter_deleted: false,
            merge_mode: MergeMode::LastNonNull,
            partition_expr_version: 0,
        }
        .build();
        assert_eq!(expected, fingerprint.fingerprint);
    }

    #[tokio::test]
    async fn test_build_scan_fingerprint_requires_tag_filter() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let input = new_scan_input(
            metadata,
            vec![col("ts").gt_eq(lit(1000)), col("v0").gt(lit(1))],
        )
        .await;

        assert!(build_scan_fingerprint(&input).is_none());
    }

    #[tokio::test]
    async fn test_build_scan_fingerprint_respects_scan_eligibility() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let filters = vec![col("k0").eq(lit("foo"))];

        // Positive control: the baseline input with these filters is eligible. Each negative
        // case below therefore fails only because of the single property it changes.
        let eligible = new_scan_input(metadata.clone(), filters.clone()).await;
        assert!(build_scan_fingerprint(&eligible).is_some());

        // It has a file and a tag filter, but the cache is explicitly disabled. Previously
        // this input had no files, so it was ineligible no matter what the cache strategy
        // was.
        let disabled = ScanInput::new(
            SchedulerEnv::new().await.access_layer.clone(),
            FlatProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap(),
        )
        .with_predicate(PredicateGroup::new(metadata.as_ref(), &filters).unwrap())
        .with_cache(CacheStrategy::Disabled)
        .with_files(vec![new_file_handle()]);
        assert!(build_scan_fingerprint(&disabled).is_none());

        let compaction = new_scan_input(metadata.clone(), filters.clone())
            .await
            .with_compaction(true);
        assert!(build_scan_fingerprint(&compaction).is_none());

        let no_files = new_scan_input(metadata, filters).await.with_files(vec![]);
        assert!(build_scan_fingerprint(&no_files).is_none());
    }

    #[tokio::test]
    async fn test_build_scan_fingerprint_tracks_schema_and_partition_expr_changes() {
        let base = metadata_with_primary_key(vec![0, 1], false);
        let mut builder = RegionMetadataBuilder::from_existing(base);
        let partition_expr = partition_col("k0")
            .gt_eq(Value::String("foo".into()))
            .as_json_str()
            .unwrap();
        builder.partition_expr_json(Some(partition_expr));
        let metadata = Arc::new(builder.build_without_validation().unwrap());

        let input = new_scan_input(metadata.clone(), vec![col("k0").eq(lit("foo"))]).await;
        let fingerprint = build_scan_fingerprint(&input).unwrap();

        let expected = ScanRequestFingerprintBuilder {
            read_columns: input.read_cols,
            read_column_types: vec![
                metadata
                    .column_by_id(0)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(2)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(3)
                    .map(|col| col.column_schema.data_type.clone()),
            ],
            filters: vec![col("k0").eq(lit("foo")).to_string()],
            time_filters: vec![],
            series_row_selector: None,
            append_mode: false,
            filter_deleted: true,
            merge_mode: MergeMode::LastRow,
            partition_expr_version: metadata.partition_expr_version,
        }
        .build();
        assert_eq!(expected, fingerprint.fingerprint);
        assert_ne!(0, metadata.partition_expr_version);
    }

    #[test]
    fn test_update_dyn_filters_with_empty_base_predicates() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let predicate_group = PredicateGroup::new(metadata.as_ref(), &[]).unwrap();
        assert!(predicate_group.predicate().is_none());
        assert!(predicate_group.predicate_without_region().is_none());

        let dyn_filter = Arc::new(DynamicFilterPhysicalExpr::new(vec![], physical_lit(false)));
        predicate_group.add_dyn_filters(vec![dyn_filter]);

        let predicate_all = predicate_group.predicate().unwrap();
        assert!(predicate_all.exprs().is_empty());
        assert_eq!(1, predicate_all.dyn_filters().len());

        let predicate_without_region = predicate_group.predicate_without_region().unwrap();
        assert!(predicate_without_region.exprs().is_empty());
        assert_eq!(1, predicate_without_region.dyn_filters().len());
    }

    #[tokio::test]
    async fn test_range_pre_filter_mode() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        // Covers every (append_mode, merge_mode) pair with both one and several sources.
        let cases = [
            (true, MergeMode::LastRow, 1, PreFilterMode::All),
            (false, MergeMode::LastRow, 1, PreFilterMode::All),
            (false, MergeMode::LastNonNull, 1, PreFilterMode::All),
            (false, MergeMode::LastRow, 2, PreFilterMode::SkipFields),
            (false, MergeMode::LastNonNull, 2, PreFilterMode::SkipFields),
            (true, MergeMode::LastRow, 2, PreFilterMode::All),
            (true, MergeMode::LastNonNull, 2, PreFilterMode::All),
        ];

        for (append_mode, merge_mode, source_count, expected_mode) in cases {
            let input = new_scan_input(metadata.clone(), vec![])
                .await
                .with_append_mode(append_mode)
                .with_merge_mode(merge_mode);

            assert_eq!(expected_mode, input.range_pre_filter_mode(source_count));
        }
    }
}