1use std::collections::HashSet;
18use std::fmt;
19use std::num::NonZeroU64;
20use std::sync::Arc;
21use std::time::Instant;
22
23use api::v1::SemanticType;
24use common_error::ext::BoxedError;
25use common_recordbatch::SendableRecordBatchStream;
26use common_recordbatch::filter::SimpleFilterEvaluator;
27use common_telemetry::tracing::Instrument;
28use common_telemetry::{debug, error, tracing, warn};
29use common_time::range::TimestampRange;
30use datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr;
31use datafusion_common::Column;
32use datafusion_expr::Expr;
33use datafusion_expr::utils::expr_to_columns;
34use futures::StreamExt;
35use partition::expr::PartitionExpr;
36use smallvec::SmallVec;
37use snafu::{OptionExt as _, ResultExt};
38use store_api::metadata::{RegionMetadata, RegionMetadataRef};
39use store_api::region_engine::{PartitionRange, RegionScannerRef};
40use store_api::storage::{
41 ColumnId, RegionId, ScanRequest, SequenceNumber, SequenceRange, TimeSeriesDistribution,
42 TimeSeriesRowSelector,
43};
44use table::predicate::{Predicate, build_time_range_predicate, extract_time_range_from_expr};
45use tokio::sync::{Semaphore, mpsc};
46use tokio_stream::wrappers::ReceiverStream;
47
48use crate::access_layer::AccessLayerRef;
49use crate::cache::CacheStrategy;
50use crate::config::DEFAULT_MAX_CONCURRENT_SCAN_FILES;
51use crate::error::{InvalidPartitionExprSnafu, InvalidRequestSnafu, Result};
52#[cfg(feature = "enterprise")]
53use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
54use crate::memtable::{MemtableRange, RangesOptions};
55use crate::metrics::READ_SST_COUNT;
56use crate::read::compat::{self, FlatCompatBatch};
57use crate::read::flat_projection::FlatProjectionMapper;
58use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
59use crate::read::range_cache::ScanRequestFingerprint;
60use crate::read::seq_scan::SeqScan;
61use crate::read::series_scan::SeriesScan;
62use crate::read::stream::ScanBatchStream;
63use crate::read::unordered_scan::UnorderedScan;
64use crate::read::{BoxedRecordBatchStream, RecordBatch};
65use crate::region::options::MergeMode;
66use crate::region::version::VersionRef;
67use crate::sst::file::FileHandle;
68use crate::sst::index::bloom_filter::applier::{
69 BloomFilterIndexApplierBuilder, BloomFilterIndexApplierRef,
70};
71use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
72use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
73use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
74use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
75#[cfg(feature = "vector_index")]
76use crate::sst::index::vector_index::applier::{VectorIndexApplier, VectorIndexApplierRef};
77use crate::sst::parquet::file_range::PreFilterMode;
78use crate::sst::parquet::reader::ReaderMetrics;
79
/// Over-fetch factor applied to the vector search `k` when the request also
/// carries filters, since filtering may discard some nearest neighbours.
#[cfg(feature = "vector_index")]
const VECTOR_INDEX_OVERFETCH_MULTIPLIER: usize = 2;
82
/// A scanner over a region, wrapping one of the concrete scan strategies.
pub(crate) enum Scanner {
    /// Sequential scan (see [SeqScan]); the default strategy.
    Seq(SeqScan),
    /// Unordered scan (see [UnorderedScan]); used for append-mode regions.
    Unordered(UnorderedScan),
    /// Per-series scan (see [SeriesScan]).
    Series(SeriesScan),
}
92
93impl Scanner {
94 #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
96 pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream, BoxedError> {
97 match self {
98 Scanner::Seq(seq_scan) => seq_scan.build_stream(),
99 Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
100 Scanner::Series(series_scan) => series_scan.build_stream().await,
101 }
102 }
103
104 pub(crate) fn scan_batch(&self) -> Result<ScanBatchStream> {
106 match self {
107 Scanner::Seq(x) => x.scan_all_partitions(),
108 Scanner::Unordered(x) => x.scan_all_partitions(),
109 Scanner::Series(x) => x.scan_all_partitions(),
110 }
111 }
112}
113
#[cfg(test)]
impl Scanner {
    /// Number of SST files the scan input covers.
    pub(crate) fn num_files(&self) -> usize {
        match self {
            Self::Seq(s) => s.input().num_files(),
            Self::Unordered(s) => s.input().num_files(),
            Self::Series(s) => s.input().num_files(),
        }
    }

    /// Number of memtables the scan input covers.
    pub(crate) fn num_memtables(&self) -> usize {
        match self {
            Self::Seq(s) => s.input().num_memtables(),
            Self::Unordered(s) => s.input().num_memtables(),
            Self::Series(s) => s.input().num_memtables(),
        }
    }

    /// Ids of the SST files to scan.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        match self {
            Self::Seq(s) => s.input().file_ids(),
            Self::Unordered(s) => s.input().file_ids(),
            Self::Series(s) => s.input().file_ids(),
        }
    }

    /// Ids of the indexes to scan.
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        match self {
            Self::Seq(s) => s.input().index_ids(),
            Self::Unordered(s) => s.input().index_ids(),
            Self::Series(s) => s.input().index_ids(),
        }
    }

    /// Snapshot sequence of the scan input, if any.
    pub(crate) fn snapshot_sequence(&self) -> Option<SequenceNumber> {
        match self {
            Self::Seq(s) => s.input().snapshot_sequence,
            Self::Unordered(s) => s.input().snapshot_sequence,
            Self::Series(s) => s.input().snapshot_sequence,
        }
    }

    /// Prepares the scanner with the given number of target partitions.
    pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
        use store_api::region_engine::{PrepareRequest, RegionScanner};

        let request = PrepareRequest::default().with_target_partitions(target_partitions);
        match self {
            Self::Seq(s) => s.prepare(request).unwrap(),
            Self::Unordered(s) => s.prepare(request).unwrap(),
            Self::Series(s) => s.prepare(request).unwrap(),
        }
    }
}
171
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Helper that prepares a scan over a region and picks a scan strategy.
pub(crate) struct ScanRegion {
    /// Version of the region to scan (metadata, SSTs, memtables, options).
    version: VersionRef,
    /// Access layer of the region (object store, paths, SST readers).
    access_layer: AccessLayerRef,
    /// Scan request from the caller.
    request: ScanRequest,
    /// Cache to use while scanning.
    cache_strategy: CacheStrategy,
    /// Maximum number of SST files scanned concurrently.
    max_concurrent_scan_files: usize,
    /// Whether to skip building inverted index appliers.
    ignore_inverted_index: bool,
    /// Whether to skip building full-text index appliers.
    ignore_fulltext_index: bool,
    /// Whether to skip building bloom filter index appliers.
    ignore_bloom_filter: bool,
    /// Start time of the query, if provided by the caller.
    start_time: Option<Instant>,
    /// Whether deleted rows are filtered out of the output (default: true).
    filter_deleted: bool,
    /// Optional provider of additional extension ranges to scan
    /// (enterprise only).
    #[cfg(feature = "enterprise")]
    extension_range_provider: Option<BoxedExtensionRangeProvider>,
}
247
248impl ScanRegion {
249 pub(crate) fn new(
251 version: VersionRef,
252 access_layer: AccessLayerRef,
253 request: ScanRequest,
254 cache_strategy: CacheStrategy,
255 ) -> ScanRegion {
256 ScanRegion {
257 version,
258 access_layer,
259 request,
260 cache_strategy,
261 max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
262 ignore_inverted_index: false,
263 ignore_fulltext_index: false,
264 ignore_bloom_filter: false,
265 start_time: None,
266 filter_deleted: true,
267 #[cfg(feature = "enterprise")]
268 extension_range_provider: None,
269 }
270 }
271
272 #[must_use]
274 pub(crate) fn with_max_concurrent_scan_files(
275 mut self,
276 max_concurrent_scan_files: usize,
277 ) -> Self {
278 self.max_concurrent_scan_files = max_concurrent_scan_files;
279 self
280 }
281
282 #[must_use]
284 pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self {
285 self.ignore_inverted_index = ignore;
286 self
287 }
288
289 #[must_use]
291 pub(crate) fn with_ignore_fulltext_index(mut self, ignore: bool) -> Self {
292 self.ignore_fulltext_index = ignore;
293 self
294 }
295
296 #[must_use]
298 pub(crate) fn with_ignore_bloom_filter(mut self, ignore: bool) -> Self {
299 self.ignore_bloom_filter = ignore;
300 self
301 }
302
303 #[must_use]
304 pub(crate) fn with_start_time(mut self, now: Instant) -> Self {
305 self.start_time = Some(now);
306 self
307 }
308
    /// Sets whether deleted rows are filtered out of the scan output.
    pub(crate) fn set_filter_deleted(&mut self, filter_deleted: bool) {
        self.filter_deleted = filter_deleted;
    }
312
    /// Sets the provider of additional extension ranges (enterprise only).
    #[cfg(feature = "enterprise")]
    pub(crate) fn set_extension_range_provider(
        &mut self,
        extension_range_provider: BoxedExtensionRangeProvider,
    ) {
        self.extension_range_provider = Some(extension_range_provider);
    }
320
321 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
323 pub(crate) async fn scanner(self) -> Result<Scanner> {
324 if self.use_series_scan() {
325 self.series_scan().await.map(Scanner::Series)
326 } else if self.use_unordered_scan() {
327 self.unordered_scan().await.map(Scanner::Unordered)
330 } else {
331 self.seq_scan().await.map(Scanner::Seq)
332 }
333 }
334
    /// Builds a type-erased [RegionScannerRef], choosing the strategy the
    /// same way as [Self::scanner] and boxing the result.
    #[tracing::instrument(
        level = tracing::Level::DEBUG,
        skip_all,
        fields(region_id = %self.region_id())
    )]
    pub(crate) async fn region_scanner(self) -> Result<RegionScannerRef> {
        if self.use_series_scan() {
            self.series_scan()
                .await
                .map(|scanner| Box::new(scanner) as _)
        } else if self.use_unordered_scan() {
            self.unordered_scan()
                .await
                .map(|scanner| Box::new(scanner) as _)
        } else {
            // Sequential scan is the fallback strategy.
            self.seq_scan().await.map(|scanner| Box::new(scanner) as _)
        }
    }
