1use std::collections::{HashMap, HashSet};
18use std::fmt;
19use std::num::NonZeroU64;
20use std::sync::Arc;
21use std::time::Instant;
22
23use api::v1::SemanticType;
24use common_error::ext::BoxedError;
25use common_recordbatch::SendableRecordBatchStream;
26use common_recordbatch::filter::SimpleFilterEvaluator;
27use common_telemetry::tracing::Instrument;
28use common_telemetry::{debug, error, tracing, warn};
29use common_time::range::TimestampRange;
30use datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr;
31use datafusion_common::Column;
32use datafusion_expr::Expr;
33use datafusion_expr::utils::expr_to_columns;
34use datatypes::data_type::ConcreteDataType;
35use datatypes::extension::json::is_json_extension_type;
36use datatypes::schema::Schema;
37use datatypes::schema::ext::ArrowSchemaExt;
38use datatypes::types::json_type::JsonNativeType;
39use futures::StreamExt;
40use partition::expr::PartitionExpr;
41use smallvec::SmallVec;
42use snafu::ResultExt;
43use store_api::metadata::{RegionMetadata, RegionMetadataRef};
44use store_api::region_engine::{PartitionRange, RegionScannerRef};
45use store_api::storage::{
46 RegionId, ScanRequest, SequenceNumber, SequenceRange, TimeSeriesDistribution,
47 TimeSeriesRowSelector,
48};
49use table::predicate::{Predicate, build_time_range_predicate, extract_time_range_from_expr};
50use tokio::sync::{Semaphore, mpsc};
51use tokio_stream::wrappers::ReceiverStream;
52
53use crate::access_layer::AccessLayerRef;
54use crate::cache::CacheStrategy;
55use crate::config::DEFAULT_MAX_CONCURRENT_SCAN_FILES;
56use crate::error::{InvalidPartitionExprSnafu, Result};
57#[cfg(feature = "enterprise")]
58use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
59use crate::memtable::{MemtableRange, RangesOptions};
60use crate::metrics::READ_SST_COUNT;
61use crate::read::compat::{self, FlatCompatBatch};
62use crate::read::flat_projection::FlatProjectionMapper;
63use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
64use crate::read::range_cache::ScanRequestFingerprint;
65use crate::read::read_columns::{
66 ReadColumns, merge, read_columns_from_predicate, read_columns_from_projection,
67};
68use crate::read::seq_scan::SeqScan;
69use crate::read::series_scan::SeriesScan;
70use crate::read::stream::ScanBatchStream;
71use crate::read::unordered_scan::UnorderedScan;
72use crate::read::{BoxedRecordBatchStream, RecordBatch};
73use crate::region::options::MergeMode;
74use crate::region::version::VersionRef;
75use crate::sst::file::FileHandle;
76use crate::sst::index::bloom_filter::applier::{
77 BloomFilterIndexApplierBuilder, BloomFilterIndexApplierRef,
78};
79use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
80use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
81use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
82use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
83#[cfg(feature = "vector_index")]
84use crate::sst::index::vector_index::applier::{VectorIndexApplier, VectorIndexApplierRef};
85use crate::sst::parquet::file_range::PreFilterMode;
86use crate::sst::parquet::reader::ReaderMetrics;
87
/// Multiplier applied to the vector search `k` when the request also carries
/// other filters, so that post-filtering still leaves enough candidates
/// (see `scan_input`'s computation of `vector_index_k`).
#[cfg(feature = "vector_index")]
const VECTOR_INDEX_OVERFETCH_MULTIPLIER: usize = 2;
90
/// A scanner over a region. Each variant wraps one of the concrete scan
/// implementations; the variant is chosen by [ScanRegion::scanner].
pub(crate) enum Scanner {
    /// Scanner backed by [SeqScan].
    Seq(SeqScan),
    /// Scanner backed by [UnorderedScan].
    Unordered(UnorderedScan),
    /// Scanner backed by [SeriesScan].
    Series(SeriesScan),
}
100
impl Scanner {
    /// Builds the output record batch stream by delegating to the wrapped
    /// scan implementation.
    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
    pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream, BoxedError> {
        match self {
            // Note: `SeqScan::build_stream` is synchronous; the other two are async.
            Scanner::Seq(seq_scan) => seq_scan.build_stream(),
            Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
            Scanner::Series(series_scan) => series_scan.build_stream().await,
        }
    }

    /// Returns a [ScanBatchStream] that scans all partitions of the scanner.
    pub(crate) fn scan_batch(&self) -> Result<ScanBatchStream> {
        match self {
            Scanner::Seq(x) => x.scan_all_partitions(),
            Scanner::Unordered(x) => x.scan_all_partitions(),
            Scanner::Series(x) => x.scan_all_partitions(),
        }
    }
}
121
#[cfg(test)]
impl Scanner {
    /// Returns the number of SST files the scan input selected.
    pub(crate) fn num_files(&self) -> usize {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.input().num_files(),
            Scanner::Unordered(unordered_scan) => unordered_scan.input().num_files(),
            Scanner::Series(series_scan) => series_scan.input().num_files(),
        }
    }

    /// Returns the number of memtables the scan input selected.
    pub(crate) fn num_memtables(&self) -> usize {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.input().num_memtables(),
            Scanner::Unordered(unordered_scan) => unordered_scan.input().num_memtables(),
            Scanner::Series(series_scan) => series_scan.input().num_memtables(),
        }
    }

    /// Returns the ids of SST files to scan.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.input().file_ids(),
            Scanner::Unordered(unordered_scan) => unordered_scan.input().file_ids(),
            Scanner::Series(series_scan) => series_scan.input().file_ids(),
        }
    }

    /// Returns the ids of indexes to scan.
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.input().index_ids(),
            Scanner::Unordered(unordered_scan) => unordered_scan.input().index_ids(),
            Scanner::Series(series_scan) => series_scan.input().index_ids(),
        }
    }

    /// Returns the sequence number the scan is snapshotted at, if any.
    pub(crate) fn snapshot_sequence(&self) -> Option<SequenceNumber> {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.input().snapshot_sequence,
            Scanner::Unordered(unordered_scan) => unordered_scan.input().snapshot_sequence,
            Scanner::Series(series_scan) => series_scan.input().snapshot_sequence,
        }
    }

    /// Sets the number of target partitions via `RegionScanner::prepare`.
    /// Test-only helper; `prepare` failures are treated as bugs (unwrap).
    pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
        use store_api::region_engine::{PrepareRequest, RegionScanner};

        let request = PrepareRequest::default().with_target_partitions(target_partitions);
        match self {
            Scanner::Seq(seq_scan) => seq_scan.prepare(request).unwrap(),
            Scanner::Unordered(unordered_scan) => unordered_scan.prepare(request).unwrap(),
            Scanner::Series(series_scan) => series_scan.prepare(request).unwrap(),
        }
    }
}
179
/// Helper that builds a [Scanner] for a region from a [ScanRequest].
#[cfg_attr(doc, aquamarine::aquamarine)]
pub(crate) struct ScanRegion {
    /// Version of the region to scan (metadata, SSTs, memtables, options).
    version: VersionRef,
    /// Access layer used to read SST files.
    access_layer: AccessLayerRef,
    /// The scan request.
    request: ScanRequest,
    /// Cache strategy for the scan.
    cache_strategy: CacheStrategy,
    /// Maximum number of SST files scanned concurrently.
    max_concurrent_scan_files: usize,
    /// Whether to skip building the inverted index applier.
    ignore_inverted_index: bool,
    /// Whether to skip building the full-text index applier.
    ignore_fulltext_index: bool,
    /// Whether to skip building the bloom filter index applier.
    ignore_bloom_filter: bool,
    /// Start time of the query, if provided by the caller.
    start_time: Option<Instant>,
    /// Whether deletion markers are filtered out during the scan
    /// (defaults to true; compaction-like callers may disable it).
    filter_deleted: bool,
    /// Optional provider of extension ranges (enterprise feature only).
    #[cfg(feature = "enterprise")]
    extension_range_provider: Option<BoxedExtensionRangeProvider>,
}
255
256impl ScanRegion {
257 pub(crate) fn new(
259 version: VersionRef,
260 access_layer: AccessLayerRef,
261 request: ScanRequest,
262 cache_strategy: CacheStrategy,
263 ) -> ScanRegion {
264 ScanRegion {
265 version,
266 access_layer,
267 request,
268 cache_strategy,
269 max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
270 ignore_inverted_index: false,
271 ignore_fulltext_index: false,
272 ignore_bloom_filter: false,
273 start_time: None,
274 filter_deleted: true,
275 #[cfg(feature = "enterprise")]
276 extension_range_provider: None,
277 }
278 }
279
    /// Sets the maximum number of SST files scanned concurrently.
    #[must_use]
    pub(crate) fn with_max_concurrent_scan_files(
        mut self,
        max_concurrent_scan_files: usize,
    ) -> Self {
        self.max_concurrent_scan_files = max_concurrent_scan_files;
        self
    }

    /// Sets whether to skip the inverted index during the scan.
    #[must_use]
    pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self {
        self.ignore_inverted_index = ignore;
        self
    }

    /// Sets whether to skip the full-text index during the scan.
    #[must_use]
    pub(crate) fn with_ignore_fulltext_index(mut self, ignore: bool) -> Self {
        self.ignore_fulltext_index = ignore;
        self
    }

    /// Sets whether to skip the bloom filter index during the scan.
    #[must_use]
    pub(crate) fn with_ignore_bloom_filter(mut self, ignore: bool) -> Self {
        self.ignore_bloom_filter = ignore;
        self
    }

    /// Records the start time of the query.
    #[must_use]
    pub(crate) fn with_start_time(mut self, now: Instant) -> Self {
        self.start_time = Some(now);
        self
    }

    /// Sets whether deletion markers are filtered out during the scan.
    pub(crate) fn set_filter_deleted(&mut self, filter_deleted: bool) {
        self.filter_deleted = filter_deleted;
    }

    /// Sets the provider that supplies extension ranges for this scan
    /// (enterprise feature only).
    #[cfg(feature = "enterprise")]
    pub(crate) fn set_extension_range_provider(
        &mut self,
        extension_range_provider: BoxedExtensionRangeProvider,
    ) {
        self.extension_range_provider = Some(extension_range_provider);
    }
328
329 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
331 pub(crate) async fn scanner(self) -> Result<Scanner> {
332 if self.use_series_scan() {
333 self.series_scan().await.map(Scanner::Series)
334 } else if self.use_unordered_scan() {
335 self.unordered_scan().await.map(Scanner::Unordered)
338 } else {
339 self.seq_scan().await.map(Scanner::Seq)
340 }
341 }
342
343 #[tracing::instrument(
345 level = tracing::Level::DEBUG,
346 skip_all,
347 fields(region_id = %self.region_id())
348 )]
349 pub(crate) async fn region_scanner(self) -> Result<RegionScannerRef> {
350 if self.use_series_scan() {
351 self.series_scan()
352 .await
353 .map(|scanner| Box::new(scanner) as _)
354 } else if self.use_unordered_scan() {
355 self.unordered_scan()
356 .await
357 .map(|scanner| Box::new(scanner) as _)
358 } else {
359 self.seq_scan().await.map(|scanner| Box::new(scanner) as _)
360 }
361 }
362
    /// Creates a [SeqScan] over the region. Compaction mode is explicitly
    /// disabled on the input for this path.
    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
    pub(crate) async fn seq_scan(self) -> Result<SeqScan> {
        let input = self.scan_input().await?.with_compaction(false);
        Ok(SeqScan::new(input))
    }

    /// Creates an [UnorderedScan] over the region.
    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
    pub(crate) async fn unordered_scan(self) -> Result<UnorderedScan> {
        let input = self.scan_input().await?;
        Ok(UnorderedScan::new(input))
    }

    /// Creates a [SeriesScan] over the region.
    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
    pub(crate) async fn series_scan(self) -> Result<SeriesScan> {
        let input = self.scan_input().await?;
        Ok(SeriesScan::new(input))
    }
