1use std::collections::HashSet;
18use std::fmt;
19use std::num::NonZeroU64;
20use std::sync::Arc;
21use std::time::Instant;
22
23use api::v1::SemanticType;
24use common_error::ext::BoxedError;
25use common_recordbatch::SendableRecordBatchStream;
26use common_recordbatch::filter::SimpleFilterEvaluator;
27use common_telemetry::tracing::Instrument;
28use common_telemetry::{debug, error, tracing, warn};
29use common_time::range::TimestampRange;
30use datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr;
31use datafusion_common::Column;
32use datafusion_expr::Expr;
33use datafusion_expr::utils::expr_to_columns;
34use futures::StreamExt;
35use partition::expr::PartitionExpr;
36use smallvec::SmallVec;
37use snafu::{OptionExt as _, ResultExt};
38use store_api::metadata::{RegionMetadata, RegionMetadataRef};
39use store_api::region_engine::{PartitionRange, RegionScannerRef};
40use store_api::storage::{
41 ColumnId, RegionId, ScanRequest, SequenceRange, TimeSeriesDistribution, TimeSeriesRowSelector,
42};
43use table::predicate::{Predicate, build_time_range_predicate, extract_time_range_from_expr};
44use tokio::sync::{Semaphore, mpsc};
45use tokio_stream::wrappers::ReceiverStream;
46
47use crate::access_layer::AccessLayerRef;
48use crate::cache::CacheStrategy;
49use crate::config::{DEFAULT_MAX_CONCURRENT_SCAN_FILES, DEFAULT_SCAN_CHANNEL_SIZE};
50use crate::error::{InvalidPartitionExprSnafu, InvalidRequestSnafu, Result};
51#[cfg(feature = "enterprise")]
52use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
53use crate::memtable::{MemtableRange, RangesOptions};
54use crate::metrics::READ_SST_COUNT;
55use crate::read::compat::{self, CompatBatch, FlatCompatBatch, PrimaryKeyCompatBatch};
56use crate::read::projection::ProjectionMapper;
57use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
58use crate::read::range_cache::ScanRequestFingerprint;
59use crate::read::seq_scan::SeqScan;
60use crate::read::series_scan::SeriesScan;
61use crate::read::stream::ScanBatchStream;
62use crate::read::unordered_scan::UnorderedScan;
63use crate::read::{Batch, BoxedRecordBatchStream, RecordBatch, Source};
64use crate::region::options::MergeMode;
65use crate::region::version::VersionRef;
66use crate::sst::FormatType;
67use crate::sst::file::FileHandle;
68use crate::sst::index::bloom_filter::applier::{
69 BloomFilterIndexApplierBuilder, BloomFilterIndexApplierRef,
70};
71use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
72use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
73use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
74use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
75#[cfg(feature = "vector_index")]
76use crate::sst::index::vector_index::applier::{VectorIndexApplier, VectorIndexApplierRef};
77use crate::sst::parquet::file_range::PreFilterMode;
78use crate::sst::parquet::reader::ReaderMetrics;
79
/// Channel size used for parallel scan when the region reads SSTs in the flat format.
const FLAT_SCAN_CHANNEL_SIZE: usize = 2;
/// Multiplier applied to the vector search `k` when the request also carries
/// filters, so the vector index over-fetches candidates that may be filtered out.
#[cfg(feature = "vector_index")]
const VECTOR_INDEX_OVERFETCH_MULTIPLIER: usize = 2;
84
/// A scanner over a region; wraps one of the concrete scan implementations
/// selected by [ScanRegion::scanner].
pub(crate) enum Scanner {
    /// Sequential scan (see [SeqScan]).
    Seq(SeqScan),
    /// Unordered scan (see [UnorderedScan]).
    Unordered(UnorderedScan),
    /// Per time series scan (see [SeriesScan]).
    Series(SeriesScan),
}
94
95impl Scanner {
96 #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
98 pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream, BoxedError> {
99 match self {
100 Scanner::Seq(seq_scan) => seq_scan.build_stream(),
101 Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
102 Scanner::Series(series_scan) => series_scan.build_stream().await,
103 }
104 }
105
106 pub(crate) fn scan_batch(&self) -> Result<ScanBatchStream> {
108 match self {
109 Scanner::Seq(x) => x.scan_all_partitions(),
110 Scanner::Unordered(x) => x.scan_all_partitions(),
111 Scanner::Series(x) => x.scan_all_partitions(),
112 }
113 }
114}
115
#[cfg(test)]
impl Scanner {
    /// Number of SST files the scanner will read.
    pub(crate) fn num_files(&self) -> usize {
        match self {
            Self::Seq(scan) => scan.input().num_files(),
            Self::Unordered(scan) => scan.input().num_files(),
            Self::Series(scan) => scan.input().num_files(),
        }
    }

    /// Number of memtables the scanner will read.
    pub(crate) fn num_memtables(&self) -> usize {
        match self {
            Self::Seq(scan) => scan.input().num_memtables(),
            Self::Unordered(scan) => scan.input().num_memtables(),
            Self::Series(scan) => scan.input().num_memtables(),
        }
    }

