mito2/read/
seq_scan.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Sequential scan.
16
17use std::fmt;
18use std::sync::Arc;
19use std::time::Instant;
20
21use async_stream::try_stream;
22use common_error::ext::BoxedError;
23use common_recordbatch::util::ChainedRecordBatchStream;
24use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
25use common_telemetry::tracing;
26use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
27use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
28use datatypes::schema::SchemaRef;
29use futures::{StreamExt, TryStreamExt};
30use snafu::{OptionExt, ensure};
31use store_api::metadata::RegionMetadataRef;
32use store_api::region_engine::{
33    PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
34};
35use store_api::storage::TimeSeriesRowSelector;
36use tokio::sync::Semaphore;
37
38use crate::error::{PartitionOutOfRangeSnafu, Result, TooManyFilesToReadSnafu, UnexpectedSnafu};
39use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
40use crate::read::flat_dedup::{FlatDedupReader, FlatLastNonNull, FlatLastRow};
41use crate::read::flat_merge::FlatMergeReader;
42use crate::read::last_row::LastRowReader;
43use crate::read::merge::MergeReaderBuilder;
44use crate::read::range::{RangeBuilderList, RangeMeta, file_range_counts};
45use crate::read::scan_region::{ScanInput, StreamContext};
46use crate::read::scan_util::{
47    PartitionMetrics, PartitionMetricsList, SplitRecordBatchStream, scan_file_ranges,
48    scan_flat_file_ranges, scan_flat_mem_ranges, scan_mem_ranges,
49    should_split_flat_batches_for_merge,
50};
51use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
52use crate::read::{
53    Batch, BatchReader, BoxedBatchReader, BoxedRecordBatchStream, ScannerMetrics, Source, scan_util,
54};
55use crate::region::options::MergeMode;
56use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
57
/// Scans a region and returns rows in a sorted sequence.
///
/// The output order is always `order by primary keys, time index` inside every
/// [`PartitionRange`]. Each "partition" may contain many [`PartitionRange`]s.
pub struct SeqScan {
    /// Properties of the scanner, including the [`PartitionRange`]s assigned to
    /// each partition (`properties.partitions`).
    properties: ScannerProperties,
    /// Shared context (scan input and range metadata) used to build the streams.
    stream_ctx: Arc<StreamContext>,
    /// Metrics for each partition.
    /// Only populated for queries; it is kept empty during compaction.
    metrics_list: PartitionMetricsList,
}
68    /// The scanner only sets in query and keeps it empty during compaction.
69    metrics_list: PartitionMetricsList,
70}
71
72impl SeqScan {
73    /// Creates a new [SeqScan] with the given input.
74    /// If `input.compaction` is true, the scanner will not attempt to split ranges.
75    pub(crate) fn new(input: ScanInput) -> Self {
76        let mut properties = ScannerProperties::default()
77            .with_append_mode(input.append_mode)
78            .with_total_rows(input.total_rows());
79        let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input));
80        properties.partitions = vec![stream_ctx.partition_ranges()];
81
82        Self {
83            properties,
84            stream_ctx,
85            metrics_list: PartitionMetricsList::default(),
86        }
87    }
88
89    /// Builds a stream for the query.
90    ///
91    /// The returned stream is not partitioned and will contains all the data. If want
92    /// partitioned scan, use [`RegionScanner::scan_partition`].
93    #[tracing::instrument(
94        skip_all,
95        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
96    )]
97    pub fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
98        let metrics_set = ExecutionPlanMetricsSet::new();
99        let streams = (0..self.properties.partitions.len())
100            .map(|partition: usize| {
101                self.scan_partition(&QueryScanContext::default(), &metrics_set, partition)
102            })
103            .collect::<Result<Vec<_>, _>>()?;
104
105        let aggr_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
106        Ok(Box::pin(aggr_stream))
107    }
108
109    /// Scan [`Batch`] in all partitions one by one.
110    pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
111        let metrics_set = ExecutionPlanMetricsSet::new();
112
113        let streams = (0..self.properties.partitions.len())
114            .map(|partition| {
115                let metrics = self.new_partition_metrics(false, &metrics_set, partition);
116                self.scan_batch_in_partition(partition, metrics)
117            })
118            .collect::<Result<Vec<_>>>()?;
119
120        Ok(Box::pin(futures::stream::iter(streams).flatten()))
121    }
122
123    /// Builds a [BoxedBatchReader] from sequential scan for compaction.
124    ///
125    /// # Panics
126    /// Panics if the compaction flag is not set.
127    pub async fn build_reader_for_compaction(&self) -> Result<BoxedBatchReader> {
128        assert!(self.stream_ctx.input.compaction);
129
130        let metrics_set = ExecutionPlanMetricsSet::new();
131        let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
132        debug_assert_eq!(1, self.properties.partitions.len());
133        let partition_ranges = &self.properties.partitions[0];
134
135        let reader = Self::merge_all_ranges_for_compaction(
136            &self.stream_ctx,
137            partition_ranges,
138            &part_metrics,
139        )
140        .await?;
141        Ok(Box::new(reader))
142    }
143
144    /// Builds a [BoxedRecordBatchStream] from sequential scan for flat format compaction.
145    ///
146    /// # Panics
147    /// Panics if the compaction flag is not set.
148    pub async fn build_flat_reader_for_compaction(&self) -> Result<BoxedRecordBatchStream> {
149        assert!(self.stream_ctx.input.compaction);
150
151        let metrics_set = ExecutionPlanMetricsSet::new();
152        let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
153        debug_assert_eq!(1, self.properties.partitions.len());
154        let partition_ranges = &self.properties.partitions[0];
155
156        let reader = Self::merge_all_flat_ranges_for_compaction(
157            &self.stream_ctx,
158            partition_ranges,
159            &part_metrics,
160        )
161        .await?;
162        Ok(reader)
163    }
164
165    /// Builds a merge reader that reads all ranges.
166    /// Callers MUST not split ranges before calling this method.
167    async fn merge_all_ranges_for_compaction(
168        stream_ctx: &Arc<StreamContext>,
169        partition_ranges: &[PartitionRange],
170        part_metrics: &PartitionMetrics,
171    ) -> Result<BoxedBatchReader> {
172        let mut sources = Vec::new();
173        let counts = file_range_counts(
174            stream_ctx.input.num_memtables(),
175            stream_ctx.input.num_files(),
176            &stream_ctx.ranges,
177            partition_ranges.iter(),
178        );
179        let range_builder_list = Arc::new(RangeBuilderList::new(
180            stream_ctx.input.num_memtables(),
181            counts,
182        ));
183        for part_range in partition_ranges {
184            build_sources(
185                stream_ctx,
186                part_range,
187                true,
188                part_metrics,
189                range_builder_list.clone(),
190                &mut sources,
191                None,
192            )
193            .await?;
194        }
195
196        common_telemetry::debug!(
197            "Build reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
198            stream_ctx.input.mapper.metadata().region_id,
199            partition_ranges.len(),
200            sources.len()
201        );
202        Self::build_reader_from_sources(stream_ctx, sources, None, None).await
203    }
204
205    /// Builds a merge reader that reads all flat ranges.
206    /// Callers MUST not split ranges before calling this method.
207    async fn merge_all_flat_ranges_for_compaction(
208        stream_ctx: &Arc<StreamContext>,
209        partition_ranges: &[PartitionRange],
210        part_metrics: &PartitionMetrics,
211    ) -> Result<BoxedRecordBatchStream> {
212        let mut sources = Vec::new();
213        let counts = file_range_counts(
214            stream_ctx.input.num_memtables(),
215            stream_ctx.input.num_files(),
216            &stream_ctx.ranges,
217            partition_ranges.iter(),
218        );
219        let range_builder_list = Arc::new(RangeBuilderList::new(
220            stream_ctx.input.num_memtables(),
221            counts,
222        ));
223        for part_range in partition_ranges {
224            build_flat_sources(
225                stream_ctx,
226                part_range,
227                true,
228                part_metrics,
229                range_builder_list.clone(),
230                &mut sources,
231                None,
232            )
233            .await?;
234        }
235
236        common_telemetry::debug!(
237            "Build flat reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
238            stream_ctx.input.mapper.metadata().region_id,
239            partition_ranges.len(),
240            sources.len()
241        );
242        Self::build_flat_reader_from_sources(stream_ctx, sources, None, None).await
243    }
244
245    /// Builds a reader to read sources. If `semaphore` is provided, reads sources in parallel
246    /// if possible.
247    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
248    pub(crate) async fn build_reader_from_sources(
249        stream_ctx: &StreamContext,
250        mut sources: Vec<Source>,
251        semaphore: Option<Arc<Semaphore>>,
252        part_metrics: Option<&PartitionMetrics>,
253    ) -> Result<BoxedBatchReader> {
254        if let Some(semaphore) = semaphore.as_ref() {
255            // Read sources in parallel.
256            if sources.len() > 1 {
257                sources = stream_ctx
258                    .input
259                    .create_parallel_sources(sources, semaphore.clone())?;
260            }
261        }
262
263        let mut builder = MergeReaderBuilder::from_sources(sources);
264        if let Some(metrics) = part_metrics {
265            builder.with_metrics_reporter(Some(metrics.merge_metrics_reporter()));
266        }
267        let reader = builder.build().await?;
268
269        let dedup = !stream_ctx.input.append_mode;
270        let dedup_metrics_reporter = part_metrics.map(|m| m.dedup_metrics_reporter());
271        let reader = if dedup {
272            match stream_ctx.input.merge_mode {
273                MergeMode::LastRow => Box::new(DedupReader::new(
274                    reader,
275                    LastRow::new(stream_ctx.input.filter_deleted),
276                    dedup_metrics_reporter,
277                )) as _,
278                MergeMode::LastNonNull => Box::new(DedupReader::new(
279                    reader,
280                    LastNonNull::new(stream_ctx.input.filter_deleted),
281                    dedup_metrics_reporter,
282                )) as _,
283            }
284        } else {
285            Box::new(reader) as _
286        };
287
288        let reader = match &stream_ctx.input.series_row_selector {
289            Some(TimeSeriesRowSelector::LastRow) => Box::new(LastRowReader::new(reader)) as _,
290            None => reader,
291        };
292
293        Ok(reader)
294    }
295
296    /// Builds a flat reader to read sources that returns RecordBatch. If `semaphore` is provided, reads sources in parallel
297    /// if possible.
298    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
299    pub(crate) async fn build_flat_reader_from_sources(
300        stream_ctx: &StreamContext,
301        mut sources: Vec<BoxedRecordBatchStream>,
302        semaphore: Option<Arc<Semaphore>>,
303        part_metrics: Option<&PartitionMetrics>,
304    ) -> Result<BoxedRecordBatchStream> {
305        if let Some(semaphore) = semaphore.as_ref() {
306            // Read sources in parallel.
307            if sources.len() > 1 {
308                sources = stream_ctx
309                    .input
310                    .create_parallel_flat_sources(sources, semaphore.clone())?;
311            }
312        }
313
314        let mapper = stream_ctx.input.mapper.as_flat().unwrap();
315        let schema = mapper.input_arrow_schema(stream_ctx.input.compaction);
316
317        let metrics_reporter = part_metrics.map(|m| m.merge_metrics_reporter());
318        let reader =
319            FlatMergeReader::new(schema, sources, DEFAULT_READ_BATCH_SIZE, metrics_reporter)
320                .await?;
321
322        let dedup = !stream_ctx.input.append_mode;
323        let dedup_metrics_reporter = part_metrics.map(|m| m.dedup_metrics_reporter());
324        let reader = if dedup {
325            match stream_ctx.input.merge_mode {
326                MergeMode::LastRow => Box::pin(
327                    FlatDedupReader::new(
328                        reader.into_stream().boxed(),
329                        FlatLastRow::new(stream_ctx.input.filter_deleted),
330                        dedup_metrics_reporter,
331                    )
332                    .into_stream(),
333                ) as _,
334                MergeMode::LastNonNull => Box::pin(
335                    FlatDedupReader::new(
336                        reader.into_stream().boxed(),
337                        FlatLastNonNull::new(
338                            mapper.field_column_start(),
339                            stream_ctx.input.filter_deleted,
340                        ),
341                        dedup_metrics_reporter,
342                    )
343                    .into_stream(),
344                ) as _,
345            }
346        } else {
347            Box::pin(reader.into_stream()) as _
348        };
349
350        Ok(reader)
351    }
352
353    /// Scans the given partition when the part list is set properly.
354    /// Otherwise the returned stream might not contains any data.
355    fn scan_partition_impl(
356        &self,
357        ctx: &QueryScanContext,
358        metrics_set: &ExecutionPlanMetricsSet,
359        partition: usize,
360    ) -> Result<SendableRecordBatchStream> {
361        if ctx.explain_verbose {
362            common_telemetry::info!(
363                "SeqScan partition {}, region_id: {}",
364                partition,
365                self.stream_ctx.input.region_metadata().region_id
366            );
367        }
368
369        let metrics = self.new_partition_metrics(ctx.explain_verbose, metrics_set, partition);
370        let input = &self.stream_ctx.input;
371
372        let batch_stream = if input.flat_format {
373            // Use flat scan for bulk memtables
374            self.scan_flat_batch_in_partition(partition, metrics.clone())?
375        } else {
376            // Use regular batch scan for normal memtables
377            self.scan_batch_in_partition(partition, metrics.clone())?
378        };
379        let record_batch_stream = ConvertBatchStream::new(
380            batch_stream,
381            input.mapper.clone(),
382            input.cache_strategy.clone(),
383            metrics,
384        );
385
386        Ok(Box::pin(RecordBatchStreamWrapper::new(
387            input.mapper.output_schema(),
388            Box::pin(record_batch_stream),
389        )))
390    }
391
392    #[tracing::instrument(
393        skip_all,
394        fields(
395            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
396            partition = partition
397        )
398    )]
399    fn scan_batch_in_partition(
400        &self,
401        partition: usize,
402        part_metrics: PartitionMetrics,
403    ) -> Result<ScanBatchStream> {
404        ensure!(
405            partition < self.properties.partitions.len(),
406            PartitionOutOfRangeSnafu {
407                given: partition,
408                all: self.properties.partitions.len(),
409            }
410        );
411
412        if self.properties.partitions[partition].is_empty() {
413            return Ok(Box::pin(futures::stream::empty()));
414        }
415
416        let stream_ctx = self.stream_ctx.clone();
417        let semaphore = self.new_semaphore();
418        let partition_ranges = self.properties.partitions[partition].clone();
419        let compaction = self.stream_ctx.input.compaction;
420        let distinguish_range = self.properties.distinguish_partition_range;
421        let file_scan_semaphore = if compaction { None } else { semaphore.clone() };
422
423        let stream = try_stream! {
424            part_metrics.on_first_poll();
425            // Start fetch time before building sources so scan cost contains
426            // build part cost.
427            let mut fetch_start = Instant::now();
428
429            let counts = file_range_counts(
430                stream_ctx.input.num_memtables(),
431                stream_ctx.input.num_files(),
432                &stream_ctx.ranges,
433                partition_ranges.iter(),
434            );
435            let range_builder_list = Arc::new(RangeBuilderList::new(
436                stream_ctx.input.num_memtables(),
437                counts,
438            ));
439            let _mapper = stream_ctx.input.mapper.as_primary_key().context(UnexpectedSnafu {
440                reason: "Unexpected format",
441            })?;
442            // Scans each part.
443            for part_range in partition_ranges {
444                let mut sources = Vec::new();
445                build_sources(
446                    &stream_ctx,
447                    &part_range,
448                    compaction,
449                    &part_metrics,
450                    range_builder_list.clone(),
451                    &mut sources,
452                    file_scan_semaphore.clone(),
453                ).await?;
454
455                let mut reader =
456                    Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone(), Some(&part_metrics))
457                        .await?;
458                #[cfg(debug_assertions)]
459                let mut checker = crate::read::BatchChecker::default()
460                    .with_start(Some(part_range.start))
461                    .with_end(Some(part_range.end));
462
463                let mut metrics = ScannerMetrics {
464                    scan_cost: fetch_start.elapsed(),
465                    ..Default::default()
466                };
467                fetch_start = Instant::now();
468
469                while let Some(batch) = reader.next_batch().await? {
470                    metrics.scan_cost += fetch_start.elapsed();
471                    metrics.num_batches += 1;
472                    metrics.num_rows += batch.num_rows();
473
474                    debug_assert!(!batch.is_empty());
475                    if batch.is_empty() {
476                        fetch_start = Instant::now();
477                        continue;
478                    }
479
480                    #[cfg(debug_assertions)]
481                    checker.ensure_part_range_batch(
482                        "SeqScan",
483                        _mapper.metadata().region_id,
484                        partition,
485                        part_range,
486                        &batch,
487                    );
488
489                    let yield_start = Instant::now();
490                    yield ScanBatch::Normal(batch);
491                    metrics.yield_cost += yield_start.elapsed();
492
493                    fetch_start = Instant::now();
494                }
495
496                // Yields an empty part to indicate this range is terminated.
497                // The query engine can use this to optimize some queries.
498                if distinguish_range {
499                    let yield_start = Instant::now();
500                    yield ScanBatch::Normal(Batch::empty());
501                    metrics.yield_cost += yield_start.elapsed();
502                }
503
504                metrics.scan_cost += fetch_start.elapsed();
505                fetch_start = Instant::now();
506                part_metrics.merge_metrics(&metrics);
507            }
508
509            part_metrics.on_finish();
510        };
511        Ok(Box::pin(stream))
512    }
513
514    #[tracing::instrument(
515        skip_all,
516        fields(
517            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
518            partition = partition
519        )
520    )]
521    fn scan_flat_batch_in_partition(
522        &self,
523        partition: usize,
524        part_metrics: PartitionMetrics,
525    ) -> Result<ScanBatchStream> {
526        ensure!(
527            partition < self.properties.partitions.len(),
528            PartitionOutOfRangeSnafu {
529                given: partition,
530                all: self.properties.partitions.len(),
531            }
532        );
533
534        if self.properties.partitions[partition].is_empty() {
535            return Ok(Box::pin(futures::stream::empty()));
536        }
537
538        let stream_ctx = self.stream_ctx.clone();
539        let semaphore = self.new_semaphore();
540        let partition_ranges = self.properties.partitions[partition].clone();
541        let compaction = self.stream_ctx.input.compaction;
542        let file_scan_semaphore = if compaction { None } else { semaphore.clone() };
543
544        let stream = try_stream! {
545            part_metrics.on_first_poll();
546            // Start fetch time before building sources so scan cost contains
547            // build part cost.
548            let mut fetch_start = Instant::now();
549
550            let counts = file_range_counts(
551                stream_ctx.input.num_memtables(),
552                stream_ctx.input.num_files(),
553                &stream_ctx.ranges,
554                partition_ranges.iter(),
555            );
556            let range_builder_list = Arc::new(RangeBuilderList::new(
557                stream_ctx.input.num_memtables(),
558                counts,
559            ));
560            // Scans each part.
561            for part_range in partition_ranges {
562                let mut sources = Vec::new();
563                build_flat_sources(
564                    &stream_ctx,
565                    &part_range,
566                    compaction,
567                    &part_metrics,
568                    range_builder_list.clone(),
569                    &mut sources,
570                    file_scan_semaphore.clone(),
571                ).await?;
572
573                let mut reader =
574                    Self::build_flat_reader_from_sources(&stream_ctx, sources, semaphore.clone(), Some(&part_metrics))
575                        .await?;
576
577                let mut metrics = ScannerMetrics {
578                    scan_cost: fetch_start.elapsed(),
579                    ..Default::default()
580                };
581                fetch_start = Instant::now();
582
583                while let Some(record_batch) = reader.try_next().await? {
584                    metrics.scan_cost += fetch_start.elapsed();
585                    metrics.num_batches += 1;
586                    metrics.num_rows += record_batch.num_rows();
587
588                    debug_assert!(record_batch.num_rows() > 0);
589                    if record_batch.num_rows() == 0 {
590                        fetch_start = Instant::now();
591                        continue;
592                    }
593
594                    let yield_start = Instant::now();
595                    yield ScanBatch::RecordBatch(record_batch);
596                    metrics.yield_cost += yield_start.elapsed();
597
598                    fetch_start = Instant::now();
599                }
600
601                metrics.scan_cost += fetch_start.elapsed();
602                fetch_start = Instant::now();
603                part_metrics.merge_metrics(&metrics);
604            }
605
606            part_metrics.on_finish();
607        };
608        Ok(Box::pin(stream))
609    }
610
611    fn new_semaphore(&self) -> Option<Arc<Semaphore>> {
612        if self.properties.target_partitions() > self.properties.num_partitions() {
613            // We can use additional tasks to read the data if we have more target partitions than actual partitions.
614            // This semaphore is partition level.
615            // We don't use a global semaphore to avoid a partition waiting for others. The final concurrency
616            // of tasks usually won't exceed the target partitions a lot as compaction can reduce the number of
617            // files in a part range.
618            Some(Arc::new(Semaphore::new(
619                self.properties.target_partitions() - self.properties.num_partitions() + 1,
620            )))
621        } else {
622            None
623        }
624    }
625
626    /// Creates a new partition metrics instance.
627    /// Sets the partition metrics for the given partition if it is not for compaction.
628    fn new_partition_metrics(
629        &self,
630        explain_verbose: bool,
631        metrics_set: &ExecutionPlanMetricsSet,
632        partition: usize,
633    ) -> PartitionMetrics {
634        let metrics = PartitionMetrics::new(
635            self.stream_ctx.input.mapper.metadata().region_id,
636            partition,
637            get_scanner_type(self.stream_ctx.input.compaction),
638            self.stream_ctx.query_start,
639            explain_verbose,
640            metrics_set,
641        );
642
643        if !self.stream_ctx.input.compaction {
644            self.metrics_list.set(partition, metrics.clone());
645        }
646
647        metrics
648    }
649
650    /// Finds the maximum number of files to read in a single partition range.
651    fn max_files_in_partition(ranges: &[RangeMeta], partition_ranges: &[PartitionRange]) -> usize {
652        partition_ranges
653            .iter()
654            .map(|part_range| {
655                let range_meta = &ranges[part_range.identifier];
656                range_meta.indices.len()
657            })
658            .max()
659            .unwrap_or(0)
660    }
661
662    /// Checks resource limit for the scanner.
663    pub(crate) fn check_scan_limit(&self) -> Result<()> {
664        // Check max file count limit for all partitions since we scan them in parallel.
665        let total_max_files: usize = self
666            .properties
667            .partitions
668            .iter()
669            .map(|partition| Self::max_files_in_partition(&self.stream_ctx.ranges, partition))
670            .sum();
671
672        let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
673        if total_max_files > max_concurrent_files {
674            return TooManyFilesToReadSnafu {
675                actual: total_max_files,
676                max: max_concurrent_files,
677            }
678            .fail();
679        }
680
681        Ok(())
682    }
683}
684
685impl RegionScanner for SeqScan {
686    fn name(&self) -> &str {
687        "SeqScan"
688    }
689
690    fn properties(&self) -> &ScannerProperties {
691        &self.properties
692    }
693
694    fn schema(&self) -> SchemaRef {
695        self.stream_ctx.input.mapper.output_schema()
696    }
697
698    fn metadata(&self) -> RegionMetadataRef {
699        self.stream_ctx.input.mapper.metadata().clone()
700    }
701
702    fn scan_partition(
703        &self,
704        ctx: &QueryScanContext,
705        metrics_set: &ExecutionPlanMetricsSet,
706        partition: usize,
707    ) -> Result<SendableRecordBatchStream, BoxedError> {
708        self.scan_partition_impl(ctx, metrics_set, partition)
709            .map_err(BoxedError::new)
710    }
711
712    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
713        self.properties.prepare(request);
714
715        self.check_scan_limit().map_err(BoxedError::new)?;
716
717        Ok(())
718    }
719
720    fn has_predicate_without_region(&self) -> bool {
721        let predicate = self
722            .stream_ctx
723            .input
724            .predicate_group()
725            .predicate_without_region();
726        predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
727    }
728
729    fn set_logical_region(&mut self, logical_region: bool) {
730        self.properties.set_logical_region(logical_region);
731    }
732}
733
734impl DisplayAs for SeqScan {
735    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
736        write!(
737            f,
738            "SeqScan: region={}, ",
739            self.stream_ctx.input.mapper.metadata().region_id
740        )?;
741        match t {
742            // TODO(LFC): Implement all the "TreeRender" display format.
743            DisplayFormatType::Default | DisplayFormatType::TreeRender => {
744                self.stream_ctx.format_for_explain(false, f)
745            }
746            DisplayFormatType::Verbose => {
747                self.stream_ctx.format_for_explain(true, f)?;
748                self.metrics_list.format_verbose_metrics(f)
749            }
750        }
751    }
752}
753
754impl fmt::Debug for SeqScan {
755    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
756        f.debug_struct("SeqScan")
757            .field("num_ranges", &self.stream_ctx.ranges.len())
758            .finish()
759    }
760}
761
762/// Builds sources for the partition range and push them to the `sources` vector.
763pub(crate) async fn build_sources(
764    stream_ctx: &Arc<StreamContext>,
765    part_range: &PartitionRange,
766    compaction: bool,
767    part_metrics: &PartitionMetrics,
768    range_builder_list: Arc<RangeBuilderList>,
769    sources: &mut Vec<Source>,
770    semaphore: Option<Arc<Semaphore>>,
771) -> Result<()> {
772    // Gets range meta.
773    let range_meta = &stream_ctx.ranges[part_range.identifier];
774    #[cfg(debug_assertions)]
775    if compaction {
776        // Compaction expects input sources are not been split.
777        debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
778        for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
779            // It should scan all row groups.
780            debug_assert_eq!(
781                -1, row_group_idx.row_group_index,
782                "Expect {} range scan all row groups, given: {}",
783                i, row_group_idx.row_group_index,
784            );
785        }
786    }
787
788    let read_type = if compaction {
789        "compaction"
790    } else {
791        "seq_scan_files"
792    };
793    let num_indices = range_meta.row_group_indices.len();
794    if num_indices == 0 {
795        return Ok(());
796    }
797
798    sources.reserve(num_indices);
799    let mut ordered_sources = Vec::with_capacity(num_indices);
800    ordered_sources.resize_with(num_indices, || None);
801    let mut file_scan_tasks = Vec::new();
802
803    for (position, index) in range_meta.row_group_indices.iter().enumerate() {
804        if stream_ctx.is_mem_range_index(*index) {
805            let stream = scan_mem_ranges(
806                stream_ctx.clone(),
807                part_metrics.clone(),
808                *index,
809                range_meta.time_range,
810            );
811            ordered_sources[position] = Some(Source::Stream(Box::pin(stream) as _));
812        } else if stream_ctx.is_file_range_index(*index) {
813            if let Some(semaphore_ref) = semaphore.as_ref() {
814                // run in parallel, controlled by semaphore
815                let stream_ctx = stream_ctx.clone();
816                let part_metrics = part_metrics.clone();
817                let range_builder_list = range_builder_list.clone();
818                let semaphore = Arc::clone(semaphore_ref);
819                let row_group_index = *index;
820                file_scan_tasks.push(async move {
821                    let _permit = semaphore.acquire().await.unwrap();
822                    let stream = scan_file_ranges(
823                        stream_ctx,
824                        part_metrics,
825                        row_group_index,
826                        read_type,
827                        range_builder_list,
828                    )
829                    .await?;
830                    Ok((position, Source::Stream(Box::pin(stream) as _)))
831                });
832            } else {
833                // no semaphore, run sequentially
834                let stream = scan_file_ranges(
835                    stream_ctx.clone(),
836                    part_metrics.clone(),
837                    *index,
838                    read_type,
839                    range_builder_list.clone(),
840                )
841                .await?;
842                ordered_sources[position] = Some(Source::Stream(Box::pin(stream) as _));
843            }
844        } else {
845            let stream =
846                scan_util::maybe_scan_other_ranges(stream_ctx, *index, part_metrics).await?;
847            ordered_sources[position] = Some(Source::Stream(stream));
848        }
849    }
850
851    if !file_scan_tasks.is_empty() {
852        let results = futures::future::try_join_all(file_scan_tasks).await?;
853        for (position, source) in results {
854            ordered_sources[position] = Some(source);
855        }
856    }
857
858    for source in ordered_sources.into_iter().flatten() {
859        sources.push(source);
860    }
861    Ok(())
862}
863
864/// Builds flat sources for the partition range and push them to the `sources` vector.
865pub(crate) async fn build_flat_sources(
866    stream_ctx: &Arc<StreamContext>,
867    part_range: &PartitionRange,
868    compaction: bool,
869    part_metrics: &PartitionMetrics,
870    range_builder_list: Arc<RangeBuilderList>,
871    sources: &mut Vec<BoxedRecordBatchStream>,
872    semaphore: Option<Arc<Semaphore>>,
873) -> Result<()> {
874    // Gets range meta.
875    let range_meta = &stream_ctx.ranges[part_range.identifier];
876    #[cfg(debug_assertions)]
877    if compaction {
878        // Compaction expects input sources are not been split.
879        debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
880        for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
881            // It should scan all row groups.
882            debug_assert_eq!(
883                -1, row_group_idx.row_group_index,
884                "Expect {} range scan all row groups, given: {}",
885                i, row_group_idx.row_group_index,
886            );
887        }
888    }
889
890    let read_type = if compaction {
891        "compaction"
892    } else {
893        "seq_scan_files"
894    };
895    let num_indices = range_meta.row_group_indices.len();
896    if num_indices == 0 {
897        return Ok(());
898    }
899
900    let should_split = should_split_flat_batches_for_merge(stream_ctx, range_meta);
901    sources.reserve(num_indices);
902    let mut ordered_sources = Vec::with_capacity(num_indices);
903    ordered_sources.resize_with(num_indices, || None);
904    let mut file_scan_tasks = Vec::new();
905
906    for (position, index) in range_meta.row_group_indices.iter().enumerate() {
907        if stream_ctx.is_mem_range_index(*index) {
908            let stream = scan_flat_mem_ranges(stream_ctx.clone(), part_metrics.clone(), *index);
909            ordered_sources[position] = Some(Box::pin(stream) as _);
910        } else if stream_ctx.is_file_range_index(*index) {
911            if let Some(semaphore_ref) = semaphore.as_ref() {
912                // run in parallel, controlled by semaphore
913                let stream_ctx = stream_ctx.clone();
914                let part_metrics = part_metrics.clone();
915                let range_builder_list = range_builder_list.clone();
916                let semaphore = Arc::clone(semaphore_ref);
917                let row_group_index = *index;
918                file_scan_tasks.push(async move {
919                    let _permit = semaphore.acquire().await.unwrap();
920                    let stream = scan_flat_file_ranges(
921                        stream_ctx,
922                        part_metrics,
923                        row_group_index,
924                        read_type,
925                        range_builder_list,
926                    )
927                    .await?;
928                    Ok((position, Box::pin(stream) as _))
929                });
930            } else {
931                // no semaphore, run sequentially
932                let stream = scan_flat_file_ranges(
933                    stream_ctx.clone(),
934                    part_metrics.clone(),
935                    *index,
936                    read_type,
937                    range_builder_list.clone(),
938                )
939                .await?;
940                ordered_sources[position] = Some(Box::pin(stream) as _);
941            }
942        } else {
943            let stream =
944                scan_util::maybe_scan_flat_other_ranges(stream_ctx, *index, part_metrics).await?;
945            ordered_sources[position] = Some(stream);
946        }
947    }
948
949    if !file_scan_tasks.is_empty() {
950        let results = futures::future::try_join_all(file_scan_tasks).await?;
951        for (position, stream) in results {
952            ordered_sources[position] = Some(stream);
953        }
954    }
955
956    for stream in ordered_sources.into_iter().flatten() {
957        if should_split {
958            sources.push(Box::pin(SplitRecordBatchStream::new(stream)));
959        } else {
960            sources.push(stream);
961        }
962    }
963
964    if should_split {
965        common_telemetry::debug!(
966            "Splitting record batches, region: {}, sources: {}, part_range: {:?}",
967            stream_ctx.input.region_metadata().region_id,
968            sources.len(),
969            part_range,
970        );
971    }
972
973    Ok(())
974}
975
#[cfg(test)]
impl SeqScan {
    /// Test-only accessor for the [`ScanInput`] held by this scanner's stream context.
    pub(crate) fn input(&self) -> &ScanInput {
        let ctx = &self.stream_ctx;
        &ctx.input
    }
}
983
/// Returns the scanner type name.
///
/// Compaction scans are reported as `"SeqScan(compaction)"`; all others as `"SeqScan"`.
fn get_scanner_type(compaction: bool) -> &'static str {
    match compaction {
        true => "SeqScan(compaction)",
        false => "SeqScan",
    }
}