1use std::collections::HashSet;
18use std::fmt;
19use std::num::NonZeroU64;
20use std::sync::Arc;
21use std::time::Instant;
22
23use api::v1::SemanticType;
24use common_error::ext::BoxedError;
25use common_recordbatch::SendableRecordBatchStream;
26use common_recordbatch::filter::SimpleFilterEvaluator;
27use common_telemetry::tracing::Instrument;
28use common_telemetry::{debug, error, tracing, warn};
29use common_time::range::TimestampRange;
30use datafusion_common::Column;
31use datafusion_expr::Expr;
32use datafusion_expr::utils::expr_to_columns;
33use futures::StreamExt;
34use partition::expr::PartitionExpr;
35use smallvec::SmallVec;
36use snafu::{OptionExt as _, ResultExt};
37use store_api::metadata::{RegionMetadata, RegionMetadataRef};
38use store_api::region_engine::{PartitionRange, RegionScannerRef};
39use store_api::storage::{
40 ColumnId, RegionId, ScanRequest, SequenceRange, TimeSeriesDistribution, TimeSeriesRowSelector,
41};
42use table::predicate::{Predicate, build_time_range_predicate};
43use tokio::sync::{Semaphore, mpsc};
44use tokio_stream::wrappers::ReceiverStream;
45
46use crate::access_layer::AccessLayerRef;
47use crate::cache::CacheStrategy;
48use crate::config::{DEFAULT_MAX_CONCURRENT_SCAN_FILES, DEFAULT_SCAN_CHANNEL_SIZE};
49use crate::error::{InvalidPartitionExprSnafu, InvalidRequestSnafu, Result};
50#[cfg(feature = "enterprise")]
51use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
52use crate::memtable::{MemtableRange, RangesOptions};
53use crate::metrics::READ_SST_COUNT;
54use crate::read::compat::{self, CompatBatch, FlatCompatBatch, PrimaryKeyCompatBatch};
55use crate::read::projection::ProjectionMapper;
56use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
57use crate::read::seq_scan::SeqScan;
58use crate::read::series_scan::SeriesScan;
59use crate::read::stream::ScanBatchStream;
60use crate::read::unordered_scan::UnorderedScan;
61use crate::read::{Batch, BoxedRecordBatchStream, RecordBatch, Source};
62use crate::region::options::MergeMode;
63use crate::region::version::VersionRef;
64use crate::sst::FormatType;
65use crate::sst::file::FileHandle;
66use crate::sst::index::bloom_filter::applier::{
67 BloomFilterIndexApplierBuilder, BloomFilterIndexApplierRef,
68};
69use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
70use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
71use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
72use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
73#[cfg(feature = "vector_index")]
74use crate::sst::index::vector_index::applier::{VectorIndexApplier, VectorIndexApplierRef};
75use crate::sst::parquet::file_range::PreFilterMode;
76use crate::sst::parquet::reader::ReaderMetrics;
77
78const FLAT_SCAN_CHANNEL_SIZE: usize = 2;
80#[cfg(feature = "vector_index")]
81const VECTOR_INDEX_OVERFETCH_MULTIPLIER: usize = 2;
82
/// Scanner implementation chosen for a region scan request.
pub(crate) enum Scanner {
    /// Sequential scan; the default when neither unordered nor per-series scan applies.
    Seq(SeqScan),
    /// Unordered scan, used for append-mode regions without a series row selector.
    Unordered(UnorderedScan),
    /// Per-series scan, used for the `PerSeries` time series distribution.
    Series(SeriesScan),
}
92
93impl Scanner {
94 #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
96 pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream, BoxedError> {
97 match self {
98 Scanner::Seq(seq_scan) => seq_scan.build_stream(),
99 Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
100 Scanner::Series(series_scan) => series_scan.build_stream().await,
101 }
102 }
103
104 pub(crate) fn scan_batch(&self) -> Result<ScanBatchStream> {
106 match self {
107 Scanner::Seq(x) => x.scan_all_partitions(),
108 Scanner::Unordered(x) => x.scan_all_partitions(),
109 Scanner::Series(x) => x.scan_all_partitions(),
110 }
111 }
112}
113
#[cfg(test)]
impl Scanner {
    /// Number of SST files the scanner will read.
    pub(crate) fn num_files(&self) -> usize {
        match self {
            Scanner::Seq(s) => s.input().num_files(),
            Scanner::Unordered(s) => s.input().num_files(),
            Scanner::Series(s) => s.input().num_files(),
        }
    }

    /// Number of memtables the scanner will read.
    pub(crate) fn num_memtables(&self) -> usize {
        match self {
            Scanner::Seq(s) => s.input().num_memtables(),
            Scanner::Unordered(s) => s.input().num_memtables(),
            Scanner::Series(s) => s.input().num_memtables(),
        }
    }