383
384 fn use_unordered_scan(&self) -> bool {
386 self.version.options.append_mode
393 && self.request.series_row_selector.is_none()
394 && (self.request.distribution.is_none()
395 || self.request.distribution == Some(TimeSeriesDistribution::TimeWindowed))
396 }
397
398 fn use_series_scan(&self) -> bool {
400 self.request.distribution == Some(TimeSeriesDistribution::PerSeries)
401 }
402
    /// Builds the [ScanInput] shared by all scan modes.
    ///
    /// Collects everything a scanner needs: the columns to read, the time
    /// range predicate, the SST files and memtable ranges that may contain
    /// matching rows, index appliers, and options derived from the request
    /// and region options.
    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
    async fn scan_input(self) -> Result<ScanInput> {
        // A zero `sst_min_sequence` means "no lower bound".
        let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new);
        let time_range = self.build_time_range_predicate();
        let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;

        // Columns to read: union of projection and predicate columns when a
        // projection is given; otherwise all columns in the region metadata.
        let read_cols = match &self.request.projection_input {
            Some(p) => {
                let metadata = &self.version.metadata;
                let from_projection = read_columns_from_projection(p.clone(), metadata)?;
                let from_predicate = read_columns_from_predicate(&predicate, metadata);
                merge(from_projection, from_predicate)
            }
            None => {
                let read_col_ids = self
                    .version
                    .metadata
                    .column_metadatas
                    .iter()
                    .map(|col| col.column_id);
                ReadColumns::from_deduped_column_ids(read_col_ids)
            }
        };
        let read_col_ids = read_cols.column_ids();

        let mut mapper = match self.request.projection_indices() {
            Some(p) => FlatProjectionMapper::new_with_read_columns(
                &self.version.metadata,
                p.to_vec(),
                read_cols,
            )?,
            None => FlatProjectionMapper::all(&self.version.metadata)?,
        };
        // Replace JSON extension types in the output schema with concrete
        // native types when the request provides hints.
        concretize_json_types(&mut mapper, &self.request.json_type_hint);

        // Select SST files that pass both the sequence bound and time range.
        let ssts = &self.version.ssts;
        let mut files = Vec::new();
        for level in ssts.levels() {
            for file in level.files.values() {
                let exceed_min_sequence = match (sst_min_sequence, file.meta_ref().sequence) {
                    (Some(min_sequence), Some(file_sequence)) => file_sequence > min_sequence,
                    // A file without a sequence cannot be pruned by the bound.
                    (Some(_), None) => true,
                    // No bound: every file passes.
                    (None, _) => true,
                };

                if exceed_min_sequence && file_in_range(file, &time_range) {
                    files.push(file.clone());
                }
            }
        }

        let memtables = self.version.memtables.list_memtables();
        let mut mem_range_builders = Vec::new();
        let filter_mode = pre_filter_mode(
            self.version.options.append_mode,
            self.version.options.merge_mode(),
        );

        for m in memtables {
            // Skip memtables with no data (no time range).
            let Some((start, end)) = m.stats().time_range() else {
                continue;
            };
            let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
            // Skip memtables entirely outside the scan's time range.
            if !memtable_range.intersects(&time_range) {
                continue;
            }
            let ranges_in_memtable = m.ranges(
                Some(&read_col_ids),
                RangesOptions::default()
                    .with_predicate(predicate.clone())
                    .with_sequence(SequenceRange::new(
                        self.request.memtable_min_sequence,
                        self.request.memtable_max_sequence,
                    ))
                    .with_pre_filter_mode(filter_mode),
            )?;
            mem_range_builders.extend(ranges_in_memtable.ranges.into_values().map(|v| {
                let stats = v.stats().clone();
                MemRangeBuilder::new(v, stats)
            }));
        }

        let region_id = self.region_id();
        debug!(
            "Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}",
            region_id,
            self.request,
            time_range,
            mem_range_builders.len(),
            files.len(),
            self.version.options.append_mode,
        );

        // Each index applier is built twice: once for filters that do not
        // reference field columns and once for those that do.
        let (non_field_filters, field_filters) = self.partition_by_field_filters();
        let inverted_index_appliers = [
            self.build_invereted_index_applier(&non_field_filters),
            self.build_invereted_index_applier(&field_filters),
        ];
        let bloom_filter_appliers = [
            self.build_bloom_filter_applier(&non_field_filters),
            self.build_bloom_filter_applier(&field_filters),
        ];
        let fulltext_index_appliers = [
            self.build_fulltext_index_applier(&non_field_filters),
            self.build_fulltext_index_applier(&field_filters),
        ];
        #[cfg(feature = "vector_index")]
        let vector_index_applier = self.build_vector_index_applier();
        // Over-fetch vector index candidates when other filters may drop rows.
        #[cfg(feature = "vector_index")]
        let vector_index_k = self.request.vector_search.as_ref().map(|search| {
            if self.request.filters.is_empty() {
                search.k
            } else {
                search.k.saturating_mul(VECTOR_INDEX_OVERFETCH_MULTIPLIER)
            }
        });

        let input = ScanInput::new(self.access_layer, mapper)
            .with_time_range(Some(time_range))
            .with_predicate(predicate)
            .with_memtables(mem_range_builders)
            .with_files(files)
            .with_cache(self.cache_strategy)
            .with_inverted_index_appliers(inverted_index_appliers)
            .with_bloom_filter_index_appliers(bloom_filter_appliers)
            .with_fulltext_index_appliers(fulltext_index_appliers)
            .with_max_concurrent_scan_files(self.max_concurrent_scan_files)
            .with_start_time(self.start_time)
            .with_append_mode(self.version.options.append_mode)
            .with_filter_deleted(self.filter_deleted)
            .with_merge_mode(self.version.options.merge_mode())
            .with_series_row_selector(self.request.series_row_selector)
            .with_distribution(self.request.distribution)
            .with_explain_flat_format(
                self.version.options.sst_format == Some(crate::sst::FormatType::Flat),
            )
            .with_snapshot_sequence(
                // Snapshot at the memtable max sequence only when requested.
                self.request
                    .snapshot_on_scan
                    .then_some(self.request.memtable_max_sequence)
                    .flatten(),
            );
        #[cfg(feature = "vector_index")]
        let input = input
            .with_vector_index_applier(vector_index_applier)
            .with_vector_index_k(vector_index_k);

        #[cfg(feature = "enterprise")]
        let input = if let Some(provider) = self.extension_range_provider {
            let ranges = provider
                .find_extension_ranges(self.version.flushed_sequence, time_range, &self.request)
                .await?;
            debug!("Find extension ranges: {ranges:?}");
            input.with_extension_ranges(ranges)
        } else {
            input
        };
        Ok(input)
    }
