use std::collections::HashSet;
use std::fmt;
use std::sync::Arc;
use std::time::Instant;
use api::v1::SemanticType;
use common_error::ext::BoxedError;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_recordbatch::SendableRecordBatchStream;
use common_telemetry::{debug, error, tracing, warn};
use common_time::range::TimestampRange;
use datafusion_common::Column;
use datafusion_expr::utils::expr_to_columns;
use datafusion_expr::Expr;
use smallvec::SmallVec;
use store_api::metadata::RegionMetadata;
use store_api::region_engine::{PartitionRange, RegionScannerRef};
use store_api::storage::{ScanRequest, TimeSeriesDistribution, TimeSeriesRowSelector};
use table::predicate::{build_time_range_predicate, Predicate};
use tokio::sync::{mpsc, Semaphore};
use tokio_stream::wrappers::ReceiverStream;
use crate::access_layer::AccessLayerRef;
use crate::cache::CacheStrategy;
use crate::config::DEFAULT_SCAN_CHANNEL_SIZE;
use crate::error::Result;
use crate::memtable::MemtableRange;
use crate::metrics::READ_SST_COUNT;
use crate::read::compat::{self, CompatBatch};
use crate::read::projection::ProjectionMapper;
use crate::read::range::{FileRangeBuilder, MemRangeBuilder, RangeMeta, RowGroupIndex};
use crate::read::seq_scan::SeqScan;
use crate::read::unordered_scan::UnorderedScan;
use crate::read::{Batch, Source};
use crate::region::options::MergeMode;
use crate::region::version::VersionRef;
use crate::sst::file::FileHandle;
use crate::sst::index::bloom_filter::applier::{
BloomFilterIndexApplierBuilder, BloomFilterIndexApplierRef,
};
use crate::sst::index::fulltext_index::applier::builder::FulltextIndexApplierBuilder;
use crate::sst::index::fulltext_index::applier::FulltextIndexApplierRef;
use crate::sst::index::inverted_index::applier::builder::InvertedIndexApplierBuilder;
use crate::sst::index::inverted_index::applier::InvertedIndexApplierRef;
use crate::sst::parquet::reader::ReaderMetrics;
/// A scanner over a region, scans rows from memtables and SSTs.
pub(crate) enum Scanner {
    /// Sequential scan.
    Seq(SeqScan),
    /// Unordered scan.
    Unordered(UnorderedScan),
}
impl Scanner {
    /// Returns a [SendableRecordBatchStream] to retrieve the scan results,
    /// delegating to whichever concrete scanner this variant wraps.
    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
    pub(crate) async fn scan(&self) -> Result<SendableRecordBatchStream, BoxedError> {
        match self {
            Scanner::Seq(scan) => scan.build_stream(),
            Scanner::Unordered(scan) => scan.build_stream().await,
        }
    }
}
#[cfg(test)]
impl Scanner {
    /// Returns the number of SST files to scan (test helper).
    pub(crate) fn num_files(&self) -> usize {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.input().num_files(),
            Scanner::Unordered(unordered_scan) => unordered_scan.input().num_files(),
        }
    }

    /// Returns the number of memtables to scan (test helper).
    pub(crate) fn num_memtables(&self) -> usize {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.input().num_memtables(),
            Scanner::Unordered(unordered_scan) => unordered_scan.input().num_memtables(),
        }
    }

    /// Returns ids of all SST files to scan (test helper).
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::FileId> {
        match self {
            Scanner::Seq(seq_scan) => seq_scan.input().file_ids(),
            Scanner::Unordered(unordered_scan) => unordered_scan.input().file_ids(),
        }
    }

    /// Prepares the scanner with the given number of target partitions (test helper).
    ///
    /// # Panics
    /// Panics if preparing the underlying scanner fails.
    pub(crate) fn set_target_partitions(&mut self, target_partitions: usize) {
        use store_api::region_engine::{PrepareRequest, RegionScanner};

        let request = PrepareRequest::default().with_target_partitions(target_partitions);
        match self {
            Scanner::Seq(seq_scan) => seq_scan.prepare(request).unwrap(),
            Scanner::Unordered(unordered_scan) => unordered_scan.prepare(request).unwrap(),
        }
    }
}
/// Helper to scan a region: collects the memtables and SST files to read and
/// builds the appropriate scanner for a [ScanRequest].
#[cfg_attr(doc, aquamarine::aquamarine)]
pub(crate) struct ScanRegion {
    /// Version of the region to scan.
    version: VersionRef,
    /// Access layer of the region.
    access_layer: AccessLayerRef,
    /// Scan request.
    request: ScanRequest,
    /// Cache to read data.
    cache_strategy: CacheStrategy,
    /// Capacity of the channel to send data from parallel scan tasks.
    parallel_scan_channel_size: usize,
    /// Whether to skip building the inverted index applier.
    ignore_inverted_index: bool,
    /// Whether to skip building the full-text index applier.
    ignore_fulltext_index: bool,
    /// Whether to skip building the bloom filter index applier.
    ignore_bloom_filter: bool,
    /// Start time of the query, forwarded to the scan input.
    start_time: Option<Instant>,
}
impl ScanRegion {
    /// Creates a [ScanRegion].
    pub(crate) fn new(
        version: VersionRef,
        access_layer: AccessLayerRef,
        request: ScanRequest,
        cache_strategy: CacheStrategy,
    ) -> ScanRegion {
        ScanRegion {
            version,
            access_layer,
            request,
            cache_strategy,
            parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
            ignore_inverted_index: false,
            ignore_fulltext_index: false,
            ignore_bloom_filter: false,
            start_time: None,
        }
    }

