mito2/read/
seq_scan.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Sequential scan.
16
17use std::fmt;
18use std::sync::Arc;
19use std::time::Instant;
20
21use async_stream::try_stream;
22use common_error::ext::BoxedError;
23use common_recordbatch::util::ChainedRecordBatchStream;
24use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
25use common_telemetry::tracing;
26use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
27use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
28use datatypes::schema::SchemaRef;
29use futures::{StreamExt, TryStreamExt};
30use snafu::{OptionExt, ensure};
31use store_api::metadata::RegionMetadataRef;
32use store_api::region_engine::{
33    PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
34};
35use store_api::storage::TimeSeriesRowSelector;
36use tokio::sync::Semaphore;
37
38use crate::error::{PartitionOutOfRangeSnafu, Result, TooManyFilesToReadSnafu, UnexpectedSnafu};
39use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
40use crate::read::flat_dedup::{FlatDedupReader, FlatLastNonNull, FlatLastRow};
41use crate::read::flat_merge::FlatMergeReader;
42use crate::read::last_row::LastRowReader;
43use crate::read::merge::MergeReaderBuilder;
44use crate::read::range::{RangeBuilderList, RangeMeta};
45use crate::read::scan_region::{ScanInput, StreamContext};
46use crate::read::scan_util::{
47    PartitionMetrics, PartitionMetricsList, SplitRecordBatchStream, scan_file_ranges,
48    scan_flat_file_ranges, scan_flat_mem_ranges, scan_mem_ranges,
49    should_split_flat_batches_for_merge,
50};
51use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
52use crate::read::{
53    Batch, BatchReader, BoxedBatchReader, BoxedRecordBatchStream, ScannerMetrics, Source, scan_util,
54};
55use crate::region::options::MergeMode;
56use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
57
/// Scans a region and returns rows in a sorted sequence.
///
/// The output order is always `order by primary keys, time index` inside every
/// [`PartitionRange`]. Each "partition" may contain many [`PartitionRange`]s.
pub struct SeqScan {
    /// Properties of the scanner, including the [`PartitionRange`]s assigned to
    /// each partition (`properties.partitions`).
    properties: ScannerProperties,
    /// Context of streams. Cloned into every partition stream this scanner builds.
    stream_ctx: Arc<StreamContext>,
    /// Metrics for each partition.
    /// Only populated for query scans; it stays empty during compaction.
    metrics_list: PartitionMetricsList,
}
71
72impl SeqScan {
73    /// Creates a new [SeqScan] with the given input.
74    /// If `input.compaction` is true, the scanner will not attempt to split ranges.
75    pub(crate) fn new(input: ScanInput) -> Self {
76        let mut properties = ScannerProperties::default()
77            .with_append_mode(input.append_mode)
78            .with_total_rows(input.total_rows());
79        let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input));
80        properties.partitions = vec![stream_ctx.partition_ranges()];
81
82        Self {
83            properties,
84            stream_ctx,
85            metrics_list: PartitionMetricsList::default(),
86        }
87    }
88
89    /// Builds a stream for the query.
90    ///
91    /// The returned stream is not partitioned and will contains all the data. If want
92    /// partitioned scan, use [`RegionScanner::scan_partition`].
93    #[tracing::instrument(
94        skip_all,
95        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
96    )]
97    pub fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
98        let metrics_set = ExecutionPlanMetricsSet::new();
99        let streams = (0..self.properties.partitions.len())
100            .map(|partition: usize| {
101                self.scan_partition(&QueryScanContext::default(), &metrics_set, partition)
102            })
103            .collect::<Result<Vec<_>, _>>()?;
104
105        let aggr_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
106        Ok(Box::pin(aggr_stream))
107    }
108
109    /// Scan [`Batch`] in all partitions one by one.
110    pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
111        let metrics_set = ExecutionPlanMetricsSet::new();
112
113        let streams = (0..self.properties.partitions.len())
114            .map(|partition| {
115                let metrics = self.new_partition_metrics(false, &metrics_set, partition);
116                self.scan_batch_in_partition(partition, metrics)
117            })
118            .collect::<Result<Vec<_>>>()?;
119
120        Ok(Box::pin(futures::stream::iter(streams).flatten()))
121    }
122
123    /// Builds a [BoxedBatchReader] from sequential scan for compaction.
124    ///
125    /// # Panics
126    /// Panics if the compaction flag is not set.
127    pub async fn build_reader_for_compaction(&self) -> Result<BoxedBatchReader> {
128        assert!(self.stream_ctx.input.compaction);
129
130        let metrics_set = ExecutionPlanMetricsSet::new();
131        let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
132        debug_assert_eq!(1, self.properties.partitions.len());
133        let partition_ranges = &self.properties.partitions[0];
134
135        let reader = Self::merge_all_ranges_for_compaction(
136            &self.stream_ctx,
137            partition_ranges,
138            &part_metrics,
139        )
140        .await?;
141        Ok(Box::new(reader))
142    }
143
144    /// Builds a [BoxedRecordBatchStream] from sequential scan for flat format compaction.
145    ///
146    /// # Panics
147    /// Panics if the compaction flag is not set.
148    pub async fn build_flat_reader_for_compaction(&self) -> Result<BoxedRecordBatchStream> {
149        assert!(self.stream_ctx.input.compaction);
150
151        let metrics_set = ExecutionPlanMetricsSet::new();
152        let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
153        debug_assert_eq!(1, self.properties.partitions.len());
154        let partition_ranges = &self.properties.partitions[0];
155
156        let reader = Self::merge_all_flat_ranges_for_compaction(
157            &self.stream_ctx,
158            partition_ranges,
159            &part_metrics,
160        )
161        .await?;
162        Ok(reader)
163    }
164
165    /// Builds a merge reader that reads all ranges.
166    /// Callers MUST not split ranges before calling this method.
167    async fn merge_all_ranges_for_compaction(
168        stream_ctx: &Arc<StreamContext>,
169        partition_ranges: &[PartitionRange],
170        part_metrics: &PartitionMetrics,
171    ) -> Result<BoxedBatchReader> {
172        let mut sources = Vec::new();
173        let range_builder_list = Arc::new(RangeBuilderList::new(
174            stream_ctx.input.num_memtables(),
175            stream_ctx.input.num_files(),
176        ));
177        for part_range in partition_ranges {
178            build_sources(
179                stream_ctx,
180                part_range,
181                true,
182                part_metrics,
183                range_builder_list.clone(),
184                &mut sources,
185                None,
186            )
187            .await?;
188        }
189
190        common_telemetry::debug!(
191            "Build reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
192            stream_ctx.input.mapper.metadata().region_id,
193            partition_ranges.len(),
194            sources.len()
195        );
196        Self::build_reader_from_sources(stream_ctx, sources, None, None).await
197    }
198
199    /// Builds a merge reader that reads all flat ranges.
200    /// Callers MUST not split ranges before calling this method.
201    async fn merge_all_flat_ranges_for_compaction(
202        stream_ctx: &Arc<StreamContext>,
203        partition_ranges: &[PartitionRange],
204        part_metrics: &PartitionMetrics,
205    ) -> Result<BoxedRecordBatchStream> {
206        let mut sources = Vec::new();
207        let range_builder_list = Arc::new(RangeBuilderList::new(
208            stream_ctx.input.num_memtables(),
209            stream_ctx.input.num_files(),
210        ));
211        for part_range in partition_ranges {
212            build_flat_sources(
213                stream_ctx,
214                part_range,
215                true,
216                part_metrics,
217                range_builder_list.clone(),
218                &mut sources,
219                None,
220            )
221            .await?;
222        }
223
224        common_telemetry::debug!(
225            "Build flat reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
226            stream_ctx.input.mapper.metadata().region_id,
227            partition_ranges.len(),
228            sources.len()
229        );
230        Self::build_flat_reader_from_sources(stream_ctx, sources, None, None).await
231    }
232
233    /// Builds a reader to read sources. If `semaphore` is provided, reads sources in parallel
234    /// if possible.
235    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
236    pub(crate) async fn build_reader_from_sources(
237        stream_ctx: &StreamContext,
238        mut sources: Vec<Source>,
239        semaphore: Option<Arc<Semaphore>>,
240        part_metrics: Option<&PartitionMetrics>,
241    ) -> Result<BoxedBatchReader> {
242        if let Some(semaphore) = semaphore.as_ref() {
243            // Read sources in parallel.
244            if sources.len() > 1 {
245                sources = stream_ctx
246                    .input
247                    .create_parallel_sources(sources, semaphore.clone())?;
248            }
249        }
250
251        let mut builder = MergeReaderBuilder::from_sources(sources);
252        if let Some(metrics) = part_metrics {
253            builder.with_metrics_reporter(Some(metrics.merge_metrics_reporter()));
254        }
255        let reader = builder.build().await?;
256
257        let dedup = !stream_ctx.input.append_mode;
258        let dedup_metrics_reporter = part_metrics.map(|m| m.dedup_metrics_reporter());
259        let reader = if dedup {
260            match stream_ctx.input.merge_mode {
261                MergeMode::LastRow => Box::new(DedupReader::new(
262                    reader,
263                    LastRow::new(stream_ctx.input.filter_deleted),
264                    dedup_metrics_reporter,
265                )) as _,
266                MergeMode::LastNonNull => Box::new(DedupReader::new(
267                    reader,
268                    LastNonNull::new(stream_ctx.input.filter_deleted),
269                    dedup_metrics_reporter,
270                )) as _,
271            }
272        } else {
273            Box::new(reader) as _
274        };
275
276        let reader = match &stream_ctx.input.series_row_selector {
277            Some(TimeSeriesRowSelector::LastRow) => Box::new(LastRowReader::new(reader)) as _,
278            None => reader,
279        };
280
281        Ok(reader)
282    }
283
284    /// Builds a flat reader to read sources that returns RecordBatch. If `semaphore` is provided, reads sources in parallel
285    /// if possible.
286    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
287    pub(crate) async fn build_flat_reader_from_sources(
288        stream_ctx: &StreamContext,
289        mut sources: Vec<BoxedRecordBatchStream>,
290        semaphore: Option<Arc<Semaphore>>,
291        part_metrics: Option<&PartitionMetrics>,
292    ) -> Result<BoxedRecordBatchStream> {
293        if let Some(semaphore) = semaphore.as_ref() {
294            // Read sources in parallel.
295            if sources.len() > 1 {
296                sources = stream_ctx
297                    .input
298                    .create_parallel_flat_sources(sources, semaphore.clone())?;
299            }
300        }
301
302        let mapper = stream_ctx.input.mapper.as_flat().unwrap();
303        let schema = mapper.input_arrow_schema(stream_ctx.input.compaction);
304
305        let metrics_reporter = part_metrics.map(|m| m.merge_metrics_reporter());
306        let reader =
307            FlatMergeReader::new(schema, sources, DEFAULT_READ_BATCH_SIZE, metrics_reporter)
308                .await?;
309
310        let dedup = !stream_ctx.input.append_mode;
311        let dedup_metrics_reporter = part_metrics.map(|m| m.dedup_metrics_reporter());
312        let reader = if dedup {
313            match stream_ctx.input.merge_mode {
314                MergeMode::LastRow => Box::pin(
315                    FlatDedupReader::new(
316                        reader.into_stream().boxed(),
317                        FlatLastRow::new(stream_ctx.input.filter_deleted),
318                        dedup_metrics_reporter,
319                    )
320                    .into_stream(),
321                ) as _,
322                MergeMode::LastNonNull => Box::pin(
323                    FlatDedupReader::new(
324                        reader.into_stream().boxed(),
325                        FlatLastNonNull::new(
326                            mapper.field_column_start(),
327                            stream_ctx.input.filter_deleted,
328                        ),
329                        dedup_metrics_reporter,
330                    )
331                    .into_stream(),
332                ) as _,
333            }
334        } else {
335            Box::pin(reader.into_stream()) as _
336        };
337
338        Ok(reader)
339    }
340
341    /// Scans the given partition when the part list is set properly.
342    /// Otherwise the returned stream might not contains any data.
343    fn scan_partition_impl(
344        &self,
345        ctx: &QueryScanContext,
346        metrics_set: &ExecutionPlanMetricsSet,
347        partition: usize,
348    ) -> Result<SendableRecordBatchStream> {
349        if ctx.explain_verbose {
350            common_telemetry::info!(
351                "SeqScan partition {}, region_id: {}",
352                partition,
353                self.stream_ctx.input.region_metadata().region_id
354            );
355        }
356
357        let metrics = self.new_partition_metrics(ctx.explain_verbose, metrics_set, partition);
358        let input = &self.stream_ctx.input;
359
360        let batch_stream = if input.flat_format {
361            // Use flat scan for bulk memtables
362            self.scan_flat_batch_in_partition(partition, metrics.clone())?
363        } else {
364            // Use regular batch scan for normal memtables
365            self.scan_batch_in_partition(partition, metrics.clone())?
366        };
367        let record_batch_stream = ConvertBatchStream::new(
368            batch_stream,
369            input.mapper.clone(),
370            input.cache_strategy.clone(),
371            metrics,
372        );
373
374        Ok(Box::pin(RecordBatchStreamWrapper::new(
375            input.mapper.output_schema(),
376            Box::pin(record_batch_stream),
377        )))
378    }
379
380    #[tracing::instrument(
381        skip_all,
382        fields(
383            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
384            partition = partition
385        )
386    )]
387    fn scan_batch_in_partition(
388        &self,
389        partition: usize,
390        part_metrics: PartitionMetrics,
391    ) -> Result<ScanBatchStream> {
392        ensure!(
393            partition < self.properties.partitions.len(),
394            PartitionOutOfRangeSnafu {
395                given: partition,
396                all: self.properties.partitions.len(),
397            }
398        );
399
400        if self.properties.partitions[partition].is_empty() {
401            return Ok(Box::pin(futures::stream::empty()));
402        }
403
404        let stream_ctx = self.stream_ctx.clone();
405        let semaphore = self.new_semaphore();
406        let partition_ranges = self.properties.partitions[partition].clone();
407        let compaction = self.stream_ctx.input.compaction;
408        let distinguish_range = self.properties.distinguish_partition_range;
409        let file_scan_semaphore = if compaction { None } else { semaphore.clone() };
410
411        let stream = try_stream! {
412            part_metrics.on_first_poll();
413            // Start fetch time before building sources so scan cost contains
414            // build part cost.
415            let mut fetch_start = Instant::now();
416
417            let range_builder_list = Arc::new(RangeBuilderList::new(
418                stream_ctx.input.num_memtables(),
419                stream_ctx.input.num_files(),
420            ));
421            let _mapper = stream_ctx.input.mapper.as_primary_key().context(UnexpectedSnafu {
422                reason: "Unexpected format",
423            })?;
424            // Scans each part.
425            for part_range in partition_ranges {
426                let mut sources = Vec::new();
427                build_sources(
428                    &stream_ctx,
429                    &part_range,
430                    compaction,
431                    &part_metrics,
432                    range_builder_list.clone(),
433                    &mut sources,
434                    file_scan_semaphore.clone(),
435                ).await?;
436
437                let mut reader =
438                    Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone(), Some(&part_metrics))
439                        .await?;
440                #[cfg(debug_assertions)]
441                let mut checker = crate::read::BatchChecker::default()
442                    .with_start(Some(part_range.start))
443                    .with_end(Some(part_range.end));
444
445                let mut metrics = ScannerMetrics {
446                    scan_cost: fetch_start.elapsed(),
447                    ..Default::default()
448                };
449                fetch_start = Instant::now();
450
451                while let Some(batch) = reader.next_batch().await? {
452                    metrics.scan_cost += fetch_start.elapsed();
453                    metrics.num_batches += 1;
454                    metrics.num_rows += batch.num_rows();
455
456                    debug_assert!(!batch.is_empty());
457                    if batch.is_empty() {
458                        fetch_start = Instant::now();
459                        continue;
460                    }
461
462                    #[cfg(debug_assertions)]
463                    checker.ensure_part_range_batch(
464                        "SeqScan",
465                        _mapper.metadata().region_id,
466                        partition,
467                        part_range,
468                        &batch,
469                    );
470
471                    let yield_start = Instant::now();
472                    yield ScanBatch::Normal(batch);
473                    metrics.yield_cost += yield_start.elapsed();
474
475                    fetch_start = Instant::now();
476                }
477
478                // Yields an empty part to indicate this range is terminated.
479                // The query engine can use this to optimize some queries.
480                if distinguish_range {
481                    let yield_start = Instant::now();
482                    yield ScanBatch::Normal(Batch::empty());
483                    metrics.yield_cost += yield_start.elapsed();
484                }
485
486                metrics.scan_cost += fetch_start.elapsed();
487                fetch_start = Instant::now();
488                part_metrics.merge_metrics(&metrics);
489            }
490
491            part_metrics.on_finish();
492        };
493        Ok(Box::pin(stream))
494    }
495
    /// Scans the part ranges of `partition` with flat sources and yields
    /// [`ScanBatch::RecordBatch`] items.
    ///
    /// Returns an empty stream if the partition has no ranges. Returns a
    /// `PartitionOutOfRange` error if `partition` is not a valid index.
    ///
    /// NOTE(review): unlike `scan_batch_in_partition`, this never yields an empty
    /// range terminator when `distinguish_partition_range` is set, and it has no
    /// debug-build `BatchChecker`. Confirm whether flat-format consumers rely on the
    /// terminator.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
            partition = partition
        )
    )]
    fn scan_flat_batch_in_partition(
        &self,
        partition: usize,
        part_metrics: PartitionMetrics,
    ) -> Result<ScanBatchStream> {
        ensure!(
            partition < self.properties.partitions.len(),
            PartitionOutOfRangeSnafu {
                given: partition,
                all: self.properties.partitions.len(),
            }
        );

        if self.properties.partitions[partition].is_empty() {
            return Ok(Box::pin(futures::stream::empty()));
        }

        let stream_ctx = self.stream_ctx.clone();
        let semaphore = self.new_semaphore();
        let partition_ranges = self.properties.partitions[partition].clone();
        let compaction = self.stream_ctx.input.compaction;
        // Compaction passes no semaphore to source building; queries reuse the
        // partition-level semaphore.
        let file_scan_semaphore = if compaction { None } else { semaphore.clone() };

        let stream = try_stream! {
            part_metrics.on_first_poll();
            // Start fetch time before building sources so scan cost contains
            // build part cost.
            let mut fetch_start = Instant::now();

            let range_builder_list = Arc::new(RangeBuilderList::new(
                stream_ctx.input.num_memtables(),
                stream_ctx.input.num_files(),
            ));
            // Scans each part.
            for part_range in partition_ranges {
                let mut sources = Vec::new();
                build_flat_sources(
                    &stream_ctx,
                    &part_range,
                    compaction,
                    &part_metrics,
                    range_builder_list.clone(),
                    &mut sources,
                    file_scan_semaphore.clone(),
                ).await?;

                let mut reader =
                    Self::build_flat_reader_from_sources(&stream_ctx, sources, semaphore.clone(), Some(&part_metrics))
                        .await?;

                // The time spent building this part's sources and reader is the
                // initial scan cost.
                let mut metrics = ScannerMetrics {
                    scan_cost: fetch_start.elapsed(),
                    ..Default::default()
                };
                fetch_start = Instant::now();

                while let Some(record_batch) = reader.try_next().await? {
                    metrics.scan_cost += fetch_start.elapsed();
                    metrics.num_batches += 1;
                    metrics.num_rows += record_batch.num_rows();

                    // Empty batches are not expected; in release builds they are
                    // skipped instead of yielded.
                    debug_assert!(record_batch.num_rows() > 0);
                    if record_batch.num_rows() == 0 {
                        fetch_start = Instant::now();
                        continue;
                    }

                    // Time spent suspended at the yield is tracked as yield cost.
                    let yield_start = Instant::now();
                    yield ScanBatch::RecordBatch(record_batch);
                    metrics.yield_cost += yield_start.elapsed();

                    fetch_start = Instant::now();
                }

                // Account for the final fetch that returned `None`.
                metrics.scan_cost += fetch_start.elapsed();
                fetch_start = Instant::now();
                part_metrics.merge_metrics(&metrics);
            }

            part_metrics.on_finish();
        };
        Ok(Box::pin(stream))
    }
