// mito2/read/seq_scan.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Sequential scan.
16
17use std::fmt;
18use std::sync::Arc;
19use std::time::Instant;
20
21use async_stream::try_stream;
22use common_error::ext::BoxedError;
23use common_recordbatch::util::ChainedRecordBatchStream;
24use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
25use common_telemetry::tracing;
26use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
27use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
28use datatypes::schema::SchemaRef;
29use futures::{StreamExt, TryStreamExt};
30use snafu::ensure;
31use store_api::metadata::RegionMetadataRef;
32use store_api::region_engine::{
33    PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
34};
35use store_api::storage::TimeSeriesRowSelector;
36use tokio::sync::Semaphore;
37
38use crate::error::{PartitionOutOfRangeSnafu, Result, TooManyFilesToReadSnafu};
39use crate::read::flat_dedup::{FlatDedupReader, FlatLastNonNull, FlatLastRow};
40use crate::read::flat_merge::FlatMergeReader;
41use crate::read::last_row::FlatLastRowReader;
42use crate::read::pruner::{PartitionPruner, Pruner};
43use crate::read::range::RangeMeta;
44use crate::read::scan_region::{ScanInput, StreamContext};
45use crate::read::scan_util::{
46    PartitionMetrics, PartitionMetricsList, SplitRecordBatchStream, compute_parallel_channel_size,
47    scan_flat_file_ranges, scan_flat_mem_ranges, should_split_flat_batches_for_merge,
48};
49use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
50use crate::read::{BoxedRecordBatchStream, ScannerMetrics, scan_util};
51use crate::region::options::MergeMode;
52use crate::sst::parquet::DEFAULT_READ_BATCH_SIZE;
53
/// Scans a region and returns rows in a sorted sequence.
///
/// The output order is always `order by primary keys, time index` inside every
/// [`PartitionRange`]. Each "partition" may contain many [`PartitionRange`]s.
pub struct SeqScan {
    /// Properties of the scanner.
    properties: ScannerProperties,
    /// Context of streams, shared by all partitions of this scan.
    stream_ctx: Arc<StreamContext>,
    /// Shared pruner for file range building.
    pruner: Arc<Pruner>,
    /// Metrics for each partition.
    /// The scanner only sets this in queries and keeps it empty during compaction.
    metrics_list: PartitionMetricsList,
}
69
70impl SeqScan {
71    /// Creates a new [SeqScan] with the given input.
72    /// If `input.compaction` is true, the scanner will not attempt to split ranges.
73    pub(crate) fn new(input: ScanInput) -> Self {
74        let mut properties = ScannerProperties::default()
75            .with_append_mode(input.append_mode)
76            .with_total_rows(input.total_rows());
77        let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input));
78        properties.partitions = vec![stream_ctx.partition_ranges()];
79
80        // Create the shared pruner with number of workers equal to CPU cores.
81        let num_workers = common_stat::get_total_cpu_cores().max(1);
82        let pruner = Arc::new(Pruner::new(stream_ctx.clone(), num_workers));
83
84        Self {
85            properties,
86            stream_ctx,
87            pruner,
88            metrics_list: PartitionMetricsList::default(),
89        }
90    }
91
92    /// Builds a stream for the query.
93    ///
94    /// The returned stream is not partitioned and will contains all the data. If want
95    /// partitioned scan, use [`RegionScanner::scan_partition`].
96    #[tracing::instrument(
97        skip_all,
98        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
99    )]
100    pub fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
101        let metrics_set = ExecutionPlanMetricsSet::new();
102        let streams = (0..self.properties.partitions.len())
103            .map(|partition: usize| {
104                self.scan_partition(&QueryScanContext::default(), &metrics_set, partition)
105            })
106            .collect::<Result<Vec<_>, _>>()?;
107
108        let aggr_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
109        Ok(Box::pin(aggr_stream))
110    }
111
112    /// Scan [`Batch`] in all partitions one by one.
113    pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
114        let metrics_set = ExecutionPlanMetricsSet::new();
115
116        let streams = (0..self.properties.partitions.len())
117            .map(|partition| {
118                let metrics = self.new_partition_metrics(false, &metrics_set, partition);
119                self.scan_flat_batch_in_partition(partition, metrics)
120            })
121            .collect::<Result<Vec<_>>>()?;
122
123        Ok(Box::pin(futures::stream::iter(streams).flatten()))
124    }
125
126    /// Builds a [BoxedRecordBatchStream] from sequential scan for flat format compaction.
127    ///
128    /// # Panics
129    /// Panics if the compaction flag is not set.
130    pub async fn build_flat_reader_for_compaction(&self) -> Result<BoxedRecordBatchStream> {
131        assert!(self.stream_ctx.input.compaction);
132
133        let metrics_set = ExecutionPlanMetricsSet::new();
134        let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
135        debug_assert_eq!(1, self.properties.partitions.len());
136        let partition_ranges = &self.properties.partitions[0];
137
138        let reader = Self::merge_all_flat_ranges_for_compaction(
139            &self.stream_ctx,
140            partition_ranges,
141            &part_metrics,
142            self.pruner.clone(),
143        )
144        .await?;
145        Ok(reader)
146    }
147
    /// Builds a merge reader that reads all flat ranges.
    /// Callers MUST not split ranges before calling this method.
    async fn merge_all_flat_ranges_for_compaction(
        stream_ctx: &Arc<StreamContext>,
        partition_ranges: &[PartitionRange],
        part_metrics: &PartitionMetrics,
        pruner: Arc<Pruner>,
    ) -> Result<BoxedRecordBatchStream> {
        // Register all ranges with the shared pruner before building sources.
        pruner.add_partition_ranges(partition_ranges);
        let partition_pruner = Arc::new(PartitionPruner::new(pruner, partition_ranges));

        // Build sources for every range. `compaction` is true so ranges are not
        // split, and no file scan semaphore is used (sequential scanning).
        let mut sources = Vec::new();
        for part_range in partition_ranges {
            build_flat_sources(
                stream_ctx,
                part_range,
                true,
                part_metrics,
                partition_pruner.clone(),
                &mut sources,
                None,
            )
            .await?;
        }

        common_telemetry::debug!(
            "Build flat reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
            stream_ctx.input.mapper.metadata().region_id,
            partition_ranges.len(),
            sources.len()
        );
        // Merge all sources without a semaphore or per-partition metrics
        // (the two `None` args below), using the default batch size.
        Self::build_flat_reader_from_sources(
            stream_ctx,
            sources,
            None,
            None,
            compute_parallel_channel_size(DEFAULT_READ_BATCH_SIZE),
        )
        .await
    }
