mito2/read/
seq_scan.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Sequential scan.
16
17use std::fmt;
18use std::sync::Arc;
19use std::time::Instant;
20
21use async_stream::try_stream;
22use common_error::ext::BoxedError;
23use common_recordbatch::util::ChainedRecordBatchStream;
24use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
25use common_telemetry::tracing;
26use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
27use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
28use datatypes::schema::SchemaRef;
29use futures::{StreamExt, TryStreamExt};
30use snafu::{OptionExt, ensure};
31use store_api::metadata::RegionMetadataRef;
32use store_api::region_engine::{
33    PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
34};
35use store_api::storage::TimeSeriesRowSelector;
36use tokio::sync::Semaphore;
37
38use crate::error::{PartitionOutOfRangeSnafu, Result, TooManyFilesToReadSnafu, UnexpectedSnafu};
39use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
40use crate::read::flat_dedup::{FlatDedupReader, FlatLastNonNull, FlatLastRow};
41use crate::read::flat_merge::FlatMergeReader;
42use crate::read::last_row::LastRowReader;
43use crate::read::merge::MergeReaderBuilder;
44use crate::read::pruner::{PartitionPruner, Pruner};
45use crate::read::range::RangeMeta;
46use crate::read::scan_region::{ScanInput, StreamContext};
47use crate::read::scan_util::{
48    PartitionMetrics, PartitionMetricsList, SplitRecordBatchStream, scan_file_ranges,
49    scan_flat_file_ranges, scan_flat_mem_ranges, scan_mem_ranges,
50    should_split_flat_batches_for_merge,
51};
52use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
53use crate::read::{
54    Batch, BatchReader, BoxedBatchReader, BoxedRecordBatchStream, ScannerMetrics, Source, scan_util,
55};
56use crate::region::options::MergeMode;
57use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
58
/// Scans a region and returns rows in a sorted sequence.
///
/// The output order is always `order by primary keys, time index` inside every
/// [`PartitionRange`]. Each "partition" may contain many [`PartitionRange`]s.
pub struct SeqScan {
    /// Properties of the scanner.
    properties: ScannerProperties,
    /// Context of streams, shared by all partitions.
    stream_ctx: Arc<StreamContext>,
    /// Shared pruner for file range building.
    pruner: Arc<Pruner>,
    /// Metrics for each partition.
    /// The scanner only sets it in queries and keeps it empty during compaction.
    metrics_list: PartitionMetricsList,
}
74
75impl SeqScan {
76    /// Creates a new [SeqScan] with the given input.
77    /// If `input.compaction` is true, the scanner will not attempt to split ranges.
78    pub(crate) fn new(input: ScanInput) -> Self {
79        let mut properties = ScannerProperties::default()
80            .with_append_mode(input.append_mode)
81            .with_total_rows(input.total_rows());
82        let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input));
83        properties.partitions = vec![stream_ctx.partition_ranges()];
84
85        // Create the shared pruner with number of workers equal to CPU cores.
86        let num_workers = common_stat::get_total_cpu_cores().max(1);
87        let pruner = Arc::new(Pruner::new(stream_ctx.clone(), num_workers));
88
89        Self {
90            properties,
91            stream_ctx,
92            pruner,
93            metrics_list: PartitionMetricsList::default(),
94        }
95    }
96
97    /// Builds a stream for the query.
98    ///
99    /// The returned stream is not partitioned and will contains all the data. If want
100    /// partitioned scan, use [`RegionScanner::scan_partition`].
101    #[tracing::instrument(
102        skip_all,
103        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
104    )]
105    pub fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
106        let metrics_set = ExecutionPlanMetricsSet::new();
107        let streams = (0..self.properties.partitions.len())
108            .map(|partition: usize| {
109                self.scan_partition(&QueryScanContext::default(), &metrics_set, partition)
110            })
111            .collect::<Result<Vec<_>, _>>()?;
112
113        let aggr_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
114        Ok(Box::pin(aggr_stream))
115    }
116
117    /// Scan [`Batch`] in all partitions one by one.
118    pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
119        let metrics_set = ExecutionPlanMetricsSet::new();
120
121        let streams = (0..self.properties.partitions.len())
122            .map(|partition| {
123                let metrics = self.new_partition_metrics(false, &metrics_set, partition);
124                self.scan_batch_in_partition(partition, metrics)
125            })
126            .collect::<Result<Vec<_>>>()?;
127
128        Ok(Box::pin(futures::stream::iter(streams).flatten()))
129    }
130
131    /// Builds a [BoxedBatchReader] from sequential scan for compaction.
132    ///
133    /// # Panics
134    /// Panics if the compaction flag is not set.
135    pub async fn build_reader_for_compaction(&self) -> Result<BoxedBatchReader> {
136        assert!(self.stream_ctx.input.compaction);
137
138        let metrics_set = ExecutionPlanMetricsSet::new();
139        let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
140        debug_assert_eq!(1, self.properties.partitions.len());
141        let partition_ranges = &self.properties.partitions[0];
142
143        let reader = Self::merge_all_ranges_for_compaction(
144            &self.stream_ctx,
145            partition_ranges,
146            &part_metrics,
147            self.pruner.clone(),
148        )
149        .await?;
150        Ok(Box::new(reader))
151    }
152
153    /// Builds a [BoxedRecordBatchStream] from sequential scan for flat format compaction.
154    ///
155    /// # Panics
156    /// Panics if the compaction flag is not set.
157    pub async fn build_flat_reader_for_compaction(&self) -> Result<BoxedRecordBatchStream> {
158        assert!(self.stream_ctx.input.compaction);
159
160        let metrics_set = ExecutionPlanMetricsSet::new();
161        let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
162        debug_assert_eq!(1, self.properties.partitions.len());
163        let partition_ranges = &self.properties.partitions[0];
164
165        let reader = Self::merge_all_flat_ranges_for_compaction(
166            &self.stream_ctx,
167            partition_ranges,
168            &part_metrics,
169            self.pruner.clone(),
170        )
171        .await?;
172        Ok(reader)
173    }
174
    /// Builds a merge reader that reads all ranges.
    /// Callers MUST not split ranges before calling this method.
    async fn merge_all_ranges_for_compaction(
        stream_ctx: &Arc<StreamContext>,
        partition_ranges: &[PartitionRange],
        part_metrics: &PartitionMetrics,
        pruner: Arc<Pruner>,
    ) -> Result<BoxedBatchReader> {
        // Registers the ranges with the shared pruner so its entries are ref-counted
        // before any source is built.
        pruner.add_partition_ranges(partition_ranges);
        let partition_pruner = Arc::new(PartitionPruner::new(pruner, partition_ranges));

        // Builds sources for every range. `compaction` is passed as `true` so
        // build_sources expects unsplit ranges; no semaphore means files are
        // scanned sequentially.
        let mut sources = Vec::new();
        for part_range in partition_ranges {
            build_sources(
                stream_ctx,
                part_range,
                true,
                part_metrics,
                partition_pruner.clone(),
                &mut sources,
                None,
            )
            .await?;
        }

        common_telemetry::debug!(
            "Build reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
            stream_ctx.input.mapper.metadata().region_id,
            partition_ranges.len(),
            sources.len()
        );
        // Merges all sources into one reader, again without read parallelism.
        Self::build_reader_from_sources(stream_ctx, sources, None, None).await
    }
