1use std::collections::{HashMap, HashSet};
18use std::fmt;
19use std::num::NonZeroU64;
20use std::sync::Arc;
21use std::time::Instant;
22
23use api::v1::SemanticType;
24use common_error::ext::BoxedError;
25use common_recordbatch::SendableRecordBatchStream;
26use common_recordbatch::filter::SimpleFilterEvaluator;
27use common_telemetry::tracing::Instrument;
28use common_telemetry::{debug, error, tracing, warn};
29use common_time::range::TimestampRange;
30use datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr;
31use datafusion_common::Column;
32use datafusion_expr::Expr;
33use datafusion_expr::utils::expr_to_columns;
34use datatypes::extension::json::is_structured_json_field;
35use datatypes::types::json_type::JsonNativeType;
36use futures::StreamExt;
37use itertools::Itertools;
38use partition::expr::PartitionExpr;
39use smallvec::SmallVec;
40use snafu::ResultExt;
41use store_api::metadata::{RegionMetadata, RegionMetadataRef};
42use store_api::region_engine::{PartitionRange, RegionScannerRef};
43use store_api::storage::{
44 NestedPath, RegionId, ScanRequest, SequenceNumber, SequenceRange, TimeSeriesDistribution,
45 TimeSeriesRowSelector,
46};
47use table::predicate::{Predicate, build_time_range_predicate, extract_time_range_from_expr};
48use tokio::sync::{Semaphore, mpsc};
49use tokio_stream::wrappers::ReceiverStream;
50
51use crate::access_layer::AccessLayerRef;
52use crate::cache::CacheStrategy;
53use crate::config::DEFAULT_MAX_CONCURRENT_SCAN_FILES;
54use crate::error::{InvalidPartitionExprSnafu, Result};
55#[cfg(feature = "enterprise")]
56use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
57use crate::memtable::{MemtableRange, RangesOptions};
58use crate::metrics::READ_SST_COUNT;
59use crate::read::compat::{self, FlatCompatBatch};
60use crate::read::flat_projection::FlatProjectionMapper;
61use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
62use crate::read::range_cache::{ScanRequestFingerprint, implied_time_range_from_exprs};
63use crate::read::read_columns::{
64 ReadColumns, merge, merge_nested_paths, read_columns_from_predicate,
65 read_columns_from_projection,
66};
67use crate::read::seq_scan::SeqScan;
68use crate::read::series_scan::SeriesScan;
69use crate::read::stream::ScanBatchStream;
70use crate::read::unordered_scan::UnorderedScan;
71use crate::read::{BoxedRecordBatchStream, RecordBatch};
72use crate::region::options::MergeMode;
73use crate::region::version::VersionRef;
74use crate::sst::file::FileHandle;
75use crate::sst::index::bloom_filter::applier::{
76 BloomFilterIndexApplierBuilder, BloomFilterIndexApplierRef,
77};
78use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
79use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
80use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
81use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
82#[cfg(feature = "vector_index")]
83use crate::sst::index::vector_index::applier::{VectorIndexApplier, VectorIndexApplierRef};
84use crate::sst::parquet::file_range::PreFilterMode;
85use crate::sst::parquet::reader::ReaderMetrics;
86
/// Factor applied to the requested `k` of a vector search when the scan request also carries
/// filters (see `ScanRegion::scan_input`).
#[cfg(feature = "vector_index")]
const VECTOR_INDEX_OVERFETCH_MULTIPLIER: usize = 2;
89
/// A region scanner that dispatches to one of the concrete scan implementations.
pub(crate) enum Scanner {
    /// Scanner backed by [`SeqScan`].
    Seq(SeqScan),
    /// Scanner backed by [`UnorderedScan`].
    Unordered(UnorderedScan),
    /// Scanner backed by [`SeriesScan`].
    Series(SeriesScan),
}
99
100impl Scanner {
101 #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
103 pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream, BoxedError> {
104 match self {
105 Scanner::Seq(seq_scan) => seq_scan.build_stream(),
106 Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
107 Scanner::Series(series_scan) => series_scan.build_stream().await,
108 }
109 }
110
111 pub(crate) fn scan_batch(&self) -> Result<ScanBatchStream> {
113 match self {
114 Scanner::Seq(x) => x.scan_all_partitions(),
115 Scanner::Unordered(x) => x.scan_all_partitions(),
116 Scanner::Series(x) => x.scan_all_partitions(),
117 }
118 }
119}
120
#[cfg(test)]
impl Scanner {
    /// Returns the number of files in the scanner's input.
    pub(crate) fn num_files(&self) -> usize {
        match self {
            Self::Seq(scanner) => scanner.input().num_files(),
            Self::Unordered(scanner) => scanner.input().num_files(),
            Self::Series(scanner) => scanner.input().num_files(),
        }
    }

    /// Returns the number of memtables in the scanner's input.
    pub(crate) fn num_memtables(&self) -> usize {
        match self {
            Self::Seq(scanner) => scanner.input().num_memtables(),
            Self::Unordered(scanner) => scanner.input().num_memtables(),
            Self::Series(scanner) => scanner.input().num_memtables(),
        }
    }