188
    /// Builds a flat reader to read sources that returns RecordBatch. If `semaphore` is provided, reads sources in parallel
    /// if possible.
    ///
    /// The reader stack is: parallel sources (optional) -> merge -> dedup
    /// (unless append mode) -> last-row selector (optional).
    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
    pub(crate) async fn build_flat_reader_from_sources(
        stream_ctx: &StreamContext,
        mut sources: Vec<BoxedRecordBatchStream>,
        semaphore: Option<Arc<Semaphore>>,
        part_metrics: Option<&PartitionMetrics>,
        channel_size: usize,
    ) -> Result<BoxedRecordBatchStream> {
        if let Some(semaphore) = semaphore.as_ref() {
            // Read sources in parallel. A single source gains nothing from
            // parallelization, so only wrap when there are multiple sources.
            if sources.len() > 1 {
                sources = stream_ctx.input.create_parallel_flat_sources(
                    sources,
                    semaphore.clone(),
                    channel_size,
                )?;
            }
        }

        let mapper = stream_ctx.input.mapper.as_flat().unwrap();
        let schema = mapper.input_arrow_schema(stream_ctx.input.compaction);

        let metrics_reporter = part_metrics.map(|m| m.merge_metrics_reporter());
        let reader =
            FlatMergeReader::new(schema, sources, DEFAULT_READ_BATCH_SIZE, metrics_reporter)
                .await?;

        // Append mode keeps duplicate rows; otherwise dedup according to the merge mode.
        let dedup = !stream_ctx.input.append_mode;
        let dedup_metrics_reporter = part_metrics.map(|m| m.dedup_metrics_reporter());
        let reader = if dedup {
            match stream_ctx.input.merge_mode {
                MergeMode::LastRow => Box::pin(
                    FlatDedupReader::new(
                        reader.into_stream().boxed(),
                        FlatLastRow::new(stream_ctx.input.filter_deleted),
                        dedup_metrics_reporter,
                    )
                    .into_stream(),
                ) as _,
                MergeMode::LastNonNull => Box::pin(
                    FlatDedupReader::new(
                        reader.into_stream().boxed(),
                        FlatLastNonNull::new(
                            mapper.field_column_start(),
                            stream_ctx.input.filter_deleted,
                        ),
                        dedup_metrics_reporter,
                    )
                    .into_stream(),
                ) as _,
            }
        } else {
            Box::pin(reader.into_stream()) as _
        };

        // Optionally keep only the last row per time series.
        let reader = match &stream_ctx.input.series_row_selector {
            Some(TimeSeriesRowSelector::LastRow) => {
                Box::pin(FlatLastRowReader::new(reader).into_stream()) as _
            }
            None => reader,
        };

        Ok(reader)
    }