    /// Ids of the SST files the scanner will read.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        match self {
            Self::Seq(scan) => scan.input().file_ids(),
            Self::Unordered(scan) => scan.input().file_ids(),
            Self::Series(scan) => scan.input().file_ids(),
        }
    }

    /// Ids of the indexes the scanner will read.
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        match self {
            Self::Seq(scan) => scan.input().index_ids(),
            Self::Unordered(scan) => scan.input().index_ids(),
            Self::Series(scan) => scan.input().index_ids(),
        }
    }

    /// Updates the scanner's target partition count via a prepare request.
    pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
        use store_api::region_engine::{PrepareRequest, RegionScanner};

        let request = PrepareRequest::default().with_target_partitions(target_partitions);
        match self {
            Self::Seq(scan) => scan.prepare(request).unwrap(),
            Self::Unordered(scan) => scan.prepare(request).unwrap(),
            Self::Series(scan) => scan.prepare(request).unwrap(),
        }
    }
}
165
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Helper that prepares a scan over a region and builds a [Scanner].
pub(crate) struct ScanRegion {
    /// Version of the region to scan.
    version: VersionRef,
    /// Access layer of the region.
    access_layer: AccessLayerRef,
    /// Scan request.
    request: ScanRequest,
    /// Cache to use during the scan.
    cache_strategy: CacheStrategy,
    /// Capacity of the channel used by parallel scan tasks.
    parallel_scan_channel_size: usize,
    /// Maximum number of SST files scanned concurrently.
    max_concurrent_scan_files: usize,
    /// Skips building the inverted index applier when set.
    ignore_inverted_index: bool,
    /// Skips building the full-text index applier when set.
    ignore_fulltext_index: bool,
    /// Skips building the bloom filter index applier when set.
    ignore_bloom_filter: bool,
    /// Start time of the query, forwarded to the [ScanInput].
    start_time: Option<Instant>,
    /// Whether to filter out deleted rows (defaults to `true`).
    filter_deleted: bool,
    /// Optional provider of extra extension ranges.
    #[cfg(feature = "enterprise")]
    extension_range_provider: Option<BoxedExtensionRangeProvider>,
}
243
244impl ScanRegion {
245 pub(crate) fn new(
247 version: VersionRef,
248 access_layer: AccessLayerRef,
249 request: ScanRequest,
250 cache_strategy: CacheStrategy,
251 ) -> ScanRegion {
252 ScanRegion {
253 version,
254 access_layer,
255 request,
256 cache_strategy,
257 parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
258 max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
259 ignore_inverted_index: false,
260 ignore_fulltext_index: false,
261 ignore_bloom_filter: false,
262 start_time: None,
263 filter_deleted: true,
264 #[cfg(feature = "enterprise")]
265 extension_range_provider: None,
266 }
267 }
268
269 #[must_use]
271 pub(crate) fn with_parallel_scan_channel_size(
272 mut self,
273 parallel_scan_channel_size: usize,
274 ) -> Self {
275 self.parallel_scan_channel_size = parallel_scan_channel_size;
276 self
277 }
278
279 #[must_use]
281 pub(crate) fn with_max_concurrent_scan_files(
282 mut self,
283 max_concurrent_scan_files: usize,
284 ) -> Self {
285 self.max_concurrent_scan_files = max_concurrent_scan_files;
286 self
287 }
288
289 #[must_use]
291 pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self {
292 self.ignore_inverted_index = ignore;
293 self
294 }
295
296 #[must_use]
298 pub(crate) fn with_ignore_fulltext_index(mut self, ignore: bool) -> Self {
299 self.ignore_fulltext_index = ignore;
300 self
301 }
302
303 #[must_use]
305 pub(crate) fn with_ignore_bloom_filter(mut self, ignore: bool) -> Self {
306 self.ignore_bloom_filter = ignore;
307 self
308 }
309
310 #[must_use]
311 pub(crate) fn with_start_time(mut self, now: Instant) -> Self {
312 self.start_time = Some(now);
313 self
314 }
315
316 pub(crate) fn set_filter_deleted(&mut self, filter_deleted: bool) {
317 self.filter_deleted = filter_deleted;
318 }
319
320 #[cfg(feature = "enterprise")]
321 pub(crate) fn set_extension_range_provider(
322 &mut self,
323 extension_range_provider: BoxedExtensionRangeProvider,
324 ) {
325 self.extension_range_provider = Some(extension_range_provider);
326 }
327
328 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
330 pub(crate) async fn scanner(self) -> Result<Scanner> {
331 if self.use_series_scan() {
332 self.series_scan().await.map(Scanner::Series)
333 } else if self.use_unordered_scan() {
334 self.unordered_scan().await.map(Scanner::Unordered)
337 } else {
338 self.seq_scan().await.map(Scanner::Seq)
339 }
340 }
341
342 #[tracing::instrument(
344 level = tracing::Level::DEBUG,
345 skip_all,
346 fields(region_id = %self.region_id())
347 )]
348 pub(crate) async fn region_scanner(self) -> Result<RegionScannerRef> {
349 if self.use_series_scan() {
350 self.series_scan()
351 .await
352 .map(|scanner| Box::new(scanner) as _)
353 } else if self.use_unordered_scan() {
354 self.unordered_scan()
355 .await
356 .map(|scanner| Box::new(scanner) as _)
357 } else {
358 self.seq_scan().await.map(|scanner| Box::new(scanner) as _)
359 }
360 }
361
362 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
364 pub(crate) async fn seq_scan(self) -> Result<SeqScan> {
365 let input = self.scan_input().await?.with_compaction(false);
366 Ok(SeqScan::new(input))
367 }
368
369 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
371 pub(crate) async fn unordered_scan(self) -> Result<UnorderedScan> {
372 let input = self.scan_input().await?;
373 Ok(UnorderedScan::new(input))
374 }
375
376 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
378 pub(crate) async fn series_scan(self) -> Result<SeriesScan> {
379 let input = self.scan_input().await?;
380 Ok(SeriesScan::new(input))
381 }
382
383 fn use_unordered_scan(&self) -> bool {
385 self.version.options.append_mode
392 && self.request.series_row_selector.is_none()
393 && (self.request.distribution.is_none()
394 || self.request.distribution == Some(TimeSeriesDistribution::TimeWindowed))
395 }
396
397 fn use_series_scan(&self) -> bool {
399 self.request.distribution == Some(TimeSeriesDistribution::PerSeries)
400 }
401
402 fn use_flat_format(&self) -> bool {
404 self.request.force_flat_format
405 || self.version.options.sst_format.unwrap_or_default() == FormatType::Flat
406 }
407
    /// Builds the [ScanInput] shared by all scanners: prunes SSTs and
    /// memtables by time range and sequence, builds index appliers and the
    /// projection mapper, then assembles the input.
    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
    async fn scan_input(mut self) -> Result<ScanInput> {
        // Minimum SST sequence to read; a value of zero means no bound.
        let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new);
        let time_range = self.build_time_range_predicate();
        let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;
        let flat_format = self.use_flat_format();

        // Columns to read: the projection plus any columns referenced by the
        // filters or the region partition expression; all columns otherwise.
        let read_column_ids = match &self.request.projection {
            Some(p) => self.build_read_column_ids(p, &predicate)?,
            None => self
                .version
                .metadata
                .column_metadatas
                .iter()
                .map(|col| col.column_id)
                .collect(),
        };

        let mapper = match &self.request.projection {
            Some(p) => ProjectionMapper::new_with_read_columns(
                &self.version.metadata,
                p.iter().copied(),
                flat_format,
                read_column_ids.clone(),
            )?,
            None => ProjectionMapper::all(&self.version.metadata, flat_format)?,
        };

        // Prune SST files by sequence and by time range.
        let ssts = &self.version.ssts;
        let mut files = Vec::new();
        for level in ssts.levels() {
            for file in level.files.values() {
                let exceed_min_sequence = match (sst_min_sequence, file.meta_ref().sequence) {
                    (Some(min_sequence), Some(file_sequence)) => file_sequence > min_sequence,
                    // Keep files without a sequence.
                    (Some(_), None) => true,
                    // No lower bound requested: keep everything.
                    (None, _) => true,
                };

                if exceed_min_sequence && file_in_range(file, &time_range) {
                    files.push(file.clone());
                }
            }
        }

        let memtables = self.version.memtables.list_memtables();
        let mut mem_range_builders = Vec::new();
        let filter_mode = pre_filter_mode(
            self.version.options.append_mode,
            self.version.options.merge_mode(),
        );

        // Prune memtables by time range and build range builders for the rest.
        for m in memtables {
            let Some((start, end)) = m.stats().time_range() else {
                // Memtable has no data in range.
                continue;
            };
            let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
            if !memtable_range.intersects(&time_range) {
                continue;
            }
            let ranges_in_memtable = m.ranges(
                Some(read_column_ids.as_slice()),
                RangesOptions::default()
                    .with_predicate(predicate.clone())
                    .with_sequence(SequenceRange::new(
                        self.request.memtable_min_sequence,
                        self.request.memtable_max_sequence,
                    ))
                    .with_pre_filter_mode(filter_mode),
            )?;
            mem_range_builders.extend(ranges_in_memtable.ranges.into_values().map(|v| {
                let stats = v.stats().clone();
                MemRangeBuilder::new(v, stats)
            }));
        }

        let region_id = self.region_id();
        debug!(
            "Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}, flat_format: {}",
            region_id,
            self.request,
            time_range,
            mem_range_builders.len(),
            files.len(),
            self.version.options.append_mode,
            flat_format,
        );

        // Index appliers are built per filter class: index 0 holds the applier
        // for non-field filters, index 1 the applier for field filters.
        let (non_field_filters, field_filters) = self.partition_by_field_filters();
        let inverted_index_appliers = [
            self.build_invereted_index_applier(&non_field_filters),
            self.build_invereted_index_applier(&field_filters),
        ];
        let bloom_filter_appliers = [
            self.build_bloom_filter_applier(&non_field_filters),
            self.build_bloom_filter_applier(&field_filters),
        ];
        let fulltext_index_appliers = [
            self.build_fulltext_index_applier(&non_field_filters),
            self.build_fulltext_index_applier(&field_filters),
        ];
        #[cfg(feature = "vector_index")]
        let vector_index_applier = self.build_vector_index_applier();
        // Over-fetch vector search candidates when extra filters may drop rows.
        #[cfg(feature = "vector_index")]
        let vector_index_k = self.request.vector_search.as_ref().map(|search| {
            if self.request.filters.is_empty() {
                search.k
            } else {
                search.k.saturating_mul(VECTOR_INDEX_OVERFETCH_MULTIPLIER)
            }
        });

        if flat_format {
            // Flat format scans use a dedicated (smaller) channel size.
            self.parallel_scan_channel_size = FLAT_SCAN_CHANNEL_SIZE;
        }

        let input = ScanInput::new(self.access_layer, mapper)
            .with_time_range(Some(time_range))
            .with_predicate(predicate)
            .with_memtables(mem_range_builders)
            .with_files(files)
            .with_cache(self.cache_strategy)
            .with_inverted_index_appliers(inverted_index_appliers)
            .with_bloom_filter_index_appliers(bloom_filter_appliers)
            .with_fulltext_index_appliers(fulltext_index_appliers)
            .with_parallel_scan_channel_size(self.parallel_scan_channel_size)
            .with_max_concurrent_scan_files(self.max_concurrent_scan_files)
            .with_start_time(self.start_time)
            .with_append_mode(self.version.options.append_mode)
            .with_filter_deleted(self.filter_deleted)
            .with_merge_mode(self.version.options.merge_mode())
            .with_series_row_selector(self.request.series_row_selector)
            .with_distribution(self.request.distribution)
            .with_flat_format(flat_format);
        #[cfg(feature = "vector_index")]
        let input = input
            .with_vector_index_applier(vector_index_applier)
            .with_vector_index_k(vector_index_k);

        // Attach extension ranges provided by the enterprise extension, if any.
        #[cfg(feature = "enterprise")]
        let input = if let Some(provider) = self.extension_range_provider {
            let ranges = provider
                .find_extension_ranges(self.version.flushed_sequence, time_range, &self.request)
                .await?;
            debug!("Find extension ranges: {ranges:?}");
            input.with_extension_ranges(ranges)
        } else {
            input
        };
        Ok(input)
    }
