1use std::collections::{HashMap, HashSet};
18use std::fmt;
19use std::num::NonZeroU64;
20use std::sync::Arc;
21use std::time::Instant;
22
23use api::v1::SemanticType;
24use common_error::ext::BoxedError;
25use common_recordbatch::SendableRecordBatchStream;
26use common_recordbatch::filter::SimpleFilterEvaluator;
27use common_telemetry::tracing::Instrument;
28use common_telemetry::{debug, error, tracing, warn};
29use common_time::range::TimestampRange;
30use datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr;
31use datafusion_common::pruning::PruningStatistics;
32use datafusion_common::{Column, ScalarValue};
33use datafusion_expr::Expr;
34use datafusion_expr::utils::expr_to_columns;
35use datatypes::arrow::array::{ArrayRef, BooleanArray, UInt64Array};
36use datatypes::extension::json::is_structured_json_field;
37use datatypes::types::json_type::JsonNativeType;
38use datatypes::value::timestamp_to_scalar_value;
39use futures::StreamExt;
40use itertools::Itertools;
41use partition::expr::PartitionExpr;
42use smallvec::SmallVec;
43use snafu::ResultExt;
44use store_api::metadata::{RegionMetadata, RegionMetadataRef};
45use store_api::region_engine::{PartitionRange, RegionScannerRef};
46use store_api::storage::{
47 NestedPath, RegionId, ScanRequest, SequenceNumber, SequenceRange, TimeSeriesDistribution,
48 TimeSeriesRowSelector,
49};
50use table::predicate::{Predicate, build_time_range_predicate, extract_time_range_from_expr};
51use tokio::sync::{Semaphore, mpsc};
52use tokio_stream::wrappers::ReceiverStream;
53
54use crate::access_layer::AccessLayerRef;
55use crate::cache::CacheStrategy;
56use crate::config::DEFAULT_MAX_CONCURRENT_SCAN_FILES;
57use crate::error::{InvalidPartitionExprSnafu, Result};
58#[cfg(feature = "enterprise")]
59use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
60use crate::memtable::{MemtableRange, RangesOptions};
61use crate::metrics::READ_SST_COUNT;
62use crate::read::compat::{self, FlatCompatBatch};
63use crate::read::flat_projection::FlatProjectionMapper;
64use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
65use crate::read::range_cache::{ScanRequestFingerprint, implied_time_range_from_exprs};
66use crate::read::read_columns::{
67 ReadColumns, merge, merge_nested_paths, read_columns_from_predicate,
68 read_columns_from_projection,
69};
70use crate::read::seq_scan::SeqScan;
71use crate::read::series_scan::SeriesScan;
72use crate::read::stream::ScanBatchStream;
73use crate::read::unordered_scan::UnorderedScan;
74use crate::read::{BoxedRecordBatchStream, RecordBatch};
75use crate::region::options::MergeMode;
76use crate::region::version::VersionRef;
77use crate::sst::file::FileHandle;
78use crate::sst::index::bloom_filter::applier::{
79 BloomFilterIndexApplierBuilder, BloomFilterIndexApplierRef,
80};
81use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
82use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
83use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
84use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
85#[cfg(feature = "vector_index")]
86use crate::sst::index::vector_index::applier::{VectorIndexApplier, VectorIndexApplierRef};
87use crate::sst::parquet::file_range::PreFilterMode;
88use crate::sst::parquet::reader::ReaderMetrics;
89
90#[cfg(feature = "vector_index")]
91const VECTOR_INDEX_OVERFETCH_MULTIPLIER: usize = 2;
92
/// A scanner over a region, in one of three scan strategies.
///
/// `ScanRegion::scanner` picks the variant: `Series` when the request asks for
/// `TimeSeriesDistribution::PerSeries`, `Unordered` for append-mode regions without a
/// series row selector (and with no or time-windowed distribution), and `Seq` otherwise.
pub(crate) enum Scanner {
    /// Sequential scan, the fallback strategy.
    Seq(SeqScan),
    /// Unordered scan, used for eligible append-mode regions.
    Unordered(UnorderedScan),
    /// Per-series scan, used for `TimeSeriesDistribution::PerSeries` requests.
    Series(SeriesScan),
}
102
103impl Scanner {
104 #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
106 pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream, BoxedError> {
107 match self {
108 Scanner::Seq(seq_scan) => seq_scan.build_stream(),
109 Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
110 Scanner::Series(series_scan) => series_scan.build_stream().await,
111 }
112 }
113
114 pub(crate) fn scan_batch(&self) -> Result<ScanBatchStream> {
116 match self {
117 Scanner::Seq(x) => x.scan_all_partitions(),
118 Scanner::Unordered(x) => x.scan_all_partitions(),
119 Scanner::Series(x) => x.scan_all_partitions(),
120 }
121 }
122}
123
#[cfg(test)]
impl Scanner {
    /// Number of SST files the wrapped scanner's input will read.
    pub(crate) fn num_files(&self) -> usize {
        match self {
            Self::Seq(scan) => scan.input().num_files(),
            Self::Unordered(scan) => scan.input().num_files(),
            Self::Series(scan) => scan.input().num_files(),
        }
    }

    /// Number of memtables the wrapped scanner's input will read.
    pub(crate) fn num_memtables(&self) -> usize {
        match self {
            Self::Seq(scan) => scan.input().num_memtables(),
            Self::Unordered(scan) => scan.input().num_memtables(),
            Self::Series(scan) => scan.input().num_memtables(),
        }
    }