255
256    /// Scans the given partition when the part list is set properly.
257    /// Otherwise the returned stream might not contains any data.
258    fn scan_partition_impl(
259        &self,
260        ctx: &QueryScanContext,
261        metrics_set: &ExecutionPlanMetricsSet,
262        partition: usize,
263    ) -> Result<SendableRecordBatchStream> {
264        if ctx.explain_verbose {
265            common_telemetry::info!(
266                "SeqScan partition {}, region_id: {}",
267                partition,
268                self.stream_ctx.input.region_metadata().region_id
269            );
270        }
271
272        let metrics = self.new_partition_metrics(ctx.explain_verbose, metrics_set, partition);
273        let input = &self.stream_ctx.input;
274
275        let batch_stream = self.scan_flat_batch_in_partition(partition, metrics.clone())?;
276        let record_batch_stream = ConvertBatchStream::new(
277            batch_stream,
278            input.mapper.clone(),
279            input.cache_strategy.clone(),
280            metrics,
281        );
282
283        Ok(Box::pin(RecordBatchStreamWrapper::new(
284            input.mapper.output_schema(),
285            Box::pin(record_batch_stream),
286        )))
287    }
288
    /// Builds a stream that scans all part ranges of `partition` one by one,
    /// merging (and deduplicating) sources inside each part range.
    ///
    /// Returns [`PartitionOutOfRangeSnafu`] if `partition` exceeds the number
    /// of partitions; returns an empty stream for an empty partition.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
            partition = partition
        )
    )]
    fn scan_flat_batch_in_partition(
        &self,
        partition: usize,
        part_metrics: PartitionMetrics,
    ) -> Result<ScanBatchStream> {
        ensure!(
            partition < self.properties.partitions.len(),
            PartitionOutOfRangeSnafu {
                given: partition,
                all: self.properties.partitions.len(),
            }
        );

        if self.properties.partitions[partition].is_empty() {
            return Ok(Box::pin(futures::stream::empty()));
        }

        let stream_ctx = self.stream_ctx.clone();
        let semaphore = self.new_semaphore();
        let partition_ranges = self.properties.partitions[partition].clone();
        let compaction = self.stream_ctx.input.compaction;
        // Compaction scans files sequentially; queries may scan them in parallel.
        let file_scan_semaphore = if compaction { None } else { semaphore.clone() };
        let pruner = self.pruner.clone();
        // Initializes ref counts for the pruner.
        // If we call scan_batch_in_partition() multiple times but don't read all batches from the stream,
        // then the ref count won't be decremented.
        // This is a rare case and keeping all remaining entries still uses less memory than a per partition cache.
        pruner.add_partition_ranges(&partition_ranges);
        let partition_pruner = Arc::new(PartitionPruner::new(pruner, &partition_ranges));

        let stream = try_stream! {
            part_metrics.on_first_poll();
            // Start fetch time before building sources so scan cost contains
            // build part cost.
            let mut fetch_start = Instant::now();

            // Scans each part.
            for part_range in partition_ranges {
                let mut sources = Vec::new();
                let split_batch_size = build_flat_sources(
                    &stream_ctx,
                    &part_range,
                    compaction,
                    &part_metrics,
                    partition_pruner.clone(),
                    &mut sources,
                    file_scan_semaphore.clone(),
                ).await?;

                // Size parallel channels for the (possibly split) batch size.
                let channel_size = compute_parallel_channel_size(
                    split_batch_size.unwrap_or(DEFAULT_READ_BATCH_SIZE),
                );
                let mut reader =
                    Self::build_flat_reader_from_sources(&stream_ctx, sources, semaphore.clone(), Some(&part_metrics), channel_size)
                        .await?;

                let mut metrics = ScannerMetrics {
                    scan_cost: fetch_start.elapsed(),
                    ..Default::default()
                };
                fetch_start = Instant::now();

                while let Some(record_batch) = reader.try_next().await? {
                    metrics.scan_cost += fetch_start.elapsed();
                    metrics.num_batches += 1;
                    metrics.num_rows += record_batch.num_rows();

                    // Empty batches are unexpected; skip them instead of yielding.
                    debug_assert!(record_batch.num_rows() > 0);
                    if record_batch.num_rows() == 0 {
                        fetch_start = Instant::now();
                        continue;
                    }

                    let yield_start = Instant::now();
                    yield ScanBatch::RecordBatch(record_batch);
                    metrics.yield_cost += yield_start.elapsed();

                    fetch_start = Instant::now();
                }

                // Account for the final (empty) poll of the reader.
                metrics.scan_cost += fetch_start.elapsed();
                fetch_start = Instant::now();
                part_metrics.merge_metrics(&metrics);
            }

            part_metrics.on_finish();
        };
        Ok(Box::pin(stream))
    }
