1use std::collections::{HashMap, HashSet};
18use std::fmt;
19use std::num::NonZeroU64;
20use std::sync::Arc;
21use std::time::Instant;
22
23use api::v1::SemanticType;
24use common_error::ext::BoxedError;
25use common_recordbatch::SendableRecordBatchStream;
26use common_recordbatch::filter::SimpleFilterEvaluator;
27use common_telemetry::tracing::Instrument;
28use common_telemetry::{debug, error, tracing, warn};
29use common_time::range::TimestampRange;
30use datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr;
31use datafusion_common::Column;
32use datafusion_expr::Expr;
33use datafusion_expr::utils::expr_to_columns;
34use datatypes::schema::ext::ArrowSchemaExt;
35use datatypes::types::json_type::JsonNativeType;
36use futures::StreamExt;
37use itertools::Itertools;
38use partition::expr::PartitionExpr;
39use smallvec::SmallVec;
40use snafu::ResultExt;
41use store_api::metadata::{RegionMetadata, RegionMetadataRef};
42use store_api::region_engine::{PartitionRange, RegionScannerRef};
43use store_api::storage::{
44 NestedPath, RegionId, ScanRequest, SequenceNumber, SequenceRange, TimeSeriesDistribution,
45 TimeSeriesRowSelector,
46};
47use table::predicate::{Predicate, build_time_range_predicate, extract_time_range_from_expr};
48use tokio::sync::{Semaphore, mpsc};
49use tokio_stream::wrappers::ReceiverStream;
50
51use crate::access_layer::AccessLayerRef;
52use crate::cache::CacheStrategy;
53use crate::config::DEFAULT_MAX_CONCURRENT_SCAN_FILES;
54use crate::error::{InvalidPartitionExprSnafu, Result};
55#[cfg(feature = "enterprise")]
56use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
57use crate::memtable::{MemtableRange, RangesOptions};
58use crate::metrics::READ_SST_COUNT;
59use crate::read::compat::{self, FlatCompatBatch};
60use crate::read::flat_projection::FlatProjectionMapper;
61use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
62use crate::read::range_cache::{ScanRequestFingerprint, implied_time_range_from_exprs};
63use crate::read::read_columns::{
64 ReadColumns, merge, merge_nested_paths, read_columns_from_predicate,
65 read_columns_from_projection,
66};
67use crate::read::seq_scan::SeqScan;
68use crate::read::series_scan::SeriesScan;
69use crate::read::stream::ScanBatchStream;
70use crate::read::unordered_scan::UnorderedScan;
71use crate::read::{BoxedRecordBatchStream, RecordBatch};
72use crate::region::options::MergeMode;
73use crate::region::version::VersionRef;
74use crate::sst::file::FileHandle;
75use crate::sst::index::bloom_filter::applier::{
76 BloomFilterIndexApplierBuilder, BloomFilterIndexApplierRef,
77};
78use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
79use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
80use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
81use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
82#[cfg(feature = "vector_index")]
83use crate::sst::index::vector_index::applier::{VectorIndexApplier, VectorIndexApplierRef};
84use crate::sst::parquet::file_range::PreFilterMode;
85use crate::sst::parquet::reader::ReaderMetrics;
86
/// Factor applied to a vector search's `k` when the scan request also carries
/// filters, so the vector index is asked for more candidates than requested.
/// NOTE(review): presumably compensates for candidates later dropped by the
/// filters — confirm.
#[cfg(feature = "vector_index")]
const VECTOR_INDEX_OVERFETCH_MULTIPLIER: usize = 2;
89
/// A region scanner, one variant per scan strategy.
///
/// `ScanRegion::scanner` picks the variant: `Series` when the request asks for
/// `TimeSeriesDistribution::PerSeries`, `Unordered` for append-mode regions
/// without a series row selector whose distribution is unset or `TimeWindowed`,
/// and `Seq` otherwise.
pub(crate) enum Scanner {
    /// Sequential scan ([`SeqScan`]).
    Seq(SeqScan),
    /// Unordered scan ([`UnorderedScan`]).
    Unordered(UnorderedScan),
    /// Per-series scan ([`SeriesScan`]).
    Series(SeriesScan),
}
99
100impl Scanner {
101 #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
103 pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream, BoxedError> {
104 match self {
105 Scanner::Seq(seq_scan) => seq_scan.build_stream(),
106 Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
107 Scanner::Series(series_scan) => series_scan.build_stream().await,
108 }
109 }
110
111 pub(crate) fn scan_batch(&self) -> Result<ScanBatchStream> {
113 match self {
114 Scanner::Seq(x) => x.scan_all_partitions(),
115 Scanner::Unordered(x) => x.scan_all_partitions(),
116 Scanner::Series(x) => x.scan_all_partitions(),
117 }
118 }
119}
120
#[cfg(test)]
impl Scanner {
    /// Returns the [`ScanInput`] of whichever scanner variant this is.
    fn test_scan_input(&self) -> &ScanInput {
        match self {
            Self::Seq(inner) => inner.input(),
            Self::Unordered(inner) => inner.input(),
            Self::Series(inner) => inner.input(),
        }
    }

    /// Number of SST files the scanner will read.
    pub(crate) fn num_files(&self) -> usize {
        self.test_scan_input().num_files()
    }

    /// Number of memtable range builders the scanner will read.
    pub(crate) fn num_memtables(&self) -> usize {
        self.test_scan_input().num_memtables()
    }

