mito2/read/
seq_scan.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Sequential scan.
16
17use std::fmt;
18use std::sync::Arc;
19use std::time::Instant;
20
21use async_stream::try_stream;
22use common_error::ext::BoxedError;
23use common_recordbatch::util::ChainedRecordBatchStream;
24use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
25use common_telemetry::tracing;
26use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
27use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
28use datatypes::schema::SchemaRef;
29use futures::{StreamExt, TryStreamExt};
30use snafu::{ensure, OptionExt};
31use store_api::metadata::RegionMetadataRef;
32use store_api::region_engine::{
33    PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
34};
35use store_api::storage::TimeSeriesRowSelector;
36use tokio::sync::Semaphore;
37
38use crate::error::{PartitionOutOfRangeSnafu, Result, TooManyFilesToReadSnafu, UnexpectedSnafu};
39use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
40use crate::read::flat_dedup::{FlatDedupReader, FlatLastNonNull, FlatLastRow};
41use crate::read::flat_merge::FlatMergeReader;
42use crate::read::last_row::LastRowReader;
43use crate::read::merge::MergeReaderBuilder;
44use crate::read::range::{RangeBuilderList, RangeMeta};
45use crate::read::scan_region::{ScanInput, StreamContext};
46use crate::read::scan_util::{
47    scan_file_ranges, scan_flat_file_ranges, scan_flat_mem_ranges, scan_mem_ranges,
48    PartitionMetrics, PartitionMetricsList,
49};
50use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
51use crate::read::{
52    scan_util, Batch, BatchReader, BoxedBatchReader, BoxedRecordBatchStream, ScannerMetrics, Source,
53};
54use crate::region::options::MergeMode;
55use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
56
/// Scans a region and returns rows in a sorted sequence.
///
/// The output order is always `order by primary keys, time index` inside every
/// [`PartitionRange`]. Each "partition" may contain many [`PartitionRange`]s.
pub struct SeqScan {
    /// Properties of the scanner.
    properties: ScannerProperties,
    /// Context of streams.
    stream_ctx: Arc<StreamContext>,
    /// Whether the scanner is used for compaction.
    compaction: bool,
    /// Metrics for each partition.
    /// The scanner only sets them for queries and keeps the list empty during compaction.
    metrics_list: PartitionMetricsList,
}
72
73impl SeqScan {
74    /// Creates a new [SeqScan] with the given input and compaction flag.
75    /// If `compaction` is true, the scanner will not attempt to split ranges.
76    pub(crate) fn new(input: ScanInput, compaction: bool) -> Self {
77        let mut properties = ScannerProperties::default()
78            .with_append_mode(input.append_mode)
79            .with_total_rows(input.total_rows());
80        let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input, compaction));
81        properties.partitions = vec![stream_ctx.partition_ranges()];
82
83        Self {
84            properties,
85            stream_ctx,
86            compaction,
87            metrics_list: PartitionMetricsList::default(),
88        }
89    }
90
91    /// Builds a stream for the query.
92    ///
93    /// The returned stream is not partitioned and will contains all the data. If want
94    /// partitioned scan, use [`RegionScanner::scan_partition`].
95    pub fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
96        let metrics_set = ExecutionPlanMetricsSet::new();
97        let streams = (0..self.properties.partitions.len())
98            .map(|partition: usize| {
99                self.scan_partition(&QueryScanContext::default(), &metrics_set, partition)
100            })
101            .collect::<Result<Vec<_>, _>>()?;
102
103        let aggr_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
104        Ok(Box::pin(aggr_stream))
105    }
106
107    /// Scan [`Batch`] in all partitions one by one.
108    pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
109        let metrics_set = ExecutionPlanMetricsSet::new();
110
111        let streams = (0..self.properties.partitions.len())
112            .map(|partition| {
113                let metrics = self.new_partition_metrics(false, &metrics_set, partition);
114                self.scan_batch_in_partition(partition, metrics)
115            })
116            .collect::<Result<Vec<_>>>()?;
117
118        Ok(Box::pin(futures::stream::iter(streams).flatten()))
119    }
120
121    /// Builds a [BoxedBatchReader] from sequential scan for compaction.
122    ///
123    /// # Panics
124    /// Panics if the compaction flag is not set.
125    pub async fn build_reader_for_compaction(&self) -> Result<BoxedBatchReader> {
126        assert!(self.compaction);
127
128        let metrics_set = ExecutionPlanMetricsSet::new();
129        let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
130        debug_assert_eq!(1, self.properties.partitions.len());
131        let partition_ranges = &self.properties.partitions[0];
132
133        let reader = Self::merge_all_ranges_for_compaction(
134            &self.stream_ctx,
135            partition_ranges,
136            &part_metrics,
137        )
138        .await?;
139        Ok(Box::new(reader))
140    }
141
142    /// Builds a merge reader that reads all ranges.
143    /// Callers MUST not split ranges before calling this method.
144    async fn merge_all_ranges_for_compaction(
145        stream_ctx: &Arc<StreamContext>,
146        partition_ranges: &[PartitionRange],
147        part_metrics: &PartitionMetrics,
148    ) -> Result<BoxedBatchReader> {
149        let mut sources = Vec::new();
150        let range_builder_list = Arc::new(RangeBuilderList::new(
151            stream_ctx.input.num_memtables(),
152            stream_ctx.input.num_files(),
153        ));
154        for part_range in partition_ranges {
155            build_sources(
156                stream_ctx,
157                part_range,
158                true,
159                part_metrics,
160                range_builder_list.clone(),
161                &mut sources,
162            )
163            .await?;
164        }
165
166        common_telemetry::debug!(
167            "Build reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
168            stream_ctx.input.mapper.metadata().region_id,
169            partition_ranges.len(),
170            sources.len()
171        );
172        Self::build_reader_from_sources(stream_ctx, sources, None).await
173    }
174
175    /// Builds a reader to read sources. If `semaphore` is provided, reads sources in parallel
176    /// if possible.
177    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
178    pub(crate) async fn build_reader_from_sources(
179        stream_ctx: &StreamContext,
180        mut sources: Vec<Source>,
181        semaphore: Option<Arc<Semaphore>>,
182    ) -> Result<BoxedBatchReader> {
183        if let Some(semaphore) = semaphore.as_ref() {
184            // Read sources in parallel.
185            if sources.len() > 1 {
186                sources = stream_ctx
187                    .input
188                    .create_parallel_sources(sources, semaphore.clone())?;
189            }
190        }
191
192        let mut builder = MergeReaderBuilder::from_sources(sources);
193        let reader = builder.build().await?;
194
195        let dedup = !stream_ctx.input.append_mode;
196        let reader = if dedup {
197            match stream_ctx.input.merge_mode {
198                MergeMode::LastRow => Box::new(DedupReader::new(
199                    reader,
200                    LastRow::new(stream_ctx.input.filter_deleted),
201                )) as _,
202                MergeMode::LastNonNull => Box::new(DedupReader::new(
203                    reader,
204                    LastNonNull::new(stream_ctx.input.filter_deleted),
205                )) as _,
206            }
207        } else {
208            Box::new(reader) as _
209        };
210
211        let reader = match &stream_ctx.input.series_row_selector {
212            Some(TimeSeriesRowSelector::LastRow) => Box::new(LastRowReader::new(reader)) as _,
213            None => reader,
214        };
215
216        Ok(reader)
217    }
218
219    /// Builds a flat reader to read sources that returns RecordBatch. If `semaphore` is provided, reads sources in parallel
220    /// if possible.
221    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
222    pub(crate) async fn build_flat_reader_from_sources(
223        stream_ctx: &StreamContext,
224        mut sources: Vec<BoxedRecordBatchStream>,
225        semaphore: Option<Arc<Semaphore>>,
226    ) -> Result<BoxedRecordBatchStream> {
227        if let Some(semaphore) = semaphore.as_ref() {
228            // Read sources in parallel.
229            if sources.len() > 1 {
230                sources = stream_ctx
231                    .input
232                    .create_parallel_flat_sources(sources, semaphore.clone())?;
233            }
234        }
235
236        let mapper = stream_ctx.input.mapper.as_flat().unwrap();
237        let schema = mapper.input_arrow_schema();
238
239        let reader = FlatMergeReader::new(schema, sources, DEFAULT_READ_BATCH_SIZE).await?;
240
241        let dedup = !stream_ctx.input.append_mode;
242        let reader = if dedup {
243            match stream_ctx.input.merge_mode {
244                MergeMode::LastRow => Box::pin(
245                    FlatDedupReader::new(
246                        reader.into_stream().boxed(),
247                        FlatLastRow::new(stream_ctx.input.filter_deleted),
248                    )
249                    .into_stream(),
250                ) as _,
251                MergeMode::LastNonNull => Box::pin(
252                    FlatDedupReader::new(
253                        reader.into_stream().boxed(),
254                        FlatLastNonNull::new(
255                            mapper.field_column_start(),
256                            stream_ctx.input.filter_deleted,
257                        ),
258                    )
259                    .into_stream(),
260                ) as _,
261            }
262        } else {
263            Box::pin(reader.into_stream()) as _
264        };
265
266        Ok(reader)
267    }
268
269    /// Scans the given partition when the part list is set properly.
270    /// Otherwise the returned stream might not contains any data.
271    fn scan_partition_impl(
272        &self,
273        ctx: &QueryScanContext,
274        metrics_set: &ExecutionPlanMetricsSet,
275        partition: usize,
276    ) -> Result<SendableRecordBatchStream> {
277        if ctx.explain_verbose {
278            common_telemetry::info!(
279                "SeqScan partition {}, region_id: {}",
280                partition,
281                self.stream_ctx.input.region_metadata().region_id
282            );
283        }
284
285        let metrics = self.new_partition_metrics(ctx.explain_verbose, metrics_set, partition);
286        let input = &self.stream_ctx.input;
287
288        let batch_stream = if input.flat_format {
289            // Use flat scan for bulk memtables
290            self.scan_flat_batch_in_partition(partition, metrics.clone())?
291        } else {
292            // Use regular batch scan for normal memtables
293            self.scan_batch_in_partition(partition, metrics.clone())?
294        };
295        let record_batch_stream = ConvertBatchStream::new(
296            batch_stream,
297            input.mapper.clone(),
298            input.cache_strategy.clone(),
299            metrics,
300        );
301
302        Ok(Box::pin(RecordBatchStreamWrapper::new(
303            input.mapper.output_schema(),
304            Box::pin(record_batch_stream),
305        )))
306    }
307
    /// Builds a stream that yields [`ScanBatch::Normal`] batches of the given partition.
    ///
    /// The partition ranges of the partition are scanned one after another. For each
    /// range the stream builds the sources, merges them into one reader and yields
    /// every batch of that reader.
    ///
    /// Returns a `PartitionOutOfRange` error if `partition` is not a valid partition
    /// index and an empty stream if the partition has no range. The stream itself
    /// fails with an `Unexpected` error if the mapper is not in the primary key format.
    fn scan_batch_in_partition(
        &self,
        partition: usize,
        part_metrics: PartitionMetrics,
    ) -> Result<ScanBatchStream> {
        ensure!(
            partition < self.properties.partitions.len(),
            PartitionOutOfRangeSnafu {
                given: partition,
                all: self.properties.partitions.len(),
            }
        );

        if self.properties.partitions[partition].is_empty() {
            return Ok(Box::pin(futures::stream::empty()));
        }

        // Copies everything the stream needs so that it does not borrow `self`.
        let stream_ctx = self.stream_ctx.clone();
        let semaphore = self.new_semaphore();
        let partition_ranges = self.properties.partitions[partition].clone();
        let compaction = self.compaction;
        let distinguish_range = self.properties.distinguish_partition_range;

        let stream = try_stream! {
            part_metrics.on_first_poll();

            let range_builder_list = Arc::new(RangeBuilderList::new(
                stream_ctx.input.num_memtables(),
                stream_ctx.input.num_files(),
            ));
            // The format check always runs, but the mapper itself is only used by the
            // batch checker in debug builds.
            let _mapper = stream_ctx.input.mapper.as_primary_key().context(UnexpectedSnafu {
                reason: "Unexpected format",
            })?;
            // Scans each part.
            for part_range in partition_ranges {
                let mut sources = Vec::new();
                build_sources(
                    &stream_ctx,
                    &part_range,
                    compaction,
                    &part_metrics,
                    range_builder_list.clone(),
                    &mut sources,
                ).await?;

                // Metrics of this range. `fetch_start` marks the beginning of the
                // current fetch and is reset after every yield.
                let mut metrics = ScannerMetrics::default();
                let mut fetch_start = Instant::now();
                let mut reader =
                    Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone())
                        .await?;
                #[cfg(debug_assertions)]
                let mut checker = crate::read::BatchChecker::default()
                    .with_start(Some(part_range.start))
                    .with_end(Some(part_range.end));

                while let Some(batch) = reader.next_batch().await? {
                    metrics.scan_cost += fetch_start.elapsed();
                    metrics.num_batches += 1;
                    metrics.num_rows += batch.num_rows();

                    // Readers are expected to skip empty batches. Release builds
                    // drop them silently instead of panicking.
                    debug_assert!(!batch.is_empty());
                    if batch.is_empty() {
                        continue;
                    }

                    #[cfg(debug_assertions)]
                    checker.ensure_part_range_batch(
                        "SeqScan",
                        _mapper.metadata().region_id,
                        partition,
                        part_range,
                        &batch,
                    );

                    // The time spent in `yield` is tracked separately from the scan cost.
                    let yield_start = Instant::now();
                    yield ScanBatch::Normal(batch);
                    metrics.yield_cost += yield_start.elapsed();

                    fetch_start = Instant::now();
                }

                // Yields an empty part to indicate this range is terminated.
                // The query engine can use this to optimize some queries.
                if distinguish_range {
                    let yield_start = Instant::now();
                    yield ScanBatch::Normal(Batch::empty());
                    metrics.yield_cost += yield_start.elapsed();
                }

                // Accounts for the last fetch that found the reader exhausted.
                metrics.scan_cost += fetch_start.elapsed();
                part_metrics.merge_metrics(&metrics);
            }

            part_metrics.on_finish();
        };
        Ok(Box::pin(stream))
    }