573
574 fn region_id(&self) -> RegionId {
575 self.version.metadata.region_id
576 }
577
578 fn build_time_range_predicate(&self) -> TimestampRange {
580 let time_index = self.version.metadata.time_index_column();
581 let unit = time_index
582 .column_schema
583 .data_type
584 .as_timestamp()
585 .expect("Time index must have timestamp-compatible type")
586 .unit();
587 build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters)
588 }
589
    /// Computes the column ids to read for the given projection, additionally
    /// including columns that only appear in the request filters or in the
    /// region partition expression.
    ///
    /// Returns an error when a projection index is out of bound.
    fn build_read_column_ids(
        &self,
        projection: &[usize],
        predicate: &PredicateGroup,
    ) -> Result<Vec<ColumnId>> {
        let metadata = &self.version.metadata;
        let mut read_column_ids = Vec::new();
        // Tracks ids already pushed so filter columns are not duplicated.
        let mut seen = HashSet::new();

        for idx in projection {
            let column =
                metadata
                    .column_metadatas
                    .get(*idx)
                    .with_context(|| InvalidRequestSnafu {
                        region_id: metadata.region_id,
                        reason: format!("projection index {} is out of bound", idx),
                    })?;
            seen.insert(column.column_id);
            read_column_ids.push(column.column_id);
        }

        // An empty projection still reads the time index column.
        if projection.is_empty() {
            let time_index = metadata.time_index_column().column_id;
            if seen.insert(time_index) {
                read_column_ids.push(time_index);
            }
        }

        // Collect names of columns referenced by filters; filters whose
        // columns cannot be extracted are skipped.
        let mut extra_names = HashSet::new();
        let mut columns = HashSet::new();

        for expr in &self.request.filters {
            columns.clear();
            if expr_to_columns(expr, &mut columns).is_err() {
                continue;
            }
            extra_names.extend(columns.iter().map(|column| column.name.clone()));
        }

        // The region partition expression may reference more columns.
        if let Some(expr) = predicate.region_partition_expr() {
            expr.collect_column_names(&mut extra_names);
        }

        if !extra_names.is_empty() {
            // Map names to ids in metadata order, skipping those already read.
            for column in &metadata.column_metadatas {
                if extra_names.contains(column.column_schema.name.as_str())
                    && !seen.contains(&column.column_id)
                {
                    read_column_ids.push(column.column_id);
                }
                extra_names.remove(column.column_schema.name.as_str());
            }
            // Any names left do not belong to this region.
            if !extra_names.is_empty() {
                warn!(
                    "Some columns in filters are not found in region {}: {:?}",
                    metadata.region_id, extra_names
                );
            }
        }
        Ok(read_column_ids)
    }