    /// Ids of the SST files the scanner will read.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        self.test_scan_input().file_ids()
    }

    /// Index ids reported by the scan input.
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        self.test_scan_input().index_ids()
    }

    /// Snapshot sequence recorded on the scan input, if any.
    pub(crate) fn snapshot_sequence(&self) -> Option<SequenceNumber> {
        self.test_scan_input().snapshot_sequence
    }

    /// Prepares the scanner with `target_partitions`.
    ///
    /// # Panics
    /// Panics if the underlying scanner rejects the prepare request.
    pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
        use store_api::region_engine::{PrepareRequest, RegionScanner};

        let request = PrepareRequest::default().with_target_partitions(target_partitions);
        let outcome = match self {
            Self::Seq(inner) => inner.prepare(request),
            Self::Unordered(inner) => inner.prepare(request),
            Self::Series(inner) => inner.prepare(request),
        };
        outcome.unwrap();
    }
}
178
#[cfg_attr(doc, aquamarine::aquamarine)]
/// Collects what a region scan needs (version, request, caches, index options)
/// and builds a [`Scanner`] or [`RegionScannerRef`] from it.
pub(crate) struct ScanRegion {
    /// Region version to read; metadata, SSTs, memtables and options come from it.
    version: VersionRef,
    /// Access layer used to read SSTs and to build index appliers.
    access_layer: AccessLayerRef,
    /// The scan request.
    request: ScanRequest,
    /// Cache strategy; also supplies the caches handed to index appliers.
    cache_strategy: CacheStrategy,
    /// Forwarded to [`ScanInput::with_max_concurrent_scan_files`].
    max_concurrent_scan_files: usize,
    /// When set, no inverted index applier is built.
    ignore_inverted_index: bool,
    /// When set, no fulltext index applier is built.
    ignore_fulltext_index: bool,
    /// When set, no bloom filter index applier is built.
    ignore_bloom_filter: bool,
    /// Start time of the query, forwarded to the [`ScanInput`].
    start_time: Option<Instant>,
    /// Forwarded to [`ScanInput::with_filter_deleted`]; `true` by default.
    filter_deleted: bool,
    /// Optional provider of extension ranges, consulted unless the request skips SST files.
    #[cfg(feature = "enterprise")]
    extension_range_provider: Option<BoxedExtensionRangeProvider>,
}
254
255impl ScanRegion {
256 pub(crate) fn new(
258 version: VersionRef,
259 access_layer: AccessLayerRef,
260 request: ScanRequest,
261 cache_strategy: CacheStrategy,
262 ) -> ScanRegion {
263 ScanRegion {
264 version,
265 access_layer,
266 request,
267 cache_strategy,
268 max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
269 ignore_inverted_index: false,
270 ignore_fulltext_index: false,
271 ignore_bloom_filter: false,
272 start_time: None,
273 filter_deleted: true,
274 #[cfg(feature = "enterprise")]
275 extension_range_provider: None,
276 }
277 }
278
279 #[must_use]
281 pub(crate) fn with_max_concurrent_scan_files(
282 mut self,
283 max_concurrent_scan_files: usize,
284 ) -> Self {
285 self.max_concurrent_scan_files = max_concurrent_scan_files;
286 self
287 }
288
289 #[must_use]
291 pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self {
292 self.ignore_inverted_index = ignore;
293 self
294 }
295
296 #[must_use]
298 pub(crate) fn with_ignore_fulltext_index(mut self, ignore: bool) -> Self {
299 self.ignore_fulltext_index = ignore;
300 self
301 }
302
303 #[must_use]
305 pub(crate) fn with_ignore_bloom_filter(mut self, ignore: bool) -> Self {
306 self.ignore_bloom_filter = ignore;
307 self
308 }
309
310 #[must_use]
311 pub(crate) fn with_start_time(mut self, now: Instant) -> Self {
312 self.start_time = Some(now);
313 self
314 }
315
316 pub(crate) fn set_filter_deleted(&mut self, filter_deleted: bool) {
317 self.filter_deleted = filter_deleted;
318 }
319
320 #[cfg(feature = "enterprise")]
321 pub(crate) fn set_extension_range_provider(
322 &mut self,
323 extension_range_provider: BoxedExtensionRangeProvider,
324 ) {
325 self.extension_range_provider = Some(extension_range_provider);
326 }
327
328 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
330 pub(crate) async fn scanner(self) -> Result<Scanner> {
331 if self.use_series_scan() {
332 self.series_scan().await.map(Scanner::Series)
333 } else if self.use_unordered_scan() {
334 self.unordered_scan().await.map(Scanner::Unordered)
337 } else {
338 self.seq_scan().await.map(Scanner::Seq)
339 }
340 }
341
342 #[tracing::instrument(
344 level = tracing::Level::DEBUG,
345 skip_all,
346 fields(region_id = %self.region_id())
347 )]
348 pub(crate) async fn region_scanner(self) -> Result<RegionScannerRef> {
349 if self.use_series_scan() {
350 self.series_scan()
351 .await
352 .map(|scanner| Box::new(scanner) as _)
353 } else if self.use_unordered_scan() {
354 self.unordered_scan()
355 .await
356 .map(|scanner| Box::new(scanner) as _)
357 } else {
358 self.seq_scan().await.map(|scanner| Box::new(scanner) as _)
359 }
360 }
361
362 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
364 pub(crate) async fn seq_scan(self) -> Result<SeqScan> {
365 let input = self.scan_input().await?.with_compaction(false);
366 Ok(SeqScan::new(input))
367 }
368
369 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
371 pub(crate) async fn unordered_scan(self) -> Result<UnorderedScan> {
372 let input = self.scan_input().await?;
373 Ok(UnorderedScan::new(input))
374 }
375
376 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
378 pub(crate) async fn series_scan(self) -> Result<SeriesScan> {
379 let input = self.scan_input().await?;
380 Ok(SeriesScan::new(input))
381 }
382
383 fn use_unordered_scan(&self) -> bool {
385 self.version.options.append_mode
392 && self.request.series_row_selector.is_none()
393 && (self.request.distribution.is_none()
394 || self.request.distribution == Some(TimeSeriesDistribution::TimeWindowed))
395 }
396
397 fn use_series_scan(&self) -> bool {
399 self.request.distribution == Some(TimeSeriesDistribution::PerSeries)
400 }
401
    /// Builds the [`ScanInput`] shared by all scanner kinds.
    ///
    /// Steps, in order:
    /// 1. Derive the time range and the [`PredicateGroup`] from the request filters.
    /// 2. Resolve the columns to read: projected columns merged with the columns the
    ///    predicate needs, or every column when the request has no projection input.
    /// 3. Build the [`FlatProjectionMapper`].
    /// 4. Select SST files (unless the request skips them) by minimum sequence and
    ///    time range, and collect ranges of memtables overlapping the time range.
    /// 5. Build the index appliers and assemble the [`ScanInput`].
    ///
    /// # Errors
    /// Propagates failures from building the predicate group, resolving projected
    /// columns, creating the mapper, collecting memtable ranges and (with the
    /// `enterprise` feature) looking up extension ranges.
    #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
    async fn scan_input(self) -> Result<ScanInput> {
        // A minimum SST sequence of 0 maps to `None`, i.e. no lower bound.
        let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new);
        let time_range = self.build_time_range_predicate();
        let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;

        // Columns to read: projection plus predicate columns, or all columns.
        let mut read_cols = match &self.request.projection_input {
            Some(p) => {
                let metadata = &self.version.metadata;
                let from_projection = read_columns_from_projection(p.clone(), metadata)?;
                let from_predicate = read_columns_from_predicate(&predicate, metadata);
                merge(from_projection, from_predicate)
            }
            None => {
                let read_col_ids = self
                    .version
                    .metadata
                    .column_metadatas
                    .iter()
                    .map(|col| col.column_id);
                ReadColumns::from_deduped_column_ids(read_col_ids)
            }
        };
        // Let the request's JSON type hint adjust `read_cols` (helper defined outside
        // this view).
        narrow_read_columns_by_json_type_hint(
            &mut read_cols,
            &self.request.json_type_hint,
            &self.version.metadata,
        );
        let read_col_ids = read_cols.column_ids();

        // Output projection; defaults to every column index.
        let projection = self
            .request
            .projection_indices()
            .map(|x| x.to_vec())
            .unwrap_or_else(|| (0..self.version.metadata.column_metadatas.len()).collect());
        // The JSON type hint is only forwarded when the schema has a JSON field.
        let json_type_hint = self
            .version
            .metadata
            .schema
            .arrow_schema()
            .has_json_extension_field()
            .then_some(&self.request.json_type_hint)
            .inspect(|json_type_hint| {
                debug!(
                    "Concretized JSON type: {{{}}}",
                    json_type_hint
                        .iter()
                        .map(|(k, v)| format!("{}: {}", k, v))
                        .join(", ")
                );
            });
        let mapper = FlatProjectionMapper::new_with_read_columns(
            &self.version.metadata,
            projection,
            read_cols,
            json_type_hint,
        )?;

        let ssts = &self.version.ssts;
        let mut files = Vec::new();
        if !self.request.skip_sst_files {
            for level in ssts.levels() {
                for file in level.files.values() {
                    // Keep only files strictly newer than the minimum sequence.
                    let exceed_min_sequence = match (sst_min_sequence, file.meta_ref().sequence) {
                        (Some(min_sequence), Some(file_sequence)) => file_sequence > min_sequence,
                        // Files without a recorded sequence are kept.
                        (Some(_), None) => true,
                        (None, _) => true,
                    };

                    if exceed_min_sequence && file_in_range(file, &time_range) {
                        files.push(file.clone());
                    }
                }
            }
        }

        let memtables = self.version.memtables.list_memtables();
        let mut mem_range_builders = Vec::new();
        let filter_mode = pre_filter_mode(
            self.version.options.append_mode,
            self.version.options.merge_mode(),
        );

        for m in memtables {
            // Memtables without time range stats are skipped.
            let Some((start, end)) = m.stats().time_range() else {
                continue;
            };
            let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
            // Skip memtables whose time range does not overlap the query range.
            if !memtable_range.intersects(&time_range) {
                continue;
            }
            let ranges_in_memtable = m.ranges(
                Some(&read_col_ids),
                RangesOptions::default()
                    .with_predicate(predicate.clone())
                    .with_sequence(SequenceRange::new(
                        self.request.memtable_min_sequence,
                        self.request.memtable_max_sequence,
                    ))
                    .with_pre_filter_mode(filter_mode),
            )?;
            mem_range_builders.extend(ranges_in_memtable.ranges.into_values().map(|v| {
                let stats = v.stats().clone();
                MemRangeBuilder::new(v, stats)
            }));
        }

        let region_id = self.region_id();
        debug!(
            "Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}",
            region_id,
            self.request,
            time_range,
            mem_range_builders.len(),
            files.len(),
            self.version.options.append_mode,
        );

        // Each applier pair is [from non-field filters, from field filters].
        let (non_field_filters, field_filters) = self.partition_by_field_filters();
        let inverted_index_appliers = [
            self.build_invereted_index_applier(&non_field_filters),
            self.build_invereted_index_applier(&field_filters),
        ];
        let bloom_filter_appliers = [
            self.build_bloom_filter_applier(&non_field_filters),
            self.build_bloom_filter_applier(&field_filters),
        ];
        let fulltext_index_appliers = [
            self.build_fulltext_index_applier(&non_field_filters),
            self.build_fulltext_index_applier(&field_filters),
        ];
        #[cfg(feature = "vector_index")]
        let vector_index_applier = self.build_vector_index_applier();
        // With filters present, ask the vector index for more candidates.
        #[cfg(feature = "vector_index")]
        let vector_index_k = self.request.vector_search.as_ref().map(|search| {
            if self.request.filters.is_empty() {
                search.k
            } else {
                search.k.saturating_mul(VECTOR_INDEX_OVERFETCH_MULTIPLIER)
            }
        });

        let input = ScanInput::new(self.access_layer, mapper)
            .with_time_range(Some(time_range))
            .with_predicate(predicate)
            .with_memtables(mem_range_builders)
            .with_files(files)
            .with_cache(self.cache_strategy)
            .with_inverted_index_appliers(inverted_index_appliers)
            .with_bloom_filter_index_appliers(bloom_filter_appliers)
            .with_fulltext_index_appliers(fulltext_index_appliers)
            .with_max_concurrent_scan_files(self.max_concurrent_scan_files)
            .with_start_time(self.start_time)
            .with_append_mode(self.version.options.append_mode)
            .with_filter_deleted(self.filter_deleted)
            .with_merge_mode(self.version.options.merge_mode())
            .with_series_row_selector(self.request.series_row_selector)
            .with_distribution(self.request.distribution)
            .with_explain_flat_format(
                self.version.options.sst_format == Some(crate::sst::FormatType::Flat),
            )
            // The snapshot sequence is the memtable max sequence, only when requested.
            .with_snapshot_sequence(
                self.request
                    .snapshot_on_scan
                    .then_some(self.request.memtable_max_sequence)
                    .flatten(),
            );
        #[cfg(feature = "vector_index")]
        let input = input
            .with_vector_index_applier(vector_index_applier)
            .with_vector_index_k(vector_index_k);

        // Extension ranges are looked up only when SST files are not skipped.
        #[cfg(feature = "enterprise")]
        let input = if !self.request.skip_sst_files
            && let Some(provider) = self.extension_range_provider
        {
            let ranges = provider
                .find_extension_ranges(self.version.flushed_sequence, time_range, &self.request)
                .await?;
            debug!("Find extension ranges: {ranges:?}");
            input.with_extension_ranges(ranges)
        } else {
            input
        };
        Ok(input)
    }