208
    /// Builds a merge reader that reads all flat ranges.
    /// Callers MUST not split ranges before calling this method.
    async fn merge_all_flat_ranges_for_compaction(
        stream_ctx: &Arc<StreamContext>,
        partition_ranges: &[PartitionRange],
        part_metrics: &PartitionMetrics,
        pruner: Arc<Pruner>,
    ) -> Result<BoxedRecordBatchStream> {
        // Registers the ranges with the shared pruner so its entries are ref-counted
        // before any source is built.
        pruner.add_partition_ranges(partition_ranges);
        let partition_pruner = Arc::new(PartitionPruner::new(pruner, partition_ranges));

        // Builds flat sources for every range. `compaction` is passed as `true` so
        // build_flat_sources expects unsplit ranges; no semaphore means files are
        // scanned sequentially.
        let mut sources = Vec::new();
        for part_range in partition_ranges {
            build_flat_sources(
                stream_ctx,
                part_range,
                true,
                part_metrics,
                partition_pruner.clone(),
                &mut sources,
                None,
            )
            .await?;
        }

        common_telemetry::debug!(
            "Build flat reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
            stream_ctx.input.mapper.metadata().region_id,
            partition_ranges.len(),
            sources.len()
        );
        // Merges all sources into one RecordBatch stream, again without read parallelism.
        Self::build_flat_reader_from_sources(stream_ctx, sources, None, None).await
    }