    /// Ids of the SST files the scanner will read.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        match self {
            Scanner::Seq(s) => s.input().file_ids(),
            Scanner::Unordered(s) => s.input().file_ids(),
            Scanner::Series(s) => s.input().file_ids(),
        }
    }

    /// Ids of the indexes the scanner will read.
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        match self {
            Scanner::Seq(s) => s.input().index_ids(),
            Scanner::Unordered(s) => s.input().index_ids(),
            Scanner::Series(s) => s.input().index_ids(),
        }
    }

    /// Re-prepares the scanner with the given number of target partitions.
    pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
        use store_api::region_engine::{PrepareRequest, RegionScanner};

        let request = PrepareRequest::default().with_target_partitions(target_partitions);
        match self {
            Scanner::Seq(s) => s.prepare(request).unwrap(),
            Scanner::Unordered(s) => s.prepare(request).unwrap(),
            Scanner::Series(s) => s.prepare(request).unwrap(),
        }
    }
}
163
/// Helper that builds a [Scanner] (or a region scanner) for a [ScanRequest].
#[cfg_attr(doc, aquamarine::aquamarine)]
pub(crate) struct ScanRegion {
    /// Version of the region to scan.
    version: VersionRef,
    /// Access layer used to read SST files.
    access_layer: AccessLayerRef,
    /// The scan request.
    request: ScanRequest,
    /// Cache strategy for the scan.
    cache_strategy: CacheStrategy,
    /// Capacity of the channel used by parallel scan tasks.
    parallel_scan_channel_size: usize,
    /// Maximum number of SST files scanned concurrently.
    max_concurrent_scan_files: usize,
    /// Whether to skip building inverted index appliers.
    ignore_inverted_index: bool,
    /// Whether to skip building fulltext index appliers.
    ignore_fulltext_index: bool,
    /// Whether to skip building bloom filter index appliers.
    ignore_bloom_filter: bool,
    /// Start time of the query, used for metrics.
    start_time: Option<Instant>,
    /// Whether deleted rows are filtered out while scanning (defaults to true).
    filter_deleted: bool,
    /// Optional provider of extra extension ranges.
    #[cfg(feature = "enterprise")]
    extension_range_provider: Option<BoxedExtensionRangeProvider>,
}
241
242impl ScanRegion {
    /// Creates a [ScanRegion] with default options.
    pub(crate) fn new(
        version: VersionRef,
        access_layer: AccessLayerRef,
        request: ScanRequest,
        cache_strategy: CacheStrategy,
    ) -> ScanRegion {
        ScanRegion {
            version,
            access_layer,
            request,
            cache_strategy,
            parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
            ignore_inverted_index: false,
            ignore_fulltext_index: false,
            ignore_bloom_filter: false,
            start_time: None,
            filter_deleted: true,
            #[cfg(feature = "enterprise")]
            extension_range_provider: None,
        }
    }
266
    /// Sets the capacity of the channel used by parallel scan tasks.
    #[must_use]
    pub(crate) fn with_parallel_scan_channel_size(
        mut self,
        parallel_scan_channel_size: usize,
    ) -> Self {
        self.parallel_scan_channel_size = parallel_scan_channel_size;
        self
    }

    /// Sets the maximum number of SST files scanned concurrently.
    #[must_use]
    pub(crate) fn with_max_concurrent_scan_files(
        mut self,
        max_concurrent_scan_files: usize,
    ) -> Self {
        self.max_concurrent_scan_files = max_concurrent_scan_files;
        self
    }

    /// Sets whether to skip building inverted index appliers.
    #[must_use]
    pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self {
        self.ignore_inverted_index = ignore;
        self
    }

    /// Sets whether to skip building fulltext index appliers.
    #[must_use]
    pub(crate) fn with_ignore_fulltext_index(mut self, ignore: bool) -> Self {
        self.ignore_fulltext_index = ignore;
        self
    }

    /// Sets whether to skip building bloom filter index appliers.
    #[must_use]
    pub(crate) fn with_ignore_bloom_filter(mut self, ignore: bool) -> Self {
        self.ignore_bloom_filter = ignore;
        self
    }

    /// Sets the start time of the query.
    #[must_use]
    pub(crate) fn with_start_time(mut self, now: Instant) -> Self {
        self.start_time = Some(now);
        self
    }

    /// Sets whether deleted rows are filtered out during the scan.
    pub(crate) fn set_filter_deleted(&mut self, filter_deleted: bool) {
        self.filter_deleted = filter_deleted;
    }

    /// Sets the provider used to fetch extension ranges for this scan.
    #[cfg(feature = "enterprise")]
    pub(crate) fn set_extension_range_provider(
        &mut self,
        extension_range_provider: BoxedExtensionRangeProvider,
    ) {
        self.extension_range_provider = Some(extension_range_provider);
    }
325
326 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
328 pub(crate) async fn scanner(self) -> Result<Scanner> {
329 if self.use_series_scan() {
330 self.series_scan().await.map(Scanner::Series)
331 } else if self.use_unordered_scan() {
332 self.unordered_scan().await.map(Scanner::Unordered)
335 } else {
336 self.seq_scan().await.map(Scanner::Seq)
337 }
338 }
339
340 #[tracing::instrument(
342 level = tracing::Level::DEBUG,
343 skip_all,
344 fields(region_id = %self.region_id())
345 )]
346 pub(crate) async fn region_scanner(self) -> Result<RegionScannerRef> {
347 if self.use_series_scan() {
348 self.series_scan()
349 .await
350 .map(|scanner| Box::new(scanner) as _)
351 } else if self.use_unordered_scan() {
352 self.unordered_scan()
353 .await
354 .map(|scanner| Box::new(scanner) as _)
355 } else {
356 self.seq_scan().await.map(|scanner| Box::new(scanner) as _)
357 }
358 }
359
    /// Scans the region sequentially (compaction disabled on the input).
    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
    pub(crate) async fn seq_scan(self) -> Result<SeqScan> {
        let input = self.scan_input().await?.with_compaction(false);
        Ok(SeqScan::new(input))
    }

    /// Scans the region without ordering guarantees.
    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
    pub(crate) async fn unordered_scan(self) -> Result<UnorderedScan> {
        let input = self.scan_input().await?;
        Ok(UnorderedScan::new(input))
    }

    /// Scans the region one time series at a time.
    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
    pub(crate) async fn series_scan(self) -> Result<SeriesScan> {
        let input = self.scan_input().await?;
        Ok(SeriesScan::new(input))
    }
380
381 fn use_unordered_scan(&self) -> bool {
383 self.version.options.append_mode
390 && self.request.series_row_selector.is_none()
391 && (self.request.distribution.is_none()
392 || self.request.distribution == Some(TimeSeriesDistribution::TimeWindowed))
393 }
394
395 fn use_series_scan(&self) -> bool {
397 self.request.distribution == Some(TimeSeriesDistribution::PerSeries)
398 }
399
400 fn use_flat_format(&self) -> bool {
402 self.request.force_flat_format
403 || self.version.options.sst_format.unwrap_or_default() == FormatType::Flat
404 }
405
    /// Creates a [ScanInput] from the request and the region version.
    ///
    /// Prunes SST files and memtables by time range and minimum sequence,
    /// builds index appliers, and assembles every option the scanners need.
    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
    async fn scan_input(mut self) -> Result<ScanInput> {
        let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new);
        let time_range = self.build_time_range_predicate();
        let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;
        let flat_format = self.use_flat_format();

        // Columns to read: the projection plus filter/partition columns, or
        // every column when the request has no projection.
        let read_column_ids = match &self.request.projection {
            Some(p) => self.build_read_column_ids(p, &predicate)?,
            None => self
                .version
                .metadata
                .column_metadatas
                .iter()
                .map(|col| col.column_id)
                .collect(),
        };

        let mapper = match &self.request.projection {
            Some(p) => ProjectionMapper::new_with_read_columns(
                &self.version.metadata,
                p.iter().copied(),
                flat_format,
                read_column_ids.clone(),
            )?,
            None => ProjectionMapper::all(&self.version.metadata, flat_format)?,
        };

        // Keep SST files that intersect the time range and whose sequence is
        // above the requested minimum.
        let ssts = &self.version.ssts;
        let mut files = Vec::new();
        for level in ssts.levels() {
            for file in level.files.values() {
                let exceed_min_sequence = match (sst_min_sequence, file.meta_ref().sequence) {
                    (Some(min_sequence), Some(file_sequence)) => file_sequence > min_sequence,
                    // A file without a sequence cannot be proven below the bound; keep it.
                    (Some(_), None) => true,
                    // No minimum sequence requested.
                    (None, _) => true,
                };

                if exceed_min_sequence && file_in_range(file, &time_range) {
                    files.push(file.clone());
                }
            }
        }

        let memtables = self.version.memtables.list_memtables();
        let mut mem_range_builders = Vec::new();
        let filter_mode = pre_filter_mode(
            self.version.options.append_mode,
            self.version.options.merge_mode(),
        );

        for m in memtables {
            // Skip memtables with no time range (empty) or outside the scan range.
            let Some((start, end)) = m.stats().time_range() else {
                continue;
            };
            let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
            if !memtable_range.intersects(&time_range) {
                continue;
            }
            let ranges_in_memtable = m.ranges(
                Some(read_column_ids.as_slice()),
                RangesOptions::default()
                    .with_predicate(predicate.clone())
                    .with_sequence(SequenceRange::new(
                        self.request.memtable_min_sequence,
                        self.request.memtable_max_sequence,
                    ))
                    .with_pre_filter_mode(filter_mode),
            )?;
            mem_range_builders.extend(ranges_in_memtable.ranges.into_values().map(|v| {
                let stats = v.stats().clone();
                MemRangeBuilder::new(v, stats)
            }));
        }

        let region_id = self.region_id();
        debug!(
            "Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}, flat_format: {}",
            region_id,
            self.request,
            time_range,
            mem_range_builders.len(),
            files.len(),
            self.version.options.append_mode,
            flat_format,
        );

        // Build each kind of index applier twice: once for filters that touch
        // no field column and once for those that do.
        let (non_field_filters, field_filters) = self.partition_by_field_filters();
        let inverted_index_appliers = [
            self.build_invereted_index_applier(&non_field_filters),
            self.build_invereted_index_applier(&field_filters),
        ];
        let bloom_filter_appliers = [
            self.build_bloom_filter_applier(&non_field_filters),
            self.build_bloom_filter_applier(&field_filters),
        ];
        let fulltext_index_appliers = [
            self.build_fulltext_index_applier(&non_field_filters),
            self.build_fulltext_index_applier(&field_filters),
        ];
        #[cfg(feature = "vector_index")]
        let vector_index_applier = self.build_vector_index_applier();
        // With extra filters, over-fetch top-k candidates (see
        // VECTOR_INDEX_OVERFETCH_MULTIPLIER) since some may be filtered out.
        #[cfg(feature = "vector_index")]
        let vector_index_k = self.request.vector_search.as_ref().map(|search| {
            if self.request.filters.is_empty() {
                search.k
            } else {
                search.k.saturating_mul(VECTOR_INDEX_OVERFETCH_MULTIPLIER)
            }
        });

        if flat_format {
            // Flat format scans use a fixed, smaller channel size.
            self.parallel_scan_channel_size = FLAT_SCAN_CHANNEL_SIZE;
        }

        let input = ScanInput::new(self.access_layer, mapper)
            .with_time_range(Some(time_range))
            .with_predicate(predicate)
            .with_memtables(mem_range_builders)
            .with_files(files)
            .with_cache(self.cache_strategy)
            .with_inverted_index_appliers(inverted_index_appliers)
            .with_bloom_filter_index_appliers(bloom_filter_appliers)
            .with_fulltext_index_appliers(fulltext_index_appliers)
            .with_parallel_scan_channel_size(self.parallel_scan_channel_size)
            .with_max_concurrent_scan_files(self.max_concurrent_scan_files)
            .with_start_time(self.start_time)
            .with_append_mode(self.version.options.append_mode)
            .with_filter_deleted(self.filter_deleted)
            .with_merge_mode(self.version.options.merge_mode())
            .with_series_row_selector(self.request.series_row_selector)
            .with_distribution(self.request.distribution)
            .with_flat_format(flat_format);
        #[cfg(feature = "vector_index")]
        let input = input
            .with_vector_index_applier(vector_index_applier)
            .with_vector_index_k(vector_index_k);

        #[cfg(feature = "enterprise")]
        let input = if let Some(provider) = self.extension_range_provider {
            let ranges = provider
                .find_extension_ranges(self.version.flushed_sequence, time_range, &self.request)
                .await?;
            debug!("Find extension ranges: {ranges:?}");
            input.with_extension_ranges(ranges)
        } else {
            input
        };
        Ok(input)
    }