604
605 fn region_id(&self) -> RegionId {
606 self.version.metadata.region_id
607 }
608
609 fn build_time_range_predicate(&self) -> TimestampRange {
611 let time_index = self.version.metadata.time_index_column();
612 let unit = time_index
613 .column_schema
614 .data_type
615 .as_timestamp()
616 .expect("Time index must have timestamp-compatible type")
617 .unit();
618 build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters)
619 }
620
621 fn partition_by_field_filters(&self) -> (Vec<Expr>, Vec<Expr>) {
624 let field_columns = self
625 .version
626 .metadata
627 .field_columns()
628 .map(|col| &col.column_schema.name)
629 .collect::<HashSet<_>>();
630
631 let mut columns = HashSet::new();
632
633 self.request.filters.iter().cloned().partition(|expr| {
634 columns.clear();
635 if expr_to_columns(expr, &mut columns).is_err() {
637 return true;
639 }
640 !columns
642 .iter()
643 .any(|column| field_columns.contains(&column.name))
644 })
645 }
646
647 fn build_invereted_index_applier(&self, filters: &[Expr]) -> Option<InvertedIndexApplierRef> {
649 if self.ignore_inverted_index {
650 return None;
651 }
652
653 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
654 let inverted_index_cache = self.cache_strategy.inverted_index_cache().cloned();
655
656 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
657
658 InvertedIndexApplierBuilder::new(
659 self.access_layer.table_dir().to_string(),
660 self.access_layer.path_type(),
661 self.access_layer.object_store().clone(),
662 self.version.metadata.as_ref(),
663 self.version.metadata.inverted_indexed_column_ids(
664 self.version
665 .options
666 .index_options
667 .inverted_index
668 .ignore_column_ids
669 .iter(),
670 ),
671 self.access_layer.puffin_manager_factory().clone(),
672 )
673 .with_file_cache(file_cache)
674 .with_inverted_index_cache(inverted_index_cache)
675 .with_puffin_metadata_cache(puffin_metadata_cache)
676 .build(filters)
677 .inspect_err(|err| warn!(err; "Failed to build invereted index applier"))
678 .ok()
679 .flatten()
680 .map(Arc::new)
681 }
682
683 fn build_bloom_filter_applier(&self, filters: &[Expr]) -> Option<BloomFilterIndexApplierRef> {
685 if self.ignore_bloom_filter {
686 return None;
687 }
688
689 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
690 let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
691 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
692
693 BloomFilterIndexApplierBuilder::new(
694 self.access_layer.table_dir().to_string(),
695 self.access_layer.path_type(),
696 self.access_layer.object_store().clone(),
697 self.version.metadata.as_ref(),
698 self.access_layer.puffin_manager_factory().clone(),
699 )
700 .with_file_cache(file_cache)
701 .with_bloom_filter_index_cache(bloom_filter_index_cache)
702 .with_puffin_metadata_cache(puffin_metadata_cache)
703 .build(filters)
704 .inspect_err(|err| warn!(err; "Failed to build bloom filter index applier"))
705 .ok()
706 .flatten()
707 .map(Arc::new)
708 }
709
710 fn build_fulltext_index_applier(&self, filters: &[Expr]) -> Option<FulltextIndexApplierRef> {
712 if self.ignore_fulltext_index {
713 return None;
714 }
715
716 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
717 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
718 let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
719 FulltextIndexApplierBuilder::new(
720 self.access_layer.table_dir().to_string(),
721 self.access_layer.path_type(),
722 self.access_layer.object_store().clone(),
723 self.access_layer.puffin_manager_factory().clone(),
724 self.version.metadata.as_ref(),
725 )
726 .with_file_cache(file_cache)
727 .with_puffin_metadata_cache(puffin_metadata_cache)
728 .with_bloom_filter_cache(bloom_filter_index_cache)
729 .build(filters)
730 .inspect_err(|err| warn!(err; "Failed to build fulltext index applier"))
731 .ok()
732 .flatten()
733 .map(Arc::new)
734 }
735
736 #[cfg(feature = "vector_index")]
738 fn build_vector_index_applier(&self) -> Option<VectorIndexApplierRef> {
739 let vector_search = self.request.vector_search.as_ref()?;
740
741 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
742 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
743 let vector_index_cache = self.cache_strategy.vector_index_cache().cloned();
744
745 let applier = VectorIndexApplier::new(
746 self.access_layer.table_dir().to_string(),
747 self.access_layer.path_type(),
748 self.access_layer.object_store().clone(),
749 self.access_layer.puffin_manager_factory().clone(),
750 vector_search.column_id,
751 vector_search.query_vector.clone(),
752 vector_search.metric,
753 )
754 .with_file_cache(file_cache)
755 .with_puffin_metadata_cache(puffin_metadata_cache)
756 .with_vector_index_cache(vector_index_cache);
757
758 Some(Arc::new(applier))
759 }
760}
761
762fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
764 if predicate == &TimestampRange::min_to_max() {
765 return true;
766 }
767 let (start, end) = file.time_range();
769 let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end));
770 file_ts_range.intersects(predicate)
771}
772
/// Everything a scanner needs to read a region: files, memtables, predicates,
/// index appliers and scan options.
pub struct ScanInput {
    /// Access layer used to read SST files.
    access_layer: AccessLayerRef,
    /// Projection mapper; also provides the expected region metadata and read columns.
    pub(crate) mapper: Arc<FlatProjectionMapper>,
    /// Columns to read from SSTs; initialized from the mapper's read columns.
    pub(crate) read_cols: ReadColumns,
    /// Time range derived from the request filters, if set.
    pub(crate) time_range: Option<TimestampRange>,
    /// Predicates built from the request filters.
    pub(crate) predicate: PredicateGroup,
    /// Region partition expression copied from the predicate group; compared with
    /// each file's partition expression to pick the per-file predicate.
    region_partition_expr: Option<PartitionExpr>,
    /// Builders for the memtable ranges to scan.
    pub(crate) memtables: Vec<MemRangeBuilder>,
    /// SST files to scan.
    pub(crate) files: Vec<FileHandle>,
    /// Cache strategy used when reading SSTs.
    pub(crate) cache_strategy: CacheStrategy,
    /// When set, a missing SST file is logged and skipped instead of failing the scan.
    ignore_file_not_found: bool,
    /// Maximum number of SST files to scan concurrently.
    pub(crate) max_concurrent_scan_files: usize,
    /// Inverted index appliers; `ScanRegion` fills them as
    /// [from non-field filters, from field filters].
    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
    /// Bloom filter index appliers, same layout as `inverted_index_appliers`.
    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    /// Fulltext index appliers, same layout as `inverted_index_appliers`.
    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
    /// Vector index applier for vector search requests.
    #[cfg(feature = "vector_index")]
    pub(crate) vector_index_applier: Option<VectorIndexApplierRef>,
    /// Number of candidates requested from the vector index.
    #[cfg(feature = "vector_index")]
    pub(crate) vector_index_k: Option<usize>,
    /// Start time of the query.
    pub(crate) query_start: Option<Instant>,
    /// Whether the region is in append mode.
    pub(crate) append_mode: bool,
    /// Whether to filter deleted rows.
    pub(crate) filter_deleted: bool,
    /// Merge mode of the region.
    pub(crate) merge_mode: MergeMode,
    /// Optional per-series row selector from the request.
    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
    /// Requested time series distribution.
    pub(crate) distribution: Option<TimeSeriesDistribution>,
    /// Set when the region's SST format is flat.
    /// NOTE(review): presumably only affects explain output — confirm.
    explain_flat_format: bool,
    /// Snapshot sequence of the scan, if one was requested.
    pub(crate) snapshot_sequence: Option<SequenceNumber>,
    /// Whether this input feeds a compaction; changes PK decoding, page index
    /// deferral and compat checks in `prune_file`.
    pub(crate) compaction: bool,
    /// Extension ranges to scan in addition to files and memtables.
    #[cfg(feature = "enterprise")]
    extension_ranges: Vec<BoxedExtensionRange>,
}
830
831impl ScanInput {
832 #[must_use]
834 pub(crate) fn new(access_layer: AccessLayerRef, mapper: FlatProjectionMapper) -> ScanInput {
835 ScanInput {
836 access_layer,
837 read_cols: mapper.read_columns().clone(),
838 mapper: Arc::new(mapper),
839 time_range: None,
840 predicate: PredicateGroup::default(),
841 region_partition_expr: None,
842 memtables: Vec::new(),
843 files: Vec::new(),
844 cache_strategy: CacheStrategy::Disabled,
845 ignore_file_not_found: false,
846 max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
847 inverted_index_appliers: [None, None],
848 bloom_filter_index_appliers: [None, None],
849 fulltext_index_appliers: [None, None],
850 #[cfg(feature = "vector_index")]
851 vector_index_applier: None,
852 #[cfg(feature = "vector_index")]
853 vector_index_k: None,
854 query_start: None,
855 append_mode: false,
856 filter_deleted: true,
857 merge_mode: MergeMode::default(),
858 series_row_selector: None,
859 distribution: None,
860 explain_flat_format: false,
861 snapshot_sequence: None,
862 compaction: false,
863 #[cfg(feature = "enterprise")]
864 extension_ranges: Vec::new(),
865 }
866 }
867
868 #[must_use]
870 pub(crate) fn with_time_range(mut self, time_range: Option<TimestampRange>) -> Self {
871 self.time_range = time_range;
872 self
873 }
874
875 #[must_use]
877 pub(crate) fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
878 self.region_partition_expr = predicate.region_partition_expr().cloned();
879 self.predicate = predicate;
880 self
881 }
882
883 #[must_use]
885 pub(crate) fn with_memtables(mut self, memtables: Vec<MemRangeBuilder>) -> Self {
886 self.memtables = memtables;
887 self
888 }
889
890 #[must_use]
892 pub(crate) fn with_files(mut self, files: Vec<FileHandle>) -> Self {
893 self.files = files;
894 self
895 }
896
897 #[must_use]
899 pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self {
900 self.cache_strategy = cache;
901 self
902 }
903
904 #[must_use]
906 pub(crate) fn with_ignore_file_not_found(mut self, ignore: bool) -> Self {
907 self.ignore_file_not_found = ignore;
908 self
909 }
910
911 #[must_use]
913 pub(crate) fn with_max_concurrent_scan_files(
914 mut self,
915 max_concurrent_scan_files: usize,
916 ) -> Self {
917 self.max_concurrent_scan_files = max_concurrent_scan_files;
918 self
919 }
920
921 #[must_use]
923 pub(crate) fn with_inverted_index_appliers(
924 mut self,
925 appliers: [Option<InvertedIndexApplierRef>; 2],
926 ) -> Self {
927 self.inverted_index_appliers = appliers;
928 self
929 }
930
931 #[must_use]
933 pub(crate) fn with_bloom_filter_index_appliers(
934 mut self,
935 appliers: [Option<BloomFilterIndexApplierRef>; 2],
936 ) -> Self {
937 self.bloom_filter_index_appliers = appliers;
938 self
939 }
940
941 #[must_use]
943 pub(crate) fn with_fulltext_index_appliers(
944 mut self,
945 appliers: [Option<FulltextIndexApplierRef>; 2],
946 ) -> Self {
947 self.fulltext_index_appliers = appliers;
948 self
949 }
950
951 #[cfg(feature = "vector_index")]
953 #[must_use]
954 pub(crate) fn with_vector_index_applier(
955 mut self,
956 applier: Option<VectorIndexApplierRef>,
957 ) -> Self {
958 self.vector_index_applier = applier;
959 self
960 }
961
962 #[cfg(feature = "vector_index")]
964 #[must_use]
965 pub(crate) fn with_vector_index_k(mut self, k: Option<usize>) -> Self {
966 self.vector_index_k = k;
967 self
968 }
969
970 #[must_use]
972 pub(crate) fn with_start_time(mut self, now: Option<Instant>) -> Self {
973 self.query_start = now;
974 self
975 }
976
977 #[must_use]
978 pub(crate) fn with_append_mode(mut self, is_append_mode: bool) -> Self {
979 self.append_mode = is_append_mode;
980 self
981 }
982
983 #[must_use]
985 pub(crate) fn with_filter_deleted(mut self, filter_deleted: bool) -> Self {
986 self.filter_deleted = filter_deleted;
987 self
988 }
989
990 #[must_use]
992 pub(crate) fn with_merge_mode(mut self, merge_mode: MergeMode) -> Self {
993 self.merge_mode = merge_mode;
994 self
995 }
996
997 #[must_use]
999 pub(crate) fn with_distribution(
1000 mut self,
1001 distribution: Option<TimeSeriesDistribution>,
1002 ) -> Self {
1003 self.distribution = distribution;
1004 self
1005 }
1006
1007 #[must_use]
1009 pub(crate) fn with_explain_flat_format(mut self, explain_flat_format: bool) -> Self {
1010 self.explain_flat_format = explain_flat_format;
1011 self
1012 }
1013
1014 #[must_use]
1016 pub(crate) fn with_series_row_selector(
1017 mut self,
1018 series_row_selector: Option<TimeSeriesRowSelector>,
1019 ) -> Self {
1020 self.series_row_selector = series_row_selector;
1021 self
1022 }
1023
1024 #[must_use]
1025 pub(crate) fn with_snapshot_sequence(
1026 mut self,
1027 snapshot_sequence: Option<SequenceNumber>,
1028 ) -> Self {
1029 self.snapshot_sequence = snapshot_sequence;
1030 self
1031 }
1032
1033 #[must_use]
1035 pub(crate) fn with_compaction(mut self, compaction: bool) -> Self {
1036 self.compaction = compaction;
1037 self
1038 }
1039
1040 pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> {
1042 let memtable = &self.memtables[index.index];
1043 let mut ranges = SmallVec::new();
1044 memtable.build_ranges(index.row_group_index, &mut ranges);
1045 ranges
1046 }
1047
1048 fn predicate_for_file(&self, file: &FileHandle) -> Option<Predicate> {
1049 if self.should_skip_region_partition(file) {
1050 self.predicate.predicate_without_region().cloned()
1051 } else {
1052 self.predicate.predicate().cloned()
1053 }
1054 }
1055
1056 fn should_skip_region_partition(&self, file: &FileHandle) -> bool {
1057 match (
1058 self.region_partition_expr.as_ref(),
1059 file.meta_ref().partition_expr.as_ref(),
1060 ) {
1061 (Some(region_expr), Some(file_expr)) => region_expr == file_expr,
1062 _ => false,
1063 }
1064 }
1065
    /// Builds a reader for `file` with this input's predicate, projection, caches
    /// and index appliers, and returns a [`FileRangeBuilder`] for what it selects.
    ///
    /// Returns an empty (default) builder when the reader yields no input, or when
    /// the object is not found and `ignore_file_not_found` is set (logged as an
    /// error). A compat adapter is attached when the file's read format does not
    /// match the mapper's columns and primary key encoding.
    ///
    /// # Errors
    /// Returns reader-construction errors (other than ignored not-found errors) and
    /// errors from creating the compat adapter.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.region_metadata().region_id,
            file_id = %file.file_id()
        )
    )]
    pub async fn prune_file(
        &self,
        file: &FileHandle,
        pre_filter_mode: PreFilterMode,
        reader_metrics: &mut ReaderMetrics,
    ) -> Result<FileRangeBuilder> {
        let predicate = self.predicate_for_file(file);
        let may_build_selective_row_selection = predicate.is_some();
        // Decode primary key values only outside compaction and only when some
        // read column is a primary key column.
        let decode_pk_values = !self.compaction
            && self
                .mapper
                .read_columns()
                .column_ids_iter()
                .any(|column_id| self.mapper.metadata().primary_key.contains(&column_id));
        let reader = self
            .access_layer
            .read_sst(file.clone())
            .predicate(predicate)
            .projection(Some(self.read_cols.clone()))
            .cache(self.cache_strategy.clone())
            .inverted_index_appliers(self.inverted_index_appliers.clone())
            .bloom_filter_index_appliers(self.bloom_filter_index_appliers.clone())
            .fulltext_index_appliers(self.fulltext_index_appliers.clone());
        // Defer the optional page index only for non-compaction reads that have a
        // predicate.
        let reader = if !self.compaction && may_build_selective_row_selection {
            reader.deferred_optional_page_index()
        } else {
            reader
        };
        #[cfg(feature = "vector_index")]
        let reader = {
            let mut reader = reader;
            reader =
                reader.vector_index_applier(self.vector_index_applier.clone(), self.vector_index_k);
            reader
        };
        let res = reader
            .expected_metadata(Some(self.mapper.metadata().clone()))
            .compaction(self.compaction)
            .pre_filter_mode(pre_filter_mode)
            .decode_primary_key_values(decode_pk_values)
            .build_reader_input(reader_metrics)
            .await;
        let read_input = match res {
            Ok(x) => x,
            Err(e) => {
                if e.is_object_not_found() && self.ignore_file_not_found {
                    error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
                    return Ok(FileRangeBuilder::default());
                } else {
                    return Err(e);
                }
            }
        };

        // No reader input means nothing to read from this file.
        let Some((mut file_range_ctx, selection)) = read_input else {
            return Ok(FileRangeBuilder::default());
        };

        let need_compat = !compat::has_same_columns_and_pk_encoding(
            &self.mapper,
            file_range_ctx.read_format(),
            self.compaction,
        );
        if need_compat {
            let compat = FlatCompatBatch::try_new(
                &self.mapper,
                file_range_ctx.read_format(),
                self.compaction,
            )?;
            file_range_ctx.set_compat_batch(compat);
        }
        Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), selection))
    }
