mito2/read/
seq_scan.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Sequential scan.
16
17use std::fmt;
18use std::sync::Arc;
19use std::time::Instant;
20
21use async_stream::try_stream;
22use common_error::ext::BoxedError;
23use common_recordbatch::util::ChainedRecordBatchStream;
24use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
25use common_telemetry::tracing;
26use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
27use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
28use datatypes::schema::SchemaRef;
29use futures::StreamExt;
30use snafu::ensure;
31use store_api::metadata::RegionMetadataRef;
32use store_api::region_engine::{PartitionRange, PrepareRequest, RegionScanner, ScannerProperties};
33use store_api::storage::TimeSeriesRowSelector;
34use tokio::sync::Semaphore;
35
36use crate::error::{PartitionOutOfRangeSnafu, Result};
37use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
38use crate::read::last_row::LastRowReader;
39use crate::read::merge::MergeReaderBuilder;
40use crate::read::range::RangeBuilderList;
41use crate::read::scan_region::{ScanInput, StreamContext};
42use crate::read::scan_util::{
43    scan_file_ranges, scan_mem_ranges, PartitionMetrics, PartitionMetricsList,
44};
45use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
46use crate::read::{scan_util, Batch, BatchReader, BoxedBatchReader, ScannerMetrics, Source};
47use crate::region::options::MergeMode;
48
49/// Scans a region and returns rows in a sorted sequence.
50///
51/// The output order is always `order by primary keys, time index` inside every
52/// [`PartitionRange`]. Each "partition" may contains many [`PartitionRange`]s.
53pub struct SeqScan {
54    /// Properties of the scanner.
55    properties: ScannerProperties,
56    /// Context of streams.
57    stream_ctx: Arc<StreamContext>,
58    /// The scanner is used for compaction.
59    compaction: bool,
60    /// Metrics for each partition.
61    /// The scanner only sets in query and keeps it empty during compaction.
62    metrics_list: PartitionMetricsList,
63}
64
65impl SeqScan {
66    /// Creates a new [SeqScan] with the given input and compaction flag.
67    /// If `compaction` is true, the scanner will not attempt to split ranges.
68    pub(crate) fn new(input: ScanInput, compaction: bool) -> Self {
69        let mut properties = ScannerProperties::default()
70            .with_append_mode(input.append_mode)
71            .with_total_rows(input.total_rows());
72        let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input, compaction));
73        properties.partitions = vec![stream_ctx.partition_ranges()];
74
75        Self {
76            properties,
77            stream_ctx,
78            compaction,
79            metrics_list: PartitionMetricsList::default(),
80        }
81    }
82
83    /// Builds a stream for the query.
84    ///
85    /// The returned stream is not partitioned and will contains all the data. If want
86    /// partitioned scan, use [`RegionScanner::scan_partition`].
87    pub fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
88        let metrics_set = ExecutionPlanMetricsSet::new();
89        let streams = (0..self.properties.partitions.len())
90            .map(|partition: usize| self.scan_partition(&metrics_set, partition))
91            .collect::<Result<Vec<_>, _>>()?;
92
93        let aggr_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
94        Ok(Box::pin(aggr_stream))
95    }
96
97    /// Scan [`Batch`] in all partitions one by one.
98    pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
99        let metrics_set = ExecutionPlanMetricsSet::new();
100
101        let streams = (0..self.properties.partitions.len())
102            .map(|partition| {
103                let metrics = self.new_partition_metrics(&metrics_set, partition);
104                self.scan_batch_in_partition(partition, metrics)
105            })
106            .collect::<Result<Vec<_>>>()?;
107
108        Ok(Box::pin(futures::stream::iter(streams).flatten()))
109    }
110
111    /// Builds a [BoxedBatchReader] from sequential scan for compaction.
112    ///
113    /// # Panics
114    /// Panics if the compaction flag is not set.
115    pub async fn build_reader_for_compaction(&self) -> Result<BoxedBatchReader> {
116        assert!(self.compaction);
117
118        let metrics_set = ExecutionPlanMetricsSet::new();
119        let part_metrics = self.new_partition_metrics(&metrics_set, 0);
120        debug_assert_eq!(1, self.properties.partitions.len());
121        let partition_ranges = &self.properties.partitions[0];
122
123        let reader = Self::merge_all_ranges_for_compaction(
124            &self.stream_ctx,
125            partition_ranges,
126            &part_metrics,
127        )
128        .await?;
129        Ok(Box::new(reader))
130    }
131
132    /// Builds a merge reader that reads all ranges.
133    /// Callers MUST not split ranges before calling this method.
134    async fn merge_all_ranges_for_compaction(
135        stream_ctx: &Arc<StreamContext>,
136        partition_ranges: &[PartitionRange],
137        part_metrics: &PartitionMetrics,
138    ) -> Result<BoxedBatchReader> {
139        let mut sources = Vec::new();
140        let range_builder_list = Arc::new(RangeBuilderList::new(
141            stream_ctx.input.num_memtables(),
142            stream_ctx.input.num_files(),
143        ));
144        for part_range in partition_ranges {
145            build_sources(
146                stream_ctx,
147                part_range,
148                true,
149                part_metrics,
150                range_builder_list.clone(),
151                &mut sources,
152            )
153            .await?;
154        }
155
156        common_telemetry::debug!(
157            "Build reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
158            stream_ctx.input.mapper.metadata().region_id,
159            partition_ranges.len(),
160            sources.len()
161        );
162        Self::build_reader_from_sources(stream_ctx, sources, None).await
163    }
164
165    /// Builds a reader to read sources. If `semaphore` is provided, reads sources in parallel
166    /// if possible.
167    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
168    pub(crate) async fn build_reader_from_sources(
169        stream_ctx: &StreamContext,
170        mut sources: Vec<Source>,
171        semaphore: Option<Arc<Semaphore>>,
172    ) -> Result<BoxedBatchReader> {
173        if let Some(semaphore) = semaphore.as_ref() {
174            // Read sources in parallel.
175            if sources.len() > 1 {
176                sources = stream_ctx
177                    .input
178                    .create_parallel_sources(sources, semaphore.clone())?;
179            }
180        }
181
182        let mut builder = MergeReaderBuilder::from_sources(sources);
183        let reader = builder.build().await?;
184
185        let dedup = !stream_ctx.input.append_mode;
186        let reader = if dedup {
187            match stream_ctx.input.merge_mode {
188                MergeMode::LastRow => Box::new(DedupReader::new(
189                    reader,
190                    LastRow::new(stream_ctx.input.filter_deleted),
191                )) as _,
192                MergeMode::LastNonNull => Box::new(DedupReader::new(
193                    reader,
194                    LastNonNull::new(stream_ctx.input.filter_deleted),
195                )) as _,
196            }
197        } else {
198            Box::new(reader) as _
199        };
200
201        let reader = match &stream_ctx.input.series_row_selector {
202            Some(TimeSeriesRowSelector::LastRow) => Box::new(LastRowReader::new(reader)) as _,
203            None => reader,
204        };
205
206        Ok(reader)
207    }
208
209    /// Scans the given partition when the part list is set properly.
210    /// Otherwise the returned stream might not contains any data.
211    fn scan_partition_impl(
212        &self,
213        metrics_set: &ExecutionPlanMetricsSet,
214        partition: usize,
215    ) -> Result<SendableRecordBatchStream> {
216        let metrics = self.new_partition_metrics(metrics_set, partition);
217
218        let batch_stream = self.scan_batch_in_partition(partition, metrics.clone())?;
219
220        let input = &self.stream_ctx.input;
221        let record_batch_stream = ConvertBatchStream::new(
222            batch_stream,
223            input.mapper.clone(),
224            input.cache_strategy.clone(),
225            metrics,
226        );
227
228        Ok(Box::pin(RecordBatchStreamWrapper::new(
229            input.mapper.output_schema(),
230            Box::pin(record_batch_stream),
231        )))
232    }
233
234    fn scan_batch_in_partition(
235        &self,
236        partition: usize,
237        part_metrics: PartitionMetrics,
238    ) -> Result<ScanBatchStream> {
239        ensure!(
240            partition < self.properties.partitions.len(),
241            PartitionOutOfRangeSnafu {
242                given: partition,
243                all: self.properties.partitions.len(),
244            }
245        );
246
247        if self.properties.partitions[partition].is_empty() {
248            return Ok(Box::pin(futures::stream::empty()));
249        }
250
251        let stream_ctx = self.stream_ctx.clone();
252        let semaphore = self.new_semaphore();
253        let partition_ranges = self.properties.partitions[partition].clone();
254        let compaction = self.compaction;
255        let distinguish_range = self.properties.distinguish_partition_range;
256
257        let stream = try_stream! {
258            part_metrics.on_first_poll();
259
260            let range_builder_list = Arc::new(RangeBuilderList::new(
261                stream_ctx.input.num_memtables(),
262                stream_ctx.input.num_files(),
263            ));
264            // Scans each part.
265            for part_range in partition_ranges {
266                let mut sources = Vec::new();
267                build_sources(
268                    &stream_ctx,
269                    &part_range,
270                    compaction,
271                    &part_metrics,
272                    range_builder_list.clone(),
273                    &mut sources,
274                ).await?;
275
276                let mut metrics = ScannerMetrics::default();
277                let mut fetch_start = Instant::now();
278                let mut reader =
279                    Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone())
280                        .await?;
281                #[cfg(debug_assertions)]
282                let mut checker = crate::read::BatchChecker::default()
283                    .with_start(Some(part_range.start))
284                    .with_end(Some(part_range.end));
285
286                while let Some(batch) = reader.next_batch().await? {
287                    metrics.scan_cost += fetch_start.elapsed();
288                    metrics.num_batches += 1;
289                    metrics.num_rows += batch.num_rows();
290
291                    debug_assert!(!batch.is_empty());
292                    if batch.is_empty() {
293                        continue;
294                    }
295
296                    #[cfg(debug_assertions)]
297                    checker.ensure_part_range_batch(
298                        "SeqScan",
299                        stream_ctx.input.mapper.metadata().region_id,
300                        partition,
301                        part_range,
302                        &batch,
303                    );
304
305                    let yield_start = Instant::now();
306                    yield ScanBatch::Normal(batch);
307                    metrics.yield_cost += yield_start.elapsed();
308
309                    fetch_start = Instant::now();
310                }
311
312                // Yields an empty part to indicate this range is terminated.
313                // The query engine can use this to optimize some queries.
314                if distinguish_range {
315                    let yield_start = Instant::now();
316                    yield ScanBatch::Normal(Batch::empty());
317                    metrics.yield_cost += yield_start.elapsed();
318                }
319
320                metrics.scan_cost += fetch_start.elapsed();
321                part_metrics.merge_metrics(&metrics);
322            }
323
324            part_metrics.on_finish();
325        };
326        Ok(Box::pin(stream))
327    }
328
329    fn new_semaphore(&self) -> Option<Arc<Semaphore>> {
330        if self.properties.target_partitions() > self.properties.num_partitions() {
331            // We can use additional tasks to read the data if we have more target partitions than actual partitions.
332            // This semaphore is partition level.
333            // We don't use a global semaphore to avoid a partition waiting for others. The final concurrency
334            // of tasks usually won't exceed the target partitions a lot as compaction can reduce the number of
335            // files in a part range.
336            Some(Arc::new(Semaphore::new(
337                self.properties.target_partitions() - self.properties.num_partitions() + 1,
338            )))
339        } else {
340            None
341        }
342    }
343
344    /// Creates a new partition metrics instance.
345    /// Sets the partition metrics for the given partition if it is not for compaction.
346    fn new_partition_metrics(
347        &self,
348        metrics_set: &ExecutionPlanMetricsSet,
349        partition: usize,
350    ) -> PartitionMetrics {
351        let metrics = PartitionMetrics::new(
352            self.stream_ctx.input.mapper.metadata().region_id,
353            partition,
354            get_scanner_type(self.compaction),
355            self.stream_ctx.query_start,
356            metrics_set,
357        );
358
359        if !self.compaction {
360            self.metrics_list.set(partition, metrics.clone());
361        }
362
363        metrics
364    }
365}
366
367impl RegionScanner for SeqScan {
368    fn properties(&self) -> &ScannerProperties {
369        &self.properties
370    }
371
372    fn schema(&self) -> SchemaRef {
373        self.stream_ctx.input.mapper.output_schema()
374    }
375
376    fn metadata(&self) -> RegionMetadataRef {
377        self.stream_ctx.input.mapper.metadata().clone()
378    }
379
380    fn scan_partition(
381        &self,
382        metrics_set: &ExecutionPlanMetricsSet,
383        partition: usize,
384    ) -> Result<SendableRecordBatchStream, BoxedError> {
385        self.scan_partition_impl(metrics_set, partition)
386            .map_err(BoxedError::new)
387    }
388
389    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
390        self.properties.prepare(request);
391        Ok(())
392    }
393
394    fn has_predicate(&self) -> bool {
395        let predicate = self.stream_ctx.input.predicate();
396        predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
397    }
398
399    fn set_logical_region(&mut self, logical_region: bool) {
400        self.properties.set_logical_region(logical_region);
401    }
402}
403
404impl DisplayAs for SeqScan {
405    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
406        write!(
407            f,
408            "SeqScan: region={}, ",
409            self.stream_ctx.input.mapper.metadata().region_id
410        )?;
411        match t {
412            DisplayFormatType::Default => self.stream_ctx.format_for_explain(false, f),
413            DisplayFormatType::Verbose => {
414                self.stream_ctx.format_for_explain(true, f)?;
415                self.metrics_list.format_verbose_metrics(f)
416            }
417        }
418    }
419}
420
421impl fmt::Debug for SeqScan {
422    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
423        f.debug_struct("SeqScan")
424            .field("num_ranges", &self.stream_ctx.ranges.len())
425            .finish()
426    }
427}
428
429/// Builds sources for the partition range and push them to the `sources` vector.
430pub(crate) async fn build_sources(
431    stream_ctx: &Arc<StreamContext>,
432    part_range: &PartitionRange,
433    compaction: bool,
434    part_metrics: &PartitionMetrics,
435    range_builder_list: Arc<RangeBuilderList>,
436    sources: &mut Vec<Source>,
437) -> Result<()> {
438    // Gets range meta.
439    let range_meta = &stream_ctx.ranges[part_range.identifier];
440    #[cfg(debug_assertions)]
441    if compaction {
442        // Compaction expects input sources are not been split.
443        debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
444        for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
445            // It should scan all row groups.
446            debug_assert_eq!(
447                -1, row_group_idx.row_group_index,
448                "Expect {} range scan all row groups, given: {}",
449                i, row_group_idx.row_group_index,
450            );
451        }
452    }
453
454    sources.reserve(range_meta.row_group_indices.len());
455    for index in &range_meta.row_group_indices {
456        let stream = if stream_ctx.is_mem_range_index(*index) {
457            let stream = scan_mem_ranges(
458                stream_ctx.clone(),
459                part_metrics.clone(),
460                *index,
461                range_meta.time_range,
462            );
463            Box::pin(stream) as _
464        } else if stream_ctx.is_file_range_index(*index) {
465            let read_type = if compaction {
466                "compaction"
467            } else {
468                "seq_scan_files"
469            };
470            let stream = scan_file_ranges(
471                stream_ctx.clone(),
472                part_metrics.clone(),
473                *index,
474                read_type,
475                range_builder_list.clone(),
476            )
477            .await?;
478            Box::pin(stream) as _
479        } else {
480            scan_util::maybe_scan_other_ranges(stream_ctx, *index, part_metrics).await?
481        };
482        sources.push(Source::Stream(stream));
483    }
484    Ok(())
485}
486
#[cfg(test)]
impl SeqScan {
    /// Exposes the [ScanInput] held by the stream context.
    pub(crate) fn input(&self) -> &ScanInput {
        let ctx = &self.stream_ctx;
        &ctx.input
    }
}
494
/// Maps the compaction flag to the name of the scanner type.
fn get_scanner_type(compaction: bool) -> &'static str {
    match compaction {
        true => "SeqScan(compaction)",
        false => "SeqScan",
    }
}