578
    /// Returns the id of the region to scan.
    fn region_id(&self) -> RegionId {
        self.version.metadata.region_id
    }

    /// Derives the [TimestampRange] to scan for the time index column from
    /// the request filters.
    fn build_time_range_predicate(&self) -> TimestampRange {
        let time_index = self.version.metadata.time_index_column();
        let unit = time_index
            .column_schema
            .data_type
            .as_timestamp()
            // Invariant: the time index column always has a timestamp type.
            .expect("Time index must have timestamp-compatible type")
            .unit();
        build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters)
    }
594
    /// Splits the request filters into `(non_field_filters, field_filters)`.
    ///
    /// A filter goes into the first vector when it references no field column
    /// — or when its columns cannot be extracted at all, which is treated
    /// conservatively as non-field. Otherwise it goes into the second vector.
    fn partition_by_field_filters(&self) -> (Vec<Expr>, Vec<Expr>) {
        let field_columns = self
            .version
            .metadata
            .field_columns()
            .map(|col| &col.column_schema.name)
            .collect::<HashSet<_>>();

        // Scratch buffer reused for each expression's column set.
        let mut columns = HashSet::new();

        // `partition` puts expressions matching the closure (non-field) in
        // the first vector.
        self.request.filters.iter().cloned().partition(|expr| {
            columns.clear();
            if expr_to_columns(expr, &mut columns).is_err() {
                // Cannot analyze the expression; keep it with the non-field
                // filters.
                return true;
            }
            !columns
                .iter()
                .any(|column| field_columns.contains(&column.name))
        })
    }
620
621 fn build_invereted_index_applier(&self, filters: &[Expr]) -> Option<InvertedIndexApplierRef> {
623 if self.ignore_inverted_index {
624 return None;
625 }
626
627 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
628 let inverted_index_cache = self.cache_strategy.inverted_index_cache().cloned();
629
630 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
631
632 InvertedIndexApplierBuilder::new(
633 self.access_layer.table_dir().to_string(),
634 self.access_layer.path_type(),
635 self.access_layer.object_store().clone(),
636 self.version.metadata.as_ref(),
637 self.version.metadata.inverted_indexed_column_ids(
638 self.version
639 .options
640 .index_options
641 .inverted_index
642 .ignore_column_ids
643 .iter(),
644 ),
645 self.access_layer.puffin_manager_factory().clone(),
646 )
647 .with_file_cache(file_cache)
648 .with_inverted_index_cache(inverted_index_cache)
649 .with_puffin_metadata_cache(puffin_metadata_cache)
650 .build(filters)
651 .inspect_err(|err| warn!(err; "Failed to build invereted index applier"))
652 .ok()
653 .flatten()
654 .map(Arc::new)
655 }
656
    /// Builds a bloom filter index applier for `filters`, or `None` when the
    /// index is ignored or no applier can be built. Building is best-effort:
    /// failures are logged and the scan falls back to no bloom filter.
    fn build_bloom_filter_applier(&self, filters: &[Expr]) -> Option<BloomFilterIndexApplierRef> {
        if self.ignore_bloom_filter {
            return None;
        }

        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();

        BloomFilterIndexApplierBuilder::new(
            self.access_layer.table_dir().to_string(),
            self.access_layer.path_type(),
            self.access_layer.object_store().clone(),
            self.version.metadata.as_ref(),
            self.access_layer.puffin_manager_factory().clone(),
        )
        .with_file_cache(file_cache)
        .with_bloom_filter_index_cache(bloom_filter_index_cache)
        .with_puffin_metadata_cache(puffin_metadata_cache)
        .build(filters)
        .inspect_err(|err| warn!(err; "Failed to build bloom filter index applier"))
        .ok()
        .flatten()
        .map(Arc::new)
    }
683
    /// Builds a full-text index applier for `filters`, or `None` when the
    /// index is ignored or no applier can be built. Building is best-effort:
    /// failures are logged and the scan falls back to no full-text index.
    fn build_fulltext_index_applier(&self, filters: &[Expr]) -> Option<FulltextIndexApplierRef> {
        if self.ignore_fulltext_index {
            return None;
        }

        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
        FulltextIndexApplierBuilder::new(
            self.access_layer.table_dir().to_string(),
            self.access_layer.path_type(),
            self.access_layer.object_store().clone(),
            self.access_layer.puffin_manager_factory().clone(),
            self.version.metadata.as_ref(),
        )
        .with_file_cache(file_cache)
        .with_puffin_metadata_cache(puffin_metadata_cache)
        .with_bloom_filter_cache(bloom_filter_index_cache)
        .build(filters)
        .inspect_err(|err| warn!(err; "Failed to build fulltext index applier"))
        .ok()
        .flatten()
        .map(Arc::new)
    }
709
    /// Builds a vector index applier from the request's vector search
    /// parameters. Returns `None` when the request has no vector search.
    #[cfg(feature = "vector_index")]
    fn build_vector_index_applier(&self) -> Option<VectorIndexApplierRef> {
        let vector_search = self.request.vector_search.as_ref()?;

        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
        let vector_index_cache = self.cache_strategy.vector_index_cache().cloned();

        let applier = VectorIndexApplier::new(
            self.access_layer.table_dir().to_string(),
            self.access_layer.path_type(),
            self.access_layer.object_store().clone(),
            self.access_layer.puffin_manager_factory().clone(),
            vector_search.column_id,
            vector_search.query_vector.clone(),
            vector_search.metric,
        )
        .with_file_cache(file_cache)
        .with_puffin_metadata_cache(puffin_metadata_cache)
        .with_vector_index_cache(vector_index_cache);

        Some(Arc::new(applier))
    }