    /// Sets the capacity of the channel used by parallel scan tasks.
    #[must_use]
    pub(crate) fn with_parallel_scan_channel_size(
        mut self,
        parallel_scan_channel_size: usize,
    ) -> Self {
        self.parallel_scan_channel_size = parallel_scan_channel_size;
        self
    }

    /// Sets whether to ignore the inverted index.
    #[must_use]
    pub(crate) fn with_ignore_inverted_index(mut self, ignore: bool) -> Self {
        self.ignore_inverted_index = ignore;
        self
    }

    /// Sets whether to ignore the full-text index.
    #[must_use]
    pub(crate) fn with_ignore_fulltext_index(mut self, ignore: bool) -> Self {
        self.ignore_fulltext_index = ignore;
        self
    }

    /// Sets whether to ignore the bloom filter index.
    #[must_use]
    pub(crate) fn with_ignore_bloom_filter(mut self, ignore: bool) -> Self {
        self.ignore_bloom_filter = ignore;
        self
    }

    /// Sets the start time of the query.
    #[must_use]
    pub(crate) fn with_start_time(mut self, now: Instant) -> Self {
        self.start_time = Some(now);
        self
    }

    /// Returns a [Scanner] to scan the region, choosing the strategy via
    /// [`use_unordered_scan`](Self::use_unordered_scan).
    pub(crate) fn scanner(self) -> Result<Scanner> {
        if self.use_unordered_scan() {
            self.unordered_scan().map(Scanner::Unordered)
        } else {
            self.seq_scan().map(Scanner::Seq)
        }
    }

    /// Returns a [RegionScannerRef] to scan the region.
    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
    pub(crate) fn region_scanner(self) -> Result<RegionScannerRef> {
        if self.use_unordered_scan() {
            self.unordered_scan().map(|scanner| Box::new(scanner) as _)
        } else {
            self.seq_scan().map(|scanner| Box::new(scanner) as _)
        }
    }

    /// Scans sequentially, filtering deleted rows.
    pub(crate) fn seq_scan(self) -> Result<SeqScan> {
        let input = self.scan_input(true)?;
        Ok(SeqScan::new(input, false))
    }

    /// Scans without a global order, filtering deleted rows.
    pub(crate) fn unordered_scan(self) -> Result<UnorderedScan> {
        let input = self.scan_input(true)?;
        Ok(UnorderedScan::new(input))
    }

    /// Scans sequentially without filtering deleted rows (test helper).
    #[cfg(test)]
    pub(crate) fn scan_without_filter_deleted(self) -> Result<SeqScan> {
        let input = self.scan_input(false)?;
        Ok(SeqScan::new(input, false))
    }

    /// Returns whether an unordered scan can serve the request: the region must
    /// be in append mode, with no per-series row selector and no distribution
    /// requirement other than time-windowed.
    fn use_unordered_scan(&self) -> bool {
        self.version.options.append_mode
            && self.request.series_row_selector.is_none()
            && (self.request.distribution.is_none()
                || self.request.distribution == Some(TimeSeriesDistribution::TimeWindowed))
    }

