1use std::collections::{HashMap, HashSet};
18use std::fmt;
19use std::num::NonZeroU64;
20use std::sync::Arc;
21use std::time::Instant;
22
23use api::v1::SemanticType;
24use common_error::ext::BoxedError;
25use common_recordbatch::SendableRecordBatchStream;
26use common_recordbatch::adapter::RegionQueryStatCounters;
27use common_recordbatch::filter::SimpleFilterEvaluator;
28use common_telemetry::tracing::Instrument;
29use common_telemetry::{debug, error, tracing, warn};
30use common_time::range::TimestampRange;
31use datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr;
32use datafusion_common::pruning::PruningStatistics;
33use datafusion_common::{Column, ScalarValue};
34use datafusion_expr::Expr;
35use datafusion_expr::utils::expr_to_columns;
36use datatypes::arrow::array::{ArrayRef, BooleanArray, UInt64Array};
37use datatypes::extension::json::is_structured_json_field;
38use datatypes::types::json_type::JsonNativeType;
39use datatypes::value::timestamp_to_scalar_value;
40use futures::StreamExt;
41use itertools::Itertools;
42use partition::expr::PartitionExpr;
43use smallvec::SmallVec;
44use snafu::ResultExt;
45use store_api::metadata::{RegionMetadata, RegionMetadataRef};
46use store_api::region_engine::{PartitionRange, RegionScannerRef};
47use store_api::storage::{
48 NestedPath, RegionId, ScanRequest, SequenceNumber, SequenceRange, TimeSeriesDistribution,
49 TimeSeriesRowSelector,
50};
51use table::predicate::{Predicate, build_time_range_predicate, extract_time_range_from_expr};
52use tokio::sync::{Semaphore, mpsc};
53use tokio_stream::wrappers::ReceiverStream;
54
55use crate::access_layer::AccessLayerRef;
56use crate::cache::CacheStrategy;
57use crate::config::DEFAULT_MAX_CONCURRENT_SCAN_FILES;
58use crate::error::{InvalidPartitionExprSnafu, Result};
59#[cfg(feature = "enterprise")]
60use crate::extension::{BoxedExtensionRange, BoxedExtensionRangeProvider};
61use crate::memtable::{MemtableRange, RangesOptions};
62use crate::metrics::READ_SST_COUNT;
63use crate::read::compat::{self, FlatCompatBatch};
64use crate::read::flat_projection::FlatProjectionMapper;
65use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
66use crate::read::range_cache::{ScanRequestFingerprint, implied_time_range_from_exprs};
67use crate::read::read_columns::{
68 ReadColumns, merge, merge_nested_paths, read_columns_from_predicate,
69 read_columns_from_projection,
70};
71use crate::read::seq_scan::SeqScan;
72use crate::read::series_scan::SeriesScan;
73use crate::read::stream::ScanBatchStream;
74use crate::read::unordered_scan::UnorderedScan;
75use crate::read::{BoxedRecordBatchStream, RecordBatch};
76use crate::region::options::MergeMode;
77use crate::region::version::VersionRef;
78use crate::sst::file::FileHandle;
79use crate::sst::index::bloom_filter::applier::{
80 BloomFilterIndexApplierBuilder, BloomFilterIndexApplierRef,
81};
82use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
83use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
84use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
85use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
86#[cfg(feature = "vector_index")]
87use crate::sst::index::vector_index::applier::{VectorIndexApplier, VectorIndexApplierRef};
88use crate::sst::parquet::file_range::PreFilterMode;
89use crate::sst::parquet::reader::ReaderMetrics;
90
/// Factor by which the requested vector search `k` is multiplied when the scan
/// request also carries filters (see `ScanRegion::scan_input`).
#[cfg(feature = "vector_index")]
const VECTOR_INDEX_OVERFETCH_MULTIPLIER: usize = 2;
93
94pub(crate) enum Scanner {
96 Seq(SeqScan),
98 Unordered(UnorderedScan),
100 Series(SeriesScan),
102}
103
104impl Scanner {
105 #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
107 pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream, BoxedError> {
108 match self {
109 Scanner::Seq(seq_scan) => seq_scan.build_stream(),
110 Scanner::Unordered(unordered_scan) => unordered_scan.build_stream().await,
111 Scanner::Series(series_scan) => series_scan.build_stream().await,
112 }
113 }
114
115 pub(crate) fn scan_batch(&self) -> Result<ScanBatchStream> {
117 match self {
118 Scanner::Seq(x) => x.scan_all_partitions(),
119 Scanner::Unordered(x) => x.scan_all_partitions(),
120 Scanner::Series(x) => x.scan_all_partitions(),
121 }
122 }
123}
124
/// Test-only accessors that forward to the [`ScanInput`] of the wrapped scanner.
#[cfg(test)]
impl Scanner {
    /// Number of SST files the scanner will read.
    pub(crate) fn num_files(&self) -> usize {
        match self {
            Self::Seq(s) => s.input().num_files(),
            Self::Unordered(s) => s.input().num_files(),
            Self::Series(s) => s.input().num_files(),
        }
    }

    /// Number of memtables the scanner will read.
    pub(crate) fn num_memtables(&self) -> usize {
        match self {
            Self::Seq(s) => s.input().num_memtables(),
            Self::Unordered(s) => s.input().num_memtables(),
            Self::Series(s) => s.input().num_memtables(),
        }
    }