571
    /// Returns the id of the region to scan.
    fn region_id(&self) -> RegionId {
        self.version.metadata.region_id
    }
575
576 fn build_time_range_predicate(&self) -> TimestampRange {
578 let time_index = self.version.metadata.time_index_column();
579 let unit = time_index
580 .column_schema
581 .data_type
582 .as_timestamp()
583 .expect("Time index must have timestamp-compatible type")
584 .unit();
585 build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters)
586 }
587
    /// Computes the column ids to read for the given projection.
    ///
    /// Besides the projected columns, this also includes columns referenced by
    /// the request filters and by the region partition expression so they can
    /// be evaluated during the scan. Returns an error when a projection index
    /// is out of bound.
    fn build_read_column_ids(
        &self,
        projection: &[usize],
        predicate: &PredicateGroup,
    ) -> Result<Vec<ColumnId>> {
        let metadata = &self.version.metadata;
        let mut read_column_ids = Vec::new();
        // Tracks ids already added to avoid duplicates.
        let mut seen = HashSet::new();

        for idx in projection {
            let column =
                metadata
                    .column_metadatas
                    .get(*idx)
                    .with_context(|| InvalidRequestSnafu {
                        region_id: metadata.region_id,
                        reason: format!("projection index {} is out of bound", idx),
                    })?;
            seen.insert(column.column_id);
            read_column_ids.push(column.column_id);
        }

        // An empty projection still reads the time index column.
        if projection.is_empty() {
            let time_index = metadata.time_index_column().column_id;
            if seen.insert(time_index) {
                read_column_ids.push(time_index);
            }
        }

        // Names of extra columns referenced by filters and the region
        // partition expression.
        let mut extra_names = HashSet::new();
        let mut columns = HashSet::new();

        for expr in &self.request.filters {
            columns.clear();
            // Skip filters whose columns cannot be extracted.
            if expr_to_columns(expr, &mut columns).is_err() {
                continue;
            }
            extra_names.extend(columns.iter().map(|column| column.name.clone()));
        }

        if let Some(expr) = predicate.region_partition_expr() {
            expr.collect_column_names(&mut extra_names);
        }

        if !extra_names.is_empty() {
            for column in &metadata.column_metadatas {
                if extra_names.contains(column.column_schema.name.as_str())
                    && !seen.contains(&column.column_id)
                {
                    read_column_ids.push(column.column_id);
                }
                extra_names.remove(column.column_schema.name.as_str());
            }
            // Any name left here does not match a column of this region.
            if !extra_names.is_empty() {
                warn!(
                    "Some columns in filters are not found in region {}: {:?}",
                    metadata.region_id, extra_names
                );
            }
        }
        Ok(read_column_ids)
    }
653
    /// Splits the request filters into `(non_field_filters, field_filters)`.
    ///
    /// A filter goes into the second vector only when it references at least
    /// one field column; filters whose columns cannot be extracted stay in the
    /// first vector.
    fn partition_by_field_filters(&self) -> (Vec<Expr>, Vec<Expr>) {
        let field_columns = self
            .version
            .metadata
            .field_columns()
            .map(|col| &col.column_schema.name)
            .collect::<HashSet<_>>();

        // Reused across closure invocations; cleared before each expr.
        let mut columns = HashSet::new();

        // `partition` puts exprs for which the closure returns true into the
        // first (non-field) vector.
        self.request.filters.iter().cloned().partition(|expr| {
            columns.clear();
            if expr_to_columns(expr, &mut columns).is_err() {
                return true;
            }
            !columns
                .iter()
                .any(|column| field_columns.contains(&column.name))
        })
    }
679
680 fn build_invereted_index_applier(&self, filters: &[Expr]) -> Option<InvertedIndexApplierRef> {
682 if self.ignore_inverted_index {
683 return None;
684 }
685
686 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
687 let inverted_index_cache = self.cache_strategy.inverted_index_cache().cloned();
688
689 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
690
691 InvertedIndexApplierBuilder::new(
692 self.access_layer.table_dir().to_string(),
693 self.access_layer.path_type(),
694 self.access_layer.object_store().clone(),
695 self.version.metadata.as_ref(),
696 self.version.metadata.inverted_indexed_column_ids(
697 self.version
698 .options
699 .index_options
700 .inverted_index
701 .ignore_column_ids
702 .iter(),
703 ),
704 self.access_layer.puffin_manager_factory().clone(),
705 )
706 .with_file_cache(file_cache)
707 .with_inverted_index_cache(inverted_index_cache)
708 .with_puffin_metadata_cache(puffin_metadata_cache)
709 .build(filters)
710 .inspect_err(|err| warn!(err; "Failed to build invereted index applier"))
711 .ok()
712 .flatten()
713 .map(Arc::new)
714 }
715
    /// Builds a bloom filter index applier for the given filters.
    ///
    /// Returns `None` when bloom filters are ignored, the build fails (logged
    /// as a warning), or the builder produces no applier.
    fn build_bloom_filter_applier(&self, filters: &[Expr]) -> Option<BloomFilterIndexApplierRef> {
        if self.ignore_bloom_filter {
            return None;
        }

        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();

        BloomFilterIndexApplierBuilder::new(
            self.access_layer.table_dir().to_string(),
            self.access_layer.path_type(),
            self.access_layer.object_store().clone(),
            self.version.metadata.as_ref(),
            self.access_layer.puffin_manager_factory().clone(),
        )
        .with_file_cache(file_cache)
        .with_bloom_filter_index_cache(bloom_filter_index_cache)
        .with_puffin_metadata_cache(puffin_metadata_cache)
        .build(filters)
        .inspect_err(|err| warn!(err; "Failed to build bloom filter index applier"))
        .ok()
        .flatten()
        .map(Arc::new)
    }