    /// Creates the [ScanInput]: prunes SSTs and memtables by the time range,
    /// builds index appliers, the predicate group and the projection mapper.
    fn scan_input(mut self, filter_deleted: bool) -> Result<ScanInput> {
        let time_range = self.build_time_range_predicate();

        let ssts = &self.version.ssts;
        let mut files = Vec::new();
        for level in ssts.levels() {
            for file in level.files.values() {
                // Keep only files whose time range may intersect the query range.
                if file_in_range(file, &time_range) {
                    files.push(file.clone());
                }
            }
        }

        let memtables = self.version.memtables.list_memtables();
        // Skip empty memtables and memtables out of the time range.
        let memtables: Vec<_> = memtables
            .into_iter()
            .filter(|mem| {
                if mem.is_empty() {
                    return false;
                }
                let stats = mem.stats();
                // Safe to unwrap because the memtable is not empty.
                let (start, end) = stats.time_range().unwrap();
                // The time range of the memtable is inclusive.
                let memtable_range = TimestampRange::new_inclusive(Some(start), Some(end));
                memtable_range.intersects(&time_range)
            })
            .collect();

        debug!(
            "Scan region {}, request: {:?}, time range: {:?}, memtables: {}, ssts_to_read: {}, append_mode: {}",
            self.version.metadata.region_id,
            self.request,
            time_range,
            memtables.len(),
            files.len(),
            self.version.options.append_mode,
        );

        // May strip filters on field columns (only relevant for LastNonNull merge mode).
        self.maybe_remove_field_filters();

        let inverted_index_applier = self.build_inverted_index_applier();
        let bloom_filter_applier = self.build_bloom_filter_applier();
        let fulltext_index_applier = self.build_fulltext_index_applier();
        let predicate = PredicateGroup::new(&self.version.metadata, &self.request.filters);
        let mapper = match &self.request.projection {
            Some(p) => ProjectionMapper::new(&self.version.metadata, p.iter().copied())?,
            None => ProjectionMapper::all(&self.version.metadata)?,
        };

        // Build range builders for the remaining memtables.
        let memtables = memtables
            .into_iter()
            .map(|mem| {
                let ranges = mem.ranges(
                    Some(mapper.column_ids()),
                    predicate.clone(),
                    self.request.sequence,
                );
                MemRangeBuilder::new(ranges)
            })
            .collect();

        let input = ScanInput::new(self.access_layer, mapper)
            .with_time_range(Some(time_range))
            .with_predicate(predicate)
            .with_memtables(memtables)
            .with_files(files)
            .with_cache(self.cache_strategy)
            .with_inverted_index_applier(inverted_index_applier)
            .with_bloom_filter_index_applier(bloom_filter_applier)
            .with_fulltext_index_applier(fulltext_index_applier)
            .with_parallel_scan_channel_size(self.parallel_scan_channel_size)
            .with_start_time(self.start_time)
            .with_append_mode(self.version.options.append_mode)
            .with_filter_deleted(filter_deleted)
            .with_merge_mode(self.version.options.merge_mode())
            .with_series_row_selector(self.request.series_row_selector)
            .with_distribution(self.request.distribution);
        Ok(input)
    }

    /// Builds a time range predicate for the time index column from the
    /// request filters.
    fn build_time_range_predicate(&self) -> TimestampRange {
        let time_index = self.version.metadata.time_index_column();
        let unit = time_index
            .column_schema
            .data_type
            .as_timestamp()
            .expect("Time index must have timestamp-compatible type")
            .unit();
        build_time_range_predicate(&time_index.column_schema.name, unit, &self.request.filters)
    }

    /// Removes any filter that references a field column when the merge mode
    /// is [MergeMode::LastNonNull], since field values may be replaced during
    /// merge and pushing such filters down could drop rows incorrectly.
    fn maybe_remove_field_filters(&mut self) {
        if self.version.options.merge_mode() != MergeMode::LastNonNull {
            return;
        }

        // Collect the names of all field columns.
        let field_columns = self
            .version
            .metadata
            .field_columns()
            .map(|col| &col.column_schema.name)
            .collect::<HashSet<_>>();
        // Reused buffer for the columns referenced by each filter.
        let mut columns = HashSet::new();

        self.request.filters.retain(|expr| {
            columns.clear();
            // Ignore filters we can't analyze.
            if expr_to_columns(expr, &mut columns).is_err() {
                return false;
            }
            for column in &columns {
                if field_columns.contains(&column.name) {
                    // Filter references a field column; remove it.
                    return false;
                }
            }
            true
        });
    }

