mito2/read/
seq_scan.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Sequential scan.
16
17use std::fmt;
18use std::sync::Arc;
19use std::time::Instant;
20
21use async_stream::try_stream;
22use common_error::ext::BoxedError;
23use common_recordbatch::util::ChainedRecordBatchStream;
24use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
25use common_telemetry::tracing;
26use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
27use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
28use datatypes::schema::SchemaRef;
29use futures::{StreamExt, TryStreamExt};
30use snafu::{OptionExt, ensure};
31use store_api::metadata::RegionMetadataRef;
32use store_api::region_engine::{
33    PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
34};
35use store_api::storage::TimeSeriesRowSelector;
36use tokio::sync::Semaphore;
37
38use crate::error::{PartitionOutOfRangeSnafu, Result, TooManyFilesToReadSnafu, UnexpectedSnafu};
39use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
40use crate::read::flat_dedup::{FlatDedupReader, FlatLastNonNull, FlatLastRow};
41use crate::read::flat_merge::FlatMergeReader;
42use crate::read::last_row::LastRowReader;
43use crate::read::merge::MergeReaderBuilder;
44use crate::read::range::{RangeBuilderList, RangeMeta};
45use crate::read::scan_region::{ScanInput, StreamContext};
46use crate::read::scan_util::{
47    PartitionMetrics, PartitionMetricsList, scan_file_ranges, scan_flat_file_ranges,
48    scan_flat_mem_ranges, scan_mem_ranges,
49};
50use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
51use crate::read::{
52    Batch, BatchReader, BoxedBatchReader, BoxedRecordBatchStream, ScannerMetrics, Source, scan_util,
53};
54use crate::region::options::MergeMode;
55use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
56
/// Scans a region and returns rows in a sorted sequence.
///
/// The output order is always `order by primary keys, time index` inside every
/// [`PartitionRange`]. Each "partition" may contain many [`PartitionRange`]s.
pub struct SeqScan {
    /// Properties of the scanner, including the partition ranges assigned to each partition.
    properties: ScannerProperties,
    /// Context shared by all streams created from this scanner.
    stream_ctx: Arc<StreamContext>,
    /// Whether the scanner is used for compaction.
    compaction: bool,
    /// Metrics for each partition.
    /// The scanner only fills this list for queries and keeps it empty during compaction.
    metrics_list: PartitionMetricsList,
}
72
73impl SeqScan {
74    /// Creates a new [SeqScan] with the given input and compaction flag.
75    /// If `compaction` is true, the scanner will not attempt to split ranges.
76    pub(crate) fn new(input: ScanInput, compaction: bool) -> Self {
77        let mut properties = ScannerProperties::default()
78            .with_append_mode(input.append_mode)
79            .with_total_rows(input.total_rows());
80        let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input, compaction));
81        properties.partitions = vec![stream_ctx.partition_ranges()];
82
83        Self {
84            properties,
85            stream_ctx,
86            compaction,
87            metrics_list: PartitionMetricsList::default(),
88        }
89    }
90
91    /// Builds a stream for the query.
92    ///
93    /// The returned stream is not partitioned and will contains all the data. If want
94    /// partitioned scan, use [`RegionScanner::scan_partition`].
95    pub fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
96        let metrics_set = ExecutionPlanMetricsSet::new();
97        let streams = (0..self.properties.partitions.len())
98            .map(|partition: usize| {
99                self.scan_partition(&QueryScanContext::default(), &metrics_set, partition)
100            })
101            .collect::<Result<Vec<_>, _>>()?;
102
103        let aggr_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
104        Ok(Box::pin(aggr_stream))
105    }
106
107    /// Scan [`Batch`] in all partitions one by one.
108    pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
109        let metrics_set = ExecutionPlanMetricsSet::new();
110
111        let streams = (0..self.properties.partitions.len())
112            .map(|partition| {
113                let metrics = self.new_partition_metrics(false, &metrics_set, partition);
114                self.scan_batch_in_partition(partition, metrics)
115            })
116            .collect::<Result<Vec<_>>>()?;
117
118        Ok(Box::pin(futures::stream::iter(streams).flatten()))
119    }
120
121    /// Builds a [BoxedBatchReader] from sequential scan for compaction.
122    ///
123    /// # Panics
124    /// Panics if the compaction flag is not set.
125    pub async fn build_reader_for_compaction(&self) -> Result<BoxedBatchReader> {
126        assert!(self.compaction);
127
128        let metrics_set = ExecutionPlanMetricsSet::new();
129        let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
130        debug_assert_eq!(1, self.properties.partitions.len());
131        let partition_ranges = &self.properties.partitions[0];
132
133        let reader = Self::merge_all_ranges_for_compaction(
134            &self.stream_ctx,
135            partition_ranges,
136            &part_metrics,
137        )
138        .await?;
139        Ok(Box::new(reader))
140    }
141
142    /// Builds a [BoxedRecordBatchStream] from sequential scan for flat format compaction.
143    ///
144    /// # Panics
145    /// Panics if the compaction flag is not set.
146    pub async fn build_flat_reader_for_compaction(&self) -> Result<BoxedRecordBatchStream> {
147        assert!(self.compaction);
148
149        let metrics_set = ExecutionPlanMetricsSet::new();
150        let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
151        debug_assert_eq!(1, self.properties.partitions.len());
152        let partition_ranges = &self.properties.partitions[0];
153
154        let reader = Self::merge_all_flat_ranges_for_compaction(
155            &self.stream_ctx,
156            partition_ranges,
157            &part_metrics,
158        )
159        .await?;
160        Ok(reader)
161    }
162
163    /// Builds a merge reader that reads all ranges.
164    /// Callers MUST not split ranges before calling this method.
165    async fn merge_all_ranges_for_compaction(
166        stream_ctx: &Arc<StreamContext>,
167        partition_ranges: &[PartitionRange],
168        part_metrics: &PartitionMetrics,
169    ) -> Result<BoxedBatchReader> {
170        let mut sources = Vec::new();
171        let range_builder_list = Arc::new(RangeBuilderList::new(
172            stream_ctx.input.num_memtables(),
173            stream_ctx.input.num_files(),
174        ));
175        for part_range in partition_ranges {
176            build_sources(
177                stream_ctx,
178                part_range,
179                true,
180                part_metrics,
181                range_builder_list.clone(),
182                &mut sources,
183            )
184            .await?;
185        }
186
187        common_telemetry::debug!(
188            "Build reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
189            stream_ctx.input.mapper.metadata().region_id,
190            partition_ranges.len(),
191            sources.len()
192        );
193        Self::build_reader_from_sources(stream_ctx, sources, None).await
194    }
195
196    /// Builds a merge reader that reads all flat ranges.
197    /// Callers MUST not split ranges before calling this method.
198    async fn merge_all_flat_ranges_for_compaction(
199        stream_ctx: &Arc<StreamContext>,
200        partition_ranges: &[PartitionRange],
201        part_metrics: &PartitionMetrics,
202    ) -> Result<BoxedRecordBatchStream> {
203        let mut sources = Vec::new();
204        let range_builder_list = Arc::new(RangeBuilderList::new(
205            stream_ctx.input.num_memtables(),
206            stream_ctx.input.num_files(),
207        ));
208        for part_range in partition_ranges {
209            build_flat_sources(
210                stream_ctx,
211                part_range,
212                true,
213                part_metrics,
214                range_builder_list.clone(),
215                &mut sources,
216            )
217            .await?;
218        }
219
220        common_telemetry::debug!(
221            "Build flat reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
222            stream_ctx.input.mapper.metadata().region_id,
223            partition_ranges.len(),
224            sources.len()
225        );
226        Self::build_flat_reader_from_sources(stream_ctx, sources, None).await
227    }
228
229    /// Builds a reader to read sources. If `semaphore` is provided, reads sources in parallel
230    /// if possible.
231    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
232    pub(crate) async fn build_reader_from_sources(
233        stream_ctx: &StreamContext,
234        mut sources: Vec<Source>,
235        semaphore: Option<Arc<Semaphore>>,
236    ) -> Result<BoxedBatchReader> {
237        if let Some(semaphore) = semaphore.as_ref() {
238            // Read sources in parallel.
239            if sources.len() > 1 {
240                sources = stream_ctx
241                    .input
242                    .create_parallel_sources(sources, semaphore.clone())?;
243            }
244        }
245
246        let mut builder = MergeReaderBuilder::from_sources(sources);
247        let reader = builder.build().await?;
248
249        let dedup = !stream_ctx.input.append_mode;
250        let reader = if dedup {
251            match stream_ctx.input.merge_mode {
252                MergeMode::LastRow => Box::new(DedupReader::new(
253                    reader,
254                    LastRow::new(stream_ctx.input.filter_deleted),
255                )) as _,
256                MergeMode::LastNonNull => Box::new(DedupReader::new(
257                    reader,
258                    LastNonNull::new(stream_ctx.input.filter_deleted),
259                )) as _,
260            }
261        } else {
262            Box::new(reader) as _
263        };
264
265        let reader = match &stream_ctx.input.series_row_selector {
266            Some(TimeSeriesRowSelector::LastRow) => Box::new(LastRowReader::new(reader)) as _,
267            None => reader,
268        };
269
270        Ok(reader)
271    }
272
273    /// Builds a flat reader to read sources that returns RecordBatch. If `semaphore` is provided, reads sources in parallel
274    /// if possible.
275    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
276    pub(crate) async fn build_flat_reader_from_sources(
277        stream_ctx: &StreamContext,
278        mut sources: Vec<BoxedRecordBatchStream>,
279        semaphore: Option<Arc<Semaphore>>,
280    ) -> Result<BoxedRecordBatchStream> {
281        if let Some(semaphore) = semaphore.as_ref() {
282            // Read sources in parallel.
283            if sources.len() > 1 {
284                sources = stream_ctx
285                    .input
286                    .create_parallel_flat_sources(sources, semaphore.clone())?;
287            }
288        }
289
290        let mapper = stream_ctx.input.mapper.as_flat().unwrap();
291        let schema = mapper.input_arrow_schema();
292
293        let reader = FlatMergeReader::new(schema, sources, DEFAULT_READ_BATCH_SIZE).await?;
294
295        let dedup = !stream_ctx.input.append_mode;
296        let reader = if dedup {
297            match stream_ctx.input.merge_mode {
298                MergeMode::LastRow => Box::pin(
299                    FlatDedupReader::new(
300                        reader.into_stream().boxed(),
301                        FlatLastRow::new(stream_ctx.input.filter_deleted),
302                    )
303                    .into_stream(),
304                ) as _,
305                MergeMode::LastNonNull => Box::pin(
306                    FlatDedupReader::new(
307                        reader.into_stream().boxed(),
308                        FlatLastNonNull::new(
309                            mapper.field_column_start(),
310                            stream_ctx.input.filter_deleted,
311                        ),
312                    )
313                    .into_stream(),
314                ) as _,
315            }
316        } else {
317            Box::pin(reader.into_stream()) as _
318        };
319
320        Ok(reader)
321    }
322
323    /// Scans the given partition when the part list is set properly.
324    /// Otherwise the returned stream might not contains any data.
325    fn scan_partition_impl(
326        &self,
327        ctx: &QueryScanContext,
328        metrics_set: &ExecutionPlanMetricsSet,
329        partition: usize,
330    ) -> Result<SendableRecordBatchStream> {
331        if ctx.explain_verbose {
332            common_telemetry::info!(
333                "SeqScan partition {}, region_id: {}",
334                partition,
335                self.stream_ctx.input.region_metadata().region_id
336            );
337        }
338
339        let metrics = self.new_partition_metrics(ctx.explain_verbose, metrics_set, partition);
340        let input = &self.stream_ctx.input;
341
342        let batch_stream = if input.flat_format {
343            // Use flat scan for bulk memtables
344            self.scan_flat_batch_in_partition(partition, metrics.clone())?
345        } else {
346            // Use regular batch scan for normal memtables
347            self.scan_batch_in_partition(partition, metrics.clone())?
348        };
349        let record_batch_stream = ConvertBatchStream::new(
350            batch_stream,
351            input.mapper.clone(),
352            input.cache_strategy.clone(),
353            metrics,
354        );
355
356        Ok(Box::pin(RecordBatchStreamWrapper::new(
357            input.mapper.output_schema(),
358            Box::pin(record_batch_stream),
359        )))
360    }
361
362    fn scan_batch_in_partition(
363        &self,
364        partition: usize,
365        part_metrics: PartitionMetrics,
366    ) -> Result<ScanBatchStream> {
367        ensure!(
368            partition < self.properties.partitions.len(),
369            PartitionOutOfRangeSnafu {
370                given: partition,
371                all: self.properties.partitions.len(),
372            }
373        );
374
375        if self.properties.partitions[partition].is_empty() {
376            return Ok(Box::pin(futures::stream::empty()));
377        }
378
379        let stream_ctx = self.stream_ctx.clone();
380        let semaphore = self.new_semaphore();
381        let partition_ranges = self.properties.partitions[partition].clone();
382        let compaction = self.compaction;
383        let distinguish_range = self.properties.distinguish_partition_range;
384
385        let stream = try_stream! {
386            part_metrics.on_first_poll();
387
388            let range_builder_list = Arc::new(RangeBuilderList::new(
389                stream_ctx.input.num_memtables(),
390                stream_ctx.input.num_files(),
391            ));
392            let _mapper = stream_ctx.input.mapper.as_primary_key().context(UnexpectedSnafu {
393                reason: "Unexpected format",
394            })?;
395            // Scans each part.
396            for part_range in partition_ranges {
397                let mut sources = Vec::new();
398                build_sources(
399                    &stream_ctx,
400                    &part_range,
401                    compaction,
402                    &part_metrics,
403                    range_builder_list.clone(),
404                    &mut sources,
405                ).await?;
406
407                let mut metrics = ScannerMetrics::default();
408                let mut fetch_start = Instant::now();
409                let mut reader =
410                    Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone())
411                        .await?;
412                #[cfg(debug_assertions)]
413                let mut checker = crate::read::BatchChecker::default()
414                    .with_start(Some(part_range.start))
415                    .with_end(Some(part_range.end));
416
417                while let Some(batch) = reader.next_batch().await? {
418                    metrics.scan_cost += fetch_start.elapsed();
419                    metrics.num_batches += 1;
420                    metrics.num_rows += batch.num_rows();
421
422                    debug_assert!(!batch.is_empty());
423                    if batch.is_empty() {
424                        continue;
425                    }
426
427                    #[cfg(debug_assertions)]
428                    checker.ensure_part_range_batch(
429                        "SeqScan",
430                        _mapper.metadata().region_id,
431                        partition,
432                        part_range,
433                        &batch,
434                    );
435
436                    let yield_start = Instant::now();
437                    yield ScanBatch::Normal(batch);
438                    metrics.yield_cost += yield_start.elapsed();
439
440                    fetch_start = Instant::now();
441                }
442
443                // Yields an empty part to indicate this range is terminated.
444                // The query engine can use this to optimize some queries.
445                if distinguish_range {
446                    let yield_start = Instant::now();
447                    yield ScanBatch::Normal(Batch::empty());
448                    metrics.yield_cost += yield_start.elapsed();
449                }
450
451                metrics.scan_cost += fetch_start.elapsed();
452                part_metrics.merge_metrics(&metrics);
453            }
454
455            part_metrics.on_finish();
456        };
457        Ok(Box::pin(stream))
458    }
459
460    fn scan_flat_batch_in_partition(
461        &self,
462        partition: usize,
463        part_metrics: PartitionMetrics,
464    ) -> Result<ScanBatchStream> {
465        ensure!(
466            partition < self.properties.partitions.len(),
467            PartitionOutOfRangeSnafu {
468                given: partition,
469                all: self.properties.partitions.len(),
470            }
471        );
472
473        if self.properties.partitions[partition].is_empty() {
474            return Ok(Box::pin(futures::stream::empty()));
475        }
476
477        let stream_ctx = self.stream_ctx.clone();
478        let semaphore = self.new_semaphore();
479        let partition_ranges = self.properties.partitions[partition].clone();
480        let compaction = self.compaction;
481
482        let stream = try_stream! {
483            part_metrics.on_first_poll();
484
485            let range_builder_list = Arc::new(RangeBuilderList::new(
486                stream_ctx.input.num_memtables(),
487                stream_ctx.input.num_files(),
488            ));
489            // Scans each part.
490            for part_range in partition_ranges {
491                let mut sources = Vec::new();
492                build_flat_sources(
493                    &stream_ctx,
494                    &part_range,
495                    compaction,
496                    &part_metrics,
497                    range_builder_list.clone(),
498                    &mut sources,
499                ).await?;
500
501                let mut metrics = ScannerMetrics::default();
502                let mut fetch_start = Instant::now();
503                let mut reader =
504                    Self::build_flat_reader_from_sources(&stream_ctx, sources, semaphore.clone())
505                        .await?;
506
507                while let Some(record_batch) = reader.try_next().await? {
508                    metrics.scan_cost += fetch_start.elapsed();
509                    metrics.num_batches += 1;
510                    metrics.num_rows += record_batch.num_rows();
511
512                    debug_assert!(record_batch.num_rows() > 0);
513                    if record_batch.num_rows() == 0 {
514                        continue;
515                    }
516
517                    let yield_start = Instant::now();
518                    yield ScanBatch::RecordBatch(record_batch);
519                    metrics.yield_cost += yield_start.elapsed();
520
521                    fetch_start = Instant::now();
522                }
523
524                metrics.scan_cost += fetch_start.elapsed();
525                part_metrics.merge_metrics(&metrics);
526            }
527
528            part_metrics.on_finish();
529        };
530        Ok(Box::pin(stream))
531    }
532
533    fn new_semaphore(&self) -> Option<Arc<Semaphore>> {
534        if self.properties.target_partitions() > self.properties.num_partitions() {
535            // We can use additional tasks to read the data if we have more target partitions than actual partitions.
536            // This semaphore is partition level.
537            // We don't use a global semaphore to avoid a partition waiting for others. The final concurrency
538            // of tasks usually won't exceed the target partitions a lot as compaction can reduce the number of
539            // files in a part range.
540            Some(Arc::new(Semaphore::new(
541                self.properties.target_partitions() - self.properties.num_partitions() + 1,
542            )))
543        } else {
544            None
545        }
546    }
547
548    /// Creates a new partition metrics instance.
549    /// Sets the partition metrics for the given partition if it is not for compaction.
550    fn new_partition_metrics(
551        &self,
552        explain_verbose: bool,
553        metrics_set: &ExecutionPlanMetricsSet,
554        partition: usize,
555    ) -> PartitionMetrics {
556        let metrics = PartitionMetrics::new(
557            self.stream_ctx.input.mapper.metadata().region_id,
558            partition,
559            get_scanner_type(self.compaction),
560            self.stream_ctx.query_start,
561            explain_verbose,
562            metrics_set,
563        );
564
565        if !self.compaction {
566            self.metrics_list.set(partition, metrics.clone());
567        }
568
569        metrics
570    }
571
572    /// Finds the maximum number of files to read in a single partition range.
573    fn max_files_in_partition(ranges: &[RangeMeta], partition_ranges: &[PartitionRange]) -> usize {
574        partition_ranges
575            .iter()
576            .map(|part_range| {
577                let range_meta = &ranges[part_range.identifier];
578                range_meta.indices.len()
579            })
580            .max()
581            .unwrap_or(0)
582    }
583
584    /// Checks resource limit for the scanner.
585    pub(crate) fn check_scan_limit(&self) -> Result<()> {
586        // Check max file count limit for all partitions since we scan them in parallel.
587        let total_max_files: usize = self
588            .properties
589            .partitions
590            .iter()
591            .map(|partition| Self::max_files_in_partition(&self.stream_ctx.ranges, partition))
592            .sum();
593
594        let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
595        if total_max_files > max_concurrent_files {
596            return TooManyFilesToReadSnafu {
597                actual: total_max_files,
598                max: max_concurrent_files,
599            }
600            .fail();
601        }
602
603        Ok(())
604    }
605}
606
607impl RegionScanner for SeqScan {
608    fn properties(&self) -> &ScannerProperties {
609        &self.properties
610    }
611
612    fn schema(&self) -> SchemaRef {
613        self.stream_ctx.input.mapper.output_schema()
614    }
615
616    fn metadata(&self) -> RegionMetadataRef {
617        self.stream_ctx.input.mapper.metadata().clone()
618    }
619
620    fn scan_partition(
621        &self,
622        ctx: &QueryScanContext,
623        metrics_set: &ExecutionPlanMetricsSet,
624        partition: usize,
625    ) -> Result<SendableRecordBatchStream, BoxedError> {
626        self.scan_partition_impl(ctx, metrics_set, partition)
627            .map_err(BoxedError::new)
628    }
629
630    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
631        self.properties.prepare(request);
632
633        self.check_scan_limit().map_err(BoxedError::new)?;
634
635        Ok(())
636    }
637
638    fn has_predicate(&self) -> bool {
639        let predicate = self.stream_ctx.input.predicate();
640        predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
641    }
642
643    fn set_logical_region(&mut self, logical_region: bool) {
644        self.properties.set_logical_region(logical_region);
645    }
646}
647
648impl DisplayAs for SeqScan {
649    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
650        write!(
651            f,
652            "SeqScan: region={}, ",
653            self.stream_ctx.input.mapper.metadata().region_id
654        )?;
655        match t {
656            // TODO(LFC): Implement all the "TreeRender" display format.
657            DisplayFormatType::Default | DisplayFormatType::TreeRender => {
658                self.stream_ctx.format_for_explain(false, f)
659            }
660            DisplayFormatType::Verbose => {
661                self.stream_ctx.format_for_explain(true, f)?;
662                self.metrics_list.format_verbose_metrics(f)
663            }
664        }
665    }
666}
667
668impl fmt::Debug for SeqScan {
669    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
670        f.debug_struct("SeqScan")
671            .field("num_ranges", &self.stream_ctx.ranges.len())
672            .finish()
673    }
674}
675
676/// Builds sources for the partition range and push them to the `sources` vector.
677pub(crate) async fn build_sources(
678    stream_ctx: &Arc<StreamContext>,
679    part_range: &PartitionRange,
680    compaction: bool,
681    part_metrics: &PartitionMetrics,
682    range_builder_list: Arc<RangeBuilderList>,
683    sources: &mut Vec<Source>,
684) -> Result<()> {
685    // Gets range meta.
686    let range_meta = &stream_ctx.ranges[part_range.identifier];
687    #[cfg(debug_assertions)]
688    if compaction {
689        // Compaction expects input sources are not been split.
690        debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
691        for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
692            // It should scan all row groups.
693            debug_assert_eq!(
694                -1, row_group_idx.row_group_index,
695                "Expect {} range scan all row groups, given: {}",
696                i, row_group_idx.row_group_index,
697            );
698        }
699    }
700
701    sources.reserve(range_meta.row_group_indices.len());
702    for index in &range_meta.row_group_indices {
703        let stream = if stream_ctx.is_mem_range_index(*index) {
704            let stream = scan_mem_ranges(
705                stream_ctx.clone(),
706                part_metrics.clone(),
707                *index,
708                range_meta.time_range,
709            );
710            Box::pin(stream) as _
711        } else if stream_ctx.is_file_range_index(*index) {
712            let read_type = if compaction {
713                "compaction"
714            } else {
715                "seq_scan_files"
716            };
717            let stream = scan_file_ranges(
718                stream_ctx.clone(),
719                part_metrics.clone(),
720                *index,
721                read_type,
722                range_builder_list.clone(),
723            )
724            .await?;
725            Box::pin(stream) as _
726        } else {
727            scan_util::maybe_scan_other_ranges(stream_ctx, *index, part_metrics).await?
728        };
729        sources.push(Source::Stream(stream));
730    }
731    Ok(())
732}
733
734/// Builds flat sources for the partition range and push them to the `sources` vector.
735pub(crate) async fn build_flat_sources(
736    stream_ctx: &Arc<StreamContext>,
737    part_range: &PartitionRange,
738    compaction: bool,
739    part_metrics: &PartitionMetrics,
740    range_builder_list: Arc<RangeBuilderList>,
741    sources: &mut Vec<BoxedRecordBatchStream>,
742) -> Result<()> {
743    // Gets range meta.
744    let range_meta = &stream_ctx.ranges[part_range.identifier];
745    #[cfg(debug_assertions)]
746    if compaction {
747        // Compaction expects input sources are not been split.
748        debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
749        for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
750            // It should scan all row groups.
751            debug_assert_eq!(
752                -1, row_group_idx.row_group_index,
753                "Expect {} range scan all row groups, given: {}",
754                i, row_group_idx.row_group_index,
755            );
756        }
757    }
758
759    sources.reserve(range_meta.row_group_indices.len());
760    for index in &range_meta.row_group_indices {
761        let stream = if stream_ctx.is_mem_range_index(*index) {
762            let stream = scan_flat_mem_ranges(stream_ctx.clone(), part_metrics.clone(), *index);
763            Box::pin(stream) as _
764        } else if stream_ctx.is_file_range_index(*index) {
765            let read_type = if compaction {
766                "compaction"
767            } else {
768                "seq_scan_files"
769            };
770            let stream = scan_flat_file_ranges(
771                stream_ctx.clone(),
772                part_metrics.clone(),
773                *index,
774                read_type,
775                range_builder_list.clone(),
776            )
777            .await?;
778            Box::pin(stream) as _
779        } else {
780            scan_util::maybe_scan_flat_other_ranges(stream_ctx, *index, part_metrics).await?
781        };
782        sources.push(stream);
783    }
784    Ok(())
785}
786
#[cfg(test)]
impl SeqScan {
    /// Returns the scan input held by the stream context. Only compiled for tests.
    pub(crate) fn input(&self) -> &ScanInput {
        &self.stream_ctx.input
    }
}
794
/// Returns the scanner type label: `"SeqScan(compaction)"` when the scanner is used
/// for compaction, `"SeqScan"` otherwise.
fn get_scanner_type(compaction: bool) -> &'static str {
    match compaction {
        true => "SeqScan(compaction)",
        false => "SeqScan",
    }
}