mito2/read/
seq_scan.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Sequential scan.
16
17use std::fmt;
18use std::sync::Arc;
19use std::time::Instant;
20
21use async_stream::try_stream;
22use common_error::ext::BoxedError;
23use common_recordbatch::util::ChainedRecordBatchStream;
24use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
25use common_telemetry::tracing;
26use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
27use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
28use datatypes::schema::SchemaRef;
29use futures::{StreamExt, TryStreamExt};
30use snafu::{OptionExt, ensure};
31use store_api::metadata::RegionMetadataRef;
32use store_api::region_engine::{
33    PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
34};
35use store_api::storage::TimeSeriesRowSelector;
36use tokio::sync::Semaphore;
37
38use crate::error::{PartitionOutOfRangeSnafu, Result, TooManyFilesToReadSnafu, UnexpectedSnafu};
39use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
40use crate::read::flat_dedup::{FlatDedupReader, FlatLastNonNull, FlatLastRow};
41use crate::read::flat_merge::FlatMergeReader;
42use crate::read::last_row::LastRowReader;
43use crate::read::merge::MergeReaderBuilder;
44use crate::read::range::{RangeBuilderList, RangeMeta};
45use crate::read::scan_region::{ScanInput, StreamContext};
46use crate::read::scan_util::{
47    PartitionMetrics, PartitionMetricsList, SplitRecordBatchStream, scan_file_ranges,
48    scan_flat_file_ranges, scan_flat_mem_ranges, scan_mem_ranges,
49    should_split_flat_batches_for_merge,
50};
51use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
52use crate::read::{
53    Batch, BatchReader, BoxedBatchReader, BoxedRecordBatchStream, ScannerMetrics, Source, scan_util,
54};
55use crate::region::options::MergeMode;
56use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
57
58/// Scans a region and returns rows in a sorted sequence.
59///
60/// The output order is always `order by primary keys, time index` inside every
61/// [`PartitionRange`]. Each "partition" may contains many [`PartitionRange`]s.
62pub struct SeqScan {
63    /// Properties of the scanner.
64    properties: ScannerProperties,
65    /// Context of streams.
66    stream_ctx: Arc<StreamContext>,
67    /// Metrics for each partition.
68    /// The scanner only sets in query and keeps it empty during compaction.
69    metrics_list: PartitionMetricsList,
70}
71
72impl SeqScan {
73    /// Creates a new [SeqScan] with the given input.
74    /// If `input.compaction` is true, the scanner will not attempt to split ranges.
75    pub(crate) fn new(input: ScanInput) -> Self {
76        let mut properties = ScannerProperties::default()
77            .with_append_mode(input.append_mode)
78            .with_total_rows(input.total_rows());
79        let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input));
80        properties.partitions = vec![stream_ctx.partition_ranges()];
81
82        Self {
83            properties,
84            stream_ctx,
85            metrics_list: PartitionMetricsList::default(),
86        }
87    }
88
89    /// Builds a stream for the query.
90    ///
91    /// The returned stream is not partitioned and will contains all the data. If want
92    /// partitioned scan, use [`RegionScanner::scan_partition`].
93    #[tracing::instrument(
94        skip_all,
95        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
96    )]
97    pub fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
98        let metrics_set = ExecutionPlanMetricsSet::new();
99        let streams = (0..self.properties.partitions.len())
100            .map(|partition: usize| {
101                self.scan_partition(&QueryScanContext::default(), &metrics_set, partition)
102            })
103            .collect::<Result<Vec<_>, _>>()?;
104
105        let aggr_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
106        Ok(Box::pin(aggr_stream))
107    }
108
109    /// Scan [`Batch`] in all partitions one by one.
110    pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
111        let metrics_set = ExecutionPlanMetricsSet::new();
112
113        let streams = (0..self.properties.partitions.len())
114            .map(|partition| {
115                let metrics = self.new_partition_metrics(false, &metrics_set, partition);
116                self.scan_batch_in_partition(partition, metrics)
117            })
118            .collect::<Result<Vec<_>>>()?;
119
120        Ok(Box::pin(futures::stream::iter(streams).flatten()))
121    }
122
123    /// Builds a [BoxedBatchReader] from sequential scan for compaction.
124    ///
125    /// # Panics
126    /// Panics if the compaction flag is not set.
127    pub async fn build_reader_for_compaction(&self) -> Result<BoxedBatchReader> {
128        assert!(self.stream_ctx.input.compaction);
129
130        let metrics_set = ExecutionPlanMetricsSet::new();
131        let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
132        debug_assert_eq!(1, self.properties.partitions.len());
133        let partition_ranges = &self.properties.partitions[0];
134
135        let reader = Self::merge_all_ranges_for_compaction(
136            &self.stream_ctx,
137            partition_ranges,
138            &part_metrics,
139        )
140        .await?;
141        Ok(Box::new(reader))
142    }
143
144    /// Builds a [BoxedRecordBatchStream] from sequential scan for flat format compaction.
145    ///
146    /// # Panics
147    /// Panics if the compaction flag is not set.
148    pub async fn build_flat_reader_for_compaction(&self) -> Result<BoxedRecordBatchStream> {
149        assert!(self.stream_ctx.input.compaction);
150
151        let metrics_set = ExecutionPlanMetricsSet::new();
152        let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
153        debug_assert_eq!(1, self.properties.partitions.len());
154        let partition_ranges = &self.properties.partitions[0];
155
156        let reader = Self::merge_all_flat_ranges_for_compaction(
157            &self.stream_ctx,
158            partition_ranges,
159            &part_metrics,
160        )
161        .await?;
162        Ok(reader)
163    }
164
165    /// Builds a merge reader that reads all ranges.
166    /// Callers MUST not split ranges before calling this method.
167    async fn merge_all_ranges_for_compaction(
168        stream_ctx: &Arc<StreamContext>,
169        partition_ranges: &[PartitionRange],
170        part_metrics: &PartitionMetrics,
171    ) -> Result<BoxedBatchReader> {
172        let mut sources = Vec::new();
173        let range_builder_list = Arc::new(RangeBuilderList::new(
174            stream_ctx.input.num_memtables(),
175            stream_ctx.input.num_files(),
176        ));
177        for part_range in partition_ranges {
178            build_sources(
179                stream_ctx,
180                part_range,
181                true,
182                part_metrics,
183                range_builder_list.clone(),
184                &mut sources,
185                None,
186            )
187            .await?;
188        }
189
190        common_telemetry::debug!(
191            "Build reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
192            stream_ctx.input.mapper.metadata().region_id,
193            partition_ranges.len(),
194            sources.len()
195        );
196        Self::build_reader_from_sources(stream_ctx, sources, None, None).await
197    }
198
199    /// Builds a merge reader that reads all flat ranges.
200    /// Callers MUST not split ranges before calling this method.
201    async fn merge_all_flat_ranges_for_compaction(
202        stream_ctx: &Arc<StreamContext>,
203        partition_ranges: &[PartitionRange],
204        part_metrics: &PartitionMetrics,
205    ) -> Result<BoxedRecordBatchStream> {
206        let mut sources = Vec::new();
207        let range_builder_list = Arc::new(RangeBuilderList::new(
208            stream_ctx.input.num_memtables(),
209            stream_ctx.input.num_files(),
210        ));
211        for part_range in partition_ranges {
212            build_flat_sources(
213                stream_ctx,
214                part_range,
215                true,
216                part_metrics,
217                range_builder_list.clone(),
218                &mut sources,
219                None,
220            )
221            .await?;
222        }
223
224        common_telemetry::debug!(
225            "Build flat reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
226            stream_ctx.input.mapper.metadata().region_id,
227            partition_ranges.len(),
228            sources.len()
229        );
230        Self::build_flat_reader_from_sources(stream_ctx, sources, None, None).await
231    }
232
233    /// Builds a reader to read sources. If `semaphore` is provided, reads sources in parallel
234    /// if possible.
235    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
236    pub(crate) async fn build_reader_from_sources(
237        stream_ctx: &StreamContext,
238        mut sources: Vec<Source>,
239        semaphore: Option<Arc<Semaphore>>,
240        part_metrics: Option<&PartitionMetrics>,
241    ) -> Result<BoxedBatchReader> {
242        if let Some(semaphore) = semaphore.as_ref() {
243            // Read sources in parallel.
244            if sources.len() > 1 {
245                sources = stream_ctx
246                    .input
247                    .create_parallel_sources(sources, semaphore.clone())?;
248            }
249        }
250
251        let mut builder = MergeReaderBuilder::from_sources(sources);
252        if let Some(metrics) = part_metrics {
253            builder.with_metrics_reporter(Some(metrics.merge_metrics_reporter()));
254        }
255        let reader = builder.build().await?;
256
257        let dedup = !stream_ctx.input.append_mode;
258        let dedup_metrics_reporter = part_metrics.map(|m| m.dedup_metrics_reporter());
259        let reader = if dedup {
260            match stream_ctx.input.merge_mode {
261                MergeMode::LastRow => Box::new(DedupReader::new(
262                    reader,
263                    LastRow::new(stream_ctx.input.filter_deleted),
264                    dedup_metrics_reporter,
265                )) as _,
266                MergeMode::LastNonNull => Box::new(DedupReader::new(
267                    reader,
268                    LastNonNull::new(stream_ctx.input.filter_deleted),
269                    dedup_metrics_reporter,
270                )) as _,
271            }
272        } else {
273            Box::new(reader) as _
274        };
275
276        let reader = match &stream_ctx.input.series_row_selector {
277            Some(TimeSeriesRowSelector::LastRow) => Box::new(LastRowReader::new(reader)) as _,
278            None => reader,
279        };
280
281        Ok(reader)
282    }
283
284    /// Builds a flat reader to read sources that returns RecordBatch. If `semaphore` is provided, reads sources in parallel
285    /// if possible.
286    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
287    pub(crate) async fn build_flat_reader_from_sources(
288        stream_ctx: &StreamContext,
289        mut sources: Vec<BoxedRecordBatchStream>,
290        semaphore: Option<Arc<Semaphore>>,
291        part_metrics: Option<&PartitionMetrics>,
292    ) -> Result<BoxedRecordBatchStream> {
293        if let Some(semaphore) = semaphore.as_ref() {
294            // Read sources in parallel.
295            if sources.len() > 1 {
296                sources = stream_ctx
297                    .input
298                    .create_parallel_flat_sources(sources, semaphore.clone())?;
299            }
300        }
301
302        let mapper = stream_ctx.input.mapper.as_flat().unwrap();
303        let schema = mapper.input_arrow_schema(stream_ctx.input.compaction);
304
305        let metrics_reporter = part_metrics.map(|m| m.merge_metrics_reporter());
306        let reader =
307            FlatMergeReader::new(schema, sources, DEFAULT_READ_BATCH_SIZE, metrics_reporter)
308                .await?;
309
310        let dedup = !stream_ctx.input.append_mode;
311        let dedup_metrics_reporter = part_metrics.map(|m| m.dedup_metrics_reporter());
312        let reader = if dedup {
313            match stream_ctx.input.merge_mode {
314                MergeMode::LastRow => Box::pin(
315                    FlatDedupReader::new(
316                        reader.into_stream().boxed(),
317                        FlatLastRow::new(stream_ctx.input.filter_deleted),
318                        dedup_metrics_reporter,
319                    )
320                    .into_stream(),
321                ) as _,
322                MergeMode::LastNonNull => Box::pin(
323                    FlatDedupReader::new(
324                        reader.into_stream().boxed(),
325                        FlatLastNonNull::new(
326                            mapper.field_column_start(),
327                            stream_ctx.input.filter_deleted,
328                        ),
329                        dedup_metrics_reporter,
330                    )
331                    .into_stream(),
332                ) as _,
333            }
334        } else {
335            Box::pin(reader.into_stream()) as _
336        };
337
338        Ok(reader)
339    }
340
341    /// Scans the given partition when the part list is set properly.
342    /// Otherwise the returned stream might not contains any data.
343    fn scan_partition_impl(
344        &self,
345        ctx: &QueryScanContext,
346        metrics_set: &ExecutionPlanMetricsSet,
347        partition: usize,
348    ) -> Result<SendableRecordBatchStream> {
349        if ctx.explain_verbose {
350            common_telemetry::info!(
351                "SeqScan partition {}, region_id: {}",
352                partition,
353                self.stream_ctx.input.region_metadata().region_id
354            );
355        }
356
357        let metrics = self.new_partition_metrics(ctx.explain_verbose, metrics_set, partition);
358        let input = &self.stream_ctx.input;
359
360        let batch_stream = if input.flat_format {
361            // Use flat scan for bulk memtables
362            self.scan_flat_batch_in_partition(partition, metrics.clone())?
363        } else {
364            // Use regular batch scan for normal memtables
365            self.scan_batch_in_partition(partition, metrics.clone())?
366        };
367        let record_batch_stream = ConvertBatchStream::new(
368            batch_stream,
369            input.mapper.clone(),
370            input.cache_strategy.clone(),
371            metrics,
372        );
373
374        Ok(Box::pin(RecordBatchStreamWrapper::new(
375            input.mapper.output_schema(),
376            Box::pin(record_batch_stream),
377        )))
378    }
379
380    #[tracing::instrument(
381        skip_all,
382        fields(
383            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
384            partition = partition
385        )
386    )]
387    fn scan_batch_in_partition(
388        &self,
389        partition: usize,
390        part_metrics: PartitionMetrics,
391    ) -> Result<ScanBatchStream> {
392        ensure!(
393            partition < self.properties.partitions.len(),
394            PartitionOutOfRangeSnafu {
395                given: partition,
396                all: self.properties.partitions.len(),
397            }
398        );
399
400        if self.properties.partitions[partition].is_empty() {
401            return Ok(Box::pin(futures::stream::empty()));
402        }
403
404        let stream_ctx = self.stream_ctx.clone();
405        let semaphore = self.new_semaphore();
406        let partition_ranges = self.properties.partitions[partition].clone();
407        let compaction = self.stream_ctx.input.compaction;
408        let distinguish_range = self.properties.distinguish_partition_range;
409        let file_scan_semaphore = if compaction { None } else { semaphore.clone() };
410
411        let stream = try_stream! {
412            part_metrics.on_first_poll();
413
414            let range_builder_list = Arc::new(RangeBuilderList::new(
415                stream_ctx.input.num_memtables(),
416                stream_ctx.input.num_files(),
417            ));
418            let _mapper = stream_ctx.input.mapper.as_primary_key().context(UnexpectedSnafu {
419                reason: "Unexpected format",
420            })?;
421            // Scans each part.
422            for part_range in partition_ranges {
423                let mut sources = Vec::new();
424                build_sources(
425                    &stream_ctx,
426                    &part_range,
427                    compaction,
428                    &part_metrics,
429                    range_builder_list.clone(),
430                    &mut sources,
431                    file_scan_semaphore.clone(),
432                ).await?;
433
434                let mut metrics = ScannerMetrics::default();
435                let mut fetch_start = Instant::now();
436                let mut reader =
437                    Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone(), Some(&part_metrics))
438                        .await?;
439                #[cfg(debug_assertions)]
440                let mut checker = crate::read::BatchChecker::default()
441                    .with_start(Some(part_range.start))
442                    .with_end(Some(part_range.end));
443
444                while let Some(batch) = reader.next_batch().await? {
445                    metrics.scan_cost += fetch_start.elapsed();
446                    metrics.num_batches += 1;
447                    metrics.num_rows += batch.num_rows();
448
449                    debug_assert!(!batch.is_empty());
450                    if batch.is_empty() {
451                        continue;
452                    }
453
454                    #[cfg(debug_assertions)]
455                    checker.ensure_part_range_batch(
456                        "SeqScan",
457                        _mapper.metadata().region_id,
458                        partition,
459                        part_range,
460                        &batch,
461                    );
462
463                    let yield_start = Instant::now();
464                    yield ScanBatch::Normal(batch);
465                    metrics.yield_cost += yield_start.elapsed();
466
467                    fetch_start = Instant::now();
468                }
469
470                // Yields an empty part to indicate this range is terminated.
471                // The query engine can use this to optimize some queries.
472                if distinguish_range {
473                    let yield_start = Instant::now();
474                    yield ScanBatch::Normal(Batch::empty());
475                    metrics.yield_cost += yield_start.elapsed();
476                }
477
478                metrics.scan_cost += fetch_start.elapsed();
479                part_metrics.merge_metrics(&metrics);
480            }
481
482            part_metrics.on_finish();
483        };
484        Ok(Box::pin(stream))
485    }
486
487    #[tracing::instrument(
488        skip_all,
489        fields(
490            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
491            partition = partition
492        )
493    )]
494    fn scan_flat_batch_in_partition(
495        &self,
496        partition: usize,
497        part_metrics: PartitionMetrics,
498    ) -> Result<ScanBatchStream> {
499        ensure!(
500            partition < self.properties.partitions.len(),
501            PartitionOutOfRangeSnafu {
502                given: partition,
503                all: self.properties.partitions.len(),
504            }
505        );
506
507        if self.properties.partitions[partition].is_empty() {
508            return Ok(Box::pin(futures::stream::empty()));
509        }
510
511        let stream_ctx = self.stream_ctx.clone();
512        let semaphore = self.new_semaphore();
513        let partition_ranges = self.properties.partitions[partition].clone();
514        let compaction = self.stream_ctx.input.compaction;
515        let file_scan_semaphore = if compaction { None } else { semaphore.clone() };
516
517        let stream = try_stream! {
518            part_metrics.on_first_poll();
519
520            let range_builder_list = Arc::new(RangeBuilderList::new(
521                stream_ctx.input.num_memtables(),
522                stream_ctx.input.num_files(),
523            ));
524            // Scans each part.
525            for part_range in partition_ranges {
526                let mut sources = Vec::new();
527                build_flat_sources(
528                    &stream_ctx,
529                    &part_range,
530                    compaction,
531                    &part_metrics,
532                    range_builder_list.clone(),
533                    &mut sources,
534                    file_scan_semaphore.clone(),
535                ).await?;
536
537                let mut metrics = ScannerMetrics::default();
538                let mut fetch_start = Instant::now();
539                let mut reader =
540                    Self::build_flat_reader_from_sources(&stream_ctx, sources, semaphore.clone(), Some(&part_metrics))
541                        .await?;
542
543                while let Some(record_batch) = reader.try_next().await? {
544                    metrics.scan_cost += fetch_start.elapsed();
545                    metrics.num_batches += 1;
546                    metrics.num_rows += record_batch.num_rows();
547
548                    debug_assert!(record_batch.num_rows() > 0);
549                    if record_batch.num_rows() == 0 {
550                        continue;
551                    }
552
553                    let yield_start = Instant::now();
554                    yield ScanBatch::RecordBatch(record_batch);
555                    metrics.yield_cost += yield_start.elapsed();
556
557                    fetch_start = Instant::now();
558                }
559
560                metrics.scan_cost += fetch_start.elapsed();
561                part_metrics.merge_metrics(&metrics);
562            }
563
564            part_metrics.on_finish();
565        };
566        Ok(Box::pin(stream))
567    }
568
569    fn new_semaphore(&self) -> Option<Arc<Semaphore>> {
570        if self.properties.target_partitions() > self.properties.num_partitions() {
571            // We can use additional tasks to read the data if we have more target partitions than actual partitions.
572            // This semaphore is partition level.
573            // We don't use a global semaphore to avoid a partition waiting for others. The final concurrency
574            // of tasks usually won't exceed the target partitions a lot as compaction can reduce the number of
575            // files in a part range.
576            Some(Arc::new(Semaphore::new(
577                self.properties.target_partitions() - self.properties.num_partitions() + 1,
578            )))
579        } else {
580            None
581        }
582    }
583
584    /// Creates a new partition metrics instance.
585    /// Sets the partition metrics for the given partition if it is not for compaction.
586    fn new_partition_metrics(
587        &self,
588        explain_verbose: bool,
589        metrics_set: &ExecutionPlanMetricsSet,
590        partition: usize,
591    ) -> PartitionMetrics {
592        let metrics = PartitionMetrics::new(
593            self.stream_ctx.input.mapper.metadata().region_id,
594            partition,
595            get_scanner_type(self.stream_ctx.input.compaction),
596            self.stream_ctx.query_start,
597            explain_verbose,
598            metrics_set,
599        );
600
601        if !self.stream_ctx.input.compaction {
602            self.metrics_list.set(partition, metrics.clone());
603        }
604
605        metrics
606    }
607
608    /// Finds the maximum number of files to read in a single partition range.
609    fn max_files_in_partition(ranges: &[RangeMeta], partition_ranges: &[PartitionRange]) -> usize {
610        partition_ranges
611            .iter()
612            .map(|part_range| {
613                let range_meta = &ranges[part_range.identifier];
614                range_meta.indices.len()
615            })
616            .max()
617            .unwrap_or(0)
618    }
619
620    /// Checks resource limit for the scanner.
621    pub(crate) fn check_scan_limit(&self) -> Result<()> {
622        // Check max file count limit for all partitions since we scan them in parallel.
623        let total_max_files: usize = self
624            .properties
625            .partitions
626            .iter()
627            .map(|partition| Self::max_files_in_partition(&self.stream_ctx.ranges, partition))
628            .sum();
629
630        let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
631        if total_max_files > max_concurrent_files {
632            return TooManyFilesToReadSnafu {
633                actual: total_max_files,
634                max: max_concurrent_files,
635            }
636            .fail();
637        }
638
639        Ok(())
640    }
641}
642
643impl RegionScanner for SeqScan {
644    fn name(&self) -> &str {
645        "SeqScan"
646    }
647
648    fn properties(&self) -> &ScannerProperties {
649        &self.properties
650    }
651
652    fn schema(&self) -> SchemaRef {
653        self.stream_ctx.input.mapper.output_schema()
654    }
655
656    fn metadata(&self) -> RegionMetadataRef {
657        self.stream_ctx.input.mapper.metadata().clone()
658    }
659
660    fn scan_partition(
661        &self,
662        ctx: &QueryScanContext,
663        metrics_set: &ExecutionPlanMetricsSet,
664        partition: usize,
665    ) -> Result<SendableRecordBatchStream, BoxedError> {
666        self.scan_partition_impl(ctx, metrics_set, partition)
667            .map_err(BoxedError::new)
668    }
669
670    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
671        self.properties.prepare(request);
672
673        self.check_scan_limit().map_err(BoxedError::new)?;
674
675        Ok(())
676    }
677
678    fn has_predicate_without_region(&self) -> bool {
679        let predicate = self
680            .stream_ctx
681            .input
682            .predicate_group()
683            .predicate_without_region();
684        predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
685    }
686
687    fn set_logical_region(&mut self, logical_region: bool) {
688        self.properties.set_logical_region(logical_region);
689    }
690}
691
692impl DisplayAs for SeqScan {
693    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
694        write!(
695            f,
696            "SeqScan: region={}, ",
697            self.stream_ctx.input.mapper.metadata().region_id
698        )?;
699        match t {
700            // TODO(LFC): Implement all the "TreeRender" display format.
701            DisplayFormatType::Default | DisplayFormatType::TreeRender => {
702                self.stream_ctx.format_for_explain(false, f)
703            }
704            DisplayFormatType::Verbose => {
705                self.stream_ctx.format_for_explain(true, f)?;
706                self.metrics_list.format_verbose_metrics(f)
707            }
708        }
709    }
710}
711
712impl fmt::Debug for SeqScan {
713    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
714        f.debug_struct("SeqScan")
715            .field("num_ranges", &self.stream_ctx.ranges.len())
716            .finish()
717    }
718}
719
720/// Builds sources for the partition range and push them to the `sources` vector.
721pub(crate) async fn build_sources(
722    stream_ctx: &Arc<StreamContext>,
723    part_range: &PartitionRange,
724    compaction: bool,
725    part_metrics: &PartitionMetrics,
726    range_builder_list: Arc<RangeBuilderList>,
727    sources: &mut Vec<Source>,
728    semaphore: Option<Arc<Semaphore>>,
729) -> Result<()> {
730    // Gets range meta.
731    let range_meta = &stream_ctx.ranges[part_range.identifier];
732    #[cfg(debug_assertions)]
733    if compaction {
734        // Compaction expects input sources are not been split.
735        debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
736        for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
737            // It should scan all row groups.
738            debug_assert_eq!(
739                -1, row_group_idx.row_group_index,
740                "Expect {} range scan all row groups, given: {}",
741                i, row_group_idx.row_group_index,
742            );
743        }
744    }
745
746    let read_type = if compaction {
747        "compaction"
748    } else {
749        "seq_scan_files"
750    };
751    let num_indices = range_meta.row_group_indices.len();
752    if num_indices == 0 {
753        return Ok(());
754    }
755
756    sources.reserve(num_indices);
757    let mut ordered_sources = Vec::with_capacity(num_indices);
758    ordered_sources.resize_with(num_indices, || None);
759    let mut file_scan_tasks = Vec::new();
760
761    for (position, index) in range_meta.row_group_indices.iter().enumerate() {
762        if stream_ctx.is_mem_range_index(*index) {
763            let stream = scan_mem_ranges(
764                stream_ctx.clone(),
765                part_metrics.clone(),
766                *index,
767                range_meta.time_range,
768            );
769            ordered_sources[position] = Some(Source::Stream(Box::pin(stream) as _));
770        } else if stream_ctx.is_file_range_index(*index) {
771            if let Some(semaphore_ref) = semaphore.as_ref() {
772                // run in parallel, controlled by semaphore
773                let stream_ctx = stream_ctx.clone();
774                let part_metrics = part_metrics.clone();
775                let range_builder_list = range_builder_list.clone();
776                let semaphore = Arc::clone(semaphore_ref);
777                let row_group_index = *index;
778                file_scan_tasks.push(async move {
779                    let _permit = semaphore.acquire().await.unwrap();
780                    let stream = scan_file_ranges(
781                        stream_ctx,
782                        part_metrics,
783                        row_group_index,
784                        read_type,
785                        range_builder_list,
786                    )
787                    .await?;
788                    Ok((position, Source::Stream(Box::pin(stream) as _)))
789                });
790            } else {
791                // no semaphore, run sequentially
792                let stream = scan_file_ranges(
793                    stream_ctx.clone(),
794                    part_metrics.clone(),
795                    *index,
796                    read_type,
797                    range_builder_list.clone(),
798                )
799                .await?;
800                ordered_sources[position] = Some(Source::Stream(Box::pin(stream) as _));
801            }
802        } else {
803            let stream =
804                scan_util::maybe_scan_other_ranges(stream_ctx, *index, part_metrics).await?;
805            ordered_sources[position] = Some(Source::Stream(stream));
806        }
807    }
808
809    if !file_scan_tasks.is_empty() {
810        let results = futures::future::try_join_all(file_scan_tasks).await?;
811        for (position, source) in results {
812            ordered_sources[position] = Some(source);
813        }
814    }
815
816    for source in ordered_sources.into_iter().flatten() {
817        sources.push(source);
818    }
819    Ok(())
820}
821
822/// Builds flat sources for the partition range and push them to the `sources` vector.
823pub(crate) async fn build_flat_sources(
824    stream_ctx: &Arc<StreamContext>,
825    part_range: &PartitionRange,
826    compaction: bool,
827    part_metrics: &PartitionMetrics,
828    range_builder_list: Arc<RangeBuilderList>,
829    sources: &mut Vec<BoxedRecordBatchStream>,
830    semaphore: Option<Arc<Semaphore>>,
831) -> Result<()> {
832    // Gets range meta.
833    let range_meta = &stream_ctx.ranges[part_range.identifier];
834    #[cfg(debug_assertions)]
835    if compaction {
836        // Compaction expects input sources are not been split.
837        debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
838        for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
839            // It should scan all row groups.
840            debug_assert_eq!(
841                -1, row_group_idx.row_group_index,
842                "Expect {} range scan all row groups, given: {}",
843                i, row_group_idx.row_group_index,
844            );
845        }
846    }
847
848    let read_type = if compaction {
849        "compaction"
850    } else {
851        "seq_scan_files"
852    };
853    let num_indices = range_meta.row_group_indices.len();
854    if num_indices == 0 {
855        return Ok(());
856    }
857
858    let should_split = should_split_flat_batches_for_merge(stream_ctx, range_meta);
859    sources.reserve(num_indices);
860    let mut ordered_sources = Vec::with_capacity(num_indices);
861    ordered_sources.resize_with(num_indices, || None);
862    let mut file_scan_tasks = Vec::new();
863
864    for (position, index) in range_meta.row_group_indices.iter().enumerate() {
865        if stream_ctx.is_mem_range_index(*index) {
866            let stream = scan_flat_mem_ranges(stream_ctx.clone(), part_metrics.clone(), *index);
867            ordered_sources[position] = Some(Box::pin(stream) as _);
868        } else if stream_ctx.is_file_range_index(*index) {
869            if let Some(semaphore_ref) = semaphore.as_ref() {
870                // run in parallel, controlled by semaphore
871                let stream_ctx = stream_ctx.clone();
872                let part_metrics = part_metrics.clone();
873                let range_builder_list = range_builder_list.clone();
874                let semaphore = Arc::clone(semaphore_ref);
875                let row_group_index = *index;
876                file_scan_tasks.push(async move {
877                    let _permit = semaphore.acquire().await.unwrap();
878                    let stream = scan_flat_file_ranges(
879                        stream_ctx,
880                        part_metrics,
881                        row_group_index,
882                        read_type,
883                        range_builder_list,
884                    )
885                    .await?;
886                    Ok((position, Box::pin(stream) as _))
887                });
888            } else {
889                // no semaphore, run sequentially
890                let stream = scan_flat_file_ranges(
891                    stream_ctx.clone(),
892                    part_metrics.clone(),
893                    *index,
894                    read_type,
895                    range_builder_list.clone(),
896                )
897                .await?;
898                ordered_sources[position] = Some(Box::pin(stream) as _);
899            }
900        } else {
901            let stream =
902                scan_util::maybe_scan_flat_other_ranges(stream_ctx, *index, part_metrics).await?;
903            ordered_sources[position] = Some(stream);
904        }
905    }
906
907    if !file_scan_tasks.is_empty() {
908        let results = futures::future::try_join_all(file_scan_tasks).await?;
909        for (position, stream) in results {
910            ordered_sources[position] = Some(stream);
911        }
912    }
913
914    for stream in ordered_sources.into_iter().flatten() {
915        if should_split {
916            sources.push(Box::pin(SplitRecordBatchStream::new(stream)));
917        } else {
918            sources.push(stream);
919        }
920    }
921
922    if should_split {
923        common_telemetry::debug!(
924            "Splitting record batches, region: {}, sources: {}, part_range: {:?}",
925            stream_ctx.input.region_metadata().region_id,
926            sources.len(),
927            part_range,
928        );
929    }
930
931    Ok(())
932}
933
#[cfg(test)]
impl SeqScan {
    /// Test-only accessor that borrows the [`ScanInput`] held by this scanner's stream context.
    pub(crate) fn input(&self) -> &ScanInput {
        let ctx: &StreamContext = &self.stream_ctx;
        &ctx.input
    }
}
941
/// Returns the scanner type name used for this scan.
///
/// Compaction scans are labelled explicitly so they can be told apart from query scans.
fn get_scanner_type(compaction: bool) -> &'static str {
    match compaction {
        true => "SeqScan(compaction)",
        false => "SeqScan",
    }
}