734}
735
/// Replaces JSON extension column types in the mapper's output schema with
/// the concrete native types given by `json_type_hint`.
///
/// Schemas without any JSON extension field, and columns without a hint, are
/// left untouched.
fn concretize_json_types(
    mapper: &mut FlatProjectionMapper,
    json_type_hint: &HashMap<String, JsonNativeType>,
) {
    let output_schema = mapper.output_schema();
    let output_arrow_schema = output_schema.arrow_schema();
    // Fast path: nothing to rewrite.
    if !output_arrow_schema.has_json_extension_field() {
        return;
    }

    let mut column_schemas = output_schema.column_schemas().to_vec();
    for (idx, column_schema) in column_schemas.iter_mut().enumerate() {
        if !is_json_extension_type(&output_arrow_schema.fields()[idx]) {
            continue;
        }
        let Some(json_type) = json_type_hint.get(&column_schema.name) else {
            continue;
        };
        column_schema.data_type = ConcreteDataType::from(json_type);
    }

    // Rebuild the schema with the same version and install it on the mapper.
    let output_schema = Arc::new(Schema::new_with_version(
        column_schemas,
        output_schema.version(),
    ));
    mapper.with_output_schema(output_schema);
}
763
764fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
766 if predicate == &TimestampRange::min_to_max() {
767 return true;
768 }
769 let (start, end) = file.time_range();
771 let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end));
772 file_ts_range.intersects(predicate)
773}
774
/// Common input shared by all scanner implementations.
pub struct ScanInput {
    /// Access layer used to read SST files.
    access_layer: AccessLayerRef,
    /// Maps read columns to the projected output schema.
    pub(crate) mapper: Arc<FlatProjectionMapper>,
    /// Columns to read from sources (cloned from the mapper).
    pub(crate) read_cols: ReadColumns,
    /// Time range predicate for the time index column.
    pub(crate) time_range: Option<TimestampRange>,
    /// Predicates pushed down to sources.
    pub(crate) predicate: PredicateGroup,
    /// Region partition expression taken from the predicate group; used to
    /// skip redundant partition filtering per file.
    region_partition_expr: Option<PartitionExpr>,
    /// Range builders for memtables to scan.
    pub(crate) memtables: Vec<MemRangeBuilder>,
    /// Handles of SST files to scan.
    pub(crate) files: Vec<FileHandle>,
    /// Cache strategy for the scan.
    pub(crate) cache_strategy: CacheStrategy,
    /// Whether a missing SST file is tolerated instead of failing the scan.
    ignore_file_not_found: bool,
    /// Maximum number of SST files scanned concurrently.
    pub(crate) max_concurrent_scan_files: usize,
    /// Inverted index appliers, one per filter group
    /// ([non-field filters, field filters] — see `scan_input`).
    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
    /// Bloom filter index appliers, one per filter group.
    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    /// Full-text index appliers, one per filter group.
    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
    /// Vector index applier built from the request's vector search.
    #[cfg(feature = "vector_index")]
    pub(crate) vector_index_applier: Option<VectorIndexApplierRef>,
    /// Number of candidates to fetch from the vector index (may be
    /// over-fetched when other filters are present).
    #[cfg(feature = "vector_index")]
    pub(crate) vector_index_k: Option<usize>,
    /// Start time of the query, if provided.
    pub(crate) query_start: Option<Instant>,
    /// Whether the region is in append mode.
    pub(crate) append_mode: bool,
    /// Whether deletion markers are filtered out.
    pub(crate) filter_deleted: bool,
    /// Merge mode of the region.
    pub(crate) merge_mode: MergeMode,
    /// Optional per-series row selector from the request.
    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
    /// Optional time-series distribution from the request.
    pub(crate) distribution: Option<TimeSeriesDistribution>,
    /// Set when the region's SST format is flat — presumably reported in
    /// EXPLAIN output; confirm against the consumer of this flag.
    explain_flat_format: bool,
    /// Sequence number the scan is snapshotted at, if any.
    pub(crate) snapshot_sequence: Option<SequenceNumber>,
    /// Whether this input is used for compaction rather than a query.
    pub(crate) compaction: bool,
    /// Extension ranges to scan (enterprise feature only).
    #[cfg(feature = "enterprise")]
    extension_ranges: Vec<BoxedExtensionRange>,
}
832
833impl ScanInput {
    /// Creates a new [ScanInput] with default options; callers tune behavior
    /// through the `with_*` builder methods.
    #[must_use]
    pub(crate) fn new(access_layer: AccessLayerRef, mapper: FlatProjectionMapper) -> ScanInput {
        ScanInput {
            access_layer,
            // Clone the read columns before moving the mapper into the Arc.
            read_cols: mapper.read_columns().clone(),
            mapper: Arc::new(mapper),
            time_range: None,
            predicate: PredicateGroup::default(),
            region_partition_expr: None,
            memtables: Vec::new(),
            files: Vec::new(),
            cache_strategy: CacheStrategy::Disabled,
            ignore_file_not_found: false,
            max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
            inverted_index_appliers: [None, None],
            bloom_filter_index_appliers: [None, None],
            fulltext_index_appliers: [None, None],
            #[cfg(feature = "vector_index")]
            vector_index_applier: None,
            #[cfg(feature = "vector_index")]
            vector_index_k: None,
            query_start: None,
            append_mode: false,
            filter_deleted: true,
            merge_mode: MergeMode::default(),
            series_row_selector: None,
            distribution: None,
            explain_flat_format: false,
            snapshot_sequence: None,
            compaction: false,
            #[cfg(feature = "enterprise")]
            extension_ranges: Vec::new(),
        }
    }
869
    /// Sets the time range predicate.
    #[must_use]
    pub(crate) fn with_time_range(mut self, time_range: Option<TimestampRange>) -> Self {
        self.time_range = time_range;
        self
    }

    /// Sets the predicate group and caches its region partition expression.
    #[must_use]
    pub(crate) fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
        self.region_partition_expr = predicate.region_partition_expr().cloned();
        self.predicate = predicate;
        self
    }

    /// Sets the memtable range builders to scan.
    #[must_use]
    pub(crate) fn with_memtables(mut self, memtables: Vec<MemRangeBuilder>) -> Self {
        self.memtables = memtables;
        self
    }

    /// Sets the SST files to scan.
    #[must_use]
    pub(crate) fn with_files(mut self, files: Vec<FileHandle>) -> Self {
        self.files = files;
        self
    }

    /// Sets the cache strategy.
    #[must_use]
    pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self {
        self.cache_strategy = cache;
        self
    }

    /// Sets whether missing SST files are tolerated instead of failing.
    #[must_use]
    pub(crate) fn with_ignore_file_not_found(mut self, ignore: bool) -> Self {
        self.ignore_file_not_found = ignore;
        self
    }

    /// Sets the maximum number of SST files scanned concurrently.
    #[must_use]
    pub(crate) fn with_max_concurrent_scan_files(
        mut self,
        max_concurrent_scan_files: usize,
    ) -> Self {
        self.max_concurrent_scan_files = max_concurrent_scan_files;
        self
    }

    /// Sets the inverted index appliers (one per filter group).
    #[must_use]
    pub(crate) fn with_inverted_index_appliers(
        mut self,
        appliers: [Option<InvertedIndexApplierRef>; 2],
    ) -> Self {
        self.inverted_index_appliers = appliers;
        self
    }

    /// Sets the bloom filter index appliers (one per filter group).
    #[must_use]
    pub(crate) fn with_bloom_filter_index_appliers(
        mut self,
        appliers: [Option<BloomFilterIndexApplierRef>; 2],
    ) -> Self {
        self.bloom_filter_index_appliers = appliers;
        self
    }

    /// Sets the full-text index appliers (one per filter group).
    #[must_use]
    pub(crate) fn with_fulltext_index_appliers(
        mut self,
        appliers: [Option<FulltextIndexApplierRef>; 2],
    ) -> Self {
        self.fulltext_index_appliers = appliers;
        self
    }

    /// Sets the vector index applier.
    #[cfg(feature = "vector_index")]
    #[must_use]
    pub(crate) fn with_vector_index_applier(
        mut self,
        applier: Option<VectorIndexApplierRef>,
    ) -> Self {
        self.vector_index_applier = applier;
        self
    }

    /// Sets the number of candidates to fetch from the vector index.
    #[cfg(feature = "vector_index")]
    #[must_use]
    pub(crate) fn with_vector_index_k(mut self, k: Option<usize>) -> Self {
        self.vector_index_k = k;
        self
    }

    /// Records the start time of the query.
    #[must_use]
    pub(crate) fn with_start_time(mut self, now: Option<Instant>) -> Self {
        self.query_start = now;
        self
    }

    /// Sets whether the region is in append mode.
    #[must_use]
    pub(crate) fn with_append_mode(mut self, is_append_mode: bool) -> Self {
        self.append_mode = is_append_mode;
        self
    }

    /// Sets whether deletion markers are filtered out.
    #[must_use]
    pub(crate) fn with_filter_deleted(mut self, filter_deleted: bool) -> Self {
        self.filter_deleted = filter_deleted;
        self
    }

    /// Sets the merge mode.
    #[must_use]
    pub(crate) fn with_merge_mode(mut self, merge_mode: MergeMode) -> Self {
        self.merge_mode = merge_mode;
        self
    }

    /// Sets the time-series distribution requested by the caller.
    #[must_use]
    pub(crate) fn with_distribution(
        mut self,
        distribution: Option<TimeSeriesDistribution>,
    ) -> Self {
        self.distribution = distribution;
        self
    }

    /// Sets whether to report the flat SST format flag.
    #[must_use]
    pub(crate) fn with_explain_flat_format(mut self, explain_flat_format: bool) -> Self {
        self.explain_flat_format = explain_flat_format;
        self
    }

    /// Sets the per-series row selector.
    #[must_use]
    pub(crate) fn with_series_row_selector(
        mut self,
        series_row_selector: Option<TimeSeriesRowSelector>,
    ) -> Self {
        self.series_row_selector = series_row_selector;
        self
    }

    /// Sets the sequence number to snapshot the scan at.
    #[must_use]
    pub(crate) fn with_snapshot_sequence(
        mut self,
        snapshot_sequence: Option<SequenceNumber>,
    ) -> Self {
        self.snapshot_sequence = snapshot_sequence;
        self
    }

    /// Sets whether this input is used for compaction.
    #[must_use]
    pub(crate) fn with_compaction(mut self, compaction: bool) -> Self {
        self.compaction = compaction;
        self
    }
1041
    /// Builds the memtable ranges addressed by `index`.
    ///
    /// # Panics
    /// Panics if `index.index` is out of bounds of the memtable list.
    pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> {
        let memtable = &self.memtables[index.index];
        let mut ranges = SmallVec::new();
        memtable.build_ranges(index.row_group_index, &mut ranges);
        ranges
    }