742
    /// Builds a fulltext index applier for the given filters.
    ///
    /// Returns `None` when the fulltext index is ignored, the build fails
    /// (logged as a warning), or the builder produces no applier.
    fn build_fulltext_index_applier(&self, filters: &[Expr]) -> Option<FulltextIndexApplierRef> {
        if self.ignore_fulltext_index {
            return None;
        }

        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
        FulltextIndexApplierBuilder::new(
            self.access_layer.table_dir().to_string(),
            self.access_layer.path_type(),
            self.access_layer.object_store().clone(),
            self.access_layer.puffin_manager_factory().clone(),
            self.version.metadata.as_ref(),
        )
        .with_file_cache(file_cache)
        .with_puffin_metadata_cache(puffin_metadata_cache)
        .with_bloom_filter_cache(bloom_filter_index_cache)
        .build(filters)
        .inspect_err(|err| warn!(err; "Failed to build fulltext index applier"))
        .ok()
        .flatten()
        .map(Arc::new)
    }
768
    /// Builds a vector index applier from the request's vector search, if any.
    #[cfg(feature = "vector_index")]
    fn build_vector_index_applier(&self) -> Option<VectorIndexApplierRef> {
        // No vector search in the request means no applier.
        let vector_search = self.request.vector_search.as_ref()?;

        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
        let vector_index_cache = self.cache_strategy.vector_index_cache().cloned();

        let applier = VectorIndexApplier::new(
            self.access_layer.table_dir().to_string(),
            self.access_layer.path_type(),
            self.access_layer.object_store().clone(),
            self.access_layer.puffin_manager_factory().clone(),
            vector_search.column_id,
            vector_search.query_vector.clone(),
            vector_search.metric,
        )
        .with_file_cache(file_cache)
        .with_puffin_metadata_cache(puffin_metadata_cache)
        .with_vector_index_cache(vector_index_cache);

        Some(Arc::new(applier))
    }
793}
794
795fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
797 if predicate == &TimestampRange::min_to_max() {
798 return true;
799 }
800 let (start, end) = file.time_range();
802 let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end));
803 file_ts_range.intersects(predicate)
804}
805
/// Common input shared by the different scanners.
pub struct ScanInput {
    /// Access layer used to read SST files.
    access_layer: AccessLayerRef,
    /// Maps read columns to the output schema.
    pub(crate) mapper: Arc<ProjectionMapper>,
    /// Ids of the columns to read from memtables and SSTs.
    pub(crate) read_column_ids: Vec<ColumnId>,
    /// Time range of the scan.
    time_range: Option<TimestampRange>,
    /// Predicates pushed down to the scan.
    pub(crate) predicate: PredicateGroup,
    /// Region partition expression, cached from the predicate group.
    region_partition_expr: Option<PartitionExpr>,
    /// Builders of the memtable ranges to scan.
    pub(crate) memtables: Vec<MemRangeBuilder>,
    /// Handles of the SST files to scan.
    pub(crate) files: Vec<FileHandle>,
    /// Cache strategy for the scan.
    pub(crate) cache_strategy: CacheStrategy,
    /// Whether missing SST files are tolerated instead of failing the scan.
    ignore_file_not_found: bool,
    /// Capacity of the channel used by parallel scan tasks.
    pub(crate) parallel_scan_channel_size: usize,
    /// Maximum number of SST files scanned concurrently.
    pub(crate) max_concurrent_scan_files: usize,
    /// Inverted index appliers: `[non-field filters, field filters]`.
    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
    /// Bloom filter index appliers: `[non-field filters, field filters]`.
    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    /// Fulltext index appliers: `[non-field filters, field filters]`.
    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
    /// Applier for the vector index, if the request has a vector search.
    #[cfg(feature = "vector_index")]
    pub(crate) vector_index_applier: Option<VectorIndexApplierRef>,
    /// Number of candidates to fetch from the vector index.
    #[cfg(feature = "vector_index")]
    pub(crate) vector_index_k: Option<usize>,
    /// Start time of the query.
    pub(crate) query_start: Option<Instant>,
    /// Whether the region is in append mode.
    pub(crate) append_mode: bool,
    /// Whether deleted rows are filtered out during the scan.
    pub(crate) filter_deleted: bool,
    /// Merge mode of the region.
    pub(crate) merge_mode: MergeMode,
    /// Per-series row selector from the request.
    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
    /// Expected time series distribution of the output.
    pub(crate) distribution: Option<TimeSeriesDistribution>,
    /// Whether SSTs are read in the flat format.
    pub(crate) flat_format: bool,
    /// Whether this input is used for compaction.
    pub(crate) compaction: bool,
    /// Extra ranges supplied by the extension range provider.
    #[cfg(feature = "enterprise")]
    extension_ranges: Vec<BoxedExtensionRange>,
}
863
864impl ScanInput {
    /// Creates a [ScanInput] with default options.
    #[must_use]
    pub(crate) fn new(access_layer: AccessLayerRef, mapper: ProjectionMapper) -> ScanInput {
        ScanInput {
            access_layer,
            // By default read exactly the columns the mapper projects.
            read_column_ids: mapper.column_ids().to_vec(),
            mapper: Arc::new(mapper),
            time_range: None,
            predicate: PredicateGroup::default(),
            region_partition_expr: None,
            memtables: Vec::new(),
            files: Vec::new(),
            cache_strategy: CacheStrategy::Disabled,
            ignore_file_not_found: false,
            parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
            inverted_index_appliers: [None, None],
            bloom_filter_index_appliers: [None, None],
            fulltext_index_appliers: [None, None],
            #[cfg(feature = "vector_index")]
            vector_index_applier: None,
            #[cfg(feature = "vector_index")]
            vector_index_k: None,
            query_start: None,
            append_mode: false,
            filter_deleted: true,
            merge_mode: MergeMode::default(),
            series_row_selector: None,
            distribution: None,
            flat_format: false,
            compaction: false,
            #[cfg(feature = "enterprise")]
            extension_ranges: Vec::new(),
        }
    }
900
    /// Sets the time range to scan.
    #[must_use]
    pub(crate) fn with_time_range(mut self, time_range: Option<TimestampRange>) -> Self {
        self.time_range = time_range;
        self
    }

    /// Sets the predicate group and caches its region partition expression.
    #[must_use]
    pub(crate) fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
        self.region_partition_expr = predicate.region_partition_expr().cloned();
        self.predicate = predicate;
        self
    }

    /// Sets the memtable range builders to scan.
    #[must_use]
    pub(crate) fn with_memtables(mut self, memtables: Vec<MemRangeBuilder>) -> Self {
        self.memtables = memtables;
        self
    }

    /// Sets the SST files to scan.
    #[must_use]
    pub(crate) fn with_files(mut self, files: Vec<FileHandle>) -> Self {
        self.files = files;
        self
    }

    /// Sets the cache strategy.
    #[must_use]
    pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self {
        self.cache_strategy = cache;
        self
    }

    /// Sets whether missing SST files are ignored instead of failing the scan.
    #[must_use]
    pub(crate) fn with_ignore_file_not_found(mut self, ignore: bool) -> Self {
        self.ignore_file_not_found = ignore;
        self
    }

    /// Sets the capacity of the channel used by parallel scan tasks.
    #[must_use]
    pub(crate) fn with_parallel_scan_channel_size(
        mut self,
        parallel_scan_channel_size: usize,
    ) -> Self {
        self.parallel_scan_channel_size = parallel_scan_channel_size;
        self
    }

    /// Sets the maximum number of SST files scanned concurrently.
    #[must_use]
    pub(crate) fn with_max_concurrent_scan_files(
        mut self,
        max_concurrent_scan_files: usize,
    ) -> Self {
        self.max_concurrent_scan_files = max_concurrent_scan_files;
        self
    }

    /// Sets the inverted index appliers (`[non-field filters, field filters]`).
    #[must_use]
    pub(crate) fn with_inverted_index_appliers(
        mut self,
        appliers: [Option<InvertedIndexApplierRef>; 2],
    ) -> Self {
        self.inverted_index_appliers = appliers;
        self
    }

    /// Sets the bloom filter index appliers (`[non-field filters, field filters]`).
    #[must_use]
    pub(crate) fn with_bloom_filter_index_appliers(
        mut self,
        appliers: [Option<BloomFilterIndexApplierRef>; 2],
    ) -> Self {
        self.bloom_filter_index_appliers = appliers;
        self
    }

    /// Sets the fulltext index appliers (`[non-field filters, field filters]`).
    #[must_use]
    pub(crate) fn with_fulltext_index_appliers(
        mut self,
        appliers: [Option<FulltextIndexApplierRef>; 2],
    ) -> Self {
        self.fulltext_index_appliers = appliers;
        self
    }

    /// Sets the vector index applier.
    #[cfg(feature = "vector_index")]
    #[must_use]
    pub(crate) fn with_vector_index_applier(
        mut self,
        applier: Option<VectorIndexApplierRef>,
    ) -> Self {
        self.vector_index_applier = applier;
        self
    }

    /// Sets the number of candidates to fetch from the vector index.
    #[cfg(feature = "vector_index")]
    #[must_use]
    pub(crate) fn with_vector_index_k(mut self, k: Option<usize>) -> Self {
        self.vector_index_k = k;
        self
    }

    /// Sets the start time of the query.
    #[must_use]
    pub(crate) fn with_start_time(mut self, now: Option<Instant>) -> Self {
        self.query_start = now;
        self
    }

    /// Sets whether the region is in append mode.
    #[must_use]
    pub(crate) fn with_append_mode(mut self, is_append_mode: bool) -> Self {
        self.append_mode = is_append_mode;
        self
    }

    /// Sets whether deleted rows are filtered out during the scan.
    #[must_use]
    pub(crate) fn with_filter_deleted(mut self, filter_deleted: bool) -> Self {
        self.filter_deleted = filter_deleted;
        self
    }

    /// Sets the merge mode of the region.
    #[must_use]
    pub(crate) fn with_merge_mode(mut self, merge_mode: MergeMode) -> Self {
        self.merge_mode = merge_mode;
        self
    }

    /// Sets the expected time series distribution of the output.
    #[must_use]
    pub(crate) fn with_distribution(
        mut self,
        distribution: Option<TimeSeriesDistribution>,
    ) -> Self {
        self.distribution = distribution;
        self
    }

    /// Sets the per-series row selector.
    #[must_use]
    pub(crate) fn with_series_row_selector(
        mut self,
        series_row_selector: Option<TimeSeriesRowSelector>,
    ) -> Self {
        self.series_row_selector = series_row_selector;
        self
    }

    /// Sets whether SSTs are read in the flat format.
    #[must_use]
    pub(crate) fn with_flat_format(mut self, flat_format: bool) -> Self {
        self.flat_format = flat_format;
        self
    }

    /// Sets whether this input is used for compaction.
    #[must_use]
    pub(crate) fn with_compaction(mut self, compaction: bool) -> Self {
        self.compaction = compaction;
        self
    }