242
    /// Builds a reader to read sources. If `semaphore` is provided, reads sources in parallel
    /// if possible.
    ///
    /// The returned reader merges the sources by key, optionally deduplicates rows
    /// (unless append mode is on), and optionally keeps only the last row per series.
    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
    pub(crate) async fn build_reader_from_sources(
        stream_ctx: &StreamContext,
        mut sources: Vec<Source>,
        semaphore: Option<Arc<Semaphore>>,
        part_metrics: Option<&PartitionMetrics>,
    ) -> Result<BoxedBatchReader> {
        if let Some(semaphore) = semaphore.as_ref() {
            // Read sources in parallel.
            if sources.len() > 1 {
                sources = stream_ctx
                    .input
                    .create_parallel_sources(sources, semaphore.clone())?;
            }
        }

        // Merges all sources into a single sorted reader.
        let mut builder = MergeReaderBuilder::from_sources(sources);
        if let Some(metrics) = part_metrics {
            builder.with_metrics_reporter(Some(metrics.merge_metrics_reporter()));
        }
        let reader = builder.build().await?;

        // Append mode keeps duplicate rows, so dedup only when it is disabled.
        let dedup = !stream_ctx.input.append_mode;
        let dedup_metrics_reporter = part_metrics.map(|m| m.dedup_metrics_reporter());
        let reader = if dedup {
            match stream_ctx.input.merge_mode {
                MergeMode::LastRow => Box::new(DedupReader::new(
                    reader,
                    LastRow::new(stream_ctx.input.filter_deleted),
                    dedup_metrics_reporter,
                )) as _,
                MergeMode::LastNonNull => Box::new(DedupReader::new(
                    reader,
                    LastNonNull::new(stream_ctx.input.filter_deleted),
                    dedup_metrics_reporter,
                )) as _,
            }
        } else {
            Box::new(reader) as _
        };

        // Optionally wraps the reader to keep only the last row of each time series.
        let reader = match &stream_ctx.input.series_row_selector {
            Some(TimeSeriesRowSelector::LastRow) => Box::new(LastRowReader::new(reader)) as _,
            None => reader,
        };

        Ok(reader)
    }
293
    /// Builds a flat reader to read sources that returns RecordBatch. If `semaphore` is provided, reads sources in parallel
    /// if possible.
    ///
    /// The returned stream merges the sources and optionally deduplicates rows
    /// (unless append mode is on).
    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
    pub(crate) async fn build_flat_reader_from_sources(
        stream_ctx: &StreamContext,
        mut sources: Vec<BoxedRecordBatchStream>,
        semaphore: Option<Arc<Semaphore>>,
        part_metrics: Option<&PartitionMetrics>,
    ) -> Result<BoxedRecordBatchStream> {
        if let Some(semaphore) = semaphore.as_ref() {
            // Read sources in parallel.
            if sources.len() > 1 {
                sources = stream_ctx
                    .input
                    .create_parallel_flat_sources(sources, semaphore.clone())?;
            }
        }

        // The flat mapper provides the arrow schema shared by all input sources.
        let mapper = stream_ctx.input.mapper.as_flat().unwrap();
        let schema = mapper.input_arrow_schema(stream_ctx.input.compaction);

        let metrics_reporter = part_metrics.map(|m| m.merge_metrics_reporter());
        let reader =
            FlatMergeReader::new(schema, sources, DEFAULT_READ_BATCH_SIZE, metrics_reporter)
                .await?;

        // Append mode keeps duplicate rows, so dedup only when it is disabled.
        let dedup = !stream_ctx.input.append_mode;
        let dedup_metrics_reporter = part_metrics.map(|m| m.dedup_metrics_reporter());
        let reader = if dedup {
            match stream_ctx.input.merge_mode {
                MergeMode::LastRow => Box::pin(
                    FlatDedupReader::new(
                        reader.into_stream().boxed(),
                        FlatLastRow::new(stream_ctx.input.filter_deleted),
                        dedup_metrics_reporter,
                    )
                    .into_stream(),
                ) as _,
                MergeMode::LastNonNull => Box::pin(
                    FlatDedupReader::new(
                        reader.into_stream().boxed(),
                        FlatLastNonNull::new(
                            mapper.field_column_start(),
                            stream_ctx.input.filter_deleted,
                        ),
                        dedup_metrics_reporter,
                    )
                    .into_stream(),
                ) as _,
            }
        } else {
            Box::pin(reader.into_stream()) as _
        };

        Ok(reader)
    }