354
355 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
357 pub(crate) async fn seq_scan(self) -> Result<SeqScan> {
358 let input = self.scan_input().await?.with_compaction(false);
359 Ok(SeqScan::new(input))
360 }
361
362 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
364 pub(crate) async fn unordered_scan(self) -> Result<UnorderedScan> {
365 let input = self.scan_input().await?;
366 Ok(UnorderedScan::new(input))
367 }
368
369 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
371 pub(crate) async fn series_scan(self) -> Result<SeriesScan> {
372 let input = self.scan_input().await?;
373 Ok(SeriesScan::new(input))
374 }
375
376 fn use_unordered_scan(&self) -> bool {
378 self.version.options.append_mode
385 && self.request.series_row_selector.is_none()
386 && (self.request.distribution.is_none()
387 || self.request.distribution == Some(TimeSeriesDistribution::TimeWindowed))
388 }
389
390 fn use_series_scan(&self) -> bool {
392 self.request.distribution == Some(TimeSeriesDistribution::PerSeries)
393 }
394
    /// Assembles the [ScanInput] shared by every scanner implementation.
    ///
    /// Prunes SST files and memtables by the request's time range and
    /// sequence bounds, prepares the projection mapper and predicates, and
    /// builds the optional index appliers.
    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
    async fn scan_input(self) -> Result<ScanInput> {
        // A zero `sst_min_sequence` is treated the same as "unset".
        let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new);
        let time_range = self.build_time_range_predicate();
        let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;

        // Columns to read from storage: projected columns (plus columns the
        // filters/partition expr need), or every column without a projection.
        let read_column_ids = match self.request.projection_indices() {
            Some(p) => self.build_read_column_ids(p, &predicate)?,
            None => self
                .version
                .metadata
                .column_metadatas
                .iter()
                .map(|col| col.column_id)
                .collect(),
        };

        let mapper = match self.request.projection_indices() {
            Some(p) => FlatProjectionMapper::new_with_read_columns(
                &self.version.metadata,
                p.to_vec(),
                read_column_ids.clone(),
            )?,
            None => FlatProjectionMapper::all(&self.version.metadata)?,
        };

        // Collect SST files that pass both the minimum-sequence bound and
        // the time-range overlap check.
        let ssts = &self.version.ssts;
        let mut files = Vec::new();
        for level in ssts.levels() {
            for file in level.files.values() {
                let exceed_min_sequence = match (sst_min_sequence, file.meta_ref().sequence) {
                    (Some(min_sequence), Some(file_sequence)) => file_sequence > min_sequence,
                    // Conservatively keep files with no recorded sequence.
                    (Some(_), None) => true,
                    // No bound set: keep every file.
                    (None, _) => true,
                };

                if exceed_min_sequence && file_in_range(file, &time_range) {
                    files.push(file.clone());
                }
            }
        }

        let memtables = self.version.memtables.list_memtables();
        let mut mem_range_builders = Vec::new();
        let filter_mode = pre_filter_mode(
            self.version.options.append_mode,
            self.version.options.merge_mode(),
        );

        for m in memtables {
            // A memtable with no time range has no rows to scan; skip it.
            let Some((start, end)) = m.stats().time_range() else {
                continue;
            };
            let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
            if !memtable_range.intersects(&time_range) {
                continue;
            }
            let ranges_in_memtable = m.ranges(
                Some(read_column_ids.as_slice()),
                RangesOptions::default()
                    .with_predicate(predicate.clone())
                    .with_sequence(SequenceRange::new(
                        self.request.memtable_min_sequence,
                        self.request.memtable_max_sequence,
                    ))
                    .with_pre_filter_mode(filter_mode),
            )?;
            mem_range_builders.extend(ranges_in_memtable.ranges.into_values().map(|v| {
                let stats = v.stats().clone();
                MemRangeBuilder::new(v, stats)
            }));
        }

        let region_id = self.region_id();
        debug!(
            "Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}",
            region_id,
            self.request,
            time_range,
            mem_range_builders.len(),
            files.len(),
            self.version.options.append_mode,
        );

        // Each index applier is built twice: slot 0 for filters that touch
        // no field column, slot 1 for field filters.
        let (non_field_filters, field_filters) = self.partition_by_field_filters();
        let inverted_index_appliers = [
            self.build_invereted_index_applier(&non_field_filters),
            self.build_invereted_index_applier(&field_filters),
        ];
        let bloom_filter_appliers = [
            self.build_bloom_filter_applier(&non_field_filters),
            self.build_bloom_filter_applier(&field_filters),
        ];
        let fulltext_index_appliers = [
            self.build_fulltext_index_applier(&non_field_filters),
            self.build_fulltext_index_applier(&field_filters),
        ];
        #[cfg(feature = "vector_index")]
        let vector_index_applier = self.build_vector_index_applier();
        // Over-fetch neighbours when filters may reject some of them.
        #[cfg(feature = "vector_index")]
        let vector_index_k = self.request.vector_search.as_ref().map(|search| {
            if self.request.filters.is_empty() {
                search.k
            } else {
                search.k.saturating_mul(VECTOR_INDEX_OVERFETCH_MULTIPLIER)
            }
        });

        let input = ScanInput::new(self.access_layer, mapper)
            .with_time_range(Some(time_range))
            .with_predicate(predicate)
            .with_memtables(mem_range_builders)
            .with_files(files)
            .with_cache(self.cache_strategy)
            .with_inverted_index_appliers(inverted_index_appliers)
            .with_bloom_filter_index_appliers(bloom_filter_appliers)
            .with_fulltext_index_appliers(fulltext_index_appliers)
            .with_max_concurrent_scan_files(self.max_concurrent_scan_files)
            .with_start_time(self.start_time)
            .with_append_mode(self.version.options.append_mode)
            .with_filter_deleted(self.filter_deleted)
            .with_merge_mode(self.version.options.merge_mode())
            .with_series_row_selector(self.request.series_row_selector)
            .with_distribution(self.request.distribution)
            .with_explain_flat_format(
                self.version.options.sst_format == Some(crate::sst::FormatType::Flat),
            )
            .with_snapshot_sequence(
                // Only capture a snapshot sequence when the request asks for
                // snapshot semantics.
                self.request
                    .snapshot_on_scan
                    .then_some(self.request.memtable_max_sequence)
                    .flatten(),
            );
        #[cfg(feature = "vector_index")]
        let input = input
            .with_vector_index_applier(vector_index_applier)
            .with_vector_index_k(vector_index_k);

        // Enterprise builds may contribute extra ranges to scan.
        #[cfg(feature = "enterprise")]
        let input = if let Some(provider) = self.extension_range_provider {
            let ranges = provider
                .find_extension_ranges(self.version.flushed_sequence, time_range, &self.request)
                .await?;
            debug!("Find extension ranges: {ranges:?}");
            input.with_extension_ranges(ranges)
        } else {
            input
        };
        Ok(input)
    }
559
    /// Returns the id of the region being scanned.
    fn region_id(&self) -> RegionId {
        self.version.metadata.region_id
    }