1073
1074 #[tracing::instrument(
1078 skip(self, sources, semaphore),
1079 fields(
1080 region_id = %self.region_metadata().region_id,
1081 source_count = sources.len()
1082 )
1083 )]
1084 pub(crate) fn create_parallel_sources(
1085 &self,
1086 sources: Vec<Source>,
1087 semaphore: Arc<Semaphore>,
1088 ) -> Result<Vec<Source>> {
1089 if sources.len() <= 1 {
1090 return Ok(sources);
1091 }
1092
1093 let sources = sources
1095 .into_iter()
1096 .map(|source| {
1097 let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
1098 self.spawn_scan_task(source, semaphore.clone(), sender);
1099 let stream = Box::pin(ReceiverStream::new(receiver));
1100 Source::Stream(stream)
1101 })
1102 .collect();
1103 Ok(sources)
1104 }
1105
1106 pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> {
1108 let memtable = &self.memtables[index.index];
1109 let mut ranges = SmallVec::new();
1110 memtable.build_ranges(index.row_group_index, &mut ranges);
1111 ranges
1112 }
1113
1114 fn predicate_for_file(&self, file: &FileHandle) -> Option<Predicate> {
1115 if self.should_skip_region_partition(file) {
1116 self.predicate.predicate_without_region().cloned()
1117 } else {
1118 self.predicate.predicate().cloned()
1119 }
1120 }
1121
1122 fn should_skip_region_partition(&self, file: &FileHandle) -> bool {
1123 match (
1124 self.region_partition_expr.as_ref(),
1125 file.meta_ref().partition_expr.as_ref(),
1126 ) {
1127 (Some(region_expr), Some(file_expr)) => region_expr == file_expr,
1128 _ => false,
1129 }
1130 }
1131
    /// Prunes a file to scan and returns a [FileRangeBuilder] for reading it.
    ///
    /// When `ignore_file_not_found` is set, a missing file is logged and an
    /// empty builder is returned; other errors are propagated. If the file's
    /// schema or primary key encoding differs from the expected metadata, a
    /// compat adapter is attached so batches are converted while reading.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.region_metadata().region_id,
            file_id = %file.file_id()
        )
    )]
    pub async fn prune_file(
        &self,
        file: &FileHandle,
        reader_metrics: &mut ReaderMetrics,
    ) -> Result<FileRangeBuilder> {
        let predicate = self.predicate_for_file(file);
        let filter_mode = pre_filter_mode(self.append_mode, self.merge_mode);
        // Primary key values are only decoded for queries that project tags.
        let decode_pk_values = !self.compaction && self.mapper.has_tags();
        let reader = self
            .access_layer
            .read_sst(file.clone())
            .predicate(predicate)
            .projection(Some(self.read_column_ids.clone()))
            .cache(self.cache_strategy.clone())
            .inverted_index_appliers(self.inverted_index_appliers.clone())
            .bloom_filter_index_appliers(self.bloom_filter_index_appliers.clone())
            .fulltext_index_appliers(self.fulltext_index_appliers.clone());
        #[cfg(feature = "vector_index")]
        let reader = {
            let mut reader = reader;
            reader =
                reader.vector_index_applier(self.vector_index_applier.clone(), self.vector_index_k);
            reader
        };
        let res = reader
            .expected_metadata(Some(self.mapper.metadata().clone()))
            .flat_format(self.flat_format)
            .compaction(self.compaction)
            .pre_filter_mode(filter_mode)
            .decode_primary_key_values(decode_pk_values)
            .build_reader_input(reader_metrics)
            .await;
        let (mut file_range_ctx, selection) = match res {
            Ok(x) => x,
            Err(e) => {
                if e.is_object_not_found() && self.ignore_file_not_found {
                    error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
                    return Ok(FileRangeBuilder::default());
                } else {
                    return Err(e);
                }
            }
        };

        let need_compat = !compat::has_same_columns_and_pk_encoding(
            self.mapper.metadata(),
            file_range_ctx.read_format().metadata(),
        );
        if need_compat {
            // Pick the compat adapter matching the file's read format.
            let compat = if let Some(flat_format) = file_range_ctx.read_format().as_flat() {
                let mapper = self.mapper.as_flat().unwrap();
                FlatCompatBatch::try_new(
                    mapper,
                    flat_format.metadata(),
                    flat_format.format_projection(),
                    self.compaction,
                )?
                .map(CompatBatch::Flat)
            } else {
                let compact_batch = PrimaryKeyCompatBatch::new(
                    &self.mapper,
                    file_range_ctx.read_format().metadata().clone(),
                )?;
                Some(CompatBatch::PrimaryKey(compact_batch))
            };
            file_range_ctx.set_compat_batch(compat);
        }
        Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), selection))
    }