    /// Builds an inverted index applier from the request filters, unless the
    /// scan is configured to ignore the inverted index.
    ///
    /// Build errors are logged and swallowed; the scan proceeds without the index.
    fn build_inverted_index_applier(&self) -> Option<InvertedIndexApplierRef> {
        if self.ignore_inverted_index {
            return None;
        }

        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
        let inverted_index_cache = self.cache_strategy.inverted_index_cache().cloned();
        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();

        InvertedIndexApplierBuilder::new(
            self.access_layer.region_dir().to_string(),
            self.access_layer.object_store().clone(),
            self.version.metadata.as_ref(),
            self.version.metadata.inverted_indexed_column_ids(
                self.version
                    .options
                    .index_options
                    .inverted_index
                    .ignore_column_ids
                    .iter(),
            ),
            self.access_layer.puffin_manager_factory().clone(),
        )
        .with_file_cache(file_cache)
        .with_inverted_index_cache(inverted_index_cache)
        .with_puffin_metadata_cache(puffin_metadata_cache)
        .build(&self.request.filters)
        .inspect_err(|err| warn!(err; "Failed to build inverted index applier"))
        .ok()
        .flatten()
        .map(Arc::new)
    }

    /// Builds a bloom filter index applier from the request filters, unless
    /// the scan is configured to ignore the bloom filter.
    ///
    /// Build errors are logged and swallowed; the scan proceeds without the index.
    fn build_bloom_filter_applier(&self) -> Option<BloomFilterIndexApplierRef> {
        if self.ignore_bloom_filter {
            return None;
        }

        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
        let bloom_filter_index_cache = self.cache_strategy.bloom_filter_index_cache().cloned();
        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();

        BloomFilterIndexApplierBuilder::new(
            self.access_layer.region_dir().to_string(),
            self.access_layer.object_store().clone(),
            self.version.metadata.as_ref(),
            self.access_layer.puffin_manager_factory().clone(),
        )
        .with_file_cache(file_cache)
        .with_bloom_filter_index_cache(bloom_filter_index_cache)
        .with_puffin_metadata_cache(puffin_metadata_cache)
        .build(&self.request.filters)
        .inspect_err(|err| warn!(err; "Failed to build bloom filter index applier"))
        .ok()
        .flatten()
        .map(Arc::new)
    }

    /// Builds a full-text index applier from the request filters, unless the
    /// scan is configured to ignore the full-text index.
    ///
    /// Build errors are logged and swallowed; the scan proceeds without the index.
    fn build_fulltext_index_applier(&self) -> Option<FulltextIndexApplierRef> {
        if self.ignore_fulltext_index {
            return None;
        }

        let file_cache = self.cache_strategy.write_cache().map(|w| w.file_cache());
        let puffin_metadata_cache = self.cache_strategy.puffin_metadata_cache().cloned();
        FulltextIndexApplierBuilder::new(
            self.access_layer.region_dir().to_string(),
            self.version.metadata.region_id,
            self.access_layer.object_store().clone(),
            self.access_layer.puffin_manager_factory().clone(),
            self.version.metadata.as_ref(),
        )
        .with_file_cache(file_cache)
        .with_puffin_metadata_cache(puffin_metadata_cache)
        .build(&self.request.filters)
        .inspect_err(|err| warn!(err; "Failed to build fulltext index applier"))
        .ok()
        .flatten()
        .map(Arc::new)
    }
}
/// Returns true when the SST `file` may contain rows inside `predicate`.
fn file_in_range(file: &FileHandle, predicate: &TimestampRange) -> bool {
    // A full-range predicate matches every file; skip the intersection test.
    if predicate == &TimestampRange::min_to_max() {
        return true;
    }
    // Both endpoints of an SST's time range are inclusive.
    let (min_ts, max_ts) = file.time_range();
    TimestampRange::new_inclusive(Some(min_ts), Some(max_ts)).intersects(predicate)
}
/// Common input shared by the scanners: everything needed to read a region.
pub(crate) struct ScanInput {
    /// Region SST access layer.
    access_layer: AccessLayerRef,
    /// Maps projected Batches to output record batches.
    pub(crate) mapper: Arc<ProjectionMapper>,
    /// Time range filter for the time index column.
    time_range: Option<TimestampRange>,
    /// Predicate to push down.
    pub(crate) predicate: PredicateGroup,
    /// Builders of memtable ranges to scan.
    pub(crate) memtables: Vec<MemRangeBuilder>,
    /// SST files to scan.
    pub(crate) files: Vec<FileHandle>,
    /// Cache to read data.
    pub(crate) cache_strategy: CacheStrategy,
    /// If true, `prune_file` skips SSTs whose objects no longer exist instead
    /// of failing the scan.
    ignore_file_not_found: bool,
    /// Capacity of the channel used by parallel scan tasks.
    pub(crate) parallel_scan_channel_size: usize,
    /// Optional inverted index applier.
    inverted_index_applier: Option<InvertedIndexApplierRef>,
    /// Optional bloom filter index applier.
    bloom_filter_index_applier: Option<BloomFilterIndexApplierRef>,
    /// Optional full-text index applier.
    fulltext_index_applier: Option<FulltextIndexApplierRef>,
    /// Start time of the query.
    pub(crate) query_start: Option<Instant>,
    /// Whether the region is in append mode.
    pub(crate) append_mode: bool,
    /// Whether to remove deletion markers while scanning.
    pub(crate) filter_deleted: bool,
    /// Merge mode of the region.
    pub(crate) merge_mode: MergeMode,
    /// Optional per-series row selector from the request.
    pub(crate) series_row_selector: Option<TimeSeriesRowSelector>,
    /// Optional time series distribution from the request.
    pub(crate) distribution: Option<TimeSeriesDistribution>,
}
impl ScanInput {
    /// Creates a new [ScanInput] with default settings.
    #[must_use]
    pub(crate) fn new(access_layer: AccessLayerRef, mapper: ProjectionMapper) -> ScanInput {
        ScanInput {
            access_layer,
            mapper: Arc::new(mapper),
            time_range: None,
            predicate: PredicateGroup::default(),
            memtables: Vec::new(),
            files: Vec::new(),
            cache_strategy: CacheStrategy::Disabled,
            ignore_file_not_found: false,
            parallel_scan_channel_size: DEFAULT_SCAN_CHANNEL_SIZE,
            inverted_index_applier: None,
            bloom_filter_index_applier: None,
            fulltext_index_applier: None,
            query_start: None,
            append_mode: false,
            filter_deleted: true,
            merge_mode: MergeMode::default(),
            series_row_selector: None,
            distribution: None,
        }
    }