563
564 fn build_time_range_predicate(&self) -> TimestampRange {
566 let time_index = self.version.metadata.time_index_column();
567 let unit = time_index
568 .column_schema
569 .data_type
570 .as_timestamp()
571 .expect("Time index must have timestamp-compatible type")
572 .unit();
573 build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters)
574 }
575
    /// Computes the column ids to read from storage for a projection.
    ///
    /// Besides the projected columns, includes the time index for an empty
    /// projection, plus any columns referenced by the request filters or the
    /// region partition expression so that predicates can be evaluated.
    ///
    /// # Errors
    /// Returns [InvalidRequestSnafu] when a projection index is out of bound.
    fn build_read_column_ids(
        &self,
        projection: &[usize],
        predicate: &PredicateGroup,
    ) -> Result<Vec<ColumnId>> {
        let metadata = &self.version.metadata;
        let mut read_column_ids = Vec::new();
        // Ids already added, to avoid duplicates.
        let mut seen = HashSet::new();

        for idx in projection {
            let column =
                metadata
                    .column_metadatas
                    .get(*idx)
                    .with_context(|| InvalidRequestSnafu {
                        region_id: metadata.region_id,
                        reason: format!("projection index {} is out of bound", idx),
                    })?;
            seen.insert(column.column_id);
            read_column_ids.push(column.column_id);
        }

        // An empty projection still needs at least the time index column.
        if projection.is_empty() {
            let time_index = metadata.time_index_column().column_id;
            if seen.insert(time_index) {
                read_column_ids.push(time_index);
            }
        }

        // Column names required by filters and the partition expression.
        let mut extra_names = HashSet::new();
        let mut columns = HashSet::new();

        for expr in &self.request.filters {
            columns.clear();
            // Skip filters whose column set cannot be extracted.
            if expr_to_columns(expr, &mut columns).is_err() {
                continue;
            }
            extra_names.extend(columns.iter().map(|column| column.name.clone()));
        }

        if let Some(expr) = predicate.region_partition_expr() {
            expr.collect_column_names(&mut extra_names);
        }

        if !extra_names.is_empty() {
            for column in &metadata.column_metadatas {
                if extra_names.contains(column.column_schema.name.as_str())
                    && !seen.contains(&column.column_id)
                {
                    read_column_ids.push(column.column_id);
                }
                // Mark the name as resolved whether or not it was added.
                extra_names.remove(column.column_schema.name.as_str());
            }
            // Anything left over does not exist in this region's schema.
            if !extra_names.is_empty() {
                warn!(
                    "Some columns in filters are not found in region {}: {:?}",
                    metadata.region_id, extra_names
                );
            }
        }
        Ok(read_column_ids)
    }
641
    /// Splits the request filters into `(non_field_filters, field_filters)`.
    ///
    /// A filter goes into the second list when it references at least one
    /// field column; filters whose column set cannot be extracted are kept
    /// in the first list.
    fn partition_by_field_filters(&self) -> (Vec<Expr>, Vec<Expr>) {
        let field_columns = self
            .version
            .metadata
            .field_columns()
            .map(|col| &col.column_schema.name)
            .collect::<HashSet<_>>();

        // Reused across closure invocations to avoid reallocating.
        let mut columns = HashSet::new();

        self.request.filters.iter().cloned().partition(|expr| {
            columns.clear();
            if expr_to_columns(expr, &mut columns).is_err() {
                // Unanalyzable filters are treated as non-field filters.
                return true;
            }
            !columns
                .iter()
                .any(|column| field_columns.contains(&column.name))
        })
    }