350
351    /// Scans the given partition when the part list is set properly.
352    /// Otherwise the returned stream might not contains any data.
353    fn scan_partition_impl(
354        &self,
355        ctx: &QueryScanContext,
356        metrics_set: &ExecutionPlanMetricsSet,
357        partition: usize,
358    ) -> Result<SendableRecordBatchStream> {
359        if ctx.explain_verbose {
360            common_telemetry::info!(
361                "SeqScan partition {}, region_id: {}",
362                partition,
363                self.stream_ctx.input.region_metadata().region_id
364            );
365        }
366
367        let metrics = self.new_partition_metrics(ctx.explain_verbose, metrics_set, partition);
368        let input = &self.stream_ctx.input;
369
370        let batch_stream = if input.flat_format {
371            // Use flat scan for bulk memtables
372            self.scan_flat_batch_in_partition(partition, metrics.clone())?
373        } else {
374            // Use regular batch scan for normal memtables
375            self.scan_batch_in_partition(partition, metrics.clone())?
376        };
377        let record_batch_stream = ConvertBatchStream::new(
378            batch_stream,
379            input.mapper.clone(),
380            input.cache_strategy.clone(),
381            metrics,
382        );
383
384        Ok(Box::pin(RecordBatchStreamWrapper::new(
385            input.mapper.output_schema(),
386            Box::pin(record_batch_stream),
387        )))
388    }
389
    /// Returns a stream that scans [`ScanBatch`]es in the given partition
    /// (primary key format only).
    ///
    /// Each [`PartitionRange`] of the partition is built, merged and drained in
    /// order. Returns an error if `partition` is out of bounds.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
            partition = partition
        )
    )]
    fn scan_batch_in_partition(
        &self,
        partition: usize,
        part_metrics: PartitionMetrics,
    ) -> Result<ScanBatchStream> {
        ensure!(
            partition < self.properties.partitions.len(),
            PartitionOutOfRangeSnafu {
                given: partition,
                all: self.properties.partitions.len(),
            }
        );

        if self.properties.partitions[partition].is_empty() {
            return Ok(Box::pin(futures::stream::empty()));
        }

        let stream_ctx = self.stream_ctx.clone();
        let semaphore = self.new_semaphore();
        let partition_ranges = self.properties.partitions[partition].clone();
        let compaction = self.stream_ctx.input.compaction;
        let distinguish_range = self.properties.distinguish_partition_range;
        // Compaction builds file sources sequentially; queries may use the semaphore.
        let file_scan_semaphore = if compaction { None } else { semaphore.clone() };
        let pruner = self.pruner.clone();
        // Initializes ref counts for the pruner.
        // If we call scan_batch_in_partition() multiple times but don't read all batches from the stream,
        // then the ref count won't be decremented.
        // This is a rare case and keeping all remaining entries still uses less memory than a per partition cache.
        pruner.add_partition_ranges(&partition_ranges);
        let partition_pruner = Arc::new(PartitionPruner::new(pruner, &partition_ranges));

        let stream = try_stream! {
            part_metrics.on_first_poll();
            // Start fetch time before building sources so scan cost contains
            // build part cost.
            let mut fetch_start = Instant::now();

            // This path only serves the primary key format; error out otherwise.
            let _mapper = stream_ctx.input.mapper.as_primary_key().context(UnexpectedSnafu {
                reason: "Unexpected format",
            })?;
            // Scans each part.
            for part_range in partition_ranges {
                let mut sources = Vec::new();
                build_sources(
                    &stream_ctx,
                    &part_range,
                    compaction,
                    &part_metrics,
                    partition_pruner.clone(),
                    &mut sources,
                    file_scan_semaphore.clone(),
                ).await?;

                let mut reader =
                    Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone(), Some(&part_metrics))
                        .await?;
                #[cfg(debug_assertions)]
                let mut checker = crate::read::BatchChecker::default()
                    .with_start(Some(part_range.start))
                    .with_end(Some(part_range.end));

                let mut metrics = ScannerMetrics {
                    scan_cost: fetch_start.elapsed(),
                    ..Default::default()
                };
                fetch_start = Instant::now();

                while let Some(batch) = reader.next_batch().await? {
                    metrics.scan_cost += fetch_start.elapsed();
                    metrics.num_batches += 1;
                    metrics.num_rows += batch.num_rows();

                    // Empty batches are skipped; they should not normally occur here.
                    debug_assert!(!batch.is_empty());
                    if batch.is_empty() {
                        fetch_start = Instant::now();
                        continue;
                    }

                    #[cfg(debug_assertions)]
                    checker.ensure_part_range_batch(
                        "SeqScan",
                        _mapper.metadata().region_id,
                        partition,
                        part_range,
                        &batch,
                    );

                    let yield_start = Instant::now();
                    yield ScanBatch::Normal(batch);
                    metrics.yield_cost += yield_start.elapsed();

                    fetch_start = Instant::now();
                }

                // Yields an empty part to indicate this range is terminated.
                // The query engine can use this to optimize some queries.
                if distinguish_range {
                    let yield_start = Instant::now();
                    yield ScanBatch::Normal(Batch::empty());
                    metrics.yield_cost += yield_start.elapsed();
                }

                metrics.scan_cost += fetch_start.elapsed();
                fetch_start = Instant::now();
                part_metrics.merge_metrics(&metrics);
            }

            part_metrics.on_finish();
        };
        Ok(Box::pin(stream))
    }