1149
1150 #[tracing::instrument(
1154 skip(self, sources, semaphore),
1155 fields(
1156 region_id = %self.region_metadata().region_id,
1157 source_count = sources.len()
1158 )
1159 )]
1160 pub(crate) fn create_parallel_flat_sources(
1161 &self,
1162 sources: Vec<BoxedRecordBatchStream>,
1163 semaphore: Arc<Semaphore>,
1164 channel_size: usize,
1165 ) -> Result<Vec<BoxedRecordBatchStream>> {
1166 if sources.len() <= 1 {
1167 return Ok(sources);
1168 }
1169
1170 let sources = sources
1172 .into_iter()
1173 .map(|source| {
1174 let (sender, receiver) = mpsc::channel(channel_size);
1175 self.spawn_flat_scan_task(source, semaphore.clone(), sender);
1176 let stream = Box::pin(ReceiverStream::new(receiver));
1177 Box::pin(stream) as _
1178 })
1179 .collect();
1180 Ok(sources)
1181 }
1182
1183 #[tracing::instrument(
1185 skip(self, input, semaphore, sender),
1186 fields(region_id = %self.region_metadata().region_id)
1187 )]
1188 pub(crate) fn spawn_flat_scan_task(
1189 &self,
1190 mut input: BoxedRecordBatchStream,
1191 semaphore: Arc<Semaphore>,
1192 sender: mpsc::Sender<Result<RecordBatch>>,
1193 ) {
1194 let region_id = self.region_metadata().region_id;
1195 let span = tracing::info_span!(
1196 "ScanInput::parallel_scan_task",
1197 region_id = %region_id,
1198 stream_kind = "flat"
1199 );
1200 common_runtime::spawn_query(
1201 async move {
1202 loop {
1203 let maybe_batch = {
1206 let _permit = semaphore.acquire().await.unwrap();
1208 input.next().await
1209 };
1210 match maybe_batch {
1211 Some(Ok(batch)) => {
1212 let _ = sender.send(Ok(batch)).await;
1213 }
1214 Some(Err(e)) => {
1215 let _ = sender.send(Err(e)).await;
1216 break;
1217 }
1218 None => break,
1219 }
1220 }
1221 }
1222 .instrument(span),
1223 );
1224 }
1225
1226 pub(crate) fn total_rows(&self) -> usize {
1227 let rows_in_files: usize = self.files.iter().map(|f| f.num_rows()).sum();
1228 let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();
1229
1230 let rows = rows_in_files + rows_in_memtables;
1231 #[cfg(feature = "enterprise")]
1232 let rows = rows
1233 + self
1234 .extension_ranges
1235 .iter()
1236 .map(|x| x.num_rows())
1237 .sum::<u64>() as usize;
1238 rows
1239 }
1240
    /// Returns the predicates (time filters, full and region-less predicates) of this scan.
    pub(crate) fn predicate_group(&self) -> &PredicateGroup {
        &self.predicate
    }
