mito2/read/
seq_scan.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Sequential scan.
16
17use std::fmt;
18use std::sync::Arc;
19use std::time::Instant;
20
21use async_stream::try_stream;
22use common_error::ext::BoxedError;
23use common_recordbatch::util::ChainedRecordBatchStream;
24use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
25use common_telemetry::tracing;
26use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
27use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
28use datatypes::schema::SchemaRef;
29use futures::{StreamExt, TryStreamExt};
30use snafu::{OptionExt, ensure};
31use store_api::metadata::RegionMetadataRef;
32use store_api::region_engine::{
33    PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
34};
35use store_api::storage::TimeSeriesRowSelector;
36use tokio::sync::Semaphore;
37
38use crate::error::{PartitionOutOfRangeSnafu, Result, TooManyFilesToReadSnafu, UnexpectedSnafu};
39use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
40use crate::read::flat_dedup::{FlatDedupReader, FlatLastNonNull, FlatLastRow};
41use crate::read::flat_merge::FlatMergeReader;
42use crate::read::last_row::LastRowReader;
43use crate::read::merge::MergeReaderBuilder;
44use crate::read::range::{RangeBuilderList, RangeMeta};
45use crate::read::scan_region::{ScanInput, StreamContext};
46use crate::read::scan_util::{
47    PartitionMetrics, PartitionMetricsList, SplitRecordBatchStream, scan_file_ranges,
48    scan_flat_file_ranges, scan_flat_mem_ranges, scan_mem_ranges,
49    should_split_flat_batches_for_merge,
50};
51use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
52use crate::read::{
53    Batch, BatchReader, BoxedBatchReader, BoxedRecordBatchStream, ScannerMetrics, Source, scan_util,
54};
55use crate::region::options::MergeMode;
56use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
57
/// Scans a region and returns rows in a sorted sequence.
///
/// The output order is always `order by primary keys, time index` inside every
/// [`PartitionRange`]. Each "partition" may contain many [`PartitionRange`]s.
pub struct SeqScan {
    /// Properties of the scanner, including the partition ranges assigned to each partition.
    properties: ScannerProperties,
    /// Context of streams, shared with the streams this scanner creates.
    stream_ctx: Arc<StreamContext>,
    /// Metrics for each partition.
    /// The scanner only records metrics here for queries and leaves the list empty
    /// during compaction.
    metrics_list: PartitionMetricsList,
}
71
72impl SeqScan {
73    /// Creates a new [SeqScan] with the given input.
74    /// If `input.compaction` is true, the scanner will not attempt to split ranges.
75    pub(crate) fn new(input: ScanInput) -> Self {
76        let mut properties = ScannerProperties::default()
77            .with_append_mode(input.append_mode)
78            .with_total_rows(input.total_rows());
79        let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input));
80        properties.partitions = vec![stream_ctx.partition_ranges()];
81
82        Self {
83            properties,
84            stream_ctx,
85            metrics_list: PartitionMetricsList::default(),
86        }
87    }
88
89    /// Builds a stream for the query.
90    ///
91    /// The returned stream is not partitioned and will contains all the data. If want
92    /// partitioned scan, use [`RegionScanner::scan_partition`].
93    pub fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
94        let metrics_set = ExecutionPlanMetricsSet::new();
95        let streams = (0..self.properties.partitions.len())
96            .map(|partition: usize| {
97                self.scan_partition(&QueryScanContext::default(), &metrics_set, partition)
98            })
99            .collect::<Result<Vec<_>, _>>()?;
100
101        let aggr_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
102        Ok(Box::pin(aggr_stream))
103    }
104
105    /// Scan [`Batch`] in all partitions one by one.
106    pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
107        let metrics_set = ExecutionPlanMetricsSet::new();
108
109        let streams = (0..self.properties.partitions.len())
110            .map(|partition| {
111                let metrics = self.new_partition_metrics(false, &metrics_set, partition);
112                self.scan_batch_in_partition(partition, metrics)
113            })
114            .collect::<Result<Vec<_>>>()?;
115
116        Ok(Box::pin(futures::stream::iter(streams).flatten()))
117    }
118
119    /// Builds a [BoxedBatchReader] from sequential scan for compaction.
120    ///
121    /// # Panics
122    /// Panics if the compaction flag is not set.
123    pub async fn build_reader_for_compaction(&self) -> Result<BoxedBatchReader> {
124        assert!(self.stream_ctx.input.compaction);
125
126        let metrics_set = ExecutionPlanMetricsSet::new();
127        let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
128        debug_assert_eq!(1, self.properties.partitions.len());
129        let partition_ranges = &self.properties.partitions[0];
130
131        let reader = Self::merge_all_ranges_for_compaction(
132            &self.stream_ctx,
133            partition_ranges,
134            &part_metrics,
135        )
136        .await?;
137        Ok(Box::new(reader))
138    }
139
140    /// Builds a [BoxedRecordBatchStream] from sequential scan for flat format compaction.
141    ///
142    /// # Panics
143    /// Panics if the compaction flag is not set.
144    pub async fn build_flat_reader_for_compaction(&self) -> Result<BoxedRecordBatchStream> {
145        assert!(self.stream_ctx.input.compaction);
146
147        let metrics_set = ExecutionPlanMetricsSet::new();
148        let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
149        debug_assert_eq!(1, self.properties.partitions.len());
150        let partition_ranges = &self.properties.partitions[0];
151
152        let reader = Self::merge_all_flat_ranges_for_compaction(
153            &self.stream_ctx,
154            partition_ranges,
155            &part_metrics,
156        )
157        .await?;
158        Ok(reader)
159    }
160
161    /// Builds a merge reader that reads all ranges.
162    /// Callers MUST not split ranges before calling this method.
163    async fn merge_all_ranges_for_compaction(
164        stream_ctx: &Arc<StreamContext>,
165        partition_ranges: &[PartitionRange],
166        part_metrics: &PartitionMetrics,
167    ) -> Result<BoxedBatchReader> {
168        let mut sources = Vec::new();
169        let range_builder_list = Arc::new(RangeBuilderList::new(
170            stream_ctx.input.num_memtables(),
171            stream_ctx.input.num_files(),
172        ));
173        for part_range in partition_ranges {
174            build_sources(
175                stream_ctx,
176                part_range,
177                true,
178                part_metrics,
179                range_builder_list.clone(),
180                &mut sources,
181                None,
182            )
183            .await?;
184        }
185
186        common_telemetry::debug!(
187            "Build reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
188            stream_ctx.input.mapper.metadata().region_id,
189            partition_ranges.len(),
190            sources.len()
191        );
192        Self::build_reader_from_sources(stream_ctx, sources, None, None).await
193    }
194
195    /// Builds a merge reader that reads all flat ranges.
196    /// Callers MUST not split ranges before calling this method.
197    async fn merge_all_flat_ranges_for_compaction(
198        stream_ctx: &Arc<StreamContext>,
199        partition_ranges: &[PartitionRange],
200        part_metrics: &PartitionMetrics,
201    ) -> Result<BoxedRecordBatchStream> {
202        let mut sources = Vec::new();
203        let range_builder_list = Arc::new(RangeBuilderList::new(
204            stream_ctx.input.num_memtables(),
205            stream_ctx.input.num_files(),
206        ));
207        for part_range in partition_ranges {
208            build_flat_sources(
209                stream_ctx,
210                part_range,
211                true,
212                part_metrics,
213                range_builder_list.clone(),
214                &mut sources,
215                None,
216            )
217            .await?;
218        }
219
220        common_telemetry::debug!(
221            "Build flat reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
222            stream_ctx.input.mapper.metadata().region_id,
223            partition_ranges.len(),
224            sources.len()
225        );
226        Self::build_flat_reader_from_sources(stream_ctx, sources, None, None).await
227    }
228
229    /// Builds a reader to read sources. If `semaphore` is provided, reads sources in parallel
230    /// if possible.
231    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
232    pub(crate) async fn build_reader_from_sources(
233        stream_ctx: &StreamContext,
234        mut sources: Vec<Source>,
235        semaphore: Option<Arc<Semaphore>>,
236        part_metrics: Option<&PartitionMetrics>,
237    ) -> Result<BoxedBatchReader> {
238        if let Some(semaphore) = semaphore.as_ref() {
239            // Read sources in parallel.
240            if sources.len() > 1 {
241                sources = stream_ctx
242                    .input
243                    .create_parallel_sources(sources, semaphore.clone())?;
244            }
245        }
246
247        let mut builder = MergeReaderBuilder::from_sources(sources);
248        if let Some(metrics) = part_metrics {
249            builder.with_metrics_reporter(Some(metrics.merge_metrics_reporter()));
250        }
251        let reader = builder.build().await?;
252
253        let dedup = !stream_ctx.input.append_mode;
254        let dedup_metrics_reporter = part_metrics.map(|m| m.dedup_metrics_reporter());
255        let reader = if dedup {
256            match stream_ctx.input.merge_mode {
257                MergeMode::LastRow => Box::new(DedupReader::new(
258                    reader,
259                    LastRow::new(stream_ctx.input.filter_deleted),
260                    dedup_metrics_reporter,
261                )) as _,
262                MergeMode::LastNonNull => Box::new(DedupReader::new(
263                    reader,
264                    LastNonNull::new(stream_ctx.input.filter_deleted),
265                    dedup_metrics_reporter,
266                )) as _,
267            }
268        } else {
269            Box::new(reader) as _
270        };
271
272        let reader = match &stream_ctx.input.series_row_selector {
273            Some(TimeSeriesRowSelector::LastRow) => Box::new(LastRowReader::new(reader)) as _,
274            None => reader,
275        };
276
277        Ok(reader)
278    }
279
280    /// Builds a flat reader to read sources that returns RecordBatch. If `semaphore` is provided, reads sources in parallel
281    /// if possible.
282    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
283    pub(crate) async fn build_flat_reader_from_sources(
284        stream_ctx: &StreamContext,
285        mut sources: Vec<BoxedRecordBatchStream>,
286        semaphore: Option<Arc<Semaphore>>,
287        part_metrics: Option<&PartitionMetrics>,
288    ) -> Result<BoxedRecordBatchStream> {
289        if let Some(semaphore) = semaphore.as_ref() {
290            // Read sources in parallel.
291            if sources.len() > 1 {
292                sources = stream_ctx
293                    .input
294                    .create_parallel_flat_sources(sources, semaphore.clone())?;
295            }
296        }
297
298        let mapper = stream_ctx.input.mapper.as_flat().unwrap();
299        let schema = mapper.input_arrow_schema(stream_ctx.input.compaction);
300
301        let metrics_reporter = part_metrics.map(|m| m.merge_metrics_reporter());
302        let reader =
303            FlatMergeReader::new(schema, sources, DEFAULT_READ_BATCH_SIZE, metrics_reporter)
304                .await?;
305
306        let dedup = !stream_ctx.input.append_mode;
307        let dedup_metrics_reporter = part_metrics.map(|m| m.dedup_metrics_reporter());
308        let reader = if dedup {
309            match stream_ctx.input.merge_mode {
310                MergeMode::LastRow => Box::pin(
311                    FlatDedupReader::new(
312                        reader.into_stream().boxed(),
313                        FlatLastRow::new(stream_ctx.input.filter_deleted),
314                        dedup_metrics_reporter,
315                    )
316                    .into_stream(),
317                ) as _,
318                MergeMode::LastNonNull => Box::pin(
319                    FlatDedupReader::new(
320                        reader.into_stream().boxed(),
321                        FlatLastNonNull::new(
322                            mapper.field_column_start(),
323                            stream_ctx.input.filter_deleted,
324                        ),
325                        dedup_metrics_reporter,
326                    )
327                    .into_stream(),
328                ) as _,
329            }
330        } else {
331            Box::pin(reader.into_stream()) as _
332        };
333
334        Ok(reader)
335    }
336
337    /// Scans the given partition when the part list is set properly.
338    /// Otherwise the returned stream might not contains any data.
339    fn scan_partition_impl(
340        &self,
341        ctx: &QueryScanContext,
342        metrics_set: &ExecutionPlanMetricsSet,
343        partition: usize,
344    ) -> Result<SendableRecordBatchStream> {
345        if ctx.explain_verbose {
346            common_telemetry::info!(
347                "SeqScan partition {}, region_id: {}",
348                partition,
349                self.stream_ctx.input.region_metadata().region_id
350            );
351        }
352
353        let metrics = self.new_partition_metrics(ctx.explain_verbose, metrics_set, partition);
354        let input = &self.stream_ctx.input;
355
356        let batch_stream = if input.flat_format {
357            // Use flat scan for bulk memtables
358            self.scan_flat_batch_in_partition(partition, metrics.clone())?
359        } else {
360            // Use regular batch scan for normal memtables
361            self.scan_batch_in_partition(partition, metrics.clone())?
362        };
363        let record_batch_stream = ConvertBatchStream::new(
364            batch_stream,
365            input.mapper.clone(),
366            input.cache_strategy.clone(),
367            metrics,
368        );
369
370        Ok(Box::pin(RecordBatchStreamWrapper::new(
371            input.mapper.output_schema(),
372            Box::pin(record_batch_stream),
373        )))
374    }
375
    /// Scans the given partition and returns a stream of [`ScanBatch::Normal`] batches.
    ///
    /// Partition ranges of the partition are scanned one by one, in order. For each range,
    /// its sources are built, merged into one reader and the reader is drained before the
    /// next range starts.
    ///
    /// Returns a `PartitionOutOfRange` error if `partition` is not a valid partition index,
    /// and an empty stream if the partition has no range. Errors that occur while scanning
    /// are yielded by the stream itself.
    fn scan_batch_in_partition(
        &self,
        partition: usize,
        part_metrics: PartitionMetrics,
    ) -> Result<ScanBatchStream> {
        ensure!(
            partition < self.properties.partitions.len(),
            PartitionOutOfRangeSnafu {
                given: partition,
                all: self.properties.partitions.len(),
            }
        );

        if self.properties.partitions[partition].is_empty() {
            return Ok(Box::pin(futures::stream::empty()));
        }

        // Everything the stream needs is cloned or copied here so the stream does not
        // borrow from `self`.
        let stream_ctx = self.stream_ctx.clone();
        let semaphore = self.new_semaphore();
        let partition_ranges = self.properties.partitions[partition].clone();
        let compaction = self.stream_ctx.input.compaction;
        let distinguish_range = self.properties.distinguish_partition_range;
        // Compaction never passes a semaphore to `build_sources`, so it does not scan
        // files in parallel there.
        let file_scan_semaphore = if compaction { None } else { semaphore.clone() };

        let stream = try_stream! {
            part_metrics.on_first_poll();

            let range_builder_list = Arc::new(RangeBuilderList::new(
                stream_ctx.input.num_memtables(),
                stream_ctx.input.num_files(),
            ));
            // Fails the stream if the mapper is not in primary key format. The mapper itself
            // is only used by the batch checker in debug builds, hence the `_` prefix.
            let _mapper = stream_ctx.input.mapper.as_primary_key().context(UnexpectedSnafu {
                reason: "Unexpected format",
            })?;
            // Scans each part.
            for part_range in partition_ranges {
                let mut sources = Vec::new();
                build_sources(
                    &stream_ctx,
                    &part_range,
                    compaction,
                    &part_metrics,
                    range_builder_list.clone(),
                    &mut sources,
                    file_scan_semaphore.clone(),
                ).await?;

                // Metrics of this range; merged into `part_metrics` once the range is done.
                let mut metrics = ScannerMetrics::default();
                // `fetch_start` marks the beginning of the current fetch; the time to build
                // the reader below is counted as scan cost of the first batch.
                let mut fetch_start = Instant::now();
                let mut reader =
                    Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone(), Some(&part_metrics))
                        .await?;
                // Debug builds verify that every batch matches the range it was read for.
                #[cfg(debug_assertions)]
                let mut checker = crate::read::BatchChecker::default()
                    .with_start(Some(part_range.start))
                    .with_end(Some(part_range.end));

                while let Some(batch) = reader.next_batch().await? {
                    metrics.scan_cost += fetch_start.elapsed();
                    metrics.num_batches += 1;
                    metrics.num_rows += batch.num_rows();

                    // The reader is not expected to return empty batches. Debug builds
                    // assert this; release builds skip such a batch.
                    debug_assert!(!batch.is_empty());
                    if batch.is_empty() {
                        continue;
                    }

                    #[cfg(debug_assertions)]
                    checker.ensure_part_range_batch(
                        "SeqScan",
                        _mapper.metadata().region_id,
                        partition,
                        part_range,
                        &batch,
                    );

                    // The time between yielding a batch and being resumed is tracked
                    // separately from the scan cost.
                    let yield_start = Instant::now();
                    yield ScanBatch::Normal(batch);
                    metrics.yield_cost += yield_start.elapsed();

                    fetch_start = Instant::now();
                }

                // Yields an empty part to indicate this range is terminated.
                // The query engine can use this to optimize some queries.
                if distinguish_range {
                    let yield_start = Instant::now();
                    yield ScanBatch::Normal(Batch::empty());
                    metrics.yield_cost += yield_start.elapsed();
                }

                // Accounts for the time since the last fetch started, which includes the
                // final `next_batch()` call that returned `None`.
                metrics.scan_cost += fetch_start.elapsed();
                part_metrics.merge_metrics(&metrics);
            }

            part_metrics.on_finish();
        };
        Ok(Box::pin(stream))
    }