    /// Sets the time range filter for the time index column.
    #[must_use]
    pub(crate) fn with_time_range(mut self, time_range: Option<TimestampRange>) -> Self {
        self.time_range = time_range;
        self
    }

    /// Sets the predicate to push down.
    #[must_use]
    pub(crate) fn with_predicate(mut self, predicate: PredicateGroup) -> Self {
        self.predicate = predicate;
        self
    }

    /// Sets the memtable range builders to scan.
    #[must_use]
    pub(crate) fn with_memtables(mut self, memtables: Vec<MemRangeBuilder>) -> Self {
        self.memtables = memtables;
        self
    }

    /// Sets the SST files to scan.
    #[must_use]
    pub(crate) fn with_files(mut self, files: Vec<FileHandle>) -> Self {
        self.files = files;
        self
    }

    /// Sets the cache strategy.
    #[must_use]
    pub(crate) fn with_cache(mut self, cache: CacheStrategy) -> Self {
        self.cache_strategy = cache;
        self
    }

    /// Sets whether missing SST files are skipped instead of failing the scan.
    #[must_use]
    pub(crate) fn with_ignore_file_not_found(mut self, ignore: bool) -> Self {
        self.ignore_file_not_found = ignore;
        self
    }

    /// Sets the capacity of the channel used by parallel scan tasks.
    #[must_use]
    pub(crate) fn with_parallel_scan_channel_size(
        mut self,
        parallel_scan_channel_size: usize,
    ) -> Self {
        self.parallel_scan_channel_size = parallel_scan_channel_size;
        self
    }

    /// Sets the optional inverted index applier.
    #[must_use]
    pub(crate) fn with_inverted_index_applier(
        mut self,
        applier: Option<InvertedIndexApplierRef>,
    ) -> Self {
        self.inverted_index_applier = applier;
        self
    }

    /// Sets the optional bloom filter index applier.
    #[must_use]
    pub(crate) fn with_bloom_filter_index_applier(
        mut self,
        applier: Option<BloomFilterIndexApplierRef>,
    ) -> Self {
        self.bloom_filter_index_applier = applier;
        self
    }

    /// Sets the optional full-text index applier.
    #[must_use]
    pub(crate) fn with_fulltext_index_applier(
        mut self,
        applier: Option<FulltextIndexApplierRef>,
    ) -> Self {
        self.fulltext_index_applier = applier;
        self
    }

    /// Sets the start time of the query.
    #[must_use]
    pub(crate) fn with_start_time(mut self, now: Option<Instant>) -> Self {
        self.query_start = now;
        self
    }

    /// Sets whether the region is in append mode.
    #[must_use]
    pub(crate) fn with_append_mode(mut self, is_append_mode: bool) -> Self {
        self.append_mode = is_append_mode;
        self
    }

    /// Sets whether to remove deletion markers during the scan.
    #[must_use]
    pub(crate) fn with_filter_deleted(mut self, filter_deleted: bool) -> Self {
        self.filter_deleted = filter_deleted;
        self
    }