    /// Ids of the SST files the scanner will read.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        match self {
            Self::Seq(s) => s.input().file_ids(),
            Self::Unordered(s) => s.input().file_ids(),
            Self::Series(s) => s.input().file_ids(),
        }
    }

    /// Index ids reported by the scan input.
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        match self {
            Self::Seq(s) => s.input().index_ids(),
            Self::Unordered(s) => s.input().index_ids(),
            Self::Series(s) => s.input().index_ids(),
        }
    }

    /// Snapshot sequence recorded in the scan input, if any.
    pub(crate) fn snapshot_sequence(&self) -> Option<SequenceNumber> {
        match self {
            Self::Seq(s) => s.input().snapshot_sequence,
            Self::Unordered(s) => s.input().snapshot_sequence,
            Self::Series(s) => s.input().snapshot_sequence,
        }
    }

    /// Prepares the wrapped scanner with the given number of target partitions.
    ///
    /// # Panics
    ///
    /// Panics if the scanner rejects the prepare request.
    pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
        use store_api::region_engine::{PrepareRequest, RegionScanner};

        let request = PrepareRequest::default().with_target_partitions(target_partitions);
        let prepared = match self {
            Self::Seq(s) => s.prepare(request),
            Self::Unordered(s) => s.prepare(request),
            Self::Series(s) => s.prepare(request),
        };
        prepared.unwrap()
    }
}
182
/// Helper that turns a [`ScanRequest`] against one region version into a scanner.
///
/// It collects the SST files and memtable ranges to read, builds the index
/// appliers and packs everything into a [`ScanInput`] (see `scan_input`).
#[cfg_attr(doc, aquamarine::aquamarine)]
pub(crate) struct ScanRegion {
    /// Region version providing the metadata, options, memtables and SSTs to scan.
    version: VersionRef,
    /// Access layer handed to the [`ScanInput`] and used to build index appliers.
    access_layer: AccessLayerRef,
    /// The scan request to serve.
    request: ScanRequest,
    /// Cache strategy forwarded to the [`ScanInput`] and the index applier builders.
    cache_strategy: CacheStrategy,
    /// Forwarded to the [`ScanInput`]; defaults to `DEFAULT_MAX_CONCURRENT_SCAN_FILES`.
    max_concurrent_scan_files: usize,
    /// When `true`, no inverted index applier is built.
    ignore_inverted_index: bool,
    /// When `true`, no fulltext index applier is built.
    ignore_fulltext_index: bool,
    /// When `true`, no bloom filter index applier is built.
    ignore_bloom_filter: bool,
    /// Optional start instant, forwarded to the [`ScanInput`] as `query_start`.
    start_time: Option<Instant>,
    /// Forwarded to the [`ScanInput`]; defaults to `true`.
    filter_deleted: bool,
    /// Optional query stat counters, forwarded to the [`ScanInput`].
    query_stat_counters: Option<RegionQueryStatCounters>,
    /// Optional provider of extension ranges; consulted unless the request skips SST files.
    #[cfg(feature = "enterprise")]
    extension_range_provider: Option<BoxedExtensionRangeProvider>,
}
260
261impl ScanRegion {
262 pub(crate) fn new(
264 version: VersionRef,
265 access_layer: AccessLayerRef,
266 request: ScanRequest,
267 cache_strategy: CacheStrategy,
268 ) -> ScanRegion {
269 ScanRegion {
270 version,
271 access_layer,
272 request,
273 cache_strategy,
274 max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
275 ignore_inverted_index: false,
276 ignore_fulltext_index: false,
277 ignore_bloom_filter: false,
278 start_time: None,
279 filter_deleted: true,
280 query_stat_counters: None,
281 #[cfg(feature = "enterprise")]
282 extension_range_provider: None,
283 }
284 }
285
286 #[must_use]
288 pub(crate) fn with_query_stat_counters(mut self, counters: RegionQueryStatCounters) -> Self {
289 self.query_stat_counters = Some(counters);
290 self
291 }
292
293 #[must_use]
295 pub(crate) fn with_max_concurrent_scan_files(
296 mut self,
297 max_concurrent_scan_files: usize,
298 ) -> Self {
299 self.max_concurrent_scan_files = max_concurrent_scan_files;
300 self
301 }
302
303 #[must_use]
305 pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self {
306 self.ignore_inverted_index = ignore;
307 self
308 }
309
310 #[must_use]
312 pub(crate) fn with_ignore_fulltext_index(mut self, ignore: bool) -> Self {
313 self.ignore_fulltext_index = ignore;
314 self
315 }
316
317 #[must_use]
319 pub(crate) fn with_ignore_bloom_filter(mut self, ignore: bool) -> Self {
320 self.ignore_bloom_filter = ignore;
321 self
322 }
323
324 #[must_use]
325 pub(crate) fn with_start_time(mut self, now: Instant) -> Self {
326 self.start_time = Some(now);
327 self
328 }
329
330 pub(crate) fn set_filter_deleted(&mut self, filter_deleted: bool) {
331 self.filter_deleted = filter_deleted;
332 }
333
334 #[cfg(feature = "enterprise")]
335 pub(crate) fn set_extension_range_provider(
336 &mut self,
337 extension_range_provider: BoxedExtensionRangeProvider,
338 ) {
339 self.extension_range_provider = Some(extension_range_provider);
340 }
341
342 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
344 pub(crate) async fn scanner(self) -> Result<Scanner> {
345 if self.use_series_scan() {
346 self.series_scan().await.map(Scanner::Series)
347 } else if self.use_unordered_scan() {
348 self.unordered_scan().await.map(Scanner::Unordered)
351 } else {
352 self.seq_scan().await.map(Scanner::Seq)
353 }
354 }
355
356 #[tracing::instrument(
358 level = tracing::Level::DEBUG,
359 skip_all,
360 fields(region_id = %self.region_id())
361 )]
362 pub(crate) async fn region_scanner(self) -> Result<RegionScannerRef> {
363 if self.use_series_scan() {
364 self.series_scan()
365 .await
366 .map(|scanner| Box::new(scanner) as _)
367 } else if self.use_unordered_scan() {
368 self.unordered_scan()
369 .await
370 .map(|scanner| Box::new(scanner) as _)
371 } else {
372 self.seq_scan().await.map(|scanner| Box::new(scanner) as _)
373 }
374 }
375
376 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
378 pub(crate) async fn seq_scan(self) -> Result<SeqScan> {
379 let input = self.scan_input().await?.with_compaction(false);
380 Ok(SeqScan::new(input))
381 }
382
383 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
385 pub(crate) async fn unordered_scan(self) -> Result<UnorderedScan> {
386 let input = self.scan_input().await?;
387 Ok(UnorderedScan::new(input))
388 }
389
390 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
392 pub(crate) async fn series_scan(self) -> Result<SeriesScan> {
393 let input = self.scan_input().await?;
394 Ok(SeriesScan::new(input))
395 }
396
397 fn use_unordered_scan(&self) -> bool {
399 self.version.options.append_mode
406 && self.request.series_row_selector.is_none()
407 && (self.request.distribution.is_none()
408 || self.request.distribution == Some(TimeSeriesDistribution::TimeWindowed))
409 }
410
411 fn use_series_scan(&self) -> bool {
413 self.request.distribution == Some(TimeSeriesDistribution::PerSeries)
414 }
415
416 #[tracing::instrument(skip_all, fields(region_id = %self.region_id()))]
418 async fn scan_input(self) -> Result<ScanInput> {
419 let sst_min_sequence = self.request.sst_min_sequence.and_then(NonZeroU64::new);
420 let time_range = self.build_time_range_predicate();
421 let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters)?;
422
423 let mut read_cols = match &self.request.projection_input {
424 Some(p) => {
425 let metadata = &self.version.metadata;
428 let from_projection = read_columns_from_projection(p.clone(), metadata)?;
429 let from_predicate = read_columns_from_predicate(&predicate, metadata);
430 merge(from_projection, from_predicate)
431 }
432 None => {
433 let read_col_ids = self
434 .version
435 .metadata
436 .column_metadatas
437 .iter()
438 .map(|col| col.column_id);
439 ReadColumns::from_deduped_column_ids(read_col_ids)
440 }
441 };
442 let has_structured_json = self
446 .version
447 .metadata
448 .schema
449 .arrow_schema()
450 .fields()
451 .iter()
452 .any(is_structured_json_field);
453 if has_structured_json {
454 narrow_read_columns_by_json_type_hint(
455 &mut read_cols,
456 &self.request.json_type_hint,
457 &self.version.metadata,
458 );
459 }
460 let read_col_ids = read_cols.column_ids();
461
462 let projection = self
464 .request
465 .projection_indices()
466 .map(|x| x.to_vec())
467 .unwrap_or_else(|| (0..self.version.metadata.column_metadatas.len()).collect());
468 let json_type_hint = has_structured_json
469 .then_some(&self.request.json_type_hint)
470 .inspect(|json_type_hint| {
471 debug!(
472 "Concretized JSON type: {{{}}}",
473 json_type_hint
474 .iter()
475 .map(|(k, v)| format!("{}: {}", k, v))
476 .join(", ")
477 );
478 });
479 let mapper = FlatProjectionMapper::new_with_read_columns(
480 &self.version.metadata,
481 projection,
482 read_cols,
483 json_type_hint,
484 )?;
485
486 let ssts = &self.version.ssts;
487 let mut files = Vec::new();
488 if !self.request.skip_sst_files {
489 for level in ssts.levels() {
490 for file in level.files.values() {
491 let exceed_min_sequence = match (sst_min_sequence, file.meta_ref().sequence) {
492 (Some(min_sequence), Some(file_sequence)) => file_sequence > min_sequence,
493 (Some(_), None) => true,
499 (None, _) => true,
500 };
501
502 if exceed_min_sequence && file_in_range(file, &time_range) {
504 files.push(file.clone());
505 }
506 }
510 }
511 }
512
513 let memtables = self.version.memtables.list_memtables();
514 let mut mem_range_builders = Vec::new();
516 let filter_mode = pre_filter_mode(
517 self.version.options.append_mode,
518 self.version.options.merge_mode(),
519 );
520
521 for m in memtables {
522 let Some((start, end)) = m.stats().time_range() else {
524 continue;
525 };
526 let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
528 if !memtable_range.intersects(&time_range) {
529 continue;
530 }
531 let ranges_in_memtable = m.ranges(
532 Some(&read_col_ids),
533 RangesOptions::default()
534 .with_predicate(predicate.clone())
535 .with_sequence(SequenceRange::new(
536 self.request.memtable_min_sequence,
537 self.request.memtable_max_sequence,
538 ))
539 .with_pre_filter_mode(filter_mode),
540 )?;
541 mem_range_builders.extend(ranges_in_memtable.ranges.into_values().map(|v| {
542 let stats = v.stats().clone();
543 MemRangeBuilder::new(v, stats)
544 }));
545 }
546
547 let region_id = self.region_id();
548 debug!(
549 "Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}",
550 region_id,
551 self.request,
552 time_range,
553 mem_range_builders.len(),
554 files.len(),
555 self.version.options.append_mode,
556 );
557
558 let (non_field_filters, field_filters) = self.partition_by_field_filters();
559 let inverted_index_appliers = [
560 self.build_invereted_index_applier(&non_field_filters),
561 self.build_invereted_index_applier(&field_filters),
562 ];
563 let bloom_filter_appliers = [
564 self.build_bloom_filter_applier(&non_field_filters),
565 self.build_bloom_filter_applier(&field_filters),
566 ];
567 let fulltext_index_appliers = [
568 self.build_fulltext_index_applier(&non_field_filters),
569 self.build_fulltext_index_applier(&field_filters),
570 ];
571 #[cfg(feature = "vector_index")]
572 let vector_index_applier = self.build_vector_index_applier();
573 #[cfg(feature = "vector_index")]
574 let vector_index_k = self.request.vector_search.as_ref().map(|search| {
575 if self.request.filters.is_empty() {
576 search.k
577 } else {
578 search.k.saturating_mul(VECTOR_INDEX_OVERFETCH_MULTIPLIER)
579 }
580 });
581
582 let input = ScanInput::new(self.access_layer, mapper)
583 .with_time_range(Some(time_range))
584 .with_predicate(predicate)
585 .with_memtables(mem_range_builders)
586 .with_files(files)
587 .with_cache(self.cache_strategy)
588 .with_inverted_index_appliers(inverted_index_appliers)
589 .with_bloom_filter_index_appliers(bloom_filter_appliers)
590 .with_fulltext_index_appliers(fulltext_index_appliers)
591 .with_max_concurrent_scan_files(self.max_concurrent_scan_files)
592 .with_start_time(self.start_time)
593 .with_append_mode(self.version.options.append_mode)
594 .with_filter_deleted(self.filter_deleted)
595 .with_merge_mode(self.version.options.merge_mode())
596 .with_series_row_selector(self.request.series_row_selector)
597 .with_distribution(self.request.distribution)
598 .with_explain_flat_format(
599 self.version.options.sst_format == Some(crate::sst::FormatType::Flat),
600 )
601 .with_snapshot_sequence(
602 self.request
603 .snapshot_on_scan
604 .then_some(self.request.memtable_max_sequence)
605 .flatten(),
606 )
607 .with_query_stat_counters(self.query_stat_counters);
608 #[cfg(feature = "vector_index")]
609 let input = input
610 .with_vector_index_applier(vector_index_applier)
611 .with_vector_index_k(vector_index_k);
612
613 #[cfg(feature = "enterprise")]
614 let input = if !self.request.skip_sst_files
615 && let Some(provider) = self.extension_range_provider
616 {
617 let ranges = provider
618 .find_extension_ranges(self.version.flushed_sequence, time_range, &self.request)
619 .await?;
620 debug!("Find extension ranges: {ranges:?}");
621 input.with_extension_ranges(ranges)
622 } else {
623 input
624 };
625 Ok(input)
626 }
627
628 fn region_id(&self) -> RegionId {
629 self.version.metadata.region_id
630 }
631
632 fn build_time_range_predicate(&self) -> TimestampRange {
634 let time_index = self.version.metadata.time_index_column();
635 let unit = time_index
636 .column_schema
637 .data_type
638 .as_timestamp()
639 .expect("Time index must have timestamp-compatible type")
640 .unit();
641 build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters)
642 }
643
644 fn partition_by_field_filters(&self) -> (Vec<Expr>, Vec<Expr>) {
647 let field_columns = self
648 .version
649 .metadata
650 .field_columns()
651 .map(|col| &col.column_schema.name)
652 .collect::<HashSet<_>>();
653
654 let mut columns = HashSet::new();
655
656 self.request.filters.iter().cloned().partition(|expr| {
657 columns.clear();
658 if expr_to_columns(expr, &mut columns).is_err() {
660 return true;
662 }
663 !columns
665 .iter()
666 .any(|column| field_columns.contains(&column.name))
667 })
668 }
669
670 fn build_invereted_index_applier(&self, filters: &[Expr]) -> Option<InvertedIndexApplierRef> {
672 if self.ignore_inverted_index {
673 return None;
674 }
675
676 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
677 let inverted_index_cache = self.cache_strategy.inverted_index_cache().cloned();
678
679 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
680
681 InvertedIndexApplierBuilder::new(
682 self.access_layer.table_dir().to_string(),
683 self.access_layer.path_type(),
684 self.access_layer.object_store().clone(),
685 self.version.metadata.as_ref(),
686 self.version.metadata.inverted_indexed_column_ids(
687 self.version
688 .options
689 .index_options
690 .inverted_index
691 .ignore_column_ids
692 .iter(),
693 ),
694 self.access_layer.puffin_manager_factory().clone(),
695 )
696 .with_file_cache(file_cache)
697 .with_inverted_index_cache(inverted_index_cache)
698 .with_puffin_metadata_cache(puffin_metadata_cache)
699 .build(filters)
700 .inspect_err(|err| warn!(err; "Failed to build invereted index applier"))
701 .ok()
702 .flatten()
703 .map(Arc::new)
704 }
705
706 fn build_bloom_filter_applier(&self, filters: &[Expr]) -> Option<BloomFilterIndexApplierRef> {
708 if self.ignore_bloom_filter {
709 return None;
710 }
711
712 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
713 let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
714 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
715
716 BloomFilterIndexApplierBuilder::new(
717 self.access_layer.table_dir().to_string(),
718 self.access_layer.path_type(),
719 self.access_layer.object_store().clone(),
720 self.version.metadata.as_ref(),
721 self.access_layer.puffin_manager_factory().clone(),
722 )
723 .with_file_cache(file_cache)
724 .with_bloom_filter_index_cache(bloom_filter_index_cache)
725 .with_puffin_metadata_cache(puffin_metadata_cache)
726 .build(filters)
727 .inspect_err(|err| warn!(err; "Failed to build bloom filter index applier"))
728 .ok()
729 .flatten()
730 .map(Arc::new)
731 }
732
733 fn build_fulltext_index_applier(&self, filters: &[Expr]) -> Option<FulltextIndexApplierRef> {
735 if self.ignore_fulltext_index {
736 return None;
737 }
738
739 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
740 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
741 let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
742 FulltextIndexApplierBuilder::new(
743 self.access_layer.table_dir().to_string(),
744 self.access_layer.path_type(),
745 self.access_layer.object_store().clone(),
746 self.access_layer.puffin_manager_factory().clone(),
747 self.version.metadata.as_ref(),
748 )
749 .with_file_cache(file_cache)
750 .with_puffin_metadata_cache(puffin_metadata_cache)
751 .with_bloom_filter_cache(bloom_filter_index_cache)
752 .build(filters)
753 .inspect_err(|err| warn!(err; "Failed to build fulltext index applier"))
754 .ok()
755 .flatten()
756 .map(Arc::new)
757 }
758
759 #[cfg(feature = "vector_index")]
761 fn build_vector_index_applier(&self) -> Option<VectorIndexApplierRef> {
762 let vector_search = self.request.vector_search.as_ref()?;
763
764 let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
765 let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
766 let vector_index_cache = self.cache_strategy.vector_index_cache().cloned();
767
768 let applier = VectorIndexApplier::new(
769 self.access_layer.table_dir().to_string(),
770 self.access_layer.path_type(),
771 self.access_layer.object_store().clone(),
772 self.access_layer.puffin_manager_factory().clone(),
773 vector_search.column_id,
774 vector_search.query_vector.clone(),
775 vector_search.metric,
776 )
777 .with_file_cache(file_cache)
778 .with_puffin_metadata_cache(puffin_metadata_cache)
779 .with_vector_index_cache(vector_index_cache);
780
781 Some(Arc::new(applier))
782 }
783}
784
785fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
787 if predicate == &TimestampRange::min_to_max() {
788 return true;
789 }
790 let (start, end) = file.time_range();
792 let file_ts_range = TimestampRange::new_inclusive(Some(start), Some(end));
793 file_ts_range.intersects(predicate)
794}
795
/// Everything a scanner needs to read a region: sources, predicates, index
/// appliers and read options.
pub struct ScanInput {
    /// Access layer used to open SST readers.
    access_layer: AccessLayerRef,
    /// Projection mapper; also provides the region metadata.
    pub(crate) mapper: Arc<FlatProjectionMapper>,
    /// Columns to read, cloned from the mapper at construction.
    pub(crate) read_cols: ReadColumns,
    /// Time range of the scan, if any.
    pub(crate) time_range: Option<TimestampRange>,
    /// Predicates of the scan.
    pub(crate) predicate: PredicateGroup,
    /// Region partition expression taken from the predicate group; compared with
    /// a file's partition expression to pick the predicate for that file.
    region_partition_expr: Option<PartitionExpr>,
    /// Builders of the memtable ranges to scan.
    pub(crate) memtables: Vec<MemRangeBuilder>,
    /// Handles of the SST files to scan.
    pub(crate) files: Vec<FileHandle>,
    /// Cache strategy passed to SST readers.
    pub(crate) cache_strategy: CacheStrategy,
    /// Whether a missing file is tolerated instead of failing the scan.
    ignore_file_not_found: bool,
    /// Defaults to `DEFAULT_MAX_CONCURRENT_SCAN_FILES`.
    pub(crate) max_concurrent_scan_files: usize,
    /// Inverted index appliers, passed to SST readers.
    inverted_index_appliers: [Option<InvertedIndexApplierRef>; 2],
    /// Bloom filter index appliers, passed to SST readers.
    bloom_filter_index_appliers: [Option<BloomFilterIndexApplierRef>; 2],
    /// Fulltext index appliers, passed to SST readers.
    fulltext_index_appliers: [Option<FulltextIndexApplierRef>; 2],
    /// Vector index applier, passed to SST readers.
    #[cfg(feature = "vector_index")]
    pub(crate) vector_index_applier: Option<VectorIndexApplierRef>,
    /// The `k` passed to SST readers together with the vector index applier.
    #[cfg(feature = "vector_index")]
    pub(crate) vector_index_k: Option<usize>,
    /// Start instant of the query, if recorded.
    pub(crate) query_start: Option<Instant>,
    /// Whether the region is in append mode.
    pub(crate) append_mode: bool,
    /// Defaults to `true`. NOTE(review): presumably controls removal of deleted rows — confirm.
    pub(crate) filter_deleted: bool,
    /// Merge mode of the region.
    pub(crate) merge_mode: MergeMode,
    /// Series row selector of the request, if any.
    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
    /// Requested time series distribution, if any.
    pub(crate) distribution: Option<TimeSeriesDistribution>,
    /// NOTE(review): looks like a flag for explain output about the flat format — confirm.
    explain_flat_format: bool,
    /// Snapshot sequence of the scan, if any.
    pub(crate) snapshot_sequence: Option<SequenceNumber>,
    /// Whether the input is used for compaction; passed to SST readers.
    pub(crate) compaction: bool,
    /// Optional query stat counters.
    pub(crate) query_stat_counters: Option<RegionQueryStatCounters>,
    /// Extension ranges to scan in addition to memtables and files.
    #[cfg(feature = "enterprise")]
    extension_ranges: Vec<BoxedExtensionRange>,
}
855
856impl ScanInput {
857 #[must_use]
859 pub(crate) fn new(access_layer: AccessLayerRef, mapper: FlatProjectionMapper) -> ScanInput {
860 ScanInput {
861 access_layer,
862 read_cols: mapper.read_columns().clone(),
863 mapper: Arc::new(mapper),
864 time_range: None,
865 predicate: PredicateGroup::default(),
866 region_partition_expr: None,
867 memtables: Vec::new(),
868 files: Vec::new(),
869 cache_strategy: CacheStrategy::Disabled,
870 ignore_file_not_found: false,
871 max_concurrent_scan_files: DEFAULT_MAX_CONCURRENT_SCAN_FILES,
872 inverted_index_appliers: [None, None],
873 bloom_filter_index_appliers: [None, None],
874 fulltext_index_appliers: [None, None],
875 #[cfg(feature = "vector_index")]
876 vector_index_applier: None,
877 #[cfg(feature = "vector_index")]
878 vector_index_k: None,
879 query_start: None,
880 append_mode: false,
881 filter_deleted: true,
882 merge_mode: MergeMode::default(),
883 series_row_selector: None,
884 distribution: None,
885 explain_flat_format: false,
886 snapshot_sequence: None,
887 compaction: false,
888 query_stat_counters: None,
889 #[cfg(feature = "enterprise")]
890 extension_ranges: Vec::new(),
891 }
892 }
893
894 #[must_use]
896 pub(crate) fn with_time_range(mut self, time_range: Option<TimestampRange>) -> Self {
897 self.time_range = time_range;
898 self
899 }
900
901 #[must_use]
903 pub(crate) fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
904 self.region_partition_expr = predicate.region_partition_expr().cloned();
905 self.predicate = predicate;
906 self
907 }
908
909 #[must_use]
911 pub(crate) fn with_memtables(mut self, memtables: Vec<MemRangeBuilder>) -> Self {
912 self.memtables = memtables;
913 self
914 }
915
916 #[must_use]
918 pub(crate) fn with_files(mut self, files: Vec<FileHandle>) -> Self {
919 self.files = files;
920 self
921 }
922
923 #[must_use]
925 pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self {
926 self.cache_strategy = cache;
927 self
928 }
929
930 #[must_use]
932 pub(crate) fn with_ignore_file_not_found(mut self, ignore: bool) -> Self {
933 self.ignore_file_not_found = ignore;
934 self
935 }
936
937 #[must_use]
939 pub(crate) fn with_max_concurrent_scan_files(
940 mut self,
941 max_concurrent_scan_files: usize,
942 ) -> Self {
943 self.max_concurrent_scan_files = max_concurrent_scan_files;
944 self
945 }
946
947 #[must_use]
949 pub(crate) fn with_inverted_index_appliers(
950 mut self,
951 appliers: [Option<InvertedIndexApplierRef>; 2],
952 ) -> Self {
953 self.inverted_index_appliers = appliers;
954 self
955 }
956
957 #[must_use]
959 pub(crate) fn with_bloom_filter_index_appliers(
960 mut self,
961 appliers: [Option<BloomFilterIndexApplierRef>; 2],
962 ) -> Self {
963 self.bloom_filter_index_appliers = appliers;
964 self
965 }
966
967 #[must_use]
969 pub(crate) fn with_fulltext_index_appliers(
970 mut self,
971 appliers: [Option<FulltextIndexApplierRef>; 2],
972 ) -> Self {
973 self.fulltext_index_appliers = appliers;
974 self
975 }
976
977 #[cfg(feature = "vector_index")]
979 #[must_use]
980 pub(crate) fn with_vector_index_applier(
981 mut self,
982 applier: Option<VectorIndexApplierRef>,
983 ) -> Self {
984 self.vector_index_applier = applier;
985 self
986 }
987
988 #[cfg(feature = "vector_index")]
990 #[must_use]
991 pub(crate) fn with_vector_index_k(mut self, k: Option<usize>) -> Self {
992 self.vector_index_k = k;
993 self
994 }
995
996 #[must_use]
998 pub(crate) fn with_start_time(mut self, now: Option<Instant>) -> Self {
999 self.query_start = now;
1000 self
1001 }
1002
1003 #[must_use]
1004 pub(crate) fn with_append_mode(mut self, is_append_mode: bool) -> Self {
1005 self.append_mode = is_append_mode;
1006 self
1007 }
1008
1009 pub(crate) fn with_query_stat_counters(
1010 mut self,
1011 counters: Option<RegionQueryStatCounters>,
1012 ) -> Self {
1013 self.query_stat_counters = counters;
1014 self
1015 }
1016
1017 #[must_use]
1019 pub(crate) fn with_filter_deleted(mut self, filter_deleted: bool) -> Self {
1020 self.filter_deleted = filter_deleted;
1021 self
1022 }
1023
1024 #[must_use]
1026 pub(crate) fn with_merge_mode(mut self, merge_mode: MergeMode) -> Self {
1027 self.merge_mode = merge_mode;
1028 self
1029 }
1030
1031 #[must_use]
1033 pub(crate) fn with_distribution(
1034 mut self,
1035 distribution: Option<TimeSeriesDistribution>,
1036 ) -> Self {
1037 self.distribution = distribution;
1038 self
1039 }
1040
1041 #[must_use]
1043 pub(crate) fn with_explain_flat_format(mut self, explain_flat_format: bool) -> Self {
1044 self.explain_flat_format = explain_flat_format;
1045 self
1046 }
1047
1048 #[must_use]
1050 pub(crate) fn with_series_row_selector(
1051 mut self,
1052 series_row_selector: Option<TimeSeriesRowSelector>,
1053 ) -> Self {
1054 self.series_row_selector = series_row_selector;
1055 self
1056 }
1057
1058 #[must_use]
1059 pub(crate) fn with_snapshot_sequence(
1060 mut self,
1061 snapshot_sequence: Option<SequenceNumber>,
1062 ) -> Self {
1063 self.snapshot_sequence = snapshot_sequence;
1064 self
1065 }
1066
1067 #[must_use]
1069 pub(crate) fn with_compaction(mut self, compaction: bool) -> Self {
1070 self.compaction = compaction;
1071 self
1072 }
1073
1074 pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> {
1076 let memtable = &self.memtables[index.index];
1077 let mut ranges = SmallVec::new();
1078 memtable.build_ranges(index.row_group_index, &mut ranges);
1079 ranges
1080 }
1081
1082 pub(crate) fn predicate_for_file(&self, file: &FileHandle) -> Option<Predicate> {
1083 if self.should_skip_region_partition(file) {
1084 self.predicate.predicate_without_region().cloned()
1085 } else {
1086 self.predicate.predicate().cloned()
1087 }
1088 }
1089
1090 fn should_skip_region_partition(&self, file: &FileHandle) -> bool {
1091 match (
1092 self.region_partition_expr.as_ref(),
1093 file.meta_ref().partition_expr.as_ref(),
1094 ) {
1095 (Some(region_expr), Some(file_expr)) => region_expr == file_expr,
1096 _ => false,
1097 }
1098 }
1099
1100 fn try_file_level_pruning_stats(&self, file: &FileHandle) -> Option<FileLevelPruningStats> {
1105 let (ts_min, ts_max) = file.time_range();
1106 let time_index = self.mapper.metadata().time_index_column();
1107 let time_index_unit = time_index.column_schema.data_type.as_timestamp()?.unit();
1108
1109 let min_ts = ts_min.convert_to(time_index_unit)?;
1112 let max_ts = ts_max.convert_to_ceil(time_index_unit)?;
1113
1114 Some(FileLevelPruningStats {
1115 min_scalar: timestamp_to_scalar_value(time_index_unit, Some(min_ts.value())),
1116 max_scalar: timestamp_to_scalar_value(time_index_unit, Some(max_ts.value())),
1117 time_index_col_name: time_index.column_schema.name.clone(),
1118 })
1119 }
1120
1121 #[inline]
1126 pub(crate) fn can_manifest_prune_file(&self, file: &FileHandle) -> bool {
1127 let predicate = self.predicate_for_file(file);
1128 self.manifest_prunes_file(file, predicate.as_ref())
1129 }
1130
1131 fn manifest_prunes_file(&self, file: &FileHandle, predicate: Option<&Predicate>) -> bool {
1132 if let Some(pred) = predicate
1133 && !pred.is_empty()
1134 && let Some(file_level_stats) = self.try_file_level_pruning_stats(file)
1135 {
1136 let pruning_results = pred.prune_with_stats(
1137 &file_level_stats,
1138 self.mapper.metadata().schema.arrow_schema(),
1139 );
1140 pruning_results.first() == Some(&false)
1141 } else {
1142 false
1143 }
1144 }
1145
1146 #[tracing::instrument(
1151 skip_all,
1152 fields(
1153 region_id = %self.region_metadata().region_id,
1154 file_id = %file.file_id()
1155 )
1156 )]
1157 pub async fn prune_file(
1158 &self,
1159 file: &FileHandle,
1160 pre_filter_mode: PreFilterMode,
1161 reader_metrics: &mut ReaderMetrics,
1162 ) -> Result<FileRangeBuilder> {
1163 let predicate = self.predicate_for_file(file);
1164
1165 if self.manifest_prunes_file(file, predicate.as_ref()) {
1167 reader_metrics.filter_metrics.files_time_range_pruned += 1;
1168 return Ok(FileRangeBuilder::default());
1169 }
1170
1171 self.prune_file_after_manifest_check(file, pre_filter_mode, predicate, reader_metrics)
1172 .await
1173 }
1174
1175 pub(crate) async fn prune_file_after_manifest_check(
1183 &self,
1184 file: &FileHandle,
1185 pre_filter_mode: PreFilterMode,
1186 predicate: Option<Predicate>,
1187 reader_metrics: &mut ReaderMetrics,
1188 ) -> Result<FileRangeBuilder> {
1189 let may_build_selective_row_selection = predicate.is_some();
1190 let decode_pk_values = !self.compaction
1191 && self
1192 .mapper
1193 .read_columns()
1194 .column_ids_iter()
1195 .any(|column_id| self.mapper.metadata().primary_key.contains(&column_id));
1196 let reader = self
1197 .access_layer
1198 .read_sst(file.clone())
1199 .predicate(predicate)
1200 .projection(Some(self.read_cols.clone()))
1201 .cache(self.cache_strategy.clone())
1202 .inverted_index_appliers(self.inverted_index_appliers.clone())
1203 .bloom_filter_index_appliers(self.bloom_filter_index_appliers.clone())
1204 .fulltext_index_appliers(self.fulltext_index_appliers.clone());
1205 let reader = if !self.compaction && may_build_selective_row_selection {
1206 reader.deferred_optional_page_index()
1207 } else {
1208 reader
1209 };
1210 #[cfg(feature = "vector_index")]
1211 let reader = {
1212 let mut reader = reader;
1213 reader =
1214 reader.vector_index_applier(self.vector_index_applier.clone(), self.vector_index_k);
1215 reader
1216 };
1217 let res = reader
1218 .expected_metadata(Some(self.mapper.metadata().clone()))
1219 .compaction(self.compaction)
1220 .pre_filter_mode(pre_filter_mode)
1221 .decode_primary_key_values(decode_pk_values)
1222 .build_reader_input(reader_metrics)
1223 .await;
1224 let read_input = match res {
1225 Ok(x) => x,
1226 Err(e) => {
1227 if e.is_object_not_found() && self.ignore_file_not_found {
1228 error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
1229 return Ok(FileRangeBuilder::default());
1230 } else {
1231 return Err(e);
1232 }
1233 }
1234 };
1235
1236 let Some((mut file_range_ctx, selection)) = read_input else {
1237 return Ok(FileRangeBuilder::default());
1238 };
1239
1240 let need_compat = !compat::has_same_columns_and_pk_encoding(
1241 &self.mapper,
1242 file_range_ctx.read_format(),
1243 self.compaction,
1244 );
1245 if need_compat {
1246 let compat = FlatCompatBatch::try_new(
1249 &self.mapper,
1250 file_range_ctx.read_format(),
1251 self.compaction,
1252 )?;
1253 file_range_ctx.set_compat_batch(compat);
1254 }
1255 Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), selection))
1256 }
1257
1258 #[tracing::instrument(
1262 skip(self, sources, semaphore),
1263 fields(
1264 region_id = %self.region_metadata().region_id,
1265 source_count = sources.len()
1266 )
1267 )]
1268 pub(crate) fn create_parallel_flat_sources(
1269 &self,
1270 sources: Vec<BoxedRecordBatchStream>,
1271 semaphore: Arc<Semaphore>,
1272 channel_size: usize,
1273 ) -> Result<Vec<BoxedRecordBatchStream>> {
1274 if sources.len() <= 1 {
1275 return Ok(sources);
1276 }
1277
1278 let sources = sources
1280 .into_iter()
1281 .map(|source| {
1282 let (sender, receiver) = mpsc::channel(channel_size);
1283 self.spawn_flat_scan_task(source, semaphore.clone(), sender);
1284 let stream = Box::pin(ReceiverStream::new(receiver));
1285 Box::pin(stream) as _
1286 })
1287 .collect();
1288 Ok(sources)
1289 }
1290
1291 #[tracing::instrument(
1293 skip(self, input, semaphore, sender),
1294 fields(region_id = %self.region_metadata().region_id)
1295 )]
1296 pub(crate) fn spawn_flat_scan_task(
1297 &self,
1298 mut input: BoxedRecordBatchStream,
1299 semaphore: Arc<Semaphore>,
1300 sender: mpsc::Sender<Result<RecordBatch>>,
1301 ) {
1302 let region_id = self.region_metadata().region_id;
1303 let span = tracing::info_span!(
1304 "ScanInput::parallel_scan_task",
1305 region_id = %region_id,
1306 stream_kind = "flat"
1307 );
1308 common_runtime::spawn_query(
1309 async move {
1310 loop {
1311 let maybe_batch = {
1314 let _permit = semaphore.acquire().await.unwrap();
1316 input.next().await
1317 };
1318 match maybe_batch {
1319 Some(Ok(batch)) => {
1320 let _ = sender.send(Ok(batch)).await;
1321 }
1322 Some(Err(e)) => {
1323 let _ = sender.send(Err(e)).await;
1324 break;
1325 }
1326 None => break,
1327 }
1328 }
1329 }
1330 .instrument(span),
1331 );
1332 }
1333
1334 pub(crate) fn total_rows(&self) -> usize {
1335 let rows_in_files: usize = self.files.iter().map(|f| f.num_rows()).sum();
1336 let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();
1337
1338 let rows = rows_in_files + rows_in_memtables;
1339 #[cfg(feature = "enterprise")]
1340 let rows = rows
1341 + self
1342 .extension_ranges
1343 .iter()
1344 .map(|x| x.num_rows())
1345 .sum::<u64>() as usize;
1346 rows
1347 }
1348
1349 pub(crate) fn predicate_group(&self) -> &PredicateGroup {
1350 &self.predicate
1351 }
1352
1353 pub(crate) fn num_memtables(&self) -> usize {
1355 self.memtables.len()
1356 }
1357
1358 pub(crate) fn num_files(&self) -> usize {
1360 self.files.len()
1361 }
1362
1363 pub(crate) fn file_from_index(&self, index: RowGroupIndex) -> &FileHandle {
1365 let file_index = index.index - self.num_memtables();
1366 &self.files[file_index]
1367 }
1368
1369 pub fn region_metadata(&self) -> &RegionMetadataRef {
1370 self.mapper.metadata()
1371 }
1372
1373 fn range_pre_filter_mode(&self, source_count: usize) -> PreFilterMode {
1374 if source_count <= 1 {
1375 return PreFilterMode::All;
1380 }
1381
1382 pre_filter_mode(self.append_mode, self.merge_mode)
1383 }
1384}
1385
/// Extension range support, only compiled with the `enterprise` feature.
#[cfg(feature = "enterprise")]
impl ScanInput {
    /// Replaces the extension ranges of this input and returns the updated input.
    #[must_use]
    pub(crate) fn with_extension_ranges(self, extension_ranges: Vec<BoxedExtensionRange>) -> Self {
        let mut input = self;
        input.extension_ranges = extension_ranges;
        input
    }