655
656 fn partition_by_field_filters(&self) -> (Vec<Expr>, Vec<Expr>) {
659 let field_columns = self
660 .version
661 .metadata
662 .field_columns()
663 .map(|col| &col.column_schema.name)
664 .collect::<HashSet<_>>();
665
666 let mut columns = HashSet::new();
667
668 self.request.filters.iter().cloned().partition(|expr| {
669 columns.clear();
670 if expr_to_columns(expr, &mut columns).is_err() {
672 return true;
674 }
675 !columns
677 .iter()
678 .any(|column| field_columns.contains(&column.name))
679 })
680 }
681
682 fn build_invereted_index_applier(&self, filters: &[Expr]) -> Option<InvertedIndexApplierRef> {
684 if self.ignore_inverted_index {
685 return None;
686 }
687
688 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
689 let inverted_index_cache = self.cache_strategy.inverted_index_cache().cloned();
690
691 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
692
693 InvertedIndexApplierBuilder::new(
694 self.access_layer.table_dir().to_string(),
695 self.access_layer.path_type(),
696 self.access_layer.object_store().clone(),
697 self.version.metadata.as_ref(),
698 self.version.metadata.inverted_indexed_column_ids(
699 self.version
700 .options
701 .index_options
702 .inverted_index
703 .ignore_column_ids
704 .iter(),
705 ),
706 self.access_layer.puffin_manager_factory().clone(),
707 )
708 .with_file_cache(file_cache)
709 .with_inverted_index_cache(inverted_index_cache)
710 .with_puffin_metadata_cache(puffin_metadata_cache)
711 .build(filters)
712 .inspect_err(|err| warn!(err; "Failed to build invereted index applier"))
713 .ok()
714 .flatten()
715 .map(Arc::new)
716 }
717
718 fn build_bloom_filter_applier(&self, filters: &[Expr]) -> Option<BloomFilterIndexApplierRef> {
720 if self.ignore_bloom_filter {
721 return None;
722 }
723
724 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
725 let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
726 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
727
728 BloomFilterIndexApplierBuilder::new(
729 self.access_layer.table_dir().to_string(),
730 self.access_layer.path_type(),
731 self.access_layer.object_store().clone(),
732 self.version.metadata.as_ref(),
733 self.access_layer.puffin_manager_factory().clone(),
734 )
735 .with_file_cache(file_cache)
736 .with_bloom_filter_index_cache(bloom_filter_index_cache)
737 .with_puffin_metadata_cache(puffin_metadata_cache)
738 .build(filters)
739 .inspect_err(|err| warn!(err; "Failed to build bloom filter index applier"))
740 .ok()
741 .flatten()
742 .map(Arc::new)
743 }
744
745 fn build_fulltext_index_applier(&self, filters: &[Expr]) -> Option<FulltextIndexApplierRef> {
747 if self.ignore_fulltext_index {
748 return None;
749 }
750
751 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
752 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
753 let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
754 FulltextIndexApplierBuilder::new(
755 self.access_layer.table_dir().to_string(),
756 self.access_layer.path_type(),
757 self.access_layer.object_store().clone(),
758 self.access_layer.puffin_manager_factory().clone(),
759 self.version.metadata.as_ref(),
760 )
761 .with_file_cache(file_cache)
762 .with_puffin_metadata_cache(puffin_metadata_cache)
763 .with_bloom_filter_cache(bloom_filter_index_cache)
764 .build(filters)
765 .inspect_err(|err| warn!(err; "Failed to build fulltext index applier"))
766 .ok()
767 .flatten()
768 .map(Arc::new)
769 }
770
771 #[cfg(feature = "vector_index")]
773 fn build_vector_index_applier(&self) -> Option<VectorIndexApplierRef> {
774 let vector_search = self.request.vector_search.as_ref()?;
775
776 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
777 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
778 let vector_index_cache = self.cache_strategy.vector_index_cache().cloned();
779
780 let applier = VectorIndexApplier::new(
781 self.access_layer.table_dir().to_string(),
782 self.access_layer.path_type(),
783 self.access_layer.object_store().clone(),
784 self.access_layer.puffin_manager_factory().clone(),
785 vector_search.column_id,
786 vector_search.query_vector.clone(),
787 vector_search.metric,
788 )
789 .with_file_cache(file_cache)
790 .with_puffin_metadata_cache(puffin_metadata_cache)
791 .with_vector_index_cache(vector_index_cache);
792
793 Some(Arc::new(applier))
794 }
795}
796
797fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
799 if predicate == &TimestampRange::min_to_max() {
800 return true;
801 }
802 let (start, end) = file.time_range();
804 let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end));
805 file_ts_range.intersects(predicate)
806}
807
/// Common input shared by the scanners.
pub struct ScanInput {
    /// Access layer of the region.
    access_layer: AccessLayerRef,
    /// Maps read data to the output schema.
    pub(crate) mapper: Arc<ProjectionMapper>,
    /// Column ids to read from memtables and SSTs.
    pub(crate) read_column_ids: Vec<ColumnId>,
    /// Time range predicate built from the request filters.
    pub(crate) time_range: Option<TimestampRange>,
    /// Predicates to push down.
    pub(crate) predicate: PredicateGroup,
    /// Region partition expression, cached from the predicate group.
    region_partition_expr: Option<PartitionExpr>,
    /// Builders of the memtable ranges to scan.
    pub(crate) memtables: Vec<MemRangeBuilder>,
    /// Handles of the SST files to scan.
    pub(crate) files: Vec<FileHandle>,
    /// Cache to use during the scan.
    pub(crate) cache_strategy: CacheStrategy,
    /// Whether to tolerate missing SST files during the scan.
    ignore_file_not_found: bool,
    /// Capacity of the channel used by parallel scan tasks.
    pub(crate) parallel_scan_channel_size: usize,
    /// Maximum number of SST files scanned concurrently.
    pub(crate) max_concurrent_scan_files: usize,
    /// Inverted index appliers, indexed as [non-field filters, field filters].
    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
    /// Bloom filter appliers, indexed as [non-field filters, field filters].
    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    /// Full-text appliers, indexed as [non-field filters, field filters].
    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
    /// Vector index applier built from the request's vector search.
    #[cfg(feature = "vector_index")]
    pub(crate) vector_index_applier: Option<VectorIndexApplierRef>,
    /// Number of candidates to fetch from the vector index.
    #[cfg(feature = "vector_index")]
    pub(crate) vector_index_k: Option<usize>,
    /// Start time of the query.
    pub(crate) query_start: Option<Instant>,
    /// Whether the region is in append mode.
    pub(crate) append_mode: bool,
    /// Whether to remove deleted rows.
    pub(crate) filter_deleted: bool,
    /// Merge mode of the region.
    pub(crate) merge_mode: MergeMode,
    /// Per-series row selector from the request, if any.
    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
    /// Requested time series distribution, if any.
    pub(crate) distribution: Option<TimeSeriesDistribution>,
    /// Whether the SSTs use the flat format.
    pub(crate) flat_format: bool,
    /// Whether the scan runs for compaction.
    pub(crate) compaction: bool,
    /// Extra ranges supplied by the enterprise extension.
    #[cfg(feature = "enterprise")]
    extension_ranges: Vec<BoxedExtensionRange>,
}
865
866impl ScanInput {
867 #[must_use]
869 pub(crate) fn new(access_layer: AccessLayerRef, mapper: ProjectionMapper) -> ScanInput {
870 ScanInput {
871 access_layer,
872 read_column_ids: mapper.column_ids().to_vec(),
873 mapper: Arc::new(mapper),
874 time_range: None,
875 predicate: PredicateGroup::default(),
876 region_partition_expr: None,
877 memtables: Vec::new(),
878 files: Vec::new(),
879 cache_strategy: CacheStrategy::Disabled,
880 ignore_file_not_found: false,
881 parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
882 max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
883 inverted_index_appliers: [None, None],
884 bloom_filter_index_appliers: [None, None],
885 fulltext_index_appliers: [None, None],
886 #[cfg(feature = "vector_index")]
887 vector_index_applier: None,
888 #[cfg(feature = "vector_index")]
889 vector_index_k: None,
890 query_start: None,
891 append_mode: false,
892 filter_deleted: true,
893 merge_mode: MergeMode::default(),
894 series_row_selector: None,
895 distribution: None,
896 flat_format: false,
897 compaction: false,
898 #[cfg(feature = "enterprise")]
899 extension_ranges: Vec::new(),
900 }
901 }
902
903 #[must_use]
905 pub(crate) fn with_time_range(mut self, time_range: Option<TimestampRange>) -> Self {
906 self.time_range = time_range;
907 self
908 }
909
910 #[must_use]
912 pub(crate) fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
913 self.region_partition_expr = predicate.region_partition_expr().cloned();
914 self.predicate = predicate;
915 self
916 }
917
918 #[must_use]
920 pub(crate) fn with_memtables(mut self, memtables: Vec<MemRangeBuilder>) -> Self {
921 self.memtables = memtables;
922 self
923 }
924
925 #[must_use]
927 pub(crate) fn with_files(mut self, files: Vec<FileHandle>) -> Self {
928 self.files = files;
929 self
930 }
931
932 #[must_use]
934 pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self {
935 self.cache_strategy = cache;
936 self
937 }
938
939 #[must_use]
941 pub(crate) fn with_ignore_file_not_found(mut self, ignore: bool) -> Self {
942 self.ignore_file_not_found = ignore;
943 self
944 }
945
946 #[must_use]
948 pub(crate) fn with_parallel_scan_channel_size(
949 mut self,
950 parallel_scan_channel_size: usize,
951 ) -> Self {
952 self.parallel_scan_channel_size = parallel_scan_channel_size;
953 self
954 }
955
956 #[must_use]
958 pub(crate) fn with_max_concurrent_scan_files(
959 mut self,
960 max_concurrent_scan_files: usize,
961 ) -> Self {
962 self.max_concurrent_scan_files = max_concurrent_scan_files;
963 self
964 }
965
966 #[must_use]
968 pub(crate) fn with_inverted_index_appliers(
969 mut self,
970 appliers: [Option<InvertedIndexApplierRef>; 2],
971 ) -> Self {
972 self.inverted_index_appliers = appliers;
973 self
974 }
975
976 #[must_use]
978 pub(crate) fn with_bloom_filter_index_appliers(
979 mut self,
980 appliers: [Option<BloomFilterIndexApplierRef>; 2],
981 ) -> Self {
982 self.bloom_filter_index_appliers = appliers;
983 self
984 }
985
986 #[must_use]
988 pub(crate) fn with_fulltext_index_appliers(
989 mut self,
990 appliers: [Option<FulltextIndexApplierRef>; 2],
991 ) -> Self {
992 self.fulltext_index_appliers = appliers;
993 self
994 }
995
996 #[cfg(feature = "vector_index")]
998 #[must_use]
999 pub(crate) fn with_vector_index_applier(
1000 mut self,
1001 applier: Option<VectorIndexApplierRef>,
1002 ) -> Self {
1003 self.vector_index_applier = applier;
1004 self
1005 }
1006
1007 #[cfg(feature = "vector_index")]
1009 #[must_use]
1010 pub(crate) fn with_vector_index_k(mut self, k: Option<usize>) -> Self {
1011 self.vector_index_k = k;
1012 self
1013 }
1014
1015 #[must_use]
1017 pub(crate) fn with_start_time(mut self, now: Option<Instant>) -> Self {
1018 self.query_start = now;
1019 self
1020 }
1021
1022 #[must_use]
1023 pub(crate) fn with_append_mode(mut self, is_append_mode: bool) -> Self {
1024 self.append_mode = is_append_mode;
1025 self
1026 }
1027
1028 #[must_use]
1030 pub(crate) fn with_filter_deleted(mut self, filter_deleted: bool) -> Self {
1031 self.filter_deleted = filter_deleted;
1032 self
1033 }
1034
1035 #[must_use]
1037 pub(crate) fn with_merge_mode(mut self, merge_mode: MergeMode) -> Self {
1038 self.merge_mode = merge_mode;
1039 self
1040 }
1041
1042 #[must_use]
1044 pub(crate) fn with_distribution(
1045 mut self,
1046 distribution: Option<TimeSeriesDistribution>,
1047 ) -> Self {
1048 self.distribution = distribution;
1049 self
1050 }
1051
1052 #[must_use]
1054 pub(crate) fn with_series_row_selector(
1055 mut self,
1056 series_row_selector: Option<TimeSeriesRowSelector>,
1057 ) -> Self {
1058 self.series_row_selector = series_row_selector;
1059 self
1060 }
1061
1062 #[must_use]
1064 pub(crate) fn with_flat_format(mut self, flat_format: bool) -> Self {
1065 self.flat_format = flat_format;
1066 self
1067 }
1068
1069 #[must_use]
1071 pub(crate) fn with_compaction(mut self, compaction: bool) -> Self {
1072 self.compaction = compaction;
1073 self
1074 }
1075
1076 #[tracing::instrument(
1080 skip(self, sources, semaphore),
1081 fields(
1082 region_id = %self.region_metadata().region_id,
1083 source_count = sources.len()
1084 )
1085 )]
1086 pub(crate) fn create_parallel_sources(
1087 &self,
1088 sources: Vec<Source>,
1089 semaphore: Arc<Semaphore>,
1090 ) -> Result<Vec<Source>> {
1091 if sources.len() <= 1 {
1092 return Ok(sources);
1093 }
1094
1095 let sources = sources
1097 .into_iter()
1098 .map(|source| {
1099 let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
1100 self.spawn_scan_task(source, semaphore.clone(), sender);
1101 let stream = Box::pin(ReceiverStream::new(receiver));
1102 Source::Stream(stream)
1103 })
1104 .collect();
1105 Ok(sources)
1106 }
1107
1108 pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> {
1110 let memtable = &self.memtables[index.index];
1111 let mut ranges = SmallVec::new();
1112 memtable.build_ranges(index.row_group_index, &mut ranges);
1113 ranges
1114 }
1115
1116 fn predicate_for_file(&self, file: &FileHandle) -> Option<Predicate> {
1117 if self.should_skip_region_partition(file) {
1118 self.predicate.predicate_without_region().cloned()
1119 } else {
1120 self.predicate.predicate().cloned()
1121 }
1122 }
1123
1124 fn should_skip_region_partition(&self, file: &FileHandle) -> bool {
1125 match (
1126 self.region_partition_expr.as_ref(),
1127 file.meta_ref().partition_expr.as_ref(),
1128 ) {
1129 (Some(region_expr), Some(file_expr)) => region_expr == file_expr,
1130 _ => false,
1131 }
1132 }
1133
    /// Prunes a single SST file and returns a [FileRangeBuilder] for it.
    ///
    /// Applies the pushed-down predicate and index appliers while building the
    /// reader input; returns an empty builder when the whole file is pruned
    /// out, or when the file is missing and `ignore_file_not_found` is set.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.region_metadata().region_id,
            file_id = %file.file_id()
        )
    )]
    pub async fn prune_file(
        &self,
        file: &FileHandle,
        reader_metrics: &mut ReaderMetrics,
    ) -> Result<FileRangeBuilder> {
        let predicate = self.predicate_for_file(file);
        let filter_mode = pre_filter_mode(self.append_mode, self.merge_mode);
        // Only decode primary key values when this is not a compaction scan
        // and the output actually needs tag columns.
        let decode_pk_values = !self.compaction && self.mapper.has_tags();
        let reader = self
            .access_layer
            .read_sst(file.clone())
            .predicate(predicate)
            .projection(Some(self.read_column_ids.clone()))
            .cache(self.cache_strategy.clone())
            .inverted_index_appliers(self.inverted_index_appliers.clone())
            .bloom_filter_index_appliers(self.bloom_filter_index_appliers.clone())
            .fulltext_index_appliers(self.fulltext_index_appliers.clone());
        #[cfg(feature = "vector_index")]
        let reader = {
            let mut reader = reader;
            reader =
                reader.vector_index_applier(self.vector_index_applier.clone(), self.vector_index_k);
            reader
        };
        let res = reader
            .expected_metadata(Some(self.mapper.metadata().clone()))
            .flat_format(self.flat_format)
            .compaction(self.compaction)
            .pre_filter_mode(filter_mode)
            .decode_primary_key_values(decode_pk_values)
            .build_reader_input(reader_metrics)
            .await;
        let read_input = match res {
            Ok(x) => x,
            Err(e) => {
                // Tolerate missing files when configured to; otherwise
                // propagate the error.
                if e.is_object_not_found() && self.ignore_file_not_found {
                    error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
                    return Ok(FileRangeBuilder::default());
                } else {
                    return Err(e);
                }
            }
        };

        // `None` here means the whole file was pruned out.
        let Some((mut file_range_ctx, selection)) = read_input else {
            return Ok(FileRangeBuilder::default());
        };

        // Install a compat adapter when the file schema or primary key
        // encoding differs from the region schema.
        let need_compat = !compat::has_same_columns_and_pk_encoding(
            self.mapper.metadata(),
            file_range_ctx.read_format().metadata(),
        );
        if need_compat {
            let compat = if let Some(flat_format) = file_range_ctx.read_format().as_flat() {
                let mapper = self.mapper.as_flat().unwrap();
                FlatCompatBatch::try_new(
                    mapper,
                    flat_format.metadata(),
                    flat_format.format_projection(),
                    self.compaction,
                )?
                .map(CompatBatch::Flat)
            } else {
                let compact_batch = PrimaryKeyCompatBatch::new(
                    &self.mapper,
                    file_range_ctx.read_format().metadata().clone(),
                )?;
                Some(CompatBatch::PrimaryKey(compact_batch))
            };
            file_range_ctx.set_compat_batch(compat);
        }
        Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), selection))
    }