    /// Sets the merge mode of the region.
    #[must_use]
    pub(crate) fn with_merge_mode(mut self, merge_mode: MergeMode) -> Self {
        self.merge_mode = merge_mode;
        self
    }

    /// Sets the optional time series distribution from the request.
    #[must_use]
    pub(crate) fn with_distribution(
        mut self,
        distribution: Option<TimeSeriesDistribution>,
    ) -> Self {
        self.distribution = distribution;
        self
    }

    /// Sets the optional per-series row selector from the request.
    #[must_use]
    pub(crate) fn with_series_row_selector(
        mut self,
        series_row_selector: Option<TimeSeriesRowSelector>,
    ) -> Self {
        self.series_row_selector = series_row_selector;
        self
    }

    /// Wraps each source in a background scan task so that multiple sources
    /// can be read concurrently, limited by `semaphore`.
    ///
    /// Returns the sources unchanged when there is at most one of them.
    pub(crate) fn create_parallel_sources(
        &self,
        sources: Vec<Source>,
        semaphore: Arc<Semaphore>,
    ) -> Result<Vec<Source>> {
        if sources.len() <= 1 {
            return Ok(sources);
        }
        // Spawn a task for each source and replace it with a stream reading
        // from the task's channel.
        let sources = sources
            .into_iter()
            .map(|source| {
                let (sender, receiver) = mpsc::channel(self.parallel_scan_channel_size);
                self.spawn_scan_task(source, semaphore.clone(), sender);
                let stream = Box::pin(ReceiverStream::new(receiver));
                Source::Stream(stream)
            })
            .collect();
        Ok(sources)
    }

    /// Builds the memtable ranges identified by `index`.
    ///
    /// `index.index` selects the memtable builder; `index.row_group_index`
    /// selects the range(s) within it.
    pub(crate) fn build_mem_ranges(&self, index: RowGroupIndex) -> SmallVec<[MemtableRange; 2]> {
        let memtable = &self.memtables[index.index];
        let mut ranges = SmallVec::new();
        memtable.build_ranges(index.row_group_index, &mut ranges);
        ranges
    }

    /// Prunes the SST file at `file_index` and returns a [FileRangeBuilder]
    /// for the row groups that survive pruning.
    ///
    /// If the file object is missing and `ignore_file_not_found` is set, the
    /// error is logged and an empty builder is returned instead.
    pub(crate) async fn prune_file(
        &self,
        file_index: usize,
        reader_metrics: &mut ReaderMetrics,
    ) -> Result<FileRangeBuilder> {
        let file = &self.files[file_index];
        let res = self
            .access_layer
            .read_sst(file.clone())
            .predicate(self.predicate.predicate().cloned())
            .projection(Some(self.mapper.column_ids().to_vec()))
            .cache(self.cache_strategy.clone())
            .inverted_index_applier(self.inverted_index_applier.clone())
            .bloom_filter_index_applier(self.bloom_filter_index_applier.clone())
            .fulltext_index_applier(self.fulltext_index_applier.clone())
            .expected_metadata(Some(self.mapper.metadata().clone()))
            .build_reader_input(reader_metrics)
            .await;
        let (mut file_range_ctx, row_groups) = match res {
            Ok(x) => x,
            Err(e) => {
                if e.is_object_not_found() && self.ignore_file_not_found {
                    // Log and skip the missing file instead of failing the scan.
                    error!(e; "File to scan does not exist, region_id: {}, file: {}", file.region_id(), file.file_id());
                    return Ok(FileRangeBuilder::default());
                } else {
                    return Err(e);
                }
            }
        };
        // Attach a compat layer when the file's schema differs from the
        // expected metadata so batches can be converted on read.
        if !compat::has_same_columns_and_pk_encoding(
            self.mapper.metadata(),
            file_range_ctx.read_format().metadata(),
        ) {
            let compat = CompatBatch::new(
                &self.mapper,
                file_range_ctx.read_format().metadata().clone(),
            )?;
            file_range_ctx.set_compat_batch(Some(compat));
        }
        Ok(FileRangeBuilder::new(Arc::new(file_range_ctx), row_groups))
    }