385
386    fn new_semaphore(&self) -> Option<Arc<Semaphore>> {
387        if self.properties.target_partitions() > self.properties.num_partitions() {
388            // We can use additional tasks to read the data if we have more target partitions than actual partitions.
389            // This semaphore is partition level.
390            // We don't use a global semaphore to avoid a partition waiting for others. The final concurrency
391            // of tasks usually won't exceed the target partitions a lot as compaction can reduce the number of
392            // files in a part range.
393            Some(Arc::new(Semaphore::new(
394                self.properties.target_partitions() - self.properties.num_partitions() + 1,
395            )))
396        } else {
397            None
398        }
399    }
400
401    /// Creates a new partition metrics instance.
402    /// Sets the partition metrics for the given partition if it is not for compaction.
403    fn new_partition_metrics(
404        &self,
405        explain_verbose: bool,
406        metrics_set: &ExecutionPlanMetricsSet,
407        partition: usize,
408    ) -> PartitionMetrics {
409        let metrics = PartitionMetrics::new(
410            self.stream_ctx.input.mapper.metadata().region_id,
411            partition,
412            get_scanner_type(self.stream_ctx.input.compaction),
413            self.stream_ctx.query_start,
414            explain_verbose,
415            metrics_set,
416        );
417
418        if !self.stream_ctx.input.compaction {
419            self.metrics_list.set(partition, metrics.clone());
420        }
421
422        metrics
423    }
424
425    /// Finds the maximum number of files to read in a single partition range.
426    fn max_files_in_partition(ranges: &[RangeMeta], partition_ranges: &[PartitionRange]) -> usize {
427        partition_ranges
428            .iter()
429            .map(|part_range| {
430                let range_meta = &ranges[part_range.identifier];
431                range_meta.indices.len()
432            })
433            .max()
434            .unwrap_or(0)
435    }
436
437    /// Checks resource limit for the scanner.
438    pub(crate) fn check_scan_limit(&self) -> Result<()> {
439        // Check max file count limit for all partitions since we scan them in parallel.
440        let total_max_files: usize = self
441            .properties
442            .partitions
443            .iter()
444            .map(|partition| Self::max_files_in_partition(&self.stream_ctx.ranges, partition))
445            .sum();
446
447        let max_concurrent_files = self.stream_ctx.input.max_concurrent_scan_files;
448        if total_max_files > max_concurrent_files {
449            return TooManyFilesToReadSnafu {
450                actual: total_max_files,
451                max: max_concurrent_files,
452            }
453            .fail();
454        }
455
456        Ok(())
457    }
458}
459
/// Exposes [`SeqScan`] to the query engine as a [`RegionScanner`].
impl RegionScanner for SeqScan {
    fn name(&self) -> &str {
        "SeqScan"
    }