    /// Ids of the SST files in the wrapped scanner's input.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        match self {
            Self::Seq(scan) => scan.input().file_ids(),
            Self::Unordered(scan) => scan.input().file_ids(),
            Self::Series(scan) => scan.input().file_ids(),
        }
    }

    /// Ids of the index files in the wrapped scanner's input.
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        match self {
            Self::Seq(scan) => scan.input().index_ids(),
            Self::Unordered(scan) => scan.input().index_ids(),
            Self::Series(scan) => scan.input().index_ids(),
        }
    }

    /// Snapshot sequence recorded on the wrapped scanner's input, if any.
    pub(crate) fn snapshot_sequence(&self) -> Option<SequenceNumber> {
        match self {
            Self::Seq(scan) => scan.input().snapshot_sequence,
            Self::Unordered(scan) => scan.input().snapshot_sequence,
            Self::Series(scan) => scan.input().snapshot_sequence,
        }
    }

    /// Prepares the wrapped scanner with `target_partitions`.
    ///
    /// # Panics
    /// Panics if `prepare` returns an error (test-only helper).
    pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
        use store_api::region_engine::{PrepareRequest, RegionScanner};

        let request = PrepareRequest::default().with_target_partitions(target_partitions);
        let prepared = match self {
            Self::Seq(scan) => scan.prepare(request),
            Self::Unordered(scan) => scan.prepare(request),
            Self::Series(scan) => scan.prepare(request),
        };
        prepared.unwrap();
    }
}
181
/// Collects everything needed to scan a region and builds a [Scanner] (or
/// `RegionScannerRef`) from it.
///
/// The builder methods in `impl ScanRegion` configure the optional settings; `scan_input`
/// turns them into a [ScanInput].
#[cfg_attr(doc, aquamarine::aquamarine)]
pub(crate) struct ScanRegion {
    /// Region version to scan. Provides the metadata, SSTs, memtables and options.
    version: VersionRef,
    /// Access layer used to read SSTs and to locate index files.
    access_layer: AccessLayerRef,
    /// The scan request (filters, projection, sequences, distribution, ...).
    request: ScanRequest,
    /// Cache strategy. Also the source of the index and puffin metadata caches.
    cache_strategy: CacheStrategy,
    /// Forwarded to `ScanInput::with_max_concurrent_scan_files`.
    max_concurrent_scan_files: usize,
    /// When true, no inverted index applier is built.
    ignore_inverted_index: bool,
    /// When true, no fulltext index applier is built.
    ignore_fulltext_index: bool,
    /// When true, no bloom filter index applier is built.
    ignore_bloom_filter: bool,
    /// Query start time, forwarded to `ScanInput::with_start_time`.
    start_time: Option<Instant>,
    /// Forwarded to `ScanInput::with_filter_deleted`.
    filter_deleted: bool,
    /// Provider of extension ranges. Only consulted when the request does not skip SST files.
    #[cfg(feature = "enterprise")]
    extension_range_provider: Option<BoxedExtensionRangeProvider>,
}
257
258impl ScanRegion {
259 pub(crate) fn new(
261 version: VersionRef,
262 access_layer: AccessLayerRef,
263 request: ScanRequest,
264 cache_strategy: CacheStrategy,
265 ) -> ScanRegion {
266 ScanRegion {
267 version,
268 access_layer,
269 request,
270 cache_strategy,
271 max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
272 ignore_inverted_index: false,
273 ignore_fulltext_index: false,
274 ignore_bloom_filter: false,
275 start_time: None,
276 filter_deleted: true,
277 #[cfg(feature = "enterprise")]
278 extension_range_provider: None,
279 }
280 }
281
282 #[must_use]
284 pub(crate) fn with_max_concurrent_scan_files(
285 mut self,
286 max_concurrent_scan_files: usize,
287 ) -> Self {
288 self.max_concurrent_scan_files = max_concurrent_scan_files;
289 self
290 }
291
292 #[must_use]
294 pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self {
295 self.ignore_inverted_index = ignore;
296 self
297 }
298
299 #[must_use]
301 pub(crate) fn with_ignore_fulltext_index(mut self, ignore: bool) -> Self {
302 self.ignore_fulltext_index = ignore;
303 self
304 }
305
306 #[must_use]
308 pub(crate) fn with_ignore_bloom_filter(mut self, ignore: bool) -> Self {
309 self.ignore_bloom_filter = ignore;
310 self
311 }
312
313 #[must_use]
314 pub(crate) fn with_start_time(mut self, now: Instant) -> Self {
315 self.start_time = Some(now);
316 self
317 }
318
319 pub(crate) fn set_filter_deleted(&mut self, filter_deleted: bool) {
320 self.filter_deleted = filter_deleted;
321 }
322
323 #[cfg(feature = "enterprise")]
324 pub(crate) fn set_extension_range_provider(
325 &mut self,
326 extension_range_provider: BoxedExtensionRangeProvider,
327 ) {
328 self.extension_range_provider = Some(extension_range_provider);
329 }
330
331 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
333 pub(crate) async fn scanner(self) -> Result<Scanner> {
334 if self.use_series_scan() {
335 self.series_scan().await.map(Scanner::Series)
336 } else if self.use_unordered_scan() {
337 self.unordered_scan().await.map(Scanner::Unordered)
340 } else {
341 self.seq_scan().await.map(Scanner::Seq)
342 }
343 }
344
345 #[tracing::instrument(
347 level = tracing::Level::DEBUG,
348 skip_all,
349 fields(region_id = %self.region_id())
350 )]
351 pub(crate) async fn region_scanner(self) -> Result<RegionScannerRef> {
352 if self.use_series_scan() {
353 self.series_scan()
354 .await
355 .map(|scanner| Box::new(scanner) as _)
356 } else if self.use_unordered_scan() {
357 self.unordered_scan()
358 .await
359 .map(|scanner| Box::new(scanner) as _)
360 } else {
361 self.seq_scan().await.map(|scanner| Box::new(scanner) as _)
362 }
363 }
364
365 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
367 pub(crate) async fn seq_scan(self) -> Result<SeqScan> {
368 let input = self.scan_input().await?.with_compaction(false);
369 Ok(SeqScan::new(input))
370 }
371
372 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
374 pub(crate) async fn unordered_scan(self) -> Result<UnorderedScan> {
375 let input = self.scan_input().await?;
376 Ok(UnorderedScan::new(input))
377 }
378
379 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
381 pub(crate) async fn series_scan(self) -> Result<SeriesScan> {
382 let input = self.scan_input().await?;
383 Ok(SeriesScan::new(input))
384 }
385
386 fn use_unordered_scan(&self) -> bool {
388 self.version.options.append_mode
395 && self.request.series_row_selector.is_none()
396 && (self.request.distribution.is_none()
397 || self.request.distribution == Some(TimeSeriesDistribution::TimeWindowed))
398 }
399
400 fn use_series_scan(&self) -> bool {
402 self.request.distribution == Some(TimeSeriesDistribution::PerSeries)
403 }
404
405 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
407 async fn scan_input(self) -> Result<ScanInput> {
408 let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new);
409 let time_range = self.build_time_range_predicate();
410 let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;
411
412 let mut read_cols = match &self.request.projection_input {
413 Some(p) => {
414 let metadata = &self.version.metadata;
417 let from_projection = read_columns_from_projection(p.clone(), metadata)?;
418 let from_predicate = read_columns_from_predicate(&predicate, metadata);
419 merge(from_projection, from_predicate)
420 }
421 None => {
422 let read_col_ids = self
423 .version
424 .metadata
425 .column_metadatas
426 .iter()
427 .map(|col| col.column_id);
428 ReadColumns::from_deduped_column_ids(read_col_ids)
429 }
430 };
431 let has_structured_json = self
435 .version
436 .metadata
437 .schema
438 .arrow_schema()
439 .fields()
440 .iter()
441 .any(is_structured_json_field);
442 if has_structured_json {
443 narrow_read_columns_by_json_type_hint(
444 &mut read_cols,
445 &self.request.json_type_hint,
446 &self.version.metadata,
447 );
448 }
449 let read_col_ids = read_cols.column_ids();
450
451 let projection = self
453 .request
454 .projection_indices()
455 .map(|x| x.to_vec())
456 .unwrap_or_else(|| (0..self.version.metadata.column_metadatas.len()).collect());
457 let json_type_hint = has_structured_json
458 .then_some(&self.request.json_type_hint)
459 .inspect(|json_type_hint| {
460 debug!(
461 "Concretized JSON type: {{{}}}",
462 json_type_hint
463 .iter()
464 .map(|(k, v)| format!("{}: {}", k, v))
465 .join(", ")
466 );
467 });
468 let mapper = FlatProjectionMapper::new_with_read_columns(
469 &self.version.metadata,
470 projection,
471 read_cols,
472 json_type_hint,
473 )?;
474
475 let ssts = &self.version.ssts;
476 let mut files = Vec::new();
477 if !self.request.skip_sst_files {
478 for level in ssts.levels() {
479 for file in level.files.values() {
480 let exceed_min_sequence = match (sst_min_sequence, file.meta_ref().sequence) {
481 (Some(min_sequence), Some(file_sequence)) => file_sequence > min_sequence,
482 (Some(_), None) => true,
488 (None, _) => true,
489 };
490
491 if exceed_min_sequence && file_in_range(file, &time_range) {
493 files.push(file.clone());
494 }
495 }
499 }
500 }
501
502 let memtables = self.version.memtables.list_memtables();
503 let mut mem_range_builders = Vec::new();
505 let filter_mode = pre_filter_mode(
506 self.version.options.append_mode,
507 self.version.options.merge_mode(),
508 );
509
510 for m in memtables {
511 let Some((start, end)) = m.stats().time_range() else {
513 continue;
514 };
515 let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
517 if !memtable_range.intersects(&time_range) {
518 continue;
519 }
520 let ranges_in_memtable = m.ranges(
521 Some(&read_col_ids),
522 RangesOptions::default()
523 .with_predicate(predicate.clone())
524 .with_sequence(SequenceRange::new(
525 self.request.memtable_min_sequence,
526 self.request.memtable_max_sequence,
527 ))
528 .with_pre_filter_mode(filter_mode),
529 )?;
530 mem_range_builders.extend(ranges_in_memtable.ranges.into_values().map(|v| {
531 let stats = v.stats().clone();
532 MemRangeBuilder::new(v, stats)
533 }));
534 }
535
536 let region_id = self.region_id();
537 debug!(
538 "Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}",
539 region_id,
540 self.request,
541 time_range,
542 mem_range_builders.len(),
543 files.len(),
544 self.version.options.append_mode,
545 );
546
547 let (non_field_filters, field_filters) = self.partition_by_field_filters();
548 let inverted_index_appliers = [
549 self.build_invereted_index_applier(&non_field_filters),
550 self.build_invereted_index_applier(&field_filters),
551 ];
552 let bloom_filter_appliers = [
553 self.build_bloom_filter_applier(&non_field_filters),
554 self.build_bloom_filter_applier(&field_filters),
555 ];
556 let fulltext_index_appliers = [
557 self.build_fulltext_index_applier(&non_field_filters),
558 self.build_fulltext_index_applier(&field_filters),
559 ];
560 #[cfg(feature = "vector_index")]
561 let vector_index_applier = self.build_vector_index_applier();
562 #[cfg(feature = "vector_index")]
563 let vector_index_k = self.request.vector_search.as_ref().map(|search| {
564 if self.request.filters.is_empty() {
565 search.k
566 } else {
567 search.k.saturating_mul(VECTOR_INDEX_OVERFETCH_MULTIPLIER)
568 }
569 });
570
571 let input = ScanInput::new(self.access_layer, mapper)
572 .with_time_range(Some(time_range))
573 .with_predicate(predicate)
574 .with_memtables(mem_range_builders)
575 .with_files(files)
576 .with_cache(self.cache_strategy)
577 .with_inverted_index_appliers(inverted_index_appliers)
578 .with_bloom_filter_index_appliers(bloom_filter_appliers)
579 .with_fulltext_index_appliers(fulltext_index_appliers)
580 .with_max_concurrent_scan_files(self.max_concurrent_scan_files)
581 .with_start_time(self.start_time)
582 .with_append_mode(self.version.options.append_mode)
583 .with_filter_deleted(self.filter_deleted)
584 .with_merge_mode(self.version.options.merge_mode())
585 .with_series_row_selector(self.request.series_row_selector)
586 .with_distribution(self.request.distribution)
587 .with_explain_flat_format(
588 self.version.options.sst_format == Some(crate::sst::FormatType::Flat),
589 )
590 .with_snapshot_sequence(
591 self.request
592 .snapshot_on_scan
593 .then_some(self.request.memtable_max_sequence)
594 .flatten(),
595 );
596 #[cfg(feature = "vector_index")]
597 let input = input
598 .with_vector_index_applier(vector_index_applier)
599 .with_vector_index_k(vector_index_k);
600
601 #[cfg(feature = "enterprise")]
602 let input = if !self.request.skip_sst_files
603 && let Some(provider) = self.extension_range_provider
604 {
605 let ranges = provider
606 .find_extension_ranges(self.version.flushed_sequence, time_range, &self.request)
607 .await?;
608 debug!("Find extension ranges: {ranges:?}");
609 input.with_extension_ranges(ranges)
610 } else {
611 input
612 };
613 Ok(input)
614 }
615
616 fn region_id(&self) -> RegionId {
617 self.version.metadata.region_id
618 }
619
620 fn build_time_range_predicate(&self) -> TimestampRange {
622 let time_index = self.version.metadata.time_index_column();
623 let unit = time_index
624 .column_schema
625 .data_type
626 .as_timestamp()
627 .expect("Time index must have timestamp-compatible type")
628 .unit();
629 build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters)
630 }
631
632 fn partition_by_field_filters(&self) -> (Vec<Expr>, Vec<Expr>) {
635 let field_columns = self
636 .version
637 .metadata
638 .field_columns()
639 .map(|col| &col.column_schema.name)
640 .collect::<HashSet<_>>();
641
642 let mut columns = HashSet::new();
643
644 self.request.filters.iter().cloned().partition(|expr| {
645 columns.clear();
646 if expr_to_columns(expr, &mut columns).is_err() {
648 return true;
650 }
651 !columns
653 .iter()
654 .any(|column| field_columns.contains(&column.name))
655 })
656 }
657
658 fn build_invereted_index_applier(&self, filters: &[Expr]) -> Option<InvertedIndexApplierRef> {
660 if self.ignore_inverted_index {
661 return None;
662 }
663
664 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
665 let inverted_index_cache = self.cache_strategy.inverted_index_cache().cloned();
666
667 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
668
669 InvertedIndexApplierBuilder::new(
670 self.access_layer.table_dir().to_string(),
671 self.access_layer.path_type(),
672 self.access_layer.object_store().clone(),
673 self.version.metadata.as_ref(),
674 self.version.metadata.inverted_indexed_column_ids(
675 self.version
676 .options
677 .index_options
678 .inverted_index
679 .ignore_column_ids
680 .iter(),
681 ),
682 self.access_layer.puffin_manager_factory().clone(),
683 )
684 .with_file_cache(file_cache)
685 .with_inverted_index_cache(inverted_index_cache)
686 .with_puffin_metadata_cache(puffin_metadata_cache)
687 .build(filters)
688 .inspect_err(|err| warn!(err; "Failed to build invereted index applier"))
689 .ok()
690 .flatten()
691 .map(Arc::new)
692 }
693
694 fn build_bloom_filter_applier(&self, filters: &[Expr]) -> Option<BloomFilterIndexApplierRef> {
696 if self.ignore_bloom_filter {
697 return None;
698 }
699
700 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
701 let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
702 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
703
704 BloomFilterIndexApplierBuilder::new(
705 self.access_layer.table_dir().to_string(),
706 self.access_layer.path_type(),
707 self.access_layer.object_store().clone(),
708 self.version.metadata.as_ref(),
709 self.access_layer.puffin_manager_factory().clone(),
710 )
711 .with_file_cache(file_cache)
712 .with_bloom_filter_index_cache(bloom_filter_index_cache)
713 .with_puffin_metadata_cache(puffin_metadata_cache)
714 .build(filters)
715 .inspect_err(|err| warn!(err; "Failed to build bloom filter index applier"))
716 .ok()
717 .flatten()
718 .map(Arc::new)
719 }
720
721 fn build_fulltext_index_applier(&self, filters: &[Expr]) -> Option<FulltextIndexApplierRef> {
723 if self.ignore_fulltext_index {
724 return None;
725 }
726
727 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
728 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
729 let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
730 FulltextIndexApplierBuilder::new(
731 self.access_layer.table_dir().to_string(),
732 self.access_layer.path_type(),
733 self.access_layer.object_store().clone(),
734 self.access_layer.puffin_manager_factory().clone(),
735 self.version.metadata.as_ref(),
736 )
737 .with_file_cache(file_cache)
738 .with_puffin_metadata_cache(puffin_metadata_cache)
739 .with_bloom_filter_cache(bloom_filter_index_cache)
740 .build(filters)
741 .inspect_err(|err| warn!(err; "Failed to build fulltext index applier"))
742 .ok()
743 .flatten()
744 .map(Arc::new)
745 }
746
747 #[cfg(feature = "vector_index")]
749 fn build_vector_index_applier(&self) -> Option<VectorIndexApplierRef> {
750 let vector_search = self.request.vector_search.as_ref()?;
751
752 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
753 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
754 let vector_index_cache = self.cache_strategy.vector_index_cache().cloned();
755
756 let applier = VectorIndexApplier::new(
757 self.access_layer.table_dir().to_string(),
758 self.access_layer.path_type(),
759 self.access_layer.object_store().clone(),
760 self.access_layer.puffin_manager_factory().clone(),
761 vector_search.column_id,
762 vector_search.query_vector.clone(),
763 vector_search.metric,
764 )
765 .with_file_cache(file_cache)
766 .with_puffin_metadata_cache(puffin_metadata_cache)
767 .with_vector_index_cache(vector_index_cache);
768
769 Some(Arc::new(applier))
770 }
771}
772
773fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
775 if predicate == &TimestampRange::min_to_max() {
776 return true;
777 }
778 let (start, end) = file.time_range();
780 let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end));
781 file_ts_range.intersects(predicate)
782}
783
/// Everything needed to read one region scan: data sources (memtable ranges and SST files),
/// the predicate, index appliers, and scan options.
pub struct ScanInput {
    /// Access layer used to read SST files.
    access_layer: AccessLayerRef,
    /// Projection mapper for the scan output.
    pub(crate) mapper: Arc<FlatProjectionMapper>,
    /// Columns to read from SSTs. Cloned from the mapper in `ScanInput::new`.
    pub(crate) read_cols: ReadColumns,
    /// Time range of the scan, if any.
    pub(crate) time_range: Option<TimestampRange>,
    /// Predicate group of the scan.
    pub(crate) predicate: PredicateGroup,
    /// Region partition expression, copied from the predicate in `with_predicate`.
    /// Files with an equal partition expression skip the region-partition part of the predicate.
    region_partition_expr: Option<PartitionExpr>,
    /// Builders for the memtable ranges to scan.
    pub(crate) memtables: Vec<MemRangeBuilder>,
    /// SST files to scan.
    pub(crate) files: Vec<FileHandle>,
    /// Cache strategy, forwarded to SST readers.
    pub(crate) cache_strategy: CacheStrategy,
    /// When true, `prune_file` logs and skips missing files instead of returning an error.
    ignore_file_not_found: bool,
    /// Maximum number of files scanned concurrently (semantics defined by the scanners; TODO confirm).
    pub(crate) max_concurrent_scan_files: usize,
    /// Inverted index appliers. `ScanRegion` builds index 0 from non-field filters and
    /// index 1 from field filters.
    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
    /// Bloom filter index appliers, laid out like `inverted_index_appliers`.
    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    /// Fulltext index appliers, laid out like `inverted_index_appliers`.
    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
    /// Vector index applier, forwarded to SST readers.
    #[cfg(feature = "vector_index")]
    pub(crate) vector_index_applier: Option<VectorIndexApplierRef>,
    /// Number of candidates requested from the vector index.
    #[cfg(feature = "vector_index")]
    pub(crate) vector_index_k: Option<usize>,
    /// When the query started, if recorded.
    pub(crate) query_start: Option<Instant>,
    /// Whether the region is in append mode.
    pub(crate) append_mode: bool,
    /// Whether deleted rows should be filtered out.
    pub(crate) filter_deleted: bool,
    /// Merge mode of the region.
    pub(crate) merge_mode: MergeMode,
    /// Optional per-series row selector from the request.
    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
    /// Requested time-series distribution.
    pub(crate) distribution: Option<TimeSeriesDistribution>,
    /// Whether the region's SST format is flat (set by `ScanRegion`, presumably for explain output).
    explain_flat_format: bool,
    /// Snapshot sequence. Only set when the request asks for snapshot-on-scan.
    pub(crate) snapshot_sequence: Option<SequenceNumber>,
    /// Whether this input is used for compaction.
    /// Compaction disables primary-key value decoding and the deferred page index in `prune_file`.
    pub(crate) compaction: bool,
    /// Extension ranges to scan (enterprise feature only).
    #[cfg(feature = "enterprise")]
    extension_ranges: Vec<BoxedExtensionRange>,
}
841
842impl ScanInput {
843 #[must_use]
845 pub(crate) fn new(access_layer: AccessLayerRef, mapper: FlatProjectionMapper) -> ScanInput {
846 ScanInput {
847 access_layer,
848 read_cols: mapper.read_columns().clone(),
849 mapper: Arc::new(mapper),
850 time_range: None,
851 predicate: PredicateGroup::default(),
852 region_partition_expr: None,
853 memtables: Vec::new(),
854 files: Vec::new(),
855 cache_strategy: CacheStrategy::Disabled,
856 ignore_file_not_found: false,
857 max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
858 inverted_index_appliers: [None, None],
859 bloom_filter_index_appliers: [None, None],
860 fulltext_index_appliers: [None, None],
861 #[cfg(feature = "vector_index")]
862 vector_index_applier: None,
863 #[cfg(feature = "vector_index")]
864 vector_index_k: None,
865 query_start: None,
866 append_mode: false,
867 filter_deleted: true,
868 merge_mode: MergeMode::default(),
869 series_row_selector: None,
870 distribution: None,
871 explain_flat_format: false,
872 snapshot_sequence: None,
873 compaction: false,
874 #[cfg(feature = "enterprise")]
875 extension_ranges: Vec::new(),
876 }
877 }
878
879 #[must_use]
881 pub(crate) fn with_time_range(mut self, time_range: Option<TimestampRange>) -> Self {
882 self.time_range = time_range;
883 self
884 }
885
886 #[must_use]
888 pub(crate) fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
889 self.region_partition_expr = predicate.region_partition_expr().cloned();
890 self.predicate = predicate;
891 self
892 }
893
894 #[must_use]
896 pub(crate) fn with_memtables(mut self, memtables: Vec<MemRangeBuilder>) -> Self {
897 self.memtables = memtables;
898 self
899 }
900
901 #[must_use]
903 pub(crate) fn with_files(mut self, files: Vec<FileHandle>) -> Self {
904 self.files = files;
905 self
906 }
907
908 #[must_use]
910 pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self {
911 self.cache_strategy = cache;
912 self
913 }
914
915 #[must_use]
917 pub(crate) fn with_ignore_file_not_found(mut self, ignore: bool) -> Self {
918 self.ignore_file_not_found = ignore;
919 self
920 }
921
922 #[must_use]
924 pub(crate) fn with_max_concurrent_scan_files(
925 mut self,
926 max_concurrent_scan_files: usize,
927 ) -> Self {
928 self.max_concurrent_scan_files = max_concurrent_scan_files;
929 self
930 }
931
932 #[must_use]
934 pub(crate) fn with_inverted_index_appliers(
935 mut self,
936 appliers: [Option<InvertedIndexApplierRef>; 2],
937 ) -> Self {
938 self.inverted_index_appliers = appliers;
939 self
940 }
941
942 #[must_use]
944 pub(crate) fn with_bloom_filter_index_appliers(
945 mut self,
946 appliers: [Option<BloomFilterIndexApplierRef>; 2],
947 ) -> Self {
948 self.bloom_filter_index_appliers = appliers;
949 self
950 }
951
952 #[must_use]
954 pub(crate) fn with_fulltext_index_appliers(
955 mut self,
956 appliers: [Option<FulltextIndexApplierRef>; 2],
957 ) -> Self {
958 self.fulltext_index_appliers = appliers;
959 self
960 }
961
962 #[cfg(feature = "vector_index")]
964 #[must_use]
965 pub(crate) fn with_vector_index_applier(
966 mut self,
967 applier: Option<VectorIndexApplierRef>,
968 ) -> Self {
969 self.vector_index_applier = applier;
970 self
971 }
972
973 #[cfg(feature = "vector_index")]
975 #[must_use]
976 pub(crate) fn with_vector_index_k(mut self, k: Option<usize>) -> Self {
977 self.vector_index_k = k;
978 self
979 }
980
981 #[must_use]
983 pub(crate) fn with_start_time(mut self, now: Option<Instant>) -> Self {
984 self.query_start = now;
985 self
986 }
987
988 #[must_use]
989 pub(crate) fn with_append_mode(mut self, is_append_mode: bool) -> Self {
990 self.append_mode = is_append_mode;
991 self
992 }
993
994 #[must_use]
996 pub(crate) fn with_filter_deleted(mut self, filter_deleted: bool) -> Self {
997 self.filter_deleted = filter_deleted;
998 self
999 }
1000
1001 #[must_use]
1003 pub(crate) fn with_merge_mode(mut self, merge_mode: MergeMode) -> Self {
1004 self.merge_mode = merge_mode;
1005 self
1006 }
1007
1008 #[must_use]
1010 pub(crate) fn with_distribution(
1011 mut self,
1012 distribution: Option<TimeSeriesDistribution>,
1013 ) -> Self {
1014 self.distribution = distribution;
1015 self
1016 }
1017
1018 #[must_use]
1020 pub(crate) fn with_explain_flat_format(mut self, explain_flat_format: bool) -> Self {
1021 self.explain_flat_format = explain_flat_format;
1022 self
1023 }
1024
1025 #[must_use]
1027 pub(crate) fn with_series_row_selector(
1028 mut self,
1029 series_row_selector: Option<TimeSeriesRowSelector>,
1030 ) -> Self {
1031 self.series_row_selector = series_row_selector;
1032 self
1033 }
1034
1035 #[must_use]
1036 pub(crate) fn with_snapshot_sequence(
1037 mut self,
1038 snapshot_sequence: Option<SequenceNumber>,
1039 ) -> Self {
1040 self.snapshot_sequence = snapshot_sequence;
1041 self
1042 }
1043
1044 #[must_use]
1046 pub(crate) fn with_compaction(mut self, compaction: bool) -> Self {
1047 self.compaction = compaction;
1048 self
1049 }
1050
1051 pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> {
1053 let memtable = &self.memtables[index.index];
1054 let mut ranges = SmallVec::new();
1055 memtable.build_ranges(index.row_group_index, &mut ranges);
1056 ranges
1057 }
1058
1059 fn predicate_for_file(&self, file: &FileHandle) -> Option<Predicate> {
1060 if self.should_skip_region_partition(file) {
1061 self.predicate.predicate_without_region().cloned()
1062 } else {
1063 self.predicate.predicate().cloned()
1064 }
1065 }
1066
1067 fn should_skip_region_partition(&self, file: &FileHandle) -> bool {
1068 match (
1069 self.region_partition_expr.as_ref(),
1070 file.meta_ref().partition_expr.as_ref(),
1071 ) {
1072 (Some(region_expr), Some(file_expr)) => region_expr == file_expr,
1073 _ => false,
1074 }
1075 }
1076
1077 fn try_file_level_pruning_stats(&self, file: &FileHandle) -> Option<FileLevelPruningStats> {
1082 let (ts_min, ts_max) = file.time_range();
1083 let time_index = self.mapper.metadata().time_index_column();
1084 let time_index_unit = time_index.column_schema.data_type.as_timestamp()?.unit();
1085
1086 let min_ts = ts_min.convert_to(time_index_unit)?;
1089 let max_ts = ts_max.convert_to_ceil(time_index_unit)?;
1090
1091 Some(FileLevelPruningStats {
1092 min_scalar: timestamp_to_scalar_value(time_index_unit, Some(min_ts.value())),
1093 max_scalar: timestamp_to_scalar_value(time_index_unit, Some(max_ts.value())),
1094 time_index_col_name: time_index.column_schema.name.clone(),
1095 })
1096 }
1097
1098 #[tracing::instrument(
1100 skip_all,
1101 fields(
1102 region_id = %self.region_metadata().region_id,
1103 file_id = %file.file_id()
1104 )
1105 )]
    /// Prunes `file` with the scan predicate and prepares a [`FileRangeBuilder`] for it.
    ///
    /// Returns an empty (default) builder, meaning "nothing to scan in this file", when:
    /// - file-level time-range statistics prove the predicate cannot match any row;
    /// - the reader builder returns no input for the file;
    /// - the file's object is missing and `ignore_file_not_found` is set (the error is logged).
    ///
    /// Any other error from building the reader or the compat adapter is returned.
    /// `reader_metrics` receives the file-pruning counter and is passed on to the reader
    /// builder.
    pub async fn prune_file(
        &self,
        file: &FileHandle,
        pre_filter_mode: PreFilterMode,
        reader_metrics: &mut ReaderMetrics,
    ) -> Result<FileRangeBuilder> {
        let predicate = self.predicate_for_file(file);

        // Cheap first pass: use only the file's time range to skip it before opening the SST.
        if let Some(ref pred) = predicate
            && !pred.is_empty()
            && let Some(file_level_stats) = self.try_file_level_pruning_stats(file)
        {
            let pruning_results = pred.prune_with_stats(
                &file_level_stats,
                self.mapper.metadata().schema.arrow_schema(),
            );
            // The stats describe a single container (the whole file), so only the first
            // result matters; `false` means no row of the file can satisfy the predicate.
            if pruning_results.first() == Some(&false) {
                reader_metrics.filter_metrics.files_time_range_pruned += 1;
                return Ok(FileRangeBuilder::default());
            }
        }

        let may_build_selective_row_selection = predicate.is_some();
        // Decode primary key values only for non-compaction reads that project at least one
        // primary key column.
        let decode_pk_values = !self.compaction
            && self
                .mapper
                .read_columns()
                .column_ids_iter()
                .any(|column_id| self.mapper.metadata().primary_key.contains(&column_id));
        let reader = self
            .access_layer
            .read_sst(file.clone())
            .predicate(predicate)
            .projection(Some(self.read_cols.clone()))
            .cache(self.cache_strategy.clone())
            .inverted_index_appliers(self.inverted_index_appliers.clone())
            .bloom_filter_index_appliers(self.bloom_filter_index_appliers.clone())
            .fulltext_index_appliers(self.fulltext_index_appliers.clone());
        // Only non-compaction reads with a predicate may build a selective row selection, so
        // only they request the (optional) page index. Presumably the builder loads it
        // lazily — confirm in the SST reader builder.
        let reader = if !self.compaction && may_build_selective_row_selection {
            reader.deferred_optional_page_index()
        } else {
            reader
        };
        // Attach the vector index applier and its `k` when the feature is compiled in.
        #[cfg(feature = "vector_index")]
        let reader = {
            let mut reader = reader;
            reader =
                reader.vector_index_applier(self.vector_index_applier.clone(), self.vector_index_k);
            reader
        };
        let res = reader
            .expected_metadata(Some(self.mapper.metadata().clone()))
            .compaction(self.compaction)
            .pre_filter_mode(pre_filter_mode)
            .decode_primary_key_values(decode_pk_values)
            .build_reader_input(reader_metrics)
            .await;
        let read_input = match res {
            Ok(x) => x,
            Err(e) => {
                // A missing object is tolerated only when `ignore_file_not_found` is set:
                // log it and scan nothing from this file instead of failing the query.
                if e.is_object_not_found() && self.ignore_file_not_found {
                    error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
                    return Ok(FileRangeBuilder::default());
                } else {
                    return Err(e);
                }
            }
        };

        // `None` means the reader builder produced nothing to read for this file.
        let Some((mut file_range_ctx, selection)) = read_input else {
            return Ok(FileRangeBuilder::default());
        };

        // If the file's columns or primary-key encoding differ from what the projection
        // mapper expects, attach an adapter that converts its batches.
        let need_compat = !compat::has_same_columns_and_pk_encoding(
            &self.mapper,
            file_range_ctx.read_format(),
            self.compaction,
        );
        if need_compat {
            let compat = FlatCompatBatch::try_new(
                &self.mapper,
                file_range_ctx.read_format(),
                self.compaction,
            )?;
            file_range_ctx.set_compat_batch(compat);
        }
        Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), selection))
    }