667
668 fn build_invereted_index_applier(&self, filters: &[Expr]) -> Option<InvertedIndexApplierRef> {
670 if self.ignore_inverted_index {
671 return None;
672 }
673
674 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
675 let inverted_index_cache = self.cache_strategy.inverted_index_cache().cloned();
676
677 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
678
679 InvertedIndexApplierBuilder::new(
680 self.access_layer.table_dir().to_string(),
681 self.access_layer.path_type(),
682 self.access_layer.object_store().clone(),
683 self.version.metadata.as_ref(),
684 self.version.metadata.inverted_indexed_column_ids(
685 self.version
686 .options
687 .index_options
688 .inverted_index
689 .ignore_column_ids
690 .iter(),
691 ),
692 self.access_layer.puffin_manager_factory().clone(),
693 )
694 .with_file_cache(file_cache)
695 .with_inverted_index_cache(inverted_index_cache)
696 .with_puffin_metadata_cache(puffin_metadata_cache)
697 .build(filters)
698 .inspect_err(|err| warn!(err; "Failed to build invereted index applier"))
699 .ok()
700 .flatten()
701 .map(Arc::new)
702 }
703
704 fn build_bloom_filter_applier(&self, filters: &[Expr]) -> Option<BloomFilterIndexApplierRef> {
706 if self.ignore_bloom_filter {
707 return None;
708 }
709
710 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
711 let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
712 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
713
714 BloomFilterIndexApplierBuilder::new(
715 self.access_layer.table_dir().to_string(),
716 self.access_layer.path_type(),
717 self.access_layer.object_store().clone(),
718 self.version.metadata.as_ref(),
719 self.access_layer.puffin_manager_factory().clone(),
720 )
721 .with_file_cache(file_cache)
722 .with_bloom_filter_index_cache(bloom_filter_index_cache)
723 .with_puffin_metadata_cache(puffin_metadata_cache)
724 .build(filters)
725 .inspect_err(|err| warn!(err; "Failed to build bloom filter index applier"))
726 .ok()
727 .flatten()
728 .map(Arc::new)
729 }
730
731 fn build_fulltext_index_applier(&self, filters: &[Expr]) -> Option<FulltextIndexApplierRef> {
733 if self.ignore_fulltext_index {
734 return None;
735 }
736
737 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
738 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
739 let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
740 FulltextIndexApplierBuilder::new(
741 self.access_layer.table_dir().to_string(),
742 self.access_layer.path_type(),
743 self.access_layer.object_store().clone(),
744 self.access_layer.puffin_manager_factory().clone(),
745 self.version.metadata.as_ref(),
746 )
747 .with_file_cache(file_cache)
748 .with_puffin_metadata_cache(puffin_metadata_cache)
749 .with_bloom_filter_cache(bloom_filter_index_cache)
750 .build(filters)
751 .inspect_err(|err| warn!(err; "Failed to build fulltext index applier"))
752 .ok()
753 .flatten()
754 .map(Arc::new)
755 }
756
757 #[cfg(feature = "vector_index")]
759 fn build_vector_index_applier(&self) -> Option<VectorIndexApplierRef> {
760 let vector_search = self.request.vector_search.as_ref()?;
761
762 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
763 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
764 let vector_index_cache = self.cache_strategy.vector_index_cache().cloned();
765
766 let applier = VectorIndexApplier::new(
767 self.access_layer.table_dir().to_string(),
768 self.access_layer.path_type(),
769 self.access_layer.object_store().clone(),
770 self.access_layer.puffin_manager_factory().clone(),
771 vector_search.column_id,
772 vector_search.query_vector.clone(),
773 vector_search.metric,
774 )
775 .with_file_cache(file_cache)
776 .with_puffin_metadata_cache(puffin_metadata_cache)
777 .with_vector_index_cache(vector_index_cache);
778
779 Some(Arc::new(applier))
780 }
781}
782
783fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
785 if predicate == &TimestampRange::min_to_max() {
786 return true;
787 }
788 let (start, end) = file.time_range();
790 let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end));
791 file_ts_range.intersects(predicate)
792}
793
/// Common input shared by all scanner implementations.
pub struct ScanInput {
    /// SST access layer of the region.
    access_layer: AccessLayerRef,
    /// Maps read columns to the projected output schema.
    pub(crate) mapper: Arc<FlatProjectionMapper>,
    /// Column ids to read from storage.
    pub(crate) read_column_ids: Vec<ColumnId>,
    /// Time range predicate derived from the request filters.
    pub(crate) time_range: Option<TimestampRange>,
    /// Predicates to push down to memtables and SSTs.
    pub(crate) predicate: PredicateGroup,
    /// Partition expression of the region, cached from the predicate group.
    region_partition_expr: Option<PartitionExpr>,
    /// Builders for the memtable ranges to scan.
    pub(crate) memtables: Vec<MemRangeBuilder>,
    /// Handles of SST files to scan.
    pub(crate) files: Vec<FileHandle>,
    /// Cache to use while scanning.
    pub(crate) cache_strategy: CacheStrategy,
    /// Whether a missing SST file is skipped (with an error log) instead of
    /// failing the scan.
    ignore_file_not_found: bool,
    /// Maximum number of SST files scanned concurrently.
    pub(crate) max_concurrent_scan_files: usize,
    /// Inverted index appliers: slot 0 for non-field filters, slot 1 for
    /// field filters.
    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
    /// Bloom filter index appliers, same slot layout as above.
    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    /// Full-text index appliers, same slot layout as above.
    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
    /// Applier for the request's vector search, if any.
    #[cfg(feature = "vector_index")]
    pub(crate) vector_index_applier: Option<VectorIndexApplierRef>,
    /// Number of neighbours to fetch from the vector index.
    #[cfg(feature = "vector_index")]
    pub(crate) vector_index_k: Option<usize>,
    /// Start time of the query, if provided by the caller.
    pub(crate) query_start: Option<Instant>,
    /// Whether the region is in append mode.
    pub(crate) append_mode: bool,
    /// Whether deleted rows are removed from the output.
    pub(crate) filter_deleted: bool,
    /// Merge strategy for rows with duplicate keys.
    pub(crate) merge_mode: MergeMode,
    /// Optional per-series row selector from the request.
    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
    /// Requested time-series distribution of the output.
    pub(crate) distribution: Option<TimeSeriesDistribution>,
    /// Whether the region's SST format is the flat format; set from the
    /// region options and surfaced in explain output.
    explain_flat_format: bool,
    /// Sequence captured when `snapshot_on_scan` is set on the request.
    pub(crate) snapshot_sequence: Option<SequenceNumber>,
    /// Whether this input drives a compaction instead of a query.
    pub(crate) compaction: bool,
    /// Additional ranges supplied by the extension range provider
    /// (enterprise only).
    #[cfg(feature = "enterprise")]
    extension_ranges: Vec<BoxedExtensionRange>,
}
851
852impl ScanInput {
    /// Creates a new [ScanInput] with default options.
    #[must_use]
    pub(crate) fn new(access_layer: AccessLayerRef, mapper: FlatProjectionMapper) -> ScanInput {
        ScanInput {
            access_layer,
            // Struct fields are evaluated in written order: `read_column_ids`
            // must be computed before `mapper` is moved into the `Arc`.
            read_column_ids: mapper.column_ids().to_vec(),
            mapper: Arc::new(mapper),
            time_range: None,
            predicate: PredicateGroup::default(),
            region_partition_expr: None,
            memtables: Vec::new(),
            files: Vec::new(),
            cache_strategy: CacheStrategy::Disabled,
            ignore_file_not_found: false,
            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
            inverted_index_appliers: [None, None],
            bloom_filter_index_appliers: [None, None],
            fulltext_index_appliers: [None, None],
            #[cfg(feature = "vector_index")]
            vector_index_applier: None,
            #[cfg(feature = "vector_index")]
            vector_index_k: None,
            query_start: None,
            append_mode: false,
            filter_deleted: true,
            merge_mode: MergeMode::default(),
            series_row_selector: None,
            distribution: None,
            explain_flat_format: false,
            snapshot_sequence: None,
            compaction: false,
            #[cfg(feature = "enterprise")]
            extension_ranges: Vec::new(),
        }
    }
888
889 #[must_use]
891 pub(crate) fn with_time_range(mut self, time_range: Option<TimestampRange>) -> Self {
892 self.time_range = time_range;
893 self
894 }
895
896 #[must_use]
898 pub(crate) fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
899 self.region_partition_expr = predicate.region_partition_expr().cloned();
900 self.predicate = predicate;
901 self
902 }
903
904 #[must_use]
906 pub(crate) fn with_memtables(mut self, memtables: Vec<MemRangeBuilder>) -> Self {
907 self.memtables = memtables;
908 self
909 }
910
911 #[must_use]
913 pub(crate) fn with_files(mut self, files: Vec<FileHandle>) -> Self {
914 self.files = files;
915 self
916 }
917
918 #[must_use]
920 pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self {
921 self.cache_strategy = cache;
922 self
923 }
924
925 #[must_use]
927 pub(crate) fn with_ignore_file_not_found(mut self, ignore: bool) -> Self {
928 self.ignore_file_not_found = ignore;
929 self
930 }
931
932 #[must_use]
934 pub(crate) fn with_max_concurrent_scan_files(
935 mut self,
936 max_concurrent_scan_files: usize,
937 ) -> Self {
938 self.max_concurrent_scan_files = max_concurrent_scan_files;
939 self
940 }
941
942 #[must_use]
944 pub(crate) fn with_inverted_index_appliers(
945 mut self,
946 appliers: [Option<InvertedIndexApplierRef>; 2],
947 ) -> Self {
948 self.inverted_index_appliers = appliers;
949 self
950 }
951
952 #[must_use]
954 pub(crate) fn with_bloom_filter_index_appliers(
955 mut self,
956 appliers: [Option<BloomFilterIndexApplierRef>; 2],
957 ) -> Self {
958 self.bloom_filter_index_appliers = appliers;
959 self
960 }
961
962 #[must_use]
964 pub(crate) fn with_fulltext_index_appliers(
965 mut self,
966 appliers: [Option<FulltextIndexApplierRef>; 2],
967 ) -> Self {
968 self.fulltext_index_appliers = appliers;
969 self
970 }
971
972 #[cfg(feature = "vector_index")]
974 #[must_use]
975 pub(crate) fn with_vector_index_applier(
976 mut self,
977 applier: Option<VectorIndexApplierRef>,
978 ) -> Self {
979 self.vector_index_applier = applier;
980 self
981 }
982
983 #[cfg(feature = "vector_index")]
985 #[must_use]
986 pub(crate) fn with_vector_index_k(mut self, k: Option<usize>) -> Self {
987 self.vector_index_k = k;
988 self
989 }
990
991 #[must_use]
993 pub(crate) fn with_start_time(mut self, now: Option<Instant>) -> Self {
994 self.query_start = now;
995 self
996 }
997
998 #[must_use]
999 pub(crate) fn with_append_mode(mut self, is_append_mode: bool) -> Self {
1000 self.append_mode = is_append_mode;
1001 self
1002 }
1003
1004 #[must_use]
1006 pub(crate) fn with_filter_deleted(mut self, filter_deleted: bool) -> Self {
1007 self.filter_deleted = filter_deleted;
1008 self
1009 }
1010
1011 #[must_use]
1013 pub(crate) fn with_merge_mode(mut self, merge_mode: MergeMode) -> Self {
1014 self.merge_mode = merge_mode;
1015 self
1016 }
1017
1018 #[must_use]
1020 pub(crate) fn with_distribution(
1021 mut self,
1022 distribution: Option<TimeSeriesDistribution>,
1023 ) -> Self {
1024 self.distribution = distribution;
1025 self
1026 }
1027
1028 #[must_use]
1030 pub(crate) fn with_explain_flat_format(mut self, explain_flat_format: bool) -> Self {
1031 self.explain_flat_format = explain_flat_format;
1032 self
1033 }
1034
1035 #[must_use]
1037 pub(crate) fn with_series_row_selector(
1038 mut self,
1039 series_row_selector: Option<TimeSeriesRowSelector>,
1040 ) -> Self {
1041 self.series_row_selector = series_row_selector;
1042 self
1043 }
1044
1045 #[must_use]
1046 pub(crate) fn with_snapshot_sequence(
1047 mut self,
1048 snapshot_sequence: Option<SequenceNumber>,
1049 ) -> Self {
1050 self.snapshot_sequence = snapshot_sequence;
1051 self
1052 }
1053
1054 #[must_use]
1056 pub(crate) fn with_compaction(mut self, compaction: bool) -> Self {
1057 self.compaction = compaction;
1058 self
1059 }
1060
1061 pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> {
1063 let memtable = &self.memtables[index.index];
1064 let mut ranges = SmallVec::new();
1065 memtable.build_ranges(index.row_group_index, &mut ranges);
1066 ranges
1067 }
1068
1069 fn predicate_for_file(&self, file: &FileHandle) -> Option<Predicate> {
1070 if self.should_skip_region_partition(file) {
1071 self.predicate.predicate_without_region().cloned()
1072 } else {
1073 self.predicate.predicate().cloned()
1074 }
1075 }
1076
1077 fn should_skip_region_partition(&self, file: &FileHandle) -> bool {
1078 match (
1079 self.region_partition_expr.as_ref(),
1080 file.meta_ref().partition_expr.as_ref(),
1081 ) {
1082 (Some(region_expr), Some(file_expr)) => region_expr == file_expr,
1083 _ => false,
1084 }
1085 }
1086
    /// Prunes a single SST file and builds a [FileRangeBuilder] for it.
    ///
    /// Pushes predicates and index appliers down to the SST reader. Returns
    /// a default (empty) builder when the reader prunes the whole file, or —
    /// if `ignore_file_not_found` is set — when the file no longer exists.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.region_metadata().region_id,
            file_id = %file.file_id()
        )
    )]
    pub async fn prune_file(
        &self,
        file: &FileHandle,
        pre_filter_mode: PreFilterMode,
        reader_metrics: &mut ReaderMetrics,
    ) -> Result<FileRangeBuilder> {
        let predicate = self.predicate_for_file(file);
        // Decode primary key values only for query scans that project a
        // primary key column; compactions keep keys in encoded form.
        let decode_pk_values = !self.compaction
            && self
                .mapper
                .column_ids()
                .iter()
                .any(|column_id| self.mapper.metadata().primary_key.contains(column_id));
        let reader = self
            .access_layer
            .read_sst(file.clone())
            .predicate(predicate)
            .projection(Some(self.read_column_ids.clone()))
            .cache(self.cache_strategy.clone())
            .inverted_index_appliers(self.inverted_index_appliers.clone())
            .bloom_filter_index_appliers(self.bloom_filter_index_appliers.clone())
            .fulltext_index_appliers(self.fulltext_index_appliers.clone());
        #[cfg(feature = "vector_index")]
        let reader = {
            let mut reader = reader;
            reader =
                reader.vector_index_applier(self.vector_index_applier.clone(), self.vector_index_k);
            reader
        };
        let res = reader
            .expected_metadata(Some(self.mapper.metadata().clone()))
            .compaction(self.compaction)
            .pre_filter_mode(pre_filter_mode)
            .decode_primary_key_values(decode_pk_values)
            .build_reader_input(reader_metrics)
            .await;
        let read_input = match res {
            Ok(x) => x,
            Err(e) => {
                // NOTE(review): presumably files can vanish concurrently
                // (e.g. purged after compaction); callers opt in to
                // tolerating this via `ignore_file_not_found`.
                if e.is_object_not_found() && self.ignore_file_not_found {
                    error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
                    return Ok(FileRangeBuilder::default());
                } else {
                    return Err(e);
                }
            }
        };

        // `None` means the reader pruned out the entire file.
        let Some((mut file_range_ctx, selection)) = read_input else {
            return Ok(FileRangeBuilder::default());
        };

        // Attach a compat adapter when the file's columns or primary-key
        // encoding differ from the current region metadata (presumably after
        // a schema change — confirm).
        let need_compat = !compat::has_same_columns_and_pk_encoding(
            self.mapper.metadata(),
            file_range_ctx.read_format().metadata(),
        );
        if need_compat {
            let compat = FlatCompatBatch::try_new(
                &self.mapper,
                file_range_ctx.read_format().metadata(),
                file_range_ctx.read_format().format_projection(),
                self.compaction,
            )?;
            file_range_ctx.set_compat_batch(compat);
        }
        Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), selection))
    }