475
    /// Scans the given partition in flat format and returns a stream of
    /// [`ScanBatch::RecordBatch`] batches.
    ///
    /// Partition ranges of the partition are scanned one by one, in order. For each range,
    /// its flat sources are built, merged into one reader and the reader is drained before
    /// the next range starts.
    ///
    /// Returns a `PartitionOutOfRange` error if `partition` is not a valid partition index,
    /// and an empty stream if the partition has no range. Errors that occur while scanning
    /// are yielded by the stream itself.
    fn scan_flat_batch_in_partition(
        &self,
        partition: usize,
        part_metrics: PartitionMetrics,
    ) -> Result<ScanBatchStream> {
        ensure!(
            partition < self.properties.partitions.len(),
            PartitionOutOfRangeSnafu {
                given: partition,
                all: self.properties.partitions.len(),
            }
        );

        if self.properties.partitions[partition].is_empty() {
            return Ok(Box::pin(futures::stream::empty()));
        }

        // Everything the stream needs is cloned or copied here so the stream does not
        // borrow from `self`.
        let stream_ctx = self.stream_ctx.clone();
        let semaphore = self.new_semaphore();
        let partition_ranges = self.properties.partitions[partition].clone();
        let compaction = self.stream_ctx.input.compaction;
        // Compaction never passes a semaphore to `build_flat_sources`.
        let file_scan_semaphore = if compaction { None } else { semaphore.clone() };

        let stream = try_stream! {
            part_metrics.on_first_poll();

            let range_builder_list = Arc::new(RangeBuilderList::new(
                stream_ctx.input.num_memtables(),
                stream_ctx.input.num_files(),
            ));
            // Scans each part.
            for part_range in partition_ranges {
                let mut sources = Vec::new();
                build_flat_sources(
                    &stream_ctx,
                    &part_range,
                    compaction,
                    &part_metrics,
                    range_builder_list.clone(),
                    &mut sources,
                    file_scan_semaphore.clone(),
                ).await?;

                // Metrics of this range; merged into `part_metrics` once the range is done.
                let mut metrics = ScannerMetrics::default();
                // `fetch_start` marks the beginning of the current fetch; the time to build
                // the reader below is counted as scan cost of the first batch.
                let mut fetch_start = Instant::now();
                let mut reader =
                    Self::build_flat_reader_from_sources(&stream_ctx, sources, semaphore.clone(), Some(&part_metrics))
                        .await?;

                while let Some(record_batch) = reader.try_next().await? {
                    metrics.scan_cost += fetch_start.elapsed();
                    metrics.num_batches += 1;
                    metrics.num_rows += record_batch.num_rows();

                    // The reader is not expected to return empty record batches. Debug
                    // builds assert this; release builds skip such a batch.
                    debug_assert!(record_batch.num_rows() > 0);
                    if record_batch.num_rows() == 0 {
                        continue;
                    }

                    // The time between yielding a batch and being resumed is tracked
                    // separately from the scan cost.
                    let yield_start = Instant::now();
                    yield ScanBatch::RecordBatch(record_batch);
                    metrics.yield_cost += yield_start.elapsed();

                    fetch_start = Instant::now();
                }

                // Accounts for the time since the last fetch started, which includes the
                // final `try_next()` call that returned `None`.
                metrics.scan_cost += fetch_start.elapsed();
                part_metrics.merge_metrics(&metrics);
            }

            part_metrics.on_finish();
        };
        Ok(Box::pin(stream))
    }