1049
1050 fn predicate_for_file(&self, file: &FileHandle) -> Option<Predicate> {
1051 if self.should_skip_region_partition(file) {
1052 self.predicate.predicate_without_region().cloned()
1053 } else {
1054 self.predicate.predicate().cloned()
1055 }
1056 }
1057
1058 fn should_skip_region_partition(&self, file: &FileHandle) -> bool {
1059 match (
1060 self.region_partition_expr.as_ref(),
1061 file.meta_ref().partition_expr.as_ref(),
1062 ) {
1063 (Some(region_expr), Some(file_expr)) => region_expr == file_expr,
1064 _ => false,
1065 }
1066 }
1067
    /// Prunes `file` against the predicate and index appliers and returns a
    /// [FileRangeBuilder] for the surviving ranges.
    ///
    /// Returns a default (empty) builder when the whole file is pruned out,
    /// or when the file is missing and `ignore_file_not_found` is set.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.region_metadata().region_id,
            file_id = %file.file_id()
        )
    )]
    pub async fn prune_file(
        &self,
        file: &FileHandle,
        pre_filter_mode: PreFilterMode,
        reader_metrics: &mut ReaderMetrics,
    ) -> Result<FileRangeBuilder> {
        let predicate = self.predicate_for_file(file);
        // Decode primary key values only for non-compaction reads that
        // actually project a primary key column.
        let decode_pk_values = !self.compaction
            && self
                .mapper
                .read_columns()
                .column_ids_iter()
                .any(|column_id| self.mapper.metadata().primary_key.contains(&column_id));
        let reader = self
            .access_layer
            .read_sst(file.clone())
            .predicate(predicate)
            .projection(Some(self.read_cols.clone()))
            .cache(self.cache_strategy.clone())
            .inverted_index_appliers(self.inverted_index_appliers.clone())
            .bloom_filter_index_appliers(self.bloom_filter_index_appliers.clone())
            .fulltext_index_appliers(self.fulltext_index_appliers.clone());
        #[cfg(feature = "vector_index")]
        let reader = {
            let mut reader = reader;
            reader =
                reader.vector_index_applier(self.vector_index_applier.clone(), self.vector_index_k);
            reader
        };
        let res = reader
            .expected_metadata(Some(self.mapper.metadata().clone()))
            .compaction(self.compaction)
            .pre_filter_mode(pre_filter_mode)
            .decode_primary_key_values(decode_pk_values)
            .build_reader_input(reader_metrics)
            .await;
        let read_input = match res {
            Ok(x) => x,
            Err(e) => {
                // Tolerate a missing object when requested; otherwise
                // propagate the error.
                if e.is_object_not_found() && self.ignore_file_not_found {
                    error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
                    return Ok(FileRangeBuilder::default());
                } else {
                    return Err(e);
                }
            }
        };

        // `None` input means everything in the file was pruned out.
        let Some((mut file_range_ctx, selection)) = read_input else {
            return Ok(FileRangeBuilder::default());
        };

        // Install a compat adapter when the file's columns or primary key
        // encoding differ from the current region metadata.
        let need_compat = !compat::has_same_columns_and_pk_encoding(
            self.mapper.metadata(),
            file_range_ctx.read_format().metadata(),
        );
        if need_compat {
            let compat = FlatCompatBatch::try_new(
                &self.mapper,
                file_range_ctx.read_format().metadata(),
                file_range_ctx.read_format().format_projection(),
                self.compaction,
            )?;
            file_range_ctx.set_compat_batch(compat);
        }
        Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), selection))
    }
1145
    /// Wraps each source so it is driven by a background task, bounded by
    /// `semaphore`, forwarding batches through a channel of `channel_size`.
    ///
    /// A single source (or none) is returned unchanged since there is no
    /// parallelism to gain.
    #[tracing::instrument(
        skip(self, sources, semaphore),
        fields(
            region_id = %self.region_metadata().region_id,
            source_count = sources.len()
        )
    )]
    pub(crate) fn create_parallel_flat_sources(
        &self,
        sources: Vec<BoxedRecordBatchStream>,
        semaphore: Arc<Semaphore>,
        channel_size: usize,
    ) -> Result<Vec<BoxedRecordBatchStream>> {
        if sources.len() <= 1 {
            return Ok(sources);
        }

        let sources = sources
            .into_iter()
            .map(|source| {
                let (sender, receiver) = mpsc::channel(channel_size);
                // The spawned task polls `source` and feeds `sender`.
                self.spawn_flat_scan_task(source, semaphore.clone(), sender);
                let stream = Box::pin(ReceiverStream::new(receiver));
                Box::pin(stream) as _
            })
            .collect();
        Ok(sources)
    }
1178
    #[tracing::instrument(
        skip(self, input, semaphore, sender),
        fields(region_id = %self.region_metadata().region_id)
    )]
    /// Spawns a background task that drains `input` and forwards each batch
    /// (or the first error) into `sender`.
    ///
    /// A semaphore permit is held only around each `next().await` call, so
    /// `semaphore` bounds how many streams poll their source concurrently,
    /// not how many tasks exist. The task stops on the first error or when
    /// the source is exhausted.
    pub(crate) fn spawn_flat_scan_task(
        &self,
        mut input: BoxedRecordBatchStream,
        semaphore: Arc<Semaphore>,
        sender: mpsc::Sender<Result<RecordBatch>>,
    ) {
        let region_id = self.region_metadata().region_id;
        let span = tracing::info_span!(
            "ScanInput::parallel_scan_task",
            region_id = %region_id,
            stream_kind = "flat"
        );
        common_runtime::spawn_global(
            async move {
                loop {
                    let maybe_batch = {
                        // NOTE(review): `unwrap()` assumes the semaphore is
                        // never closed — confirm no caller closes it.
                        let _permit = semaphore.acquire().await.unwrap();
                        input.next().await
                    };
                    match maybe_batch {
                        Some(Ok(batch)) => {
                            // Send failures mean the receiver is gone; the
                            // result is deliberately ignored.
                            let _ = sender.send(Ok(batch)).await;
                        }
                        Some(Err(e)) => {
                            let _ = sender.send(Err(e)).await;
                            break;
                        }
                        None => break,
                    }
                }
            }
            .instrument(span),
        );
    }
1221
1222 pub(crate) fn total_rows(&self) -> usize {
1223 let rows_in_files: usize = self.files.iter().map(|f| f.num_rows()).sum();
1224 let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();
1225
1226 let rows = rows_in_files + rows_in_memtables;
1227 #[cfg(feature = "enterprise")]
1228 let rows = rows
1229 + self
1230 .extension_ranges
1231 .iter()
1232 .map(|x| x.num_rows())
1233 .sum::<u64>() as usize;
1234 rows
1235 }
1236
    /// Returns the predicate group of this scan input.
    pub(crate) fn predicate_group(&self) -> &PredicateGroup {
        &self.predicate
    }
1240
    /// Returns the number of memtables to scan.
    pub(crate) fn num_memtables(&self) -> usize {
        self.memtables.len()
    }
1245
    /// Returns the number of SST files to scan.
    pub(crate) fn num_files(&self) -> usize {
        self.files.len()
    }
1250
    /// Returns the file for a global row-group index.
    ///
    /// Indices are laid out as memtables first, then files, so the file slot
    /// is `index.index - num_memtables()`. Panics if the index does not refer
    /// to a file.
    pub(crate) fn file_from_index(&self, index: RowGroupIndex) -> &FileHandle {
        let file_index = index.index - self.num_memtables();
        &self.files[file_index]
    }
1256
    /// Returns the region metadata the projection mapper was built from.
    pub fn region_metadata(&self) -> &RegionMetadataRef {
        self.mapper.metadata()
    }
1260
1261 fn range_pre_filter_mode(&self, source_count: usize) -> PreFilterMode {
1262 if source_count <= 1 {
1263 return PreFilterMode::All;
1268 }
1269
1270 pre_filter_mode(self.append_mode, self.merge_mode)
1271 }
1272}
1273
#[cfg(feature = "enterprise")]
impl ScanInput {
    /// Returns a new input with the given extension ranges attached.
    #[must_use]
    pub(crate) fn with_extension_ranges(self, extension_ranges: Vec<BoxedExtensionRange>) -> Self {
        Self {
            extension_ranges,
            ..self
        }
    }

    /// Returns all extension ranges of this input.
    // The enclosing `impl` is already gated on the "enterprise" feature, so
    // the per-method `#[cfg(feature = "enterprise")]` attributes that used to
    // sit on these methods were redundant and have been removed.
    pub(crate) fn extension_ranges(&self) -> &[BoxedExtensionRange] {
        &self.extension_ranges
    }