1217
1218 #[tracing::instrument(
1220 skip(self, input, semaphore, sender),
1221 fields(region_id = %self.region_metadata().region_id)
1222 )]
1223 pub(crate) fn spawn_scan_task(
1224 &self,
1225 mut input: Source,
1226 semaphore: Arc<Semaphore>,
1227 sender: mpsc::Sender<Result<Batch>>,
1228 ) {
1229 let region_id = self.region_metadata().region_id;
1230 let span = tracing::info_span!(
1231 "ScanInput::parallel_scan_task",
1232 region_id = %region_id,
1233 stream_kind = "batch"
1234 );
1235 common_runtime::spawn_global(
1236 async move {
1237 loop {
1238 let maybe_batch = {
1241 let _permit = semaphore.acquire().await.unwrap();
1243 input.next_batch().await
1244 };
1245 match maybe_batch {
1246 Ok(Some(batch)) => {
1247 let _ = sender.send(Ok(batch)).await;
1248 }
1249 Ok(None) => break,
1250 Err(e) => {
1251 let _ = sender.send(Err(e)).await;
1252 break;
1253 }
1254 }
1255 }
1256 }
1257 .instrument(span),
1258 );
1259 }
1260
1261 #[tracing::instrument(
1265 skip(self, sources, semaphore),
1266 fields(
1267 region_id = %self.region_metadata().region_id,
1268 source_count = sources.len()
1269 )
1270 )]
1271 pub(crate) fn create_parallel_flat_sources(
1272 &self,
1273 sources: Vec<BoxedRecordBatchStream>,
1274 semaphore: Arc<Semaphore>,
1275 ) -> Result<Vec<BoxedRecordBatchStream>> {
1276 if sources.len() <= 1 {
1277 return Ok(sources);
1278 }
1279
1280 let sources = sources
1282 .into_iter()
1283 .map(|source| {
1284 let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
1285 self.spawn_flat_scan_task(source, semaphore.clone(), sender);
1286 let stream = Box::pin(ReceiverStream::new(receiver));
1287 Box::pin(stream) as _
1288 })
1289 .collect();
1290 Ok(sources)
1291 }
1292
1293 #[tracing::instrument(
1295 skip(self, input, semaphore, sender),
1296 fields(region_id = %self.region_metadata().region_id)
1297 )]
1298 pub(crate) fn spawn_flat_scan_task(
1299 &self,
1300 mut input: BoxedRecordBatchStream,
1301 semaphore: Arc<Semaphore>,
1302 sender: mpsc::Sender<Result<RecordBatch>>,
1303 ) {
1304 let region_id = self.region_metadata().region_id;
1305 let span = tracing::info_span!(
1306 "ScanInput::parallel_scan_task",
1307 region_id = %region_id,
1308 stream_kind = "flat"
1309 );
1310 common_runtime::spawn_global(
1311 async move {
1312 loop {
1313 let maybe_batch = {
1316 let _permit = semaphore.acquire().await.unwrap();
1318 input.next().await
1319 };
1320 match maybe_batch {
1321 Some(Ok(batch)) => {
1322 let _ = sender.send(Ok(batch)).await;
1323 }
1324 Some(Err(e)) => {
1325 let _ = sender.send(Err(e)).await;
1326 break;
1327 }
1328 None => break,
1329 }
1330 }
1331 }
1332 .instrument(span),
1333 );
1334 }
1335
1336 pub(crate) fn total_rows(&self) -> usize {
1337 let rows_in_files: usize = self.files.iter().map(|f| f.num_rows()).sum();
1338 let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();
1339
1340 let rows = rows_in_files + rows_in_memtables;
1341 #[cfg(feature = "enterprise")]
1342 let rows = rows
1343 + self
1344 .extension_ranges
1345 .iter()
1346 .map(|x| x.num_rows())
1347 .sum::<u64>() as usize;
1348 rows
1349 }
1350
    /// Returns the predicate group (time filters plus pushdown predicates)
    /// attached to this scan.
    pub(crate) fn predicate_group(&self) -> &PredicateGroup {
        &self.predicate
    }
1354
    /// Number of memtables to scan.
    pub(crate) fn num_memtables(&self) -> usize {
        self.memtables.len()
    }
1359
    /// Number of SST files to scan.
    pub(crate) fn num_files(&self) -> usize {
        self.files.len()
    }
1364
    /// Returns the file handle for a row group `index` in the file section.
    ///
    /// Global indices are laid out memtables first, then files, so the file
    /// position is `index.index - num_memtables()`. Panics (via slice
    /// indexing) when `index` does not fall in the file section.
    pub(crate) fn file_from_index(&self, index: RowGroupIndex) -> &FileHandle {
        let file_index = index.index - self.num_memtables();
        &self.files[file_index]
    }
1370
    /// Returns the metadata of the region being scanned, as held by the
    /// projection mapper.
    pub fn region_metadata(&self) -> &RegionMetadataRef {
        self.mapper.metadata()
    }
1374}
1375
#[cfg(feature = "enterprise")]
impl ScanInput {
    /// Replaces the extension ranges of this input, consuming `self`.
    #[must_use]
    pub(crate) fn with_extension_ranges(self, extension_ranges: Vec<BoxedExtensionRange>) -> Self {
        Self {
            extension_ranges,
            ..self
        }
    }

    /// All extension ranges attached to this scan input.
    ///
    /// Note: the whole `impl` is already gated on the `enterprise` feature;
    /// the per-method `#[cfg]` attributes that used to repeat the gate were
    /// redundant and have been removed.
    pub(crate) fn extension_ranges(&self) -> &[BoxedExtensionRange] {
        &self.extension_ranges
    }