    /// Returns all extension ranges.
    pub(crate) fn extension_ranges(&self) -> &[BoxedExtensionRange] {
        self.extension_ranges.as_slice()
    }

    /// Returns the extension range addressed by the global range index `i`.
    ///
    /// Extension ranges are indexed after memtables and files, so both counts
    /// are subtracted from `i`.
    ///
    /// # Panics
    ///
    /// Panics if the resulting position is outside the extension ranges.
    pub(crate) fn extension_range(&self, i: usize) -> &BoxedExtensionRange {
        let memtable_count = self.num_memtables();
        let file_count = self.num_files();
        &self.extension_ranges[i - memtable_count - file_count]
    }
}
1407
1408pub(crate) struct FileLevelPruningStats {
1412 pub(crate) min_scalar: ScalarValue,
1414 pub(crate) max_scalar: ScalarValue,
1416 pub(crate) time_index_col_name: String,
1418}
1419
1420impl PruningStatistics for FileLevelPruningStats {
1421 fn min_values(&self, column: &Column) -> Option<ArrayRef> {
1422 if column.name == self.time_index_col_name {
1423 ScalarValue::iter_to_array(std::iter::once(self.min_scalar.clone())).ok()
1424 } else {
1425 None
1426 }
1427 }
1428
1429 fn max_values(&self, column: &Column) -> Option<ArrayRef> {
1430 if column.name == self.time_index_col_name {
1431 ScalarValue::iter_to_array(std::iter::once(self.max_scalar.clone())).ok()
1432 } else {
1433 None
1434 }
1435 }
1436
1437 fn num_containers(&self) -> usize {
1438 1
1439 }
1440
1441 fn null_counts(&self, column: &Column) -> Option<ArrayRef> {
1442 if column.name == self.time_index_col_name {
1443 Some(Arc::new(UInt64Array::from(vec![0u64])))
1445 } else {
1446 None
1447 }
1448 }
1449
1450 fn row_counts(&self, _column: &Column) -> Option<ArrayRef> {
1451 None
1452 }
1453
1454 fn contained(&self, _column: &Column, _values: &HashSet<ScalarValue>) -> Option<BooleanArray> {
1455 None
1456 }
1457}
1458
/// Test-only accessors.
#[cfg(test)]
impl ScanInput {
    /// Returns the file ids of all SST files to scan, in input order.
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::RegionFileId> {
        let mut ids = Vec::with_capacity(self.files.len());
        for file in &self.files {
            ids.push(file.file_id());
        }
        ids
    }

