mito2/read/seq_scan.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Sequential scan.
16
17use std::fmt;
18use std::sync::Arc;
19use std::time::Instant;
20
21use async_stream::try_stream;
22use common_error::ext::BoxedError;
23use common_recordbatch::util::ChainedRecordBatchStream;
24use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
25use common_telemetry::tracing;
26use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
27use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
28use datatypes::schema::SchemaRef;
29use futures::{StreamExt, TryStreamExt};
30use snafu::{OptionExt, ensure};
31use store_api::metadata::RegionMetadataRef;
32use store_api::region_engine::{
33    PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
34};
35use store_api::storage::TimeSeriesRowSelector;
36use tokio::sync::Semaphore;
37
38use crate::error::{PartitionOutOfRangeSnafu, Result, TooManyFilesToReadSnafu, UnexpectedSnafu};
39use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
40use crate::read::flat_dedup::{FlatDedupReader, FlatLastNonNull, FlatLastRow};
41use crate::read::flat_merge::FlatMergeReader;
42use crate::read::last_row::{FlatLastRowReader, LastRowReader};
43use crate::read::merge::MergeReaderBuilder;
44use crate::read::pruner::{PartitionPruner, Pruner};
45use crate::read::range::RangeMeta;
46use crate::read::scan_region::{ScanInput, StreamContext};
47use crate::read::scan_util::{
48    PartitionMetrics, PartitionMetricsList, SplitRecordBatchStream, scan_file_ranges,
49    scan_flat_file_ranges, scan_flat_mem_ranges, scan_mem_ranges,
50    should_split_flat_batches_for_merge,
51};
52use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
53use crate::read::{
54    Batch, BatchReader, BoxedBatchReader, BoxedRecordBatchStream, ScannerMetrics, Source, scan_util,
55};
56use crate::region::options::MergeMode;
57use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
58
/// Scans a region and returns rows in a sorted sequence.
///
/// The output order is always `order by primary keys, time index` inside every
/// [`PartitionRange`]. Each "partition" may contain many [`PartitionRange`]s.
pub struct SeqScan {
    /// Properties of the scanner.
    properties: ScannerProperties,
    /// Context of streams.
    stream_ctx: Arc<StreamContext>,
    /// Shared pruner for file range building.
    pruner: Arc<Pruner>,
    /// Metrics for each partition.
    /// Only populated for queries; it stays empty during compaction.
    metrics_list: PartitionMetricsList,
}
74
impl SeqScan {
    /// Creates a new [SeqScan] with the given input.
    /// If `input.compaction` is true, the scanner will not attempt to split ranges.
    pub(crate) fn new(input: ScanInput) -> Self {
        let mut properties = ScannerProperties::default()
            .with_append_mode(input.append_mode)
            .with_total_rows(input.total_rows());
        let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input));
        // A single partition containing all ranges; `prepare()` may repartition later.
        properties.partitions = vec![stream_ctx.partition_ranges()];

        // Create the shared pruner with number of workers equal to CPU cores.
        let num_workers = common_stat::get_total_cpu_cores().max(1);
        let pruner = Arc::new(Pruner::new(stream_ctx.clone(), num_workers));

        Self {
            properties,
            stream_ctx,
            pruner,
            metrics_list: PartitionMetricsList::default(),
        }
    }

    /// Builds a stream for the query.
    ///
    /// The returned stream is not partitioned and will contain all the data. If want
    /// partitioned scan, use [`RegionScanner::scan_partition`].
    #[tracing::instrument(
        skip_all,
        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
    )]
    pub fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
        let metrics_set = ExecutionPlanMetricsSet::new();
        // Scan every partition and chain the per-partition streams end to end.
        let streams = (0..self.properties.partitions.len())
            .map(|partition: usize| {
                self.scan_partition(&QueryScanContext::default(), &metrics_set, partition)
            })
            .collect::<Result<Vec<_>, _>>()?;

        let aggr_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
        Ok(Box::pin(aggr_stream))
    }

    /// Scan [`Batch`] in all partitions one by one.
    pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
        let metrics_set = ExecutionPlanMetricsSet::new();

        let streams = (0..self.properties.partitions.len())
            .map(|partition| {
                // `false`: not explain-verbose; metrics are still recorded for queries.
                let metrics = self.new_partition_metrics(false, &metrics_set, partition);
                self.scan_batch_in_partition(partition, metrics)
            })
            .collect::<Result<Vec<_>>>()?;

        // `flatten` preserves partition order: each inner stream is drained in turn.
        Ok(Box::pin(futures::stream::iter(streams).flatten()))
    }

    /// Builds a [BoxedRecordBatchStream] from sequential scan for flat format compaction.
    ///
    /// # Panics
    /// Panics if the compaction flag is not set.
    pub async fn build_flat_reader_for_compaction(&self) -> Result<BoxedRecordBatchStream> {
        assert!(self.stream_ctx.input.compaction);

        let metrics_set = ExecutionPlanMetricsSet::new();
        let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
        // Compaction always uses a single partition (see `new()`).
        debug_assert_eq!(1, self.properties.partitions.len());
        let partition_ranges = &self.properties.partitions[0];

        let reader = Self::merge_all_flat_ranges_for_compaction(
            &self.stream_ctx,
            partition_ranges,
            &part_metrics,
            self.pruner.clone(),
        )
        .await?;
        Ok(reader)
    }

    /// Builds a merge reader that reads all flat ranges.
    /// Callers MUST not split ranges before calling this method.
    async fn merge_all_flat_ranges_for_compaction(
        stream_ctx: &Arc<StreamContext>,
        partition_ranges: &[PartitionRange],
        part_metrics: &PartitionMetrics,
        pruner: Arc<Pruner>,
    ) -> Result<BoxedRecordBatchStream> {
        pruner.add_partition_ranges(partition_ranges);
        let partition_pruner = Arc::new(PartitionPruner::new(pruner, partition_ranges));

        // Collect sources for every part range, then merge them into one stream.
        let mut sources = Vec::new();
        for part_range in partition_ranges {
            build_flat_sources(
                stream_ctx,
                part_range,
                true,
                part_metrics,
                partition_pruner.clone(),
                &mut sources,
                None,
            )
            .await?;
        }

        common_telemetry::debug!(
            "Build flat reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
            stream_ctx.input.mapper.metadata().region_id,
            partition_ranges.len(),
            sources.len()
        );
        // No semaphore: compaction reads sources sequentially.
        Self::build_flat_reader_from_sources(stream_ctx, sources, None, None).await
    }

    /// Builds a reader to read sources. If `semaphore` is provided, reads sources in parallel
    /// if possible.
    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
    pub(crate) async fn build_reader_from_sources(
        stream_ctx: &StreamContext,
        mut sources: Vec<Source>,
        semaphore: Option<Arc<Semaphore>>,
        part_metrics: Option<&PartitionMetrics>,
    ) -> Result<BoxedBatchReader> {
        if let Some(semaphore) = semaphore.as_ref() {
            // Read sources in parallel.
            if sources.len() > 1 {
                sources = stream_ctx
                    .input
                    .create_parallel_sources(sources, semaphore.clone())?;
            }
        }

        let mut builder = MergeReaderBuilder::from_sources(sources);
        if let Some(metrics) = part_metrics {
            builder.with_metrics_reporter(Some(metrics.merge_metrics_reporter()));
        }
        let reader = builder.build().await?;

        // Append-mode regions keep duplicate rows, so dedup is skipped for them.
        let dedup = !stream_ctx.input.append_mode;
        let dedup_metrics_reporter = part_metrics.map(|m| m.dedup_metrics_reporter());
        let reader = if dedup {
            match stream_ctx.input.merge_mode {
                MergeMode::LastRow => Box::new(DedupReader::new(
                    reader,
                    LastRow::new(stream_ctx.input.filter_deleted),
                    dedup_metrics_reporter,
                )) as _,
                MergeMode::LastNonNull => Box::new(DedupReader::new(
                    reader,
                    LastNonNull::new(stream_ctx.input.filter_deleted),
                    dedup_metrics_reporter,
                )) as _,
            }
        } else {
            Box::new(reader) as _
        };

        // Optionally keep only the last row of each time series.
        let reader = match &stream_ctx.input.series_row_selector {
            Some(TimeSeriesRowSelector::LastRow) => Box::new(LastRowReader::new(reader)) as _,
            None => reader,
        };

        Ok(reader)
    }

    /// Builds a flat reader to read sources that returns RecordBatch. If `semaphore` is provided, reads sources in parallel
    /// if possible.
    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
    pub(crate) async fn build_flat_reader_from_sources(
        stream_ctx: &StreamContext,
        mut sources: Vec<BoxedRecordBatchStream>,
        semaphore: Option<Arc<Semaphore>>,
        part_metrics: Option<&PartitionMetrics>,
    ) -> Result<BoxedRecordBatchStream> {
        if let Some(semaphore) = semaphore.as_ref() {
            // Read sources in parallel.
            if sources.len() > 1 {
                sources = stream_ctx
                    .input
                    .create_parallel_flat_sources(sources, semaphore.clone())?;
            }
        }

        // Callers must only use the flat path with a flat mapper.
        let mapper = stream_ctx.input.mapper.as_flat().unwrap();
        let schema = mapper.input_arrow_schema(stream_ctx.input.compaction);

        let metrics_reporter = part_metrics.map(|m| m.merge_metrics_reporter());
        let reader =
            FlatMergeReader::new(schema, sources, DEFAULT_READ_BATCH_SIZE, metrics_reporter)
                .await?;

        // Same dedup policy as the primary-key path: skip for append-mode regions.
        let dedup = !stream_ctx.input.append_mode;
        let dedup_metrics_reporter = part_metrics.map(|m| m.dedup_metrics_reporter());
        let reader = if dedup {
            match stream_ctx.input.merge_mode {
                MergeMode::LastRow => Box::pin(
                    FlatDedupReader::new(
                        reader.into_stream().boxed(),
                        FlatLastRow::new(stream_ctx.input.filter_deleted),
                        dedup_metrics_reporter,
                    )
                    .into_stream(),
                ) as _,
                MergeMode::LastNonNull => Box::pin(
                    FlatDedupReader::new(
                        reader.into_stream().boxed(),
                        FlatLastNonNull::new(
                            mapper.field_column_start(),
                            stream_ctx.input.filter_deleted,
                        ),
                        dedup_metrics_reporter,
                    )
                    .into_stream(),
                ) as _,
            }
        } else {
            Box::pin(reader.into_stream()) as _
        };

        let reader = match &stream_ctx.input.series_row_selector {
            Some(TimeSeriesRowSelector::LastRow) => {
                Box::pin(FlatLastRowReader::new(reader).into_stream()) as _
            }
            None => reader,
        };

        Ok(reader)
    }

    /// Scans the given partition when the part list is set properly.
    /// Otherwise the returned stream might not contain any data.
    fn scan_partition_impl(
        &self,
        ctx: &QueryScanContext,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> Result<SendableRecordBatchStream> {
        if ctx.explain_verbose {
            common_telemetry::info!(
                "SeqScan partition {}, region_id: {}",
                partition,
                self.stream_ctx.input.region_metadata().region_id
            );
        }

        let metrics = self.new_partition_metrics(ctx.explain_verbose, metrics_set, partition);
        let input = &self.stream_ctx.input;

        let batch_stream = if input.flat_format {
            // Use flat scan for bulk memtables
            self.scan_flat_batch_in_partition(partition, metrics.clone())?
        } else {
            // Use regular batch scan for normal memtables
            self.scan_batch_in_partition(partition, metrics.clone())?
        };
        // Convert internal `ScanBatch`es into record batches in the output schema.
        let record_batch_stream = ConvertBatchStream::new(
            batch_stream,
            input.mapper.clone(),
            input.cache_strategy.clone(),
            metrics,
        );

        Ok(Box::pin(RecordBatchStreamWrapper::new(
            input.mapper.output_schema(),
            Box::pin(record_batch_stream),
        )))
    }

    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
            partition = partition
        )
    )]
    fn scan_batch_in_partition(
        &self,
        partition: usize,
        part_metrics: PartitionMetrics,
    ) -> Result<ScanBatchStream> {
        ensure!(
            partition < self.properties.partitions.len(),
            PartitionOutOfRangeSnafu {
                given: partition,
                all: self.properties.partitions.len(),
            }
        );

        if self.properties.partitions[partition].is_empty() {
            return Ok(Box::pin(futures::stream::empty()));
        }

        let stream_ctx = self.stream_ctx.clone();
        let semaphore = self.new_semaphore();
        let partition_ranges = self.properties.partitions[partition].clone();
        let compaction = self.stream_ctx.input.compaction;
        let distinguish_range = self.properties.distinguish_partition_range;
        // Compaction builds file sources sequentially; queries may parallelize them.
        let file_scan_semaphore = if compaction { None } else { semaphore.clone() };
        let pruner = self.pruner.clone();
        // Initializes ref counts for the pruner.
        // If we call scan_batch_in_partition() multiple times but don't read all batches from the stream,
        // then the ref count won't be decremented.
        // This is a rare case and keeping all remaining entries still uses less memory than a per partition cache.
        pruner.add_partition_ranges(&partition_ranges);
        let partition_pruner = Arc::new(PartitionPruner::new(pruner, &partition_ranges));

        let stream = try_stream! {
            part_metrics.on_first_poll();
            // Start fetch time before building sources so scan cost contains
            // build part cost.
            let mut fetch_start = Instant::now();

            let _mapper = stream_ctx.input.mapper.as_primary_key().context(UnexpectedSnafu {
                reason: "Unexpected format",
            })?;
            // Scans each part.
            for part_range in partition_ranges {
                let mut sources = Vec::new();
                build_sources(
                    &stream_ctx,
                    &part_range,
                    compaction,
                    &part_metrics,
                    partition_pruner.clone(),
                    &mut sources,
                    file_scan_semaphore.clone(),
                ).await?;

                let mut reader =
                    Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone(), Some(&part_metrics))
                        .await?;
                #[cfg(debug_assertions)]
                let mut checker = crate::read::BatchChecker::default()
                    .with_start(Some(part_range.start))
                    .with_end(Some(part_range.end));

                let mut metrics = ScannerMetrics {
                    scan_cost: fetch_start.elapsed(),
                    ..Default::default()
                };
                fetch_start = Instant::now();

                while let Some(batch) = reader.next_batch().await? {
                    metrics.scan_cost += fetch_start.elapsed();
                    metrics.num_batches += 1;
                    metrics.num_rows += batch.num_rows();

                    // Empty batches are unexpected here; skip them defensively in release builds.
                    debug_assert!(!batch.is_empty());
                    if batch.is_empty() {
                        fetch_start = Instant::now();
                        continue;
                    }

                    #[cfg(debug_assertions)]
                    checker.ensure_part_range_batch(
                        "SeqScan",
                        _mapper.metadata().region_id,
                        partition,
                        part_range,
                        &batch,
                    );

                    let yield_start = Instant::now();
                    yield ScanBatch::Normal(batch);
                    metrics.yield_cost += yield_start.elapsed();

                    fetch_start = Instant::now();
                }

                // Yields an empty part to indicate this range is terminated.
                // The query engine can use this to optimize some queries.
                if distinguish_range {
                    let yield_start = Instant::now();
                    yield ScanBatch::Normal(Batch::empty());
                    metrics.yield_cost += yield_start.elapsed();
                }

                metrics.scan_cost += fetch_start.elapsed();
                fetch_start = Instant::now();
                part_metrics.merge_metrics(&metrics);
            }

            part_metrics.on_finish();
        };
        Ok(Box::pin(stream))
    }

    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
            partition = partition
        )
    )]
    fn scan_flat_batch_in_partition(
        &self,
        partition: usize,
        part_metrics: PartitionMetrics,
    ) -> Result<ScanBatchStream> {
        ensure!(
            partition < self.properties.partitions.len(),
            PartitionOutOfRangeSnafu {
                given: partition,
                all: self.properties.partitions.len(),
            }
        );

        if self.properties.partitions[partition].is_empty() {
            return Ok(Box::pin(futures::stream::empty()));
        }

        let stream_ctx = self.stream_ctx.clone();
        let semaphore = self.new_semaphore();
        let partition_ranges = self.properties.partitions[partition].clone();
        let compaction = self.stream_ctx.input.compaction;
        let file_scan_semaphore = if compaction { None } else { semaphore.clone() };
        let pruner = self.pruner.clone();
        // Initializes ref counts for the pruner.
        // If we call scan_flat_batch_in_partition() multiple times but don't read all batches from the stream,
        // then the ref count won't be decremented.
        // This is a rare case and keeping all remaining entries still uses less memory than a per partition cache.
        pruner.add_partition_ranges(&partition_ranges);
        let partition_pruner = Arc::new(PartitionPruner::new(pruner, &partition_ranges));

        // NOTE(review): unlike scan_batch_in_partition(), this path does not yield an
        // empty batch when `distinguish_partition_range` is set — confirm whether the
        // flat format intentionally skips that optimization marker.
        let stream = try_stream! {
            part_metrics.on_first_poll();
            // Start fetch time before building sources so scan cost contains
            // build part cost.
            let mut fetch_start = Instant::now();

            // Scans each part.
            for part_range in partition_ranges {
                let mut sources = Vec::new();
                build_flat_sources(
                    &stream_ctx,
                    &part_range,
                    compaction,
                    &part_metrics,
                    partition_pruner.clone(),
                    &mut sources,
                    file_scan_semaphore.clone(),
                ).await?;

                let mut reader =
                    Self::build_flat_reader_from_sources(&stream_ctx, sources, semaphore.clone(), Some(&part_metrics))
                        .await?;

                let mut metrics = ScannerMetrics {
                    scan_cost: fetch_start.elapsed(),
                    ..Default::default()
                };
                fetch_start = Instant::now();

                while let Some(record_batch) = reader.try_next().await? {
                    metrics.scan_cost += fetch_start.elapsed();
                    metrics.num_batches += 1;
                    metrics.num_rows += record_batch.num_rows();

                    // Empty record batches are unexpected; skip them defensively in release builds.
                    debug_assert!(record_batch.num_rows() > 0);
                    if record_batch.num_rows() == 0 {
                        fetch_start = Instant::now();
                        continue;
                    }

                    let yield_start = Instant::now();
                    yield ScanBatch::RecordBatch(record_batch);
                    metrics.yield_cost += yield_start.elapsed();

                    fetch_start = Instant::now();
                }

                metrics.scan_cost += fetch_start.elapsed();
                fetch_start = Instant::now();
                part_metrics.merge_metrics(&metrics);
            }

            part_metrics.on_finish();
        };
        Ok(Box::pin(stream))
    }

    /// Returns a semaphore for extra read parallelism, or `None` when every
    /// target partition is already backed by an actual partition.
    fn new_semaphore(&self) -> Option<Arc<Semaphore>> {
        if self.properties.target_partitions() > self.properties.num_partitions() {
            // We can use additional tasks to read the data if we have more target partitions than actual partitions.
            // This semaphore is partition level.
            // We don't use a global semaphore to avoid a partition waiting for others. The final concurrency
            // of tasks usually won't exceed the target partitions a lot as compaction can reduce the number of
            // files in a part range.
            Some(Arc::new(Semaphore::new(
                self.properties.target_partitions() - self.properties.num_partitions() + 1,
            )))
        } else {
            None
        }
    }

    /// Creates a new partition metrics instance.
    /// Sets the partition metrics for the given partition if it is not for compaction.
    fn new_partition_metrics(
        &self,
        explain_verbose: bool,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> PartitionMetrics {
        let metrics = PartitionMetrics::new(
            self.stream_ctx.input.mapper.metadata().region_id,
            partition,
            get_scanner_type(self.stream_ctx.input.compaction),
            self.stream_ctx.query_start,
            explain_verbose,
            metrics_set,
        );

        // Compaction doesn't report per-partition metrics (see the struct docs).
        if !self.stream_ctx.input.compaction {
            self.metrics_list.set(partition, metrics.clone());
        }

        metrics
    }

    /// Finds the maximum number of files to read in a single partition range.
    fn max_files_in_partition(ranges: &[RangeMeta], partition_ranges: &[PartitionRange]) -> usize {
        partition_ranges
            .iter()
            .map(|part_range| {
                let range_meta = &ranges[part_range.identifier];
                range_meta.indices.len()
            })
            .max()
            .unwrap_or(0)
    }

    /// Checks resource limit for the scanner.
    pub(crate) fn check_scan_limit(&self) -> Result<()> {
        // Check max file count limit for all partitions since we scan them in parallel.
        let total_max_files: usize = self
            .properties
            .partitions
            .iter()
            .map(|partition| Self::max_files_in_partition(&self.stream_ctx.ranges, partition))
            .sum();

        let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
        if total_max_files > max_concurrent_files {
            return TooManyFilesToReadSnafu {
                actual: total_max_files,
                max: max_concurrent_files,
            }
            .fail();
        }

        Ok(())
    }
}
627
628impl RegionScanner for SeqScan {
629    fn name(&self) -> &str {
630        "SeqScan"
631    }
632
633    fn properties(&self) -> &ScannerProperties {
634        &self.properties
635    }
636
637    fn schema(&self) -> SchemaRef {
638        self.stream_ctx.input.mapper.output_schema()
639    }
640
641    fn metadata(&self) -> RegionMetadataRef {
642        self.stream_ctx.input.mapper.metadata().clone()
643    }
644
645    fn scan_partition(
646        &self,
647        ctx: &QueryScanContext,
648        metrics_set: &ExecutionPlanMetricsSet,
649        partition: usize,
650    ) -> Result<SendableRecordBatchStream, BoxedError> {
651        self.scan_partition_impl(ctx, metrics_set, partition)
652            .map_err(BoxedError::new)
653    }
654
655    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
656        self.properties.prepare(request);
657
658        self.check_scan_limit().map_err(BoxedError::new)?;
659
660        Ok(())
661    }
662
663    fn has_predicate_without_region(&self) -> bool {
664        let predicate = self
665            .stream_ctx
666            .input
667            .predicate_group()
668            .predicate_without_region();
669        predicate.is_some()
670    }
671
672    fn add_dyn_filter_to_predicate(
673        &mut self,
674        filter_exprs: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
675    ) -> Vec<bool> {
676        self.stream_ctx.add_dyn_filter_to_predicate(filter_exprs)
677    }
678
679    fn set_logical_region(&mut self, logical_region: bool) {
680        self.properties.set_logical_region(logical_region);
681    }
682}
683
684impl DisplayAs for SeqScan {
685    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
686        write!(
687            f,
688            "SeqScan: region={}, ",
689            self.stream_ctx.input.mapper.metadata().region_id
690        )?;
691        match t {
692            // TODO(LFC): Implement all the "TreeRender" display format.
693            DisplayFormatType::Default | DisplayFormatType::TreeRender => {
694                self.stream_ctx.format_for_explain(false, f)
695            }
696            DisplayFormatType::Verbose => {
697                self.stream_ctx.format_for_explain(true, f)?;
698                self.metrics_list.format_verbose_metrics(f)
699            }
700        }
701    }
702}
703
704impl fmt::Debug for SeqScan {
705    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
706        f.debug_struct("SeqScan")
707            .field("num_ranges", &self.stream_ctx.ranges.len())
708            .finish()
709    }
710}
711
712/// Builds sources for the partition range and push them to the `sources` vector.
/// Builds sources for the partition range and push them to the `sources` vector.
///
/// Memtable and "other" ranges are scanned inline; file ranges are scanned in
/// parallel (bounded by `semaphore`) when one is provided, otherwise sequentially.
/// Sources are appended to `sources` in the order of `range_meta.row_group_indices`
/// regardless of parallel task completion order.
pub(crate) async fn build_sources(
    stream_ctx: &Arc<StreamContext>,
    part_range: &PartitionRange,
    compaction: bool,
    part_metrics: &PartitionMetrics,
    partition_pruner: Arc<PartitionPruner>,
    sources: &mut Vec<Source>,
    semaphore: Option<Arc<Semaphore>>,
) -> Result<()> {
    // Gets range meta.
    let range_meta = &stream_ctx.ranges[part_range.identifier];
    #[cfg(debug_assertions)]
    if compaction {
        // Compaction expects input sources are not been split.
        debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
        for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
            // It should scan all row groups.
            debug_assert_eq!(
                -1, row_group_idx.row_group_index,
                "Expect {} range scan all row groups, given: {}",
                i, row_group_idx.row_group_index,
            );
        }
    }

    // Label used for read metrics/telemetry.
    let read_type = if compaction {
        "compaction"
    } else {
        "seq_scan_files"
    };
    let num_indices = range_meta.row_group_indices.len();
    if num_indices == 0 {
        return Ok(());
    }

    sources.reserve(num_indices);
    // Slots keyed by original position so parallel completions can't reorder output.
    let mut ordered_sources = Vec::with_capacity(num_indices);
    ordered_sources.resize_with(num_indices, || None);
    let mut file_scan_tasks = Vec::new();

    for (position, index) in range_meta.row_group_indices.iter().enumerate() {
        if stream_ctx.is_mem_range_index(*index) {
            // Memtable ranges are cheap to open; scan inline.
            let stream = scan_mem_ranges(
                stream_ctx.clone(),
                part_metrics.clone(),
                *index,
                range_meta.time_range,
            );
            ordered_sources[position] = Some(Source::Stream(Box::pin(stream) as _));
        } else if stream_ctx.is_file_range_index(*index) {
            if let Some(semaphore_ref) = semaphore.as_ref() {
                // run in parallel, controlled by semaphore
                let stream_ctx = stream_ctx.clone();
                let part_metrics = part_metrics.clone();
                let partition_pruner = partition_pruner.clone();
                let semaphore = Arc::clone(semaphore_ref);
                let row_group_index = *index;
                file_scan_tasks.push(async move {
                    // Semaphore is never closed here, so acquire() cannot fail.
                    let _permit = semaphore.acquire().await.unwrap();
                    let stream = scan_file_ranges(
                        stream_ctx,
                        part_metrics,
                        row_group_index,
                        read_type,
                        partition_pruner,
                    )
                    .await?;
                    // Carry the position so the result lands in the right slot.
                    Ok((position, Source::Stream(Box::pin(stream) as _)))
                });
            } else {
                // no semaphore, run sequentially
                let stream = scan_file_ranges(
                    stream_ctx.clone(),
                    part_metrics.clone(),
                    *index,
                    read_type,
                    partition_pruner.clone(),
                )
                .await?;
                ordered_sources[position] = Some(Source::Stream(Box::pin(stream) as _));
            }
        } else {
            // Neither memtable nor file range; may produce a stream for other range kinds.
            let stream =
                scan_util::maybe_scan_other_ranges(stream_ctx, *index, part_metrics).await?;
            ordered_sources[position] = Some(Source::Stream(stream));
        }
    }

    // Await all parallel file scans and place each result at its original position.
    if !file_scan_tasks.is_empty() {
        let results = futures::future::try_join_all(file_scan_tasks).await?;
        for (position, source) in results {
            ordered_sources[position] = Some(source);
        }
    }

    // Flatten in positional order; unfilled slots (if any) are skipped.
    for source in ordered_sources.into_iter().flatten() {
        sources.push(source);
    }
    Ok(())
}
813
814/// Builds flat sources for the partition range and push them to the `sources` vector.
815pub(crate) async fn build_flat_sources(
816    stream_ctx: &Arc<StreamContext>,
817    part_range: &PartitionRange,
818    compaction: bool,
819    part_metrics: &PartitionMetrics,
820    partition_pruner: Arc<PartitionPruner>,
821    sources: &mut Vec<BoxedRecordBatchStream>,
822    semaphore: Option<Arc<Semaphore>>,
823) -> Result<()> {
824    // Gets range meta.
825    let range_meta = &stream_ctx.ranges[part_range.identifier];
826    #[cfg(debug_assertions)]
827    if compaction {
828        // Compaction expects input sources are not been split.
829        debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
830        for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
831            // It should scan all row groups.
832            debug_assert_eq!(
833                -1, row_group_idx.row_group_index,
834                "Expect {} range scan all row groups, given: {}",
835                i, row_group_idx.row_group_index,
836            );
837        }
838    }
839
840    let read_type = if compaction {
841        "compaction"
842    } else {
843        "seq_scan_files"
844    };
845    let num_indices = range_meta.row_group_indices.len();
846    if num_indices == 0 {
847        return Ok(());
848    }
849
850    let should_split = should_split_flat_batches_for_merge(stream_ctx, range_meta);
851    sources.reserve(num_indices);
852    let mut ordered_sources = Vec::with_capacity(num_indices);
853    ordered_sources.resize_with(num_indices, || None);
854    let mut file_scan_tasks = Vec::new();
855
856    for (position, index) in range_meta.row_group_indices.iter().enumerate() {
857        if stream_ctx.is_mem_range_index(*index) {
858            let stream = scan_flat_mem_ranges(
859                stream_ctx.clone(),
860                part_metrics.clone(),
861                *index,
862                range_meta.time_range,
863            );
864            ordered_sources[position] = Some(Box::pin(stream) as _);
865        } else if stream_ctx.is_file_range_index(*index) {
866            if let Some(semaphore_ref) = semaphore.as_ref() {
867                // run in parallel, controlled by semaphore
868                let stream_ctx = stream_ctx.clone();
869                let part_metrics = part_metrics.clone();
870                let partition_pruner = partition_pruner.clone();
871                let semaphore = Arc::clone(semaphore_ref);
872                let row_group_index = *index;
873                file_scan_tasks.push(async move {
874                    let _permit = semaphore.acquire().await.unwrap();
875                    let stream = scan_flat_file_ranges(
876                        stream_ctx,
877                        part_metrics,
878                        row_group_index,
879                        read_type,
880                        partition_pruner,
881                    )
882                    .await?;
883                    Ok((position, Box::pin(stream) as _))
884                });
885            } else {
886                // no semaphore, run sequentially
887                let stream = scan_flat_file_ranges(
888                    stream_ctx.clone(),
889                    part_metrics.clone(),
890                    *index,
891                    read_type,
892                    partition_pruner.clone(),
893                )
894                .await?;
895                ordered_sources[position] = Some(Box::pin(stream) as _);
896            }
897        } else {
898            let stream =
899                scan_util::maybe_scan_flat_other_ranges(stream_ctx, *index, part_metrics).await?;
900            ordered_sources[position] = Some(stream);
901        }
902    }
903
904    if !file_scan_tasks.is_empty() {
905        let results = futures::future::try_join_all(file_scan_tasks).await?;
906        for (position, stream) in results {
907            ordered_sources[position] = Some(stream);
908        }
909    }
910
911    for stream in ordered_sources.into_iter().flatten() {
912        if should_split {
913            sources.push(Box::pin(SplitRecordBatchStream::new(stream)));
914        } else {
915            sources.push(stream);
916        }
917    }
918
919    if should_split {
920        common_telemetry::debug!(
921            "Splitting record batches, region: {}, sources: {}, part_range: {:?}",
922            stream_ctx.input.region_metadata().region_id,
923            sources.len(),
924            part_range,
925        );
926    }
927
928    Ok(())
929}
930
#[cfg(test)]
impl SeqScan {
    /// Test-only accessor for the scan input backing this scanner.
    pub(crate) fn input(&self) -> &ScanInput {
        let ctx = &self.stream_ctx;
        &ctx.input
    }
}
938
/// Returns the scanner type name, tagging compaction scans separately.
fn get_scanner_type(compaction: bool) -> &'static str {
    match compaction {
        true => "SeqScan(compaction)",
        false => "SeqScan",
    }
}