1244
    /// Returns the number of memtables to scan.
    ///
    /// Memtables occupy the first `num_memtables()` positions of the range index space.
    pub(crate) fn num_memtables(&self) -> usize {
        self.memtables.len()
    }
1249
    /// Returns the number of SST files to scan.
    ///
    /// Files follow the memtables in the range index space.
    pub(crate) fn num_files(&self) -> usize {
        self.files.len()
    }
1254
1255 pub(crate) fn file_from_index(&self, index: RowGroupIndex) -> &FileHandle {
1257 let file_index = index.index - self.num_memtables();
1258 &self.files[file_index]
1259 }
1260
    /// Returns the region metadata, as held by the projection mapper.
    pub fn region_metadata(&self) -> &RegionMetadataRef {
        self.mapper.metadata()
    }
1264
1265 fn range_pre_filter_mode(&self, source_count: usize) -> PreFilterMode {
1266 if source_count <= 1 {
1267 return PreFilterMode::All;
1272 }
1273
1274 pre_filter_mode(self.append_mode, self.merge_mode)
1275 }
1276}
1277
#[cfg(feature = "enterprise")]
impl ScanInput {
    /// Returns this input with its extension ranges replaced by `extension_ranges`.
    #[must_use]
    pub(crate) fn with_extension_ranges(
        mut self,
        extension_ranges: Vec<BoxedExtensionRange>,
    ) -> Self {
        self.extension_ranges = extension_ranges;
        self
    }

    /// Returns all extension ranges of this scan.
    // The whole impl is already gated on `enterprise`, so the methods need no extra `cfg`.
    pub(crate) fn extension_ranges(&self) -> &[BoxedExtensionRange] {
        &self.extension_ranges
    }