1211
1212 #[tracing::instrument(
1214 skip(self, input, semaphore, sender),
1215 fields(region_id = %self.region_metadata().region_id)
1216 )]
1217 pub(crate) fn spawn_scan_task(
1218 &self,
1219 mut input: Source,
1220 semaphore: Arc<Semaphore>,
1221 sender: mpsc::Sender<Result<Batch>>,
1222 ) {
1223 let region_id = self.region_metadata().region_id;
1224 let span = tracing::info_span!(
1225 "ScanInput::parallel_scan_task",
1226 region_id = %region_id,
1227 stream_kind = "batch"
1228 );
1229 common_runtime::spawn_global(
1230 async move {
1231 loop {
1232 let maybe_batch = {
1235 let _permit = semaphore.acquire().await.unwrap();
1237 input.next_batch().await
1238 };
1239 match maybe_batch {
1240 Ok(Some(batch)) => {
1241 let _ = sender.send(Ok(batch)).await;
1242 }
1243 Ok(None) => break,
1244 Err(e) => {
1245 let _ = sender.send(Err(e)).await;
1246 break;
1247 }
1248 }
1249 }
1250 }
1251 .instrument(span),
1252 );
1253 }
1254
1255 #[tracing::instrument(
1259 skip(self, sources, semaphore),
1260 fields(
1261 region_id = %self.region_metadata().region_id,
1262 source_count = sources.len()
1263 )
1264 )]
1265 pub(crate) fn create_parallel_flat_sources(
1266 &self,
1267 sources: Vec<BoxedRecordBatchStream>,
1268 semaphore: Arc<Semaphore>,
1269 ) -> Result<Vec<BoxedRecordBatchStream>> {
1270 if sources.len() <= 1 {
1271 return Ok(sources);
1272 }
1273
1274 let sources = sources
1276 .into_iter()
1277 .map(|source| {
1278 let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
1279 self.spawn_flat_scan_task(source, semaphore.clone(), sender);
1280 let stream = Box::pin(ReceiverStream::new(receiver));
1281 Box::pin(stream) as _
1282 })
1283 .collect();
1284 Ok(sources)
1285 }
1286
1287 #[tracing::instrument(
1289 skip(self, input, semaphore, sender),
1290 fields(region_id = %self.region_metadata().region_id)
1291 )]
1292 pub(crate) fn spawn_flat_scan_task(
1293 &self,
1294 mut input: BoxedRecordBatchStream,
1295 semaphore: Arc<Semaphore>,
1296 sender: mpsc::Sender<Result<RecordBatch>>,
1297 ) {
1298 let region_id = self.region_metadata().region_id;
1299 let span = tracing::info_span!(
1300 "ScanInput::parallel_scan_task",
1301 region_id = %region_id,
1302 stream_kind = "flat"
1303 );
1304 common_runtime::spawn_global(
1305 async move {
1306 loop {
1307 let maybe_batch = {
1310 let _permit = semaphore.acquire().await.unwrap();
1312 input.next().await
1313 };
1314 match maybe_batch {
1315 Some(Ok(batch)) => {
1316 let _ = sender.send(Ok(batch)).await;
1317 }
1318 Some(Err(e)) => {
1319 let _ = sender.send(Err(e)).await;
1320 break;
1321 }
1322 None => break,
1323 }
1324 }
1325 }
1326 .instrument(span),
1327 );
1328 }
1329
1330 pub(crate) fn total_rows(&self) -> usize {
1331 let rows_in_files: usize = self.files.iter().map(|f| f.num_rows()).sum();
1332 let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();
1333
1334 let rows = rows_in_files + rows_in_memtables;
1335 #[cfg(feature = "enterprise")]
1336 let rows = rows
1337 + self
1338 .extension_ranges
1339 .iter()
1340 .map(|x| x.num_rows())
1341 .sum::<u64>() as usize;
1342 rows
1343 }
1344
    /// Returns the predicate group built from the request filters (and the
    /// region partition expression, if any).
    pub(crate) fn predicate_group(&self) -> &PredicateGroup {
        &self.predicate
    }
1348
    /// Returns the number of memtables to scan.
    pub(crate) fn num_memtables(&self) -> usize {
        self.memtables.len()
    }
1353
    /// Returns the number of SST files to scan.
    pub(crate) fn num_files(&self) -> usize {
        self.files.len()
    }
1358
    /// Returns the file handle referenced by a [RowGroupIndex].
    ///
    /// Global range indices are laid out memtables first, then files, so the
    /// file position is `index.index - num_memtables()`. Panics (underflow or
    /// out-of-bounds) if `index` does not point at a file range — callers are
    /// expected to check `is_file_range_index` first.
    pub(crate) fn file_from_index(&self, index: RowGroupIndex) -> &FileHandle {
        let file_index = index.index - self.num_memtables();
        &self.files[file_index]
    }
1364
    /// Returns the region metadata, as held by the projection mapper.
    pub fn region_metadata(&self) -> &RegionMetadataRef {
        self.mapper.metadata()
    }
1368}
1369
#[cfg(feature = "enterprise")]
impl ScanInput {
    /// Replaces the extension ranges of this input.
    #[must_use]
    pub(crate) fn with_extension_ranges(self, extension_ranges: Vec<BoxedExtensionRange>) -> Self {
        Self {
            extension_ranges,
            ..self
        }
    }

    /// Returns all extension ranges.
    // The per-method `#[cfg(feature = "enterprise")]` attributes were removed:
    // the whole impl block is already gated on the feature, so they were redundant.
    pub(crate) fn extension_ranges(&self) -> &[BoxedExtensionRange] {
        &self.extension_ranges
    }