550
551    fn new_semaphore(&self) -> Option<Arc<Semaphore>> {
552        if self.properties.target_partitions() > self.properties.num_partitions() {
553            // We can use additional tasks to read the data if we have more target partitions than actual partitions.
554            // This semaphore is partition level.
555            // We don't use a global semaphore to avoid a partition waiting for others. The final concurrency
556            // of tasks usually won't exceed the target partitions a lot as compaction can reduce the number of
557            // files in a part range.
558            Some(Arc::new(Semaphore::new(
559                self.properties.target_partitions() - self.properties.num_partitions() + 1,
560            )))
561        } else {
562            None
563        }
564    }
565
566    /// Creates a new partition metrics instance.
567    /// Sets the partition metrics for the given partition if it is not for compaction.
568    fn new_partition_metrics(
569        &self,
570        explain_verbose: bool,
571        metrics_set: &ExecutionPlanMetricsSet,
572        partition: usize,
573    ) -> PartitionMetrics {
574        let metrics = PartitionMetrics::new(
575            self.stream_ctx.input.mapper.metadata().region_id,
576            partition,
577            get_scanner_type(self.stream_ctx.input.compaction),
578            self.stream_ctx.query_start,
579            explain_verbose,
580            metrics_set,
581        );
582
583        if !self.stream_ctx.input.compaction {
584            self.metrics_list.set(partition, metrics.clone());
585        }
586
587        metrics
588    }
589
590    /// Finds the maximum number of files to read in a single partition range.
591    fn max_files_in_partition(ranges: &[RangeMeta], partition_ranges: &[PartitionRange]) -> usize {
592        partition_ranges
593            .iter()
594            .map(|part_range| {
595                let range_meta = &ranges[part_range.identifier];
596                range_meta.indices.len()
597            })
598            .max()
599            .unwrap_or(0)
600    }
601
602    /// Checks resource limit for the scanner.
603    pub(crate) fn check_scan_limit(&self) -> Result<()> {
604        // Check max file count limit for all partitions since we scan them in parallel.
605        let total_max_files: usize = self
606            .properties
607            .partitions
608            .iter()
609            .map(|partition| Self::max_files_in_partition(&self.stream_ctx.ranges, partition))
610            .sum();
611
612        let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
613        if total_max_files > max_concurrent_files {
614            return TooManyFilesToReadSnafu {
615                actual: total_max_files,
616                max: max_concurrent_files,
617            }
618            .fail();
619        }
620
621        Ok(())
622    }
623}
624
625impl RegionScanner for SeqScan {
626    fn name(&self) -> &str {
627        "SeqScan"
628    }
629
630    fn properties(&self) -> &ScannerProperties {
631        &self.properties
632    }
633
634    fn schema(&self) -> SchemaRef {
635        self.stream_ctx.input.mapper.output_schema()
636    }
637
638    fn metadata(&self) -> RegionMetadataRef {
639        self.stream_ctx.input.mapper.metadata().clone()
640    }
641
642    fn scan_partition(
643        &self,
644        ctx: &QueryScanContext,
645        metrics_set: &ExecutionPlanMetricsSet,
646        partition: usize,
647    ) -> Result<SendableRecordBatchStream, BoxedError> {
648        self.scan_partition_impl(ctx, metrics_set, partition)
649            .map_err(BoxedError::new)
650    }
651
652    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
653        self.properties.prepare(request);
654
655        self.check_scan_limit().map_err(BoxedError::new)?;
656
657        Ok(())
658    }
659
660    fn has_predicate_without_region(&self) -> bool {
661        let predicate = self
662            .stream_ctx
663            .input
664            .predicate_group()
665            .predicate_without_region();
666        predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
667    }
668
669    fn set_logical_region(&mut self, logical_region: bool) {
670        self.properties.set_logical_region(logical_region);
671    }
672}
673
674impl DisplayAs for SeqScan {
675    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
676        write!(
677            f,
678            "SeqScan: region={}, ",
679            self.stream_ctx.input.mapper.metadata().region_id
680        )?;
681        match t {
682            // TODO(LFC): Implement all the "TreeRender" display format.
683            DisplayFormatType::Default | DisplayFormatType::TreeRender => {
684                self.stream_ctx.format_for_explain(false, f)
685            }
686            DisplayFormatType::Verbose => {
687                self.stream_ctx.format_for_explain(true, f)?;
688                self.metrics_list.format_verbose_metrics(f)
689            }
690        }
691    }
692}
693
694impl fmt::Debug for SeqScan {
695    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
696        f.debug_struct("SeqScan")
697            .field("num_ranges", &self.stream_ctx.ranges.len())
698            .finish()
699    }
700}
701
/// Builds sources for the partition range and pushes them to the `sources` vector.
///
/// One source is built for every row group index of the range meta that `part_range`
/// identifies. The sources are pushed in the order of those indices, no matter in which
/// order the scans complete.
///
/// If `semaphore` is provided, file ranges are scanned concurrently and each scan holds a
/// permit of the semaphore while it runs. Otherwise file ranges are scanned one by one.
/// Memtable ranges and other ranges are always built in place.
///
/// `compaction` selects the read type passed to the file scans and, in debug builds,
/// enables checks that the ranges have not been split.
pub(crate) async fn build_sources(
    stream_ctx: &Arc<StreamContext>,
    part_range: &PartitionRange,
    compaction: bool,
    part_metrics: &PartitionMetrics,
    range_builder_list: Arc<RangeBuilderList>,
    sources: &mut Vec<Source>,
    semaphore: Option<Arc<Semaphore>>,
) -> Result<()> {
    // Gets range meta.
    let range_meta = &stream_ctx.ranges[part_range.identifier];
    #[cfg(debug_assertions)]
    if compaction {
        // Compaction expects that the input sources have not been split.
        debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
        for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
            // It should scan all row groups, which is expected to be encoded as -1.
            debug_assert_eq!(
                -1, row_group_idx.row_group_index,
                "Expect {} range scan all row groups, given: {}",
                i, row_group_idx.row_group_index,
            );
        }
    }

    let read_type = if compaction {
        "compaction"
    } else {
        "seq_scan_files"
    };
    let num_indices = range_meta.row_group_indices.len();
    if num_indices == 0 {
        return Ok(());
    }

    sources.reserve(num_indices);
    // One slot per index. Slots are filled by position so the output order stays the same
    // even when file scans run concurrently.
    let mut ordered_sources = Vec::with_capacity(num_indices);
    ordered_sources.resize_with(num_indices, || None);
    // Pending file scans; only used when a semaphore is provided.
    let mut file_scan_tasks = Vec::new();

    for (position, index) in range_meta.row_group_indices.iter().enumerate() {
        if stream_ctx.is_mem_range_index(*index) {
            let stream = scan_mem_ranges(
                stream_ctx.clone(),
                part_metrics.clone(),
                *index,
                range_meta.time_range,
            );
            ordered_sources[position] = Some(Source::Stream(Box::pin(stream) as _));
        } else if stream_ctx.is_file_range_index(*index) {
            if let Some(semaphore_ref) = semaphore.as_ref() {
                // run in parallel, controlled by semaphore
                // The future owns clones of everything it needs and is only polled by
                // `try_join_all` below.
                let stream_ctx = stream_ctx.clone();
                let part_metrics = part_metrics.clone();
                let range_builder_list = range_builder_list.clone();
                let semaphore = Arc::clone(semaphore_ref);
                let row_group_index = *index;
                file_scan_tasks.push(async move {
                    // `acquire` only fails on a closed semaphore; nothing in this function
                    // closes it.
                    let _permit = semaphore.acquire().await.unwrap();
                    let stream = scan_file_ranges(
                        stream_ctx,
                        part_metrics,
                        row_group_index,
                        read_type,
                        range_builder_list,
                    )
                    .await?;
                    Ok((position, Source::Stream(Box::pin(stream) as _)))
                });
            } else {
                // no semaphore, run sequentially
                let stream = scan_file_ranges(
                    stream_ctx.clone(),
                    part_metrics.clone(),
                    *index,
                    read_type,
                    range_builder_list.clone(),
                )
                .await?;
                ordered_sources[position] = Some(Source::Stream(Box::pin(stream) as _));
            }
        } else {
            // Neither a memtable range nor a file range.
            let stream =
                scan_util::maybe_scan_other_ranges(stream_ctx, *index, part_metrics).await?;
            ordered_sources[position] = Some(Source::Stream(stream));
        }
    }

    if !file_scan_tasks.is_empty() {
        // Waits for all file scans and fails on the first error.
        let results = futures::future::try_join_all(file_scan_tasks).await?;
        for (position, source) in results {
            ordered_sources[position] = Some(source);
        }
    }

    // Appends the sources in index order.
    for source in ordered_sources.into_iter().flatten() {
        sources.push(source);
    }
    Ok(())
}
803
804/// Builds flat sources for the partition range and push them to the `sources` vector.
805pub(crate) async fn build_flat_sources(
806    stream_ctx: &Arc<StreamContext>,
807    part_range: &PartitionRange,
808    compaction: bool,
809    part_metrics: &PartitionMetrics,
810    range_builder_list: Arc<RangeBuilderList>,
811    sources: &mut Vec<BoxedRecordBatchStream>,
812    semaphore: Option<Arc<Semaphore>>,
813) -> Result<()> {
814    // Gets range meta.
815    let range_meta = &stream_ctx.ranges[part_range.identifier];
816    #[cfg(debug_assertions)]
817    if compaction {
818        // Compaction expects input sources are not been split.
819        debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
820        for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
821            // It should scan all row groups.
822            debug_assert_eq!(
823                -1, row_group_idx.row_group_index,
824                "Expect {} range scan all row groups, given: {}",
825                i, row_group_idx.row_group_index,
826            );
827        }
828    }
829
830    let read_type = if compaction {
831        "compaction"
832    } else {
833        "seq_scan_files"
834    };
835    let num_indices = range_meta.row_group_indices.len();
836    if num_indices == 0 {
837        return Ok(());
838    }
839
840    let should_split = should_split_flat_batches_for_merge(stream_ctx, range_meta);
841    sources.reserve(num_indices);
842    let mut ordered_sources = Vec::with_capacity(num_indices);
843    ordered_sources.resize_with(num_indices, || None);
844    let mut file_scan_tasks = Vec::new();
845
846    for (position, index) in range_meta.row_group_indices.iter().enumerate() {
847        if stream_ctx.is_mem_range_index(*index) {
848            let stream = scan_flat_mem_ranges(stream_ctx.clone(), part_metrics.clone(), *index);
849            ordered_sources[position] = Some(Box::pin(stream) as _);
850        } else if stream_ctx.is_file_range_index(*index) {
851            if let Some(semaphore_ref) = semaphore.as_ref() {
852                // run in parallel, controlled by semaphore
853                let stream_ctx = stream_ctx.clone();
854                let part_metrics = part_metrics.clone();
855                let range_builder_list = range_builder_list.clone();
856                let semaphore = Arc::clone(semaphore_ref);
857                let row_group_index = *index;
858                file_scan_tasks.push(async move {
859                    let _permit = semaphore.acquire().await.unwrap();
860                    let stream = scan_flat_file_ranges(
861                        stream_ctx,
862                        part_metrics,
863                        row_group_index,
864                        read_type,
865                        range_builder_list,
866                    )
867                    .await?;
868                    Ok((position, Box::pin(stream) as _))
869                });
870            } else {
871                // no semaphore, run sequentially
872                let stream = scan_flat_file_ranges(
873                    stream_ctx.clone(),
874                    part_metrics.clone(),
875                    *index,
876                    read_type,
877                    range_builder_list.clone(),
878                )
879                .await?;
880                ordered_sources[position] = Some(Box::pin(stream) as _);
881            }
882        } else {
883            let stream =
884                scan_util::maybe_scan_flat_other_ranges(stream_ctx, *index, part_metrics).await?;
885            ordered_sources[position] = Some(stream);
886        }
887    }
888
889    if !file_scan_tasks.is_empty() {
890        let results = futures::future::try_join_all(file_scan_tasks).await?;
891        for (position, stream) in results {
892            ordered_sources[position] = Some(stream);
893        }
894    }
895
896    for stream in ordered_sources.into_iter().flatten() {
897        if should_split {
898            sources.push(Box::pin(SplitRecordBatchStream::new(stream)));
899        } else {
900            sources.push(stream);
901        }
902    }
903
904    if should_split {
905        common_telemetry::debug!(
906            "Splitting record batches, region: {}, sources: {}, part_range: {:?}",
907            stream_ctx.input.region_metadata().region_id,
908            sources.len(),
909            part_range,
910        );
911    }
912
913    Ok(())
914}
915
#[cfg(test)]
impl SeqScan {
    /// Borrows the [`ScanInput`] stored in this scanner's stream context.
    ///
    /// Only compiled for tests.
    pub(crate) fn input(&self) -> &ScanInput {
        let ctx = &self.stream_ctx;
        &ctx.input
    }
}
923
/// Returns the scanner type label: `"SeqScan(compaction)"` when `compaction`
/// is `true`, `"SeqScan"` otherwise.
fn get_scanner_type(compaction: bool) -> &'static str {
    if !compaction {
        return "SeqScan";
    }
    "SeqScan(compaction)"
}