    /// Returns the extension range for global range index `i`.
    ///
    /// Global indices are laid out memtables first, then files, then
    /// extension ranges; panics (via slice indexing) when `i` falls below the
    /// extension-range section.
    pub(crate) fn extension_range(&self, i: usize) -> &BoxedExtensionRange {
        &self.extension_ranges[i - self.num_memtables() - self.num_files()]
    }
}
1397
#[cfg(test)]
impl ScanInput {
    /// File ids of every SST selected for this scan (test helper).
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        let mut ids = Vec::with_capacity(self.files.len());
        for handle in &self.files {
            ids.push(handle.file_id());
        }
        ids
    }

    /// Index ids of every SST selected for this scan (test helper).
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        let mut ids = Vec::with_capacity(self.files.len());
        for handle in &self.files {
            ids.push(handle.index_id());
        }
        ids
    }
}
1409
1410fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode {
1411 if append_mode {
1412 return PreFilterMode::All;
1413 }
1414
1415 match merge_mode {
1416 MergeMode::LastRow => PreFilterMode::SkipFieldsOnDelete,
1417 MergeMode::LastNonNull => PreFilterMode::SkipFields,
1418 }
1419}
1420
/// Builds a fingerprint identifying this scan request for the range cache.
///
/// Returns `None` when the scan is not cache-eligible: non-flat format,
/// compaction scans, scans without files, scans whose cache strategy is not
/// `EnableAll`, or scans that carry no filter on a tag column.
pub(crate) fn build_scan_fingerprint(input: &ScanInput) -> Option<ScanRequestFingerprint> {
    let eligible = input.flat_format
        && !input.compaction
        && !input.files.is_empty()
        && matches!(input.cache_strategy, CacheStrategy::EnableAll(_));

    if !eligible {
        return None;
    }

    let metadata = input.region_metadata();
    // Names of tag columns, used below to detect tag filters.
    let tag_names: HashSet<&str> = metadata
        .column_metadatas
        .iter()
        .filter(|col| col.semantic_type == SemanticType::Tag)
        .map(|col| col.column_schema.name.as_str())
        .collect();

    let time_index = metadata.time_index_column();
    let time_index_name = time_index.column_schema.name.clone();
    let ts_col_unit = time_index
        .column_schema
        .data_type
        .as_timestamp()
        .expect("Time index must have timestamp-compatible type")
        .unit();

    // Fingerprint is built from the filters WITHOUT the region partition
    // expression so equivalent requests on different regions can share it.
    let exprs = input
        .predicate_group()
        .predicate_without_region()
        .map(|predicate| predicate.exprs())
        .unwrap_or_default();

    let mut filters = Vec::new();
    let mut time_filters = Vec::new();
    let mut has_tag_filter = false;
    // Scratch set reused across loop iterations.
    let mut columns = HashSet::new();

    for expr in exprs {
        columns.clear();
        // Classify each filter: does it reference only the time index, and
        // does it touch any tag column at all?
        let is_time_only = match expr_to_columns(expr, &mut columns) {
            Ok(()) if !columns.is_empty() => {
                has_tag_filter |= columns
                    .iter()
                    .any(|col| tag_names.contains(col.name.as_str()));
                columns.iter().all(|col| col.name == time_index_name)
            }
            _ => false,
        };

        // Time-only filters that yield an extractable time range go into the
        // separate `time_filters` bucket; everything else is a plain filter.
        if is_time_only
            && extract_time_range_from_expr(&time_index_name, ts_col_unit, expr).is_some()
        {
            time_filters.push(expr.to_string());
        } else {
            filters.push(expr.to_string());
        }
    }

    // Caching only pays off for scans constrained to specific series.
    if !has_tag_filter {
        return None;
    }

    // Sort so that logically identical requests with reordered filters
    // produce the same fingerprint.
    filters.sort_unstable();
    time_filters.sort_unstable();

    Some(
        crate::read::range_cache::ScanRequestFingerprintBuilder {
            read_column_ids: input.read_column_ids.clone(),
            read_column_types: input
                .read_column_ids
                .iter()
                .map(|id| {
                    metadata
                        .column_by_id(*id)
                        .map(|col| col.column_schema.data_type.clone())
                })
                .collect(),
            filters,
            time_filters,
            series_row_selector: input.series_row_selector,
            append_mode: input.append_mode,
            filter_deleted: input.filter_deleted,
            merge_mode: input.merge_mode,
            partition_expr_version: metadata.partition_expr_version,
        }
        .build(),
    )
}
1519
/// Context shared by all streams of one scan.
pub struct StreamContext {
    /// Immutable scan input: sources, predicates, mapper and options.
    pub input: ScanInput,
    /// Metadata of the ranges this scan is partitioned into.
    pub(crate) ranges: Vec<RangeMeta>,
    /// Fingerprint identifying this scan request for the range cache;
    /// `None` when the scan is not cache-eligible.
    #[allow(dead_code)]
    pub(crate) scan_fingerprint: Option<ScanRequestFingerprint>,

    /// Instant the query started, used for elapsed-time reporting.
    pub(crate) query_start: Instant,
}
1536
impl StreamContext {
    /// Creates a context for a sequential (ordered) scan.
    pub(crate) fn seq_scan_ctx(input: ScanInput) -> Self {
        let query_start = input.query_start.unwrap_or_else(Instant::now);
        let ranges = RangeMeta::seq_scan_ranges(&input);
        READ_SST_COUNT.observe(input.num_files() as f64);
        let scan_fingerprint = build_scan_fingerprint(&input);

        Self {
            input,
            ranges,
            scan_fingerprint,
            query_start,
        }
    }

    /// Creates a context for an unordered scan.
    pub(crate) fn unordered_scan_ctx(input: ScanInput) -> Self {
        let query_start = input.query_start.unwrap_or_else(Instant::now);
        let ranges = RangeMeta::unordered_scan_ranges(&input);
        READ_SST_COUNT.observe(input.num_files() as f64);
        let scan_fingerprint = build_scan_fingerprint(&input);

        Self {
            input,
            ranges,
            scan_fingerprint,
            query_start,
        }
    }

    /// True when the row group index points into the memtable section
    /// (global indices are laid out memtables first).
    pub(crate) fn is_mem_range_index(&self, index: RowGroupIndex) -> bool {
        self.input.num_memtables() > index.index
    }

    /// True when the row group index points into the file section, which
    /// directly follows the memtable section.
    pub(crate) fn is_file_range_index(&self, index: RowGroupIndex) -> bool {
        !self.is_mem_range_index(index)
            && index.index < self.input.num_files() + self.input.num_memtables()
    }