    /// Returns the extension range for a global range index `i`.
    ///
    /// Indices are laid out memtables first, then files, then extension
    /// ranges, so the local position is `i - num_memtables() - num_files()`.
    /// Panics if `i` does not point into the extension ranges.
    pub(crate) fn extension_range(&self, i: usize) -> &BoxedExtensionRange {
        &self.extension_ranges[i - self.num_memtables() - self.num_files()]
    }
}
1391
#[cfg(test)]
impl ScanInput {
    /// Ids of all SST files selected by this input (test helper).
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        self.files.iter().map(|handle| handle.file_id()).collect()
    }

    /// Index-file ids of all selected SSTs (test helper).
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        self.files.iter().map(|handle| handle.index_id()).collect()
    }
}
1403
1404fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode {
1405 if append_mode {
1406 return PreFilterMode::All;
1407 }
1408
1409 match merge_mode {
1410 MergeMode::LastRow => PreFilterMode::SkipFieldsOnDelete,
1411 MergeMode::LastNonNull => PreFilterMode::SkipFields,
1412 }
1413}
1414
/// Context shared by the scan streams of a region query.
pub struct StreamContext {
    /// The scan input this context was built from.
    pub input: ScanInput,
    /// Metadata of all ranges to read.
    pub(crate) ranges: Vec<RangeMeta>,
    /// Start time of the query; falls back to the context creation time when
    /// the input does not carry one (see the constructors).
    pub(crate) query_start: Instant,
}
1427
1428impl StreamContext {
1429 pub(crate) fn seq_scan_ctx(input: ScanInput) -> Self {
1431 let query_start = input.query_start.unwrap_or_else(Instant::now);
1432 let ranges = RangeMeta::seq_scan_ranges(&input);
1433 READ_SST_COUNT.observe(input.num_files() as f64);
1434
1435 Self {
1436 input,
1437 ranges,
1438 query_start,
1439 }
1440 }
1441
1442 pub(crate) fn unordered_scan_ctx(input: ScanInput) -> Self {
1444 let query_start = input.query_start.unwrap_or_else(Instant::now);
1445 let ranges = RangeMeta::unordered_scan_ranges(&input);
1446 READ_SST_COUNT.observe(input.num_files() as f64);
1447
1448 Self {
1449 input,
1450 ranges,
1451 query_start,
1452 }
1453 }
1454
1455 pub(crate) fn is_mem_range_index(&self, index: RowGroupIndex) -> bool {
1457 self.input.num_memtables() > index.index
1458 }
1459
1460 pub(crate) fn is_file_range_index(&self, index: RowGroupIndex) -> bool {
1461 !self.is_mem_range_index(index)
1462 && index.index < self.input.num_files() + self.input.num_memtables()
1463 }
1464
1465 pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
1467 self.ranges
1468 .iter()
1469 .enumerate()
1470 .map(|(idx, range_meta)| range_meta.new_partition_range(idx))
1471 .collect()
1472 }
1473
1474 pub(crate) fn format_for_explain(&self, verbose: bool, f: &mut fmt::Formatter) -> fmt::Result {
1476 let (mut num_mem_ranges, mut num_file_ranges, mut num_other_ranges) = (0, 0, 0);
1477 for range_meta in &self.ranges {
1478 for idx in &range_meta.row_group_indices {
1479 if self.is_mem_range_index(*idx) {
1480 num_mem_ranges += 1;
1481 } else if self.is_file_range_index(*idx) {
1482 num_file_ranges += 1;
1483 } else {
1484 num_other_ranges += 1;
1485 }
1486 }
1487 }
1488 if verbose {
1489 write!(f, "{{")?;
1490 }
1491 write!(
1492 f,
1493 r#""partition_count":{{"count":{}, "mem_ranges":{}, "files":{}, "file_ranges":{}"#,
1494 self.ranges.len(),
1495 num_mem_ranges,
1496 self.input.num_files(),
1497 num_file_ranges,
1498 )?;
1499 if num_other_ranges > 0 {
1500 write!(f, r#", "other_ranges":{}"#, num_other_ranges)?;
1501 }
1502 write!(f, "}}")?;
1503
1504 if let Some(selector) = &self.input.series_row_selector {
1505 write!(f, ", \"selector\":\"{}\"", selector)?;
1506 }
1507 if let Some(distribution) = &self.input.distribution {
1508 write!(f, ", \"distribution\":\"{}\"", distribution)?;
1509 }
1510
1511 if verbose {
1512 self.format_verbose_content(f)?;
1513 }
1514
1515 Ok(())
1516 }
1517
1518 fn format_verbose_content(&self, f: &mut fmt::Formatter) -> fmt::Result {
1519 struct FileWrapper<'a> {
1520 file: &'a FileHandle,
1521 }
1522
1523 impl fmt::Debug for FileWrapper<'_> {
1524 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1525 let (start, end) = self.file.time_range();
1526 write!(
1527 f,
1528 r#"{{"file_id":"{}","time_range_start":"{}::{}","time_range_end":"{}::{}","rows":{},"size":{},"index_size":{}}}"#,
1529 self.file.file_id(),
1530 start.value(),
1531 start.unit(),
1532 end.value(),
1533 end.unit(),
1534 self.file.num_rows(),
1535 self.file.size(),
1536 self.file.index_size()
1537 )
1538 }
1539 }
1540
1541 struct InputWrapper<'a> {
1542 input: &'a ScanInput,
1543 }
1544
1545 #[cfg(feature = "enterprise")]
1546 impl InputWrapper<'_> {
1547 fn format_extension_ranges(&self, f: &mut fmt::Formatter) -> fmt::Result {
1548 if self.input.extension_ranges.is_empty() {
1549 return Ok(());
1550 }
1551
1552 let mut delimiter = "";
1553 write!(f, ", extension_ranges: [")?;
1554 for range in self.input.extension_ranges() {
1555 write!(f, "{}{:?}", delimiter, range)?;
1556 delimiter = ", ";
1557 }
1558 write!(f, "]")?;
1559 Ok(())
1560 }
1561 }
1562
1563 impl fmt::Debug for InputWrapper<'_> {
1564 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1565 let output_schema = self.input.mapper.output_schema();
1566 if !output_schema.is_empty() {
1567 let names: Vec<_> = output_schema
1568 .column_schemas()
1569 .iter()
1570 .map(|col| &col.name)
1571 .collect();
1572 write!(f, ", \"projection\": {:?}", names)?;
1573 }
1574 if let Some(predicate) = &self.input.predicate.predicate()
1575 && !predicate.exprs().is_empty()
1576 {
1577 let exprs: Vec<_> = predicate.exprs().iter().map(|e| e.to_string()).collect();
1578 write!(f, ", \"filters\": {:?}", exprs)?;
1579 }
1580 #[cfg(feature = "vector_index")]
1581 if let Some(vector_index_k) = self.input.vector_index_k {
1582 write!(f, ", \"vector_index_k\": {}", vector_index_k)?;
1583 }
1584 if !self.input.files.is_empty() {
1585 write!(f, ", \"files\": ")?;
1586 f.debug_list()
1587 .entries(self.input.files.iter().map(|file| FileWrapper { file }))
1588 .finish()?;
1589 }
1590 write!(f, ", \"flat_format\": {}", self.input.flat_format)?;
1591
1592 #[cfg(feature = "enterprise")]
1593 self.format_extension_ranges(f)?;
1594
1595 Ok(())
1596 }
1597 }
1598
1599 write!(f, "{:?}", InputWrapper { input: &self.input })
1600 }
1601}
1602
/// Predicates derived from a scan request for one region.
#[derive(Clone, Default)]
pub struct PredicateGroup {
    /// Simple filters that reference only the timestamp column.
    time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
    /// Predicate built from the request filters plus the region partition
    /// expression (when the region metadata carries one).
    predicate_all: Option<Predicate>,
    /// Predicate built from the request filters only.
    predicate_without_region: Option<Predicate>,
    /// The region's partition expression parsed from the region metadata.
    region_partition_expr: Option<PartitionExpr>,
}
1615
1616impl PredicateGroup {
1617 pub fn new(metadata: &RegionMetadata, exprs: &[Expr]) -> Result<Self> {
1619 let mut combined_exprs = exprs.to_vec();
1620 let mut region_partition_expr = None;
1621
1622 if let Some(expr_json) = metadata.partition_expr.as_ref()
1623 && !expr_json.is_empty()
1624 && let Some(expr) = PartitionExpr::from_json_str(expr_json)
1625 .context(InvalidPartitionExprSnafu { expr: expr_json })?
1626 {
1627 let logical_expr = expr
1628 .try_as_logical_expr()
1629 .context(InvalidPartitionExprSnafu {
1630 expr: expr_json.clone(),
1631 })?;
1632
1633 combined_exprs.push(logical_expr);
1634 region_partition_expr = Some(expr);
1635 }
1636
1637 let mut time_filters = Vec::with_capacity(combined_exprs.len());
1638 let mut columns = HashSet::new();
1640 for expr in &combined_exprs {
1641 columns.clear();
1642 let Some(filter) = Self::expr_to_filter(expr, metadata, &mut columns) else {
1643 continue;
1644 };
1645 time_filters.push(filter);
1646 }
1647 let time_filters = if time_filters.is_empty() {
1648 None
1649 } else {
1650 Some(Arc::new(time_filters))
1651 };
1652
1653 let predicate_all = if combined_exprs.is_empty() {
1654 None
1655 } else {
1656 Some(Predicate::new(combined_exprs))
1657 };
1658 let predicate_without_region = if exprs.is_empty() {
1659 None
1660 } else {
1661 Some(Predicate::new(exprs.to_vec()))
1662 };
1663
1664 Ok(Self {
1665 time_filters,
1666 predicate_all,
1667 predicate_without_region,
1668 region_partition_expr,
1669 })
1670 }
1671
1672 pub(crate) fn time_filters(&self) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
1674 self.time_filters.clone()
1675 }
1676
1677 pub(crate) fn predicate(&self) -> Option<&Predicate> {
1679 self.predicate_all.as_ref()
1680 }
1681
1682 pub(crate) fn predicate_without_region(&self) -> Option<&Predicate> {
1684 self.predicate_without_region.as_ref()
1685 }
1686
1687 pub(crate) fn region_partition_expr(&self) -> Option<&PartitionExpr> {
1689 self.region_partition_expr.as_ref()
1690 }
1691
1692 fn expr_to_filter(
1693 expr: &Expr,
1694 metadata: &RegionMetadata,
1695 columns: &mut HashSet<Column>,
1696 ) -> Option<SimpleFilterEvaluator> {
1697 columns.clear();
1698 expr_to_columns(expr, columns).ok()?;
1701 if columns.len() > 1 {
1702 return None;
1704 }
1705 let column = columns.iter().next()?;
1706 let column_meta = metadata.column_by_name(&column.name)?;
1707 if column_meta.semantic_type == SemanticType::Timestamp {
1708 SimpleFilterEvaluator::try_new(expr)
1709 } else {
1710 None
1711 }
1712 }
1713}
1714
1715#[cfg(test)]
1716mod tests {
1717 use std::sync::Arc;
1718
1719 use datafusion_expr::{col, lit};
1720 use store_api::storage::ScanRequest;
1721
1722 use super::*;
1723 use crate::memtable::time_partition::TimePartitions;
1724 use crate::region::options::RegionOptions;
1725 use crate::region::version::VersionBuilder;
1726 use crate::sst::FormatType;
1727 use crate::test_util::memtable_util::{EmptyMemtableBuilder, metadata_with_primary_key};
1728 use crate::test_util::scheduler_util::SchedulerEnv;
1729
1730 fn new_version(metadata: RegionMetadataRef) -> VersionRef {
1731 let mutable = Arc::new(TimePartitions::new(
1732 metadata.clone(),
1733 Arc::new(EmptyMemtableBuilder::default()),
1734 0,
1735 None,
1736 ));
1737 Arc::new(VersionBuilder::new(metadata, mutable).build())
1738 }
1739
1740 fn new_version_with_sst_format(
1741 metadata: RegionMetadataRef,
1742 sst_format: Option<FormatType>,
1743 ) -> VersionRef {
1744 let mutable = Arc::new(TimePartitions::new(
1745 metadata.clone(),
1746 Arc::new(EmptyMemtableBuilder::default()),
1747 0,
1748 None,
1749 ));
1750 let options = RegionOptions {
1751 sst_format,
1752 ..Default::default()
1753 };
1754 Arc::new(
1755 VersionBuilder::new(metadata, mutable)
1756 .options(options)
1757 .build(),
1758 )
1759 }
1760
    /// Read column ids must contain the projected columns first, followed by
    /// extra columns referenced only by filters.
    #[tokio::test]
    async fn test_build_read_column_ids_includes_filters() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let version = new_version(metadata.clone());
        let env = SchedulerEnv::new().await;
        let request = ScanRequest {
            projection: Some(vec![4]),
            filters: vec![
                col("v0").gt(lit(1)),
                col("ts").gt(lit(0)),
                col("k0").eq(lit("foo")),
            ],
            ..Default::default()
        };
        let scan_region = ScanRegion::new(
            version,
            env.access_layer.clone(),
            request,
            CacheStrategy::Disabled,
        );
        let predicate =
            PredicateGroup::new(metadata.as_ref(), &scan_region.request.filters).unwrap();
        let projection = scan_region.request.projection.as_ref().unwrap();
        let read_ids = scan_region
            .build_read_column_ids(projection, &predicate)
            .unwrap();
        // Projected column 4 first, then the filter-only columns.
        assert_eq!(vec![4, 0, 2, 3], read_ids);
    }