1199
1200 #[tracing::instrument(
1204 skip(self, sources, semaphore),
1205 fields(
1206 region_id = %self.region_metadata().region_id,
1207 source_count = sources.len()
1208 )
1209 )]
1210 pub(crate) fn create_parallel_flat_sources(
1211 &self,
1212 sources: Vec<BoxedRecordBatchStream>,
1213 semaphore: Arc<Semaphore>,
1214 channel_size: usize,
1215 ) -> Result<Vec<BoxedRecordBatchStream>> {
1216 if sources.len() <= 1 {
1217 return Ok(sources);
1218 }
1219
1220 let sources = sources
1222 .into_iter()
1223 .map(|source| {
1224 let (sender, receiver) = mpsc::channel(channel_size);
1225 self.spawn_flat_scan_task(source, semaphore.clone(), sender);
1226 let stream = Box::pin(ReceiverStream::new(receiver));
1227 Box::pin(stream) as _
1228 })
1229 .collect();
1230 Ok(sources)
1231 }
1232
1233 #[tracing::instrument(
1235 skip(self, input, semaphore, sender),
1236 fields(region_id = %self.region_metadata().region_id)
1237 )]
1238 pub(crate) fn spawn_flat_scan_task(
1239 &self,
1240 mut input: BoxedRecordBatchStream,
1241 semaphore: Arc<Semaphore>,
1242 sender: mpsc::Sender<Result<RecordBatch>>,
1243 ) {
1244 let region_id = self.region_metadata().region_id;
1245 let span = tracing::info_span!(
1246 "ScanInput::parallel_scan_task",
1247 region_id = %region_id,
1248 stream_kind = "flat"
1249 );
1250 common_runtime::spawn_query(
1251 async move {
1252 loop {
1253 let maybe_batch = {
1256 let _permit = semaphore.acquire().await.unwrap();
1258 input.next().await
1259 };
1260 match maybe_batch {
1261 Some(Ok(batch)) => {
1262 let _ = sender.send(Ok(batch)).await;
1263 }
1264 Some(Err(e)) => {
1265 let _ = sender.send(Err(e)).await;
1266 break;
1267 }
1268 None => break,
1269 }
1270 }
1271 }
1272 .instrument(span),
1273 );
1274 }
1275
1276 pub(crate) fn total_rows(&self) -> usize {
1277 let rows_in_files: usize = self.files.iter().map(|f| f.num_rows()).sum();
1278 let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();
1279
1280 let rows = rows_in_files + rows_in_memtables;
1281 #[cfg(feature = "enterprise")]
1282 let rows = rows
1283 + self
1284 .extension_ranges
1285 .iter()
1286 .map(|x| x.num_rows())
1287 .sum::<u64>() as usize;
1288 rows
1289 }
1290
1291 pub(crate) fn predicate_group(&self) -> &PredicateGroup {
1292 &self.predicate
1293 }
1294
1295 pub(crate) fn num_memtables(&self) -> usize {
1297 self.memtables.len()
1298 }
1299
1300 pub(crate) fn num_files(&self) -> usize {
1302 self.files.len()
1303 }
1304
1305 pub(crate) fn file_from_index(&self, index: RowGroupIndex) -> &FileHandle {
1307 let file_index = index.index - self.num_memtables();
1308 &self.files[file_index]
1309 }
1310
1311 pub fn region_metadata(&self) -> &RegionMetadataRef {
1312 self.mapper.metadata()
1313 }
1314
1315 fn range_pre_filter_mode(&self, source_count: usize) -> PreFilterMode {
1316 if source_count <= 1 {
1317 return PreFilterMode::All;
1322 }
1323
1324 pre_filter_mode(self.append_mode, self.merge_mode)
1325 }
1326}
1327
// Extension-range support. The whole impl is gated by the feature, so the methods below
// need no cfg attributes of their own.
#[cfg(feature = "enterprise")]
impl ScanInput {
    /// Sets the extension ranges scanned by this input, replacing any previous ones.
    #[must_use]
    pub(crate) fn with_extension_ranges(self, extension_ranges: Vec<BoxedExtensionRange>) -> Self {
        Self {
            extension_ranges,
            ..self
        }
    }