508
    /// Returns a stream that scans flat [`ScanBatch`]es (RecordBatches) in the
    /// given partition.
    ///
    /// Returns an error if `partition` is out of bounds.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
            partition = partition
        )
    )]
    fn scan_flat_batch_in_partition(
        &self,
        partition: usize,
        part_metrics: PartitionMetrics,
    ) -> Result<ScanBatchStream> {
        ensure!(
            partition < self.properties.partitions.len(),
            PartitionOutOfRangeSnafu {
                given: partition,
                all: self.properties.partitions.len(),
            }
        );

        if self.properties.partitions[partition].is_empty() {
            return Ok(Box::pin(futures::stream::empty()));
        }

        let stream_ctx = self.stream_ctx.clone();
        let semaphore = self.new_semaphore();
        let partition_ranges = self.properties.partitions[partition].clone();
        let compaction = self.stream_ctx.input.compaction;
        // Compaction builds file sources sequentially; queries may use the semaphore.
        let file_scan_semaphore = if compaction { None } else { semaphore.clone() };
        let pruner = self.pruner.clone();
        // Initializes ref counts for the pruner.
        // If we call scan_flat_batch_in_partition() multiple times but don't read all batches from the stream,
        // then the ref count won't be decremented.
        // This is a rare case and keeping all remaining entries still uses less memory than a per partition cache.
        pruner.add_partition_ranges(&partition_ranges);
        let partition_pruner = Arc::new(PartitionPruner::new(pruner, &partition_ranges));

        let stream = try_stream! {
            part_metrics.on_first_poll();
            // Start fetch time before building sources so scan cost contains
            // build part cost.
            let mut fetch_start = Instant::now();

            // Scans each part.
            for part_range in partition_ranges {
                let mut sources = Vec::new();
                build_flat_sources(
                    &stream_ctx,
                    &part_range,
                    compaction,
                    &part_metrics,
                    partition_pruner.clone(),
                    &mut sources,
                    file_scan_semaphore.clone(),
                ).await?;

                let mut reader =
                    Self::build_flat_reader_from_sources(&stream_ctx, sources, semaphore.clone(), Some(&part_metrics))
                        .await?;

                let mut metrics = ScannerMetrics {
                    scan_cost: fetch_start.elapsed(),
                    ..Default::default()
                };
                fetch_start = Instant::now();

                while let Some(record_batch) = reader.try_next().await? {
                    metrics.scan_cost += fetch_start.elapsed();
                    metrics.num_batches += 1;
                    metrics.num_rows += record_batch.num_rows();

                    // Empty batches are skipped; they should not normally occur here.
                    debug_assert!(record_batch.num_rows() > 0);
                    if record_batch.num_rows() == 0 {
                        fetch_start = Instant::now();
                        continue;
                    }

                    let yield_start = Instant::now();
                    yield ScanBatch::RecordBatch(record_batch);
                    metrics.yield_cost += yield_start.elapsed();

                    fetch_start = Instant::now();
                }

                metrics.scan_cost += fetch_start.elapsed();
                fetch_start = Instant::now();
                part_metrics.merge_metrics(&metrics);
            }

            part_metrics.on_finish();
        };
        Ok(Box::pin(stream))
    }