    /// Returns the index ids of all SST files to scan, in input order.
    pub(crate) fn index_ids(&self) -> Vec<crate::sst::file::RegionIndexId> {
        let mut ids = Vec::with_capacity(self.files.len());
        for file in &self.files {
            ids.push(file.index_id());
        }
        ids
    }
}
1470
1471fn pre_filter_mode(append_mode: bool, merge_mode: MergeMode) -> PreFilterMode {
1472 if append_mode {
1473 return PreFilterMode::All;
1474 }
1475
1476 match merge_mode {
1477 MergeMode::LastRow => PreFilterMode::SkipFields,
1478 MergeMode::LastNonNull => PreFilterMode::SkipFields,
1479 }
1480}
1481
1482fn narrow_read_columns_by_json_type_hint(
1483 read_columns: &mut ReadColumns,
1484 json_type_hint: &HashMap<String, JsonNativeType>,
1485 metadata: &RegionMetadata,
1486) {
1487 if json_type_hint.is_empty() {
1488 return;
1489 }
1490
1491 for read_column in &mut read_columns.cols {
1492 let Some(column) = metadata.column_by_id(read_column.column_id) else {
1493 continue;
1494 };
1495 let column_name = &column.column_schema.name;
1496 let Some(json_type) = json_type_hint.get(column_name) else {
1497 continue;
1498 };
1499
1500 let mut paths = Vec::new();
1501 let mut current = vec![column_name.clone()];
1502 collect_json_nested_paths(json_type, &mut current, &mut paths);
1503 merge_nested_paths(&mut read_column.nested_paths, paths)
1504 }
1505}
1506
1507fn collect_json_nested_paths(
1508 json_type: &JsonNativeType,
1509 current: &mut NestedPath,
1510 paths: &mut Vec<NestedPath>,
1511) {
1512 match json_type {
1513 JsonNativeType::Object(fields) if !fields.is_empty() => {
1514 for (field, child) in fields {
1515 current.push(field.clone());
1516 collect_json_nested_paths(child, current, paths);
1517 current.pop();
1518 }
1519 }
1520 _ => paths.push(current.clone()),
1521 }
1522}
1523
1524pub(crate) struct ScanFingerprintBundle {
1528 pub(crate) fingerprint: ScanRequestFingerprint,
1529 pub(crate) implied_time_range: Option<TimestampRange>,
1534}
1535
1536pub(crate) fn build_scan_fingerprint(input: &ScanInput) -> Option<ScanFingerprintBundle> {
1539 let eligible = !input.compaction
1540 && !input.files.is_empty()
1541 && matches!(input.cache_strategy, CacheStrategy::EnableAll(_));
1542
1543 if !eligible {
1544 return None;
1545 }
1546
1547 let metadata = input.region_metadata();
1548 let tag_names: HashSet<&str> = metadata
1549 .column_metadatas
1550 .iter()
1551 .filter(|col| col.semantic_type == SemanticType::Tag)
1552 .map(|col| col.column_schema.name.as_str())
1553 .collect();
1554
1555 let time_index = metadata.time_index_column();
1556 let time_index_name = time_index.column_schema.name.clone();
1557 let ts_col_unit = time_index
1558 .column_schema
1559 .data_type
1560 .as_timestamp()
1561 .expect("Time index must have timestamp-compatible type")
1562 .unit();
1563
1564 let exprs = input
1565 .predicate_group()
1566 .predicate_without_region()
1567 .map(|predicate| predicate.exprs())
1568 .unwrap_or_default();
1569
1570 let mut filters = Vec::new();
1571 let mut time_only_exprs: Vec<&Expr> = Vec::new();
1572 let mut has_tag_filter = false;
1573 let mut columns = HashSet::new();
1574
1575 for expr in exprs {
1576 columns.clear();
1577 let is_time_only = match expr_to_columns(expr, &mut columns) {
1578 Ok(()) if !columns.is_empty() => {
1579 has_tag_filter |= columns
1580 .iter()
1581 .any(|col| tag_names.contains(col.name.as_str()));
1582 columns.iter().all(|col| col.name == time_index_name)
1583 }
1584 _ => false,
1585 };
1586
1587 if is_time_only
1595 && extract_time_range_from_expr(&time_index_name, ts_col_unit, expr).is_some()
1596 {
1597 time_only_exprs.push(expr);
1598 } else {
1599 filters.push(expr.to_string());
1600 }
1601 }
1602
1603 if !has_tag_filter {
1604 return None;
1606 }
1607
1608 let implied_time_range =
1609 implied_time_range_from_exprs(&time_index_name, ts_col_unit, &time_only_exprs);
1610 let mut time_filters: Vec<String> = time_only_exprs.iter().map(|e| e.to_string()).collect();
1611
1612 filters.sort_unstable();
1614 time_filters.sort_unstable();
1615 let read_columns = input.read_cols.clone();
1616 let fingerprint = crate::read::range_cache::ScanRequestFingerprintBuilder {
1617 read_column_types: read_columns
1618 .column_ids_iter()
1619 .map(|id| {
1620 metadata
1621 .column_by_id(id)
1622 .map(|col| col.column_schema.data_type.clone())
1623 })
1624 .collect(),
1625 read_columns,
1626 filters,
1627 time_filters,
1628 series_row_selector: input.series_row_selector,
1629 append_mode: input.append_mode,
1630 filter_deleted: input.filter_deleted,
1631 merge_mode: input.merge_mode,
1632 partition_expr_version: metadata.partition_expr_version,
1633 }
1634 .build();
1635
1636 Some(ScanFingerprintBundle {
1637 fingerprint,
1638 implied_time_range,
1639 })
1640}
1641
1642pub struct StreamContext {
1645 pub input: ScanInput,
1647 pub(crate) ranges: Vec<RangeMeta>,
1649 #[allow(dead_code)]
1652 pub(crate) scan_fingerprint: Option<ScanRequestFingerprint>,
1653 pub(crate) scan_implied_time_range: Option<TimestampRange>,
1660
1661 pub(crate) query_start: Instant,
1664}
1665
1666impl StreamContext {
1667 pub(crate) fn seq_scan_ctx(input: ScanInput) -> Self {
1669 let query_start = input.query_start.unwrap_or_else(Instant::now);
1670 let ranges = RangeMeta::seq_scan_ranges(&input);
1671 READ_SST_COUNT.observe(input.num_files() as f64);
1672 let (scan_fingerprint, scan_implied_time_range) = match build_scan_fingerprint(&input) {
1673 Some(b) => (Some(b.fingerprint), b.implied_time_range),
1674 None => (None, None),
1675 };
1676
1677 Self {
1678 input,
1679 ranges,
1680 scan_fingerprint,
1681 scan_implied_time_range,
1682 query_start,
1683 }
1684 }
1685
1686 pub(crate) fn unordered_scan_ctx(input: ScanInput) -> Self {
1688 let query_start = input.query_start.unwrap_or_else(Instant::now);
1689 let ranges = RangeMeta::unordered_scan_ranges(&input);
1690 READ_SST_COUNT.observe(input.num_files() as f64);
1691 let (scan_fingerprint, scan_implied_time_range) = match build_scan_fingerprint(&input) {
1692 Some(b) => (Some(b.fingerprint), b.implied_time_range),
1693 None => (None, None),
1694 };
1695
1696 Self {
1697 input,
1698 ranges,
1699 scan_fingerprint,
1700 scan_implied_time_range,
1701 query_start,
1702 }
1703 }
1704
1705 pub(crate) fn is_mem_range_index(&self, index: RowGroupIndex) -> bool {
1707 self.input.num_memtables() > index.index
1708 }
1709
1710 pub(crate) fn is_file_range_index(&self, index: RowGroupIndex) -> bool {
1711 !self.is_mem_range_index(index)
1712 && index.index < self.input.num_files() + self.input.num_memtables()
1713 }
1714
1715 pub(crate) fn range_pre_filter_mode(&self, part_range: &PartitionRange) -> PreFilterMode {
1716 let range_meta = &self.ranges[part_range.identifier];
1717 let source_count = range_meta.indices.len();
1718
1719 self.input.range_pre_filter_mode(source_count)
1720 }
1721
1722 pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
1724 self.ranges
1725 .iter()
1726 .enumerate()
1727 .map(|(idx, range_meta)| range_meta.new_partition_range(idx))
1728 .collect()
1729 }
1730
1731 pub(crate) fn format_for_explain(&self, verbose: bool, f: &mut fmt::Formatter) -> fmt::Result {
1733 let (mut num_mem_ranges, mut num_file_ranges, mut num_other_ranges) = (0, 0, 0);
1734 for range_meta in &self.ranges {
1735 for idx in &range_meta.row_group_indices {
1736 if self.is_mem_range_index(*idx) {
1737 num_mem_ranges += 1;
1738 } else if self.is_file_range_index(*idx) {
1739 num_file_ranges += 1;
1740 } else {
1741 num_other_ranges += 1;
1742 }
1743 }
1744 }
1745 if verbose {
1746 write!(f, "{{")?;
1747 }
1748 write!(
1749 f,
1750 r#""partition_count":{{"count":{}, "mem_ranges":{}, "files":{}, "file_ranges":{}"#,
1751 self.ranges.len(),
1752 num_mem_ranges,
1753 self.input.num_files(),
1754 num_file_ranges,
1755 )?;
1756 if num_other_ranges > 0 {
1757 write!(f, r#", "other_ranges":{}"#, num_other_ranges)?;
1758 }
1759 write!(f, "}}")?;
1760
1761 if let Some(selector) = &self.input.series_row_selector {
1762 write!(f, ", \"selector\":\"{}\"", selector)?;
1763 }
1764 if let Some(distribution) = &self.input.distribution {
1765 write!(f, ", \"distribution\":\"{}\"", distribution)?;
1766 }
1767
1768 if verbose {
1769 self.format_verbose_content(f)?;
1770 }
1771
1772 Ok(())
1773 }
1774
1775 fn format_verbose_content(&self, f: &mut fmt::Formatter) -> fmt::Result {
1776 struct FileWrapper<'a> {
1777 file: &'a FileHandle,
1778 }
1779
1780 impl fmt::Debug for FileWrapper<'_> {
1781 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1782 let (start, end) = self.file.time_range();
1783 write!(
1784 f,
1785 r#"{{"file_id":"{}","time_range_start":"{}::{}","time_range_end":"{}::{}","rows":{},"size":{},"index_size":{}}}"#,
1786 self.file.file_id(),
1787 start.value(),
1788 start.unit(),
1789 end.value(),
1790 end.unit(),
1791 self.file.num_rows(),
1792 self.file.size(),
1793 self.file.index_size()
1794 )
1795 }
1796 }
1797
1798 struct InputWrapper<'a> {
1799 input: &'a ScanInput,
1800 }
1801
1802 #[cfg(feature = "enterprise")]
1803 impl InputWrapper<'_> {
1804 fn format_extension_ranges(&self, f: &mut fmt::Formatter) -> fmt::Result {
1805 if self.input.extension_ranges.is_empty() {
1806 return Ok(());
1807 }
1808
1809 let mut delimiter = "";
1810 write!(f, ", extension_ranges: [")?;
1811 for range in self.input.extension_ranges() {
1812 write!(f, "{}{:?}", delimiter, range)?;
1813 delimiter = ", ";
1814 }
1815 write!(f, "]")?;
1816 Ok(())
1817 }
1818 }
1819
1820 impl fmt::Debug for InputWrapper<'_> {
1821 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
1822 let output_schema = self.input.mapper.output_schema();
1823 if !output_schema.is_empty() {
1824 let names: Vec<_> = output_schema
1825 .column_schemas()
1826 .iter()
1827 .map(|col| &col.name)
1828 .collect();
1829 write!(f, ", \"projection\": {:?}", names)?;
1830 }
1831 if let Some(predicate) = &self.input.predicate.predicate() {
1832 if !predicate.exprs().is_empty() {
1833 let exprs: Vec<_> =
1834 predicate.exprs().iter().map(|e| e.to_string()).collect();
1835 write!(f, ", \"filters\": {:?}", exprs)?;
1836 }
1837 if !predicate.dyn_filters().is_empty() {
1838 let dyn_filters: Vec<_> = predicate
1839 .dyn_filters()
1840 .iter()
1841 .map(|f| format!("{}", f))
1842 .collect();
1843 write!(f, ", \"dyn_filters\": {:?}", dyn_filters)?;
1844 }
1845 }
1846 #[cfg(feature = "vector_index")]
1847 if let Some(vector_index_k) = self.input.vector_index_k {
1848 write!(f, ", \"vector_index_k\": {}", vector_index_k)?;
1849 }
1850 if !self.input.files.is_empty() {
1851 write!(f, ", \"files\": ")?;
1852 f.debug_list()
1853 .entries(self.input.files.iter().map(|file| FileWrapper { file }))
1854 .finish()?;
1855 }
1856 write!(f, ", \"flat_format\": {}", self.input.explain_flat_format)?;
1857 #[cfg(feature = "enterprise")]
1858 self.format_extension_ranges(f)?;
1859
1860 Ok(())
1861 }
1862 }
1863
1864 write!(f, "{:?}", InputWrapper { input: &self.input })
1865 }
1866
1867 pub(crate) fn add_dyn_filter_to_predicate(
1870 self: &Arc<Self>,
1871 filter_exprs: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
1872 ) -> Vec<bool> {
1873 let mut supported = Vec::with_capacity(filter_exprs.len());
1874 let filter_expr = filter_exprs
1875 .into_iter()
1876 .filter_map(|expr| {
1877 if let Ok(dyn_filter) = (expr as Arc<dyn std::any::Any + Send + Sync + 'static>)
1878 .downcast::<datafusion::physical_plan::expressions::DynamicFilterPhysicalExpr>()
1879 {
1880 supported.push(true);
1881 Some(dyn_filter)
1882 } else {
1883 supported.push(false);
1884 None
1885 }
1886 })
1887 .collect();
1888 self.input.predicate.add_dyn_filters(filter_expr);
1889 supported
1890 }
1891}
1892
1893#[derive(Clone, Default)]
1896pub struct PredicateGroup {
1897 time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
1898 predicate_all: Predicate,
1900 predicate_without_region: Predicate,
1902 region_partition_expr: Option<PartitionExpr>,
1904}
1905
1906impl PredicateGroup {
1907 pub fn new(metadata: &RegionMetadata, exprs: &[Expr]) -> Result<Self> {
1909 let mut combined_exprs = exprs.to_vec();
1910 let mut region_partition_expr = None;
1911
1912 if let Some(expr_json) = metadata.partition_expr.as_ref()
1913 && !expr_json.is_empty()
1914 && let Some(expr) = PartitionExpr::from_json_str(expr_json)
1915 .context(InvalidPartitionExprSnafu { expr: expr_json })?
1916 {
1917 let logical_expr = expr
1918 .try_as_logical_expr()
1919 .context(InvalidPartitionExprSnafu {
1920 expr: expr_json.clone(),
1921 })?;
1922
1923 combined_exprs.push(logical_expr);
1924 region_partition_expr = Some(expr);
1925 }
1926
1927 let mut time_filters = Vec::with_capacity(combined_exprs.len());
1928 let mut columns = HashSet::new();
1930 for expr in &combined_exprs {
1931 columns.clear();
1932 let Some(filter) = Self::expr_to_filter(expr, metadata, &mut columns) else {
1933 continue;
1934 };
1935 time_filters.push(filter);
1936 }
1937 let time_filters = if time_filters.is_empty() {
1938 None
1939 } else {
1940 Some(Arc::new(time_filters))
1941 };
1942
1943 let predicate_all = Predicate::new(combined_exprs);
1944 let predicate_without_region = Predicate::new(exprs.to_vec());
1945
1946 Ok(Self {
1947 time_filters,
1948 predicate_all,
1949 predicate_without_region,
1950 region_partition_expr,
1951 })
1952 }
1953
1954 pub(crate) fn time_filters(&self) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
1956 self.time_filters.clone()
1957 }
1958
1959 pub(crate) fn predicate(&self) -> Option<&Predicate> {
1961 if self.predicate_all.is_empty() {
1962 None
1963 } else {
1964 Some(&self.predicate_all)
1965 }
1966 }
1967
1968 pub(crate) fn predicate_without_region(&self) -> Option<&Predicate> {
1970 if self.predicate_without_region.is_empty() {
1971 None
1972 } else {
1973 Some(&self.predicate_without_region)
1974 }
1975 }
1976
1977 pub(crate) fn add_dyn_filters(&self, dyn_filters: Vec<Arc<DynamicFilterPhysicalExpr>>) {
1979 self.predicate_all.add_dyn_filters(dyn_filters.clone());
1980 self.predicate_without_region.add_dyn_filters(dyn_filters);
1981 }
1982
1983 pub(crate) fn region_partition_expr(&self) -> Option<&PartitionExpr> {
1985 self.region_partition_expr.as_ref()
1986 }
1987
1988 fn expr_to_filter(
1989 expr: &Expr,
1990 metadata: &RegionMetadata,
1991 columns: &mut HashSet<Column>,
1992 ) -> Option<SimpleFilterEvaluator> {
1993 columns.clear();
1994 expr_to_columns(expr, columns).ok()?;
1997 if columns.len() > 1 {
1998 return None;
2000 }
2001 let column = columns.iter().next()?;
2002 let column_meta = metadata.column_by_name(&column.name)?;
2003 if column_meta.semantic_type == SemanticType::Timestamp {
2004 SimpleFilterEvaluator::try_new(expr)
2005 } else {
2006 None
2007 }
2008 }
2009}
2010
2011#[cfg(test)]
2012mod tests {
2013 use std::sync::Arc;
2014
2015 use common_time::timestamp::{TimeUnit, Timestamp};
2016 use datafusion::physical_plan::expressions::{
2017 binary as physical_binary, col as physical_col, lit as physical_lit,
2018 };
2019 use datafusion_common::ScalarValue;
2020 use datafusion_expr::{Operator, col, lit};
2021 use datatypes::arrow::datatypes::{
2022 DataType as ArrowDataType, Field, Schema as ArrowSchema, TimeUnit as ArrowTimeUnit,
2023 };
2024 use datatypes::prelude::ConcreteDataType;
2025 use datatypes::schema::ColumnSchema;
2026 use datatypes::types::json_type::JsonObjectType;
2027 use datatypes::value::Value;
2028 use partition::expr::col as partition_col;
2029 use store_api::metadata::{ColumnMetadata, RegionMetadataBuilder};
2030 use store_api::storage::{RegionId, TimeSeriesDistribution, TimeSeriesRowSelector};
2031
2032 use super::*;
2033 use crate::cache::CacheManager;
2034 use crate::error::InvalidMetadataSnafu;
2035 use crate::read::range_cache::ScanRequestFingerprintBuilder;
2036 use crate::read::read_columns::ReadColumn;
2037 use crate::sst::file::FileMeta;
2038 use crate::test_util::memtable_util::metadata_with_primary_key;
2039 use crate::test_util::scheduler_util::SchedulerEnv;
2040
2041 async fn new_scan_input(metadata: RegionMetadataRef, filters: Vec<Expr>) -> ScanInput {
2042 let env = SchedulerEnv::new().await;
2043 let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3]).unwrap();
2044 let predicate = PredicateGroup::new(metadata.as_ref(), &filters).unwrap();
2045 let file = FileHandle::new(
2046 crate::sst::file::FileMeta::default(),
2047 Arc::new(crate::sst::file_purger::NoopFilePurger),
2048 );
2049
2050 ScanInput::new(env.access_layer.clone(), mapper)
2051 .with_predicate(predicate)
2052 .with_cache(CacheStrategy::EnableAll(Arc::new(
2053 CacheManager::builder()
2054 .range_result_cache_size(1024)
2055 .build(),
2056 )))
2057 .with_files(vec![file])
2058 }
2059
2060 fn ts_lit(val: i64) -> datafusion_expr::Expr {
2062 lit(ScalarValue::TimestampMillisecond(Some(val), None))
2063 }
2064
2065 fn metadata_with_time_index_unit(unit: TimeUnit) -> RegionMetadataRef {
2066 let mut builder = RegionMetadataBuilder::new(RegionId::new(123, 456));
2067 builder
2068 .push_column_metadata(ColumnMetadata {
2069 column_schema: ColumnSchema::new(
2070 "k0".to_string(),
2071 ConcreteDataType::string_datatype(),
2072 false,
2073 ),
2074 semantic_type: SemanticType::Tag,
2075 column_id: 0,
2076 })
2077 .push_column_metadata(ColumnMetadata {
2078 column_schema: ColumnSchema::new(
2079 "k1".to_string(),
2080 ConcreteDataType::uint32_datatype(),
2081 false,
2082 ),
2083 semantic_type: SemanticType::Tag,
2084 column_id: 1,
2085 })
2086 .push_column_metadata(ColumnMetadata {
2087 column_schema: ColumnSchema::new(
2088 "ts".to_string(),
2089 ConcreteDataType::timestamp_datatype(unit),
2090 false,
2091 ),
2092 semantic_type: SemanticType::Timestamp,
2093 column_id: 2,
2094 })
2095 .push_column_metadata(ColumnMetadata {
2096 column_schema: ColumnSchema::new(
2097 "v0".to_string(),
2098 ConcreteDataType::int64_datatype(),
2099 true,
2100 ),
2101 semantic_type: SemanticType::Field,
2102 column_id: 3,
2103 })
2104 .primary_key(vec![0, 1]);
2105
2106 Arc::new(builder.build().unwrap())
2107 }
2108
2109 fn file_handle_with_time_range(start: Timestamp, end: Timestamp) -> FileHandle {
2110 FileHandle::new(
2111 FileMeta {
2112 time_range: (start, end),
2113 ..Default::default()
2114 },
2115 Arc::new(crate::sst::file_purger::NoopFilePurger),
2116 )
2117 }
2118
/// Checks that a JSON type hint fills the nested paths of the hinted column
/// and leaves columns without a hint untouched.
#[test]
fn test_fill_json_nested_paths_from_hint() -> Result<()> {
    /// Builds a nested path from string parts.
    fn nested_path(parts: &[&str]) -> NestedPath {
        parts.iter().map(|part| part.to_string()).collect()
    }

    /// Metadata with tag `tag` (id 0), JSON field `j` (id 1) and time index `ts` (id 2).
    fn json_projection_test_metadata() -> Result<RegionMetadataRef> {
        let columns = [
            ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "tag".to_string(),
                    ConcreteDataType::string_datatype(),
                    true,
                ),
                semantic_type: SemanticType::Tag,
                column_id: 0,
            },
            ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "j".to_string(),
                    ConcreteDataType::json2(JsonNativeType::Object(JsonObjectType::new())),
                    true,
                ),
                semantic_type: SemanticType::Field,
                column_id: 1,
            },
            ColumnMetadata {
                column_schema: ColumnSchema::new(
                    "ts".to_string(),
                    ConcreteDataType::timestamp_millisecond_datatype(),
                    false,
                ),
                semantic_type: SemanticType::Timestamp,
                column_id: 2,
            },
        ];

        let mut builder = RegionMetadataBuilder::new(RegionId::new(1024, 0));
        for column in columns {
            builder.push_column_metadata(column);
        }
        builder.primary_key(vec![0]);
        builder.build().context(InvalidMetadataSnafu).map(Arc::new)
    }

    let metadata = json_projection_test_metadata()?;

    // Hint for column `j`: { a: i64, b: { c: string } }.
    let inner_object =
        JsonNativeType::Object(JsonObjectType::from([("c".to_string(), JsonNativeType::String)]));
    let hinted_type = JsonNativeType::Object(JsonObjectType::from([
        ("a".to_string(), JsonNativeType::i64()),
        ("b".to_string(), inner_object),
    ]));
    let hint = HashMap::from([("j".to_string(), hinted_type)]);

    // The hinted JSON column gets one path per leaf; the tag column is unchanged.
    let mut read_columns = ReadColumns {
        cols: vec![ReadColumn::new(1, vec![]), ReadColumn::new(0, vec![])],
    };
    narrow_read_columns_by_json_type_hint(&mut read_columns, &hint, metadata.as_ref());
    let expected = ReadColumns {
        cols: vec![
            ReadColumn::new(
                1,
                vec![nested_path(&["j", "a"]), nested_path(&["j", "b", "c"])],
            ),
            ReadColumn::new(0, vec![]),
        ],
    };
    assert_eq!(read_columns, expected);

    // Without any hinted column nothing changes.
    let mut read_columns = ReadColumns {
        cols: vec![ReadColumn::new(0, vec![])],
    };
    narrow_read_columns_by_json_type_hint(&mut read_columns, &hint, metadata.as_ref());
    let expected = ReadColumns {
        cols: vec![ReadColumn::new(0, vec![])],
    };
    assert_eq!(read_columns, expected);

    Ok(())
}
2203
2204 #[tokio::test]
2205 async fn test_build_scan_fingerprint_for_eligible_scan() {
2206 let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
2207 let input = new_scan_input(
2208 metadata.clone(),
2209 vec![
2210 col("ts").gt_eq(ts_lit(1000)),
2211 col("k0").eq(lit("foo")),
2212 col("v0").gt(lit(1)),
2213 ],
2214 )
2215 .await
2216 .with_distribution(Some(TimeSeriesDistribution::PerSeries))
2217 .with_series_row_selector(Some(TimeSeriesRowSelector::LastRow))
2218 .with_merge_mode(MergeMode::LastNonNull)
2219 .with_filter_deleted(false);
2220
2221 let fingerprint = build_scan_fingerprint(&input).unwrap();
2222
2223 let expected = ScanRequestFingerprintBuilder {
2224 read_columns: input.read_cols,
2225 read_column_types: vec![
2226 metadata
2227 .column_by_id(0)
2228 .map(|col| col.column_schema.data_type.clone()),
2229 metadata
2230 .column_by_id(2)
2231 .map(|col| col.column_schema.data_type.clone()),
2232 metadata
2233 .column_by_id(3)
2234 .map(|col| col.column_schema.data_type.clone()),
2235 ],
2236 filters: vec![
2237 col("k0").eq(lit("foo")).to_string(),
2238 col("v0").gt(lit(1)).to_string(),
2239 ],
2240 time_filters: vec![col("ts").gt_eq(ts_lit(1000)).to_string()],
2241 series_row_selector: Some(TimeSeriesRowSelector::LastRow),
2242 append_mode: false,
2243 filter_deleted: false,
2244 merge_mode: MergeMode::LastNonNull,
2245 partition_expr_version: 0,
2246 }
2247 .build();
2248 assert_eq!(expected, fingerprint.fingerprint);
2249 }
2250
2251 #[tokio::test]
2252 async fn test_build_scan_fingerprint_requires_tag_filter() {
2253 let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
2254 let input = new_scan_input(
2255 metadata,
2256 vec![col("ts").gt_eq(lit(1000)), col("v0").gt(lit(1))],
2257 )
2258 .await;
2259
2260 assert!(build_scan_fingerprint(&input).is_none());
2261 }
2262
2263 #[tokio::test]
2264 async fn test_build_scan_fingerprint_respects_scan_eligibility() {
2265 let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
2266 let filters = vec![col("k0").eq(lit("foo"))];
2267
2268 let disabled = ScanInput::new(
2269 SchedulerEnv::new().await.access_layer.clone(),
2270 FlatProjectionMapper::new(&metadata, [0, 2, 3].into_iter()).unwrap(),
2271 )
2272 .with_predicate(PredicateGroup::new(metadata.as_ref(), &filters).unwrap());
2273 assert!(build_scan_fingerprint(&disabled).is_none());
2274
2275 let compaction = new_scan_input(metadata.clone(), filters.clone())
2276 .await
2277 .with_compaction(true);
2278 assert!(build_scan_fingerprint(&compaction).is_none());
2279
2280 let no_files = new_scan_input(metadata, filters).await.with_files(vec![]);
2282 assert!(build_scan_fingerprint(&no_files).is_none());
2283 }
2284
2285 #[tokio::test]
2286 async fn test_build_scan_fingerprint_tracks_schema_and_partition_expr_changes() {
2287 let base = metadata_with_primary_key(vec![0, 1], false);
2288 let mut builder = RegionMetadataBuilder::from_existing(base);
2289 let partition_expr = partition_col("k0")
2290 .gt_eq(Value::String("foo".into()))
2291 .as_json_str()
2292 .unwrap();
2293 builder.partition_expr_json(Some(partition_expr));
2294 let metadata = Arc::new(builder.build_without_validation().unwrap());
2295
2296 let input = new_scan_input(metadata.clone(), vec![col("k0").eq(lit("foo"))]).await;
2297 let fingerprint = build_scan_fingerprint(&input).unwrap();
2298
2299 let expected = ScanRequestFingerprintBuilder {
2300 read_columns: input.read_cols,
2301 read_column_types: vec![
2302 metadata
2303 .column_by_id(0)
2304 .map(|col| col.column_schema.data_type.clone()),
2305 metadata
2306 .column_by_id(2)
2307 .map(|col| col.column_schema.data_type.clone()),
2308 metadata
2309 .column_by_id(3)
2310 .map(|col| col.column_schema.data_type.clone()),
2311 ],
2312 filters: vec![col("k0").eq(lit("foo")).to_string()],
2313 time_filters: vec![],
2314 series_row_selector: None,
2315 append_mode: false,
2316 filter_deleted: true,
2317 merge_mode: MergeMode::LastRow,
2318 partition_expr_version: metadata.partition_expr_version,
2319 }
2320 .build();
2321 assert_eq!(expected, fingerprint.fingerprint);
2322 assert_ne!(0, metadata.partition_expr_version);
2323 }
2324
/// A predicate group built from an empty filter list exposes no predicates;
/// after a dynamic filter is added, both `predicate()` and
/// `predicate_without_region()` return a predicate with no static expressions
/// and exactly one dynamic filter.
#[test]
fn test_update_dyn_filters_with_empty_base_predicates() {
    let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
    let group = PredicateGroup::new(metadata.as_ref(), &[]).unwrap();

    // Nothing to evaluate before any dynamic filter is registered.
    assert!(group.predicate().is_none());
    assert!(group.predicate_without_region().is_none());

    group.add_dyn_filters(vec![Arc::new(DynamicFilterPhysicalExpr::new(
        vec![],
        physical_lit(false),
    ))]);

    let with_region = group.predicate().unwrap();
    assert!(with_region.exprs().is_empty());
    assert_eq!(1, with_region.dyn_filters().len());

    let without_region = group.predicate_without_region().unwrap();
    assert!(without_region.exprs().is_empty());
    assert_eq!(1, without_region.dyn_filters().len());
}
2343
/// Evaluates `ts > 1000` against file-level stats whose minimum is 0 ms:
/// a maximum of 500 ms yields `false`, a maximum of 2000 ms yields `true`.
#[test]
fn test_file_level_pruning_stats_prunes_old_file() {
    let ts_col_name = "ts";
    let arrow_schema = Arc::new(ArrowSchema::new(vec![Field::new(
        ts_col_name,
        ArrowDataType::Timestamp(ArrowTimeUnit::Millisecond, None),
        false,
    )]));
    let predicate = Predicate::new(vec![col(ts_col_name).gt(ts_lit(1000))]);

    // (file max timestamp in ms, expected pruning result)
    let cases = [(500, false), (2000, true)];
    for (file_max, expected) in cases {
        let stats = FileLevelPruningStats {
            min_scalar: ScalarValue::TimestampMillisecond(Some(0), None),
            max_scalar: ScalarValue::TimestampMillisecond(Some(file_max), None),
            time_index_col_name: ts_col_name.to_string(),
        };
        assert_eq!(
            vec![expected],
            predicate.prune_with_stats(&stats, &arrow_schema)
        );
    }
}
2376
/// An empty predicate evaluated against file-level stats (with an empty
/// arrow schema) reports `true` for the file.
#[test]
fn test_file_level_pruning_stats_no_predicate_keeps_all() {
    let empty_predicate = Predicate::new(vec![]);
    assert!(empty_predicate.is_empty());

    let empty_schema = Arc::new(ArrowSchema::new(Vec::<Field>::new()));
    let stats = FileLevelPruningStats {
        min_scalar: ScalarValue::TimestampMillisecond(Some(0), None),
        max_scalar: ScalarValue::TimestampMillisecond(Some(500), None),
        time_index_col_name: "ts".to_string(),
    };

    let kept = empty_predicate.prune_with_stats(&stats, &empty_schema);
    assert_eq!(vec![true], kept);
}
2393
2394 #[tokio::test]
2395 async fn test_file_level_pruning_stats_ceil_max_unit_conversion() {
2396 let metadata = metadata_with_time_index_unit(TimeUnit::Millisecond);
2397 let input = new_scan_input(metadata, vec![]).await;
2398 let file = file_handle_with_time_range(
2399 Timestamp::new(1_000_001, TimeUnit::Nanosecond),
2400 Timestamp::new(1_000_001, TimeUnit::Nanosecond),
2401 );
2402
2403 let stats = input.try_file_level_pruning_stats(&file).unwrap();
2404 assert_eq!(
2405 ScalarValue::TimestampMillisecond(Some(1), None),
2406 stats.min_scalar
2407 );
2408 assert_eq!(
2409 ScalarValue::TimestampMillisecond(Some(2), None),
2410 stats.max_scalar
2411 );
2412
2413 let predicate = Predicate::new(vec![col("ts").gt(ts_lit(1))]);
2415 assert_eq!(
2416 vec![true],
2417 predicate.prune_with_stats(&stats, input.mapper.metadata().schema.arrow_schema())
2418 );
2419 }
2420
2421 #[tokio::test]
2422 async fn test_file_level_pruning_stats_overflow_keeps_file() {
2423 let metadata = metadata_with_time_index_unit(TimeUnit::Nanosecond);
2424 let input = new_scan_input(metadata, vec![]).await;
2425 let file = file_handle_with_time_range(
2426 Timestamp::new(0, TimeUnit::Second),
2427 Timestamp::new(i64::MAX, TimeUnit::Second),
2428 );
2429
2430 assert!(input.try_file_level_pruning_stats(&file).is_none());
2431 }
2432
/// Evaluates `ts >= 1000` against stats whose maximum is exactly 1000 ms and
/// expects `true`: the boundary value itself satisfies the predicate.
#[test]
fn test_file_level_pruning_stats_keeps_inclusive_boundary() {
    let ts_col_name = "ts";

    let stats = FileLevelPruningStats {
        min_scalar: ScalarValue::TimestampMillisecond(Some(0), None),
        max_scalar: ScalarValue::TimestampMillisecond(Some(1000), None),
        time_index_col_name: ts_col_name.to_string(),
    };
    let ts_field = Field::new(
        ts_col_name,
        ArrowDataType::Timestamp(ArrowTimeUnit::Millisecond, None),
        false,
    );
    let arrow_schema = Arc::new(ArrowSchema::new(vec![ts_field]));
    let predicate = Predicate::new(vec![col(ts_col_name).gt_eq(ts_lit(1000))]);

    let kept = predicate.prune_with_stats(&stats, &arrow_schema);
    assert_eq!(vec![true], kept);
}
2453
2454 #[tokio::test]
2455 async fn test_file_level_pruning_with_dyn_filter_only_predicate() {
2456 let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
2457 let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3]).unwrap();
2458 let predicate_group = PredicateGroup::new(metadata.as_ref(), &[]).unwrap();
2459 predicate_group.add_dyn_filters(vec![Arc::new(DynamicFilterPhysicalExpr::new(
2460 vec![],
2461 physical_lit(false),
2462 ))]);
2463 let input = ScanInput::new(SchedulerEnv::new().await.access_layer.clone(), mapper)
2464 .with_predicate(predicate_group);
2465 let file = file_handle_with_time_range(
2466 Timestamp::new_millisecond(0),
2467 Timestamp::new_millisecond(1000),
2468 );
2469 let mut reader_metrics = ReaderMetrics::default();
2470
2471 let builder = input
2472 .prune_file(&file, PreFilterMode::SkipFields, &mut reader_metrics)
2473 .await
2474 .unwrap();
2475
2476 assert_eq!(1, reader_metrics.filter_metrics.files_time_range_pruned);
2477 let mut ranges = SmallVec::new();
2478 builder.build_ranges(-1, &mut ranges);
2479 assert!(ranges.is_empty());
2480 }
2481
2482 #[tokio::test]
2483 async fn test_manifest_pruning_observes_dynamic_filter_update() {
2484 let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
2485 let mapper = FlatProjectionMapper::new(&metadata, [0, 2, 3]).unwrap();
2486 let predicate_group = PredicateGroup::new(metadata.as_ref(), &[]).unwrap();
2487 let arrow_schema = metadata.schema.arrow_schema();
2488 let ts_expr = physical_col("ts", arrow_schema.as_ref()).unwrap();
2489 let dyn_filter = Arc::new(DynamicFilterPhysicalExpr::new(
2490 vec![ts_expr.clone()],
2491 physical_lit(true),
2492 ));
2493 predicate_group.add_dyn_filters(vec![dyn_filter.clone()]);
2494 let input = ScanInput::new(SchedulerEnv::new().await.access_layer.clone(), mapper)
2495 .with_predicate(predicate_group);
2496 let file = file_handle_with_time_range(
2497 Timestamp::new_millisecond(0),
2498 Timestamp::new_millisecond(1000),
2499 );
2500
2501 assert!(!input.can_manifest_prune_file(&file));
2502
2503 let updated = physical_binary(
2504 ts_expr,
2505 Operator::Gt,
2506 physical_lit(ScalarValue::TimestampMillisecond(Some(1000), None)),
2507 arrow_schema.as_ref(),
2508 )
2509 .unwrap();
2510 dyn_filter.update(updated).unwrap();
2511
2512 assert!(input.can_manifest_prune_file(&file));
2513 }
2514
2515 #[tokio::test]
2516 async fn test_range_pre_filter_mode() {
2517 let metadata = Arc::new(metadata_with_primary_key(vec![0, 1], false));
2518 let cases = [
2519 (true, MergeMode::LastRow, 1, PreFilterMode::All),
2520 (false, MergeMode::LastNonNull, 1, PreFilterMode::All),
2521 (false, MergeMode::LastRow, 2, PreFilterMode::SkipFields),
2522 (true, MergeMode::LastRow, 2, PreFilterMode::All),
2523 ];
2524
2525 for (append_mode, merge_mode, source_count, expected_mode) in cases {
2526 let input = new_scan_input(metadata.clone(), vec![])
2527 .await
2528 .with_append_mode(append_mode)
2529 .with_merge_mode(merge_mode);
2530
2531 assert_eq!(expected_mode, input.range_pre_filter_mode(source_count));
2532 }
2533 }
2534}