// mito2/src/read/seq_scan.rs
1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Sequential scan.
16
17use std::fmt;
18use std::sync::Arc;
19use std::time::Instant;
20
21use async_stream::try_stream;
22use common_error::ext::BoxedError;
23use common_recordbatch::util::ChainedRecordBatchStream;
24use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
25use common_telemetry::tracing;
26use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
27use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
28use datatypes::schema::SchemaRef;
29use futures::{StreamExt, TryStreamExt};
30use snafu::ensure;
31use store_api::metadata::RegionMetadataRef;
32use store_api::region_engine::{
33    PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
34};
35use store_api::storage::TimeSeriesRowSelector;
36use tokio::sync::Semaphore;
37
38use crate::error::{PartitionOutOfRangeSnafu, Result, TooManyFilesToReadSnafu};
39use crate::read::flat_dedup::{FlatDedupReader, FlatLastNonNull, FlatLastRow};
40use crate::read::flat_merge::FlatMergeReader;
41use crate::read::last_row::FlatLastRowReader;
42use crate::read::pruner::{PartitionPruner, Pruner};
43use crate::read::range::RangeMeta;
44use crate::read::range_cache::{
45    build_range_cache_key, cache_flat_range_stream, cached_flat_range_stream,
46};
47use crate::read::scan_region::{ScanInput, StreamContext};
48use crate::read::scan_util::{
49    PartitionMetrics, PartitionMetricsList, SplitRecordBatchStream, compute_parallel_channel_size,
50    scan_flat_file_ranges, scan_flat_mem_ranges, should_split_flat_batches_for_merge,
51};
52use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
53use crate::read::{BoxedRecordBatchStream, ScannerMetrics, scan_util};
54use crate::region::options::MergeMode;
55use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
56
/// Scans a region and returns rows in a sorted sequence.
///
/// The output order is always `order by primary keys, time index` inside every
/// [`PartitionRange`]. Each "partition" may contain many [`PartitionRange`]s.
pub struct SeqScan {
    /// Properties of the scanner.
    properties: ScannerProperties,
    /// Context shared by the streams of all partitions.
    stream_ctx: Arc<StreamContext>,
    /// Shared pruner for file range building.
    pruner: Arc<Pruner>,
    /// Metrics for each partition.
    /// The scanner only sets it in queries and keeps it empty during compaction.
    metrics_list: PartitionMetricsList,
}
72
impl SeqScan {
    /// Creates a new [SeqScan] with the given input.
    /// If `input.compaction` is true, the scanner will not attempt to split ranges.
    pub(crate) fn new(input: ScanInput) -> Self {
        let mut properties = ScannerProperties::default()
            .with_append_mode(input.append_mode)
            .with_total_rows(input.total_rows());
        let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input));
        properties.partitions = vec![stream_ctx.partition_ranges()];

        // Create the shared pruner with number of workers equal to CPU cores.
        let num_workers = common_stat::get_total_cpu_cores().max(1);
        let pruner = Arc::new(Pruner::new(stream_ctx.clone(), num_workers));

        Self {
            properties,
            stream_ctx,
            pruner,
            metrics_list: PartitionMetricsList::default(),
        }
    }

    /// Builds a stream for the query.
    ///
    /// The returned stream is not partitioned and will contain all the data. If
    /// a partitioned scan is wanted, use [`RegionScanner::scan_partition`].
    #[tracing::instrument(
        skip_all,
        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
    )]
    pub fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
        let metrics_set = ExecutionPlanMetricsSet::new();
        // Scan every partition, then chain all partition streams into one.
        let streams = (0..self.properties.partitions.len())
            .map(|partition: usize| {
                self.scan_partition(&QueryScanContext::default(), &metrics_set, partition)
            })
            .collect::<Result<Vec<_>, _>>()?;

        let aggr_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
        Ok(Box::pin(aggr_stream))
    }

    /// Scans [`ScanBatch`]es in all partitions one by one.
    pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
        let metrics_set = ExecutionPlanMetricsSet::new();

        let streams = (0..self.properties.partitions.len())
            .map(|partition| {
                let metrics = self.new_partition_metrics(false, &metrics_set, partition);
                self.scan_flat_batch_in_partition(partition, metrics)
            })
            .collect::<Result<Vec<_>>>()?;

        // Flatten so partitions are consumed sequentially in order.
        Ok(Box::pin(futures::stream::iter(streams).flatten()))
    }

    /// Builds a [BoxedRecordBatchStream] from sequential scan for flat format compaction.
    ///
    /// # Panics
    /// Panics if the compaction flag is not set.
    pub async fn build_flat_reader_for_compaction(&self) -> Result<BoxedRecordBatchStream> {
        assert!(self.stream_ctx.input.compaction);

        let metrics_set = ExecutionPlanMetricsSet::new();
        let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
        // Compaction expects a single partition that holds all ranges.
        debug_assert_eq!(1, self.properties.partitions.len());
        let partition_ranges = &self.properties.partitions[0];

        let reader = Self::merge_all_flat_ranges_for_compaction(
            &self.stream_ctx,
            partition_ranges,
            &part_metrics,
            self.pruner.clone(),
        )
        .await?;
        Ok(reader)
    }

    /// Builds a merge reader that reads all flat ranges.
    /// Callers MUST not split ranges before calling this method.
    async fn merge_all_flat_ranges_for_compaction(
        stream_ctx: &Arc<StreamContext>,
        partition_ranges: &[PartitionRange],
        part_metrics: &PartitionMetrics,
        pruner: Arc<Pruner>,
    ) -> Result<BoxedRecordBatchStream> {
        pruner.add_partition_ranges(partition_ranges);
        let partition_pruner = Arc::new(PartitionPruner::new(pruner, partition_ranges));

        // Collect the sources of every partition range into one vector.
        let mut sources = Vec::new();
        for part_range in partition_ranges {
            build_flat_sources(
                stream_ctx,
                part_range,
                true,
                part_metrics,
                partition_pruner.clone(),
                &mut sources,
                None,
            )
            .await?;
        }

        common_telemetry::debug!(
            "Build flat reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
            stream_ctx.input.mapper.metadata().region_id,
            partition_ranges.len(),
            sources.len()
        );
        Self::build_flat_reader_from_sources(
            stream_ctx,
            sources,
            None,
            None,
            false,
            compute_parallel_channel_size(DEFAULT_READ_BATCH_SIZE),
        )
        .await
    }

    /// Builds a flat reader to read sources that returns RecordBatch.
    /// If `semaphore` is provided, reads sources in parallel if possible.
    /// If `skip_dedup` is true, the merged stream is returned without applying flat dedup.
    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
    pub(crate) async fn build_flat_reader_from_sources(
        stream_ctx: &StreamContext,
        mut sources: Vec<BoxedRecordBatchStream>,
        semaphore: Option<Arc<Semaphore>>,
        part_metrics: Option<&PartitionMetrics>,
        skip_dedup: bool,
        channel_size: usize,
    ) -> Result<BoxedRecordBatchStream> {
        if let Some(semaphore) = semaphore.as_ref() {
            // Read sources in parallel.
            if sources.len() > 1 {
                sources = stream_ctx.input.create_parallel_flat_sources(
                    sources,
                    semaphore.clone(),
                    channel_size,
                )?;
            }
        }

        let mapper = stream_ctx.input.mapper.as_flat().unwrap();
        let reader: BoxedRecordBatchStream = if sources.len() == 1 {
            // Currently, we can't skip dedup when there is only one source because
            // that source may have duplicate rows.
            sources.pop().unwrap()
        } else {
            // Multiple sources: merge them into a single sorted stream.
            let schema = mapper.input_arrow_schema(stream_ctx.input.compaction);
            let metrics_reporter = part_metrics.map(|m| m.merge_metrics_reporter());
            let reader =
                FlatMergeReader::new(schema, sources, DEFAULT_READ_BATCH_SIZE, metrics_reporter)
                    .await?;
            Box::pin(reader.into_stream())
        };

        // Dedup is skipped when requested by the caller or in append mode.
        let dedup = !skip_dedup && !stream_ctx.input.append_mode;
        let dedup_metrics_reporter = part_metrics.map(|m| m.dedup_metrics_reporter());
        let reader = if dedup {
            match stream_ctx.input.merge_mode {
                MergeMode::LastRow => Box::pin(
                    FlatDedupReader::new(
                        reader,
                        FlatLastRow::new(stream_ctx.input.filter_deleted),
                        dedup_metrics_reporter,
                    )
                    .into_stream(),
                ) as _,
                MergeMode::LastNonNull => Box::pin(
                    FlatDedupReader::new(
                        reader,
                        FlatLastNonNull::new(
                            mapper.field_column_start(),
                            stream_ctx.input.filter_deleted,
                        ),
                        dedup_metrics_reporter,
                    )
                    .into_stream(),
                ) as _,
            }
        } else {
            reader
        };

        // Optionally wrap the stream to keep only the last row per time series.
        let reader = match &stream_ctx.input.series_row_selector {
            Some(TimeSeriesRowSelector::LastRow) => {
                Box::pin(FlatLastRowReader::new(reader).into_stream()) as _
            }
            None => reader,
        };

        Ok(reader)
    }

    /// Builds a flat read stream for one partition range.
    ///
    /// Returns the stream together with the estimated number of rows per batch.
    pub(crate) async fn build_flat_partition_range_read(
        stream_ctx: &Arc<StreamContext>,
        part_range: &PartitionRange,
        compaction: bool,
        part_metrics: &PartitionMetrics,
        partition_pruner: Arc<PartitionPruner>,
        file_scan_semaphore: Option<Arc<Semaphore>>,
        merge_semaphore: Option<Arc<Semaphore>>,
    ) -> Result<(BoxedRecordBatchStream, usize)> {
        let cache_key = build_range_cache_key(stream_ctx, part_range);

        // Serve the whole range from the cache when a cached result exists.
        if let Some(key) = cache_key.as_ref() {
            if let Some(value) = stream_ctx.input.cache_strategy.get_range_result(key) {
                part_metrics.inc_range_cache_hit();
                return Ok((cached_flat_range_stream(value), DEFAULT_READ_BATCH_SIZE));
            }
            part_metrics.inc_range_cache_miss();
        }

        let mut sources = Vec::new();
        let split_batch_size = build_flat_sources(
            stream_ctx,
            part_range,
            compaction,
            part_metrics,
            partition_pruner,
            &mut sources,
            file_scan_semaphore,
        )
        .await?;
        let estimated_rows_per_batch = split_batch_size.unwrap_or(DEFAULT_READ_BATCH_SIZE);
        let channel_size = compute_parallel_channel_size(estimated_rows_per_batch);
        let stream = Self::build_flat_reader_from_sources(
            stream_ctx,
            sources,
            merge_semaphore,
            Some(part_metrics),
            false,
            channel_size,
        )
        .await?;

        // Wrap the stream so its result can be cached for later scans.
        let stream = match cache_key {
            Some(key) => cache_flat_range_stream(
                stream,
                stream_ctx.input.cache_strategy.clone(),
                key,
                part_metrics.clone(),
            ),
            None => stream,
        };

        Ok((stream, estimated_rows_per_batch))
    }

    /// Scans the given partition when the part list is set properly.
    /// Otherwise the returned stream might not contain any data.
    fn scan_partition_impl(
        &self,
        ctx: &QueryScanContext,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> Result<SendableRecordBatchStream> {
        if ctx.explain_verbose {
            common_telemetry::info!(
                "SeqScan partition {}, region_id: {}",
                partition,
                self.stream_ctx.input.region_metadata().region_id
            );
        }

        let metrics = self.new_partition_metrics(ctx.explain_verbose, metrics_set, partition);
        let input = &self.stream_ctx.input;

        // Convert the internal batch stream to the output record batch stream.
        let batch_stream = self.scan_flat_batch_in_partition(partition, metrics.clone())?;
        let record_batch_stream = ConvertBatchStream::new(
            batch_stream,
            input.mapper.clone(),
            input.cache_strategy.clone(),
            metrics,
        );

        Ok(Box::pin(RecordBatchStreamWrapper::new(
            input.mapper.output_schema(),
            Box::pin(record_batch_stream),
        )))
    }

    /// Scans one partition and yields its [`ScanBatch`]es as a stream.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
            partition = partition
        )
    )]
    fn scan_flat_batch_in_partition(
        &self,
        partition: usize,
        part_metrics: PartitionMetrics,
    ) -> Result<ScanBatchStream> {
        ensure!(
            partition < self.properties.partitions.len(),
            PartitionOutOfRangeSnafu {
                given: partition,
                all: self.properties.partitions.len(),
            }
        );

        if self.properties.partitions[partition].is_empty() {
            return Ok(Box::pin(futures::stream::empty()));
        }

        let stream_ctx = self.stream_ctx.clone();
        let semaphore = self.new_semaphore();
        let partition_ranges = self.properties.partitions[partition].clone();
        let compaction = self.stream_ctx.input.compaction;
        // Compaction reads files sequentially (no file-scan semaphore).
        let file_scan_semaphore = if compaction { None } else { semaphore.clone() };
        let pruner = self.pruner.clone();
        // Initializes ref counts for the pruner.
        // If we call scan_batch_in_partition() multiple times but don't read all batches from the stream,
        // then the ref count won't be decremented.
        // This is a rare case and keeping all remaining entries still uses less memory than a per partition cache.
        pruner.add_partition_ranges(&partition_ranges);
        let partition_pruner = Arc::new(PartitionPruner::new(pruner, &partition_ranges));

        let stream = try_stream! {
            part_metrics.on_first_poll();
            // Start fetch time before building sources so scan cost contains
            // build part cost.
            let mut fetch_start = Instant::now();

            // Scans each part.
            for part_range in partition_ranges {
                let (mut reader, _) = Self::build_flat_partition_range_read(
                    &stream_ctx,
                    &part_range,
                    compaction,
                    &part_metrics,
                    partition_pruner.clone(),
                    file_scan_semaphore.clone(),
                    semaphore.clone(),
                )
                .await?;

                let mut metrics = ScannerMetrics {
                    scan_cost: fetch_start.elapsed(),
                    ..Default::default()
                };
                fetch_start = Instant::now();

                while let Some(record_batch) = reader.try_next().await? {
                    metrics.scan_cost += fetch_start.elapsed();
                    metrics.num_batches += 1;
                    metrics.num_rows += record_batch.num_rows();

                    // Empty batches are unexpected; skip instead of yielding them.
                    debug_assert!(record_batch.num_rows() > 0);
                    if record_batch.num_rows() == 0 {
                        fetch_start = Instant::now();
                        continue;
                    }

                    let yield_start = Instant::now();
                    yield ScanBatch::RecordBatch(record_batch);
                    metrics.yield_cost += yield_start.elapsed();

                    fetch_start = Instant::now();
                }

                metrics.scan_cost += fetch_start.elapsed();
                fetch_start = Instant::now();
                part_metrics.merge_metrics(&metrics);
            }

            part_metrics.on_finish();
        };
        Ok(Box::pin(stream))
    }

    /// Creates a partition-level semaphore for extra read tasks, or `None`
    /// when there are no spare target partitions.
    fn new_semaphore(&self) -> Option<Arc<Semaphore>> {
        if self.properties.target_partitions() > self.properties.num_partitions() {
            // We can use additional tasks to read the data if we have more target partitions than actual partitions.
            // This semaphore is partition level.
            // We don't use a global semaphore to avoid a partition waiting for others. The final concurrency
            // of tasks usually won't exceed the target partitions a lot as compaction can reduce the number of
            // files in a part range.
            Some(Arc::new(Semaphore::new(
                self.properties.target_partitions() - self.properties.num_partitions() + 1,
            )))
        } else {
            None
        }
    }

    /// Creates a new partition metrics instance.
    /// Sets the partition metrics for the given partition if it is not for compaction.
    fn new_partition_metrics(
        &self,
        explain_verbose: bool,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> PartitionMetrics {
        let metrics = PartitionMetrics::new(
            self.stream_ctx.input.mapper.metadata().region_id,
            partition,
            get_scanner_type(self.stream_ctx.input.compaction),
            self.stream_ctx.query_start,
            explain_verbose,
            metrics_set,
        );

        if !self.stream_ctx.input.compaction {
            self.metrics_list.set(partition, metrics.clone());
        }

        metrics
    }

    /// Finds the maximum number of files to read in a single partition range.
    fn max_files_in_partition(ranges: &[RangeMeta], partition_ranges: &[PartitionRange]) -> usize {
        partition_ranges
            .iter()
            .map(|part_range| {
                let range_meta = &ranges[part_range.identifier];
                range_meta.indices.len()
            })
            .max()
            .unwrap_or(0)
    }

    /// Checks resource limit for the scanner.
    ///
    /// Returns an error when the total number of files that may be read
    /// concurrently exceeds the configured maximum.
    pub(crate) fn check_scan_limit(&self) -> Result<()> {
        // Check max file count limit for all partitions since we scan them in parallel.
        let total_max_files: usize = self
            .properties
            .partitions
            .iter()
            .map(|partition| Self::max_files_in_partition(&self.stream_ctx.ranges, partition))
            .sum();

        let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
        if total_max_files > max_concurrent_files {
            return TooManyFilesToReadSnafu {
                actual: total_max_files,
                max: max_concurrent_files,
            }
            .fail();
        }

        Ok(())
    }
}
520
521impl RegionScanner for SeqScan {
522    fn name(&self) -> &str {
523        "SeqScan"
524    }
525
526    fn properties(&self) -> &ScannerProperties {
527        &self.properties
528    }
529
530    fn schema(&self) -> SchemaRef {
531        self.stream_ctx.input.mapper.output_schema()
532    }
533
534    fn metadata(&self) -> RegionMetadataRef {
535        self.stream_ctx.input.mapper.metadata().clone()
536    }
537
538    fn scan_partition(
539        &self,
540        ctx: &QueryScanContext,
541        metrics_set: &ExecutionPlanMetricsSet,
542        partition: usize,
543    ) -> Result<SendableRecordBatchStream, BoxedError> {
544        self.scan_partition_impl(ctx, metrics_set, partition)
545            .map_err(BoxedError::new)
546    }
547
548    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
549        self.properties.prepare(request);
550
551        self.check_scan_limit().map_err(BoxedError::new)?;
552
553        Ok(())
554    }
555
556    fn has_predicate_without_region(&self) -> bool {
557        let predicate = self
558            .stream_ctx
559            .input
560            .predicate_group()
561            .predicate_without_region();
562        predicate.is_some()
563    }
564
565    fn add_dyn_filter_to_predicate(
566        &mut self,
567        filter_exprs: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
568    ) -> Vec<bool> {
569        self.stream_ctx.add_dyn_filter_to_predicate(filter_exprs)
570    }
571
572    fn set_logical_region(&mut self, logical_region: bool) {
573        self.properties.set_logical_region(logical_region);
574    }
575}
576
577impl DisplayAs for SeqScan {
578    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
579        write!(
580            f,
581            "SeqScan: region={}, ",
582            self.stream_ctx.input.mapper.metadata().region_id
583        )?;
584        match t {
585            // TODO(LFC): Implement all the "TreeRender" display format.
586            DisplayFormatType::Default | DisplayFormatType::TreeRender => {
587                self.stream_ctx.format_for_explain(false, f)
588            }
589            DisplayFormatType::Verbose => {
590                self.stream_ctx.format_for_explain(true, f)?;
591                self.metrics_list.format_verbose_metrics(f)
592            }
593        }
594    }
595}
596
597impl fmt::Debug for SeqScan {
598    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
599        f.debug_struct("SeqScan")
600            .field("num_ranges", &self.stream_ctx.ranges.len())
601            .finish()
602    }
603}
604
/// Builds flat sources for the partition range and push them to the `sources` vector.
/// Returns the estimated rows per batch after splitting if splitting is applied, or `None`.
pub(crate) async fn build_flat_sources(
    stream_ctx: &Arc<StreamContext>,
    part_range: &PartitionRange,
    compaction: bool,
    part_metrics: &PartitionMetrics,
    partition_pruner: Arc<PartitionPruner>,
    sources: &mut Vec<BoxedRecordBatchStream>,
    semaphore: Option<Arc<Semaphore>>,
) -> Result<Option<usize>> {
    // Gets range meta.
    let range_meta = &stream_ctx.ranges[part_range.identifier];
    #[cfg(debug_assertions)]
    if compaction {
        // Compaction expects that input sources have not been split.
        debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
        for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
            // It should scan all row groups.
            debug_assert_eq!(
                -1, row_group_idx.row_group_index,
                "Expect {} range scan all row groups, given: {}",
                i, row_group_idx.row_group_index,
            );
        }
    }

    // Label used by file scan metrics to distinguish compaction from queries.
    let read_type = if compaction {
        "compaction"
    } else {
        "seq_scan_files"
    };
    let num_indices = range_meta.row_group_indices.len();
    if num_indices == 0 {
        return Ok(None);
    }

    let split_batch_size = should_split_flat_batches_for_merge(stream_ctx, range_meta);
    let should_split = split_batch_size.is_some();
    sources.reserve(num_indices);
    // `ordered_sources` keeps every stream at the position of its index so the
    // output order matches `row_group_indices`, even when file streams are
    // built concurrently below.
    let mut ordered_sources = Vec::with_capacity(num_indices);
    ordered_sources.resize_with(num_indices, || None);
    let mut file_scan_tasks = Vec::new();

    for (position, index) in range_meta.row_group_indices.iter().enumerate() {
        if stream_ctx.is_mem_range_index(*index) {
            let stream = scan_flat_mem_ranges(
                stream_ctx.clone(),
                part_metrics.clone(),
                *index,
                range_meta.time_range,
            );
            ordered_sources[position] = Some(Box::pin(stream) as _);
        } else if stream_ctx.is_file_range_index(*index) {
            if let Some(semaphore_ref) = semaphore.as_ref() {
                // run in parallel, controlled by semaphore
                let stream_ctx = stream_ctx.clone();
                let part_metrics = part_metrics.clone();
                let partition_pruner = partition_pruner.clone();
                let semaphore = Arc::clone(semaphore_ref);
                let row_group_index = *index;
                file_scan_tasks.push(async move {
                    let _permit = semaphore.acquire().await.unwrap();
                    let stream = scan_flat_file_ranges(
                        stream_ctx,
                        part_metrics,
                        row_group_index,
                        read_type,
                        partition_pruner,
                    )
                    .await?;
                    Ok((position, Box::pin(stream) as _))
                });
            } else {
                // no semaphore, run sequentially
                let stream = scan_flat_file_ranges(
                    stream_ctx.clone(),
                    part_metrics.clone(),
                    *index,
                    read_type,
                    partition_pruner.clone(),
                )
                .await?;
                ordered_sources[position] = Some(Box::pin(stream) as _);
            }
        } else {
            let stream =
                scan_util::maybe_scan_flat_other_ranges(stream_ctx, *index, part_metrics).await?;
            ordered_sources[position] = Some(stream);
        }
    }

    if !file_scan_tasks.is_empty() {
        // Wait for all parallel file scans and put each stream back in order.
        let results = futures::future::try_join_all(file_scan_tasks).await?;
        for (position, stream) in results {
            ordered_sources[position] = Some(stream);
        }
    }

    for stream in ordered_sources.into_iter().flatten() {
        if should_split {
            sources.push(Box::pin(SplitRecordBatchStream::new(stream)));
        } else {
            sources.push(stream);
        }
    }

    if should_split {
        common_telemetry::debug!(
            "Splitting record batches, region: {}, sources: {}, part_range: {:?}",
            stream_ctx.input.region_metadata().region_id,
            sources.len(),
            part_range,
        );
    }

    Ok(split_batch_size)
}
723
#[cfg(test)]
impl SeqScan {
    /// Returns the input.
    ///
    /// Only exposed to unit tests for inspecting the scan configuration.
    pub(crate) fn input(&self) -> &ScanInput {
        &self.stream_ctx.input
    }
}
731
/// Returns the scanner type label used for partition metrics,
/// distinguishing compaction scans from regular query scans.
fn get_scanner_type(compaction: bool) -> &'static str {
    match compaction {
        true => "SeqScan(compaction)",
        false => "SeqScan",
    }
}