    /// Returns the extension range addressed by the global range index `i`.
    ///
    /// Extension ranges come after all memtables and files in the index space.
    ///
    /// # Panics
    /// Panics if `i` does not refer to an extension range.
    pub(crate) fn extension_range(&self, i: usize) -> &BoxedExtensionRange {
        &self.extension_ranges[i - self.num_memtables() - self.num_files()]
    }
}
1299
#[cfg(test)]
impl ScanInput {
    /// Collects the region file ids of all files in this scan, in file order.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        let mut ids = Vec::with_capacity(self.files.len());
        ids.extend(self.files.iter().map(|handle| handle.file_id()));
        ids
    }

    /// Collects the region index ids of all files in this scan, in file order.
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        let mut ids = Vec::with_capacity(self.files.len());
        ids.extend(self.files.iter().map(|handle| handle.index_id()));
        ids
    }
}
1311
1312fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode {
1313 if append_mode {
1314 return PreFilterMode::All;
1315 }
1316
1317 match merge_mode {
1318 MergeMode::LastRow => PreFilterMode::SkipFields,
1319 MergeMode::LastNonNull => PreFilterMode::SkipFields,
1320 }
1321}
1322
1323fn narrow_read_columns_by_json_type_hint(
1324 read_columns: &mut ReadColumns,
1325 json_type_hint: &HashMap<String, JsonNativeType>,
1326 metadata: &RegionMetadata,
1327) {
1328 if json_type_hint.is_empty() {
1329 return;
1330 }
1331
1332 for read_column in &mut read_columns.cols {
1333 let Some(column) = metadata.column_by_id(read_column.column_id) else {
1334 continue;
1335 };
1336 let column_name = &column.column_schema.name;
1337 let Some(json_type) = json_type_hint.get(column_name) else {
1338 continue;
1339 };
1340
1341 let mut paths = Vec::new();
1342 let mut current = vec![column_name.clone()];
1343 collect_json_nested_paths(json_type, &mut current, &mut paths);
1344 merge_nested_paths(&mut read_column.nested_paths, paths)
1345 }
1346}
1347
1348fn collect_json_nested_paths(
1349 json_type: &JsonNativeType,
1350 current: &mut NestedPath,
1351 paths: &mut Vec<NestedPath>,
1352) {
1353 match json_type {
1354 JsonNativeType::Object(fields) if !fields.is_empty() => {
1355 for (field, child) in fields {
1356 current.push(field.clone());
1357 collect_json_nested_paths(child, current, paths);
1358 current.pop();
1359 }
1360 }
1361 _ => paths.push(current.clone()),
1362 }
1363}
1364
/// Output of [`build_scan_fingerprint`]: the request fingerprint plus the time range implied
/// by the request's time-only filters.
pub(crate) struct ScanFingerprintBundle {
    /// Fingerprint describing the scan request.
    pub(crate) fingerprint: ScanRequestFingerprint,
    /// Time range computed by `implied_time_range_from_exprs` from the time-only filters;
    /// `None` whenever that helper returns `None`.
    pub(crate) implied_time_range: Option<TimestampRange>,
}
1376
1377pub(crate) fn build_scan_fingerprint(input: &ScanInput) -> Option<ScanFingerprintBundle> {
1380 let eligible = !input.compaction
1381 && !input.files.is_empty()
1382 && matches!(input.cache_strategy, CacheStrategy::EnableAll(_));
1383
1384 if !eligible {
1385 return None;
1386 }
1387
1388 let metadata = input.region_metadata();
1389 let tag_names: HashSet<&str> = metadata
1390 .column_metadatas
1391 .iter()
1392 .filter(|col| col.semantic_type == SemanticType::Tag)
1393 .map(|col| col.column_schema.name.as_str())
1394 .collect();
1395
1396 let time_index = metadata.time_index_column();
1397 let time_index_name = time_index.column_schema.name.clone();
1398 let ts_col_unit = time_index
1399 .column_schema
1400 .data_type
1401 .as_timestamp()
1402 .expect("Time index must have timestamp-compatible type")
1403 .unit();
1404
1405 let exprs = input
1406 .predicate_group()
1407 .predicate_without_region()
1408 .map(|predicate| predicate.exprs())
1409 .unwrap_or_default();
1410
1411 let mut filters = Vec::new();
1412 let mut time_only_exprs: Vec<&Expr> = Vec::new();
1413 let mut has_tag_filter = false;
1414 let mut columns = HashSet::new();
1415
1416 for expr in exprs {
1417 columns.clear();
1418 let is_time_only = match expr_to_columns(expr, &mut columns) {
1419 Ok(()) if !columns.is_empty() => {
1420 has_tag_filter |= columns
1421 .iter()
1422 .any(|col| tag_names.contains(col.name.as_str()));
1423 columns.iter().all(|col| col.name == time_index_name)
1424 }
1425 _ => false,
1426 };
1427
1428 if is_time_only
1436 && extract_time_range_from_expr(&time_index_name, ts_col_unit, expr).is_some()
1437 {
1438 time_only_exprs.push(expr);
1439 } else {
1440 filters.push(expr.to_string());
1441 }
1442 }
1443
1444 if !has_tag_filter {
1445 return None;
1447 }
1448
1449 let implied_time_range =
1450 implied_time_range_from_exprs(&time_index_name, ts_col_unit, &time_only_exprs);
1451 let mut time_filters: Vec<String> = time_only_exprs.iter().map(|e| e.to_string()).collect();
1452
1453 filters.sort_unstable();
1455 time_filters.sort_unstable();
1456 let read_columns = input.read_cols.clone();
1457 let fingerprint = crate::read::range_cache::ScanRequestFingerprintBuilder {
1458 read_column_types: read_columns
1459 .column_ids_iter()
1460 .map(|id| {
1461 metadata
1462 .column_by_id(id)
1463 .map(|col| col.column_schema.data_type.clone())
1464 })
1465 .collect(),
1466 read_columns,
1467 filters,
1468 time_filters,
1469 series_row_selector: input.series_row_selector,
1470 append_mode: input.append_mode,
1471 filter_deleted: input.filter_deleted,
1472 merge_mode: input.merge_mode,
1473 partition_expr_version: metadata.partition_expr_version,
1474 }
1475 .build();
1476
1477 Some(ScanFingerprintBundle {
1478 fingerprint,
1479 implied_time_range,
1480 })
1481}
1482
/// Context shared by scan streams: the scan input, the ranges to read and scan metadata.
pub struct StreamContext {
    /// Input of the scan.
    pub input: ScanInput,
    /// Metadata of the ranges to scan. `PartitionRange::identifier` indexes into this vector.
    pub(crate) ranges: Vec<RangeMeta>,
    /// Fingerprint of the scan request; `None` when `build_scan_fingerprint` found the scan
    /// ineligible.
    // NOTE(review): `allow(dead_code)` is presumably needed because no reader of this field
    // exists in some build configurations — confirm and narrow the allow if possible.
    #[allow(dead_code)]
    pub(crate) scan_fingerprint: Option<ScanRequestFingerprint>,
    /// Time range implied by the time-only filters of an eligible scan.
    pub(crate) scan_implied_time_range: Option<TimestampRange>,