    /// Returns the file ids reported by the scanner's input.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        match self {
            Self::Seq(scanner) => scanner.input().file_ids(),
            Self::Unordered(scanner) => scanner.input().file_ids(),
            Self::Series(scanner) => scanner.input().file_ids(),
        }
    }

    /// Returns the index ids reported by the scanner's input.
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        match self {
            Self::Seq(scanner) => scanner.input().index_ids(),
            Self::Unordered(scanner) => scanner.input().index_ids(),
            Self::Series(scanner) => scanner.input().index_ids(),
        }
    }

    /// Returns the snapshot sequence stored in the scanner's input, if any.
    pub(crate) fn snapshot_sequence(&self) -> Option<SequenceNumber> {
        match self {
            Self::Seq(scanner) => scanner.input().snapshot_sequence,
            Self::Unordered(scanner) => scanner.input().snapshot_sequence,
            Self::Series(scanner) => scanner.input().snapshot_sequence,
        }
    }

    /// Prepares the wrapped scanner with the given number of target partitions.
    ///
    /// # Panics
    ///
    /// Panics if the scanner's `prepare` call returns an error.
    pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
        use store_api::region_engine::{PrepareRequest, RegionScanner};

        let prepare_request = PrepareRequest::default().with_target_partitions(target_partitions);
        match self {
            Self::Seq(scanner) => scanner.prepare(prepare_request).unwrap(),
            Self::Unordered(scanner) => scanner.prepare(prepare_request).unwrap(),
            Self::Series(scanner) => scanner.prepare(prepare_request).unwrap(),
        }
    }
}
178
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Builder that turns a [`ScanRequest`] against one region version into a scanner.
pub(crate) struct ScanRegion {
    /// Region version to scan; its metadata, options, SSTs and memtables are read in
    /// `scan_input`.
    version: VersionRef,
    /// Access layer handed to the [`ScanInput`] and used when building index appliers.
    access_layer: AccessLayerRef,
    /// The scan request.
    request: ScanRequest,
    /// Cache strategy handed to the [`ScanInput`] and used when building index appliers.
    cache_strategy: CacheStrategy,
    /// Forwarded to `ScanInput::with_max_concurrent_scan_files`.
    max_concurrent_scan_files: usize,
    /// When true, no inverted index applier is built.
    ignore_inverted_index: bool,
    /// When true, no fulltext index applier is built.
    ignore_fulltext_index: bool,
    /// When true, no bloom filter index applier is built.
    ignore_bloom_filter: bool,
    /// Optional start instant, forwarded to `ScanInput::with_start_time`.
    start_time: Option<Instant>,
    /// Forwarded to `ScanInput::with_filter_deleted`; `new` initialises it to `true`.
    filter_deleted: bool,
    /// Optional provider queried for extension ranges in `scan_input`.
    #[cfg(feature = "enterprise")]
    extension_range_provider: Option<BoxedExtensionRangeProvider>,
}
254
255impl ScanRegion {
256 pub(crate) fn new(
258 version: VersionRef,
259 access_layer: AccessLayerRef,
260 request: ScanRequest,
261 cache_strategy: CacheStrategy,
262 ) -> ScanRegion {
263 ScanRegion {
264 version,
265 access_layer,
266 request,
267 cache_strategy,
268 max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
269 ignore_inverted_index: false,
270 ignore_fulltext_index: false,
271 ignore_bloom_filter: false,
272 start_time: None,
273 filter_deleted: true,
274 #[cfg(feature = "enterprise")]
275 extension_range_provider: None,
276 }
277 }
278
/// Sets `max_concurrent_scan_files`, which `scan_input` forwards to the [`ScanInput`].
#[must_use]
pub(crate) fn with_max_concurrent_scan_files(
    mut self,
    max_concurrent_scan_files: usize,
) -> Self {
    self.max_concurrent_scan_files = max_concurrent_scan_files;
    self
}
288
/// Sets whether building the inverted index applier is skipped.
#[must_use]
pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self {
    self.ignore_inverted_index = ignore;
    self
}
295
/// Sets whether building the fulltext index applier is skipped.
#[must_use]
pub(crate) fn with_ignore_fulltext_index(mut self, ignore: bool) -> Self {
    self.ignore_fulltext_index = ignore;
    self
}
302
/// Sets whether building the bloom filter index applier is skipped.
#[must_use]
pub(crate) fn with_ignore_bloom_filter(mut self, ignore: bool) -> Self {
    self.ignore_bloom_filter = ignore;
    self
}
309
/// Records `now` as the start time, which `scan_input` forwards to the [`ScanInput`].
#[must_use]
pub(crate) fn with_start_time(mut self, now: Instant) -> Self {
    self.start_time = Some(now);
    self
}
315
/// Sets the `filter_deleted` flag, which `scan_input` forwards to the [`ScanInput`].
pub(crate) fn set_filter_deleted(&mut self, filter_deleted: bool) {
    self.filter_deleted = filter_deleted;
}
319
/// Installs the provider that `scan_input` queries for extension ranges.
#[cfg(feature = "enterprise")]
pub(crate) fn set_extension_range_provider(
    &mut self,
    extension_range_provider: BoxedExtensionRangeProvider,
) {
    self.extension_range_provider = Some(extension_range_provider);
}
327
328 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
330 pub(crate) async fn scanner(self) -> Result<Scanner> {
331 if self.use_series_scan() {
332 self.series_scan().await.map(Scanner::Series)
333 } else if self.use_unordered_scan() {
334 self.unordered_scan().await.map(Scanner::Unordered)
337 } else {
338 self.seq_scan().await.map(Scanner::Seq)
339 }
340 }
341
342 #[tracing::instrument(
344 level = tracing::Level::DEBUG,
345 skip_all,
346 fields(region_id = %self.region_id())
347 )]
348 pub(crate) async fn region_scanner(self) -> Result<RegionScannerRef> {
349 if self.use_series_scan() {
350 self.series_scan()
351 .await
352 .map(|scanner| Box::new(scanner) as _)
353 } else if self.use_unordered_scan() {
354 self.unordered_scan()
355 .await
356 .map(|scanner| Box::new(scanner) as _)
357 } else {
358 self.seq_scan().await.map(|scanner| Box::new(scanner) as _)
359 }
360 }
361
362 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
364 pub(crate) async fn seq_scan(self) -> Result<SeqScan> {
365 let input = self.scan_input().await?.with_compaction(false);
366 Ok(SeqScan::new(input))
367 }
368
369 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
371 pub(crate) async fn unordered_scan(self) -> Result<UnorderedScan> {
372 let input = self.scan_input().await?;
373 Ok(UnorderedScan::new(input))
374 }
375
376 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
378 pub(crate) async fn series_scan(self) -> Result<SeriesScan> {
379 let input = self.scan_input().await?;
380 Ok(SeriesScan::new(input))
381 }
382
383 fn use_unordered_scan(&self) -> bool {
385 self.version.options.append_mode
392 && self.request.series_row_selector.is_none()
393 && (self.request.distribution.is_none()
394 || self.request.distribution == Some(TimeSeriesDistribution::TimeWindowed))
395 }
396
397 fn use_series_scan(&self) -> bool {
399 self.request.distribution == Some(TimeSeriesDistribution::PerSeries)
400 }
401
/// Assembles the [`ScanInput`] for this request.
///
/// Resolves the columns to read, selects the SST files and memtable ranges that overlap the
/// requested time range, builds the index appliers and copies the scan options into the
/// input.
///
/// # Errors
///
/// Propagates errors from building the predicate group, resolving projected columns,
/// creating the projection mapper, listing memtable ranges and (with the `enterprise`
/// feature) finding extension ranges.
#[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
async fn scan_input(self) -> Result<ScanInput> {
    // `NonZeroU64::new` yields `None` for 0, so a zero minimum sequence means "no bound".
    let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new);
    let time_range = self.build_time_range_predicate();
    let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;

    // With a projection, read the projected columns plus the columns the predicate needs;
    // without one, read every column of the region.
    let mut read_cols = match &self.request.projection_input {
        Some(p) => {
            let metadata = &self.version.metadata;
            let from_projection = read_columns_from_projection(p.clone(), metadata)?;
            let from_predicate = read_columns_from_predicate(&predicate, metadata);
            merge(from_projection, from_predicate)
        }
        None => {
            let read_col_ids = self
                .version
                .metadata
                .column_metadatas
                .iter()
                .map(|col| col.column_id);
            ReadColumns::from_deduped_column_ids(read_col_ids)
        }
    };
    // The JSON type hint is only applied when the schema has a structured JSON field.
    let has_structured_json = self
        .version
        .metadata
        .schema
        .arrow_schema()
        .fields()
        .iter()
        .any(is_structured_json_field);
    if has_structured_json {
        narrow_read_columns_by_json_type_hint(
            &mut read_cols,
            &self.request.json_type_hint,
            &self.version.metadata,
        );
    }
    let read_col_ids = read_cols.column_ids();

    // Output projection; defaults to all columns in metadata order.
    let projection = self
        .request
        .projection_indices()
        .map(|x| x.to_vec())
        .unwrap_or_else(|| (0..self.version.metadata.column_metadatas.len()).collect());
    let json_type_hint = has_structured_json
        .then_some(&self.request.json_type_hint)
        .inspect(|json_type_hint| {
            debug!(
                "Concretized JSON type: {{{}}}",
                json_type_hint
                    .iter()
                    .map(|(k, v)| format!("{}: {}", k, v))
                    .join(", ")
            );
        });
    let mapper = FlatProjectionMapper::new_with_read_columns(
        &self.version.metadata,
        projection,
        read_cols,
        json_type_hint,
    )?;

    // Collect the SST files to read unless the request skips SST files entirely.
    let ssts = &self.version.ssts;
    let mut files = Vec::new();
    if !self.request.skip_sst_files {
        for level in ssts.levels() {
            for file in level.files.values() {
                // A file passes the sequence check when there is no minimum sequence, when
                // the file records no sequence, or when its sequence is strictly greater
                // than the minimum.
                let exceed_min_sequence = match (sst_min_sequence, file.meta_ref().sequence) {
                    (Some(min_sequence), Some(file_sequence)) => file_sequence > min_sequence,
                    (Some(_), None) => true,
                    (None, _) => true,
                };

                if exceed_min_sequence && file_in_range(file, &time_range) {
                    files.push(file.clone());
                }
            }
        }
    }

    let memtables = self.version.memtables.list_memtables();
    let mut mem_range_builders = Vec::new();
    let filter_mode = pre_filter_mode(
        self.version.options.append_mode,
        self.version.options.merge_mode(),
    );

    for m in memtables {
        // Memtables whose stats report no time range are skipped.
        let Some((start, end)) = m.stats().time_range() else {
            continue;
        };
        let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
        // Skip memtables that do not overlap the requested time range.
        if !memtable_range.intersects(&time_range) {
            continue;
        }
        let ranges_in_memtable = m.ranges(
            Some(&read_col_ids),
            RangesOptions::default()
                .with_predicate(predicate.clone())
                .with_sequence(SequenceRange::new(
                    self.request.memtable_min_sequence,
                    self.request.memtable_max_sequence,
                ))
                .with_pre_filter_mode(filter_mode),
        )?;
        mem_range_builders.extend(ranges_in_memtable.ranges.into_values().map(|v| {
            let stats = v.stats().clone();
            MemRangeBuilder::new(v, stats)
        }));
    }

    let region_id = self.region_id();
    debug!(
        "Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}",
        region_id,
        self.request,
        time_range,
        mem_range_builders.len(),
        files.len(),
        self.version.options.append_mode,
    );

    // Each applier array holds the applier for non-field filters first and the applier for
    // field filters second.
    let (non_field_filters, field_filters) = self.partition_by_field_filters();
    let inverted_index_appliers = [
        self.build_invereted_index_applier(&non_field_filters),
        self.build_invereted_index_applier(&field_filters),
    ];
    let bloom_filter_appliers = [
        self.build_bloom_filter_applier(&non_field_filters),
        self.build_bloom_filter_applier(&field_filters),
    ];
    let fulltext_index_appliers = [
        self.build_fulltext_index_applier(&non_field_filters),
        self.build_fulltext_index_applier(&field_filters),
    ];
    #[cfg(feature = "vector_index")]
    let vector_index_applier = self.build_vector_index_applier();
    // When the request has filters, ask the vector index for more than `k` candidates
    // (saturating multiplication by `VECTOR_INDEX_OVERFETCH_MULTIPLIER`).
    #[cfg(feature = "vector_index")]
    let vector_index_k = self.request.vector_search.as_ref().map(|search| {
        if self.request.filters.is_empty() {
            search.k
        } else {
            search.k.saturating_mul(VECTOR_INDEX_OVERFETCH_MULTIPLIER)
        }
    });

    let input = ScanInput::new(self.access_layer, mapper)
        .with_time_range(Some(time_range))
        .with_predicate(predicate)
        .with_memtables(mem_range_builders)
        .with_files(files)
        .with_cache(self.cache_strategy)
        .with_inverted_index_appliers(inverted_index_appliers)
        .with_bloom_filter_index_appliers(bloom_filter_appliers)
        .with_fulltext_index_appliers(fulltext_index_appliers)
        .with_max_concurrent_scan_files(self.max_concurrent_scan_files)
        .with_start_time(self.start_time)
        .with_append_mode(self.version.options.append_mode)
        .with_filter_deleted(self.filter_deleted)
        .with_merge_mode(self.version.options.merge_mode())
        .with_series_row_selector(self.request.series_row_selector)
        .with_distribution(self.request.distribution)
        .with_explain_flat_format(
            self.version.options.sst_format == Some(crate::sst::FormatType::Flat),
        )
        // The snapshot sequence is the memtable max sequence, and only when the request
        // sets `snapshot_on_scan`.
        .with_snapshot_sequence(
            self.request
                .snapshot_on_scan
                .then_some(self.request.memtable_max_sequence)
                .flatten(),
        );
    #[cfg(feature = "vector_index")]
    let input = input
        .with_vector_index_applier(vector_index_applier)
        .with_vector_index_k(vector_index_k);

    // Extension ranges are only looked up when SST files are not skipped and a provider is
    // installed.
    #[cfg(feature = "enterprise")]
    let input = if !self.request.skip_sst_files
        && let Some(provider) = self.extension_range_provider
    {
        let ranges = provider
            .find_extension_ranges(self.version.flushed_sequence, time_range, &self.request)
            .await?;
        debug!("Find extension ranges: {ranges:?}");
        input.with_extension_ranges(ranges)
    } else {
        input
    };
    Ok(input)
}
612
/// Returns the region id stored in the version's metadata.
fn region_id(&self) -> RegionId {
    self.version.metadata.region_id
}
616
617 fn build_time_range_predicate(&self) -> TimestampRange {
619 let time_index = self.version.metadata.time_index_column();
620 let unit = time_index
621 .column_schema
622 .data_type
623 .as_timestamp()
624 .expect("Time index must have timestamp-compatible type")
625 .unit();
626 build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters)
627 }
628
629 fn partition_by_field_filters(&self) -> (Vec<Expr>, Vec<Expr>) {
632 let field_columns = self
633 .version
634 .metadata
635 .field_columns()
636 .map(|col| &col.column_schema.name)
637 .collect::<HashSet<_>>();
638
639 let mut columns = HashSet::new();
640
641 self.request.filters.iter().cloned().partition(|expr| {
642 columns.clear();
643 if expr_to_columns(expr, &mut columns).is_err() {
645 return true;
647 }
648 !columns
650 .iter()
651 .any(|column| field_columns.contains(&column.name))
652 })
653 }
654
655 fn build_invereted_index_applier(&self, filters: &[Expr]) -> Option<InvertedIndexApplierRef> {
657 if self.ignore_inverted_index {
658 return None;
659 }
660
661 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
662 let inverted_index_cache = self.cache_strategy.inverted_index_cache().cloned();
663
664 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
665
666 InvertedIndexApplierBuilder::new(
667 self.access_layer.table_dir().to_string(),
668 self.access_layer.path_type(),
669 self.access_layer.object_store().clone(),
670 self.version.metadata.as_ref(),
671 self.version.metadata.inverted_indexed_column_ids(
672 self.version
673 .options
674 .index_options
675 .inverted_index
676 .ignore_column_ids
677 .iter(),
678 ),
679 self.access_layer.puffin_manager_factory().clone(),
680 )
681 .with_file_cache(file_cache)
682 .with_inverted_index_cache(inverted_index_cache)
683 .with_puffin_metadata_cache(puffin_metadata_cache)
684 .build(filters)
685 .inspect_err(|err| warn!(err; "Failed to build invereted index applier"))
686 .ok()
687 .flatten()
688 .map(Arc::new)
689 }
690
691 fn build_bloom_filter_applier(&self, filters: &[Expr]) -> Option<BloomFilterIndexApplierRef> {
693 if self.ignore_bloom_filter {
694 return None;
695 }
696
697 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
698 let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
699 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
700
701 BloomFilterIndexApplierBuilder::new(
702 self.access_layer.table_dir().to_string(),
703 self.access_layer.path_type(),
704 self.access_layer.object_store().clone(),
705 self.version.metadata.as_ref(),
706 self.access_layer.puffin_manager_factory().clone(),
707 )
708 .with_file_cache(file_cache)
709 .with_bloom_filter_index_cache(bloom_filter_index_cache)
710 .with_puffin_metadata_cache(puffin_metadata_cache)
711 .build(filters)
712 .inspect_err(|err| warn!(err; "Failed to build bloom filter index applier"))
713 .ok()
714 .flatten()
715 .map(Arc::new)
716 }
717
718 fn build_fulltext_index_applier(&self, filters: &[Expr]) -> Option<FulltextIndexApplierRef> {
720 if self.ignore_fulltext_index {
721 return None;
722 }
723
724 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
725 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
726 let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
727 FulltextIndexApplierBuilder::new(
728 self.access_layer.table_dir().to_string(),
729 self.access_layer.path_type(),
730 self.access_layer.object_store().clone(),
731 self.access_layer.puffin_manager_factory().clone(),
732 self.version.metadata.as_ref(),
733 )
734 .with_file_cache(file_cache)
735 .with_puffin_metadata_cache(puffin_metadata_cache)
736 .with_bloom_filter_cache(bloom_filter_index_cache)
737 .build(filters)
738 .inspect_err(|err| warn!(err; "Failed to build fulltext index applier"))
739 .ok()
740 .flatten()
741 .map(Arc::new)
742 }
743
/// Builds the vector index applier from the request's vector search, or returns `None`
/// when the request carries no vector search.
#[cfg(feature = "vector_index")]
fn build_vector_index_applier(&self) -> Option<VectorIndexApplierRef> {
    let search = self.request.vector_search.as_ref()?;

    let cache = &self.cache_strategy;
    let layer = &self.access_layer;

    let applier = VectorIndexApplier::new(
        layer.table_dir().to_string(),
        layer.path_type(),
        layer.object_store().clone(),
        layer.puffin_manager_factory().clone(),
        search.column_id,
        search.query_vector.clone(),
        search.metric,
    )
    .with_file_cache(cache.write_cache().map(|w| w.file_cache()))
    .with_puffin_metadata_cache(cache.puffin_metadata_cache().cloned())
    .with_vector_index_cache(cache.vector_index_cache().cloned());

    Some(Arc::new(applier))
}
768}
769
770fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
772 if predicate == &TimestampRange::min_to_max() {
773 return true;
774 }
775 let (start, end) = file.time_range();
777 let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end));
778 file_ts_range.intersects(predicate)
779}
780
/// Everything a scanner needs to read a region: sources, predicates, indexes and options.
pub struct ScanInput {
    /// Access layer used to open SST readers (see `prune_file`).
    access_layer: AccessLayerRef,
    /// Projection mapper, shared behind an `Arc`.
    pub(crate) mapper: Arc<FlatProjectionMapper>,
    /// Columns to read; initialised from the mapper's read columns in `new`.
    pub(crate) read_cols: ReadColumns,
    /// Time range of the scan, if one was set.
    pub(crate) time_range: Option<TimestampRange>,
    /// Predicates of the scan.
    pub(crate) predicate: PredicateGroup,
    /// Region partition expression copied from `predicate` in `with_predicate`.
    region_partition_expr: Option<PartitionExpr>,
    /// Builders for the memtable ranges to scan.
    pub(crate) memtables: Vec<MemRangeBuilder>,
    /// Handles of the SST files to scan.
    pub(crate) files: Vec<FileHandle>,
    /// Cache strategy passed to SST readers.
    pub(crate) cache_strategy: CacheStrategy,
    /// When true, `prune_file` returns an empty range builder instead of an error for a
    /// file that is reported as not found.
    ignore_file_not_found: bool,
    /// Maximum number of SST files to scan concurrently.
    pub(crate) max_concurrent_scan_files: usize,
    /// Inverted index appliers passed to SST readers.
    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
    /// Bloom filter index appliers passed to SST readers.
    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    /// Fulltext index appliers passed to SST readers.
    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
    /// Vector index applier passed to SST readers.
    #[cfg(feature = "vector_index")]
    pub(crate) vector_index_applier: Option<VectorIndexApplierRef>,
    /// The `k` passed to SST readers together with the vector index applier.
    #[cfg(feature = "vector_index")]
    pub(crate) vector_index_k: Option<usize>,
    /// Start instant of the query, if recorded.
    pub(crate) query_start: Option<Instant>,
    /// Whether the region is in append mode.
    pub(crate) append_mode: bool,
    /// Whether deleted rows are filtered; `new` initialises it to `true`.
    pub(crate) filter_deleted: bool,
    /// Merge mode of the region.
    pub(crate) merge_mode: MergeMode,
    /// Optional row selector applied per time series.
    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
    /// Optional distribution hint of the scan.
    pub(crate) distribution: Option<TimeSeriesDistribution>,
    /// Flag set through `with_explain_flat_format`.
    explain_flat_format: bool,
    /// Optional snapshot sequence of the scan.
    pub(crate) snapshot_sequence: Option<SequenceNumber>,
    /// Whether the scan runs for compaction; forwarded to SST readers in `prune_file`.
    pub(crate) compaction: bool,
    /// Extension ranges to scan.
    #[cfg(feature = "enterprise")]
    extension_ranges: Vec<BoxedExtensionRange>,
}
838
839impl ScanInput {
840 #[must_use]
842 pub(crate) fn new(access_layer: AccessLayerRef, mapper: FlatProjectionMapper) -> ScanInput {
843 ScanInput {
844 access_layer,
845 read_cols: mapper.read_columns().clone(),
846 mapper: Arc::new(mapper),
847 time_range: None,
848 predicate: PredicateGroup::default(),
849 region_partition_expr: None,
850 memtables: Vec::new(),
851 files: Vec::new(),
852 cache_strategy: CacheStrategy::Disabled,
853 ignore_file_not_found: false,
854 max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
855 inverted_index_appliers: [None, None],
856 bloom_filter_index_appliers: [None, None],
857 fulltext_index_appliers: [None, None],
858 #[cfg(feature = "vector_index")]
859 vector_index_applier: None,
860 #[cfg(feature = "vector_index")]
861 vector_index_k: None,
862 query_start: None,
863 append_mode: false,
864 filter_deleted: true,
865 merge_mode: MergeMode::default(),
866 series_row_selector: None,
867 distribution: None,
868 explain_flat_format: false,
869 snapshot_sequence: None,
870 compaction: false,
871 #[cfg(feature = "enterprise")]
872 extension_ranges: Vec::new(),
873 }
874 }
875
/// Sets the time range of the scan.
#[must_use]
pub(crate) fn with_time_range(mut self, time_range: Option<TimestampRange>) -> Self {
    self.time_range = time_range;
    self
}
882
/// Sets the predicate group of the scan.
///
/// Also stores a clone of the group's region partition expression in
/// `region_partition_expr`.
#[must_use]
pub(crate) fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
    self.region_partition_expr = predicate.region_partition_expr().cloned();
    self.predicate = predicate;
    self
}
890
/// Sets the memtable range builders to scan.
#[must_use]
pub(crate) fn with_memtables(mut self, memtables: Vec<MemRangeBuilder>) -> Self {
    self.memtables = memtables;
    self
}
897
/// Sets the SST files to scan.
#[must_use]
pub(crate) fn with_files(mut self, files: Vec<FileHandle>) -> Self {
    self.files = files;
    self
}
904
/// Sets the cache strategy of the scan.
#[must_use]
pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self {
    self.cache_strategy = cache;
    self
}
911
/// Sets whether `prune_file` tolerates a file that is reported as not found.
#[must_use]
pub(crate) fn with_ignore_file_not_found(mut self, ignore: bool) -> Self {
    self.ignore_file_not_found = ignore;
    self
}
918
/// Sets the maximum number of SST files to scan concurrently.
#[must_use]
pub(crate) fn with_max_concurrent_scan_files(
    mut self,
    max_concurrent_scan_files: usize,
) -> Self {
    self.max_concurrent_scan_files = max_concurrent_scan_files;
    self
}
928
/// Sets the inverted index appliers.
#[must_use]
pub(crate) fn with_inverted_index_appliers(
    mut self,
    appliers: [Option<InvertedIndexApplierRef>; 2],
) -> Self {
    self.inverted_index_appliers = appliers;
    self
}
938
/// Sets the bloom filter index appliers.
#[must_use]
pub(crate) fn with_bloom_filter_index_appliers(
    mut self,
    appliers: [Option<BloomFilterIndexApplierRef>; 2],
) -> Self {
    self.bloom_filter_index_appliers = appliers;
    self
}
948
/// Sets the fulltext index appliers.
#[must_use]
pub(crate) fn with_fulltext_index_appliers(
    mut self,
    appliers: [Option<FulltextIndexApplierRef>; 2],
) -> Self {
    self.fulltext_index_appliers = appliers;
    self
}
958
/// Sets the vector index applier.
#[cfg(feature = "vector_index")]
#[must_use]
pub(crate) fn with_vector_index_applier(
    mut self,
    applier: Option<VectorIndexApplierRef>,
) -> Self {
    self.vector_index_applier = applier;
    self
}
969
/// Sets the `k` that `prune_file` passes to the reader along with the vector index applier.
#[cfg(feature = "vector_index")]
#[must_use]
pub(crate) fn with_vector_index_k(mut self, k: Option<usize>) -> Self {
    self.vector_index_k = k;
    self
}
977
/// Sets the start instant of the query (stored in `query_start`).
#[must_use]
pub(crate) fn with_start_time(mut self, now: Option<Instant>) -> Self {
    self.query_start = now;
    self
}
984
/// Sets whether the region is in append mode.
#[must_use]
pub(crate) fn with_append_mode(mut self, is_append_mode: bool) -> Self {
    self.append_mode = is_append_mode;
    self
}
990
/// Sets the `filter_deleted` flag.
#[must_use]
pub(crate) fn with_filter_deleted(mut self, filter_deleted: bool) -> Self {
    self.filter_deleted = filter_deleted;
    self
}
997
/// Sets the merge mode.
#[must_use]
pub(crate) fn with_merge_mode(mut self, merge_mode: MergeMode) -> Self {
    self.merge_mode = merge_mode;
    self
}
1004
/// Sets the distribution hint.
#[must_use]
pub(crate) fn with_distribution(
    mut self,
    distribution: Option<TimeSeriesDistribution>,
) -> Self {
    self.distribution = distribution;
    self
}
1014
/// Sets the `explain_flat_format` flag.
#[must_use]
pub(crate) fn with_explain_flat_format(mut self, explain_flat_format: bool) -> Self {
    self.explain_flat_format = explain_flat_format;
    self
}
1021
/// Sets the per-series row selector.
#[must_use]
pub(crate) fn with_series_row_selector(
    mut self,
    series_row_selector: Option<TimeSeriesRowSelector>,
) -> Self {
    self.series_row_selector = series_row_selector;
    self
}
1031
/// Sets the snapshot sequence of the scan.
#[must_use]
pub(crate) fn with_snapshot_sequence(
    mut self,
    snapshot_sequence: Option<SequenceNumber>,
) -> Self {
    self.snapshot_sequence = snapshot_sequence;
    self
}
1040
/// Sets whether the scan runs for compaction.
#[must_use]
pub(crate) fn with_compaction(mut self, compaction: bool) -> Self {
    self.compaction = compaction;
    self
}
1047
1048 pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> {
1050 let memtable = &self.memtables[index.index];
1051 let mut ranges = SmallVec::new();
1052 memtable.build_ranges(index.row_group_index, &mut ranges);
1053 ranges
1054 }
1055
1056 fn predicate_for_file(&self, file: &FileHandle) -> Option<Predicate> {
1057 if self.should_skip_region_partition(file) {
1058 self.predicate.predicate_without_region().cloned()
1059 } else {
1060 self.predicate.predicate().cloned()
1061 }
1062 }
1063
1064 fn should_skip_region_partition(&self, file: &FileHandle) -> bool {
1065 match (
1066 self.region_partition_expr.as_ref(),
1067 file.meta_ref().partition_expr.as_ref(),
1068 ) {
1069 (Some(region_expr), Some(file_expr)) => region_expr == file_expr,
1070 _ => false,
1071 }
1072 }
1073
/// Builds the [`FileRangeBuilder`] for `file` by preparing an SST reader with this input's
/// predicate, projection, cache and index appliers.
///
/// Returns a default (empty) builder when the reader yields no input for the file, or when
/// the file is reported as not found and `ignore_file_not_found` is set.
///
/// # Errors
///
/// Returns the reader error otherwise, and any error from creating the compat batch.
#[tracing::instrument(
    skip_all,
    fields(
        region_id = %self.region_metadata().region_id,
        file_id = %file.file_id()
    )
)]
pub async fn prune_file(
    &self,
    file: &FileHandle,
    pre_filter_mode: PreFilterMode,
    reader_metrics: &mut ReaderMetrics,
) -> Result<FileRangeBuilder> {
    let predicate = self.predicate_for_file(file);
    let may_build_selective_row_selection = predicate.is_some();
    // Primary key values are decoded only outside compaction and only when at least one
    // column to read is part of the primary key.
    let decode_pk_values = !self.compaction
        && self
            .mapper
            .read_columns()
            .column_ids_iter()
            .any(|column_id| self.mapper.metadata().primary_key.contains(&column_id));
    let reader = self
        .access_layer
        .read_sst(file.clone())
        .predicate(predicate)
        .projection(Some(self.read_cols.clone()))
        .cache(self.cache_strategy.clone())
        .inverted_index_appliers(self.inverted_index_appliers.clone())
        .bloom_filter_index_appliers(self.bloom_filter_index_appliers.clone())
        .fulltext_index_appliers(self.fulltext_index_appliers.clone());
    // `deferred_optional_page_index` is requested only for non-compaction scans that have
    // a predicate.
    let reader = if !self.compaction && may_build_selective_row_selection {
        reader.deferred_optional_page_index()
    } else {
        reader
    };
    #[cfg(feature = "vector_index")]
    let reader = {
        let mut reader = reader;
        reader =
            reader.vector_index_applier(self.vector_index_applier.clone(), self.vector_index_k);
        reader
    };
    let res = reader
        .expected_metadata(Some(self.mapper.metadata().clone()))
        .compaction(self.compaction)
        .pre_filter_mode(pre_filter_mode)
        .decode_primary_key_values(decode_pk_values)
        .build_reader_input(reader_metrics)
        .await;
    let read_input = match res {
        Ok(x) => x,
        Err(e) => {
            // A missing file is logged and tolerated only when the input opted in.
            if e.is_object_not_found() && self.ignore_file_not_found {
                error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
                return Ok(FileRangeBuilder::default());
            } else {
                return Err(e);
            }
        }
    };

    // No reader input means there is nothing to read from this file.
    let Some((mut file_range_ctx, selection)) = read_input else {
        return Ok(FileRangeBuilder::default());
    };

    // Attach a compat batch when the file's read format does not have the same columns and
    // primary key encoding as the mapper expects.
    let need_compat = !compat::has_same_columns_and_pk_encoding(
        &self.mapper,
        file_range_ctx.read_format(),
        self.compaction,
    );
    if need_compat {
        let compat = FlatCompatBatch::try_new(
            &self.mapper,
            file_range_ctx.read_format(),
            self.compaction,
        )?;
        file_range_ctx.set_compat_batch(compat);
    }
    Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), selection))
}
1157
1158 #[tracing::instrument(
1162 skip(self, sources, semaphore),
1163 fields(
1164 region_id = %self.region_metadata().region_id,
1165 source_count = sources.len()
1166 )
1167 )]
1168 pub(crate) fn create_parallel_flat_sources(
1169 &self,
1170 sources: Vec<BoxedRecordBatchStream>,
1171 semaphore: Arc<Semaphore>,
1172 channel_size: usize,
1173 ) -> Result<Vec<BoxedRecordBatchStream>> {
1174 if sources.len() <= 1 {
1175 return Ok(sources);
1176 }
1177
1178 let sources = sources
1180 .into_iter()
1181 .map(|source| {
1182 let (sender, receiver) = mpsc::channel(channel_size);
1183 self.spawn_flat_scan_task(source, semaphore.clone(), sender);
1184 let stream = Box::pin(ReceiverStream::new(receiver));
1185 Box::pin(stream) as _
1186 })
1187 .collect();
1188 Ok(sources)
1189 }
1190
1191 #[tracing::instrument(
1193 skip(self, input, semaphore, sender),
1194 fields(region_id = %self.region_metadata().region_id)
1195 )]
1196 pub(crate) fn spawn_flat_scan_task(
1197 &self,
1198 mut input: BoxedRecordBatchStream,
1199 semaphore: Arc<Semaphore>,
1200 sender: mpsc::Sender<Result<RecordBatch>>,
1201 ) {
1202 let region_id = self.region_metadata().region_id;
1203 let span = tracing::info_span!(
1204 "ScanInput::parallel_scan_task",
1205 region_id = %region_id,
1206 stream_kind = "flat"
1207 );
1208 common_runtime::spawn_query(
1209 async move {
1210 loop {
1211 let maybe_batch = {
1214 let _permit = semaphore.acquire().await.unwrap();
1216 input.next().await
1217 };
1218 match maybe_batch {
1219 Some(Ok(batch)) => {
1220 let _ = sender.send(Ok(batch)).await;
1221 }
1222 Some(Err(e)) => {
1223 let _ = sender.send(Err(e)).await;
1224 break;
1225 }
1226 None => break,
1227 }
1228 }
1229 }
1230 .instrument(span),
1231 );
1232 }
1233
1234 pub(crate) fn total_rows(&self) -> usize {
1235 let rows_in_files: usize = self.files.iter().map(|f| f.num_rows()).sum();
1236 let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();
1237
1238 let rows = rows_in_files + rows_in_memtables;
1239 #[cfg(feature = "enterprise")]
1240 let rows = rows
1241 + self
1242 .extension_ranges
1243 .iter()
1244 .map(|x| x.num_rows())
1245 .sum::<u64>() as usize;
1246 rows
1247 }
1248
/// Returns the predicate group attached to this scan input.
pub(crate) fn predicate_group(&self) -> &PredicateGroup {
    &self.predicate
}
1252
/// Returns the number of memtables held by this scan input.
pub(crate) fn num_memtables(&self) -> usize {
    self.memtables.len()
}
1257
/// Returns the number of files held by this scan input.
pub(crate) fn num_files(&self) -> usize {
    self.files.len()
}
1262
1263 pub(crate) fn file_from_index(&self, index: RowGroupIndex) -> &FileHandle {
1265 let file_index = index.index - self.num_memtables();
1266 &self.files[file_index]
1267 }
1268
/// Returns the region metadata held by the projection mapper of this input.
pub fn region_metadata(&self) -> &RegionMetadataRef {
    self.mapper.metadata()
}
1272
1273 fn range_pre_filter_mode(&self, source_count: usize) -> PreFilterMode {
1274 if source_count <= 1 {
1275 return PreFilterMode::All;
1280 }
1281
1282 pre_filter_mode(self.append_mode, self.merge_mode)
1283 }
1284}
1285
// The whole impl is gated on the `enterprise` feature, so the methods need no gate of
// their own.
#[cfg(feature = "enterprise")]
impl ScanInput {
    /// Replaces the extension ranges of this input and returns the updated input.
    #[must_use]
    pub(crate) fn with_extension_ranges(
        mut self,
        extension_ranges: Vec<BoxedExtensionRange>,
    ) -> Self {
        self.extension_ranges = extension_ranges;
        self
    }