1789
    /// An empty projection must still produce a non-empty read set.
    #[tokio::test]
    async fn test_build_read_column_ids_empty_projection() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let version = new_version(metadata.clone());
        let env = SchedulerEnv::new().await;
        let request = ScanRequest {
            projection: Some(vec![]),
            ..Default::default()
        };
        let scan_region = ScanRegion::new(
            version,
            env.access_layer.clone(),
            request,
            CacheStrategy::Disabled,
        );
        let predicate =
            PredicateGroup::new(metadata.as_ref(), &scan_region.request.filters).unwrap();
        let projection = scan_region.request.projection.as_ref().unwrap();
        let read_ids = scan_region
            .build_read_column_ids(projection, &predicate)
            .unwrap();
        // Column id 2 — presumably the time index column of the test schema;
        // confirm against `metadata_with_primary_key`.
        assert_eq!(vec![2], read_ids);
    }
1814
    /// The projection order must be preserved in the read column ids; columns
    /// added for filters are appended after the projected ones.
    #[tokio::test]
    async fn test_build_read_column_ids_keeps_projection_order() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let version = new_version(metadata.clone());
        let env = SchedulerEnv::new().await;
        let request = ScanRequest {
            projection: Some(vec![4, 1]),
            filters: vec![col("v0").gt(lit(1))],
            ..Default::default()
        };
        let scan_region = ScanRegion::new(
            version,
            env.access_layer.clone(),
            request,
            CacheStrategy::Disabled,
        );
        let predicate =
            PredicateGroup::new(metadata.as_ref(), &scan_region.request.filters).unwrap();
        let projection = scan_region.request.projection.as_ref().unwrap();
        let read_ids = scan_region
            .build_read_column_ids(projection, &predicate)
            .unwrap();
        // Projection order [4, 1] kept, filter column appended last.
        assert_eq!(vec![4, 1, 3], read_ids);
    }
1840
    /// `use_flat_format` must follow the region's SST format by default and
    /// honor the request's `force_flat_format` override.
    #[tokio::test]
    async fn test_use_flat_format_honors_request_override() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let env = SchedulerEnv::new().await;

        // Primary-key region without override: flat format disabled.
        let primary_key_version =
            new_version_with_sst_format(metadata.clone(), Some(FormatType::PrimaryKey));
        let request = ScanRequest::default();
        let scan_region = ScanRegion::new(
            primary_key_version.clone(),
            env.access_layer.clone(),
            request,
            CacheStrategy::Disabled,
        );
        assert!(!scan_region.use_flat_format());

        // Same region with `force_flat_format`: override wins.
        let request = ScanRequest {
            force_flat_format: true,
            ..Default::default()
        };
        let scan_region = ScanRegion::new(
            primary_key_version,
            env.access_layer.clone(),
            request,
            CacheStrategy::Disabled,
        );
        assert!(scan_region.use_flat_format());

        // Flat-format region without override: flat format enabled.
        let flat_version = new_version_with_sst_format(metadata, Some(FormatType::Flat));
        let request = ScanRequest::default();
        let scan_region = ScanRegion::new(
            flat_version,
            env.access_layer.clone(),
            request,
            CacheStrategy::Disabled,
        );
        assert!(scan_region.use_flat_format());
    }
1879}