    fn properties(&self) -> &ScannerProperties {
        &self.properties
    }

    fn schema(&self) -> SchemaRef {
        // Output schema after projection, not the full region schema.
        self.stream_ctx.input.mapper.output_schema()
    }

    fn metadata(&self) -> RegionMetadataRef {
        self.stream_ctx.input.mapper.metadata().clone()
    }

    fn scan_partition(
        &self,
        ctx: &QueryScanContext,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> Result<SendableRecordBatchStream, BoxedError> {
        self.scan_partition_impl(ctx, metrics_set, partition)
            .map_err(BoxedError::new)
    }

    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
        self.properties.prepare(request);

        // Re-validate the file count limit since `prepare` may change partitions.
        self.check_scan_limit().map_err(BoxedError::new)?;

        Ok(())
    }

    fn has_predicate_without_region(&self) -> bool {
        // True when `predicate_without_region()` yields a predicate.
        let predicate = self
            .stream_ctx
            .input
            .predicate_group()
            .predicate_without_region();
        predicate.is_some()
    }

    fn add_dyn_filter_to_predicate(
        &mut self,
        filter_exprs: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
    ) -> Vec<bool> {
        // Delegates dynamic filter pushdown to the shared stream context.
        self.stream_ctx.add_dyn_filter_to_predicate(filter_exprs)
    }