1164
1165 #[tracing::instrument(
1169 skip(self, sources, semaphore),
1170 fields(
1171 region_id = %self.region_metadata().region_id,
1172 source_count = sources.len()
1173 )
1174 )]
1175 pub(crate) fn create_parallel_flat_sources(
1176 &self,
1177 sources: Vec<BoxedRecordBatchStream>,
1178 semaphore: Arc<Semaphore>,
1179 channel_size: usize,
1180 ) -> Result<Vec<BoxedRecordBatchStream>> {
1181 if sources.len() <= 1 {
1182 return Ok(sources);
1183 }
1184
1185 let sources = sources
1187 .into_iter()
1188 .map(|source| {
1189 let (sender, receiver) = mpsc::channel(channel_size);
1190 self.spawn_flat_scan_task(source, semaphore.clone(), sender);
1191 let stream = Box::pin(ReceiverStream::new(receiver));
1192 Box::pin(stream) as _
1193 })
1194 .collect();
1195 Ok(sources)
1196 }
1197
1198 #[tracing::instrument(
1200 skip(self, input, semaphore, sender),
1201 fields(region_id = %self.region_metadata().region_id)
1202 )]
1203 pub(crate) fn spawn_flat_scan_task(
1204 &self,
1205 mut input: BoxedRecordBatchStream,
1206 semaphore: Arc<Semaphore>,
1207 sender: mpsc::Sender<Result<RecordBatch>>,
1208 ) {
1209 let region_id = self.region_metadata().region_id;
1210 let span = tracing::info_span!(
1211 "ScanInput::parallel_scan_task",
1212 region_id = %region_id,
1213 stream_kind = "flat"
1214 );
1215 common_runtime::spawn_global(
1216 async move {
1217 loop {
1218 let maybe_batch = {
1221 let _permit = semaphore.acquire().await.unwrap();
1223 input.next().await
1224 };
1225 match maybe_batch {
1226 Some(Ok(batch)) => {
1227 let _ = sender.send(Ok(batch)).await;
1228 }
1229 Some(Err(e)) => {
1230 let _ = sender.send(Err(e)).await;
1231 break;
1232 }
1233 None => break,
1234 }
1235 }
1236 }
1237 .instrument(span),
1238 );
1239 }
1240
1241 pub(crate) fn total_rows(&self) -> usize {
1242 let rows_in_files: usize = self.files.iter().map(|f| f.num_rows()).sum();
1243 let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();
1244
1245 let rows = rows_in_files + rows_in_memtables;
1246 #[cfg(feature = "enterprise")]
1247 let rows = rows
1248 + self
1249 .extension_ranges
1250 .iter()
1251 .map(|x| x.num_rows())
1252 .sum::<u64>() as usize;
1253 rows
1254 }
1255
1256 pub(crate) fn predicate_group(&self) -> &PredicateGroup {
1257 &self.predicate
1258 }
1259
1260 pub(crate) fn num_memtables(&self) -> usize {
1262 self.memtables.len()
1263 }
1264
1265 pub(crate) fn num_files(&self) -> usize {
1267 self.files.len()
1268 }
1269
1270 pub(crate) fn file_from_index(&self, index: RowGroupIndex) -> &FileHandle {
1272 let file_index = index.index - self.num_memtables();
1273 &self.files[file_index]
1274 }
1275
1276 pub fn region_metadata(&self) -> &RegionMetadataRef {
1277 self.mapper.metadata()
1278 }
1279
1280 fn range_pre_filter_mode(&self, source_count: usize) -> PreFilterMode {
1281 if source_count <= 1 {
1282 return PreFilterMode::All;
1287 }
1288
1289 pre_filter_mode(self.append_mode, self.merge_mode)
1290 }
1291}
1292
#[cfg(feature = "enterprise")]
impl ScanInput {
    /// Attaches extension ranges to this input, consuming and returning it.
    #[must_use]
    pub(crate) fn with_extension_ranges(self, extension_ranges: Vec<BoxedExtensionRange>) -> Self {
        Self {
            extension_ranges,
            ..self
        }
    }

    /// Returns all extension ranges of this input.
    // Fix: dropped the per-method `#[cfg(feature = "enterprise")]` attributes
    // below — the whole impl block is already gated by the same cfg, so they
    // were redundant and inconsistent with `with_extension_ranges`.
    pub(crate) fn extension_ranges(&self) -> &[BoxedExtensionRange] {
        &self.extension_ranges
    }