    /// Returns all extension ranges of this input.
    pub(crate) fn extension_ranges(&self) -> &[BoxedExtensionRange] {
        self.extension_ranges.as_slice()
    }

    /// Returns the extension range addressed by the range index `i`.
    ///
    /// Extension ranges are indexed after all memtables and files, so both counts are
    /// subtracted from `i` before the lookup.
    ///
    /// # Panics
    ///
    /// Panics if `i` does not address one of the extension ranges.
    pub(crate) fn extension_range(&self, i: usize) -> &BoxedExtensionRange {
        let past_memtables = i - self.num_memtables();
        &self.extension_ranges[past_memtables - self.num_files()]
    }
}
1307
#[cfg(test)]
impl ScanInput {
    /// Returns the file id of every file to scan, in input order.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        let mut ids = Vec::with_capacity(self.files.len());
        for handle in self.files.iter() {
            ids.push(handle.file_id());
        }
        ids
    }

    /// Returns the index id of every file to scan, in input order.
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        let mut ids = Vec::with_capacity(self.files.len());
        for handle in self.files.iter() {
            ids.push(handle.index_id());
        }
        ids
    }
}
1319
1320fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode {
1321 if append_mode {
1322 return PreFilterMode::All;
1323 }
1324
1325 match merge_mode {
1326 MergeMode::LastRow => PreFilterMode::SkipFields,
1327 MergeMode::LastNonNull => PreFilterMode::SkipFields,
1328 }
1329}
1330
1331fn narrow_read_columns_by_json_type_hint(
1332 read_columns: &mut ReadColumns,
1333 json_type_hint: &HashMap<String, JsonNativeType>,
1334 metadata: &RegionMetadata,
1335) {
1336 if json_type_hint.is_empty() {
1337 return;
1338 }
1339
1340 for read_column in &mut read_columns.cols {
1341 let Some(column) = metadata.column_by_id(read_column.column_id) else {
1342 continue;
1343 };
1344 let column_name = &column.column_schema.name;
1345 let Some(json_type) = json_type_hint.get(column_name) else {
1346 continue;
1347 };
1348
1349 let mut paths = Vec::new();
1350 let mut current = vec![column_name.clone()];
1351 collect_json_nested_paths(json_type, &mut current, &mut paths);
1352 merge_nested_paths(&mut read_column.nested_paths, paths)
1353 }
1354}
1355
1356fn collect_json_nested_paths(
1357 json_type: &JsonNativeType,
1358 current: &mut NestedPath,
1359 paths: &mut Vec<NestedPath>,
1360) {
1361 match json_type {
1362 JsonNativeType::Object(fields) if !fields.is_empty() => {
1363 for (field, child) in fields {
1364 current.push(field.clone());
1365 collect_json_nested_paths(child, current, paths);
1366 current.pop();
1367 }
1368 }
1369 _ => paths.push(current.clone()),
1370 }
1371}
1372
/// Result of [`build_scan_fingerprint`]: the fingerprint of a scan request together with
/// the time range derived from its time-only filters.
pub(crate) struct ScanFingerprintBundle {
    /// Fingerprint built from the read columns, filters and scan options of the request.
    pub(crate) fingerprint: ScanRequestFingerprint,
    /// Time range computed by `implied_time_range_from_exprs` from the filters that only
    /// reference the time index; `None` when that helper yields no range.
    pub(crate) implied_time_range: Option<TimestampRange>,
}
1384
1385pub(crate) fn build_scan_fingerprint(input: &ScanInput) -> Option<ScanFingerprintBundle> {
1388 let eligible = !input.compaction
1389 && !input.files.is_empty()
1390 && matches!(input.cache_strategy, CacheStrategy::EnableAll(_));
1391
1392 if !eligible {
1393 return None;
1394 }
1395
1396 let metadata = input.region_metadata();
1397 let tag_names: HashSet<&str> = metadata
1398 .column_metadatas
1399 .iter()
1400 .filter(|col| col.semantic_type == SemanticType::Tag)
1401 .map(|col| col.column_schema.name.as_str())
1402 .collect();
1403
1404 let time_index = metadata.time_index_column();
1405 let time_index_name = time_index.column_schema.name.clone();
1406 let ts_col_unit = time_index
1407 .column_schema
1408 .data_type
1409 .as_timestamp()
1410 .expect("Time index must have timestamp-compatible type")
1411 .unit();
1412
1413 let exprs = input
1414 .predicate_group()
1415 .predicate_without_region()
1416 .map(|predicate| predicate.exprs())
1417 .unwrap_or_default();
1418
1419 let mut filters = Vec::new();
1420 let mut time_only_exprs: Vec<&Expr> = Vec::new();
1421 let mut has_tag_filter = false;
1422 let mut columns = HashSet::new();
1423
1424 for expr in exprs {
1425 columns.clear();
1426 let is_time_only = match expr_to_columns(expr, &mut columns) {
1427 Ok(()) if !columns.is_empty() => {
1428 has_tag_filter |= columns
1429 .iter()
1430 .any(|col| tag_names.contains(col.name.as_str()));
1431 columns.iter().all(|col| col.name == time_index_name)
1432 }
1433 _ => false,
1434 };
1435
1436 if is_time_only
1444 && extract_time_range_from_expr(&time_index_name, ts_col_unit, expr).is_some()
1445 {
1446 time_only_exprs.push(expr);
1447 } else {
1448 filters.push(expr.to_string());
1449 }
1450 }
1451
1452 if !has_tag_filter {
1453 return None;
1455 }
1456
1457 let implied_time_range =
1458 implied_time_range_from_exprs(&time_index_name, ts_col_unit, &time_only_exprs);
1459 let mut time_filters: Vec<String> = time_only_exprs.iter().map(|e| e.to_string()).collect();
1460
1461 filters.sort_unstable();
1463 time_filters.sort_unstable();
1464 let read_columns = input.read_cols.clone();
1465 let fingerprint = crate::read::range_cache::ScanRequestFingerprintBuilder {
1466 read_column_types: read_columns
1467 .column_ids_iter()
1468 .map(|id| {
1469 metadata
1470 .column_by_id(id)
1471 .map(|col| col.column_schema.data_type.clone())
1472 })
1473 .collect(),
1474 read_columns,
1475 filters,
1476 time_filters,
1477 series_row_selector: input.series_row_selector,
1478 append_mode: input.append_mode,
1479 filter_deleted: input.filter_deleted,
1480 merge_mode: input.merge_mode,
1481 partition_expr_version: metadata.partition_expr_version,
1482 }
1483 .build();
1484
1485 Some(ScanFingerprintBundle {
1486 fingerprint,
1487 implied_time_range,
1488 })
1489}
1490
/// Context shared by the streams of one scan.
pub struct StreamContext {
    /// Input of the scan.
    pub input: ScanInput,
    /// Metadata of the ranges to scan; a partition range refers to its entry through
    /// `PartitionRange::identifier`.
    pub(crate) ranges: Vec<RangeMeta>,
    /// Fingerprint produced by [`build_scan_fingerprint`], `None` when the scan has none.
    // NOTE(review): the `dead_code` allowance suggests this field is not read everywhere
    // the crate is built - confirm whether it is still needed.
    #[allow(dead_code)]
    pub(crate) scan_fingerprint: Option<ScanRequestFingerprint>,
    /// Time range implied by the time-only filters of the scan, when a fingerprint was
    /// built and a range could be derived.
    pub(crate) scan_implied_time_range: Option<TimestampRange>,