    /// Returns the extension range for a global range index `i`.
    ///
    /// Extension ranges are indexed after memtables and files, so the local
    /// slot is `i - num_memtables() - num_files()`. Panics if `i` does not
    /// refer to an extension range.
    pub(crate) fn extension_range(&self, i: usize) -> &BoxedExtensionRange {
        &self.extension_ranges[i - self.num_memtables() - self.num_files()]
    }
}
1295
#[cfg(test)]
impl ScanInput {
    /// Collects the ids of all SST files in this input (test helper).
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        let mut ids = Vec::with_capacity(self.files.len());
        for file in &self.files {
            ids.push(file.file_id());
        }
        ids
    }

    /// Collects the index ids of all SST files in this input (test helper).
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        let mut ids = Vec::with_capacity(self.files.len());
        for file in &self.files {
            ids.push(file.index_id());
        }
        ids
    }
}
1307
1308fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode {
1309 if append_mode {
1310 return PreFilterMode::All;
1311 }
1312
1313 match merge_mode {
1314 MergeMode::LastRow => PreFilterMode::SkipFields,
1315 MergeMode::LastNonNull => PreFilterMode::SkipFields,
1316 }
1317}
1318
/// Builds a fingerprint identifying this scan request for range-result
/// caching, or returns `None` when the scan is not eligible.
///
/// Eligibility requires: not a compaction scan, at least one SST file, the
/// all-caches-enabled strategy, and at least one filter that touches a tag
/// column. Filters are split into pure time-index filters and everything
/// else; both lists are sorted so logically identical requests produce the
/// same fingerprint regardless of filter order.
pub(crate) fn build_scan_fingerprint(input: &ScanInput) -> Option<ScanRequestFingerprint> {
    let eligible = !input.compaction
        && !input.files.is_empty()
        && matches!(input.cache_strategy, CacheStrategy::EnableAll(_));

    if !eligible {
        return None;
    }

    let metadata = input.region_metadata();
    // Names of all tag (primary key) columns, for detecting tag filters.
    let tag_names: HashSet<&str> = metadata
        .column_metadatas
        .iter()
        .filter(|col| col.semantic_type == SemanticType::Tag)
        .map(|col| col.column_schema.name.as_str())
        .collect();

    let time_index = metadata.time_index_column();
    let time_index_name = time_index.column_schema.name.clone();
    let ts_col_unit = time_index
        .column_schema
        .data_type
        .as_timestamp()
        .expect("Time index must have timestamp-compatible type")
        .unit();

    // Only user-supplied filters participate; the region partition
    // expression is excluded on purpose.
    let exprs = input
        .predicate_group()
        .predicate_without_region()
        .map(|predicate| predicate.exprs())
        .unwrap_or_default();

    let mut filters = Vec::new();
    let mut time_filters = Vec::new();
    let mut has_tag_filter = false;
    // Reused buffer for the columns referenced by each expression.
    let mut columns = HashSet::new();

    for expr in exprs {
        columns.clear();
        let is_time_only = match expr_to_columns(expr, &mut columns) {
            Ok(()) if !columns.is_empty() => {
                has_tag_filter |= columns
                    .iter()
                    .any(|col| tag_names.contains(col.name.as_str()));
                columns.iter().all(|col| col.name == time_index_name)
            }
            // Column extraction failed or the expr references no columns:
            // treat it as a generic (non-time) filter.
            _ => false,
        };

        // A filter counts as a time filter only if it references the time
        // index exclusively AND a concrete time range can be extracted.
        if is_time_only
            && extract_time_range_from_expr(&time_index_name, ts_col_unit, expr).is_some()
        {
            time_filters.push(expr.to_string());
        } else {
            filters.push(expr.to_string());
        }
    }

    // Without a tag filter the scan is too broad to be worth fingerprinting.
    if !has_tag_filter {
        return None;
    }

    // Sort for order-insensitive fingerprints.
    filters.sort_unstable();
    time_filters.sort_unstable();
    let read_columns = input.read_cols.clone();
    Some(
        crate::read::range_cache::ScanRequestFingerprintBuilder {
            // Column types are recorded so schema changes invalidate the
            // fingerprint; missing columns are kept as `None`.
            read_column_types: read_columns
                .column_ids_iter()
                .map(|id| {
                    metadata
                        .column_by_id(id)
                        .map(|col| col.column_schema.data_type.clone())
                })
                .collect(),
            read_columns,
            filters,
            time_filters,
            series_row_selector: input.series_row_selector,
            append_mode: input.append_mode,
            filter_deleted: input.filter_deleted,
            merge_mode: input.merge_mode,
            partition_expr_version: metadata.partition_expr_version,
        }
        .build(),
    )
}
1420
/// Context shared by scan streams built from the same [`ScanInput`].
pub struct StreamContext {
    /// The input to build sources from.
    pub input: ScanInput,
    /// Metadata for the ranges to scan.
    pub(crate) ranges: Vec<RangeMeta>,
    /// Fingerprint of the scan request when it is eligible for
    /// range-result caching; `None` otherwise.
    #[allow(dead_code)]
    pub(crate) scan_fingerprint: Option<ScanRequestFingerprint>,