    /// Builds one `PartitionRange` per range meta, using the position in
    /// `ranges` as the partition identifier.
    pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
        self.ranges
            .iter()
            .enumerate()
            .map(|(idx, range_meta)| range_meta.new_partition_range(idx))
            .collect()
    }

    /// Formats the scan for `EXPLAIN` output as a JSON-like fragment;
    /// `verbose` additionally appends the detailed content from
    /// [`Self::format_verbose_content`].
    pub(crate) fn format_for_explain(&self, verbose: bool, f: &mut fmt::Formatter) -> fmt::Result {
        let (mut num_mem_ranges, mut num_file_ranges, mut num_other_ranges) = (0, 0, 0);
        for range_meta in &self.ranges {
            for idx in &range_meta.row_group_indices {
                if self.is_mem_range_index(*idx) {
                    num_mem_ranges += 1;
                } else if self.is_file_range_index(*idx) {
                    num_file_ranges += 1;
                } else {
                    // Neither memtable nor file; e.g. extension ranges.
                    num_other_ranges += 1;
                }
            }
        }
        if verbose {
            write!(f, "{{")?;
        }
        write!(
            f,
            r#""partition_count":{{"count":{}, "mem_ranges":{}, "files":{}, "file_ranges":{}"#,
            self.ranges.len(),
            num_mem_ranges,
            self.input.num_files(),
            num_file_ranges,
        )?;
        if num_other_ranges > 0 {
            write!(f, r#", "other_ranges":{}"#, num_other_ranges)?;
        }
        write!(f, "}}")?;

        if let Some(selector) = &self.input.series_row_selector {
            write!(f, ", \"selector\":\"{}\"", selector)?;
        }
        if let Some(distribution) = &self.input.distribution {
            write!(f, ", \"distribution\":\"{}\"", distribution)?;
        }

        if verbose {
            self.format_verbose_content(f)?;
        }

        Ok(())
    }

    /// Writes the verbose part of the explain output: projection, filters,
    /// dynamic filters, per-file details and format flags.
    fn format_verbose_content(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Adapter that renders one file handle as a JSON object.
        struct FileWrapper<'a> {
            file: &'a FileHandle,
        }

        impl fmt::Debug for FileWrapper<'_> {
            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                let (start, end) = self.file.time_range();
                write!(
                    f,
                    r#"{{"file_id":"{}","time_range_start":"{}::{}","time_range_end":"{}::{}","rows":{},"size":{},"index_size":{}}}"#,
                    self.file.file_id(),
                    start.value(),
                    start.unit(),
                    end.value(),
                    end.unit(),
                    self.file.num_rows(),
                    self.file.size(),
                    self.file.index_size()
                )
            }
        }

        // Adapter that renders the whole scan input.
        struct InputWrapper<'a> {
            input: &'a ScanInput,
        }

        #[cfg(feature = "enterprise")]
        impl InputWrapper<'_> {
            // Appends the extension ranges, if any, to the explain output.
            fn format_extension_ranges(&self, f: &mut fmt::Formatter) -> fmt::Result {
                if self.input.extension_ranges.is_empty() {
                    return Ok(());
                }

                let mut delimiter = "";
                write!(f, ", extension_ranges: [")?;
                for range in self.input.extension_ranges() {
                    write!(f, "{}{:?}", delimiter, range)?;
                    delimiter = ", ";
                }
                write!(f, "]")?;
                Ok(())
            }
        }

        impl fmt::Debug for InputWrapper<'_> {
            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                let output_schema = self.input.mapper.output_schema();
                if !output_schema.is_empty() {
                    let names: Vec<_> = output_schema
                        .column_schemas()
                        .iter()
                        .map(|col| &col.name)
                        .collect();
                    write!(f, ", \"projection\": {:?}", names)?;
                }
                if let Some(predicate) = &self.input.predicate.predicate() {
                    if !predicate.exprs().is_empty() {
                        let exprs: Vec<_> =
                            predicate.exprs().iter().map(|e| e.to_string()).collect();
                        write!(f, ", \"filters\": {:?}", exprs)?;
                    }
                    if !predicate.dyn_filters().is_empty() {
                        let dyn_filters: Vec<_> = predicate
                            .dyn_filters()
                            .iter()
                            .map(|f| format!("{}", f))
                            .collect();
                        write!(f, ", \"dyn_filters\": {:?}", dyn_filters)?;
                    }
                }
                #[cfg(feature = "vector_index")]
                if let Some(vector_index_k) = self.input.vector_index_k {
                    write!(f, ", \"vector_index_k\": {}", vector_index_k)?;
                }
                if !self.input.files.is_empty() {
                    write!(f, ", \"files\": ")?;
                    f.debug_list()
                        .entries(self.input.files.iter().map(|file| FileWrapper { file }))
                        .finish()?;
                }
                write!(f, ", \"flat_format\": {}", self.input.flat_format)?;

                #[cfg(feature = "enterprise")]
                self.format_extension_ranges(f)?;

                Ok(())
            }
        }

        write!(f, "{:?}", InputWrapper { input: &self.input })
    }

    /// Registers dynamic filters pushed down at execution time.
    ///
    /// Returns one flag per input expression: `true` when it downcast to a
    /// `DynamicFilterPhysicalExpr` and was attached, `false` otherwise.
    pub(crate) fn add_dyn_filter_to_predicate(
        self: &Arc<Self>,
        filter_exprs: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
    ) -> Vec<bool> {
        let mut supported = Vec::with_capacity(filter_exprs.len());
        let filter_expr = filter_exprs
            .into_iter()
            .filter_map(|expr| {
                if let Ok(dyn_filter) = (expr as Arc<dyn std::any::Any + Send + Sync + 'static>)
                    .downcast::<datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr>()
                {
                    supported.push(true);
                    Some(dyn_filter)
                } else {
                    supported.push(false);
                    None
                }
            })
            .collect();
        self.input.predicate.add_dyn_filters(filter_expr);
        supported
    }
}
1749
/// Predicates of a scan request, split by how they are applied.
#[derive(Clone, Default)]
pub struct PredicateGroup {
    /// Simple evaluators for filters that reference only the time index.
    time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
    /// Request filters combined with the region partition expression.
    predicate_all: Predicate,
    /// Request filters only, without the region partition expression.
    predicate_without_region: Predicate,
    /// Parsed partition expression of the region, if any.
    region_partition_expr: Option<PartitionExpr>,
}
1762
1763impl PredicateGroup {
1764 pub fn new(metadata: &RegionMetadata, exprs: &[Expr]) -> Result<Self> {
1766 let mut combined_exprs = exprs.to_vec();
1767 let mut region_partition_expr = None;
1768
1769 if let Some(expr_json) = metadata.partition_expr.as_ref()
1770 && !expr_json.is_empty()
1771 && let Some(expr) = PartitionExpr::from_json_str(expr_json)
1772 .context(InvalidPartitionExprSnafu { expr: expr_json })?
1773 {
1774 let logical_expr = expr
1775 .try_as_logical_expr()
1776 .context(InvalidPartitionExprSnafu {
1777 expr: expr_json.clone(),
1778 })?;
1779
1780 combined_exprs.push(logical_expr);
1781 region_partition_expr = Some(expr);
1782 }
1783
1784 let mut time_filters = Vec::with_capacity(combined_exprs.len());
1785 let mut columns = HashSet::new();
1787 for expr in &combined_exprs {
1788 columns.clear();
1789 let Some(filter) = Self::expr_to_filter(expr, metadata, &mut columns) else {
1790 continue;
1791 };
1792 time_filters.push(filter);
1793 }
1794 let time_filters = if time_filters.is_empty() {
1795 None
1796 } else {
1797 Some(Arc::new(time_filters))
1798 };
1799
1800 let predicate_all = Predicate::new(combined_exprs);
1801 let predicate_without_region = Predicate::new(exprs.to_vec());
1802
1803 Ok(Self {
1804 time_filters,
1805 predicate_all,
1806 predicate_without_region,
1807 region_partition_expr,
1808 })
1809 }
1810
1811 pub(crate) fn time_filters(&self) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
1813 self.time_filters.clone()
1814 }
1815
1816 pub(crate) fn predicate(&self) -> Option<&Predicate> {
1818 if self.predicate_all.is_empty() {
1819 None
1820 } else {
1821 Some(&self.predicate_all)
1822 }
1823 }
1824
1825 pub(crate) fn predicate_without_region(&self) -> Option<&Predicate> {
1827 if self.predicate_without_region.is_empty() {
1828 None
1829 } else {
1830 Some(&self.predicate_without_region)
1831 }
1832 }
1833
1834 pub(crate) fn add_dyn_filters(&self, dyn_filters: Vec<Arc<DynamicFilterPhysicalExpr>>) {
1836 self.predicate_all.add_dyn_filters(dyn_filters.clone());
1837 self.predicate_without_region.add_dyn_filters(dyn_filters);
1838 }
1839
1840 pub(crate) fn region_partition_expr(&self) -> Option<&PartitionExpr> {
1842 self.region_partition_expr.as_ref()
1843 }
1844
1845 fn expr_to_filter(
1846 expr: &Expr,
1847 metadata: &RegionMetadata,
1848 columns: &mut HashSet<Column>,
1849 ) -> Option<SimpleFilterEvaluator> {
1850 columns.clear();
1851 expr_to_columns(expr, columns).ok()?;
1854 if columns.len() > 1 {
1855 return None;
1857 }
1858 let column = columns.iter().next()?;
1859 let column_meta = metadata.column_by_name(&column.name)?;
1860 if column_meta.semantic_type == SemanticType::Timestamp {
1861 SimpleFilterEvaluator::try_new(expr)
1862 } else {
1863 None
1864 }
1865 }
1866}
1867
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datafusion::physical_plan::expressions::lit as physical_lit;
    use datafusion_common::ScalarValue;
    use datafusion_expr::{col, lit};
    use datatypes::value::Value;
    use partition::expr::col as partition_col;
    use store_api::metadata::RegionMetadataBuilder;
    use store_api::storage::{ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector};

    use super::*;
    use crate::cache::CacheManager;
    use crate::memtable::time_partition::TimePartitions;
    use crate::read::range_cache::ScanRequestFingerprintBuilder;
    use crate::region::options::RegionOptions;
    use crate::region::version::VersionBuilder;
    use crate::sst::FormatType;
    use crate::test_util::memtable_util::{EmptyMemtableBuilder, metadata_with_primary_key};
    use crate::test_util::scheduler_util::SchedulerEnv;

    // Builds a region version backed by an empty memtable.
    fn new_version(metadata: RegionMetadataRef) -> VersionRef {
        let mutable = Arc::new(TimePartitions::new(
            metadata.clone(),
            Arc::new(EmptyMemtableBuilder::default()),
            0,
            None,
        ));
        Arc::new(VersionBuilder::new(metadata, mutable).build())
    }

    // Same as `new_version` but with an explicit SST format option.
    fn new_version_with_sst_format(
        metadata: RegionMetadataRef,
        sst_format: Option<FormatType>,
    ) -> VersionRef {
        let mutable = Arc::new(TimePartitions::new(
            metadata.clone(),
            Arc::new(EmptyMemtableBuilder::default()),
            0,
            None,
        ));
        let options = RegionOptions {
            sst_format,
            ..Default::default()
        };
        Arc::new(
            VersionBuilder::new(metadata, mutable)
                .options(options)
                .build(),
        )
    }

    // Builds a cache-eligible `ScanInput` (flat format, EnableAll cache, one
    // default file) with the given filters.
    async fn new_scan_input(metadata: RegionMetadataRef, filters: Vec<Expr>) -> ScanInput {
        let env = SchedulerEnv::new().await;
        let mapper = ProjectionMapper::new(&metadata, [0, 2, 3].into_iter(), true).unwrap();
        let predicate = PredicateGroup::new(metadata.as_ref(), &filters).unwrap();
        let file = FileHandle::new(
            crate::sst::file::FileMeta::default(),
            Arc::new(crate::sst::file_purger::NoopFilePurger),
        );

        ScanInput::new(env.access_layer.clone(), mapper)
            .with_predicate(predicate)
            .with_cache(CacheStrategy::EnableAll(Arc::new(
                CacheManager::builder()
                    .range_result_cache_size(1024)
                    .build(),
            )))
            .with_flat_format(true)
            .with_files(vec![file])
    }

    // Columns referenced only by filters must be added after the projection.
    #[tokio::test]
    async fn test_build_read_column_ids_includes_filters() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let version = new_version(metadata.clone());
        let env = SchedulerEnv::new().await;
        let request = ScanRequest {
            projection: Some(vec![4]),
            filters: vec![
                col("v0").gt(lit(1)),
                col("ts").gt(lit(0)),
                col("k0").eq(lit("foo")),
            ],
            ..Default::default()
        };
        let scan_region = ScanRegion::new(
            version,
            env.access_layer.clone(),
            request,
            CacheStrategy::Disabled,
        );
        let predicate =
            PredicateGroup::new(metadata.as_ref(), &scan_region.request.filters).unwrap();
        let projection = scan_region.request.projection.as_ref().unwrap();
        let read_ids = scan_region
            .build_read_column_ids(projection, &predicate)
            .unwrap();
        assert_eq!(vec![4, 0, 2, 3], read_ids);
    }

    // An empty projection still reads the time index column.
    #[tokio::test]
    async fn test_build_read_column_ids_empty_projection() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let version = new_version(metadata.clone());
        let env = SchedulerEnv::new().await;
        let request = ScanRequest {
            projection: Some(vec![]),
            ..Default::default()
        };
        let scan_region = ScanRegion::new(
            version,
            env.access_layer.clone(),
            request,
            CacheStrategy::Disabled,
        );
        let predicate =
            PredicateGroup::new(metadata.as_ref(), &scan_region.request.filters).unwrap();
        let projection = scan_region.request.projection.as_ref().unwrap();
        let read_ids = scan_region
            .build_read_column_ids(projection, &predicate)
            .unwrap();
        assert_eq!(vec![2], read_ids);
    }

    // Projection order is preserved; filter-only columns are appended.
    #[tokio::test]
    async fn test_build_read_column_ids_keeps_projection_order() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let version = new_version(metadata.clone());
        let env = SchedulerEnv::new().await;
        let request = ScanRequest {
            projection: Some(vec![4, 1]),
            filters: vec![col("v0").gt(lit(1))],
            ..Default::default()
        };
        let scan_region = ScanRegion::new(
            version,
            env.access_layer.clone(),
            request,
            CacheStrategy::Disabled,
        );
        let predicate =
            PredicateGroup::new(metadata.as_ref(), &scan_region.request.filters).unwrap();
        let projection = scan_region.request.projection.as_ref().unwrap();
        let read_ids = scan_region
            .build_read_column_ids(projection, &predicate)
            .unwrap();
        assert_eq!(vec![4, 1, 3], read_ids);
    }

    // `force_flat_format` in the request overrides the region's SST format.
    #[tokio::test]
    async fn test_use_flat_format_honors_request_override() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let env = SchedulerEnv::new().await;

        let primary_key_version =
            new_version_with_sst_format(metadata.clone(), Some(FormatType::PrimaryKey));
        let request = ScanRequest::default();
        let scan_region = ScanRegion::new(
            primary_key_version.clone(),
            env.access_layer.clone(),
            request,
            CacheStrategy::Disabled,
        );
        assert!(!scan_region.use_flat_format());

        let request = ScanRequest {
            force_flat_format: true,
            ..Default::default()
        };
        let scan_region = ScanRegion::new(
            primary_key_version,
            env.access_layer.clone(),
            request,
            CacheStrategy::Disabled,
        );
        assert!(scan_region.use_flat_format());

        let flat_version = new_version_with_sst_format(metadata, Some(FormatType::Flat));
        let request = ScanRequest::default();
        let scan_region = ScanRegion::new(
            flat_version,
            env.access_layer.clone(),
            request,
            CacheStrategy::Disabled,
        );
        assert!(scan_region.use_flat_format());
    }

    // Millisecond timestamp literal helper for time-filter tests.
    fn ts_lit(val: i64) -> datafusion_expr::Expr {
        lit(ScalarValue::TimestampMillisecond(Some(val), None))
    }

    // An eligible scan (flat, cached, with files and a tag filter) produces a
    // fingerprint whose fields mirror the scan options and sorted filters.
    #[tokio::test]
    async fn test_build_scan_fingerprint_for_eligible_scan() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let input = new_scan_input(
            metadata.clone(),
            vec![
                col("ts").gt_eq(ts_lit(1000)),
                col("k0").eq(lit("foo")),
                col("v0").gt(lit(1)),
            ],
        )
        .await
        .with_distribution(Some(TimeSeriesDistribution::PerSeries))
        .with_series_row_selector(Some(TimeSeriesRowSelector::LastRow))
        .with_merge_mode(MergeMode::LastNonNull)
        .with_filter_deleted(false);

        let fingerprint = build_scan_fingerprint(&input).unwrap();

        let expected = ScanRequestFingerprintBuilder {
            read_column_ids: input.read_column_ids.clone(),
            read_column_types: vec![
                metadata
                    .column_by_id(0)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(2)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(3)
                    .map(|col| col.column_schema.data_type.clone()),
            ],
            filters: vec![
                col("k0").eq(lit("foo")).to_string(),
                col("v0").gt(lit(1)).to_string(),
            ],
            time_filters: vec![col("ts").gt_eq(ts_lit(1000)).to_string()],
            series_row_selector: Some(TimeSeriesRowSelector::LastRow),
            append_mode: false,
            filter_deleted: false,
            merge_mode: MergeMode::LastNonNull,
            partition_expr_version: 0,
        }
        .build();
        assert_eq!(expected, fingerprint);
    }

    // Without any tag filter the scan is not cache-eligible.
    #[tokio::test]
    async fn test_build_scan_fingerprint_requires_tag_filter() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let input = new_scan_input(
            metadata,
            vec![col("ts").gt_eq(lit(1000)), col("v0").gt(lit(1))],
        )
        .await;

        assert!(build_scan_fingerprint(&input).is_none());
    }

    // Disabled cache, non-flat format, compaction, or no files each disable
    // fingerprinting.
    #[tokio::test]
    async fn test_build_scan_fingerprint_respects_scan_eligibility() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let filters = vec![col("k0").eq(lit("foo"))];

        let disabled = ScanInput::new(
            SchedulerEnv::new().await.access_layer.clone(),
            ProjectionMapper::new(&metadata, [0, 2, 3].into_iter(), true).unwrap(),
        )
        .with_predicate(PredicateGroup::new(metadata.as_ref(), &filters).unwrap())
        .with_flat_format(true);
        assert!(build_scan_fingerprint(&disabled).is_none());

        let non_flat = new_scan_input(metadata.clone(), filters.clone())
            .await
            .with_flat_format(false);
        assert!(build_scan_fingerprint(&non_flat).is_none());

        let compaction = new_scan_input(metadata.clone(), filters.clone())
            .await
            .with_compaction(true);
        assert!(build_scan_fingerprint(&compaction).is_none());

        let no_files = new_scan_input(metadata, filters).await.with_files(vec![]);
        assert!(build_scan_fingerprint(&no_files).is_none());
    }

    // The fingerprint reflects the region's partition expression version.
    #[tokio::test]
    async fn test_build_scan_fingerprint_tracks_schema_and_partition_expr_changes() {
        let base = metadata_with_primary_key(vec![0, 1], false);
        let mut builder = RegionMetadataBuilder::from_existing(base);
        let partition_expr = partition_col("k0")
            .gt_eq(Value::String("foo".into()))
            .as_json_str()
            .unwrap();
        builder.partition_expr_json(Some(partition_expr));
        let metadata = Arc::new(builder.build_without_validation().unwrap());

        let input = new_scan_input(metadata.clone(), vec![col("k0").eq(lit("foo"))]).await;
        let fingerprint = build_scan_fingerprint(&input).unwrap();

        let expected = ScanRequestFingerprintBuilder {
            read_column_ids: input.read_column_ids.clone(),
            read_column_types: vec![
                metadata
                    .column_by_id(0)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(2)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(3)
                    .map(|col| col.column_schema.data_type.clone()),
            ],
            filters: vec![col("k0").eq(lit("foo")).to_string()],
            time_filters: vec![],
            series_row_selector: None,
            append_mode: false,
            filter_deleted: true,
            merge_mode: MergeMode::LastRow,
            partition_expr_version: metadata.partition_expr_version,
        }
        .build();
        assert_eq!(expected, fingerprint);
        assert_ne!(0, metadata.partition_expr_version);
    }

    // Dynamic filters attached to an otherwise empty group make both
    // predicate variants non-empty.
    #[test]
    fn test_update_dyn_filters_with_empty_base_predicates() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let predicate_group = PredicateGroup::new(metadata.as_ref(), &[]).unwrap();
        assert!(predicate_group.predicate().is_none());
        assert!(predicate_group.predicate_without_region().is_none());

        let dyn_filter = Arc::new(DynamicFilterPhysicalExpr::new(vec![], physical_lit(false)));
        predicate_group.add_dyn_filters(vec![dyn_filter]);

        let predicate_all = predicate_group.predicate().unwrap();
        assert!(predicate_all.exprs().is_empty());
        assert_eq!(1, predicate_all.dyn_filters().len());

        let predicate_without_region = predicate_group.predicate_without_region().unwrap();
        assert!(predicate_without_region.exprs().is_empty());
        assert_eq!(1, predicate_without_region.dyn_filters().len());
    }
}