    /// Start time of the query: `input.query_start` when set, otherwise the instant at
    /// which this context was created.
    pub(crate) query_start: Instant,
}
1514
1515impl StreamContext {
1516 pub(crate) fn seq_scan_ctx(input: ScanInput) -> Self {
1518 let query_start = input.query_start.unwrap_or_else(Instant::now);
1519 let ranges = RangeMeta::seq_scan_ranges(&input);
1520 READ_SST_COUNT.observe(input.num_files() as f64);
1521 let (scan_fingerprint, scan_implied_time_range) = match build_scan_fingerprint(&input) {
1522 Some(b) => (Some(b.fingerprint), b.implied_time_range),
1523 None => (None, None),
1524 };
1525
1526 Self {
1527 input,
1528 ranges,
1529 scan_fingerprint,
1530 scan_implied_time_range,
1531 query_start,
1532 }
1533 }
1534
1535 pub(crate) fn unordered_scan_ctx(input: ScanInput) -> Self {
1537 let query_start = input.query_start.unwrap_or_else(Instant::now);
1538 let ranges = RangeMeta::unordered_scan_ranges(&input);
1539 READ_SST_COUNT.observe(input.num_files() as f64);
1540 let (scan_fingerprint, scan_implied_time_range) = match build_scan_fingerprint(&input) {
1541 Some(b) => (Some(b.fingerprint), b.implied_time_range),
1542 None => (None, None),
1543 };
1544
1545 Self {
1546 input,
1547 ranges,
1548 scan_fingerprint,
1549 scan_implied_time_range,
1550 query_start,
1551 }
1552 }
1553
1554 pub(crate) fn is_mem_range_index(&self, index: RowGroupIndex) -> bool {
1556 self.input.num_memtables() > index.index
1557 }
1558
1559 pub(crate) fn is_file_range_index(&self, index: RowGroupIndex) -> bool {
1560 !self.is_mem_range_index(index)
1561 && index.index < self.input.num_files() + self.input.num_memtables()
1562 }
1563
1564 pub(crate) fn range_pre_filter_mode(&self, part_range: &PartitionRange) -> PreFilterMode {
1565 let range_meta = &self.ranges[part_range.identifier];
1566 let source_count = range_meta.indices.len();
1567
1568 self.input.range_pre_filter_mode(source_count)
1569 }
1570
1571 pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
1573 self.ranges
1574 .iter()
1575 .enumerate()
1576 .map(|(idx, range_meta)| range_meta.new_partition_range(idx))
1577 .collect()
1578 }
1579
1580 pub(crate) fn format_for_explain(&self, verbose: bool, f: &mut fmt::Formatter) -> fmt::Result {
1582 let (mut num_mem_ranges, mut num_file_ranges, mut num_other_ranges) = (0, 0, 0);
1583 for range_meta in &self.ranges {
1584 for idx in &range_meta.row_group_indices {
1585 if self.is_mem_range_index(*idx) {
1586 num_mem_ranges += 1;
1587 } else if self.is_file_range_index(*idx) {
1588 num_file_ranges += 1;
1589 } else {
1590 num_other_ranges += 1;
1591 }
1592 }
1593 }
1594 if verbose {
1595 write!(f, "{{")?;
1596 }
1597 write!(
1598 f,
1599 r#""partition_count":{{"count":{}, "mem_ranges":{}, "files":{}, "file_ranges":{}"#,
1600 self.ranges.len(),
1601 num_mem_ranges,
1602 self.input.num_files(),
1603 num_file_ranges,
1604 )?;
1605 if num_other_ranges > 0 {
1606 write!(f, r#", "other_ranges":{}"#, num_other_ranges)?;
1607 }
1608 write!(f, "}}")?;
1609
1610 if let Some(selector) = &self.input.series_row_selector {
1611 write!(f, ", \"selector\":\"{}\"", selector)?;
1612 }
1613 if let Some(distribution) = &self.input.distribution {
1614 write!(f, ", \"distribution\":\"{}\"", distribution)?;
1615 }
1616
1617 if verbose {
1618 self.format_verbose_content(f)?;
1619 }
1620
1621 Ok(())
1622 }
1623
1624 fn format_verbose_content(&self, f: &mut fmt::Formatter) -> fmt::Result {
1625 struct FileWrapper<'a> {
1626 file: &'a FileHandle,
1627 }
1628
1629 impl fmt::Debug for FileWrapper<'_> {
1630 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1631 let (start, end) = self.file.time_range();
1632 write!(
1633 f,
1634 r#"{{"file_id":"{}","time_range_start":"{}::{}","time_range_end":"{}::{}","rows":{},"size":{},"index_size":{}}}"#,
1635 self.file.file_id(),
1636 start.value(),
1637 start.unit(),
1638 end.value(),
1639 end.unit(),
1640 self.file.num_rows(),
1641 self.file.size(),
1642 self.file.index_size()
1643 )
1644 }
1645 }
1646
1647 struct InputWrapper<'a> {
1648 input: &'a ScanInput,
1649 }
1650
1651 #[cfg(feature = "enterprise")]
1652 impl InputWrapper<'_> {
1653 fn format_extension_ranges(&self, f: &mut fmt::Formatter) -> fmt::Result {
1654 if self.input.extension_ranges.is_empty() {
1655 return Ok(());
1656 }
1657
1658 let mut delimiter = "";
1659 write!(f, ", extension_ranges: [")?;
1660 for range in self.input.extension_ranges() {
1661 write!(f, "{}{:?}", delimiter, range)?;
1662 delimiter = ", ";
1663 }
1664 write!(f, "]")?;
1665 Ok(())
1666 }
1667 }
1668
1669 impl fmt::Debug for InputWrapper<'_> {
1670 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1671 let output_schema = self.input.mapper.output_schema();
1672 if !output_schema.is_empty() {
1673 let names: Vec<_> = output_schema
1674 .column_schemas()
1675 .iter()
1676 .map(|col| &col.name)
1677 .collect();
1678 write!(f, ", \"projection\": {:?}", names)?;
1679 }
1680 if let Some(predicate) = &self.input.predicate.predicate() {
1681 if !predicate.exprs().is_empty() {
1682 let exprs: Vec<_> =
1683 predicate.exprs().iter().map(|e| e.to_string()).collect();
1684 write!(f, ", \"filters\": {:?}", exprs)?;
1685 }
1686 if !predicate.dyn_filters().is_empty() {
1687 let dyn_filters: Vec<_> = predicate
1688 .dyn_filters()
1689 .iter()
1690 .map(|f| format!("{}", f))
1691 .collect();
1692 write!(f, ", \"dyn_filters\": {:?}", dyn_filters)?;
1693 }
1694 }
1695 #[cfg(feature = "vector_index")]
1696 if let Some(vector_index_k) = self.input.vector_index_k {
1697 write!(f, ", \"vector_index_k\": {}", vector_index_k)?;
1698 }
1699 if !self.input.files.is_empty() {
1700 write!(f, ", \"files\": ")?;
1701 f.debug_list()
1702 .entries(self.input.files.iter().map(|file| FileWrapper { file }))
1703 .finish()?;
1704 }
1705 write!(f, ", \"flat_format\": {}", self.input.explain_flat_format)?;
1706 #[cfg(feature = "enterprise")]
1707 self.format_extension_ranges(f)?;
1708
1709 Ok(())
1710 }
1711 }
1712
1713 write!(f, "{:?}", InputWrapper { input: &self.input })
1714 }
1715
1716 pub(crate) fn add_dyn_filter_to_predicate(
1719 self: &Arc<Self>,
1720 filter_exprs: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
1721 ) -> Vec<bool> {
1722 let mut supported = Vec::with_capacity(filter_exprs.len());
1723 let filter_expr = filter_exprs
1724 .into_iter()
1725 .filter_map(|expr| {
1726 if let Ok(dyn_filter) = (expr as Arc<dyn std::any::Any + Send + Sync + 'static>)
1727 .downcast::<datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr>()
1728 {
1729 supported.push(true);
1730 Some(dyn_filter)
1731 } else {
1732 supported.push(false);
1733 None
1734 }
1735 })
1736 .collect();
1737 self.input.predicate.add_dyn_filters(filter_expr);
1738 supported
1739 }
1740}
1741
/// Predicates of a scan, kept both with and without the region partition expression.
#[derive(Clone, Default)]
pub struct PredicateGroup {
    /// Evaluators for the expressions that reference only the timestamp column;
    /// `None` when there is no such expression.
    time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
    /// Predicate over the request expressions plus the region partition expression, if any.
    predicate_all: Predicate,
    /// Predicate over the request expressions only.
    predicate_without_region: Predicate,
    /// Partition expression parsed from the region metadata, if the region has one.
    region_partition_expr: Option<PartitionExpr>,
}
1754
1755impl PredicateGroup {
1756 pub fn new(metadata: &RegionMetadata, exprs: &[Expr]) -> Result<Self> {
1758 let mut combined_exprs = exprs.to_vec();
1759 let mut region_partition_expr = None;
1760
1761 if let Some(expr_json) = metadata.partition_expr.as_ref()
1762 && !expr_json.is_empty()
1763 && let Some(expr) = PartitionExpr::from_json_str(expr_json)
1764 .context(InvalidPartitionExprSnafu { expr: expr_json })?
1765 {
1766 let logical_expr = expr
1767 .try_as_logical_expr()
1768 .context(InvalidPartitionExprSnafu {
1769 expr: expr_json.clone(),
1770 })?;
1771
1772 combined_exprs.push(logical_expr);
1773 region_partition_expr = Some(expr);
1774 }
1775
1776 let mut time_filters = Vec::with_capacity(combined_exprs.len());
1777 let mut columns = HashSet::new();
1779 for expr in &combined_exprs {
1780 columns.clear();
1781 let Some(filter) = Self::expr_to_filter(expr, metadata, &mut columns) else {
1782 continue;
1783 };
1784 time_filters.push(filter);
1785 }
1786 let time_filters = if time_filters.is_empty() {
1787 None
1788 } else {
1789 Some(Arc::new(time_filters))
1790 };
1791
1792 let predicate_all = Predicate::new(combined_exprs);
1793 let predicate_without_region = Predicate::new(exprs.to_vec());
1794
1795 Ok(Self {
1796 time_filters,
1797 predicate_all,
1798 predicate_without_region,
1799 region_partition_expr,
1800 })
1801 }
1802
1803 pub(crate) fn time_filters(&self) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
1805 self.time_filters.clone()
1806 }
1807
1808 pub(crate) fn predicate(&self) -> Option<&Predicate> {
1810 if self.predicate_all.is_empty() {
1811 None
1812 } else {
1813 Some(&self.predicate_all)
1814 }
1815 }
1816
1817 pub(crate) fn predicate_without_region(&self) -> Option<&Predicate> {
1819 if self.predicate_without_region.is_empty() {
1820 None
1821 } else {
1822 Some(&self.predicate_without_region)
1823 }
1824 }
1825
1826 pub(crate) fn add_dyn_filters(&self, dyn_filters: Vec<Arc<DynamicFilterPhysicalExpr>>) {
1828 self.predicate_all.add_dyn_filters(dyn_filters.clone());
1829 self.predicate_without_region.add_dyn_filters(dyn_filters);
1830 }
1831
1832 pub(crate) fn region_partition_expr(&self) -> Option<&PartitionExpr> {
1834 self.region_partition_expr.as_ref()
1835 }
1836
1837 fn expr_to_filter(
1838 expr: &Expr,
1839 metadata: &RegionMetadata,
1840 columns: &mut HashSet<Column>,
1841 ) -> Option<SimpleFilterEvaluator> {
1842 columns.clear();
1843 expr_to_columns(expr, columns).ok()?;
1846 if columns.len() > 1 {
1847 return None;
1849 }
1850 let column = columns.iter().next()?;
1851 let column_meta = metadata.column_by_name(&column.name)?;
1852 if column_meta.semantic_type == SemanticType::Timestamp {
1853 SimpleFilterEvaluator::try_new(expr)
1854 } else {
1855 None
1856 }
1857 }
1858}
1859
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datafusion::physical_plan::expressions::lit as physical_lit;
    use datafusion_common::ScalarValue;
    use datafusion_expr::{col, lit};
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::types::json_type::JsonObjectType;
    use datatypes::value::Value;
    use partition::expr::col as partition_col;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::{RegionId, TimeSeriesDistribution, TimeSeriesRowSelector};

    use super::*;
    use crate::cache::CacheManager;
    use crate::error::InvalidMetadataSnafu;
    use crate::read::range_cache::ScanRequestFingerprintBuilder;
    use crate::read::read_columns::ReadColumn;
    use crate::test_util::memtable_util::metadata_with_primary_key;
    use crate::test_util::scheduler_util::SchedulerEnv;

    /// Builds a scan input over one default file with all caches enabled, projecting the
    /// columns 0, 2 and 3 and filtering by `filters`.
    async fn new_scan_input(metadata: RegionMetadataRef, filters: Vec<Expr>) -> ScanInput {
        let env = SchedulerEnv::new().await;
        let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3]).unwrap();
        let predicate = PredicateGroup::new(metadata.as_ref(), &filters).unwrap();
        let cache_manager = CacheManager::builder()
            .range_result_cache_size(1024)
            .build();
        let file = FileHandle::new(
            crate::sst::file::FileMeta::default(),
            Arc::new(crate::sst::file_purger::NoopFilePurger),
        );

        ScanInput::new(env.access_layer.clone(), mapper)
            .with_predicate(predicate)
            .with_cache(CacheStrategy::EnableAll(Arc::new(cache_manager)))
            .with_files(vec![file])
    }

    /// Returns a millisecond timestamp literal without time zone.
    fn ts_lit(val: i64) -> datafusion_expr::Expr {
        lit(ScalarValue::TimestampMillisecond(Some(val), None))
    }

    #[test]
    fn test_fill_json_nested_paths_from_hint() -> Result<()> {
        /// Region metadata with a tag, a JSON field `j` and a timestamp column.
        fn json_projection_test_metadata() -> Result<RegionMetadataRef> {
            let tag_column = ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag".to_string(),
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            };
            let json_column = ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "j".to_string(),
                    ConcreteDataType::json2(JsonNativeType::Object(JsonObjectType::new())),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 1,
            };
            let ts_column = ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts".to_string(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            };

            let mut builder = RegionMetadataBuilder::new(RegionId::new(1024, 0));
            builder
                .push_column_metadata(tag_column)
                .push_column_metadata(json_column)
                .push_column_metadata(ts_column);
            builder.primary_key(vec![0]);
            builder.build().context(InvalidMetadataSnafu).map(Arc::new)
        }

        fn nested_path(parts: &[&str]) -> NestedPath {
            parts.iter().map(|part| part.to_string()).collect()
        }

        let metadata = json_projection_test_metadata()?;
        // Hint for column `j`: { a: i64, b: { c: string } }.
        let inner_object = JsonNativeType::Object(JsonObjectType::from([(
            "c".to_string(),
            JsonNativeType::String,
        )]));
        let hint = HashMap::from([(
            "j".to_string(),
            JsonNativeType::Object(JsonObjectType::from([
                ("a".to_string(), JsonNativeType::i64()),
                ("b".to_string(), inner_object),
            ])),
        )]);

        // The hinted JSON column gets one nested path per leaf; the tag column is untouched.
        let mut with_json_column = ReadColumns {
            cols: vec![ReadColumn::new(1, vec![]), ReadColumn::new(0, vec![])],
        };
        narrow_read_columns_by_json_type_hint(&mut with_json_column, &hint, metadata.as_ref());
        assert_eq!(
            with_json_column,
            ReadColumns {
                cols: vec![
                    ReadColumn::new(
                        1,
                        vec![nested_path(&["j", "a"]), nested_path(&["j", "b", "c"])]
                    ),
                    ReadColumn::new(0, vec![])
                ]
            }
        );

        // Without the hinted column nothing changes.
        let mut without_json_column = ReadColumns {
            cols: vec![ReadColumn::new(0, vec![])],
        };
        narrow_read_columns_by_json_type_hint(&mut without_json_column, &hint, metadata.as_ref());
        assert_eq!(
            without_json_column,
            ReadColumns {
                cols: vec![ReadColumn::new(0, vec![])]
            }
        );
        Ok(())
    }

    #[tokio::test]
    async fn test_build_scan_fingerprint_for_eligible_scan() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let time_filter = col("ts").gt_eq(ts_lit(1000));
        let tag_filter = col("k0").eq(lit("foo"));
        let field_filter = col("v0").gt(lit(1));

        let input = new_scan_input(
            metadata.clone(),
            vec![
                time_filter.clone(),
                tag_filter.clone(),
                field_filter.clone(),
            ],
        )
        .await
        .with_distribution(Some(TimeSeriesDistribution::PerSeries))
        .with_series_row_selector(Some(TimeSeriesRowSelector::LastRow))
        .with_merge_mode(MergeMode::LastNonNull)
        .with_filter_deleted(false);

        let bundle = build_scan_fingerprint(&input).unwrap();

        let type_of = |id| {
            metadata
                .column_by_id(id)
                .map(|column| column.column_schema.data_type.clone())
        };
        let expected = ScanRequestFingerprintBuilder {
            read_columns: input.read_cols,
            read_column_types: [0, 2, 3].into_iter().map(type_of).collect(),
            filters: vec![tag_filter.to_string(), field_filter.to_string()],
            time_filters: vec![time_filter.to_string()],
            series_row_selector: Some(TimeSeriesRowSelector::LastRow),
            append_mode: false,
            filter_deleted: false,
            merge_mode: MergeMode::LastNonNull,
            partition_expr_version: 0,
        }
        .build();
        assert_eq!(expected, bundle.fingerprint);
    }

    #[tokio::test]
    async fn test_build_scan_fingerprint_requires_tag_filter() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        // Filters on the time index and a field only: no tag column is referenced.
        let filters = vec![col("ts").gt_eq(lit(1000)), col("v0").gt(lit(1))];
        let input = new_scan_input(metadata, filters).await;

        assert!(build_scan_fingerprint(&input).is_none());
    }

    #[tokio::test]
    async fn test_build_scan_fingerprint_respects_scan_eligibility() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let filters = vec![col("k0").eq(lit("foo"))];

        // Input built without `with_cache` and `with_files`.
        let disabled = ScanInput::new(
            SchedulerEnv::new().await.access_layer.clone(),
            FlatProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap(),
        )
        .with_predicate(PredicateGroup::new(metadata.as_ref(), &filters).unwrap());
        assert!(build_scan_fingerprint(&disabled).is_none());

        // Compaction scans have no fingerprint.
        let compaction = new_scan_input(metadata.clone(), filters.clone())
            .await
            .with_compaction(true);
        assert!(build_scan_fingerprint(&compaction).is_none());

        // Scans without files have no fingerprint.
        let no_files = new_scan_input(metadata, filters).await.with_files(vec![]);
        assert!(build_scan_fingerprint(&no_files).is_none());
    }

    #[tokio::test]
    async fn test_build_scan_fingerprint_tracks_schema_and_partition_expr_changes() {
        let partition_expr = partition_col("k0")
            .gt_eq(Value::String("foo".into()))
            .as_json_str()
            .unwrap();
        let mut builder =
            RegionMetadataBuilder::from_existing(metadata_with_primary_key(vec![0, 1], false));
        builder.partition_expr_json(Some(partition_expr));
        let metadata = Arc::new(builder.build_without_validation().unwrap());

        let tag_filter = col("k0").eq(lit("foo"));
        let input = new_scan_input(metadata.clone(), vec![tag_filter.clone()]).await;
        let bundle = build_scan_fingerprint(&input).unwrap();

        let type_of = |id| {
            metadata
                .column_by_id(id)
                .map(|column| column.column_schema.data_type.clone())
        };
        let expected = ScanRequestFingerprintBuilder {
            read_columns: input.read_cols,
            read_column_types: [0, 2, 3].into_iter().map(type_of).collect(),
            filters: vec![tag_filter.to_string()],
            time_filters: vec![],
            series_row_selector: None,
            append_mode: false,
            filter_deleted: true,
            merge_mode: MergeMode::LastRow,
            partition_expr_version: metadata.partition_expr_version,
        }
        .build();
        assert_eq!(expected, bundle.fingerprint);
        assert_ne!(0, metadata.partition_expr_version);
    }

    #[test]
    fn test_update_dyn_filters_with_empty_base_predicates() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let group = PredicateGroup::new(metadata.as_ref(), &[]).unwrap();
        // Without expressions both predicates are reported as absent.
        assert!(group.predicate().is_none());
        assert!(group.predicate_without_region().is_none());

        let dyn_filter = Arc::new(DynamicFilterPhysicalExpr::new(vec![], physical_lit(false)));
        group.add_dyn_filters(vec![dyn_filter]);

        // A dynamic filter alone makes both predicates visible.
        let with_region = group.predicate().unwrap();
        assert!(with_region.exprs().is_empty());
        assert_eq!(1, with_region.dyn_filters().len());

        let without_region = group.predicate_without_region().unwrap();
        assert!(without_region.exprs().is_empty());
        assert_eq!(1, without_region.dyn_filters().len());
    }

    #[tokio::test]
    async fn test_range_pre_filter_mode() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        // (append_mode, merge_mode, source_count, expected mode)
        let cases = [
            (true, MergeMode::LastRow, 1, PreFilterMode::All),
            (false, MergeMode::LastNonNull, 1, PreFilterMode::All),
            (false, MergeMode::LastRow, 2, PreFilterMode::SkipFields),
            (true, MergeMode::LastRow, 2, PreFilterMode::All),
        ];

        for (append_mode, merge_mode, source_count, expected_mode) in cases {
            let input = new_scan_input(metadata.clone(), vec![])
                .await
                .with_append_mode(append_mode)
                .with_merge_mode(merge_mode);

            assert_eq!(expected_mode, input.range_pre_filter_mode(source_count));
        }
    }
}