    /// Returns the extension range for the global range index `i`.
    ///
    /// Global indices are ordered memtables, then files, then extension
    /// ranges, so the extension position is offset by the first two counts.
    pub(crate) fn extension_range(&self, i: usize) -> &BoxedExtensionRange {
        &self.extension_ranges[i - self.num_memtables() - self.num_files()]
    }
}
1314
#[cfg(test)]
impl ScanInput {
    /// Test helper: collects the ids of all SST files in this input.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        self.files.iter().map(|handle| handle.file_id()).collect()
    }

    /// Test helper: collects the index ids of all SST files in this input.
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        self.files.iter().map(|handle| handle.index_id()).collect()
    }
}
1326
1327fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode {
1328 if append_mode {
1329 return PreFilterMode::All;
1330 }
1331
1332 match merge_mode {
1333 MergeMode::LastRow => PreFilterMode::SkipFields,
1334 MergeMode::LastNonNull => PreFilterMode::SkipFields,
1335 }
1336}
1337
/// Builds a fingerprint identifying this scan request for the range result
/// cache, or `None` if the scan is not eligible for caching.
///
/// Eligibility requires: not a compaction scan, at least one SST file, the
/// all-enabled cache strategy, and at least one filter touching a tag column.
pub(crate) fn build_scan_fingerprint(input: &ScanInput) -> Option<ScanRequestFingerprint> {
    let eligible = !input.compaction
        && !input.files.is_empty()
        && matches!(input.cache_strategy, CacheStrategy::EnableAll(_));

    if !eligible {
        return None;
    }

    let metadata = input.region_metadata();
    // Names of all tag (primary key) columns, used to detect tag filters.
    let tag_names: HashSet<&str> = metadata
        .column_metadatas
        .iter()
        .filter(|col| col.semantic_type == SemanticType::Tag)
        .map(|col| col.column_schema.name.as_str())
        .collect();

    let time_index = metadata.time_index_column();
    let time_index_name = time_index.column_schema.name.clone();
    let ts_col_unit = time_index
        .column_schema
        .data_type
        .as_timestamp()
        .expect("Time index must have timestamp-compatible type")
        .unit();

    // Only the request's own filters participate; the region partition
    // expression is deliberately excluded (see `predicate_without_region`).
    let exprs = input
        .predicate_group()
        .predicate_without_region()
        .map(|predicate| predicate.exprs())
        .unwrap_or_default();

    let mut filters = Vec::new();
    let mut time_filters = Vec::new();
    let mut has_tag_filter = false;
    // Reused scratch set for the columns referenced by each expression.
    let mut columns = HashSet::new();

    for expr in exprs {
        columns.clear();
        // An expression is "time only" when every referenced column is the
        // time index. Expressions whose columns can't be extracted (or that
        // reference no columns) are treated as generic filters.
        let is_time_only = match expr_to_columns(expr, &mut columns) {
            Ok(()) if !columns.is_empty() => {
                has_tag_filter |= columns
                    .iter()
                    .any(|col| tag_names.contains(col.name.as_str()));
                columns.iter().all(|col| col.name == time_index_name)
            }
            _ => false,
        };

        // Time-only filters that convert to a concrete time range are kept
        // separately so cache lookups can reason about time bounds.
        if is_time_only
            && extract_time_range_from_expr(&time_index_name, ts_col_unit, expr).is_some()
        {
            time_filters.push(expr.to_string());
        } else {
            filters.push(expr.to_string());
        }
    }

    // Caching is only worthwhile when the scan is narrowed by a tag filter.
    if !has_tag_filter {
        return None;
    }

    // Sort so that logically identical requests with reordered filters
    // produce the same fingerprint.
    filters.sort_unstable();
    time_filters.sort_unstable();

    Some(
        crate::read::range_cache::ScanRequestFingerprintBuilder {
            read_column_ids: input.read_column_ids.clone(),
            // Column types are included so schema changes invalidate cache
            // entries even when column ids are reused.
            read_column_types: input
                .read_column_ids
                .iter()
                .map(|id| {
                    metadata
                        .column_by_id(*id)
                        .map(|col| col.column_schema.data_type.clone())
                })
                .collect(),
            filters,
            time_filters,
            series_row_selector: input.series_row_selector,
            append_mode: input.append_mode,
            filter_deleted: input.filter_deleted,
            merge_mode: input.merge_mode,
            partition_expr_version: metadata.partition_expr_version,
        }
        .build(),
    )
}
1435
/// Shared context for the streams of a region scan.
pub struct StreamContext {
    /// The scan input this context was built from.
    pub input: ScanInput,
    /// Metadata of the ranges (memtables, files, and possibly extension
    /// ranges) the scan covers.
    pub(crate) ranges: Vec<RangeMeta>,
    /// Fingerprint of the scan request when the scan is eligible for the
    /// range result cache; `None` otherwise.
    #[allow(dead_code)]
    pub(crate) scan_fingerprint: Option<ScanRequestFingerprint>,

    /// Instant at which the query started, used for timing metrics.
    pub(crate) query_start: Instant,
}
1452
impl StreamContext {
    /// Creates a context whose ranges are computed for a sequential scan.
    pub(crate) fn seq_scan_ctx(input: ScanInput) -> Self {
        let query_start = input.query_start.unwrap_or_else(Instant::now);
        let ranges = RangeMeta::seq_scan_ranges(&input);
        READ_SST_COUNT.observe(input.num_files() as f64);
        let scan_fingerprint = build_scan_fingerprint(&input);

        Self {
            input,
            ranges,
            scan_fingerprint,
            query_start,
        }
    }

    /// Creates a context whose ranges are computed for an unordered scan.
    pub(crate) fn unordered_scan_ctx(input: ScanInput) -> Self {
        let query_start = input.query_start.unwrap_or_else(Instant::now);
        let ranges = RangeMeta::unordered_scan_ranges(&input);
        READ_SST_COUNT.observe(input.num_files() as f64);
        let scan_fingerprint = build_scan_fingerprint(&input);

        Self {
            input,
            ranges,
            scan_fingerprint,
            query_start,
        }
    }

    /// Returns true if `index` points at a memtable range.
    ///
    /// Global indices are ordered memtables first, then files, then any
    /// extension ranges.
    pub(crate) fn is_mem_range_index(&self, index: RowGroupIndex) -> bool {
        self.input.num_memtables() > index.index
    }

    /// Returns true if `index` points at a file range (i.e. it falls after
    /// the memtables but before any extension ranges).
    pub(crate) fn is_file_range_index(&self, index: RowGroupIndex) -> bool {
        !self.is_mem_range_index(index)
            && index.index < self.input.num_files() + self.input.num_memtables()
    }

    /// Decides the pre-filter mode for the given partition range based on
    /// how many sources it draws from.
    pub(crate) fn range_pre_filter_mode(&self, part_range: &PartitionRange) -> PreFilterMode {
        let range_meta = &self.ranges[part_range.identifier];
        let source_count = range_meta.indices.len();

        self.input.range_pre_filter_mode(source_count)
    }