    /// Spawns a background task that reads batches from `input` and forwards
    /// them through `sender`, acquiring a permit from `semaphore` for each
    /// batch so total concurrency is bounded.
    ///
    /// The task stops after the source is exhausted or the first error, which
    /// is forwarded to the receiver. Send failures (receiver dropped) are
    /// ignored.
    pub(crate) fn spawn_scan_task(
        &self,
        mut input: Source,
        semaphore: Arc<Semaphore>,
        sender: mpsc::Sender<Result<Batch>>,
    ) {
        common_runtime::spawn_global(async move {
            loop {
                // Hold the permit only while fetching the next batch, not
                // while waiting on the channel.
                let maybe_batch = {
                    let _permit = semaphore.acquire().await.unwrap();
                    input.next_batch().await
                };
                match maybe_batch {
                    Ok(Some(batch)) => {
                        let _ = sender.send(Ok(batch)).await;
                    }
                    Ok(None) => break,
                    Err(e) => {
                        let _ = sender.send(Err(e)).await;
                        break;
                    }
                }
            }
        });
    }

    /// Returns the total number of rows in all SST files and memtables.
    pub(crate) fn total_rows(&self) -> usize {
        let rows_in_files: usize = self.files.iter().map(|f| f.num_rows()).sum();
        let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();
        rows_in_files + rows_in_memtables
    }

    /// Returns the pushed-down predicate, if any.
    pub(crate) fn predicate(&self) -> Option<&Predicate> {
        self.predicate.predicate()
    }

    /// Returns the number of memtables to scan.
    pub(crate) fn num_memtables(&self) -> usize {
        self.memtables.len()
    }

    /// Returns the number of SST files to scan.
    pub(crate) fn num_files(&self) -> usize {
        self.files.len()
    }
}
#[cfg(test)]
impl ScanInput {
    /// Returns the ids of all SST files to scan (test helper).
    pub(crate) fn file_ids(&self) -> Vec<crate::sst::file::FileId> {
        let mut ids = Vec::with_capacity(self.files.len());
        for file in &self.files {
            ids.push(file.file_id());
        }
        ids
    }
}
/// Context shared by the partitions of one scan stream.
pub(crate) struct StreamContext {
    /// Input of the scan.
    pub(crate) input: ScanInput,
    /// Metadata for all ranges to scan.
    pub(crate) ranges: Vec<RangeMeta>,
    /// Timestamp when the query starts.
    pub(crate) query_start: Instant,
}
impl StreamContext {
    /// Creates a context for a sequential scan from its input.
    pub(crate) fn seq_scan_ctx(input: ScanInput, compaction: bool) -> Self {
        // Fall back to "now" when the caller didn't record a query start time.
        let query_start = input.query_start.unwrap_or_else(Instant::now);
        let ranges = RangeMeta::seq_scan_ranges(&input, compaction);
        READ_SST_COUNT.observe(input.num_files() as f64);

        Self {
            input,
            ranges,
            query_start,
        }
    }

    /// Creates a context for an unordered scan from its input.
    pub(crate) fn unordered_scan_ctx(input: ScanInput) -> Self {
        let query_start = input.query_start.unwrap_or_else(Instant::now);
        let ranges = RangeMeta::unordered_scan_ranges(&input);
        READ_SST_COUNT.observe(input.num_files() as f64);

        Self {
            input,
            ranges,
            query_start,
        }
    }

    /// Returns true if `index` refers to a memtable range.
    ///
    /// Memtable builders precede SST files in the combined index space, so an
    /// index smaller than the memtable count is a memtable range.
    pub(crate) fn is_mem_range_index(&self, index: RowGroupIndex) -> bool {
        self.input.num_memtables() > index.index
    }

