mito2/read/
seq_scan.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Sequential scan.
16
17use std::fmt;
18use std::sync::Arc;
19use std::time::Instant;
20
21use async_stream::try_stream;
22use common_error::ext::BoxedError;
23use common_recordbatch::util::ChainedRecordBatchStream;
24use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
25use common_telemetry::tracing;
26use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
27use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
28use datatypes::schema::SchemaRef;
29use futures::{StreamExt, TryStreamExt};
30use snafu::{OptionExt, ensure};
31use store_api::metadata::RegionMetadataRef;
32use store_api::region_engine::{
33    PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
34};
35use store_api::storage::TimeSeriesRowSelector;
36use tokio::sync::Semaphore;
37
38use crate::error::{PartitionOutOfRangeSnafu, Result, TooManyFilesToReadSnafu, UnexpectedSnafu};
39use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
40use crate::read::flat_dedup::{FlatDedupReader, FlatLastNonNull, FlatLastRow};
41use crate::read::flat_merge::FlatMergeReader;
42use crate::read::last_row::LastRowReader;
43use crate::read::merge::MergeReaderBuilder;
44use crate::read::range::{RangeBuilderList, RangeMeta};
45use crate::read::scan_region::{ScanInput, StreamContext};
46use crate::read::scan_util::{
47    PartitionMetrics, PartitionMetricsList, SplitRecordBatchStream, scan_file_ranges,
48    scan_flat_file_ranges, scan_flat_mem_ranges, scan_mem_ranges,
49    should_split_flat_batches_for_merge,
50};
51use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
52use crate::read::{
53    Batch, BatchReader, BoxedBatchReader, BoxedRecordBatchStream, ScannerMetrics, Source, scan_util,
54};
55use crate::region::options::MergeMode;
56use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
57
58/// Scans a region and returns rows in a sorted sequence.
59///
60/// The output order is always `order by primary keys, time index` inside every
61/// [`PartitionRange`]. Each "partition" may contains many [`PartitionRange`]s.
62pub struct SeqScan {
63    /// Properties of the scanner.
64    properties: ScannerProperties,
65    /// Context of streams.
66    stream_ctx: Arc<StreamContext>,
67    /// Metrics for each partition.
68    /// The scanner only sets in query and keeps it empty during compaction.
69    metrics_list: PartitionMetricsList,
70}
71
72impl SeqScan {
73    /// Creates a new [SeqScan] with the given input.
74    /// If `input.compaction` is true, the scanner will not attempt to split ranges.
75    pub(crate) fn new(input: ScanInput) -> Self {
76        let mut properties = ScannerProperties::default()
77            .with_append_mode(input.append_mode)
78            .with_total_rows(input.total_rows());
79        let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input));
80        properties.partitions = vec![stream_ctx.partition_ranges()];
81
82        Self {
83            properties,
84            stream_ctx,
85            metrics_list: PartitionMetricsList::default(),
86        }
87    }
88
89    /// Builds a stream for the query.
90    ///
91    /// The returned stream is not partitioned and will contains all the data. If want
92    /// partitioned scan, use [`RegionScanner::scan_partition`].
93    pub fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
94        let metrics_set = ExecutionPlanMetricsSet::new();
95        let streams = (0..self.properties.partitions.len())
96            .map(|partition: usize| {
97                self.scan_partition(&QueryScanContext::default(), &metrics_set, partition)
98            })
99            .collect::<Result<Vec<_>, _>>()?;
100
101        let aggr_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
102        Ok(Box::pin(aggr_stream))
103    }
104
105    /// Scan [`Batch`] in all partitions one by one.
106    pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
107        let metrics_set = ExecutionPlanMetricsSet::new();
108
109        let streams = (0..self.properties.partitions.len())
110            .map(|partition| {
111                let metrics = self.new_partition_metrics(false, &metrics_set, partition);
112                self.scan_batch_in_partition(partition, metrics)
113            })
114            .collect::<Result<Vec<_>>>()?;
115
116        Ok(Box::pin(futures::stream::iter(streams).flatten()))
117    }
118
119    /// Builds a [BoxedBatchReader] from sequential scan for compaction.
120    ///
121    /// # Panics
122    /// Panics if the compaction flag is not set.
123    pub async fn build_reader_for_compaction(&self) -> Result<BoxedBatchReader> {
124        assert!(self.stream_ctx.input.compaction);
125
126        let metrics_set = ExecutionPlanMetricsSet::new();
127        let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
128        debug_assert_eq!(1, self.properties.partitions.len());
129        let partition_ranges = &self.properties.partitions[0];
130
131        let reader = Self::merge_all_ranges_for_compaction(
132            &self.stream_ctx,
133            partition_ranges,
134            &part_metrics,
135        )
136        .await?;
137        Ok(Box::new(reader))
138    }
139
140    /// Builds a [BoxedRecordBatchStream] from sequential scan for flat format compaction.
141    ///
142    /// # Panics
143    /// Panics if the compaction flag is not set.
144    pub async fn build_flat_reader_for_compaction(&self) -> Result<BoxedRecordBatchStream> {
145        assert!(self.stream_ctx.input.compaction);
146
147        let metrics_set = ExecutionPlanMetricsSet::new();
148        let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
149        debug_assert_eq!(1, self.properties.partitions.len());
150        let partition_ranges = &self.properties.partitions[0];
151
152        let reader = Self::merge_all_flat_ranges_for_compaction(
153            &self.stream_ctx,
154            partition_ranges,
155            &part_metrics,
156        )
157        .await?;
158        Ok(reader)
159    }
160
161    /// Builds a merge reader that reads all ranges.
162    /// Callers MUST not split ranges before calling this method.
163    async fn merge_all_ranges_for_compaction(
164        stream_ctx: &Arc<StreamContext>,
165        partition_ranges: &[PartitionRange],
166        part_metrics: &PartitionMetrics,
167    ) -> Result<BoxedBatchReader> {
168        let mut sources = Vec::new();
169        let range_builder_list = Arc::new(RangeBuilderList::new(
170            stream_ctx.input.num_memtables(),
171            stream_ctx.input.num_files(),
172        ));
173        for part_range in partition_ranges {
174            build_sources(
175                stream_ctx,
176                part_range,
177                true,
178                part_metrics,
179                range_builder_list.clone(),
180                &mut sources,
181                None,
182            )
183            .await?;
184        }
185
186        common_telemetry::debug!(
187            "Build reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
188            stream_ctx.input.mapper.metadata().region_id,
189            partition_ranges.len(),
190            sources.len()
191        );
192        Self::build_reader_from_sources(stream_ctx, sources, None).await
193    }
194
195    /// Builds a merge reader that reads all flat ranges.
196    /// Callers MUST not split ranges before calling this method.
197    async fn merge_all_flat_ranges_for_compaction(
198        stream_ctx: &Arc<StreamContext>,
199        partition_ranges: &[PartitionRange],
200        part_metrics: &PartitionMetrics,
201    ) -> Result<BoxedRecordBatchStream> {
202        let mut sources = Vec::new();
203        let range_builder_list = Arc::new(RangeBuilderList::new(
204            stream_ctx.input.num_memtables(),
205            stream_ctx.input.num_files(),
206        ));
207        for part_range in partition_ranges {
208            build_flat_sources(
209                stream_ctx,
210                part_range,
211                true,
212                part_metrics,
213                range_builder_list.clone(),
214                &mut sources,
215                None,
216            )
217            .await?;
218        }
219
220        common_telemetry::debug!(
221            "Build flat reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
222            stream_ctx.input.mapper.metadata().region_id,
223            partition_ranges.len(),
224            sources.len()
225        );
226        Self::build_flat_reader_from_sources(stream_ctx, sources, None).await
227    }
228
229    /// Builds a reader to read sources. If `semaphore` is provided, reads sources in parallel
230    /// if possible.
231    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
232    pub(crate) async fn build_reader_from_sources(
233        stream_ctx: &StreamContext,
234        mut sources: Vec<Source>,
235        semaphore: Option<Arc<Semaphore>>,
236    ) -> Result<BoxedBatchReader> {
237        if let Some(semaphore) = semaphore.as_ref() {
238            // Read sources in parallel.
239            if sources.len() > 1 {
240                sources = stream_ctx
241                    .input
242                    .create_parallel_sources(sources, semaphore.clone())?;
243            }
244        }
245
246        let mut builder = MergeReaderBuilder::from_sources(sources);
247        let reader = builder.build().await?;
248
249        let dedup = !stream_ctx.input.append_mode;
250        let reader = if dedup {
251            match stream_ctx.input.merge_mode {
252                MergeMode::LastRow => Box::new(DedupReader::new(
253                    reader,
254                    LastRow::new(stream_ctx.input.filter_deleted),
255                )) as _,
256                MergeMode::LastNonNull => Box::new(DedupReader::new(
257                    reader,
258                    LastNonNull::new(stream_ctx.input.filter_deleted),
259                )) as _,
260            }
261        } else {
262            Box::new(reader) as _
263        };
264
265        let reader = match &stream_ctx.input.series_row_selector {
266            Some(TimeSeriesRowSelector::LastRow) => Box::new(LastRowReader::new(reader)) as _,
267            None => reader,
268        };
269
270        Ok(reader)
271    }
272
273    /// Builds a flat reader to read sources that returns RecordBatch. If `semaphore` is provided, reads sources in parallel
274    /// if possible.
275    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
276    pub(crate) async fn build_flat_reader_from_sources(
277        stream_ctx: &StreamContext,
278        mut sources: Vec<BoxedRecordBatchStream>,
279        semaphore: Option<Arc<Semaphore>>,
280    ) -> Result<BoxedRecordBatchStream> {
281        if let Some(semaphore) = semaphore.as_ref() {
282            // Read sources in parallel.
283            if sources.len() > 1 {
284                sources = stream_ctx
285                    .input
286                    .create_parallel_flat_sources(sources, semaphore.clone())?;
287            }
288        }
289
290        let mapper = stream_ctx.input.mapper.as_flat().unwrap();
291        let schema = mapper.input_arrow_schema(stream_ctx.input.compaction);
292
293        let reader = FlatMergeReader::new(schema, sources, DEFAULT_READ_BATCH_SIZE).await?;
294
295        let dedup = !stream_ctx.input.append_mode;
296        let reader = if dedup {
297            match stream_ctx.input.merge_mode {
298                MergeMode::LastRow => Box::pin(
299                    FlatDedupReader::new(
300                        reader.into_stream().boxed(),
301                        FlatLastRow::new(stream_ctx.input.filter_deleted),
302                    )
303                    .into_stream(),
304                ) as _,
305                MergeMode::LastNonNull => Box::pin(
306                    FlatDedupReader::new(
307                        reader.into_stream().boxed(),
308                        FlatLastNonNull::new(
309                            mapper.field_column_start(),
310                            stream_ctx.input.filter_deleted,
311                        ),
312                    )
313                    .into_stream(),
314                ) as _,
315            }
316        } else {
317            Box::pin(reader.into_stream()) as _
318        };
319
320        Ok(reader)
321    }
322
323    /// Scans the given partition when the part list is set properly.
324    /// Otherwise the returned stream might not contains any data.
325    fn scan_partition_impl(
326        &self,
327        ctx: &QueryScanContext,
328        metrics_set: &ExecutionPlanMetricsSet,
329        partition: usize,
330    ) -> Result<SendableRecordBatchStream> {
331        if ctx.explain_verbose {
332            common_telemetry::info!(
333                "SeqScan partition {}, region_id: {}",
334                partition,
335                self.stream_ctx.input.region_metadata().region_id
336            );
337        }
338
339        let metrics = self.new_partition_metrics(ctx.explain_verbose, metrics_set, partition);
340        let input = &self.stream_ctx.input;
341
342        let batch_stream = if input.flat_format {
343            // Use flat scan for bulk memtables
344            self.scan_flat_batch_in_partition(partition, metrics.clone())?
345        } else {
346            // Use regular batch scan for normal memtables
347            self.scan_batch_in_partition(partition, metrics.clone())?
348        };
349        let record_batch_stream = ConvertBatchStream::new(
350            batch_stream,
351            input.mapper.clone(),
352            input.cache_strategy.clone(),
353            metrics,
354        );
355
356        Ok(Box::pin(RecordBatchStreamWrapper::new(
357            input.mapper.output_schema(),
358            Box::pin(record_batch_stream),
359        )))
360    }
361
362    fn scan_batch_in_partition(
363        &self,
364        partition: usize,
365        part_metrics: PartitionMetrics,
366    ) -> Result<ScanBatchStream> {
367        ensure!(
368            partition < self.properties.partitions.len(),
369            PartitionOutOfRangeSnafu {
370                given: partition,
371                all: self.properties.partitions.len(),
372            }
373        );
374
375        if self.properties.partitions[partition].is_empty() {
376            return Ok(Box::pin(futures::stream::empty()));
377        }
378
379        let stream_ctx = self.stream_ctx.clone();
380        let semaphore = self.new_semaphore();
381        let partition_ranges = self.properties.partitions[partition].clone();
382        let compaction = self.stream_ctx.input.compaction;
383        let distinguish_range = self.properties.distinguish_partition_range;
384        let file_scan_semaphore = if compaction { None } else { semaphore.clone() };
385
386        let stream = try_stream! {
387            part_metrics.on_first_poll();
388
389            let range_builder_list = Arc::new(RangeBuilderList::new(
390                stream_ctx.input.num_memtables(),
391                stream_ctx.input.num_files(),
392            ));
393            let _mapper = stream_ctx.input.mapper.as_primary_key().context(UnexpectedSnafu {
394                reason: "Unexpected format",
395            })?;
396            // Scans each part.
397            for part_range in partition_ranges {
398                let mut sources = Vec::new();
399                build_sources(
400                    &stream_ctx,
401                    &part_range,
402                    compaction,
403                    &part_metrics,
404                    range_builder_list.clone(),
405                    &mut sources,
406                    file_scan_semaphore.clone(),
407                ).await?;
408
409                let mut metrics = ScannerMetrics::default();
410                let mut fetch_start = Instant::now();
411                let mut reader =
412                    Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone())
413                        .await?;
414                #[cfg(debug_assertions)]
415                let mut checker = crate::read::BatchChecker::default()
416                    .with_start(Some(part_range.start))
417                    .with_end(Some(part_range.end));
418
419                while let Some(batch) = reader.next_batch().await? {
420                    metrics.scan_cost += fetch_start.elapsed();
421                    metrics.num_batches += 1;
422                    metrics.num_rows += batch.num_rows();
423
424                    debug_assert!(!batch.is_empty());
425                    if batch.is_empty() {
426                        continue;
427                    }
428
429                    #[cfg(debug_assertions)]
430                    checker.ensure_part_range_batch(
431                        "SeqScan",
432                        _mapper.metadata().region_id,
433                        partition,
434                        part_range,
435                        &batch,
436                    );
437
438                    let yield_start = Instant::now();
439                    yield ScanBatch::Normal(batch);
440                    metrics.yield_cost += yield_start.elapsed();
441
442                    fetch_start = Instant::now();
443                }
444
445                // Yields an empty part to indicate this range is terminated.
446                // The query engine can use this to optimize some queries.
447                if distinguish_range {
448                    let yield_start = Instant::now();
449                    yield ScanBatch::Normal(Batch::empty());
450                    metrics.yield_cost += yield_start.elapsed();
451                }
452
453                metrics.scan_cost += fetch_start.elapsed();
454                part_metrics.merge_metrics(&metrics);
455            }
456
457            part_metrics.on_finish();
458        };
459        Ok(Box::pin(stream))
460    }
461
462    fn scan_flat_batch_in_partition(
463        &self,
464        partition: usize,
465        part_metrics: PartitionMetrics,
466    ) -> Result<ScanBatchStream> {
467        ensure!(
468            partition < self.properties.partitions.len(),
469            PartitionOutOfRangeSnafu {
470                given: partition,
471                all: self.properties.partitions.len(),
472            }
473        );
474
475        if self.properties.partitions[partition].is_empty() {
476            return Ok(Box::pin(futures::stream::empty()));
477        }
478
479        let stream_ctx = self.stream_ctx.clone();
480        let semaphore = self.new_semaphore();
481        let partition_ranges = self.properties.partitions[partition].clone();
482        let compaction = self.stream_ctx.input.compaction;
483        let file_scan_semaphore = if compaction { None } else { semaphore.clone() };
484
485        let stream = try_stream! {
486            part_metrics.on_first_poll();
487
488            let range_builder_list = Arc::new(RangeBuilderList::new(
489                stream_ctx.input.num_memtables(),
490                stream_ctx.input.num_files(),
491            ));
492            // Scans each part.
493            for part_range in partition_ranges {
494                let mut sources = Vec::new();
495                build_flat_sources(
496                    &stream_ctx,
497                    &part_range,
498                    compaction,
499                    &part_metrics,
500                    range_builder_list.clone(),
501                    &mut sources,
502                    file_scan_semaphore.clone(),
503                ).await?;
504
505                let mut metrics = ScannerMetrics::default();
506                let mut fetch_start = Instant::now();
507                let mut reader =
508                    Self::build_flat_reader_from_sources(&stream_ctx, sources, semaphore.clone())
509                        .await?;
510
511                while let Some(record_batch) = reader.try_next().await? {
512                    metrics.scan_cost += fetch_start.elapsed();
513                    metrics.num_batches += 1;
514                    metrics.num_rows += record_batch.num_rows();
515
516                    debug_assert!(record_batch.num_rows() > 0);
517                    if record_batch.num_rows() == 0 {
518                        continue;
519                    }
520
521                    let yield_start = Instant::now();
522                    yield ScanBatch::RecordBatch(record_batch);
523                    metrics.yield_cost += yield_start.elapsed();
524
525                    fetch_start = Instant::now();
526                }
527
528                metrics.scan_cost += fetch_start.elapsed();
529                part_metrics.merge_metrics(&metrics);
530            }
531
532            part_metrics.on_finish();
533        };
534        Ok(Box::pin(stream))
535    }
536
537    fn new_semaphore(&self) -> Option<Arc<Semaphore>> {
538        if self.properties.target_partitions() > self.properties.num_partitions() {
539            // We can use additional tasks to read the data if we have more target partitions than actual partitions.
540            // This semaphore is partition level.
541            // We don't use a global semaphore to avoid a partition waiting for others. The final concurrency
542            // of tasks usually won't exceed the target partitions a lot as compaction can reduce the number of
543            // files in a part range.
544            Some(Arc::new(Semaphore::new(
545                self.properties.target_partitions() - self.properties.num_partitions() + 1,
546            )))
547        } else {
548            None
549        }
550    }
551
552    /// Creates a new partition metrics instance.
553    /// Sets the partition metrics for the given partition if it is not for compaction.
554    fn new_partition_metrics(
555        &self,
556        explain_verbose: bool,
557        metrics_set: &ExecutionPlanMetricsSet,
558        partition: usize,
559    ) -> PartitionMetrics {
560        let metrics = PartitionMetrics::new(
561            self.stream_ctx.input.mapper.metadata().region_id,
562            partition,
563            get_scanner_type(self.stream_ctx.input.compaction),
564            self.stream_ctx.query_start,
565            explain_verbose,
566            metrics_set,
567        );
568
569        if !self.stream_ctx.input.compaction {
570            self.metrics_list.set(partition, metrics.clone());
571        }
572
573        metrics
574    }
575
576    /// Finds the maximum number of files to read in a single partition range.
577    fn max_files_in_partition(ranges: &[RangeMeta], partition_ranges: &[PartitionRange]) -> usize {
578        partition_ranges
579            .iter()
580            .map(|part_range| {
581                let range_meta = &ranges[part_range.identifier];
582                range_meta.indices.len()
583            })
584            .max()
585            .unwrap_or(0)
586    }
587
588    /// Checks resource limit for the scanner.
589    pub(crate) fn check_scan_limit(&self) -> Result<()> {
590        // Check max file count limit for all partitions since we scan them in parallel.
591        let total_max_files: usize = self
592            .properties
593            .partitions
594            .iter()
595            .map(|partition| Self::max_files_in_partition(&self.stream_ctx.ranges, partition))
596            .sum();
597
598        let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
599        if total_max_files > max_concurrent_files {
600            return TooManyFilesToReadSnafu {
601                actual: total_max_files,
602                max: max_concurrent_files,
603            }
604            .fail();
605        }
606
607        Ok(())
608    }
609}
610
611impl RegionScanner for SeqScan {
612    fn name(&self) -> &str {
613        "SeqScan"
614    }
615
616    fn properties(&self) -> &ScannerProperties {
617        &self.properties
618    }
619
620    fn schema(&self) -> SchemaRef {
621        self.stream_ctx.input.mapper.output_schema()
622    }
623
624    fn metadata(&self) -> RegionMetadataRef {
625        self.stream_ctx.input.mapper.metadata().clone()
626    }
627
628    fn scan_partition(
629        &self,
630        ctx: &QueryScanContext,
631        metrics_set: &ExecutionPlanMetricsSet,
632        partition: usize,
633    ) -> Result<SendableRecordBatchStream, BoxedError> {
634        self.scan_partition_impl(ctx, metrics_set, partition)
635            .map_err(BoxedError::new)
636    }
637
638    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
639        self.properties.prepare(request);
640
641        self.check_scan_limit().map_err(BoxedError::new)?;
642
643        Ok(())
644    }
645
646    fn has_predicate_without_region(&self) -> bool {
647        let predicate = self
648            .stream_ctx
649            .input
650            .predicate_group()
651            .predicate_without_region();
652        predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
653    }
654
655    fn set_logical_region(&mut self, logical_region: bool) {
656        self.properties.set_logical_region(logical_region);
657    }
658}
659
660impl DisplayAs for SeqScan {
661    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
662        write!(
663            f,
664            "SeqScan: region={}, ",
665            self.stream_ctx.input.mapper.metadata().region_id
666        )?;
667        match t {
668            // TODO(LFC): Implement all the "TreeRender" display format.
669            DisplayFormatType::Default | DisplayFormatType::TreeRender => {
670                self.stream_ctx.format_for_explain(false, f)
671            }
672            DisplayFormatType::Verbose => {
673                self.stream_ctx.format_for_explain(true, f)?;
674                self.metrics_list.format_verbose_metrics(f)
675            }
676        }
677    }
678}
679
680impl fmt::Debug for SeqScan {
681    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
682        f.debug_struct("SeqScan")
683            .field("num_ranges", &self.stream_ctx.ranges.len())
684            .finish()
685    }
686}
687
688/// Builds sources for the partition range and push them to the `sources` vector.
689pub(crate) async fn build_sources(
690    stream_ctx: &Arc<StreamContext>,
691    part_range: &PartitionRange,
692    compaction: bool,
693    part_metrics: &PartitionMetrics,
694    range_builder_list: Arc<RangeBuilderList>,
695    sources: &mut Vec<Source>,
696    semaphore: Option<Arc<Semaphore>>,
697) -> Result<()> {
698    // Gets range meta.
699    let range_meta = &stream_ctx.ranges[part_range.identifier];
700    #[cfg(debug_assertions)]
701    if compaction {
702        // Compaction expects input sources are not been split.
703        debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
704        for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
705            // It should scan all row groups.
706            debug_assert_eq!(
707                -1, row_group_idx.row_group_index,
708                "Expect {} range scan all row groups, given: {}",
709                i, row_group_idx.row_group_index,
710            );
711        }
712    }
713
714    let read_type = if compaction {
715        "compaction"
716    } else {
717        "seq_scan_files"
718    };
719    let num_indices = range_meta.row_group_indices.len();
720    if num_indices == 0 {
721        return Ok(());
722    }
723
724    sources.reserve(num_indices);
725    let mut ordered_sources = Vec::with_capacity(num_indices);
726    ordered_sources.resize_with(num_indices, || None);
727    let mut file_scan_tasks = Vec::new();
728
729    for (position, index) in range_meta.row_group_indices.iter().enumerate() {
730        if stream_ctx.is_mem_range_index(*index) {
731            let stream = scan_mem_ranges(
732                stream_ctx.clone(),
733                part_metrics.clone(),
734                *index,
735                range_meta.time_range,
736            );
737            ordered_sources[position] = Some(Source::Stream(Box::pin(stream) as _));
738        } else if stream_ctx.is_file_range_index(*index) {
739            if let Some(semaphore_ref) = semaphore.as_ref() {
740                // run in parallel, controlled by semaphore
741                let stream_ctx = stream_ctx.clone();
742                let part_metrics = part_metrics.clone();
743                let range_builder_list = range_builder_list.clone();
744                let semaphore = Arc::clone(semaphore_ref);
745                let row_group_index = *index;
746                file_scan_tasks.push(async move {
747                    let _permit = semaphore.acquire().await.unwrap();
748                    let stream = scan_file_ranges(
749                        stream_ctx,
750                        part_metrics,
751                        row_group_index,
752                        read_type,
753                        range_builder_list,
754                    )
755                    .await?;
756                    Ok((position, Source::Stream(Box::pin(stream) as _)))
757                });
758            } else {
759                // no semaphore, run sequentially
760                let stream = scan_file_ranges(
761                    stream_ctx.clone(),
762                    part_metrics.clone(),
763                    *index,
764                    read_type,
765                    range_builder_list.clone(),
766                )
767                .await?;
768                ordered_sources[position] = Some(Source::Stream(Box::pin(stream) as _));
769            }
770        } else {
771            let stream =
772                scan_util::maybe_scan_other_ranges(stream_ctx, *index, part_metrics).await?;
773            ordered_sources[position] = Some(Source::Stream(stream));
774        }
775    }
776
777    if !file_scan_tasks.is_empty() {
778        let results = futures::future::try_join_all(file_scan_tasks).await?;
779        for (position, source) in results {
780            ordered_sources[position] = Some(source);
781        }
782    }
783
784    for source in ordered_sources.into_iter().flatten() {
785        sources.push(source);
786    }
787    Ok(())
788}
789
790/// Builds flat sources for the partition range and push them to the `sources` vector.
791pub(crate) async fn build_flat_sources(
792    stream_ctx: &Arc<StreamContext>,
793    part_range: &PartitionRange,
794    compaction: bool,
795    part_metrics: &PartitionMetrics,
796    range_builder_list: Arc<RangeBuilderList>,
797    sources: &mut Vec<BoxedRecordBatchStream>,
798    semaphore: Option<Arc<Semaphore>>,
799) -> Result<()> {
800    // Gets range meta.
801    let range_meta = &stream_ctx.ranges[part_range.identifier];
802    #[cfg(debug_assertions)]
803    if compaction {
804        // Compaction expects input sources are not been split.
805        debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
806        for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
807            // It should scan all row groups.
808            debug_assert_eq!(
809                -1, row_group_idx.row_group_index,
810                "Expect {} range scan all row groups, given: {}",
811                i, row_group_idx.row_group_index,
812            );
813        }
814    }
815
816    let read_type = if compaction {
817        "compaction"
818    } else {
819        "seq_scan_files"
820    };
821    let num_indices = range_meta.row_group_indices.len();
822    if num_indices == 0 {
823        return Ok(());
824    }
825
826    let should_split = should_split_flat_batches_for_merge(stream_ctx, range_meta);
827    sources.reserve(num_indices);
828    let mut ordered_sources = Vec::with_capacity(num_indices);
829    ordered_sources.resize_with(num_indices, || None);
830    let mut file_scan_tasks = Vec::new();
831
832    for (position, index) in range_meta.row_group_indices.iter().enumerate() {
833        if stream_ctx.is_mem_range_index(*index) {
834            let stream = scan_flat_mem_ranges(stream_ctx.clone(), part_metrics.clone(), *index);
835            ordered_sources[position] = Some(Box::pin(stream) as _);
836        } else if stream_ctx.is_file_range_index(*index) {
837            if let Some(semaphore_ref) = semaphore.as_ref() {
838                // run in parallel, controlled by semaphore
839                let stream_ctx = stream_ctx.clone();
840                let part_metrics = part_metrics.clone();
841                let range_builder_list = range_builder_list.clone();
842                let semaphore = Arc::clone(semaphore_ref);
843                let row_group_index = *index;
844                file_scan_tasks.push(async move {
845                    let _permit = semaphore.acquire().await.unwrap();
846                    let stream = scan_flat_file_ranges(
847                        stream_ctx,
848                        part_metrics,
849                        row_group_index,
850                        read_type,
851                        range_builder_list,
852                    )
853                    .await?;
854                    Ok((position, Box::pin(stream) as _))
855                });
856            } else {
857                // no semaphore, run sequentially
858                let stream = scan_flat_file_ranges(
859                    stream_ctx.clone(),
860                    part_metrics.clone(),
861                    *index,
862                    read_type,
863                    range_builder_list.clone(),
864                )
865                .await?;
866                ordered_sources[position] = Some(Box::pin(stream) as _);
867            }
868        } else {
869            let stream =
870                scan_util::maybe_scan_flat_other_ranges(stream_ctx, *index, part_metrics).await?;
871            ordered_sources[position] = Some(stream);
872        }
873    }
874
875    if !file_scan_tasks.is_empty() {
876        let results = futures::future::try_join_all(file_scan_tasks).await?;
877        for (position, stream) in results {
878            ordered_sources[position] = Some(stream);
879        }
880    }
881
882    for stream in ordered_sources.into_iter().flatten() {
883        if should_split {
884            sources.push(Box::pin(SplitRecordBatchStream::new(stream)));
885        } else {
886            sources.push(stream);
887        }
888    }
889
890    if should_split {
891        common_telemetry::debug!(
892            "Splitting record batches, region: {}, sources: {}, part_range: {:?}",
893            stream_ctx.input.region_metadata().region_id,
894            sources.len(),
895            part_range,
896        );
897    }
898
899    Ok(())
900}
901
#[cfg(test)]
impl SeqScan {
    /// Test-only accessor exposing the scan input stored in this scanner's stream context.
    pub(crate) fn input(&self) -> &ScanInput {
        let context = &self.stream_ctx;
        &context.input
    }
}
909
/// Maps the compaction flag to the scanner's display name.
///
/// Yields `"SeqScan(compaction)"` for compaction scans and `"SeqScan"` otherwise.
fn get_scanner_type(compaction: bool) -> &'static str {
    match compaction {
        true => "SeqScan(compaction)",
        false => "SeqScan",
    }
}