    fn set_logical_region(&mut self, logical_region: bool) {
        self.properties.set_logical_region(logical_region);
    }
}
515
/// Formats the scanner for `EXPLAIN` output.
impl DisplayAs for SeqScan {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        write!(
            f,
            "SeqScan: region={}, ",
            self.stream_ctx.input.mapper.metadata().region_id
        )?;
        match t {
            // TODO(LFC): Implement all the "TreeRender" display format.
            DisplayFormatType::Default | DisplayFormatType::TreeRender => {
                self.stream_ctx.format_for_explain(false, f)
            }
            DisplayFormatType::Verbose => {
                // Verbose output additionally appends per-partition metrics.
                self.stream_ctx.format_for_explain(true, f)?;
                self.metrics_list.format_verbose_metrics(f)
            }
        }
    }
}
535
impl fmt::Debug for SeqScan {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Compact form for logs; detailed output lives in the `DisplayAs` impl.
        f.debug_struct("SeqScan")
            .field("num_ranges", &self.stream_ctx.ranges.len())
            .finish()
    }
}
543
/// Builds flat sources for the partition range and push them to the `sources` vector.
/// Returns the estimated rows per batch after splitting if splitting is applied, or `None`.
///
/// File ranges are scanned in parallel when `semaphore` is provided; memtable
/// and other ranges are always built inline. Source order in `sources` follows
/// the order of `row_group_indices` regardless of scan parallelism.
pub(crate) async fn build_flat_sources(
    stream_ctx: &Arc<StreamContext>,
    part_range: &PartitionRange,
    compaction: bool,
    part_metrics: &PartitionMetrics,
    partition_pruner: Arc<PartitionPruner>,
    sources: &mut Vec<BoxedRecordBatchStream>,
    semaphore: Option<Arc<Semaphore>>,
) -> Result<Option<usize>> {
    // Gets range meta.
    let range_meta = &stream_ctx.ranges[part_range.identifier];
    #[cfg(debug_assertions)]
    if compaction {
        // Compaction expects input sources are not been split.
        debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
        for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
            // It should scan all row groups.
            debug_assert_eq!(
                -1, row_group_idx.row_group_index,
                "Expect {} range scan all row groups, given: {}",
                i, row_group_idx.row_group_index,
            );
        }
    }

    // Label used by file scan metrics.
    let read_type = if compaction {
        "compaction"
    } else {
        "seq_scan_files"
    };
    let num_indices = range_meta.row_group_indices.len();
    if num_indices == 0 {
        return Ok(None);
    }

    let split_batch_size = should_split_flat_batches_for_merge(stream_ctx, range_meta);
    let should_split = split_batch_size.is_some();
    sources.reserve(num_indices);
    // Parallel file scans may complete out of order, so stage each stream at
    // its original position to preserve range order.
    let mut ordered_sources = Vec::with_capacity(num_indices);
    ordered_sources.resize_with(num_indices, || None);
    let mut file_scan_tasks = Vec::new();

    for (position, index) in range_meta.row_group_indices.iter().enumerate() {
        if stream_ctx.is_mem_range_index(*index) {
            // Memtable ranges: building the stream does not await.
            let stream = scan_flat_mem_ranges(
                stream_ctx.clone(),
                part_metrics.clone(),
                *index,
                range_meta.time_range,
            );
            ordered_sources[position] = Some(Box::pin(stream) as _);
        } else if stream_ctx.is_file_range_index(*index) {
            if let Some(semaphore_ref) = semaphore.as_ref() {
                // run in parallel, controlled by semaphore
                let stream_ctx = stream_ctx.clone();
                let part_metrics = part_metrics.clone();
                let partition_pruner = partition_pruner.clone();
                let semaphore = Arc::clone(semaphore_ref);
                let row_group_index = *index;
                file_scan_tasks.push(async move {
                    // Only fails if the semaphore is closed, which we never do.
                    let _permit = semaphore.acquire().await.unwrap();
                    let stream = scan_flat_file_ranges(
                        stream_ctx,
                        part_metrics,
                        row_group_index,
                        read_type,
                        partition_pruner,
                    )
                    .await?;
                    Ok((position, Box::pin(stream) as _))
                });
            } else {
                // no semaphore, run sequentially
                let stream = scan_flat_file_ranges(
                    stream_ctx.clone(),
                    part_metrics.clone(),
                    *index,
                    read_type,
                    partition_pruner.clone(),
                )
                .await?;
                ordered_sources[position] = Some(Box::pin(stream) as _);
            }
        } else {
            // Other range kinds (see `maybe_scan_flat_other_ranges`).
            let stream =
                scan_util::maybe_scan_flat_other_ranges(stream_ctx, *index, part_metrics).await?;
            ordered_sources[position] = Some(stream);
        }
    }

    if !file_scan_tasks.is_empty() {
        // Await all parallel file scans; each result carries its position.
        let results = futures::future::try_join_all(file_scan_tasks).await?;
        for (position, stream) in results {
            ordered_sources[position] = Some(stream);
        }
    }

    for stream in ordered_sources.into_iter().flatten() {
        if should_split {
            // Wrap each stream so large batches are split for cheaper merging.
            sources.push(Box::pin(SplitRecordBatchStream::new(stream)));
        } else {
            sources.push(stream);
        }
    }

    if should_split {
        common_telemetry::debug!(
            "Splitting record batches, region: {}, sources: {}, part_range: {:?}",
            stream_ctx.input.region_metadata().region_id,
            sources.len(),
            part_range,
        );
    }

    Ok(split_batch_size)
}
662
#[cfg(test)]
impl SeqScan {
    /// Returns the input (test-only accessor).
    pub(crate) fn input(&self) -> &ScanInput {
        &self.stream_ctx.input
    }
}
670
/// Returns the scanner type name reported in metrics and explain output.
fn get_scanner_type(compaction: bool) -> &'static str {
    match compaction {
        true => "SeqScan(compaction)",
        false => "SeqScan",
    }
}