    /// Creates a [PartitionRange] for each range, identified by its position.
    pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
        self.ranges
            .iter()
            .enumerate()
            .map(|(idx, range_meta)| range_meta.new_partition_range(idx))
            .collect()
    }

    /// Formats the context for `EXPLAIN`; `verbose` additionally prints the
    /// projection, filters and file details.
    pub(crate) fn format_for_explain(&self, verbose: bool, f: &mut fmt::Formatter) -> fmt::Result {
        // Count memtable ranges and file ranges across all partitions.
        let (mut num_mem_ranges, mut num_file_ranges) = (0, 0);
        for range_meta in &self.ranges {
            for idx in &range_meta.row_group_indices {
                if self.is_mem_range_index(*idx) {
                    num_mem_ranges += 1;
                } else {
                    num_file_ranges += 1;
                }
            }
        }
        // Shows both the number of SST files and the number of ranges read from them.
        write!(
            f,
            "partition_count={} ({} memtable ranges, {} file {} ranges)",
            self.ranges.len(),
            num_mem_ranges,
            self.input.num_files(),
            num_file_ranges,
        )?;
        if let Some(selector) = &self.input.series_row_selector {
            write!(f, ", selector={}", selector)?;
        }
        if let Some(distribution) = &self.input.distribution {
            write!(f, ", distribution={}", distribution)?;
        }

        if verbose {
            self.format_verbose_content(f)?;
        }

        Ok(())
    }

    /// Writes the verbose part of the explain output: projection, filters
    /// and per-file details.
    fn format_verbose_content(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Debug wrapper printing one SST file's id, time range and sizes.
        struct FileWrapper<'a> {
            file: &'a FileHandle,
        }

        impl fmt::Debug for FileWrapper<'_> {
            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                write!(
                    f,
                    "[file={}, time_range=({}::{}, {}::{}), rows={}, size={}, index_size={}]",
                    self.file.file_id(),
                    self.file.time_range().0.value(),
                    self.file.time_range().0.unit(),
                    self.file.time_range().1.value(),
                    self.file.time_range().1.unit(),
                    self.file.num_rows(),
                    self.file.size(),
                    self.file.index_size()
                )
            }
        }

        // Debug wrapper printing the scan input's projection, filters and files.
        struct InputWrapper<'a> {
            input: &'a ScanInput,
        }

        impl fmt::Debug for InputWrapper<'_> {
            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                let output_schema = self.input.mapper.output_schema();
                if !output_schema.is_empty() {
                    write!(f, ", projection=")?;
                    f.debug_list()
                        .entries(output_schema.column_schemas().iter().map(|col| &col.name))
                        .finish()?;
                }
                if let Some(predicate) = &self.input.predicate.predicate() {
                    if !predicate.exprs().is_empty() {
                        write!(f, ", filters=[")?;
                        for (i, expr) in predicate.exprs().iter().enumerate() {
                            // Close the bracket on the last filter.
                            if i == predicate.exprs().len() - 1 {
                                write!(f, "{}]", expr)?;
                            } else {
                                write!(f, "{}, ", expr)?;
                            }
                        }
                    }
                }
                if !self.input.files.is_empty() {
                    write!(f, ", files=")?;
                    f.debug_list()
                        .entries(self.input.files.iter().map(|file| FileWrapper { file }))
                        .finish()?;
                }

                Ok(())
            }
        }

        write!(f, "{:?}", InputWrapper { input: &self.input })
    }
}
/// Predicates to evaluate, grouping filters that only touch the time index.
#[derive(Clone, Default)]
pub struct PredicateGroup {
    /// Simple filters that reference only the time index column.
    time_filters: Option<Arc<Vec<SimpleFilterEvaluator>>>,
    /// Predicate built from all request filters.
    predicate: Option<Predicate>,
}
impl PredicateGroup {
    /// Creates a [PredicateGroup] from `exprs`, extracting simple filters on
    /// the time index column according to `metadata`.
    pub fn new(metadata: &RegionMetadata, exprs: &[Expr]) -> Self {
        let mut time_filters = Vec::with_capacity(exprs.len());
        // Reused buffer for the columns of each expr; `expr_to_filter` clears
        // it before every use, so no extra clear is needed here.
        let mut columns = HashSet::new();
        for expr in exprs {
            let Some(filter) = Self::expr_to_filter(expr, metadata, &mut columns) else {
                continue;
            };
            time_filters.push(filter);
        }
        let time_filters = if time_filters.is_empty() {
            None
        } else {
            Some(Arc::new(time_filters))
        };
        let predicate = Predicate::new(exprs.to_vec());

        Self {
            time_filters,
            predicate: Some(predicate),
        }
    }

    /// Returns the simple filters on the time index column, if any.
    pub(crate) fn time_filters(&self) -> Option<Arc<Vec<SimpleFilterEvaluator>>> {
        self.time_filters.clone()
    }

    /// Returns the predicate built from all request filters.
    pub(crate) fn predicate(&self) -> Option<&Predicate> {
        self.predicate.as_ref()
    }

    /// Converts `expr` into a [SimpleFilterEvaluator] when it references
    /// exactly one column and that column is the time index; returns `None`
    /// otherwise (including when column extraction fails).
    fn expr_to_filter(
        expr: &Expr,
        metadata: &RegionMetadata,
        columns: &mut HashSet<Column>,
    ) -> Option<SimpleFilterEvaluator> {
        columns.clear();
        // Ignore exprs whose columns can't be extracted.
        expr_to_columns(expr, columns).ok()?;
        if columns.len() > 1 {
            // A simple filter may reference at most one column.
            return None;
        }

        // Returns None when the expr references no column at all.
        let column = columns.iter().next()?;
        let column_meta = metadata.column_by_name(&column.name)?;
        if column_meta.semantic_type == SemanticType::Timestamp {
            SimpleFilterEvaluator::try_new(expr)
        } else {
            None
        }
    }
}