    /// Builds one `PartitionRange` per range meta, identified by its index.
    pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
        self.ranges
            .iter()
            .enumerate()
            .map(|(idx, range_meta)| range_meta.new_partition_range(idx))
            .collect()
    }

    /// Formats the scan for EXPLAIN output.
    ///
    /// Emits a JSON-like summary of range counts; `verbose` additionally
    /// wraps the output in braces and appends projection/filter/file detail.
    pub(crate) fn format_for_explain(&self, verbose: bool, f: &mut fmt::Formatter) -> fmt::Result {
        // Classify every row group index into memtable / file / other.
        let (mut num_mem_ranges, mut num_file_ranges, mut num_other_ranges) = (0, 0, 0);
        for range_meta in &self.ranges {
            for idx in &range_meta.row_group_indices {
                if self.is_mem_range_index(*idx) {
                    num_mem_ranges += 1;
                } else if self.is_file_range_index(*idx) {
                    num_file_ranges += 1;
                } else {
                    num_other_ranges += 1;
                }
            }
        }
        if verbose {
            write!(f, "{{")?;
        }
        write!(
            f,
            r#""partition_count":{{"count":{}, "mem_ranges":{}, "files":{}, "file_ranges":{}"#,
            self.ranges.len(),
            num_mem_ranges,
            self.input.num_files(),
            num_file_ranges,
        )?;
        // Only mention non-mem, non-file ranges when any exist.
        if num_other_ranges > 0 {
            write!(f, r#", "other_ranges":{}"#, num_other_ranges)?;
        }
        write!(f, "}}")?;

        if let Some(selector) = &self.input.series_row_selector {
            write!(f, ", \"selector\":\"{}\"", selector)?;
        }
        if let Some(distribution) = &self.input.distribution {
            write!(f, ", \"distribution\":\"{}\"", distribution)?;
        }

        if verbose {
            self.format_verbose_content(f)?;
        }

        Ok(())
    }

    /// Appends the verbose part of the EXPLAIN output: projection, filters,
    /// per-file details, and (enterprise) extension ranges.
    fn format_verbose_content(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Local wrapper so files render as compact JSON-like objects.
        struct FileWrapper<'a> {
            file: &'a FileHandle,
        }

        impl fmt::Debug for FileWrapper<'_> {
            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                let (start, end) = self.file.time_range();
                write!(
                    f,
                    r#"{{"file_id":"{}","time_range_start":"{}::{}","time_range_end":"{}::{}","rows":{},"size":{},"index_size":{}}}"#,
                    self.file.file_id(),
                    start.value(),
                    start.unit(),
                    end.value(),
                    end.unit(),
                    self.file.num_rows(),
                    self.file.size(),
                    self.file.index_size()
                )
            }
        }

        // Local wrapper so the whole input renders via `Debug`.
        struct InputWrapper<'a> {
            input: &'a ScanInput,
        }

        #[cfg(feature = "enterprise")]
        impl InputWrapper<'_> {
            fn format_extension_ranges(&self, f: &mut fmt::Formatter) -> fmt::Result {
                if self.input.extension_ranges.is_empty() {
                    return Ok(());
                }

                let mut delimiter = "";
                write!(f, ", extension_ranges: [")?;
                for range in self.input.extension_ranges() {
                    write!(f, "{}{:?}", delimiter, range)?;
                    delimiter = ", ";
                }
                write!(f, "]")?;
                Ok(())
            }
        }

        impl fmt::Debug for InputWrapper<'_> {
            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                let output_schema = self.input.mapper.output_schema();
                if !output_schema.is_empty() {
                    let names: Vec<_> = output_schema
                        .column_schemas()
                        .iter()
                        .map(|col| &col.name)
                        .collect();
                    write!(f, ", \"projection\": {:?}", names)?;
                }
                if let Some(predicate) = &self.input.predicate.predicate() {
                    if !predicate.exprs().is_empty() {
                        let exprs: Vec<_> =
                            predicate.exprs().iter().map(|e| e.to_string()).collect();
                        write!(f, ", \"filters\": {:?}", exprs)?;
                    }
                    if !predicate.dyn_filters().is_empty() {
                        let dyn_filters: Vec<_> = predicate
                            .dyn_filters()
                            .iter()
                            .map(|f| format!("{}", f))
                            .collect();
                        write!(f, ", \"dyn_filters\": {:?}", dyn_filters)?;
                    }
                }
                #[cfg(feature = "vector_index")]
                if let Some(vector_index_k) = self.input.vector_index_k {
                    write!(f, ", \"vector_index_k\": {}", vector_index_k)?;
                }
                if !self.input.files.is_empty() {
                    write!(f, ", \"files\": ")?;
                    f.debug_list()
                        .entries(self.input.files.iter().map(|file| FileWrapper { file }))
                        .finish()?;
                }
                write!(f, ", \"flat_format\": {}", self.input.explain_flat_format)?;
                #[cfg(feature = "enterprise")]
                self.format_extension_ranges(f)?;

                Ok(())
            }
        }

        write!(f, "{:?}", InputWrapper { input: &self.input })
    }

    /// Adds any `DynamicFilterPhysicalExpr`s among `filter_exprs` to the
    /// predicate group.
    ///
    /// Returns one bool per input expression, in order, indicating whether
    /// that expression was a dynamic filter and therefore accepted.
    pub(crate) fn add_dyn_filter_to_predicate(
        self: &Arc<Self>,
        filter_exprs: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
    ) -> Vec<bool> {
        let mut supported = Vec::with_capacity(filter_exprs.len());
        let filter_expr = filter_exprs
            .into_iter()
            .filter_map(|expr| {
                // Identify dynamic filters via an `Any` downcast; other
                // physical expressions are reported as unsupported.
                if let Ok(dyn_filter) = (expr as Arc<dyn std::any::Any + Send + Sync + 'static>)
                    .downcast::<datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr>()
                {
                    supported.push(true);
                    Some(dyn_filter)
                } else {
                    supported.push(false);
                    None
                }
            })
            .collect();
        self.input.predicate.add_dyn_filters(filter_expr);
        supported
    }
}
1671
/// Predicates and time filters derived from a scan request's filter
/// expressions and the region's partition expression.
#[derive(Clone, Default)]
pub struct PredicateGroup {
    // Simple filters on the time index column, if any could be built.
    time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
    // Predicate over the request filters plus the region partition expr.
    predicate_all: Predicate,
    // Predicate over the request filters only.
    predicate_without_region: Predicate,
    // The region's partition expression, when the region defines one.
    region_partition_expr: Option<PartitionExpr>,
}
1684
impl PredicateGroup {
    /// Builds a predicate group from the request filters `exprs`, folding in
    /// the region's partition expression when the metadata defines one.
    ///
    /// # Errors
    /// Returns an error if the partition expression JSON cannot be parsed or
    /// converted to a logical expression.
    pub fn new(metadata: &RegionMetadata, exprs: &[Expr]) -> Result<Self> {
        let mut combined_exprs = exprs.to_vec();
        let mut region_partition_expr = None;

        // Append the region partition expression (if present and non-empty)
        // so `predicate_all` also prunes by partition.
        if let Some(expr_json) = metadata.partition_expr.as_ref()
            && !expr_json.is_empty()
            && let Some(expr) = PartitionExpr::from_json_str(expr_json)
                .context(InvalidPartitionExprSnafu { expr: expr_json })?
        {
            let logical_expr = expr
                .try_as_logical_expr()
                .context(InvalidPartitionExprSnafu {
                    expr: expr_json.clone(),
                })?;

            combined_exprs.push(logical_expr);
            region_partition_expr = Some(expr);
        }

        // Collect simple evaluators for expressions on the time index only.
        let mut time_filters = Vec::with_capacity(combined_exprs.len());
        let mut columns = HashSet::new();
        for expr in &combined_exprs {
            columns.clear();
            let Some(filter) = Self::expr_to_filter(expr, metadata, &mut columns) else {
                continue;
            };
            time_filters.push(filter);
        }
        let time_filters = if time_filters.is_empty() {
            None
        } else {
            Some(Arc::new(time_filters))
        };

        let predicate_all = Predicate::new(combined_exprs);
        let predicate_without_region = Predicate::new(exprs.to_vec());

        Ok(Self {
            time_filters,
            predicate_all,
            predicate_without_region,
            region_partition_expr,
        })
    }