586
587    fn new_semaphore(&self) -> Option<Arc<Semaphore>> {
588        if self.properties.target_partitions() > self.properties.num_partitions() {
589            // We can use additional tasks to read the data if we have more target partitions than actual partitions.
590            // This semaphore is partition level.
591            // We don't use a global semaphore to avoid a partition waiting for others. The final concurrency
592            // of tasks usually won't exceed the target partitions a lot as compaction can reduce the number of
593            // files in a part range.
594            Some(Arc::new(Semaphore::new(
595                self.properties.target_partitions() - self.properties.num_partitions() + 1,
596            )))
597        } else {
598            None
599        }
600    }
601
602    /// Creates a new partition metrics instance.
603    /// Sets the partition metrics for the given partition if it is not for compaction.
604    fn new_partition_metrics(
605        &self,
606        explain_verbose: bool,
607        metrics_set: &ExecutionPlanMetricsSet,
608        partition: usize,
609    ) -> PartitionMetrics {
610        let metrics = PartitionMetrics::new(
611            self.stream_ctx.input.mapper.metadata().region_id,
612            partition,
613            get_scanner_type(self.stream_ctx.input.compaction),
614            self.stream_ctx.query_start,
615            explain_verbose,
616            metrics_set,
617        );
618
619        if !self.stream_ctx.input.compaction {
620            self.metrics_list.set(partition, metrics.clone());
621        }
622
623        metrics
624    }
625
626    /// Finds the maximum number of files to read in a single partition range.
627    fn max_files_in_partition(ranges: &[RangeMeta], partition_ranges: &[PartitionRange]) -> usize {
628        partition_ranges
629            .iter()
630            .map(|part_range| {
631                let range_meta = &ranges[part_range.identifier];
632                range_meta.indices.len()
633            })
634            .max()
635            .unwrap_or(0)
636    }
637
638    /// Checks resource limit for the scanner.
639    pub(crate) fn check_scan_limit(&self) -> Result<()> {
640        // Check max file count limit for all partitions since we scan them in parallel.
641        let total_max_files: usize = self
642            .properties
643            .partitions
644            .iter()
645            .map(|partition| Self::max_files_in_partition(&self.stream_ctx.ranges, partition))
646            .sum();
647
648        let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
649        if total_max_files > max_concurrent_files {
650            return TooManyFilesToReadSnafu {
651                actual: total_max_files,
652                max: max_concurrent_files,
653            }
654            .fail();
655        }
656
657        Ok(())
658    }
659}
660
661impl RegionScanner for SeqScan {
662    fn name(&self) -> &str {
663        "SeqScan"
664    }
665
666    fn properties(&self) -> &ScannerProperties {
667        &self.properties
668    }
669
670    fn schema(&self) -> SchemaRef {
671        self.stream_ctx.input.mapper.output_schema()
672    }
673
674    fn metadata(&self) -> RegionMetadataRef {
675        self.stream_ctx.input.mapper.metadata().clone()
676    }
677
678    fn scan_partition(
679        &self,
680        ctx: &QueryScanContext,
681        metrics_set: &ExecutionPlanMetricsSet,
682        partition: usize,
683    ) -> Result<SendableRecordBatchStream, BoxedError> {
684        self.scan_partition_impl(ctx, metrics_set, partition)
685            .map_err(BoxedError::new)
686    }
687
688    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
689        self.properties.prepare(request);
690
691        self.check_scan_limit().map_err(BoxedError::new)?;
692
693        Ok(())
694    }
695
696    fn has_predicate_without_region(&self) -> bool {
697        let predicate = self
698            .stream_ctx
699            .input
700            .predicate_group()
701            .predicate_without_region();
702        predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
703    }
704
705    fn set_logical_region(&mut self, logical_region: bool) {
706        self.properties.set_logical_region(logical_region);
707    }
708}
709
710impl DisplayAs for SeqScan {
711    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
712        write!(
713            f,
714            "SeqScan: region={}, ",
715            self.stream_ctx.input.mapper.metadata().region_id
716        )?;
717        match t {
718            // TODO(LFC): Implement all the "TreeRender" display format.
719            DisplayFormatType::Default | DisplayFormatType::TreeRender => {
720                self.stream_ctx.format_for_explain(false, f)
721            }
722            DisplayFormatType::Verbose => {
723                self.stream_ctx.format_for_explain(true, f)?;
724                self.metrics_list.format_verbose_metrics(f)
725            }
726        }
727    }
728}
729
730impl fmt::Debug for SeqScan {
731    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
732        f.debug_struct("SeqScan")
733            .field("num_ranges", &self.stream_ctx.ranges.len())
734            .finish()
735    }
736}
737
/// Builds sources for the partition range and pushes them to the `sources` vector.
///
/// One source is built for each entry of the range's `row_group_indices`:
/// - memtable indices use `scan_mem_ranges`;
/// - file indices use `scan_file_ranges`;
/// - any other index uses `scan_util::maybe_scan_other_ranges`.
///
/// When `semaphore` is `Some`, file sources are built concurrently. Each task holds a
/// permit only while `scan_file_ranges` runs. Without a semaphore they are built
/// sequentially. Either way, sources are pushed in the order of `row_group_indices`.
///
/// `compaction` selects the read type label. In debug builds it also asserts that
/// the range was not split (every row group index is `-1`).
///
/// # Errors
/// Returns an error if building any source fails.
///
/// # Panics
/// Panics if `part_range.identifier` is out of bounds for `stream_ctx.ranges`, or
/// if acquiring a permit from `semaphore` fails.
pub(crate) async fn build_sources(
    stream_ctx: &Arc<StreamContext>,
    part_range: &PartitionRange,
    compaction: bool,
    part_metrics: &PartitionMetrics,
    range_builder_list: Arc<RangeBuilderList>,
    sources: &mut Vec<Source>,
    semaphore: Option<Arc<Semaphore>>,
) -> Result<()> {
    // Gets range meta.
    let range_meta = &stream_ctx.ranges[part_range.identifier];
    #[cfg(debug_assertions)]
    if compaction {
        // Compaction expects input sources are not been split.
        debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
        for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
            // It should scan all row groups.
            debug_assert_eq!(
                -1, row_group_idx.row_group_index,
                "Expect {} range scan all row groups, given: {}",
                i, row_group_idx.row_group_index,
            );
        }
    }

    // Label passed to `scan_file_ranges` to distinguish compaction reads from query reads.
    let read_type = if compaction {
        "compaction"
    } else {
        "seq_scan_files"
    };
    let num_indices = range_meta.row_group_indices.len();
    if num_indices == 0 {
        return Ok(());
    }

    sources.reserve(num_indices);
    // One slot per index so that concurrently built file sources can be stored at
    // their original position.
    let mut ordered_sources = Vec::with_capacity(num_indices);
    ordered_sources.resize_with(num_indices, || None);
    let mut file_scan_tasks = Vec::new();

    for (position, index) in range_meta.row_group_indices.iter().enumerate() {
        if stream_ctx.is_mem_range_index(*index) {
            let stream = scan_mem_ranges(
                stream_ctx.clone(),
                part_metrics.clone(),
                *index,
                range_meta.time_range,
            );
            ordered_sources[position] = Some(Source::Stream(Box::pin(stream) as _));
        } else if stream_ctx.is_file_range_index(*index) {
            if let Some(semaphore_ref) = semaphore.as_ref() {
                // run in parallel, controlled by semaphore
                let stream_ctx = stream_ctx.clone();
                let part_metrics = part_metrics.clone();
                let range_builder_list = range_builder_list.clone();
                let semaphore = Arc::clone(semaphore_ref);
                let row_group_index = *index;
                file_scan_tasks.push(async move {
                    // The permit is held only while the file stream is being built.
                    let _permit = semaphore.acquire().await.unwrap();
                    let stream = scan_file_ranges(
                        stream_ctx,
                        part_metrics,
                        row_group_index,
                        read_type,
                        range_builder_list,
                    )
                    .await?;
                    Ok((position, Source::Stream(Box::pin(stream) as _)))
                });
            } else {
                // no semaphore, run sequentially
                let stream = scan_file_ranges(
                    stream_ctx.clone(),
                    part_metrics.clone(),
                    *index,
                    read_type,
                    range_builder_list.clone(),
                )
                .await?;
                ordered_sources[position] = Some(Source::Stream(Box::pin(stream) as _));
            }
        } else {
            let stream =
                scan_util::maybe_scan_other_ranges(stream_ctx, *index, part_metrics).await?;
            ordered_sources[position] = Some(Source::Stream(stream));
        }
    }

    if !file_scan_tasks.is_empty() {
        // Drive all file tasks concurrently; the semaphore bounds how many build at once.
        let results = futures::future::try_join_all(file_scan_tasks).await?;
        for (position, source) in results {
            ordered_sources[position] = Some(source);
        }
    }

    // Every slot has been filled above; `flatten` unwraps the `Some`s in order.
    for source in ordered_sources.into_iter().flatten() {
        sources.push(source);
    }
    Ok(())
}
839
840/// Builds flat sources for the partition range and push them to the `sources` vector.
841pub(crate) async fn build_flat_sources(
842    stream_ctx: &Arc<StreamContext>,
843    part_range: &PartitionRange,
844    compaction: bool,
845    part_metrics: &PartitionMetrics,
846    range_builder_list: Arc<RangeBuilderList>,
847    sources: &mut Vec<BoxedRecordBatchStream>,
848    semaphore: Option<Arc<Semaphore>>,
849) -> Result<()> {
850    // Gets range meta.
851    let range_meta = &stream_ctx.ranges[part_range.identifier];
852    #[cfg(debug_assertions)]
853    if compaction {
854        // Compaction expects input sources are not been split.
855        debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
856        for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
857            // It should scan all row groups.
858            debug_assert_eq!(
859                -1, row_group_idx.row_group_index,
860                "Expect {} range scan all row groups, given: {}",
861                i, row_group_idx.row_group_index,
862            );
863        }
864    }
865
866    let read_type = if compaction {
867        "compaction"
868    } else {
869        "seq_scan_files"
870    };
871    let num_indices = range_meta.row_group_indices.len();
872    if num_indices == 0 {
873        return Ok(());
874    }
875
876    let should_split = should_split_flat_batches_for_merge(stream_ctx, range_meta);
877    sources.reserve(num_indices);
878    let mut ordered_sources = Vec::with_capacity(num_indices);
879    ordered_sources.resize_with(num_indices, || None);
880    let mut file_scan_tasks = Vec::new();
881
882    for (position, index) in range_meta.row_group_indices.iter().enumerate() {
883        if stream_ctx.is_mem_range_index(*index) {
884            let stream = scan_flat_mem_ranges(stream_ctx.clone(), part_metrics.clone(), *index);
885            ordered_sources[position] = Some(Box::pin(stream) as _);
886        } else if stream_ctx.is_file_range_index(*index) {
887            if let Some(semaphore_ref) = semaphore.as_ref() {
888                // run in parallel, controlled by semaphore
889                let stream_ctx = stream_ctx.clone();
890                let part_metrics = part_metrics.clone();
891                let range_builder_list = range_builder_list.clone();
892                let semaphore = Arc::clone(semaphore_ref);
893                let row_group_index = *index;
894                file_scan_tasks.push(async move {
895                    let _permit = semaphore.acquire().await.unwrap();
896                    let stream = scan_flat_file_ranges(
897                        stream_ctx,
898                        part_metrics,
899                        row_group_index,
900                        read_type,
901                        range_builder_list,
902                    )
903                    .await?;
904                    Ok((position, Box::pin(stream) as _))
905                });
906            } else {
907                // no semaphore, run sequentially
908                let stream = scan_flat_file_ranges(
909                    stream_ctx.clone(),
910                    part_metrics.clone(),
911                    *index,
912                    read_type,
913                    range_builder_list.clone(),
914                )
915                .await?;
916                ordered_sources[position] = Some(Box::pin(stream) as _);
917            }
918        } else {
919            let stream =
920                scan_util::maybe_scan_flat_other_ranges(stream_ctx, *index, part_metrics).await?;
921            ordered_sources[position] = Some(stream);
922        }
923    }
924
925    if !file_scan_tasks.is_empty() {
926        let results = futures::future::try_join_all(file_scan_tasks).await?;
927        for (position, stream) in results {
928            ordered_sources[position] = Some(stream);
929        }
930    }
931
932    for stream in ordered_sources.into_iter().flatten() {
933        if should_split {
934            sources.push(Box::pin(SplitRecordBatchStream::new(stream)));
935        } else {
936            sources.push(stream);
937        }
938    }
939
940    if should_split {
941        common_telemetry::debug!(
942            "Splitting record batches, region: {}, sources: {}, part_range: {:?}",
943            stream_ctx.input.region_metadata().region_id,
944            sources.len(),
945            part_range,
946        );
947    }
948
949    Ok(())
950}
951
#[cfg(test)]
impl SeqScan {
    /// Borrows the [`ScanInput`] stored in this scanner's stream context.
    pub(crate) fn input(&self) -> &ScanInput {
        let ctx = &self.stream_ctx;
        &ctx.input
    }
}
959
/// Returns the scanner type label.
///
/// Compaction scans are labelled `SeqScan(compaction)`; all other scans are labelled `SeqScan`.
fn get_scanner_type(compaction: bool) -> &'static str {
    match compaction {
        true => "SeqScan(compaction)",
        false => "SeqScan",
    }
}
967}