mito2/read/
seq_scan.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Sequential scan.
16
17use std::fmt;
18use std::sync::Arc;
19use std::time::Instant;
20
21use async_stream::try_stream;
22use common_error::ext::BoxedError;
23use common_recordbatch::util::ChainedRecordBatchStream;
24use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
25use common_telemetry::tracing;
26use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
27use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
28use datatypes::schema::SchemaRef;
29use futures::{StreamExt, TryStreamExt};
30use snafu::{OptionExt, ensure};
31use store_api::metadata::RegionMetadataRef;
32use store_api::region_engine::{
33    PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
34};
35use store_api::storage::TimeSeriesRowSelector;
36use tokio::sync::Semaphore;
37
38use crate::error::{PartitionOutOfRangeSnafu, Result, TooManyFilesToReadSnafu, UnexpectedSnafu};
39use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
40use crate::read::flat_dedup::{FlatDedupReader, FlatLastNonNull, FlatLastRow};
41use crate::read::flat_merge::FlatMergeReader;
42use crate::read::last_row::LastRowReader;
43use crate::read::merge::MergeReaderBuilder;
44use crate::read::range::{RangeBuilderList, RangeMeta};
45use crate::read::scan_region::{ScanInput, StreamContext};
46use crate::read::scan_util::{
47    PartitionMetrics, PartitionMetricsList, scan_file_ranges, scan_flat_file_ranges,
48    scan_flat_mem_ranges, scan_mem_ranges,
49};
50use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
51use crate::read::{
52    Batch, BatchReader, BoxedBatchReader, BoxedRecordBatchStream, ScannerMetrics, Source, scan_util,
53};
54use crate::region::options::MergeMode;
55use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
56
57/// Scans a region and returns rows in a sorted sequence.
58///
59/// The output order is always `order by primary keys, time index` inside every
60/// [`PartitionRange`]. Each "partition" may contains many [`PartitionRange`]s.
61pub struct SeqScan {
62    /// Properties of the scanner.
63    properties: ScannerProperties,
64    /// Context of streams.
65    stream_ctx: Arc<StreamContext>,
66    /// Metrics for each partition.
67    /// The scanner only sets in query and keeps it empty during compaction.
68    metrics_list: PartitionMetricsList,
69}
70
71impl SeqScan {
72    /// Creates a new [SeqScan] with the given input.
73    /// If `input.compaction` is true, the scanner will not attempt to split ranges.
74    pub(crate) fn new(input: ScanInput) -> Self {
75        let mut properties = ScannerProperties::default()
76            .with_append_mode(input.append_mode)
77            .with_total_rows(input.total_rows());
78        let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input));
79        properties.partitions = vec![stream_ctx.partition_ranges()];
80
81        Self {
82            properties,
83            stream_ctx,
84            metrics_list: PartitionMetricsList::default(),
85        }
86    }
87
88    /// Builds a stream for the query.
89    ///
90    /// The returned stream is not partitioned and will contains all the data. If want
91    /// partitioned scan, use [`RegionScanner::scan_partition`].
92    pub fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
93        let metrics_set = ExecutionPlanMetricsSet::new();
94        let streams = (0..self.properties.partitions.len())
95            .map(|partition: usize| {
96                self.scan_partition(&QueryScanContext::default(), &metrics_set, partition)
97            })
98            .collect::<Result<Vec<_>, _>>()?;
99
100        let aggr_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
101        Ok(Box::pin(aggr_stream))
102    }
103
104    /// Scan [`Batch`] in all partitions one by one.
105    pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
106        let metrics_set = ExecutionPlanMetricsSet::new();
107
108        let streams = (0..self.properties.partitions.len())
109            .map(|partition| {
110                let metrics = self.new_partition_metrics(false, &metrics_set, partition);
111                self.scan_batch_in_partition(partition, metrics)
112            })
113            .collect::<Result<Vec<_>>>()?;
114
115        Ok(Box::pin(futures::stream::iter(streams).flatten()))
116    }
117
118    /// Builds a [BoxedBatchReader] from sequential scan for compaction.
119    ///
120    /// # Panics
121    /// Panics if the compaction flag is not set.
122    pub async fn build_reader_for_compaction(&self) -> Result<BoxedBatchReader> {
123        assert!(self.stream_ctx.input.compaction);
124
125        let metrics_set = ExecutionPlanMetricsSet::new();
126        let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
127        debug_assert_eq!(1, self.properties.partitions.len());
128        let partition_ranges = &self.properties.partitions[0];
129
130        let reader = Self::merge_all_ranges_for_compaction(
131            &self.stream_ctx,
132            partition_ranges,
133            &part_metrics,
134        )
135        .await?;
136        Ok(Box::new(reader))
137    }
138
139    /// Builds a [BoxedRecordBatchStream] from sequential scan for flat format compaction.
140    ///
141    /// # Panics
142    /// Panics if the compaction flag is not set.
143    pub async fn build_flat_reader_for_compaction(&self) -> Result<BoxedRecordBatchStream> {
144        assert!(self.stream_ctx.input.compaction);
145
146        let metrics_set = ExecutionPlanMetricsSet::new();
147        let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
148        debug_assert_eq!(1, self.properties.partitions.len());
149        let partition_ranges = &self.properties.partitions[0];
150
151        let reader = Self::merge_all_flat_ranges_for_compaction(
152            &self.stream_ctx,
153            partition_ranges,
154            &part_metrics,
155        )
156        .await?;
157        Ok(reader)
158    }
159
160    /// Builds a merge reader that reads all ranges.
161    /// Callers MUST not split ranges before calling this method.
162    async fn merge_all_ranges_for_compaction(
163        stream_ctx: &Arc<StreamContext>,
164        partition_ranges: &[PartitionRange],
165        part_metrics: &PartitionMetrics,
166    ) -> Result<BoxedBatchReader> {
167        let mut sources = Vec::new();
168        let range_builder_list = Arc::new(RangeBuilderList::new(
169            stream_ctx.input.num_memtables(),
170            stream_ctx.input.num_files(),
171        ));
172        for part_range in partition_ranges {
173            build_sources(
174                stream_ctx,
175                part_range,
176                true,
177                part_metrics,
178                range_builder_list.clone(),
179                &mut sources,
180            )
181            .await?;
182        }
183
184        common_telemetry::debug!(
185            "Build reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
186            stream_ctx.input.mapper.metadata().region_id,
187            partition_ranges.len(),
188            sources.len()
189        );
190        Self::build_reader_from_sources(stream_ctx, sources, None).await
191    }
192
193    /// Builds a merge reader that reads all flat ranges.
194    /// Callers MUST not split ranges before calling this method.
195    async fn merge_all_flat_ranges_for_compaction(
196        stream_ctx: &Arc<StreamContext>,
197        partition_ranges: &[PartitionRange],
198        part_metrics: &PartitionMetrics,
199    ) -> Result<BoxedRecordBatchStream> {
200        let mut sources = Vec::new();
201        let range_builder_list = Arc::new(RangeBuilderList::new(
202            stream_ctx.input.num_memtables(),
203            stream_ctx.input.num_files(),
204        ));
205        for part_range in partition_ranges {
206            build_flat_sources(
207                stream_ctx,
208                part_range,
209                true,
210                part_metrics,
211                range_builder_list.clone(),
212                &mut sources,
213            )
214            .await?;
215        }
216
217        common_telemetry::debug!(
218            "Build flat reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
219            stream_ctx.input.mapper.metadata().region_id,
220            partition_ranges.len(),
221            sources.len()
222        );
223        Self::build_flat_reader_from_sources(stream_ctx, sources, None).await
224    }
225
226    /// Builds a reader to read sources. If `semaphore` is provided, reads sources in parallel
227    /// if possible.
228    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
229    pub(crate) async fn build_reader_from_sources(
230        stream_ctx: &StreamContext,
231        mut sources: Vec<Source>,
232        semaphore: Option<Arc<Semaphore>>,
233    ) -> Result<BoxedBatchReader> {
234        if let Some(semaphore) = semaphore.as_ref() {
235            // Read sources in parallel.
236            if sources.len() > 1 {
237                sources = stream_ctx
238                    .input
239                    .create_parallel_sources(sources, semaphore.clone())?;
240            }
241        }
242
243        let mut builder = MergeReaderBuilder::from_sources(sources);
244        let reader = builder.build().await?;
245
246        let dedup = !stream_ctx.input.append_mode;
247        let reader = if dedup {
248            match stream_ctx.input.merge_mode {
249                MergeMode::LastRow => Box::new(DedupReader::new(
250                    reader,
251                    LastRow::new(stream_ctx.input.filter_deleted),
252                )) as _,
253                MergeMode::LastNonNull => Box::new(DedupReader::new(
254                    reader,
255                    LastNonNull::new(stream_ctx.input.filter_deleted),
256                )) as _,
257            }
258        } else {
259            Box::new(reader) as _
260        };
261
262        let reader = match &stream_ctx.input.series_row_selector {
263            Some(TimeSeriesRowSelector::LastRow) => Box::new(LastRowReader::new(reader)) as _,
264            None => reader,
265        };
266
267        Ok(reader)
268    }
269
270    /// Builds a flat reader to read sources that returns RecordBatch. If `semaphore` is provided, reads sources in parallel
271    /// if possible.
272    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
273    pub(crate) async fn build_flat_reader_from_sources(
274        stream_ctx: &StreamContext,
275        mut sources: Vec<BoxedRecordBatchStream>,
276        semaphore: Option<Arc<Semaphore>>,
277    ) -> Result<BoxedRecordBatchStream> {
278        if let Some(semaphore) = semaphore.as_ref() {
279            // Read sources in parallel.
280            if sources.len() > 1 {
281                sources = stream_ctx
282                    .input
283                    .create_parallel_flat_sources(sources, semaphore.clone())?;
284            }
285        }
286
287        let mapper = stream_ctx.input.mapper.as_flat().unwrap();
288        let schema = mapper.input_arrow_schema(stream_ctx.input.compaction);
289
290        let reader = FlatMergeReader::new(schema, sources, DEFAULT_READ_BATCH_SIZE).await?;
291
292        let dedup = !stream_ctx.input.append_mode;
293        let reader = if dedup {
294            match stream_ctx.input.merge_mode {
295                MergeMode::LastRow => Box::pin(
296                    FlatDedupReader::new(
297                        reader.into_stream().boxed(),
298                        FlatLastRow::new(stream_ctx.input.filter_deleted),
299                    )
300                    .into_stream(),
301                ) as _,
302                MergeMode::LastNonNull => Box::pin(
303                    FlatDedupReader::new(
304                        reader.into_stream().boxed(),
305                        FlatLastNonNull::new(
306                            mapper.field_column_start(),
307                            stream_ctx.input.filter_deleted,
308                        ),
309                    )
310                    .into_stream(),
311                ) as _,
312            }
313        } else {
314            Box::pin(reader.into_stream()) as _
315        };
316
317        Ok(reader)
318    }
319
320    /// Scans the given partition when the part list is set properly.
321    /// Otherwise the returned stream might not contains any data.
322    fn scan_partition_impl(
323        &self,
324        ctx: &QueryScanContext,
325        metrics_set: &ExecutionPlanMetricsSet,
326        partition: usize,
327    ) -> Result<SendableRecordBatchStream> {
328        if ctx.explain_verbose {
329            common_telemetry::info!(
330                "SeqScan partition {}, region_id: {}",
331                partition,
332                self.stream_ctx.input.region_metadata().region_id
333            );
334        }
335
336        let metrics = self.new_partition_metrics(ctx.explain_verbose, metrics_set, partition);
337        let input = &self.stream_ctx.input;
338
339        let batch_stream = if input.flat_format {
340            // Use flat scan for bulk memtables
341            self.scan_flat_batch_in_partition(partition, metrics.clone())?
342        } else {
343            // Use regular batch scan for normal memtables
344            self.scan_batch_in_partition(partition, metrics.clone())?
345        };
346        let record_batch_stream = ConvertBatchStream::new(
347            batch_stream,
348            input.mapper.clone(),
349            input.cache_strategy.clone(),
350            metrics,
351        );
352
353        Ok(Box::pin(RecordBatchStreamWrapper::new(
354            input.mapper.output_schema(),
355            Box::pin(record_batch_stream),
356        )))
357    }
358
359    fn scan_batch_in_partition(
360        &self,
361        partition: usize,
362        part_metrics: PartitionMetrics,
363    ) -> Result<ScanBatchStream> {
364        ensure!(
365            partition < self.properties.partitions.len(),
366            PartitionOutOfRangeSnafu {
367                given: partition,
368                all: self.properties.partitions.len(),
369            }
370        );
371
372        if self.properties.partitions[partition].is_empty() {
373            return Ok(Box::pin(futures::stream::empty()));
374        }
375
376        let stream_ctx = self.stream_ctx.clone();
377        let semaphore = self.new_semaphore();
378        let partition_ranges = self.properties.partitions[partition].clone();
379        let compaction = self.stream_ctx.input.compaction;
380        let distinguish_range = self.properties.distinguish_partition_range;
381
382        let stream = try_stream! {
383            part_metrics.on_first_poll();
384
385            let range_builder_list = Arc::new(RangeBuilderList::new(
386                stream_ctx.input.num_memtables(),
387                stream_ctx.input.num_files(),
388            ));
389            let _mapper = stream_ctx.input.mapper.as_primary_key().context(UnexpectedSnafu {
390                reason: "Unexpected format",
391            })?;
392            // Scans each part.
393            for part_range in partition_ranges {
394                let mut sources = Vec::new();
395                build_sources(
396                    &stream_ctx,
397                    &part_range,
398                    compaction,
399                    &part_metrics,
400                    range_builder_list.clone(),
401                    &mut sources,
402                ).await?;
403
404                let mut metrics = ScannerMetrics::default();
405                let mut fetch_start = Instant::now();
406                let mut reader =
407                    Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone())
408                        .await?;
409                #[cfg(debug_assertions)]
410                let mut checker = crate::read::BatchChecker::default()
411                    .with_start(Some(part_range.start))
412                    .with_end(Some(part_range.end));
413
414                while let Some(batch) = reader.next_batch().await? {
415                    metrics.scan_cost += fetch_start.elapsed();
416                    metrics.num_batches += 1;
417                    metrics.num_rows += batch.num_rows();
418
419                    debug_assert!(!batch.is_empty());
420                    if batch.is_empty() {
421                        continue;
422                    }
423
424                    #[cfg(debug_assertions)]
425                    checker.ensure_part_range_batch(
426                        "SeqScan",
427                        _mapper.metadata().region_id,
428                        partition,
429                        part_range,
430                        &batch,
431                    );
432
433                    let yield_start = Instant::now();
434                    yield ScanBatch::Normal(batch);
435                    metrics.yield_cost += yield_start.elapsed();
436
437                    fetch_start = Instant::now();
438                }
439
440                // Yields an empty part to indicate this range is terminated.
441                // The query engine can use this to optimize some queries.
442                if distinguish_range {
443                    let yield_start = Instant::now();
444                    yield ScanBatch::Normal(Batch::empty());
445                    metrics.yield_cost += yield_start.elapsed();
446                }
447
448                metrics.scan_cost += fetch_start.elapsed();
449                part_metrics.merge_metrics(&metrics);
450            }
451
452            part_metrics.on_finish();
453        };
454        Ok(Box::pin(stream))
455    }
456
457    fn scan_flat_batch_in_partition(
458        &self,
459        partition: usize,
460        part_metrics: PartitionMetrics,
461    ) -> Result<ScanBatchStream> {
462        ensure!(
463            partition < self.properties.partitions.len(),
464            PartitionOutOfRangeSnafu {
465                given: partition,
466                all: self.properties.partitions.len(),
467            }
468        );
469
470        if self.properties.partitions[partition].is_empty() {
471            return Ok(Box::pin(futures::stream::empty()));
472        }
473
474        let stream_ctx = self.stream_ctx.clone();
475        let semaphore = self.new_semaphore();
476        let partition_ranges = self.properties.partitions[partition].clone();
477        let compaction = self.stream_ctx.input.compaction;
478
479        let stream = try_stream! {
480            part_metrics.on_first_poll();
481
482            let range_builder_list = Arc::new(RangeBuilderList::new(
483                stream_ctx.input.num_memtables(),
484                stream_ctx.input.num_files(),
485            ));
486            // Scans each part.
487            for part_range in partition_ranges {
488                let mut sources = Vec::new();
489                build_flat_sources(
490                    &stream_ctx,
491                    &part_range,
492                    compaction,
493                    &part_metrics,
494                    range_builder_list.clone(),
495                    &mut sources,
496                ).await?;
497
498                let mut metrics = ScannerMetrics::default();
499                let mut fetch_start = Instant::now();
500                let mut reader =
501                    Self::build_flat_reader_from_sources(&stream_ctx, sources, semaphore.clone())
502                        .await?;
503
504                while let Some(record_batch) = reader.try_next().await? {
505                    metrics.scan_cost += fetch_start.elapsed();
506                    metrics.num_batches += 1;
507                    metrics.num_rows += record_batch.num_rows();
508
509                    debug_assert!(record_batch.num_rows() > 0);
510                    if record_batch.num_rows() == 0 {
511                        continue;
512                    }
513
514                    let yield_start = Instant::now();
515                    yield ScanBatch::RecordBatch(record_batch);
516                    metrics.yield_cost += yield_start.elapsed();
517
518                    fetch_start = Instant::now();
519                }
520
521                metrics.scan_cost += fetch_start.elapsed();
522                part_metrics.merge_metrics(&metrics);
523            }
524
525            part_metrics.on_finish();
526        };
527        Ok(Box::pin(stream))
528    }
529
530    fn new_semaphore(&self) -> Option<Arc<Semaphore>> {
531        if self.properties.target_partitions() > self.properties.num_partitions() {
532            // We can use additional tasks to read the data if we have more target partitions than actual partitions.
533            // This semaphore is partition level.
534            // We don't use a global semaphore to avoid a partition waiting for others. The final concurrency
535            // of tasks usually won't exceed the target partitions a lot as compaction can reduce the number of
536            // files in a part range.
537            Some(Arc::new(Semaphore::new(
538                self.properties.target_partitions() - self.properties.num_partitions() + 1,
539            )))
540        } else {
541            None
542        }
543    }
544
545    /// Creates a new partition metrics instance.
546    /// Sets the partition metrics for the given partition if it is not for compaction.
547    fn new_partition_metrics(
548        &self,
549        explain_verbose: bool,
550        metrics_set: &ExecutionPlanMetricsSet,
551        partition: usize,
552    ) -> PartitionMetrics {
553        let metrics = PartitionMetrics::new(
554            self.stream_ctx.input.mapper.metadata().region_id,
555            partition,
556            get_scanner_type(self.stream_ctx.input.compaction),
557            self.stream_ctx.query_start,
558            explain_verbose,
559            metrics_set,
560        );
561
562        if !self.stream_ctx.input.compaction {
563            self.metrics_list.set(partition, metrics.clone());
564        }
565
566        metrics
567    }
568
569    /// Finds the maximum number of files to read in a single partition range.
570    fn max_files_in_partition(ranges: &[RangeMeta], partition_ranges: &[PartitionRange]) -> usize {
571        partition_ranges
572            .iter()
573            .map(|part_range| {
574                let range_meta = &ranges[part_range.identifier];
575                range_meta.indices.len()
576            })
577            .max()
578            .unwrap_or(0)
579    }
580
581    /// Checks resource limit for the scanner.
582    pub(crate) fn check_scan_limit(&self) -> Result<()> {
583        // Check max file count limit for all partitions since we scan them in parallel.
584        let total_max_files: usize = self
585            .properties
586            .partitions
587            .iter()
588            .map(|partition| Self::max_files_in_partition(&self.stream_ctx.ranges, partition))
589            .sum();
590
591        let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
592        if total_max_files > max_concurrent_files {
593            return TooManyFilesToReadSnafu {
594                actual: total_max_files,
595                max: max_concurrent_files,
596            }
597            .fail();
598        }
599
600        Ok(())
601    }
602}
603
604impl RegionScanner for SeqScan {
605    fn properties(&self) -> &ScannerProperties {
606        &self.properties
607    }
608
609    fn schema(&self) -> SchemaRef {
610        self.stream_ctx.input.mapper.output_schema()
611    }
612
613    fn metadata(&self) -> RegionMetadataRef {
614        self.stream_ctx.input.mapper.metadata().clone()
615    }
616
617    fn scan_partition(
618        &self,
619        ctx: &QueryScanContext,
620        metrics_set: &ExecutionPlanMetricsSet,
621        partition: usize,
622    ) -> Result<SendableRecordBatchStream, BoxedError> {
623        self.scan_partition_impl(ctx, metrics_set, partition)
624            .map_err(BoxedError::new)
625    }
626
627    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
628        self.properties.prepare(request);
629
630        self.check_scan_limit().map_err(BoxedError::new)?;
631
632        Ok(())
633    }
634
635    fn has_predicate_without_region(&self) -> bool {
636        let predicate = self
637            .stream_ctx
638            .input
639            .predicate_group()
640            .predicate_without_region();
641        predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
642    }
643
644    fn set_logical_region(&mut self, logical_region: bool) {
645        self.properties.set_logical_region(logical_region);
646    }
647}
648
649impl DisplayAs for SeqScan {
650    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
651        write!(
652            f,
653            "SeqScan: region={}, ",
654            self.stream_ctx.input.mapper.metadata().region_id
655        )?;
656        match t {
657            // TODO(LFC): Implement all the "TreeRender" display format.
658            DisplayFormatType::Default | DisplayFormatType::TreeRender => {
659                self.stream_ctx.format_for_explain(false, f)
660            }
661            DisplayFormatType::Verbose => {
662                self.stream_ctx.format_for_explain(true, f)?;
663                self.metrics_list.format_verbose_metrics(f)
664            }
665        }
666    }
667}
668
669impl fmt::Debug for SeqScan {
670    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
671        f.debug_struct("SeqScan")
672            .field("num_ranges", &self.stream_ctx.ranges.len())
673            .finish()
674    }
675}
676
677/// Builds sources for the partition range and push them to the `sources` vector.
678pub(crate) async fn build_sources(
679    stream_ctx: &Arc<StreamContext>,
680    part_range: &PartitionRange,
681    compaction: bool,
682    part_metrics: &PartitionMetrics,
683    range_builder_list: Arc<RangeBuilderList>,
684    sources: &mut Vec<Source>,
685) -> Result<()> {
686    // Gets range meta.
687    let range_meta = &stream_ctx.ranges[part_range.identifier];
688    #[cfg(debug_assertions)]
689    if compaction {
690        // Compaction expects input sources are not been split.
691        debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
692        for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
693            // It should scan all row groups.
694            debug_assert_eq!(
695                -1, row_group_idx.row_group_index,
696                "Expect {} range scan all row groups, given: {}",
697                i, row_group_idx.row_group_index,
698            );
699        }
700    }
701
702    sources.reserve(range_meta.row_group_indices.len());
703    for index in &range_meta.row_group_indices {
704        let stream = if stream_ctx.is_mem_range_index(*index) {
705            let stream = scan_mem_ranges(
706                stream_ctx.clone(),
707                part_metrics.clone(),
708                *index,
709                range_meta.time_range,
710            );
711            Box::pin(stream) as _
712        } else if stream_ctx.is_file_range_index(*index) {
713            let read_type = if compaction {
714                "compaction"
715            } else {
716                "seq_scan_files"
717            };
718            let stream = scan_file_ranges(
719                stream_ctx.clone(),
720                part_metrics.clone(),
721                *index,
722                read_type,
723                range_builder_list.clone(),
724            )
725            .await?;
726            Box::pin(stream) as _
727        } else {
728            scan_util::maybe_scan_other_ranges(stream_ctx, *index, part_metrics).await?
729        };
730        sources.push(Source::Stream(stream));
731    }
732    Ok(())
733}
734
735/// Builds flat sources for the partition range and push them to the `sources` vector.
736pub(crate) async fn build_flat_sources(
737    stream_ctx: &Arc<StreamContext>,
738    part_range: &PartitionRange,
739    compaction: bool,
740    part_metrics: &PartitionMetrics,
741    range_builder_list: Arc<RangeBuilderList>,
742    sources: &mut Vec<BoxedRecordBatchStream>,
743) -> Result<()> {
744    // Gets range meta.
745    let range_meta = &stream_ctx.ranges[part_range.identifier];
746    #[cfg(debug_assertions)]
747    if compaction {
748        // Compaction expects input sources are not been split.
749        debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
750        for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
751            // It should scan all row groups.
752            debug_assert_eq!(
753                -1, row_group_idx.row_group_index,
754                "Expect {} range scan all row groups, given: {}",
755                i, row_group_idx.row_group_index,
756            );
757        }
758    }
759
760    sources.reserve(range_meta.row_group_indices.len());
761    for index in &range_meta.row_group_indices {
762        let stream = if stream_ctx.is_mem_range_index(*index) {
763            let stream = scan_flat_mem_ranges(stream_ctx.clone(), part_metrics.clone(), *index);
764            Box::pin(stream) as _
765        } else if stream_ctx.is_file_range_index(*index) {
766            let read_type = if compaction {
767                "compaction"
768            } else {
769                "seq_scan_files"
770            };
771            let stream = scan_flat_file_ranges(
772                stream_ctx.clone(),
773                part_metrics.clone(),
774                *index,
775                read_type,
776                range_builder_list.clone(),
777            )
778            .await?;
779            Box::pin(stream) as _
780        } else {
781            scan_util::maybe_scan_flat_other_ranges(stream_ctx, *index, part_metrics).await?
782        };
783        sources.push(stream);
784    }
785    Ok(())
786}
787
#[cfg(test)]
impl SeqScan {
    /// Test-only accessor for the [`ScanInput`] held by the stream context.
    pub(crate) fn input(&self) -> &ScanInput {
        let ctx: &StreamContext = &self.stream_ctx;
        &ctx.input
    }
}
795
/// Returns the scanner type label: `"SeqScan(compaction)"` for compaction scans,
/// `"SeqScan"` otherwise.
fn get_scanner_type(compaction: bool) -> &'static str {
    match compaction {
        true => "SeqScan(compaction)",
        false => "SeqScan",
    }
}