    /// Returns the simple time-index filters, if any were built.
    pub(crate) fn time_filters(&self) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
        self.time_filters.clone()
    }

    /// Returns the combined predicate (request filters + partition expr),
    /// or `None` when it holds no expressions.
    pub(crate) fn predicate(&self) -> Option<&Predicate> {
        if self.predicate_all.is_empty() {
            None
        } else {
            Some(&self.predicate_all)
        }
    }

    /// Returns the predicate built from the request filters only, or `None`
    /// when it holds no expressions.
    pub(crate) fn predicate_without_region(&self) -> Option<&Predicate> {
        if self.predicate_without_region.is_empty() {
            None
        } else {
            Some(&self.predicate_without_region)
        }
    }

    /// Registers dynamic filters on both predicates.
    pub(crate) fn add_dyn_filters(&self, dyn_filters: Vec<Arc<DynamicFilterPhysicalExpr>>) {
        self.predicate_all.add_dyn_filters(dyn_filters.clone());
        self.predicate_without_region.add_dyn_filters(dyn_filters);
    }

    /// Returns the region's partition expression, if one was parsed.
    pub(crate) fn region_partition_expr(&self) -> Option<&PartitionExpr> {
        self.region_partition_expr.as_ref()
    }

    /// Converts `expr` into a simple filter evaluator when it references
    /// exactly one column and that column is the timestamp; returns `None`
    /// otherwise (including when the expression's columns can't be
    /// extracted).
    fn expr_to_filter(
        expr: &Expr,
        metadata: &RegionMetadata,
        columns: &mut HashSet<Column>,
    ) -> Option<SimpleFilterEvaluator> {
        columns.clear();
        expr_to_columns(expr, columns).ok()?;
        // Multi-column expressions cannot be simple single-column filters.
        if columns.len() > 1 {
            return None;
        }
        let column = columns.iter().next()?;
        let column_meta = metadata.column_by_name(&column.name)?;
        if column_meta.semantic_type == SemanticType::Timestamp {
            SimpleFilterEvaluator::try_new(expr)
        } else {
            None
        }
    }
}
1789
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datafusion::physical_plan::expressions::lit as physical_lit;
    use datafusion_common::ScalarValue;
    use datafusion_expr::{col, lit};
    use datatypes::value::Value;
    use partition::expr::col as partition_col;
    use store_api::metadata::RegionMetadataBuilder;
    use store_api::storage::{
        ProjectionInput, ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector,
    };

    use super::*;
    use crate::cache::CacheManager;
    use crate::memtable::time_partition::TimePartitions;
    use crate::read::range_cache::ScanRequestFingerprintBuilder;
    use crate::region::version::VersionBuilder;
    use crate::test_util::memtable_util::{EmptyMemtableBuilder, metadata_with_primary_key};
    use crate::test_util::scheduler_util::SchedulerEnv;

    // Builds a region version around `metadata` with an empty memtable.
    fn new_version(metadata: RegionMetadataRef) -> VersionRef {
        let mutable = Arc::new(TimePartitions::new(
            metadata.clone(),
            Arc::new(EmptyMemtableBuilder::default()),
            0,
            None,
        ));
        Arc::new(VersionBuilder::new(metadata, mutable).build())
    }

    // Builds a ScanInput with one (default) file, an all-enabled cache
    // strategy, and a predicate group from `filters`.
    async fn new_scan_input(metadata: RegionMetadataRef, filters: Vec<Expr>) -> ScanInput {
        let env = SchedulerEnv::new().await;
        let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap();
        let predicate = PredicateGroup::new(metadata.as_ref(), &filters).unwrap();
        let file = FileHandle::new(
            crate::sst::file::FileMeta::default(),
            Arc::new(crate::sst::file_purger::NoopFilePurger),
        );

        ScanInput::new(env.access_layer.clone(), mapper)
            .with_predicate(predicate)
            .with_cache(CacheStrategy::EnableAll(Arc::new(
                CacheManager::builder()
                    .range_result_cache_size(1024)
                    .build(),
            )))
            .with_files(vec![file])
    }

    // Columns referenced by filters must be appended to the projection's
    // read column ids.
    #[tokio::test]
    async fn test_build_read_column_ids_includes_filters() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let version = new_version(metadata.clone());
        let env = SchedulerEnv::new().await;
        let request = ScanRequest {
            projection_input: Some(vec![4].into()),
            filters: vec![
                col("v0").gt(lit(1)),
                col("ts").gt(lit(0)),
                col("k0").eq(lit("foo")),
            ],
            ..Default::default()
        };
        let scan_region = ScanRegion::new(
            version,
            env.access_layer.clone(),
            request,
            CacheStrategy::Disabled,
        );
        let predicate =
            PredicateGroup::new(metadata.as_ref(), &scan_region.request.filters).unwrap();
        let projection = &scan_region.request.projection_indices().unwrap();
        let read_ids = scan_region
            .build_read_column_ids(projection, &predicate)
            .unwrap();
        assert_eq!(vec![4, 0, 2, 3], read_ids);
    }

    // An empty projection with no filters still reads the time index column.
    #[tokio::test]
    async fn test_build_read_column_ids_empty_projection() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let version = new_version(metadata.clone());
        let env = SchedulerEnv::new().await;
        let request = ScanRequest {
            projection_input: Some(ProjectionInput::default()),
            ..Default::default()
        };
        let scan_region = ScanRegion::new(
            version,
            env.access_layer.clone(),
            request,
            CacheStrategy::Disabled,
        );
        let predicate =
            PredicateGroup::new(metadata.as_ref(), &scan_region.request.filters).unwrap();
        let projection = &scan_region.request.projection_indices().unwrap();
        let read_ids = scan_region
            .build_read_column_ids(projection, &predicate)
            .unwrap();
        assert_eq!(vec![2], read_ids);
    }

    // Projection order is preserved; filter columns are appended after it.
    #[tokio::test]
    async fn test_build_read_column_ids_keeps_projection_order() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let version = new_version(metadata.clone());
        let env = SchedulerEnv::new().await;
        let request = ScanRequest {
            projection_input: Some(vec![4, 1].into()),
            filters: vec![col("v0").gt(lit(1))],
            ..Default::default()
        };
        let scan_region = ScanRegion::new(
            version,
            env.access_layer.clone(),
            request,
            CacheStrategy::Disabled,
        );
        let predicate =
            PredicateGroup::new(metadata.as_ref(), &scan_region.request.filters).unwrap();
        let projection = &scan_region.request.projection_indices().unwrap();
        let read_ids = scan_region
            .build_read_column_ids(projection, &predicate)
            .unwrap();
        assert_eq!(vec![4, 1, 3], read_ids);
    }

    // Shorthand for a millisecond-timestamp literal.
    fn ts_lit(val: i64) -> datafusion_expr::Expr {
        lit(ScalarValue::TimestampMillisecond(Some(val), None))
    }

    // An eligible scan (tag filter present) produces a fingerprint whose
    // filters are classified into time filters vs. general filters.
    #[tokio::test]
    async fn test_build_scan_fingerprint_for_eligible_scan() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let input = new_scan_input(
            metadata.clone(),
            vec![
                col("ts").gt_eq(ts_lit(1000)),
                col("k0").eq(lit("foo")),
                col("v0").gt(lit(1)),
            ],
        )
        .await
        .with_distribution(Some(TimeSeriesDistribution::PerSeries))
        .with_series_row_selector(Some(TimeSeriesRowSelector::LastRow))
        .with_merge_mode(MergeMode::LastNonNull)
        .with_filter_deleted(false);

        let fingerprint = build_scan_fingerprint(&input).unwrap();

        let expected = ScanRequestFingerprintBuilder {
            read_column_ids: input.read_column_ids.clone(),
            read_column_types: vec![
                metadata
                    .column_by_id(0)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(2)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(3)
                    .map(|col| col.column_schema.data_type.clone()),
            ],
            filters: vec![
                col("k0").eq(lit("foo")).to_string(),
                col("v0").gt(lit(1)).to_string(),
            ],
            time_filters: vec![col("ts").gt_eq(ts_lit(1000)).to_string()],
            series_row_selector: Some(TimeSeriesRowSelector::LastRow),
            append_mode: false,
            filter_deleted: false,
            merge_mode: MergeMode::LastNonNull,
            partition_expr_version: 0,
        }
        .build();
        assert_eq!(expected, fingerprint);
    }

    // Without at least one tag filter, no fingerprint is produced.
    #[tokio::test]
    async fn test_build_scan_fingerprint_requires_tag_filter() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let input = new_scan_input(
            metadata,
            vec![col("ts").gt_eq(lit(1000)), col("v0").gt(lit(1))],
        )
        .await;

        assert!(build_scan_fingerprint(&input).is_none());
    }

    // Cache disabled, compaction scans, and file-less scans are ineligible.
    #[tokio::test]
    async fn test_build_scan_fingerprint_respects_scan_eligibility() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let filters = vec![col("k0").eq(lit("foo"))];

        let disabled = ScanInput::new(
            SchedulerEnv::new().await.access_layer.clone(),
            FlatProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap(),
        )
        .with_predicate(PredicateGroup::new(metadata.as_ref(), &filters).unwrap());
        assert!(build_scan_fingerprint(&disabled).is_none());

        let compaction = new_scan_input(metadata.clone(), filters.clone())
            .await
            .with_compaction(true);
        assert!(build_scan_fingerprint(&compaction).is_none());

        let no_files = new_scan_input(metadata, filters).await.with_files(vec![]);
        assert!(build_scan_fingerprint(&no_files).is_none());
    }

    // Changing the partition expression bumps the version captured in the
    // fingerprint, so cached results are invalidated.
    #[tokio::test]
    async fn test_build_scan_fingerprint_tracks_schema_and_partition_expr_changes() {
        let base = metadata_with_primary_key(vec![0, 1], false);
        let mut builder = RegionMetadataBuilder::from_existing(base);
        let partition_expr = partition_col("k0")
            .gt_eq(Value::String("foo".into()))
            .as_json_str()
            .unwrap();
        builder.partition_expr_json(Some(partition_expr));
        let metadata = Arc::new(builder.build_without_validation().unwrap());

        let input = new_scan_input(metadata.clone(), vec![col("k0").eq(lit("foo"))]).await;
        let fingerprint = build_scan_fingerprint(&input).unwrap();

        let expected = ScanRequestFingerprintBuilder {
            read_column_ids: input.read_column_ids.clone(),
            read_column_types: vec![
                metadata
                    .column_by_id(0)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(2)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(3)
                    .map(|col| col.column_schema.data_type.clone()),
            ],
            filters: vec![col("k0").eq(lit("foo")).to_string()],
            time_filters: vec![],
            series_row_selector: None,
            append_mode: false,
            filter_deleted: true,
            merge_mode: MergeMode::LastRow,
            partition_expr_version: metadata.partition_expr_version,
        }
        .build();
        assert_eq!(expected, fingerprint);
        assert_ne!(0, metadata.partition_expr_version);
    }

    // Adding dynamic filters makes previously-empty predicates non-empty.
    #[test]
    fn test_update_dyn_filters_with_empty_base_predicates() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let predicate_group = PredicateGroup::new(metadata.as_ref(), &[]).unwrap();
        assert!(predicate_group.predicate().is_none());
        assert!(predicate_group.predicate_without_region().is_none());

        let dyn_filter = Arc::new(DynamicFilterPhysicalExpr::new(vec![], physical_lit(false)));
        predicate_group.add_dyn_filters(vec![dyn_filter]);

        let predicate_all = predicate_group.predicate().unwrap();
        assert!(predicate_all.exprs().is_empty());
        assert_eq!(1, predicate_all.dyn_filters().len());

        let predicate_without_region = predicate_group.predicate_without_region().unwrap();
        assert!(predicate_without_region.exprs().is_empty());
        assert_eq!(1, predicate_without_region.dyn_filters().len());
    }

    // Pre-filter mode: single source or append mode => All; multi-source
    // merge scans => SkipFields.
    #[tokio::test]
    async fn test_range_pre_filter_mode() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let cases = [
            (true, MergeMode::LastRow, 1, PreFilterMode::All),
            (false, MergeMode::LastNonNull, 1, PreFilterMode::All),
            (false, MergeMode::LastRow, 2, PreFilterMode::SkipFields),
            (true, MergeMode::LastRow, 2, PreFilterMode::All),
        ];

        for (append_mode, merge_mode, source_count, expected_mode) in cases {
            let input = new_scan_input(metadata.clone(), vec![])
                .await
                .with_append_mode(append_mode)
                .with_merge_mode(merge_mode);

            assert_eq!(expected_mode, input.range_pre_filter_mode(source_count));
        }
    }
}