    /// Start time of the query (taken from the input, or the moment the
    /// context was created).
    pub(crate) query_start: Instant,
}
1437
1438impl StreamContext {
1439 pub(crate) fn seq_scan_ctx(input: ScanInput) -> Self {
1441 let query_start = input.query_start.unwrap_or_else(Instant::now);
1442 let ranges = RangeMeta::seq_scan_ranges(&input);
1443 READ_SST_COUNT.observe(input.num_files() as f64);
1444 let scan_fingerprint = build_scan_fingerprint(&input);
1445
1446 Self {
1447 input,
1448 ranges,
1449 scan_fingerprint,
1450 query_start,
1451 }
1452 }
1453
1454 pub(crate) fn unordered_scan_ctx(input: ScanInput) -> Self {
1456 let query_start = input.query_start.unwrap_or_else(Instant::now);
1457 let ranges = RangeMeta::unordered_scan_ranges(&input);
1458 READ_SST_COUNT.observe(input.num_files() as f64);
1459 let scan_fingerprint = build_scan_fingerprint(&input);
1460
1461 Self {
1462 input,
1463 ranges,
1464 scan_fingerprint,
1465 query_start,
1466 }
1467 }
1468
1469 pub(crate) fn is_mem_range_index(&self, index: RowGroupIndex) -> bool {
1471 self.input.num_memtables() > index.index
1472 }
1473
1474 pub(crate) fn is_file_range_index(&self, index: RowGroupIndex) -> bool {
1475 !self.is_mem_range_index(index)
1476 && index.index < self.input.num_files() + self.input.num_memtables()
1477 }
1478
1479 pub(crate) fn range_pre_filter_mode(&self, part_range: &PartitionRange) -> PreFilterMode {
1480 let range_meta = &self.ranges[part_range.identifier];
1481 let source_count = range_meta.indices.len();
1482
1483 self.input.range_pre_filter_mode(source_count)
1484 }
1485
1486 pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
1488 self.ranges
1489 .iter()
1490 .enumerate()
1491 .map(|(idx, range_meta)| range_meta.new_partition_range(idx))
1492 .collect()
1493 }
1494
1495 pub(crate) fn format_for_explain(&self, verbose: bool, f: &mut fmt::Formatter) -> fmt::Result {
1497 let (mut num_mem_ranges, mut num_file_ranges, mut num_other_ranges) = (0, 0, 0);
1498 for range_meta in &self.ranges {
1499 for idx in &range_meta.row_group_indices {
1500 if self.is_mem_range_index(*idx) {
1501 num_mem_ranges += 1;
1502 } else if self.is_file_range_index(*idx) {
1503 num_file_ranges += 1;
1504 } else {
1505 num_other_ranges += 1;
1506 }
1507 }
1508 }
1509 if verbose {
1510 write!(f, "{{")?;
1511 }
1512 write!(
1513 f,
1514 r#""partition_count":{{"count":{}, "mem_ranges":{}, "files":{}, "file_ranges":{}"#,
1515 self.ranges.len(),
1516 num_mem_ranges,
1517 self.input.num_files(),
1518 num_file_ranges,
1519 )?;
1520 if num_other_ranges > 0 {
1521 write!(f, r#", "other_ranges":{}"#, num_other_ranges)?;
1522 }
1523 write!(f, "}}")?;
1524
1525 if let Some(selector) = &self.input.series_row_selector {
1526 write!(f, ", \"selector\":\"{}\"", selector)?;
1527 }
1528 if let Some(distribution) = &self.input.distribution {
1529 write!(f, ", \"distribution\":\"{}\"", distribution)?;
1530 }
1531
1532 if verbose {
1533 self.format_verbose_content(f)?;
1534 }
1535
1536 Ok(())
1537 }
1538
1539 fn format_verbose_content(&self, f: &mut fmt::Formatter) -> fmt::Result {
1540 struct FileWrapper<'a> {
1541 file: &'a FileHandle,
1542 }
1543
1544 impl fmt::Debug for FileWrapper<'_> {
1545 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1546 let (start, end) = self.file.time_range();
1547 write!(
1548 f,
1549 r#"{{"file_id":"{}","time_range_start":"{}::{}","time_range_end":"{}::{}","rows":{},"size":{},"index_size":{}}}"#,
1550 self.file.file_id(),
1551 start.value(),
1552 start.unit(),
1553 end.value(),
1554 end.unit(),
1555 self.file.num_rows(),
1556 self.file.size(),
1557 self.file.index_size()
1558 )
1559 }
1560 }
1561
1562 struct InputWrapper<'a> {
1563 input: &'a ScanInput,
1564 }
1565
1566 #[cfg(feature = "enterprise")]
1567 impl InputWrapper<'_> {
1568 fn format_extension_ranges(&self, f: &mut fmt::Formatter) -> fmt::Result {
1569 if self.input.extension_ranges.is_empty() {
1570 return Ok(());
1571 }
1572
1573 let mut delimiter = "";
1574 write!(f, ", extension_ranges: [")?;
1575 for range in self.input.extension_ranges() {
1576 write!(f, "{}{:?}", delimiter, range)?;
1577 delimiter = ", ";
1578 }
1579 write!(f, "]")?;
1580 Ok(())
1581 }
1582 }
1583
1584 impl fmt::Debug for InputWrapper<'_> {
1585 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1586 let output_schema = self.input.mapper.output_schema();
1587 if !output_schema.is_empty() {
1588 let names: Vec<_> = output_schema
1589 .column_schemas()
1590 .iter()
1591 .map(|col| &col.name)
1592 .collect();
1593 write!(f, ", \"projection\": {:?}", names)?;
1594 }
1595 if let Some(predicate) = &self.input.predicate.predicate() {
1596 if !predicate.exprs().is_empty() {
1597 let exprs: Vec<_> =
1598 predicate.exprs().iter().map(|e| e.to_string()).collect();
1599 write!(f, ", \"filters\": {:?}", exprs)?;
1600 }
1601 if !predicate.dyn_filters().is_empty() {
1602 let dyn_filters: Vec<_> = predicate
1603 .dyn_filters()
1604 .iter()
1605 .map(|f| format!("{}", f))
1606 .collect();
1607 write!(f, ", \"dyn_filters\": {:?}", dyn_filters)?;
1608 }
1609 }
1610 #[cfg(feature = "vector_index")]
1611 if let Some(vector_index_k) = self.input.vector_index_k {
1612 write!(f, ", \"vector_index_k\": {}", vector_index_k)?;
1613 }
1614 if !self.input.files.is_empty() {
1615 write!(f, ", \"files\": ")?;
1616 f.debug_list()
1617 .entries(self.input.files.iter().map(|file| FileWrapper { file }))
1618 .finish()?;
1619 }
1620 write!(f, ", \"flat_format\": {}", self.input.explain_flat_format)?;
1621 #[cfg(feature = "enterprise")]
1622 self.format_extension_ranges(f)?;
1623
1624 Ok(())
1625 }
1626 }
1627
1628 write!(f, "{:?}", InputWrapper { input: &self.input })
1629 }
1630
1631 pub(crate) fn add_dyn_filter_to_predicate(
1634 self: &Arc<Self>,
1635 filter_exprs: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
1636 ) -> Vec<bool> {
1637 let mut supported = Vec::with_capacity(filter_exprs.len());
1638 let filter_expr = filter_exprs
1639 .into_iter()
1640 .filter_map(|expr| {
1641 if let Ok(dyn_filter) = (expr as Arc<dyn std::any::Any + Send + Sync + 'static>)
1642 .downcast::<datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr>()
1643 {
1644 supported.push(true);
1645 Some(dyn_filter)
1646 } else {
1647 supported.push(false);
1648 None
1649 }
1650 })
1651 .collect();
1652 self.input.predicate.add_dyn_filters(filter_expr);
1653 supported
1654 }
1655}
1656
/// Predicates of a scan request, kept in several views.
#[derive(Clone, Default)]
pub struct PredicateGroup {
    // Filters referencing only the time index, pre-compiled into simple
    // evaluators; `None` when no such filter exists.
    time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
    // All predicates, including the region partition expression (if any).
    predicate_all: Predicate,
    // Request predicates only, without the partition expression.
    predicate_without_region: Predicate,
    // The region's partition expression parsed from metadata, if present.
    region_partition_expr: Option<PartitionExpr>,
}
1669
1670impl PredicateGroup {
1671 pub fn new(metadata: &RegionMetadata, exprs: &[Expr]) -> Result<Self> {
1673 let mut combined_exprs = exprs.to_vec();
1674 let mut region_partition_expr = None;
1675
1676 if let Some(expr_json) = metadata.partition_expr.as_ref()
1677 && !expr_json.is_empty()
1678 && let Some(expr) = PartitionExpr::from_json_str(expr_json)
1679 .context(InvalidPartitionExprSnafu { expr: expr_json })?
1680 {
1681 let logical_expr = expr
1682 .try_as_logical_expr()
1683 .context(InvalidPartitionExprSnafu {
1684 expr: expr_json.clone(),
1685 })?;
1686
1687 combined_exprs.push(logical_expr);
1688 region_partition_expr = Some(expr);
1689 }
1690
1691 let mut time_filters = Vec::with_capacity(combined_exprs.len());
1692 let mut columns = HashSet::new();
1694 for expr in &combined_exprs {
1695 columns.clear();
1696 let Some(filter) = Self::expr_to_filter(expr, metadata, &mut columns) else {
1697 continue;
1698 };
1699 time_filters.push(filter);
1700 }
1701 let time_filters = if time_filters.is_empty() {
1702 None
1703 } else {
1704 Some(Arc::new(time_filters))
1705 };
1706
1707 let predicate_all = Predicate::new(combined_exprs);
1708 let predicate_without_region = Predicate::new(exprs.to_vec());
1709
1710 Ok(Self {
1711 time_filters,
1712 predicate_all,
1713 predicate_without_region,
1714 region_partition_expr,
1715 })
1716 }
1717
1718 pub(crate) fn time_filters(&self) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
1720 self.time_filters.clone()
1721 }
1722
1723 pub(crate) fn predicate(&self) -> Option<&Predicate> {
1725 if self.predicate_all.is_empty() {
1726 None
1727 } else {
1728 Some(&self.predicate_all)
1729 }
1730 }
1731
1732 pub(crate) fn predicate_without_region(&self) -> Option<&Predicate> {
1734 if self.predicate_without_region.is_empty() {
1735 None
1736 } else {
1737 Some(&self.predicate_without_region)
1738 }
1739 }
1740
1741 pub(crate) fn add_dyn_filters(&self, dyn_filters: Vec<Arc<DynamicFilterPhysicalExpr>>) {
1743 self.predicate_all.add_dyn_filters(dyn_filters.clone());
1744 self.predicate_without_region.add_dyn_filters(dyn_filters);
1745 }
1746
1747 pub(crate) fn region_partition_expr(&self) -> Option<&PartitionExpr> {
1749 self.region_partition_expr.as_ref()
1750 }
1751
1752 fn expr_to_filter(
1753 expr: &Expr,
1754 metadata: &RegionMetadata,
1755 columns: &mut HashSet<Column>,
1756 ) -> Option<SimpleFilterEvaluator> {
1757 columns.clear();
1758 expr_to_columns(expr, columns).ok()?;
1761 if columns.len() > 1 {
1762 return None;
1764 }
1765 let column = columns.iter().next()?;
1766 let column_meta = metadata.column_by_name(&column.name)?;
1767 if column_meta.semantic_type == SemanticType::Timestamp {
1768 SimpleFilterEvaluator::try_new(expr)
1769 } else {
1770 None
1771 }
1772 }
1773}
1774
1775#[cfg(test)]
1776mod tests {
1777 use std::collections::HashMap;
1778 use std::sync::Arc;
1779
1780 use datafusion::physical_plan::expressions::lit as physical_lit;
1781 use datafusion_common::ScalarValue;
1782 use datafusion_expr::{col, lit};
1783 use datatypes::extension::json::{JsonExtensionType, JsonMetadata};
1784 use datatypes::schema::ColumnSchema;
1785 use datatypes::types::json_type::JsonObjectType;
1786 use datatypes::value::Value;
1787 use partition::expr::col as partition_col;
1788 use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
1789 use store_api::storage::{RegionId, TimeSeriesDistribution, TimeSeriesRowSelector};
1790
1791 use super::*;
1792 use crate::cache::CacheManager;
1793 use crate::read::range_cache::ScanRequestFingerprintBuilder;
1794 use crate::test_util::memtable_util::metadata_with_primary_key;
1795 use crate::test_util::scheduler_util::SchedulerEnv;
1796
    /// Builds a [`ScanInput`] over one default file with the given filters
    /// and an all-caches-enabled strategy, making the input eligible for
    /// scan fingerprinting.
    async fn new_scan_input(metadata: RegionMetadataRef, filters: Vec<Expr>) -> ScanInput {
        let env = SchedulerEnv::new().await;
        let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3]).unwrap();
        let predicate = PredicateGroup::new(metadata.as_ref(), &filters).unwrap();
        let file = FileHandle::new(
            crate::sst::file::FileMeta::default(),
            Arc::new(crate::sst::file_purger::NoopFilePurger),
        );

        ScanInput::new(env.access_layer.clone(), mapper)
            .with_predicate(predicate)
            .with_cache(CacheStrategy::EnableAll(Arc::new(
                CacheManager::builder()
                    .range_result_cache_size(1024)
                    .build(),
            )))
            .with_files(vec![file])
    }
1815
1816 fn ts_lit(val: i64) -> datafusion_expr::Expr {
1818 lit(ScalarValue::TimestampMillisecond(Some(val), None))
1819 }
1820
    /// Builds region metadata with a string tag, a millisecond time index,
    /// a JSON-extension field ("payload") and an int64 field ("value").
    fn metadata_with_json_field() -> RegionMetadataRef {
        let mut json_column = ColumnSchema::new(
            "payload",
            ConcreteDataType::from(&JsonNativeType::Object(JsonObjectType::new())),
            true,
        );
        // Mark the column as a JSON extension type in its Arrow metadata.
        json_column
            .with_extension_type(&JsonExtensionType::new(Arc::new(JsonMetadata::default())))
            .unwrap();

        let mut builder = RegionMetadataBuilder::new(RegionId::new(1024, 1));
        builder
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "host",
                    ConcreteDataType::string_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 1,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts",
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: json_column,
                semantic_type: SemanticType::Field,
                column_id: 3,
            })
            .push_column_metadata(ColumnMetadata {
                column_schema: ColumnSchema::new("value", ConcreteDataType::int64_datatype(), true),
                semantic_type: SemanticType::Field,
                column_id: 4,
            })
            .primary_key(vec![1]);

        Arc::new(builder.build().unwrap())
    }