405
    /// Builds a stream that yields [`ScanBatch::RecordBatch`] items of the given partition.
    ///
    /// The partition ranges of the partition are scanned one after another. For each
    /// range the stream builds the flat sources, merges them into one reader and
    /// yields every record batch of that reader.
    ///
    /// Returns a `PartitionOutOfRange` error if `partition` is not a valid partition
    /// index and an empty stream if the partition has no range.
    ///
    /// NOTE(review): unlike `scan_batch_in_partition`, this method does not read
    /// `distinguish_partition_range` and never yields an empty batch at the end of a
    /// range. Confirm whether that is intended for the flat format.
    fn scan_flat_batch_in_partition(
        &self,
        partition: usize,
        part_metrics: PartitionMetrics,
    ) -> Result<ScanBatchStream> {
        ensure!(
            partition < self.properties.partitions.len(),
            PartitionOutOfRangeSnafu {
                given: partition,
                all: self.properties.partitions.len(),
            }
        );

        if self.properties.partitions[partition].is_empty() {
            return Ok(Box::pin(futures::stream::empty()));
        }

        // Copies everything the stream needs so that it does not borrow `self`.
        let stream_ctx = self.stream_ctx.clone();
        let semaphore = self.new_semaphore();
        let partition_ranges = self.properties.partitions[partition].clone();
        let compaction = self.compaction;

        let stream = try_stream! {
            part_metrics.on_first_poll();

            let range_builder_list = Arc::new(RangeBuilderList::new(
                stream_ctx.input.num_memtables(),
                stream_ctx.input.num_files(),
            ));
            // Scans each part.
            for part_range in partition_ranges {
                let mut sources = Vec::new();
                build_flat_sources(
                    &stream_ctx,
                    &part_range,
                    compaction,
                    &part_metrics,
                    range_builder_list.clone(),
                    &mut sources,
                ).await?;

                // Metrics of this range. `fetch_start` marks the beginning of the
                // current fetch and is reset after every yield.
                let mut metrics = ScannerMetrics::default();
                let mut fetch_start = Instant::now();
                let mut reader =
                    Self::build_flat_reader_from_sources(&stream_ctx, sources, semaphore.clone())
                        .await?;

                while let Some(record_batch) = reader.try_next().await? {
                    metrics.scan_cost += fetch_start.elapsed();
                    metrics.num_batches += 1;
                    metrics.num_rows += record_batch.num_rows();

                    // Readers are expected to skip empty batches. Release builds
                    // drop them silently instead of panicking.
                    debug_assert!(record_batch.num_rows() > 0);
                    if record_batch.num_rows() == 0 {
                        continue;
                    }

                    // The time spent in `yield` is tracked separately from the scan cost.
                    let yield_start = Instant::now();
                    yield ScanBatch::RecordBatch(record_batch);
                    metrics.yield_cost += yield_start.elapsed();

                    fetch_start = Instant::now();
                }

                // Accounts for the last fetch that found the reader exhausted.
                metrics.scan_cost += fetch_start.elapsed();
                part_metrics.merge_metrics(&metrics);
            }

            part_metrics.on_finish();
        };
        Ok(Box::pin(stream))
    }
