mito2/read/
seq_scan.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Sequential scan.
16
17use std::fmt;
18use std::sync::Arc;
19use std::time::Instant;
20
21use async_stream::try_stream;
22use common_error::ext::BoxedError;
23use common_recordbatch::util::ChainedRecordBatchStream;
24use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
25use common_telemetry::tracing;
26use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
27use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
28use datatypes::schema::SchemaRef;
29use futures::{StreamExt, TryStreamExt};
30use snafu::{OptionExt, ensure};
31use store_api::metadata::RegionMetadataRef;
32use store_api::region_engine::{
33    PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
34};
35use store_api::storage::TimeSeriesRowSelector;
36use tokio::sync::Semaphore;
37
38use crate::error::{PartitionOutOfRangeSnafu, Result, TooManyFilesToReadSnafu, UnexpectedSnafu};
39use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
40use crate::read::flat_dedup::{FlatDedupReader, FlatLastNonNull, FlatLastRow};
41use crate::read::flat_merge::FlatMergeReader;
42use crate::read::last_row::LastRowReader;
43use crate::read::merge::MergeReaderBuilder;
44use crate::read::range::{RangeBuilderList, RangeMeta};
45use crate::read::scan_region::{ScanInput, StreamContext};
46use crate::read::scan_util::{
47    PartitionMetrics, PartitionMetricsList, scan_file_ranges, scan_flat_file_ranges,
48    scan_flat_mem_ranges, scan_mem_ranges,
49};
50use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
51use crate::read::{
52    Batch, BatchReader, BoxedBatchReader, BoxedRecordBatchStream, ScannerMetrics, Source, scan_util,
53};
54use crate::region::options::MergeMode;
55use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
56
57/// Scans a region and returns rows in a sorted sequence.
58///
59/// The output order is always `order by primary keys, time index` inside every
60/// [`PartitionRange`]. Each "partition" may contains many [`PartitionRange`]s.
61pub struct SeqScan {
62    /// Properties of the scanner.
63    properties: ScannerProperties,
64    /// Context of streams.
65    stream_ctx: Arc<StreamContext>,
66    /// Metrics for each partition.
67    /// The scanner only sets in query and keeps it empty during compaction.
68    metrics_list: PartitionMetricsList,
69}
70
71impl SeqScan {
72    /// Creates a new [SeqScan] with the given input.
73    /// If `input.compaction` is true, the scanner will not attempt to split ranges.
74    pub(crate) fn new(input: ScanInput) -> Self {
75        let mut properties = ScannerProperties::default()
76            .with_append_mode(input.append_mode)
77            .with_total_rows(input.total_rows());
78        let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input));
79        properties.partitions = vec![stream_ctx.partition_ranges()];
80
81        Self {
82            properties,
83            stream_ctx,
84            metrics_list: PartitionMetricsList::default(),
85        }
86    }
87
88    /// Builds a stream for the query.
89    ///
90    /// The returned stream is not partitioned and will contains all the data. If want
91    /// partitioned scan, use [`RegionScanner::scan_partition`].
92    pub fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
93        let metrics_set = ExecutionPlanMetricsSet::new();
94        let streams = (0..self.properties.partitions.len())
95            .map(|partition: usize| {
96                self.scan_partition(&QueryScanContext::default(), &metrics_set, partition)
97            })
98            .collect::<Result<Vec<_>, _>>()?;
99
100        let aggr_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
101        Ok(Box::pin(aggr_stream))
102    }
103
104    /// Scan [`Batch`] in all partitions one by one.
105    pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
106        let metrics_set = ExecutionPlanMetricsSet::new();
107
108        let streams = (0..self.properties.partitions.len())
109            .map(|partition| {
110                let metrics = self.new_partition_metrics(false, &metrics_set, partition);
111                self.scan_batch_in_partition(partition, metrics)
112            })
113            .collect::<Result<Vec<_>>>()?;
114
115        Ok(Box::pin(futures::stream::iter(streams).flatten()))
116    }
117
118    /// Builds a [BoxedBatchReader] from sequential scan for compaction.
119    ///
120    /// # Panics
121    /// Panics if the compaction flag is not set.
122    pub async fn build_reader_for_compaction(&self) -> Result<BoxedBatchReader> {
123        assert!(self.stream_ctx.input.compaction);
124
125        let metrics_set = ExecutionPlanMetricsSet::new();
126        let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
127        debug_assert_eq!(1, self.properties.partitions.len());
128        let partition_ranges = &self.properties.partitions[0];
129
130        let reader = Self::merge_all_ranges_for_compaction(
131            &self.stream_ctx,
132            partition_ranges,
133            &part_metrics,
134        )
135        .await?;
136        Ok(Box::new(reader))
137    }
138
139    /// Builds a [BoxedRecordBatchStream] from sequential scan for flat format compaction.
140    ///
141    /// # Panics
142    /// Panics if the compaction flag is not set.
143    pub async fn build_flat_reader_for_compaction(&self) -> Result<BoxedRecordBatchStream> {
144        assert!(self.stream_ctx.input.compaction);
145
146        let metrics_set = ExecutionPlanMetricsSet::new();
147        let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
148        debug_assert_eq!(1, self.properties.partitions.len());
149        let partition_ranges = &self.properties.partitions[0];
150
151        let reader = Self::merge_all_flat_ranges_for_compaction(
152            &self.stream_ctx,
153            partition_ranges,
154            &part_metrics,
155        )
156        .await?;
157        Ok(reader)
158    }
159
160    /// Builds a merge reader that reads all ranges.
161    /// Callers MUST not split ranges before calling this method.
162    async fn merge_all_ranges_for_compaction(
163        stream_ctx: &Arc<StreamContext>,
164        partition_ranges: &[PartitionRange],
165        part_metrics: &PartitionMetrics,
166    ) -> Result<BoxedBatchReader> {
167        let mut sources = Vec::new();
168        let range_builder_list = Arc::new(RangeBuilderList::new(
169            stream_ctx.input.num_memtables(),
170            stream_ctx.input.num_files(),
171        ));
172        for part_range in partition_ranges {
173            build_sources(
174                stream_ctx,
175                part_range,
176                true,
177                part_metrics,
178                range_builder_list.clone(),
179                &mut sources,
180            )
181            .await?;
182        }
183
184        common_telemetry::debug!(
185            "Build reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
186            stream_ctx.input.mapper.metadata().region_id,
187            partition_ranges.len(),
188            sources.len()
189        );
190        Self::build_reader_from_sources(stream_ctx, sources, None).await
191    }
192
193    /// Builds a merge reader that reads all flat ranges.
194    /// Callers MUST not split ranges before calling this method.
195    async fn merge_all_flat_ranges_for_compaction(
196        stream_ctx: &Arc<StreamContext>,
197        partition_ranges: &[PartitionRange],
198        part_metrics: &PartitionMetrics,
199    ) -> Result<BoxedRecordBatchStream> {
200        let mut sources = Vec::new();
201        let range_builder_list = Arc::new(RangeBuilderList::new(
202            stream_ctx.input.num_memtables(),
203            stream_ctx.input.num_files(),
204        ));
205        for part_range in partition_ranges {
206            build_flat_sources(
207                stream_ctx,
208                part_range,
209                true,
210                part_metrics,
211                range_builder_list.clone(),
212                &mut sources,
213            )
214            .await?;
215        }
216
217        common_telemetry::debug!(
218            "Build flat reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
219            stream_ctx.input.mapper.metadata().region_id,
220            partition_ranges.len(),
221            sources.len()
222        );
223        Self::build_flat_reader_from_sources(stream_ctx, sources, None).await
224    }
225
226    /// Builds a reader to read sources. If `semaphore` is provided, reads sources in parallel
227    /// if possible.
228    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
229    pub(crate) async fn build_reader_from_sources(
230        stream_ctx: &StreamContext,
231        mut sources: Vec<Source>,
232        semaphore: Option<Arc<Semaphore>>,
233    ) -> Result<BoxedBatchReader> {
234        if let Some(semaphore) = semaphore.as_ref() {
235            // Read sources in parallel.
236            if sources.len() > 1 {
237                sources = stream_ctx
238                    .input
239                    .create_parallel_sources(sources, semaphore.clone())?;
240            }
241        }
242
243        let mut builder = MergeReaderBuilder::from_sources(sources);
244        let reader = builder.build().await?;
245
246        let dedup = !stream_ctx.input.append_mode;
247        let reader = if dedup {
248            match stream_ctx.input.merge_mode {
249                MergeMode::LastRow => Box::new(DedupReader::new(
250                    reader,
251                    LastRow::new(stream_ctx.input.filter_deleted),
252                )) as _,
253                MergeMode::LastNonNull => Box::new(DedupReader::new(
254                    reader,
255                    LastNonNull::new(stream_ctx.input.filter_deleted),
256                )) as _,
257            }
258        } else {
259            Box::new(reader) as _
260        };
261
262        let reader = match &stream_ctx.input.series_row_selector {
263            Some(TimeSeriesRowSelector::LastRow) => Box::new(LastRowReader::new(reader)) as _,
264            None => reader,
265        };
266
267        Ok(reader)
268    }
269
270    /// Builds a flat reader to read sources that returns RecordBatch. If `semaphore` is provided, reads sources in parallel
271    /// if possible.
272    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
273    pub(crate) async fn build_flat_reader_from_sources(
274        stream_ctx: &StreamContext,
275        mut sources: Vec<BoxedRecordBatchStream>,
276        semaphore: Option<Arc<Semaphore>>,
277    ) -> Result<BoxedRecordBatchStream> {
278        if let Some(semaphore) = semaphore.as_ref() {
279            // Read sources in parallel.
280            if sources.len() > 1 {
281                sources = stream_ctx
282                    .input
283                    .create_parallel_flat_sources(sources, semaphore.clone())?;
284            }
285        }
286
287        let mapper = stream_ctx.input.mapper.as_flat().unwrap();
288        let schema = mapper.input_arrow_schema(stream_ctx.input.compaction);
289
290        let reader = FlatMergeReader::new(schema, sources, DEFAULT_READ_BATCH_SIZE).await?;
291
292        let dedup = !stream_ctx.input.append_mode;
293        let reader = if dedup {
294            match stream_ctx.input.merge_mode {
295                MergeMode::LastRow => Box::pin(
296                    FlatDedupReader::new(
297                        reader.into_stream().boxed(),
298                        FlatLastRow::new(stream_ctx.input.filter_deleted),
299                    )
300                    .into_stream(),
301                ) as _,
302                MergeMode::LastNonNull => Box::pin(
303                    FlatDedupReader::new(
304                        reader.into_stream().boxed(),
305                        FlatLastNonNull::new(
306                            mapper.field_column_start(),
307                            stream_ctx.input.filter_deleted,
308                        ),
309                    )
310                    .into_stream(),
311                ) as _,
312            }
313        } else {
314            Box::pin(reader.into_stream()) as _
315        };
316
317        Ok(reader)
318    }
319
320    /// Scans the given partition when the part list is set properly.
321    /// Otherwise the returned stream might not contains any data.
322    fn scan_partition_impl(
323        &self,
324        ctx: &QueryScanContext,
325        metrics_set: &ExecutionPlanMetricsSet,
326        partition: usize,
327    ) -> Result<SendableRecordBatchStream> {
328        if ctx.explain_verbose {
329            common_telemetry::info!(
330                "SeqScan partition {}, region_id: {}",
331                partition,
332                self.stream_ctx.input.region_metadata().region_id
333            );
334        }
335
336        let metrics = self.new_partition_metrics(ctx.explain_verbose, metrics_set, partition);
337        let input = &self.stream_ctx.input;
338
339        let batch_stream = if input.flat_format {
340            // Use flat scan for bulk memtables
341            self.scan_flat_batch_in_partition(partition, metrics.clone())?
342        } else {
343            // Use regular batch scan for normal memtables
344            self.scan_batch_in_partition(partition, metrics.clone())?
345        };
346        let record_batch_stream = ConvertBatchStream::new(
347            batch_stream,
348            input.mapper.clone(),
349            input.cache_strategy.clone(),
350            metrics,
351        );
352
353        Ok(Box::pin(RecordBatchStreamWrapper::new(
354            input.mapper.output_schema(),
355            Box::pin(record_batch_stream),
356        )))
357    }
358
359    fn scan_batch_in_partition(
360        &self,
361        partition: usize,
362        part_metrics: PartitionMetrics,
363    ) -> Result<ScanBatchStream> {
364        ensure!(
365            partition < self.properties.partitions.len(),
366            PartitionOutOfRangeSnafu {
367                given: partition,
368                all: self.properties.partitions.len(),
369            }
370        );
371
372        if self.properties.partitions[partition].is_empty() {
373            return Ok(Box::pin(futures::stream::empty()));
374        }
375
376        let stream_ctx = self.stream_ctx.clone();
377        let semaphore = self.new_semaphore();
378        let partition_ranges = self.properties.partitions[partition].clone();
379        let compaction = self.stream_ctx.input.compaction;
380        let distinguish_range = self.properties.distinguish_partition_range;
381
382        let stream = try_stream! {
383            part_metrics.on_first_poll();
384
385            let range_builder_list = Arc::new(RangeBuilderList::new(
386                stream_ctx.input.num_memtables(),
387                stream_ctx.input.num_files(),
388            ));
389            let _mapper = stream_ctx.input.mapper.as_primary_key().context(UnexpectedSnafu {
390                reason: "Unexpected format",
391            })?;
392            // Scans each part.
393            for part_range in partition_ranges {
394                let mut sources = Vec::new();
395                build_sources(
396                    &stream_ctx,
397                    &part_range,
398                    compaction,
399                    &part_metrics,
400                    range_builder_list.clone(),
401                    &mut sources,
402                ).await?;
403
404                let mut metrics = ScannerMetrics::default();
405                let mut fetch_start = Instant::now();
406                let mut reader =
407                    Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone())
408                        .await?;
409                #[cfg(debug_assertions)]
410                let mut checker = crate::read::BatchChecker::default()
411                    .with_start(Some(part_range.start))
412                    .with_end(Some(part_range.end));
413
414                while let Some(batch) = reader.next_batch().await? {
415                    metrics.scan_cost += fetch_start.elapsed();
416                    metrics.num_batches += 1;
417                    metrics.num_rows += batch.num_rows();
418
419                    debug_assert!(!batch.is_empty());
420                    if batch.is_empty() {
421                        continue;
422                    }
423
424                    #[cfg(debug_assertions)]
425                    checker.ensure_part_range_batch(
426                        "SeqScan",
427                        _mapper.metadata().region_id,
428                        partition,
429                        part_range,
430                        &batch,
431                    );
432
433                    let yield_start = Instant::now();
434                    yield ScanBatch::Normal(batch);
435                    metrics.yield_cost += yield_start.elapsed();
436
437                    fetch_start = Instant::now();
438                }
439
440                // Yields an empty part to indicate this range is terminated.
441                // The query engine can use this to optimize some queries.
442                if distinguish_range {
443                    let yield_start = Instant::now();
444                    yield ScanBatch::Normal(Batch::empty());
445                    metrics.yield_cost += yield_start.elapsed();
446                }
447
448                metrics.scan_cost += fetch_start.elapsed();
449                part_metrics.merge_metrics(&metrics);
450            }
451
452            part_metrics.on_finish();
453        };
454        Ok(Box::pin(stream))
455    }
456
457    fn scan_flat_batch_in_partition(
458        &self,
459        partition: usize,
460        part_metrics: PartitionMetrics,
461    ) -> Result<ScanBatchStream> {
462        ensure!(
463            partition < self.properties.partitions.len(),
464            PartitionOutOfRangeSnafu {
465                given: partition,
466                all: self.properties.partitions.len(),
467            }
468        );
469
470        if self.properties.partitions[partition].is_empty() {
471            return Ok(Box::pin(futures::stream::empty()));
472        }
473
474        let stream_ctx = self.stream_ctx.clone();
475        let semaphore = self.new_semaphore();
476        let partition_ranges = self.properties.partitions[partition].clone();
477        let compaction = self.stream_ctx.input.compaction;
478
479        let stream = try_stream! {
480            part_metrics.on_first_poll();
481
482            let range_builder_list = Arc::new(RangeBuilderList::new(
483                stream_ctx.input.num_memtables(),
484                stream_ctx.input.num_files(),
485            ));
486            // Scans each part.
487            for part_range in partition_ranges {
488                let mut sources = Vec::new();
489                build_flat_sources(
490                    &stream_ctx,
491                    &part_range,
492                    compaction,
493                    &part_metrics,
494                    range_builder_list.clone(),
495                    &mut sources,
496                ).await?;
497
498                let mut metrics = ScannerMetrics::default();
499                let mut fetch_start = Instant::now();
500                let mut reader =
501                    Self::build_flat_reader_from_sources(&stream_ctx, sources, semaphore.clone())
502                        .await?;
503
504                while let Some(record_batch) = reader.try_next().await? {
505                    metrics.scan_cost += fetch_start.elapsed();
506                    metrics.num_batches += 1;
507                    metrics.num_rows += record_batch.num_rows();
508
509                    debug_assert!(record_batch.num_rows() > 0);
510                    if record_batch.num_rows() == 0 {
511                        continue;
512                    }
513
514                    let yield_start = Instant::now();
515                    yield ScanBatch::RecordBatch(record_batch);
516                    metrics.yield_cost += yield_start.elapsed();
517
518                    fetch_start = Instant::now();
519                }
520
521                metrics.scan_cost += fetch_start.elapsed();
522                part_metrics.merge_metrics(&metrics);
523            }
524
525            part_metrics.on_finish();
526        };
527        Ok(Box::pin(stream))
528    }
529
530    fn new_semaphore(&self) -> Option<Arc<Semaphore>> {
531        if self.properties.target_partitions() > self.properties.num_partitions() {
532            // We can use additional tasks to read the data if we have more target partitions than actual partitions.
533            // This semaphore is partition level.
534            // We don't use a global semaphore to avoid a partition waiting for others. The final concurrency
535            // of tasks usually won't exceed the target partitions a lot as compaction can reduce the number of
536            // files in a part range.
537            Some(Arc::new(Semaphore::new(
538                self.properties.target_partitions() - self.properties.num_partitions() + 1,
539            )))
540        } else {
541            None
542        }
543    }
544
545    /// Creates a new partition metrics instance.
546    /// Sets the partition metrics for the given partition if it is not for compaction.
547    fn new_partition_metrics(
548        &self,
549        explain_verbose: bool,
550        metrics_set: &ExecutionPlanMetricsSet,
551        partition: usize,
552    ) -> PartitionMetrics {
553        let metrics = PartitionMetrics::new(
554            self.stream_ctx.input.mapper.metadata().region_id,
555            partition,
556            get_scanner_type(self.stream_ctx.input.compaction),
557            self.stream_ctx.query_start,
558            explain_verbose,
559            metrics_set,
560        );
561
562        if !self.stream_ctx.input.compaction {
563            self.metrics_list.set(partition, metrics.clone());
564        }
565
566        metrics
567    }
568
569    /// Finds the maximum number of files to read in a single partition range.
570    fn max_files_in_partition(ranges: &[RangeMeta], partition_ranges: &[PartitionRange]) -> usize {
571        partition_ranges
572            .iter()
573            .map(|part_range| {
574                let range_meta = &ranges[part_range.identifier];
575                range_meta.indices.len()
576            })
577            .max()
578            .unwrap_or(0)
579    }
580
581    /// Checks resource limit for the scanner.
582    pub(crate) fn check_scan_limit(&self) -> Result<()> {
583        // Check max file count limit for all partitions since we scan them in parallel.
584        let total_max_files: usize = self
585            .properties
586            .partitions
587            .iter()
588            .map(|partition| Self::max_files_in_partition(&self.stream_ctx.ranges, partition))
589            .sum();
590
591        let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
592        if total_max_files > max_concurrent_files {
593            return TooManyFilesToReadSnafu {
594                actual: total_max_files,
595                max: max_concurrent_files,
596            }
597            .fail();
598        }
599
600        Ok(())
601    }
602}
603
604impl RegionScanner for SeqScan {
605    fn properties(&self) -> &ScannerProperties {
606        &self.properties
607    }
608
609    fn schema(&self) -> SchemaRef {
610        self.stream_ctx.input.mapper.output_schema()
611    }
612
613    fn metadata(&self) -> RegionMetadataRef {
614        self.stream_ctx.input.mapper.metadata().clone()
615    }
616
617    fn scan_partition(
618        &self,
619        ctx: &QueryScanContext,
620        metrics_set: &ExecutionPlanMetricsSet,
621        partition: usize,
622    ) -> Result<SendableRecordBatchStream, BoxedError> {
623        self.scan_partition_impl(ctx, metrics_set, partition)
624            .map_err(BoxedError::new)
625    }
626
627    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
628        self.properties.prepare(request);
629
630        self.check_scan_limit().map_err(BoxedError::new)?;
631
632        Ok(())
633    }
634
635    fn has_predicate(&self) -> bool {
636        let predicate = self.stream_ctx.input.predicate();
637        predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
638    }
639
640    fn set_logical_region(&mut self, logical_region: bool) {
641        self.properties.set_logical_region(logical_region);
642    }
643}
644
645impl DisplayAs for SeqScan {
646    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
647        write!(
648            f,
649            "SeqScan: region={}, ",
650            self.stream_ctx.input.mapper.metadata().region_id
651        )?;
652        match t {
653            // TODO(LFC): Implement all the "TreeRender" display format.
654            DisplayFormatType::Default | DisplayFormatType::TreeRender => {
655                self.stream_ctx.format_for_explain(false, f)
656            }
657            DisplayFormatType::Verbose => {
658                self.stream_ctx.format_for_explain(true, f)?;
659                self.metrics_list.format_verbose_metrics(f)
660            }
661        }
662    }
663}
664
665impl fmt::Debug for SeqScan {
666    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
667        f.debug_struct("SeqScan")
668            .field("num_ranges", &self.stream_ctx.ranges.len())
669            .finish()
670    }
671}
672
673/// Builds sources for the partition range and push them to the `sources` vector.
674pub(crate) async fn build_sources(
675    stream_ctx: &Arc<StreamContext>,
676    part_range: &PartitionRange,
677    compaction: bool,
678    part_metrics: &PartitionMetrics,
679    range_builder_list: Arc<RangeBuilderList>,
680    sources: &mut Vec<Source>,
681) -> Result<()> {
682    // Gets range meta.
683    let range_meta = &stream_ctx.ranges[part_range.identifier];
684    #[cfg(debug_assertions)]
685    if compaction {
686        // Compaction expects input sources are not been split.
687        debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
688        for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
689            // It should scan all row groups.
690            debug_assert_eq!(
691                -1, row_group_idx.row_group_index,
692                "Expect {} range scan all row groups, given: {}",
693                i, row_group_idx.row_group_index,
694            );
695        }
696    }
697
698    sources.reserve(range_meta.row_group_indices.len());
699    for index in &range_meta.row_group_indices {
700        let stream = if stream_ctx.is_mem_range_index(*index) {
701            let stream = scan_mem_ranges(
702                stream_ctx.clone(),
703                part_metrics.clone(),
704                *index,
705                range_meta.time_range,
706            );
707            Box::pin(stream) as _
708        } else if stream_ctx.is_file_range_index(*index) {
709            let read_type = if compaction {
710                "compaction"
711            } else {
712                "seq_scan_files"
713            };
714            let stream = scan_file_ranges(
715                stream_ctx.clone(),
716                part_metrics.clone(),
717                *index,
718                read_type,
719                range_builder_list.clone(),
720            )
721            .await?;
722            Box::pin(stream) as _
723        } else {
724            scan_util::maybe_scan_other_ranges(stream_ctx, *index, part_metrics).await?
725        };
726        sources.push(Source::Stream(stream));
727    }
728    Ok(())
729}
730
731/// Builds flat sources for the partition range and push them to the `sources` vector.
732pub(crate) async fn build_flat_sources(
733    stream_ctx: &Arc<StreamContext>,
734    part_range: &PartitionRange,
735    compaction: bool,
736    part_metrics: &PartitionMetrics,
737    range_builder_list: Arc<RangeBuilderList>,
738    sources: &mut Vec<BoxedRecordBatchStream>,
739) -> Result<()> {
740    // Gets range meta.
741    let range_meta = &stream_ctx.ranges[part_range.identifier];
742    #[cfg(debug_assertions)]
743    if compaction {
744        // Compaction expects input sources are not been split.
745        debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
746        for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
747            // It should scan all row groups.
748            debug_assert_eq!(
749                -1, row_group_idx.row_group_index,
750                "Expect {} range scan all row groups, given: {}",
751                i, row_group_idx.row_group_index,
752            );
753        }
754    }
755
756    sources.reserve(range_meta.row_group_indices.len());
757    for index in &range_meta.row_group_indices {
758        let stream = if stream_ctx.is_mem_range_index(*index) {
759            let stream = scan_flat_mem_ranges(stream_ctx.clone(), part_metrics.clone(), *index);
760            Box::pin(stream) as _
761        } else if stream_ctx.is_file_range_index(*index) {
762            let read_type = if compaction {
763                "compaction"
764            } else {
765                "seq_scan_files"
766            };
767            let stream = scan_flat_file_ranges(
768                stream_ctx.clone(),
769                part_metrics.clone(),
770                *index,
771                read_type,
772                range_builder_list.clone(),
773            )
774            .await?;
775            Box::pin(stream) as _
776        } else {
777            scan_util::maybe_scan_flat_other_ranges(stream_ctx, *index, part_metrics).await?
778        };
779        sources.push(stream);
780    }
781    Ok(())
782}
783
#[cfg(test)]
impl SeqScan {
    /// Gives tests read access to the scan input held by the stream context.
    pub(crate) fn input(&self) -> &ScanInput {
        let ctx = &self.stream_ctx;
        &ctx.input
    }
}
791
/// Maps the compaction flag to the scanner type label: `"SeqScan(compaction)"` when
/// the scan is for compaction, `"SeqScan"` otherwise.
fn get_scanner_type(compaction: bool) -> &'static str {
    match compaction {
        true => "SeqScan(compaction)",
        false => "SeqScan",
    }
}