1865
1866 fn concrete_json_type_hint() -> JsonNativeType {
1867 JsonNativeType::Object(JsonObjectType::from([
1868 ("active".to_string(), JsonNativeType::Bool),
1869 ("name".to_string(), JsonNativeType::String),
1870 ]))
1871 }
1872
    /// A matching type hint replaces the JSON extension field's type in the
    /// output schema while other columns keep their original types.
    #[test]
    fn test_concretize_json_types_rewrites_json_output_schema() -> Result<()> {
        let metadata = metadata_with_json_field();
        let mut mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3])?;
        let output_schema = mapper.output_schema();
        let original_arrow_schema = output_schema.arrow_schema();
        assert!(original_arrow_schema.has_json_extension_field());
        assert!(is_json_extension_type(&original_arrow_schema.fields()[1]));

        let expected_type = concrete_json_type_hint();
        concretize_json_types(
            &mut mapper,
            &HashMap::from([("payload".to_string(), expected_type.clone())]),
        );

        // The extension marker remains, but the payload column now carries
        // the concrete object type from the hint.
        let output_schema = mapper.output_schema();
        let output_arrow_schema = output_schema.arrow_schema();
        assert!(output_arrow_schema.has_json_extension_field());
        assert_eq!(
            ConcreteDataType::string_datatype(),
            output_schema.column_schemas()[0].data_type
        );
        assert_eq!(
            ConcreteDataType::from(&expected_type),
            output_schema.column_schemas()[1].data_type
        );
        assert_eq!(
            ConcreteDataType::int64_datatype(),
            output_schema.column_schemas()[2].data_type
        );
        Ok(())
    }
1905
    /// A hint for a column that doesn't exist leaves the JSON field's type
    /// unchanged (still the empty object type).
    #[test]
    fn test_concretize_json_types_keeps_json_without_hint() {
        let metadata = metadata_with_json_field();
        let mut mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3]).unwrap();

        concretize_json_types(
            &mut mapper,
            &HashMap::from([("missing".to_string(), JsonNativeType::String)]),
        );

        let output_schema = mapper.output_schema();
        let output_arrow_schema = output_schema.arrow_schema();
        assert!(output_arrow_schema.has_json_extension_field());
        assert!(is_json_extension_type(&output_arrow_schema.fields()[1]));

        assert_eq!(
            ConcreteDataType::from(&JsonNativeType::Object(JsonObjectType::new())),
            output_schema.column_schemas()[1].data_type
        );
    }
1927
    /// An eligible scan (file present, caches enabled, tag filter included)
    /// produces a fingerprint that splits time filters from other filters and
    /// records the scan options.
    #[tokio::test]
    async fn test_build_scan_fingerprint_for_eligible_scan() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let input = new_scan_input(
            metadata.clone(),
            vec![
                col("ts").gt_eq(ts_lit(1000)),
                col("k0").eq(lit("foo")),
                col("v0").gt(lit(1)),
            ],
        )
        .await
        .with_distribution(Some(TimeSeriesDistribution::PerSeries))
        .with_series_row_selector(Some(TimeSeriesRowSelector::LastRow))
        .with_merge_mode(MergeMode::LastNonNull)
        .with_filter_deleted(false);

        let fingerprint = build_scan_fingerprint(&input).unwrap();

        let expected = ScanRequestFingerprintBuilder {
            read_columns: input.read_cols,
            read_column_types: vec![
                metadata
                    .column_by_id(0)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(2)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(3)
                    .map(|col| col.column_schema.data_type.clone()),
            ],
            filters: vec![
                col("k0").eq(lit("foo")).to_string(),
                col("v0").gt(lit(1)).to_string(),
            ],
            time_filters: vec![col("ts").gt_eq(ts_lit(1000)).to_string()],
            series_row_selector: Some(TimeSeriesRowSelector::LastRow),
            append_mode: false,
            filter_deleted: false,
            merge_mode: MergeMode::LastNonNull,
            partition_expr_version: 0,
        }
        .build();
        assert_eq!(expected, fingerprint);
    }
1974
    /// Without a filter on a tag column, no fingerprint is produced even if
    /// the scan is otherwise eligible.
    #[tokio::test]
    async fn test_build_scan_fingerprint_requires_tag_filter() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let input = new_scan_input(
            metadata,
            vec![col("ts").gt_eq(lit(1000)), col("v0").gt(lit(1))],
        )
        .await;

        assert!(build_scan_fingerprint(&input).is_none());
    }
1986
    /// No fingerprint when caching is disabled, when scanning for compaction,
    /// or when there are no files to scan.
    #[tokio::test]
    async fn test_build_scan_fingerprint_respects_scan_eligibility() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let filters = vec![col("k0").eq(lit("foo"))];

        // Default cache strategy (not EnableAll) -> ineligible.
        let disabled = ScanInput::new(
            SchedulerEnv::new().await.access_layer.clone(),
            FlatProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap(),
        )
        .with_predicate(PredicateGroup::new(metadata.as_ref(), &filters).unwrap());
        assert!(build_scan_fingerprint(&disabled).is_none());

        // Compaction scans are never fingerprinted.
        let compaction = new_scan_input(metadata.clone(), filters.clone())
            .await
            .with_compaction(true);
        assert!(build_scan_fingerprint(&compaction).is_none());

        // A scan without files is ineligible.
        let no_files = new_scan_input(metadata, filters).await.with_files(vec![]);
        assert!(build_scan_fingerprint(&no_files).is_none());
    }
2008
    /// The fingerprint includes the partition expression version, so a region
    /// with a partition expression yields a non-zero version in the result.
    #[tokio::test]
    async fn test_build_scan_fingerprint_tracks_schema_and_partition_expr_changes() {
        let base = metadata_with_primary_key(vec![0, 1], false);
        let mut builder = RegionMetadataBuilder::from_existing(base);
        let partition_expr = partition_col("k0")
            .gt_eq(Value::String("foo".into()))
            .as_json_str()
            .unwrap();
        builder.partition_expr_json(Some(partition_expr));
        let metadata = Arc::new(builder.build_without_validation().unwrap());

        let input = new_scan_input(metadata.clone(), vec![col("k0").eq(lit("foo"))]).await;
        let fingerprint = build_scan_fingerprint(&input).unwrap();

        let expected = ScanRequestFingerprintBuilder {
            read_columns: input.read_cols,
            read_column_types: vec![
                metadata
                    .column_by_id(0)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(2)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(3)
                    .map(|col| col.column_schema.data_type.clone()),
            ],
            filters: vec![col("k0").eq(lit("foo")).to_string()],
            time_filters: vec![],
            series_row_selector: None,
            append_mode: false,
            filter_deleted: true,
            merge_mode: MergeMode::LastRow,
            partition_expr_version: metadata.partition_expr_version,
        }
        .build();
        assert_eq!(expected, fingerprint);
        assert_ne!(0, metadata.partition_expr_version);
    }
2048
    /// Adding a dynamic filter to an empty predicate group makes both views
    /// non-empty (they report the dynamic filter) while the static expr
    /// lists stay empty.
    #[test]
    fn test_update_dyn_filters_with_empty_base_predicates() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let predicate_group = PredicateGroup::new(metadata.as_ref(), &[]).unwrap();
        assert!(predicate_group.predicate().is_none());
        assert!(predicate_group.predicate_without_region().is_none());

        let dyn_filter = Arc::new(DynamicFilterPhysicalExpr::new(vec![], physical_lit(false)));
        predicate_group.add_dyn_filters(vec![dyn_filter]);

        let predicate_all = predicate_group.predicate().unwrap();
        assert!(predicate_all.exprs().is_empty());
        assert_eq!(1, predicate_all.dyn_filters().len());

        let predicate_without_region = predicate_group.predicate_without_region().unwrap();
        assert!(predicate_without_region.exprs().is_empty());
        assert_eq!(1, predicate_without_region.dyn_filters().len());
    }
2067
    /// Pre-filter mode: `All` when there is at most one source or append
    /// mode is on; `SkipFields` only for multi-source, non-append ranges.
    #[tokio::test]
    async fn test_range_pre_filter_mode() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        // (append_mode, merge_mode, source_count, expected_mode)
        let cases = [
            (true, MergeMode::LastRow, 1, PreFilterMode::All),
            (false, MergeMode::LastNonNull, 1, PreFilterMode::All),
            (false, MergeMode::LastRow, 2, PreFilterMode::SkipFields),
            (true, MergeMode::LastRow, 2, PreFilterMode::All),
        ];

        for (append_mode, merge_mode, source_count, expected_mode) in cases {
            let input = new_scan_input(metadata.clone(), vec![])
                .await
                .with_append_mode(append_mode)
                .with_merge_mode(merge_mode);

            assert_eq!(expected_mode, input.range_pre_filter_mode(source_count));
        }
    }
2087}