478
479    fn new_semaphore(&self) -> Option<Arc<Semaphore>> {
480        if self.properties.target_partitions() > self.properties.num_partitions() {
481            // We can use additional tasks to read the data if we have more target partitions than actual partitions.
482            // This semaphore is partition level.
483            // We don't use a global semaphore to avoid a partition waiting for others. The final concurrency
484            // of tasks usually won't exceed the target partitions a lot as compaction can reduce the number of
485            // files in a part range.
486            Some(Arc::new(Semaphore::new(
487                self.properties.target_partitions() - self.properties.num_partitions() + 1,
488            )))
489        } else {
490            None
491        }
492    }
493
494    /// Creates a new partition metrics instance.
495    /// Sets the partition metrics for the given partition if it is not for compaction.
496    fn new_partition_metrics(
497        &self,
498        explain_verbose: bool,
499        metrics_set: &ExecutionPlanMetricsSet,
500        partition: usize,
501    ) -> PartitionMetrics {
502        let metrics = PartitionMetrics::new(
503            self.stream_ctx.input.mapper.metadata().region_id,
504            partition,
505            get_scanner_type(self.compaction),
506            self.stream_ctx.query_start,
507            explain_verbose,
508            metrics_set,
509        );
510
511        if !self.compaction {
512            self.metrics_list.set(partition, metrics.clone());
513        }
514
515        metrics
516    }
517
518    /// Finds the maximum number of files to read in a single partition range.
519    fn max_files_in_partition(ranges: &[RangeMeta], partition_ranges: &[PartitionRange]) -> usize {
520        partition_ranges
521            .iter()
522            .map(|part_range| {
523                let range_meta = &ranges[part_range.identifier];
524                range_meta.indices.len()
525            })
526            .max()
527            .unwrap_or(0)
528    }
529
530    /// Checks resource limit for the scanner.
531    pub(crate) fn check_scan_limit(&self) -> Result<()> {
532        // Check max file count limit for all partitions since we scan them in parallel.
533        let total_max_files: usize = self
534            .properties
535            .partitions
536            .iter()
537            .map(|partition| Self::max_files_in_partition(&self.stream_ctx.ranges, partition))
538            .sum();
539
540        let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
541        if total_max_files > max_concurrent_files {
542            return TooManyFilesToReadSnafu {
543                actual: total_max_files,
544                max: max_concurrent_files,
545            }
546            .fail();
547        }
548
549        Ok(())
550    }
551}
552
553impl RegionScanner for SeqScan {
554    fn properties(&self) -> &ScannerProperties {
555        &self.properties
556    }
557
558    fn schema(&self) -> SchemaRef {
559        self.stream_ctx.input.mapper.output_schema()
560    }
561
562    fn metadata(&self) -> RegionMetadataRef {
563        self.stream_ctx.input.mapper.metadata().clone()
564    }
565
566    fn scan_partition(
567        &self,
568        ctx: &QueryScanContext,
569        metrics_set: &ExecutionPlanMetricsSet,
570        partition: usize,
571    ) -> Result<SendableRecordBatchStream, BoxedError> {
572        self.scan_partition_impl(ctx, metrics_set, partition)
573            .map_err(BoxedError::new)
574    }
575
576    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
577        self.properties.prepare(request);
578
579        self.check_scan_limit().map_err(BoxedError::new)?;
580
581        Ok(())
582    }
583
584    fn has_predicate(&self) -> bool {
585        let predicate = self.stream_ctx.input.predicate();
586        predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
587    }
588
589    fn set_logical_region(&mut self, logical_region: bool) {
590        self.properties.set_logical_region(logical_region);
591    }
592}
593
594impl DisplayAs for SeqScan {
595    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
596        write!(
597            f,
598            "SeqScan: region={}, ",
599            self.stream_ctx.input.mapper.metadata().region_id
600        )?;
601        match t {
602            // TODO(LFC): Implement all the "TreeRender" display format.
603            DisplayFormatType::Default | DisplayFormatType::TreeRender => {
604                self.stream_ctx.format_for_explain(false, f)
605            }
606            DisplayFormatType::Verbose => {
607                self.stream_ctx.format_for_explain(true, f)?;
608                self.metrics_list.format_verbose_metrics(f)
609            }
610        }
611    }
612}
613
614impl fmt::Debug for SeqScan {
615    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
616        f.debug_struct("SeqScan")
617            .field("num_ranges", &self.stream_ctx.ranges.len())
618            .finish()
619    }
620}
621
622/// Builds sources for the partition range and push them to the `sources` vector.
623pub(crate) async fn build_sources(
624    stream_ctx: &Arc<StreamContext>,
625    part_range: &PartitionRange,
626    compaction: bool,
627    part_metrics: &PartitionMetrics,
628    range_builder_list: Arc<RangeBuilderList>,
629    sources: &mut Vec<Source>,
630) -> Result<()> {
631    // Gets range meta.
632    let range_meta = &stream_ctx.ranges[part_range.identifier];
633    #[cfg(debug_assertions)]
634    if compaction {
635        // Compaction expects input sources are not been split.
636        debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
637        for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
638            // It should scan all row groups.
639            debug_assert_eq!(
640                -1, row_group_idx.row_group_index,
641                "Expect {} range scan all row groups, given: {}",
642                i, row_group_idx.row_group_index,
643            );
644        }
645    }
646
647    sources.reserve(range_meta.row_group_indices.len());
648    for index in &range_meta.row_group_indices {
649        let stream = if stream_ctx.is_mem_range_index(*index) {
650            let stream = scan_mem_ranges(
651                stream_ctx.clone(),
652                part_metrics.clone(),
653                *index,
654                range_meta.time_range,
655            );
656            Box::pin(stream) as _
657        } else if stream_ctx.is_file_range_index(*index) {
658            let read_type = if compaction {
659                "compaction"
660            } else {
661                "seq_scan_files"
662            };
663            let stream = scan_file_ranges(
664                stream_ctx.clone(),
665                part_metrics.clone(),
666                *index,
667                read_type,
668                range_builder_list.clone(),
669            )
670            .await?;
671            Box::pin(stream) as _
672        } else {
673            scan_util::maybe_scan_other_ranges(stream_ctx, *index, part_metrics).await?
674        };
675        sources.push(Source::Stream(stream));
676    }
677    Ok(())
678}
679
680/// Builds flat sources for the partition range and push them to the `sources` vector.
681pub(crate) async fn build_flat_sources(
682    stream_ctx: &Arc<StreamContext>,
683    part_range: &PartitionRange,
684    compaction: bool,
685    part_metrics: &PartitionMetrics,
686    range_builder_list: Arc<RangeBuilderList>,
687    sources: &mut Vec<BoxedRecordBatchStream>,
688) -> Result<()> {
689    // Gets range meta.
690    let range_meta = &stream_ctx.ranges[part_range.identifier];
691    #[cfg(debug_assertions)]
692    if compaction {
693        // Compaction expects input sources are not been split.
694        debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
695        for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
696            // It should scan all row groups.
697            debug_assert_eq!(
698                -1, row_group_idx.row_group_index,
699                "Expect {} range scan all row groups, given: {}",
700                i, row_group_idx.row_group_index,
701            );
702        }
703    }
704
705    sources.reserve(range_meta.row_group_indices.len());
706    for index in &range_meta.row_group_indices {
707        let stream = if stream_ctx.is_mem_range_index(*index) {
708            let stream = scan_flat_mem_ranges(stream_ctx.clone(), part_metrics.clone(), *index);
709            Box::pin(stream) as _
710        } else if stream_ctx.is_file_range_index(*index) {
711            let read_type = if compaction {
712                "compaction"
713            } else {
714                "seq_scan_files"
715            };
716            let stream = scan_flat_file_ranges(
717                stream_ctx.clone(),
718                part_metrics.clone(),
719                *index,
720                read_type,
721                range_builder_list.clone(),
722            )
723            .await?;
724            Box::pin(stream) as _
725        } else {
726            scan_util::maybe_scan_flat_other_ranges(stream_ctx, *index, part_metrics).await?
727        };
728        sources.push(stream);
729    }
730    Ok(())
731}
732
#[cfg(test)]
impl SeqScan {
    /// Returns the [`ScanInput`] held by the stream context. Only available in tests.
    pub(crate) fn input(&self) -> &ScanInput {
        let ctx: &StreamContext = &self.stream_ctx;
        &ctx.input
    }
}
740
/// Returns the scanner type label: `"SeqScan(compaction)"` when the scanner is
/// used for compaction and `"SeqScan"` otherwise.
fn get_scanner_type(compaction: bool) -> &'static str {
    match compaction {
        true => "SeqScan(compaction)",
        false => "SeqScan",
    }
}