    /// Returns the extension ranges scanned by this input.
    pub(crate) fn extension_ranges(&self) -> &[BoxedExtensionRange] {
        &self.extension_ranges
    }

    /// Returns the extension range at source index `i`.
    ///
    /// Extension ranges come after all memtables and files in the source index space.
    ///
    /// # Panics
    /// Panics if `i` does not refer to an extension range.
    pub(crate) fn extension_range(&self, i: usize) -> &BoxedExtensionRange {
        &self.extension_ranges[i - self.num_memtables() - self.num_files()]
    }
}
1349
/// Pruning statistics that describe a whole SST file as a single container.
///
/// Only the time index column has statistics (its min/max bounds and a zero null count);
/// every other column reports unknown statistics. The bounds are expected to be
/// conservative. The unit-conversion test in this module shows min rounded down and max
/// rounded up when converting to the time index unit.
pub(crate) struct FileLevelPruningStats {
    /// Lower bound of the file's time range, as a scalar of the time index type.
    pub(crate) min_scalar: ScalarValue,
    /// Upper bound of the file's time range, as a scalar of the time index type.
    pub(crate) max_scalar: ScalarValue,
    /// Name of the time index column the bounds apply to.
    pub(crate) time_index_col_name: String,
}
1361
1362impl PruningStatistics for FileLevelPruningStats {
1363 fn min_values(&self, column: &Column) -> Option<ArrayRef> {
1364 if column.name == self.time_index_col_name {
1365 ScalarValue::iter_to_array(std::iter::once(self.min_scalar.clone())).ok()
1366 } else {
1367 None
1368 }
1369 }
1370
1371 fn max_values(&self, column: &Column) -> Option<ArrayRef> {
1372 if column.name == self.time_index_col_name {
1373 ScalarValue::iter_to_array(std::iter::once(self.max_scalar.clone())).ok()
1374 } else {
1375 None
1376 }
1377 }
1378
1379 fn num_containers(&self) -> usize {
1380 1
1381 }
1382
1383 fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
1384 if column.name == self.time_index_col_name {
1385 Some(Arc::new(UInt64Array::from(vec![0u64])))
1387 } else {
1388 None
1389 }
1390 }
1391
1392 fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
1393 None
1394 }
1395
1396 fn contained(&self, _column: &Column, _values: &HashSet<ScalarValue>) -> Option<BooleanArray> {
1397 None
1398 }
1399}
1400
/// Test-only accessors for inspecting which files a scan input covers.
#[cfg(test)]
impl ScanInput {
    /// Returns the id of every file in this scan, in scan order.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        let mut ids = Vec::with_capacity(self.files.len());
        ids.extend(self.files.iter().map(|handle| handle.file_id()));
        ids
    }

    /// Returns the index id of every file in this scan, in scan order.
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        self.files
            .iter()
            .map(|handle| handle.index_id())
            .collect()
    }
}
1412
1413fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode {
1414 if append_mode {
1415 return PreFilterMode::All;
1416 }
1417
1418 match merge_mode {
1419 MergeMode::LastRow => PreFilterMode::SkipFields,
1420 MergeMode::LastNonNull => PreFilterMode::SkipFields,
1421 }
1422}
1423
1424fn narrow_read_columns_by_json_type_hint(
1425 read_columns: &mut ReadColumns,
1426 json_type_hint: &HashMap<String, JsonNativeType>,
1427 metadata: &RegionMetadata,
1428) {
1429 if json_type_hint.is_empty() {
1430 return;
1431 }
1432
1433 for read_column in &mut read_columns.cols {
1434 let Some(column) = metadata.column_by_id(read_column.column_id) else {
1435 continue;
1436 };
1437 let column_name = &column.column_schema.name;
1438 let Some(json_type) = json_type_hint.get(column_name) else {
1439 continue;
1440 };
1441
1442 let mut paths = Vec::new();
1443 let mut current = vec![column_name.clone()];
1444 collect_json_nested_paths(json_type, &mut current, &mut paths);
1445 merge_nested_paths(&mut read_column.nested_paths, paths)
1446 }
1447}
1448
1449fn collect_json_nested_paths(
1450 json_type: &JsonNativeType,
1451 current: &mut NestedPath,
1452 paths: &mut Vec<NestedPath>,
1453) {
1454 match json_type {
1455 JsonNativeType::Object(fields) if !fields.is_empty() => {
1456 for (field, child) in fields {
1457 current.push(field.clone());
1458 collect_json_nested_paths(child, current, paths);
1459 current.pop();
1460 }
1461 }
1462 _ => paths.push(current.clone()),
1463 }
1464}
1465
/// Output of [`build_scan_fingerprint`] for an eligible scan.
pub(crate) struct ScanFingerprintBundle {
    /// Fingerprint of the scan request: projection, sorted filters, merge settings and the
    /// partition expression version. Presumably used as a range-result cache key — confirm
    /// in `range_cache`.
    pub(crate) fingerprint: ScanRequestFingerprint,
    /// Time range implied by the time-index-only filters, if one could be derived.
    pub(crate) implied_time_range: Option<TimestampRange>,
}
1477
/// Builds the fingerprint of `input`'s scan request, or `None` if the scan is not eligible.
///
/// A scan is eligible only when all of the following hold:
/// - it is not a compaction;
/// - it reads at least one file;
/// - it uses [`CacheStrategy::EnableAll`];
/// - at least one request filter references a tag column.
///
/// The request filters (the region partition expression is excluded; its version is
/// recorded instead) are split into two groups:
/// - filters that reference only the time index and from which a time range can be
///   extracted. These become `time_filters` and also yield the bundle's
///   `implied_time_range`.
/// - every other filter, kept in `filters` as its string form.
///
/// Both lists are sorted so the fingerprint does not depend on filter order.
pub(crate) fn build_scan_fingerprint(input: &ScanInput) -> Option<ScanFingerprintBundle> {
    let eligible = !input.compaction
        && !input.files.is_empty()
        && matches!(input.cache_strategy, CacheStrategy::EnableAll(_));

    if !eligible {
        return None;
    }

    let metadata = input.region_metadata();
    let tag_names: HashSet<&str> = metadata
        .column_metadatas
        .iter()
        .filter(|col| col.semantic_type == SemanticType::Tag)
        .map(|col| col.column_schema.name.as_str())
        .collect();

    let time_index = metadata.time_index_column();
    let time_index_name = time_index.column_schema.name.clone();
    // The time index column always has a timestamp type, so this cannot fail.
    let ts_col_unit = time_index
        .column_schema
        .data_type
        .as_timestamp()
        .expect("Time index must have timestamp-compatible type")
        .unit();

    // Only the request's own filters; the region partition expression is covered by
    // `partition_expr_version` below.
    let exprs = input
        .predicate_group()
        .predicate_without_region()
        .map(|predicate| predicate.exprs())
        .unwrap_or_default();

    let mut filters = Vec::new();
    let mut time_only_exprs: Vec<&Expr> = Vec::new();
    let mut has_tag_filter = false;
    // Scratch set reused for every expression.
    let mut columns = HashSet::new();

    for expr in exprs {
        columns.clear();
        // Expressions whose columns cannot be collected, or that reference no column,
        // are never treated as time-only.
        let is_time_only = match expr_to_columns(expr, &mut columns) {
            Ok(()) if !columns.is_empty() => {
                has_tag_filter |= columns
                    .iter()
                    .any(|col| tag_names.contains(col.name.as_str()));
                columns.iter().all(|col| col.name == time_index_name)
            }
            _ => false,
        };

        // A time-only filter is kept apart only if a time range can be extracted from it;
        // otherwise it is fingerprinted like any other filter.
        if is_time_only
            && extract_time_range_from_expr(&time_index_name, ts_col_unit, expr).is_some()
        {
            time_only_exprs.push(expr);
        } else {
            filters.push(expr.to_string());
        }
    }

    // Scans without any tag filter are not fingerprinted.
    if !has_tag_filter {
        return None;
    }

    let implied_time_range =
        implied_time_range_from_exprs(&time_index_name, ts_col_unit, &time_only_exprs);
    let mut time_filters: Vec<String> = time_only_exprs.iter().map(|e| e.to_string()).collect();

    // Canonicalize filter order so equivalent requests produce the same fingerprint.
    filters.sort_unstable();
    time_filters.sort_unstable();
    let read_columns = input.read_cols.clone();
    // Struct fields are evaluated in the order written: `read_column_types` borrows
    // `read_columns` before it is moved into the `read_columns` field.
    let fingerprint = crate::read::range_cache::ScanRequestFingerprintBuilder {
        read_column_types: read_columns
            .column_ids_iter()
            .map(|id| {
                metadata
                    .column_by_id(id)
                    .map(|col| col.column_schema.data_type.clone())
            })
            .collect(),
        read_columns,
        filters,
        time_filters,
        series_row_selector: input.series_row_selector,
        append_mode: input.append_mode,
        filter_deleted: input.filter_deleted,
        merge_mode: input.merge_mode,
        partition_expr_version: metadata.partition_expr_version,
    }
    .build();

    Some(ScanFingerprintBundle {
        fingerprint,
        implied_time_range,
    })
}
1583
/// State shared by the streams of one region scan.
pub struct StreamContext {
    /// Input of the scan.
    pub input: ScanInput,
    /// Ranges to scan; a [`PartitionRange`]'s `identifier` is an index into this list.
    pub(crate) ranges: Vec<RangeMeta>,
    /// Fingerprint of the scan request, when the scan is eligible for one.
    // NOTE(review): `dead_code` is allowed presumably because no reader consumes the
    // fingerprint yet (it is only built and stored here) — confirm, and drop the allow
    // once it is used.
    #[allow(dead_code)]
    pub(crate) scan_fingerprint: Option<ScanRequestFingerprint>,
    /// Time range implied by the time-only filters, computed alongside the fingerprint.
    pub(crate) scan_implied_time_range: Option<TimestampRange>,