    /// Start time of the query: `input.query_start` if set, otherwise the time the context
    /// was created.
    pub(crate) query_start: Instant,
}
1506
1507impl StreamContext {
1508 pub(crate) fn seq_scan_ctx(input: ScanInput) -> Self {
1510 let query_start = input.query_start.unwrap_or_else(Instant::now);
1511 let ranges = RangeMeta::seq_scan_ranges(&input);
1512 READ_SST_COUNT.observe(input.num_files() as f64);
1513 let (scan_fingerprint, scan_implied_time_range) = match build_scan_fingerprint(&input) {
1514 Some(b) => (Some(b.fingerprint), b.implied_time_range),
1515 None => (None, None),
1516 };
1517
1518 Self {
1519 input,
1520 ranges,
1521 scan_fingerprint,
1522 scan_implied_time_range,
1523 query_start,
1524 }
1525 }
1526
1527 pub(crate) fn unordered_scan_ctx(input: ScanInput) -> Self {
1529 let query_start = input.query_start.unwrap_or_else(Instant::now);
1530 let ranges = RangeMeta::unordered_scan_ranges(&input);
1531 READ_SST_COUNT.observe(input.num_files() as f64);
1532 let (scan_fingerprint, scan_implied_time_range) = match build_scan_fingerprint(&input) {
1533 Some(b) => (Some(b.fingerprint), b.implied_time_range),
1534 None => (None, None),
1535 };
1536
1537 Self {
1538 input,
1539 ranges,
1540 scan_fingerprint,
1541 scan_implied_time_range,
1542 query_start,
1543 }
1544 }
1545
1546 pub(crate) fn is_mem_range_index(&self, index: RowGroupIndex) -> bool {
1548 self.input.num_memtables() > index.index
1549 }
1550
1551 pub(crate) fn is_file_range_index(&self, index: RowGroupIndex) -> bool {
1552 !self.is_mem_range_index(index)
1553 && index.index < self.input.num_files() + self.input.num_memtables()
1554 }
1555
1556 pub(crate) fn range_pre_filter_mode(&self, part_range: &PartitionRange) -> PreFilterMode {
1557 let range_meta = &self.ranges[part_range.identifier];
1558 let source_count = range_meta.indices.len();
1559
1560 self.input.range_pre_filter_mode(source_count)
1561 }
1562
1563 pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
1565 self.ranges
1566 .iter()
1567 .enumerate()
1568 .map(|(idx, range_meta)| range_meta.new_partition_range(idx))
1569 .collect()
1570 }
1571
1572 pub(crate) fn format_for_explain(&self, verbose: bool, f: &mut fmt::Formatter) -> fmt::Result {
1574 let (mut num_mem_ranges, mut num_file_ranges, mut num_other_ranges) = (0, 0, 0);
1575 for range_meta in &self.ranges {
1576 for idx in &range_meta.row_group_indices {
1577 if self.is_mem_range_index(*idx) {
1578 num_mem_ranges += 1;
1579 } else if self.is_file_range_index(*idx) {
1580 num_file_ranges += 1;
1581 } else {
1582 num_other_ranges += 1;
1583 }
1584 }
1585 }
1586 if verbose {
1587 write!(f, "{{")?;
1588 }
1589 write!(
1590 f,
1591 r#""partition_count":{{"count":{}, "mem_ranges":{}, "files":{}, "file_ranges":{}"#,
1592 self.ranges.len(),
1593 num_mem_ranges,
1594 self.input.num_files(),
1595 num_file_ranges,
1596 )?;
1597 if num_other_ranges > 0 {
1598 write!(f, r#", "other_ranges":{}"#, num_other_ranges)?;
1599 }
1600 write!(f, "}}")?;
1601
1602 if let Some(selector) = &self.input.series_row_selector {
1603 write!(f, ", \"selector\":\"{}\"", selector)?;
1604 }
1605 if let Some(distribution) = &self.input.distribution {
1606 write!(f, ", \"distribution\":\"{}\"", distribution)?;
1607 }
1608
1609 if verbose {
1610 self.format_verbose_content(f)?;
1611 }
1612
1613 Ok(())
1614 }
1615
1616 fn format_verbose_content(&self, f: &mut fmt::Formatter) -> fmt::Result {
1617 struct FileWrapper<'a> {
1618 file: &'a FileHandle,
1619 }
1620
1621 impl fmt::Debug for FileWrapper<'_> {
1622 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1623 let (start, end) = self.file.time_range();
1624 write!(
1625 f,
1626 r#"{{"file_id":"{}","time_range_start":"{}::{}","time_range_end":"{}::{}","rows":{},"size":{},"index_size":{}}}"#,
1627 self.file.file_id(),
1628 start.value(),
1629 start.unit(),
1630 end.value(),
1631 end.unit(),
1632 self.file.num_rows(),
1633 self.file.size(),
1634 self.file.index_size()
1635 )
1636 }
1637 }
1638
1639 struct InputWrapper<'a> {
1640 input: &'a ScanInput,
1641 }
1642
1643 #[cfg(feature = "enterprise")]
1644 impl InputWrapper<'_> {
1645 fn format_extension_ranges(&self, f: &mut fmt::Formatter) -> fmt::Result {
1646 if self.input.extension_ranges.is_empty() {
1647 return Ok(());
1648 }
1649
1650 let mut delimiter = "";
1651 write!(f, ", extension_ranges: [")?;
1652 for range in self.input.extension_ranges() {
1653 write!(f, "{}{:?}", delimiter, range)?;
1654 delimiter = ", ";
1655 }
1656 write!(f, "]")?;
1657 Ok(())
1658 }
1659 }
1660
1661 impl fmt::Debug for InputWrapper<'_> {
1662 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1663 let output_schema = self.input.mapper.output_schema();
1664 if !output_schema.is_empty() {
1665 let names: Vec<_> = output_schema
1666 .column_schemas()
1667 .iter()
1668 .map(|col| &col.name)
1669 .collect();
1670 write!(f, ", \"projection\": {:?}", names)?;
1671 }
1672 if let Some(predicate) = &self.input.predicate.predicate() {
1673 if !predicate.exprs().is_empty() {
1674 let exprs: Vec<_> =
1675 predicate.exprs().iter().map(|e| e.to_string()).collect();
1676 write!(f, ", \"filters\": {:?}", exprs)?;
1677 }
1678 if !predicate.dyn_filters().is_empty() {
1679 let dyn_filters: Vec<_> = predicate
1680 .dyn_filters()
1681 .iter()
1682 .map(|f| format!("{}", f))
1683 .collect();
1684 write!(f, ", \"dyn_filters\": {:?}", dyn_filters)?;
1685 }
1686 }
1687 #[cfg(feature = "vector_index")]
1688 if let Some(vector_index_k) = self.input.vector_index_k {
1689 write!(f, ", \"vector_index_k\": {}", vector_index_k)?;
1690 }
1691 if !self.input.files.is_empty() {
1692 write!(f, ", \"files\": ")?;
1693 f.debug_list()
1694 .entries(self.input.files.iter().map(|file| FileWrapper { file }))
1695 .finish()?;
1696 }
1697 write!(f, ", \"flat_format\": {}", self.input.explain_flat_format)?;
1698 #[cfg(feature = "enterprise")]
1699 self.format_extension_ranges(f)?;
1700
1701 Ok(())
1702 }
1703 }
1704
1705 write!(f, "{:?}", InputWrapper { input: &self.input })
1706 }
1707
1708 pub(crate) fn add_dyn_filter_to_predicate(
1711 self: &Arc<Self>,
1712 filter_exprs: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
1713 ) -> Vec<bool> {
1714 let mut supported = Vec::with_capacity(filter_exprs.len());
1715 let filter_expr = filter_exprs
1716 .into_iter()
1717 .filter_map(|expr| {
1718 if let Ok(dyn_filter) = (expr as Arc<dyn std::any::Any + Send + Sync + 'static>)
1719 .downcast::<datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr>()
1720 {
1721 supported.push(true);
1722 Some(dyn_filter)
1723 } else {
1724 supported.push(false);
1725 None
1726 }
1727 })
1728 .collect();
1729 self.input.predicate.add_dyn_filters(filter_expr);
1730 supported
1731 }
1732}
1733
/// Predicates of a scan, kept both with and without the region partition expression.
#[derive(Clone, Default)]
pub struct PredicateGroup {
    /// Simple filters on the time index column, built from all predicate expressions
    /// (including the region partition expression); `None` if there are none.
    time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
    /// Predicate over the request filters plus the region partition expression, if any.
    predicate_all: Predicate,
    /// Predicate over the request filters only.
    predicate_without_region: Predicate,
    /// Parsed partition expression of the region, if the metadata has a non-empty one.
    region_partition_expr: Option<PartitionExpr>,
}
1746
1747impl PredicateGroup {
1748 pub fn new(metadata: &RegionMetadata, exprs: &[Expr]) -> Result<Self> {
1750 let mut combined_exprs = exprs.to_vec();
1751 let mut region_partition_expr = None;
1752
1753 if let Some(expr_json) = metadata.partition_expr.as_ref()
1754 && !expr_json.is_empty()
1755 && let Some(expr) = PartitionExpr::from_json_str(expr_json)
1756 .context(InvalidPartitionExprSnafu { expr: expr_json })?
1757 {
1758 let logical_expr = expr
1759 .try_as_logical_expr()
1760 .context(InvalidPartitionExprSnafu {
1761 expr: expr_json.clone(),
1762 })?;
1763
1764 combined_exprs.push(logical_expr);
1765 region_partition_expr = Some(expr);
1766 }
1767
1768 let mut time_filters = Vec::with_capacity(combined_exprs.len());
1769 let mut columns = HashSet::new();
1771 for expr in &combined_exprs {
1772 columns.clear();
1773 let Some(filter) = Self::expr_to_filter(expr, metadata, &mut columns) else {
1774 continue;
1775 };
1776 time_filters.push(filter);
1777 }
1778 let time_filters = if time_filters.is_empty() {
1779 None
1780 } else {
1781 Some(Arc::new(time_filters))
1782 };
1783
1784 let predicate_all = Predicate::new(combined_exprs);
1785 let predicate_without_region = Predicate::new(exprs.to_vec());
1786
1787 Ok(Self {
1788 time_filters,
1789 predicate_all,
1790 predicate_without_region,
1791 region_partition_expr,
1792 })
1793 }
1794
1795 pub(crate) fn time_filters(&self) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
1797 self.time_filters.clone()
1798 }
1799
1800 pub(crate) fn predicate(&self) -> Option<&Predicate> {
1802 if self.predicate_all.is_empty() {
1803 None
1804 } else {
1805 Some(&self.predicate_all)
1806 }
1807 }
1808
1809 pub(crate) fn predicate_without_region(&self) -> Option<&Predicate> {
1811 if self.predicate_without_region.is_empty() {
1812 None
1813 } else {
1814 Some(&self.predicate_without_region)
1815 }
1816 }
1817
1818 pub(crate) fn add_dyn_filters(&self, dyn_filters: Vec<Arc<DynamicFilterPhysicalExpr>>) {
1820 self.predicate_all.add_dyn_filters(dyn_filters.clone());
1821 self.predicate_without_region.add_dyn_filters(dyn_filters);
1822 }
1823
1824 pub(crate) fn region_partition_expr(&self) -> Option<&PartitionExpr> {
1826 self.region_partition_expr.as_ref()
1827 }
1828
1829 fn expr_to_filter(
1830 expr: &Expr,
1831 metadata: &RegionMetadata,
1832 columns: &mut HashSet<Column>,
1833 ) -> Option<SimpleFilterEvaluator> {
1834 columns.clear();
1835 expr_to_columns(expr, columns).ok()?;
1838 if columns.len() > 1 {
1839 return None;
1841 }
1842 let column = columns.iter().next()?;
1843 let column_meta = metadata.column_by_name(&column.name)?;
1844 if column_meta.semantic_type == SemanticType::Timestamp {
1845 SimpleFilterEvaluator::try_new(expr)
1846 } else {
1847 None
1848 }
1849 }
1850}
1851
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use datafusion::physical_plan::expressions::lit as physical_lit;
    use datafusion_common::ScalarValue;
    use datafusion_expr::{col, lit};
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use datatypes::types::json_type::JsonObjectType;
    use datatypes::value::Value;
    use partition::expr::col as partition_col;
    use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
    use store_api::storage::{RegionId, TimeSeriesDistribution, TimeSeriesRowSelector};

    use super::*;
    use crate::cache::CacheManager;
    use crate::error::InvalidMetadataSnafu;
    use crate::read::range_cache::ScanRequestFingerprintBuilder;
    use crate::read::read_columns::ReadColumn;
    use crate::test_util::memtable_util::metadata_with_primary_key;
    use crate::test_util::scheduler_util::SchedulerEnv;

    /// Builds a scan input over `metadata` with the given filters that satisfies the
    /// fingerprint eligibility checks. The input:
    /// - projects columns `[0, 2, 3]`;
    /// - uses `CacheStrategy::EnableAll` with a range result cache;
    /// - has a single default file.
    async fn new_scan_input(metadata: RegionMetadataRef, filters: Vec<Expr>) -> ScanInput {
        let env = SchedulerEnv::new().await;
        let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3]).unwrap();
        let predicate = PredicateGroup::new(metadata.as_ref(), &filters).unwrap();
        let file = FileHandle::new(
            crate::sst::file::FileMeta::default(),
            Arc::new(crate::sst::file_purger::NoopFilePurger),
        );

        ScanInput::new(env.access_layer.clone(), mapper)
            .with_predicate(predicate)
            .with_cache(CacheStrategy::EnableAll(Arc::new(
                CacheManager::builder()
                    .range_result_cache_size(1024)
                    .build(),
            )))
            .with_files(vec![file])
    }

    /// Millisecond timestamp literal without a timezone.
    fn ts_lit(val: i64) -> datafusion_expr::Expr {
        lit(ScalarValue::TimestampMillisecond(Some(val), None))
    }

    /// A hinted JSON column gets one nested path per leaf of the hint, rooted at the
    /// column name. Columns without a hint are left unchanged.
    #[test]
    fn test_fill_json_nested_paths_from_hint() -> Result<()> {
        fn json_projection_test_metadata() -> Result<RegionMetadataRef> {
            let mut builder = RegionMetadataBuilder::new(RegionId::new(1024, 0));
            builder
                .push_column_metadata(ColumnMetadata {
                    column_schema: ColumnSchema::new(
                        "tag".to_string(),
                        ConcreteDataType::string_datatype(),
                        true,
                    ),
                    semantic_type: SemanticType::Tag,
                    column_id: 0,
                })
                .push_column_metadata(ColumnMetadata {
                    column_schema: ColumnSchema::new(
                        "j".to_string(),
                        ConcreteDataType::json2(JsonNativeType::Object(JsonObjectType::new())),
                        true,
                    ),
                    semantic_type: SemanticType::Field,
                    column_id: 1,
                })
                .push_column_metadata(ColumnMetadata {
                    column_schema: ColumnSchema::new(
                        "ts".to_string(),
                        ConcreteDataType::timestamp_millisecond_datatype(),
                        false,
                    ),
                    semantic_type: SemanticType::Timestamp,
                    column_id: 2,
                });
            builder.primary_key(vec![0]);
            builder.build().context(InvalidMetadataSnafu).map(Arc::new)
        }

        let metadata = json_projection_test_metadata()?;
        let hint = HashMap::from([(
            "j".to_string(),
            JsonNativeType::Object(JsonObjectType::from([
                ("a".to_string(), JsonNativeType::i64()),
                (
                    "b".to_string(),
                    JsonNativeType::Object(JsonObjectType::from([(
                        "c".to_string(),
                        JsonNativeType::String,
                    )])),
                ),
            ])),
        )]);

        fn nested_path(parts: &[&str]) -> NestedPath {
            parts.iter().map(|part| part.to_string()).collect()
        }

        // Column 1 ("j") is hinted and gets the leaf paths; column 0 ("tag") is not.
        let mut read_columns = ReadColumns {
            cols: vec![ReadColumn::new(1, vec![]), ReadColumn::new(0, vec![])],
        };
        narrow_read_columns_by_json_type_hint(&mut read_columns, &hint, metadata.as_ref());
        assert_eq!(
            read_columns,
            ReadColumns {
                cols: vec![
                    ReadColumn::new(
                        1,
                        vec![nested_path(&["j", "a"]), nested_path(&["j", "b", "c"])]
                    ),
                    ReadColumn::new(0, vec![])
                ]
            }
        );

        // Without the hinted column in the projection nothing changes.
        let mut read_columns = ReadColumns {
            cols: vec![ReadColumn::new(0, vec![])],
        };
        narrow_read_columns_by_json_type_hint(&mut read_columns, &hint, metadata.as_ref());
        assert_eq!(
            read_columns,
            ReadColumns {
                cols: vec![ReadColumn::new(0, vec![])]
            }
        );
        Ok(())
    }

    /// The time-only filter goes to `time_filters`; the tag and field filters go to
    /// `filters`. Scan options are carried over into the fingerprint.
    #[tokio::test]
    async fn test_build_scan_fingerprint_for_eligible_scan() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let input = new_scan_input(
            metadata.clone(),
            vec![
                col("ts").gt_eq(ts_lit(1000)),
                col("k0").eq(lit("foo")),
                col("v0").gt(lit(1)),
            ],
        )
        .await
        .with_distribution(Some(TimeSeriesDistribution::PerSeries))
        .with_series_row_selector(Some(TimeSeriesRowSelector::LastRow))
        .with_merge_mode(MergeMode::LastNonNull)
        .with_filter_deleted(false);

        let fingerprint = build_scan_fingerprint(&input).unwrap();

        let expected = ScanRequestFingerprintBuilder {
            read_columns: input.read_cols,
            read_column_types: vec![
                metadata
                    .column_by_id(0)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(2)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(3)
                    .map(|col| col.column_schema.data_type.clone()),
            ],
            filters: vec![
                col("k0").eq(lit("foo")).to_string(),
                col("v0").gt(lit(1)).to_string(),
            ],
            time_filters: vec![col("ts").gt_eq(ts_lit(1000)).to_string()],
            series_row_selector: Some(TimeSeriesRowSelector::LastRow),
            append_mode: false,
            filter_deleted: false,
            merge_mode: MergeMode::LastNonNull,
            partition_expr_version: 0,
        }
        .build();
        assert_eq!(expected, fingerprint.fingerprint);
    }

    /// Without any filter on a tag column the scan gets no fingerprint.
    #[tokio::test]
    async fn test_build_scan_fingerprint_requires_tag_filter() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let input = new_scan_input(
            metadata,
            vec![col("ts").gt_eq(lit(1000)), col("v0").gt(lit(1))],
        )
        .await;

        assert!(build_scan_fingerprint(&input).is_none());
    }

    /// Cache disabled, compaction, or no files each make the scan ineligible.
    #[tokio::test]
    async fn test_build_scan_fingerprint_respects_scan_eligibility() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let filters = vec![col("k0").eq(lit("foo"))];

        // NOTE(review): this input also sets no files (presumably `ScanInput::new` starts with
        // none), so it does not isolate the cache-strategy check — consider adding a file.
        let disabled = ScanInput::new(
            SchedulerEnv::new().await.access_layer.clone(),
            FlatProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap(),
        )
        .with_predicate(PredicateGroup::new(metadata.as_ref(), &filters).unwrap());
        assert!(build_scan_fingerprint(&disabled).is_none());

        let compaction = new_scan_input(metadata.clone(), filters.clone())
            .await
            .with_compaction(true);
        assert!(build_scan_fingerprint(&compaction).is_none());

        let no_files = new_scan_input(metadata, filters).await.with_files(vec![]);
        assert!(build_scan_fingerprint(&no_files).is_none());
    }

    /// The region partition expression is not part of the fingerprint filters, but its
    /// metadata version is recorded.
    #[tokio::test]
    async fn test_build_scan_fingerprint_tracks_schema_and_partition_expr_changes() {
        let base = metadata_with_primary_key(vec![0, 1], false);
        let mut builder = RegionMetadataBuilder::from_existing(base);
        let partition_expr = partition_col("k0")
            .gt_eq(Value::String("foo".into()))
            .as_json_str()
            .unwrap();
        builder.partition_expr_json(Some(partition_expr));
        let metadata = Arc::new(builder.build_without_validation().unwrap());

        let input = new_scan_input(metadata.clone(), vec![col("k0").eq(lit("foo"))]).await;
        let fingerprint = build_scan_fingerprint(&input).unwrap();

        let expected = ScanRequestFingerprintBuilder {
            read_columns: input.read_cols,
            read_column_types: vec![
                metadata
                    .column_by_id(0)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(2)
                    .map(|col| col.column_schema.data_type.clone()),
                metadata
                    .column_by_id(3)
                    .map(|col| col.column_schema.data_type.clone()),
            ],
            filters: vec![col("k0").eq(lit("foo")).to_string()],
            time_filters: vec![],
            series_row_selector: None,
            append_mode: false,
            filter_deleted: true,
            merge_mode: MergeMode::LastRow,
            partition_expr_version: metadata.partition_expr_version,
        }
        .build();
        assert_eq!(expected, fingerprint.fingerprint);
        assert_ne!(0, metadata.partition_expr_version);
    }

    /// Adding a dynamic filter to an empty group makes both predicates non-empty.
    #[test]
    fn test_update_dyn_filters_with_empty_base_predicates() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let predicate_group = PredicateGroup::new(metadata.as_ref(), &[]).unwrap();
        assert!(predicate_group.predicate().is_none());
        assert!(predicate_group.predicate_without_region().is_none());

        let dyn_filter = Arc::new(DynamicFilterPhysicalExpr::new(vec![], physical_lit(false)));
        predicate_group.add_dyn_filters(vec![dyn_filter]);

        let predicate_all = predicate_group.predicate().unwrap();
        assert!(predicate_all.exprs().is_empty());
        assert_eq!(1, predicate_all.dyn_filters().len());

        let predicate_without_region = predicate_group.predicate_without_region().unwrap();
        assert!(predicate_without_region.exprs().is_empty());
        assert_eq!(1, predicate_without_region.dyn_filters().len());
    }

    /// Cases are (append_mode, merge_mode, source_count, expected mode).
    #[tokio::test]
    async fn test_range_pre_filter_mode() {
        let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
        let cases = [
            (true, MergeMode::LastRow, 1, PreFilterMode::All),
            (false, MergeMode::LastNonNull, 1, PreFilterMode::All),
            (false, MergeMode::LastRow, 2, PreFilterMode::SkipFields),
            (true, MergeMode::LastRow, 2, PreFilterMode::All),
        ];

        for (append_mode, merge_mode, source_count, expected_mode) in cases {
            let input = new_scan_input(metadata.clone(), vec![])
                .await
                .with_append_mode(append_mode)
                .with_merge_mode(merge_mode);

            assert_eq!(expected_mode, input.range_pre_filter_mode(source_count));
        }
    }
}