602
603    fn new_semaphore(&self) -> Option<Arc<Semaphore>> {
604        if self.properties.target_partitions() > self.properties.num_partitions() {
605            // We can use additional tasks to read the data if we have more target partitions than actual partitions.
606            // This semaphore is partition level.
607            // We don't use a global semaphore to avoid a partition waiting for others. The final concurrency
608            // of tasks usually won't exceed the target partitions a lot as compaction can reduce the number of
609            // files in a part range.
610            Some(Arc::new(Semaphore::new(
611                self.properties.target_partitions() - self.properties.num_partitions() + 1,
612            )))
613        } else {
614            None
615        }
616    }
617
618    /// Creates a new partition metrics instance.
619    /// Sets the partition metrics for the given partition if it is not for compaction.
620    fn new_partition_metrics(
621        &self,
622        explain_verbose: bool,
623        metrics_set: &ExecutionPlanMetricsSet,
624        partition: usize,
625    ) -> PartitionMetrics {
626        let metrics = PartitionMetrics::new(
627            self.stream_ctx.input.mapper.metadata().region_id,
628            partition,
629            get_scanner_type(self.stream_ctx.input.compaction),
630            self.stream_ctx.query_start,
631            explain_verbose,
632            metrics_set,
633        );
634
635        if !self.stream_ctx.input.compaction {
636            self.metrics_list.set(partition, metrics.clone());
637        }
638
639        metrics
640    }
641
642    /// Finds the maximum number of files to read in a single partition range.
643    fn max_files_in_partition(ranges: &[RangeMeta], partition_ranges: &[PartitionRange]) -> usize {
644        partition_ranges
645            .iter()
646            .map(|part_range| {
647                let range_meta = &ranges[part_range.identifier];
648                range_meta.indices.len()
649            })
650            .max()
651            .unwrap_or(0)
652    }
653
654    /// Checks resource limit for the scanner.
655    pub(crate) fn check_scan_limit(&self) -> Result<()> {
656        // Check max file count limit for all partitions since we scan them in parallel.
657        let total_max_files: usize = self
658            .properties
659            .partitions
660            .iter()
661            .map(|partition| Self::max_files_in_partition(&self.stream_ctx.ranges, partition))
662            .sum();
663
664        let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
665        if total_max_files > max_concurrent_files {
666            return TooManyFilesToReadSnafu {
667                actual: total_max_files,
668                max: max_concurrent_files,
669            }
670            .fail();
671        }
672
673        Ok(())
674    }
675}
676
677impl RegionScanner for SeqScan {
678    fn name(&self) -> &str {
679        "SeqScan"
680    }
681
682    fn properties(&self) -> &ScannerProperties {
683        &self.properties
684    }
685
686    fn schema(&self) -> SchemaRef {
687        self.stream_ctx.input.mapper.output_schema()
688    }
689
690    fn metadata(&self) -> RegionMetadataRef {
691        self.stream_ctx.input.mapper.metadata().clone()
692    }
693
694    fn scan_partition(
695        &self,
696        ctx: &QueryScanContext,
697        metrics_set: &ExecutionPlanMetricsSet,
698        partition: usize,
699    ) -> Result<SendableRecordBatchStream, BoxedError> {
700        self.scan_partition_impl(ctx, metrics_set, partition)
701            .map_err(BoxedError::new)
702    }
703
704    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
705        self.properties.prepare(request);
706
707        self.check_scan_limit().map_err(BoxedError::new)?;
708
709        Ok(())
710    }
711
712    fn has_predicate_without_region(&self) -> bool {
713        let predicate = self
714            .stream_ctx
715            .input
716            .predicate_group()
717            .predicate_without_region();
718        predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
719    }
720
721    fn set_logical_region(&mut self, logical_region: bool) {
722        self.properties.set_logical_region(logical_region);
723    }
724}
725
726impl DisplayAs for SeqScan {
727    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
728        write!(
729            f,
730            "SeqScan: region={}, ",
731            self.stream_ctx.input.mapper.metadata().region_id
732        )?;
733        match t {
734            // TODO(LFC): Implement all the "TreeRender" display format.
735            DisplayFormatType::Default | DisplayFormatType::TreeRender => {
736                self.stream_ctx.format_for_explain(false, f)
737            }
738            DisplayFormatType::Verbose => {
739                self.stream_ctx.format_for_explain(true, f)?;
740                self.metrics_list.format_verbose_metrics(f)
741            }
742        }
743    }
744}
745
746impl fmt::Debug for SeqScan {
747    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
748        f.debug_struct("SeqScan")
749            .field("num_ranges", &self.stream_ctx.ranges.len())
750            .finish()
751    }
752}
753
754/// Builds sources for the partition range and push them to the `sources` vector.
755pub(crate) async fn build_sources(
756    stream_ctx: &Arc<StreamContext>,
757    part_range: &PartitionRange,
758    compaction: bool,
759    part_metrics: &PartitionMetrics,
760    partition_pruner: Arc<PartitionPruner>,
761    sources: &mut Vec<Source>,
762    semaphore: Option<Arc<Semaphore>>,
763) -> Result<()> {
764    // Gets range meta.
765    let range_meta = &stream_ctx.ranges[part_range.identifier];
766    #[cfg(debug_assertions)]
767    if compaction {
768        // Compaction expects input sources are not been split.
769        debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
770        for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
771            // It should scan all row groups.
772            debug_assert_eq!(
773                -1, row_group_idx.row_group_index,
774                "Expect {} range scan all row groups, given: {}",
775                i, row_group_idx.row_group_index,
776            );
777        }
778    }
779
780    let read_type = if compaction {
781        "compaction"
782    } else {
783        "seq_scan_files"
784    };
785    let num_indices = range_meta.row_group_indices.len();
786    if num_indices == 0 {
787        return Ok(());
788    }
789
790    sources.reserve(num_indices);
791    let mut ordered_sources = Vec::with_capacity(num_indices);
792    ordered_sources.resize_with(num_indices, || None);
793    let mut file_scan_tasks = Vec::new();
794
795    for (position, index) in range_meta.row_group_indices.iter().enumerate() {
796        if stream_ctx.is_mem_range_index(*index) {
797            let stream = scan_mem_ranges(
798                stream_ctx.clone(),
799                part_metrics.clone(),
800                *index,
801                range_meta.time_range,
802            );
803            ordered_sources[position] = Some(Source::Stream(Box::pin(stream) as _));
804        } else if stream_ctx.is_file_range_index(*index) {
805            if let Some(semaphore_ref) = semaphore.as_ref() {
806                // run in parallel, controlled by semaphore
807                let stream_ctx = stream_ctx.clone();
808                let part_metrics = part_metrics.clone();
809                let partition_pruner = partition_pruner.clone();
810                let semaphore = Arc::clone(semaphore_ref);
811                let row_group_index = *index;
812                file_scan_tasks.push(async move {
813                    let _permit = semaphore.acquire().await.unwrap();
814                    let stream = scan_file_ranges(
815                        stream_ctx,
816                        part_metrics,
817                        row_group_index,
818                        read_type,
819                        partition_pruner,
820                    )
821                    .await?;
822                    Ok((position, Source::Stream(Box::pin(stream) as _)))
823                });
824            } else {
825                // no semaphore, run sequentially
826                let stream = scan_file_ranges(
827                    stream_ctx.clone(),
828                    part_metrics.clone(),
829                    *index,
830                    read_type,
831                    partition_pruner.clone(),
832                )
833                .await?;
834                ordered_sources[position] = Some(Source::Stream(Box::pin(stream) as _));
835            }
836        } else {
837            let stream =
838                scan_util::maybe_scan_other_ranges(stream_ctx, *index, part_metrics).await?;
839            ordered_sources[position] = Some(Source::Stream(stream));
840        }
841    }
842
843    if !file_scan_tasks.is_empty() {
844        let results = futures::future::try_join_all(file_scan_tasks).await?;
845        for (position, source) in results {
846            ordered_sources[position] = Some(source);
847        }
848    }
849
850    for source in ordered_sources.into_iter().flatten() {
851        sources.push(source);
852    }
853    Ok(())
854}
855
856/// Builds flat sources for the partition range and push them to the `sources` vector.
857pub(crate) async fn build_flat_sources(
858    stream_ctx: &Arc<StreamContext>,
859    part_range: &PartitionRange,
860    compaction: bool,
861    part_metrics: &PartitionMetrics,
862    partition_pruner: Arc<PartitionPruner>,
863    sources: &mut Vec<BoxedRecordBatchStream>,
864    semaphore: Option<Arc<Semaphore>>,
865) -> Result<()> {
866    // Gets range meta.
867    let range_meta = &stream_ctx.ranges[part_range.identifier];
868    #[cfg(debug_assertions)]
869    if compaction {
870        // Compaction expects input sources are not been split.
871        debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
872        for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
873            // It should scan all row groups.
874            debug_assert_eq!(
875                -1, row_group_idx.row_group_index,
876                "Expect {} range scan all row groups, given: {}",
877                i, row_group_idx.row_group_index,
878            );
879        }
880    }
881
882    let read_type = if compaction {
883        "compaction"
884    } else {
885        "seq_scan_files"
886    };
887    let num_indices = range_meta.row_group_indices.len();
888    if num_indices == 0 {
889        return Ok(());
890    }
891
892    let should_split = should_split_flat_batches_for_merge(stream_ctx, range_meta);
893    sources.reserve(num_indices);
894    let mut ordered_sources = Vec::with_capacity(num_indices);
895    ordered_sources.resize_with(num_indices, || None);
896    let mut file_scan_tasks = Vec::new();
897
898    for (position, index) in range_meta.row_group_indices.iter().enumerate() {
899        if stream_ctx.is_mem_range_index(*index) {
900            let stream = scan_flat_mem_ranges(stream_ctx.clone(), part_metrics.clone(), *index);
901            ordered_sources[position] = Some(Box::pin(stream) as _);
902        } else if stream_ctx.is_file_range_index(*index) {
903            if let Some(semaphore_ref) = semaphore.as_ref() {
904                // run in parallel, controlled by semaphore
905                let stream_ctx = stream_ctx.clone();
906                let part_metrics = part_metrics.clone();
907                let partition_pruner = partition_pruner.clone();
908                let semaphore = Arc::clone(semaphore_ref);
909                let row_group_index = *index;
910                file_scan_tasks.push(async move {
911                    let _permit = semaphore.acquire().await.unwrap();
912                    let stream = scan_flat_file_ranges(
913                        stream_ctx,
914                        part_metrics,
915                        row_group_index,
916                        read_type,
917                        partition_pruner,
918                    )
919                    .await?;
920                    Ok((position, Box::pin(stream) as _))
921                });
922            } else {
923                // no semaphore, run sequentially
924                let stream = scan_flat_file_ranges(
925                    stream_ctx.clone(),
926                    part_metrics.clone(),
927                    *index,
928                    read_type,
929                    partition_pruner.clone(),
930                )
931                .await?;
932                ordered_sources[position] = Some(Box::pin(stream) as _);
933            }
934        } else {
935            let stream =
936                scan_util::maybe_scan_flat_other_ranges(stream_ctx, *index, part_metrics).await?;
937            ordered_sources[position] = Some(stream);
938        }
939    }
940
941    if !file_scan_tasks.is_empty() {
942        let results = futures::future::try_join_all(file_scan_tasks).await?;
943        for (position, stream) in results {
944            ordered_sources[position] = Some(stream);
945        }
946    }
947
948    for stream in ordered_sources.into_iter().flatten() {
949        if should_split {
950            sources.push(Box::pin(SplitRecordBatchStream::new(stream)));
951        } else {
952            sources.push(stream);
953        }
954    }
955
956    if should_split {
957        common_telemetry::debug!(
958            "Splitting record batches, region: {}, sources: {}, part_range: {:?}",
959            stream_ctx.input.region_metadata().region_id,
960            sources.len(),
961            part_range,
962        );
963    }
964
965    Ok(())
966}
967
#[cfg(test)]
impl SeqScan {
    /// Returns a reference to the scan input (test-only accessor).
    pub(crate) fn input(&self) -> &ScanInput {
        let ctx = &self.stream_ctx;
        &ctx.input
    }
}
975
/// Returns the display name of the scanner.
///
/// Compaction scans are labeled distinctly so they can be told apart in
/// explain output and metrics.
fn get_scanner_type(compaction: bool) -> &'static str {
    match compaction {
        true => "SeqScan(compaction)",
        false => "SeqScan",
    }
}