    /// When the query started; falls back to the context creation time if the input does
    /// not carry one.
    pub(crate) query_start: Instant,
}
1607
1608impl StreamContext {
1609 pub(crate) fn seq_scan_ctx(input: ScanInput) -> Self {
1611 let query_start = input.query_start.unwrap_or_else(Instant::now);
1612 let ranges = RangeMeta::seq_scan_ranges(&input);
1613 READ_SST_COUNT.observe(input.num_files() as f64);
1614 let (scan_fingerprint, scan_implied_time_range) = match build_scan_fingerprint(&input) {
1615 Some(b) => (Some(b.fingerprint), b.implied_time_range),
1616 None => (None, None),
1617 };
1618
1619 Self {
1620 input,
1621 ranges,
1622 scan_fingerprint,
1623 scan_implied_time_range,
1624 query_start,
1625 }
1626 }
1627
1628 pub(crate) fn unordered_scan_ctx(input: ScanInput) -> Self {
1630 let query_start = input.query_start.unwrap_or_else(Instant::now);
1631 let ranges = RangeMeta::unordered_scan_ranges(&input);
1632 READ_SST_COUNT.observe(input.num_files() as f64);
1633 let (scan_fingerprint, scan_implied_time_range) = match build_scan_fingerprint(&input) {
1634 Some(b) => (Some(b.fingerprint), b.implied_time_range),
1635 None => (None, None),
1636 };
1637
1638 Self {
1639 input,
1640 ranges,
1641 scan_fingerprint,
1642 scan_implied_time_range,
1643 query_start,
1644 }
1645 }
1646
1647 pub(crate) fn is_mem_range_index(&self, index: RowGroupIndex) -> bool {
1649 self.input.num_memtables() > index.index
1650 }
1651
1652 pub(crate) fn is_file_range_index(&self, index: RowGroupIndex) -> bool {
1653 !self.is_mem_range_index(index)
1654 && index.index < self.input.num_files() + self.input.num_memtables()
1655 }
1656
1657 pub(crate) fn range_pre_filter_mode(&self, part_range: &PartitionRange) -> PreFilterMode {
1658 let range_meta = &self.ranges[part_range.identifier];
1659 let source_count = range_meta.indices.len();
1660
1661 self.input.range_pre_filter_mode(source_count)
1662 }
1663
1664 pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
1666 self.ranges
1667 .iter()
1668 .enumerate()
1669 .map(|(idx, range_meta)| range_meta.new_partition_range(idx))
1670 .collect()
1671 }
1672
1673 pub(crate) fn format_for_explain(&self, verbose: bool, f: &mut fmt::Formatter) -> fmt::Result {
1675 let (mut num_mem_ranges, mut num_file_ranges, mut num_other_ranges) = (0, 0, 0);
1676 for range_meta in &self.ranges {
1677 for idx in &range_meta.row_group_indices {
1678 if self.is_mem_range_index(*idx) {
1679 num_mem_ranges += 1;
1680 } else if self.is_file_range_index(*idx) {
1681 num_file_ranges += 1;
1682 } else {
1683 num_other_ranges += 1;
1684 }
1685 }
1686 }
1687 if verbose {
1688 write!(f, "{{")?;
1689 }
1690 write!(
1691 f,
1692 r#""partition_count":{{"count":{}, "mem_ranges":{}, "files":{}, "file_ranges":{}"#,
1693 self.ranges.len(),
1694 num_mem_ranges,
1695 self.input.num_files(),
1696 num_file_ranges,
1697 )?;
1698 if num_other_ranges > 0 {
1699 write!(f, r#", "other_ranges":{}"#, num_other_ranges)?;
1700 }
1701 write!(f, "}}")?;
1702
1703 if let Some(selector) = &self.input.series_row_selector {
1704 write!(f, ", \"selector\":\"{}\"", selector)?;
1705 }
1706 if let Some(distribution) = &self.input.distribution {
1707 write!(f, ", \"distribution\":\"{}\"", distribution)?;
1708 }
1709
1710 if verbose {
1711 self.format_verbose_content(f)?;
1712 }
1713
1714 Ok(())
1715 }
1716
1717 fn format_verbose_content(&self, f: &mut fmt::Formatter) -> fmt::Result {
1718 struct FileWrapper<'a> {
1719 file: &'a FileHandle,
1720 }
1721
1722 impl fmt::Debug for FileWrapper<'_> {
1723 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1724 let (start, end) = self.file.time_range();
1725 write!(
1726 f,
1727 r#"{{"file_id":"{}","time_range_start":"{}::{}","time_range_end":"{}::{}","rows":{},"size":{},"index_size":{}}}"#,
1728 self.file.file_id(),
1729 start.value(),
1730 start.unit(),
1731 end.value(),
1732 end.unit(),
1733 self.file.num_rows(),
1734 self.file.size(),
1735 self.file.index_size()
1736 )
1737 }
1738 }
1739
1740 struct InputWrapper<'a> {
1741 input: &'a ScanInput,
1742 }
1743
1744 #[cfg(feature = "enterprise")]
1745 impl InputWrapper<'_> {
1746 fn format_extension_ranges(&self, f: &mut fmt::Formatter) -> fmt::Result {
1747 if self.input.extension_ranges.is_empty() {
1748 return Ok(());
1749 }
1750
1751 let mut delimiter = "";
1752 write!(f, ", extension_ranges: [")?;
1753 for range in self.input.extension_ranges() {
1754 write!(f, "{}{:?}", delimiter, range)?;
1755 delimiter = ", ";
1756 }
1757 write!(f, "]")?;
1758 Ok(())
1759 }
1760 }
1761
1762 impl fmt::Debug for InputWrapper<'_> {
1763 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1764 let output_schema = self.input.mapper.output_schema();
1765 if !output_schema.is_empty() {
1766 let names: Vec<_> = output_schema
1767 .column_schemas()
1768 .iter()
1769 .map(|col| &col.name)
1770 .collect();
1771 write!(f, ", \"projection\": {:?}", names)?;
1772 }
1773 if let Some(predicate) = &self.input.predicate.predicate() {
1774 if !predicate.exprs().is_empty() {
1775 let exprs: Vec<_> =
1776 predicate.exprs().iter().map(|e| e.to_string()).collect();
1777 write!(f, ", \"filters\": {:?}", exprs)?;
1778 }
1779 if !predicate.dyn_filters().is_empty() {
1780 let dyn_filters: Vec<_> = predicate
1781 .dyn_filters()
1782 .iter()
1783 .map(|f| format!("{}", f))
1784 .collect();
1785 write!(f, ", \"dyn_filters\": {:?}", dyn_filters)?;
1786 }
1787 }
1788 #[cfg(feature = "vector_index")]
1789 if let Some(vector_index_k) = self.input.vector_index_k {
1790 write!(f, ", \"vector_index_k\": {}", vector_index_k)?;
1791 }
1792 if !self.input.files.is_empty() {
1793 write!(f, ", \"files\": ")?;
1794 f.debug_list()
1795 .entries(self.input.files.iter().map(|file| FileWrapper { file }))
1796 .finish()?;
1797 }
1798 write!(f, ", \"flat_format\": {}", self.input.explain_flat_format)?;
1799 #[cfg(feature = "enterprise")]
1800 self.format_extension_ranges(f)?;
1801
1802 Ok(())
1803 }
1804 }
1805
1806 write!(f, "{:?}", InputWrapper { input: &self.input })
1807 }
1808
1809 pub(crate) fn add_dyn_filter_to_predicate(
1812 self: &Arc<Self>,
1813 filter_exprs: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
1814 ) -> Vec<bool> {
1815 let mut supported = Vec::with_capacity(filter_exprs.len());
1816 let filter_expr = filter_exprs
1817 .into_iter()
1818 .filter_map(|expr| {
1819 if let Ok(dyn_filter) = (expr as Arc<dyn std::any::Any + Send + Sync + 'static>)
1820 .downcast::<datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr>()
1821 {
1822 supported.push(true);
1823 Some(dyn_filter)
1824 } else {
1825 supported.push(false);
1826 None
1827 }
1828 })
1829 .collect();
1830 self.input.predicate.add_dyn_filters(filter_expr);
1831 supported
1832 }
1833}
1834
/// Predicates of a scan, kept both with and without the region's partition expression.
#[derive(Clone, Default)]
pub struct PredicateGroup {
    /// Single-column filters on the time index column (from both the request filters and
    /// the partition expression), or `None` when there are none.
    time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
    /// Predicate over the request filters plus the region partition expression, if any.
    predicate_all: Predicate,
    /// Predicate over the request filters only.
    predicate_without_region: Predicate,
    /// The region's parsed partition expression, if it has a non-empty one.
    region_partition_expr: Option<PartitionExpr>,
}
1847
1848impl PredicateGroup {
1849 pub fn new(metadata: &RegionMetadata, exprs: &[Expr]) -> Result<Self> {
1851 let mut combined_exprs = exprs.to_vec();
1852 let mut region_partition_expr = None;
1853
1854 if let Some(expr_json) = metadata.partition_expr.as_ref()
1855 && !expr_json.is_empty()
1856 && let Some(expr) = PartitionExpr::from_json_str(expr_json)
1857 .context(InvalidPartitionExprSnafu { expr: expr_json })?
1858 {
1859 let logical_expr = expr
1860 .try_as_logical_expr()
1861 .context(InvalidPartitionExprSnafu {
1862 expr: expr_json.clone(),
1863 })?;
1864
1865 combined_exprs.push(logical_expr);
1866 region_partition_expr = Some(expr);
1867 }
1868
1869 let mut time_filters = Vec::with_capacity(combined_exprs.len());
1870 let mut columns = HashSet::new();
1872 for expr in &combined_exprs {
1873 columns.clear();
1874 let Some(filter) = Self::expr_to_filter(expr, metadata, &mut columns) else {
1875 continue;
1876 };
1877 time_filters.push(filter);
1878 }
1879 let time_filters = if time_filters.is_empty() {
1880 None
1881 } else {
1882 Some(Arc::new(time_filters))
1883 };
1884
1885 let predicate_all = Predicate::new(combined_exprs);
1886 let predicate_without_region = Predicate::new(exprs.to_vec());
1887
1888 Ok(Self {
1889 time_filters,
1890 predicate_all,
1891 predicate_without_region,
1892 region_partition_expr,
1893 })
1894 }
1895
1896 pub(crate) fn time_filters(&self) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
1898 self.time_filters.clone()
1899 }
1900
1901 pub(crate) fn predicate(&self) -> Option<&Predicate> {
1903 if self.predicate_all.is_empty() {
1904 None
1905 } else {
1906 Some(&self.predicate_all)
1907 }
1908 }
1909
1910 pub(crate) fn predicate_without_region(&self) -> Option<&Predicate> {
1912 if self.predicate_without_region.is_empty() {
1913 None
1914 } else {
1915 Some(&self.predicate_without_region)
1916 }
1917 }
1918
1919 pub(crate) fn add_dyn_filters(&self, dyn_filters: Vec<Arc<DynamicFilterPhysicalExpr>>) {
1921 self.predicate_all.add_dyn_filters(dyn_filters.clone());
1922 self.predicate_without_region.add_dyn_filters(dyn_filters);
1923 }
1924
1925 pub(crate) fn region_partition_expr(&self) -> Option<&PartitionExpr> {
1927 self.region_partition_expr.as_ref()
1928 }
1929
1930 fn expr_to_filter(
1931 expr: &Expr,
1932 metadata: &RegionMetadata,
1933 columns: &mut HashSet<Column>,
1934 ) -> Option<SimpleFilterEvaluator> {
1935 columns.clear();
1936 expr_to_columns(expr, columns).ok()?;
1939 if columns.len() > 1 {
1940 return None;
1942 }
1943 let column = columns.iter().next()?;
1944 let column_meta = metadata.column_by_name(&column.name)?;
1945 if column_meta.semantic_type == SemanticType::Timestamp {
1946 SimpleFilterEvaluator::try_new(expr)
1947 } else {
1948 None
1949 }
1950 }
1951}
1952
1953#[cfg(test)]
1954mod tests {
1955 use std::sync::Arc;
1956
1957 use common_time::timestamp::{TimeUnit, Timestamp};
1958 use datafusion::physical_plan::expressions::lit as physical_lit;
1959 use datafusion_common::ScalarValue;
1960 use datafusion_expr::{col, lit};
1961 use datatypes::arrow::datatypes::{
1962 DataType as ArrowDataType, Field, Schema as ArrowSchema, TimeUnit as ArrowTimeUnit,
1963 };
1964 use datatypes::prelude::ConcreteDataType;
1965 use datatypes::schema::ColumnSchema;
1966 use datatypes::types::json_type::JsonObjectType;
1967 use datatypes::value::Value;
1968 use partition::expr::col as partition_col;
1969 use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
1970 use store_api::storage::{RegionId, TimeSeriesDistribution, TimeSeriesRowSelector};
1971
1972 use super::*;
1973 use crate::cache::CacheManager;
1974 use crate::error::InvalidMetadataSnafu;
1975 use crate::read::range_cache::ScanRequestFingerprintBuilder;
1976 use crate::read::read_columns::ReadColumn;
1977 use crate::sst::file::FileMeta;
1978 use crate::test_util::memtable_util::metadata_with_primary_key;
1979 use crate::test_util::scheduler_util::SchedulerEnv;
1980
1981 async fn new_scan_input(metadata: RegionMetadataRef, filters: Vec<Expr>) -> ScanInput {
1982 let env = SchedulerEnv::new().await;
1983 let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3]).unwrap();
1984 let predicate = PredicateGroup::new(metadata.as_ref(), &filters).unwrap();
1985 let file = FileHandle::new(
1986 crate::sst::file::FileMeta::default(),
1987 Arc::new(crate::sst::file_purger::NoopFilePurger),
1988 );
1989
1990 ScanInput::new(env.access_layer.clone(), mapper)
1991 .with_predicate(predicate)
1992 .with_cache(CacheStrategy::EnableAll(Arc::new(
1993 CacheManager::builder()
1994 .range_result_cache_size(1024)
1995 .build(),
1996 )))
1997 .with_files(vec![file])
1998 }
1999
2000 fn ts_lit(val: i64) -> datafusion_expr::Expr {
2002 lit(ScalarValue::TimestampMillisecond(Some(val), None))
2003 }
2004
2005 fn metadata_with_time_index_unit(unit: TimeUnit) -> RegionMetadataRef {
2006 let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
2007 builder
2008 .push_column_metadata(ColumnMetadata {
2009 column_schema: ColumnSchema::new(
2010 "k0".to_string(),
2011 ConcreteDataType::string_datatype(),
2012 false,
2013 ),
2014 semantic_type: SemanticType::Tag,
2015 column_id: 0,
2016 })
2017 .push_column_metadata(ColumnMetadata {
2018 column_schema: ColumnSchema::new(
2019 "k1".to_string(),
2020 ConcreteDataType::uint32_datatype(),
2021 false,
2022 ),
2023 semantic_type: SemanticType::Tag,
2024 column_id: 1,
2025 })
2026 .push_column_metadata(ColumnMetadata {
2027 column_schema: ColumnSchema::new(
2028 "ts".to_string(),
2029 ConcreteDataType::timestamp_datatype(unit),
2030 false,
2031 ),
2032 semantic_type: SemanticType::Timestamp,
2033 column_id: 2,
2034 })
2035 .push_column_metadata(ColumnMetadata {
2036 column_schema: ColumnSchema::new(
2037 "v0".to_string(),
2038 ConcreteDataType::int64_datatype(),
2039 true,
2040 ),
2041 semantic_type: SemanticType::Field,
2042 column_id: 3,
2043 })
2044 .primary_key(vec![0, 1]);
2045
2046 Arc::new(builder.build().unwrap())
2047 }
2048
2049 fn file_handle_with_time_range(start: Timestamp, end: Timestamp) -> FileHandle {
2050 FileHandle::new(
2051 FileMeta {
2052 time_range: (start, end),
2053 ..Default::default()
2054 },
2055 Arc::new(crate::sst::file_purger::NoopFilePurger),
2056 )
2057 }
2058
2059 #[test]
2060 fn test_fill_json_nested_paths_from_hint() -> Result<()> {
2061 fn json_projection_test_metadata() -> Result<RegionMetadataRef> {
2062 let mut builder = RegionMetadataBuilder::new(RegionId::new(1024, 0));
2063 builder
2064 .push_column_metadata(ColumnMetadata {
2065 column_schema: ColumnSchema::new(
2066 "tag".to_string(),
2067 ConcreteDataType::string_datatype(),
2068 true,
2069 ),
2070 semantic_type: SemanticType::Tag,
2071 column_id: 0,
2072 })
2073 .push_column_metadata(ColumnMetadata {
2074 column_schema: ColumnSchema::new(
2075 "j".to_string(),
2076 ConcreteDataType::json2(JsonNativeType::Object(JsonObjectType::new())),
2077 true,
2078 ),
2079 semantic_type: SemanticType::Field,
2080 column_id: 1,
2081 })
2082 .push_column_metadata(ColumnMetadata {
2083 column_schema: ColumnSchema::new(
2084 "ts".to_string(),
2085 ConcreteDataType::timestamp_millisecond_datatype(),
2086 false,
2087 ),
2088 semantic_type: SemanticType::Timestamp,
2089 column_id: 2,
2090 });
2091 builder.primary_key(vec![0]);
2092 builder.build().context(InvalidMetadataSnafu).map(Arc::new)
2093 }
2094
2095 let metadata = json_projection_test_metadata()?;
2096 let hint = HashMap::from([(
2097 "j".to_string(),
2098 JsonNativeType::Object(JsonObjectType::from([
2099 ("a".to_string(), JsonNativeType::i64()),
2100 (
2101 "b".to_string(),
2102 JsonNativeType::Object(JsonObjectType::from([(
2103 "c".to_string(),
2104 JsonNativeType::String,
2105 )])),
2106 ),
2107 ])),
2108 )]);
2109
2110 fn nested_path(parts: &[&str]) -> NestedPath {
2111 parts.iter().map(|part| part.to_string()).collect()
2112 }
2113
2114 let mut read_columns = ReadColumns {
2115 cols: vec![ReadColumn::new(1, vec![]), ReadColumn::new(0, vec![])],
2116 };
2117 narrow_read_columns_by_json_type_hint(&mut read_columns, &hint, metadata.as_ref());
2118 assert_eq!(
2119 read_columns,
2120 ReadColumns {
2121 cols: vec![
2122 ReadColumn::new(
2123 1,
2124 vec![nested_path(&["j", "a"]), nested_path(&["j", "b", "c"])]
2125 ),
2126 ReadColumn::new(0, vec![])
2127 ]
2128 }
2129 );
2130
2131 let mut read_columns = ReadColumns {
2132 cols: vec![ReadColumn::new(0, vec![])],
2133 };
2134 narrow_read_columns_by_json_type_hint(&mut read_columns, &hint, metadata.as_ref());
2135 assert_eq!(
2136 read_columns,
2137 ReadColumns {
2138 cols: vec![ReadColumn::new(0, vec![])]
2139 }
2140 );
2141 Ok(())
2142 }
2143
2144 #[tokio::test]
2145 async fn test_build_scan_fingerprint_for_eligible_scan() {
2146 let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
2147 let input = new_scan_input(
2148 metadata.clone(),
2149 vec![
2150 col("ts").gt_eq(ts_lit(1000)),
2151 col("k0").eq(lit("foo")),
2152 col("v0").gt(lit(1)),
2153 ],
2154 )
2155 .await
2156 .with_distribution(Some(TimeSeriesDistribution::PerSeries))
2157 .with_series_row_selector(Some(TimeSeriesRowSelector::LastRow))
2158 .with_merge_mode(MergeMode::LastNonNull)
2159 .with_filter_deleted(false);
2160
2161 let fingerprint = build_scan_fingerprint(&input).unwrap();
2162
2163 let expected = ScanRequestFingerprintBuilder {
2164 read_columns: input.read_cols,
2165 read_column_types: vec![
2166 metadata
2167 .column_by_id(0)
2168 .map(|col| col.column_schema.data_type.clone()),
2169 metadata
2170 .column_by_id(2)
2171 .map(|col| col.column_schema.data_type.clone()),
2172 metadata
2173 .column_by_id(3)
2174 .map(|col| col.column_schema.data_type.clone()),
2175 ],
2176 filters: vec![
2177 col("k0").eq(lit("foo")).to_string(),
2178 col("v0").gt(lit(1)).to_string(),
2179 ],
2180 time_filters: vec![col("ts").gt_eq(ts_lit(1000)).to_string()],
2181 series_row_selector: Some(TimeSeriesRowSelector::LastRow),
2182 append_mode: false,
2183 filter_deleted: false,
2184 merge_mode: MergeMode::LastNonNull,
2185 partition_expr_version: 0,
2186 }
2187 .build();
2188 assert_eq!(expected, fingerprint.fingerprint);
2189 }
2190
2191 #[tokio::test]
2192 async fn test_build_scan_fingerprint_requires_tag_filter() {
2193 let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
2194 let input = new_scan_input(
2195 metadata,
2196 vec![col("ts").gt_eq(lit(1000)), col("v0").gt(lit(1))],
2197 )
2198 .await;
2199
2200 assert!(build_scan_fingerprint(&input).is_none());
2201 }
2202
2203 #[tokio::test]
2204 async fn test_build_scan_fingerprint_respects_scan_eligibility() {
2205 let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
2206 let filters = vec![col("k0").eq(lit("foo"))];
2207
2208 let disabled = ScanInput::new(
2209 SchedulerEnv::new().await.access_layer.clone(),
2210 FlatProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap(),
2211 )
2212 .with_predicate(PredicateGroup::new(metadata.as_ref(), &filters).unwrap());
2213 assert!(build_scan_fingerprint(&disabled).is_none());
2214
2215 let compaction = new_scan_input(metadata.clone(), filters.clone())
2216 .await
2217 .with_compaction(true);
2218 assert!(build_scan_fingerprint(&compaction).is_none());
2219
2220 let no_files = new_scan_input(metadata, filters).await.with_files(vec![]);
2222 assert!(build_scan_fingerprint(&no_files).is_none());
2223 }
2224
2225 #[tokio::test]
2226 async fn test_build_scan_fingerprint_tracks_schema_and_partition_expr_changes() {
2227 let base = metadata_with_primary_key(vec![0, 1], false);
2228 let mut builder = RegionMetadataBuilder::from_existing(base);
2229 let partition_expr = partition_col("k0")
2230 .gt_eq(Value::String("foo".into()))
2231 .as_json_str()
2232 .unwrap();
2233 builder.partition_expr_json(Some(partition_expr));
2234 let metadata = Arc::new(builder.build_without_validation().unwrap());
2235
2236 let input = new_scan_input(metadata.clone(), vec![col("k0").eq(lit("foo"))]).await;
2237 let fingerprint = build_scan_fingerprint(&input).unwrap();
2238
2239 let expected = ScanRequestFingerprintBuilder {
2240 read_columns: input.read_cols,
2241 read_column_types: vec![
2242 metadata
2243 .column_by_id(0)
2244 .map(|col| col.column_schema.data_type.clone()),
2245 metadata
2246 .column_by_id(2)
2247 .map(|col| col.column_schema.data_type.clone()),
2248 metadata
2249 .column_by_id(3)
2250 .map(|col| col.column_schema.data_type.clone()),
2251 ],
2252 filters: vec![col("k0").eq(lit("foo")).to_string()],
2253 time_filters: vec![],
2254 series_row_selector: None,
2255 append_mode: false,
2256 filter_deleted: true,
2257 merge_mode: MergeMode::LastRow,
2258 partition_expr_version: metadata.partition_expr_version,
2259 }
2260 .build();
2261 assert_eq!(expected, fingerprint.fingerprint);
2262 assert_ne!(0, metadata.partition_expr_version);
2263 }
2264
2265 #[test]
2266 fn test_update_dyn_filters_with_empty_base_predicates() {
2267 let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
2268 let predicate_group = PredicateGroup::new(metadata.as_ref(), &[]).unwrap();
2269 assert!(predicate_group.predicate().is_none());
2270 assert!(predicate_group.predicate_without_region().is_none());
2271
2272 let dyn_filter = Arc::new(DynamicFilterPhysicalExpr::new(vec![], physical_lit(false)));
2273 predicate_group.add_dyn_filters(vec![dyn_filter]);
2274
2275 let predicate_all = predicate_group.predicate().unwrap();
2276 assert!(predicate_all.exprs().is_empty());
2277 assert_eq!(1, predicate_all.dyn_filters().len());
2278
2279 let predicate_without_region = predicate_group.predicate_without_region().unwrap();
2280 assert!(predicate_without_region.exprs().is_empty());
2281 assert_eq!(1, predicate_without_region.dyn_filters().len());
2282 }
2283
2284 #[test]
2285 fn test_file_level_pruning_stats_prunes_old_file() {
2286 let ts_col_name = "ts";
2287 let predicate = Predicate::new(vec![col(ts_col_name).gt(ts_lit(1000))]);
2288 let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
2289 ts_col_name,
2290 ArrowDataType::Timestamp(ArrowTimeUnit::Millisecond, None),
2291 false,
2292 )]));
2293
2294 let stats = FileLevelPruningStats {
2296 min_scalar: ScalarValue::TimestampMillisecond(Some(0), None),
2297 max_scalar: ScalarValue::TimestampMillisecond(Some(500), None),
2298 time_index_col_name: ts_col_name.to_string(),
2299 };
2300 assert_eq!(
2301 vec![false],
2302 predicate.prune_with_stats(&stats, &arrow_schema)
2303 );
2304
2305 let stats = FileLevelPruningStats {
2307 min_scalar: ScalarValue::TimestampMillisecond(Some(0), None),
2308 max_scalar: ScalarValue::TimestampMillisecond(Some(2000), None),
2309 time_index_col_name: ts_col_name.to_string(),
2310 };
2311 assert_eq!(
2312 vec![true],
2313 predicate.prune_with_stats(&stats, &arrow_schema)
2314 );
2315 }
2316
2317 #[test]
2318 fn test_file_level_pruning_stats_no_predicate_keeps_all() {
2319 let predicate = Predicate::new(vec![]);
2320 assert!(predicate.is_empty());
2321
2322 let stats = FileLevelPruningStats {
2323 min_scalar: ScalarValue::TimestampMillisecond(Some(0), None),
2324 max_scalar: ScalarValue::TimestampMillisecond(Some(500), None),
2325 time_index_col_name: "ts".to_string(),
2326 };
2327 let arrow_schema = Arc::new(ArrowSchema::new(Vec::<Field>::new()));
2328 assert_eq!(
2329 vec![true],
2330 predicate.prune_with_stats(&stats, &arrow_schema)
2331 );
2332 }
2333
2334 #[tokio::test]
2335 async fn test_file_level_pruning_stats_ceil_max_unit_conversion() {
2336 let metadata = metadata_with_time_index_unit(TimeUnit::Millisecond);
2337 let input = new_scan_input(metadata, vec![]).await;
2338 let file = file_handle_with_time_range(
2339 Timestamp::new(1_000_001, TimeUnit::Nanosecond),
2340 Timestamp::new(1_000_001, TimeUnit::Nanosecond),
2341 );
2342
2343 let stats = input.try_file_level_pruning_stats(&file).unwrap();
2344 assert_eq!(
2345 ScalarValue::TimestampMillisecond(Some(1), None),
2346 stats.min_scalar
2347 );
2348 assert_eq!(
2349 ScalarValue::TimestampMillisecond(Some(2), None),
2350 stats.max_scalar
2351 );
2352
2353 let predicate = Predicate::new(vec![col("ts").gt(ts_lit(1))]);
2355 assert_eq!(
2356 vec![true],
2357 predicate.prune_with_stats(&stats, input.mapper.metadata().schema.arrow_schema())
2358 );
2359 }
2360
2361 #[tokio::test]
2362 async fn test_file_level_pruning_stats_overflow_keeps_file() {
2363 let metadata = metadata_with_time_index_unit(TimeUnit::Nanosecond);
2364 let input = new_scan_input(metadata, vec![]).await;
2365 let file = file_handle_with_time_range(
2366 Timestamp::new(0, TimeUnit::Second),
2367 Timestamp::new(i64::MAX, TimeUnit::Second),
2368 );
2369
2370 assert!(input.try_file_level_pruning_stats(&file).is_none());
2371 }
2372
2373 #[test]
2374 fn test_file_level_pruning_stats_keeps_inclusive_boundary() {
2375 let ts_col_name = "ts";
2376 let predicate = Predicate::new(vec![col(ts_col_name).gt_eq(ts_lit(1000))]);
2377 let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
2378 ts_col_name,
2379 ArrowDataType::Timestamp(ArrowTimeUnit::Millisecond, None),
2380 false,
2381 )]));
2382 let stats = FileLevelPruningStats {
2383 min_scalar: ScalarValue::TimestampMillisecond(Some(0), None),
2384 max_scalar: ScalarValue::TimestampMillisecond(Some(1000), None),
2385 time_index_col_name: ts_col_name.to_string(),
2386 };
2387
2388 assert_eq!(
2389 vec![true],
2390 predicate.prune_with_stats(&stats, &arrow_schema)
2391 );
2392 }
2393
2394 #[tokio::test]
2395 async fn test_file_level_pruning_with_dyn_filter_only_predicate() {
2396 let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
2397 let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3]).unwrap();
2398 let predicate_group = PredicateGroup::new(metadata.as_ref(), &[]).unwrap();
2399 predicate_group.add_dyn_filters(vec![Arc::new(DynamicFilterPhysicalExpr::new(
2400 vec![],
2401 physical_lit(false),
2402 ))]);
2403 let input = ScanInput::new(SchedulerEnv::new().await.access_layer.clone(), mapper)
2404 .with_predicate(predicate_group);
2405 let file = file_handle_with_time_range(
2406 Timestamp::new_millisecond(0),
2407 Timestamp::new_millisecond(1000),
2408 );
2409 let mut reader_metrics = ReaderMetrics::default();
2410
2411 let builder = input
2412 .prune_file(&file, PreFilterMode::SkipFields, &mut reader_metrics)
2413 .await
2414 .unwrap();
2415
2416 assert_eq!(1, reader_metrics.filter_metrics.files_time_range_pruned);
2417 let mut ranges = SmallVec::new();
2418 builder.build_ranges(-1, &mut ranges);
2419 assert!(ranges.is_empty());
2420 }
2421
2422 #[tokio::test]
2423 async fn test_range_pre_filter_mode() {
2424 let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
2425 let cases = [
2426 (true, MergeMode::LastRow, 1, PreFilterMode::All),
2427 (false, MergeMode::LastNonNull, 1, PreFilterMode::All),
2428 (false, MergeMode::LastRow, 2, PreFilterMode::SkipFields),
2429 (true, MergeMode::LastRow, 2, PreFilterMode::All),
2430 ];
2431
2432 for (append_mode, merge_mode, source_count, expected_mode) in cases {
2433 let input = new_scan_input(metadata.clone(), vec![])
2434 .await
2435 .with_append_mode(append_mode)
2436 .with_merge_mode(merge_mode);
2437
2438 assert_eq!(expected_mode, input.range_pre_filter_mode(source_count));
2439 }
2440 }
2441}