mito2/read/
seq_scan.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Sequential scan.
16
17use std::fmt;
18use std::sync::Arc;
19use std::time::Instant;
20
21use async_stream::try_stream;
22use common_error::ext::BoxedError;
23use common_recordbatch::util::ChainedRecordBatchStream;
24use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
25use common_telemetry::tracing;
26use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
27use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
28use datatypes::schema::SchemaRef;
29use futures::StreamExt;
30use snafu::ensure;
31use store_api::metadata::RegionMetadataRef;
32use store_api::region_engine::{
33    PartitionRange, PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
34};
35use store_api::storage::TimeSeriesRowSelector;
36use tokio::sync::Semaphore;
37
38use crate::error::{PartitionOutOfRangeSnafu, Result};
39use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
40use crate::read::last_row::LastRowReader;
41use crate::read::merge::MergeReaderBuilder;
42use crate::read::range::RangeBuilderList;
43use crate::read::scan_region::{ScanInput, StreamContext};
44use crate::read::scan_util::{
45    scan_file_ranges, scan_mem_ranges, PartitionMetrics, PartitionMetricsList,
46};
47use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
48use crate::read::{scan_util, Batch, BatchReader, BoxedBatchReader, ScannerMetrics, Source};
49use crate::region::options::MergeMode;
50
/// Scans a region and returns rows in a sorted sequence.
///
/// The output order is always `order by primary keys, time index` inside every
/// [`PartitionRange`]. Each "partition" may contain many [`PartitionRange`]s.
pub struct SeqScan {
    /// Properties of the scanner, including the partition ranges assigned to each partition.
    properties: ScannerProperties,
    /// Context shared by the streams of all partitions.
    stream_ctx: Arc<StreamContext>,
    /// Whether the scanner is used for compaction.
    compaction: bool,
    /// Metrics for each partition.
    /// The scanner only populates this for queries and keeps it empty during compaction.
    metrics_list: PartitionMetricsList,
}
66
67impl SeqScan {
68    /// Creates a new [SeqScan] with the given input and compaction flag.
69    /// If `compaction` is true, the scanner will not attempt to split ranges.
70    pub(crate) fn new(input: ScanInput, compaction: bool) -> Self {
71        let mut properties = ScannerProperties::default()
72            .with_append_mode(input.append_mode)
73            .with_total_rows(input.total_rows());
74        let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input, compaction));
75        properties.partitions = vec![stream_ctx.partition_ranges()];
76
77        Self {
78            properties,
79            stream_ctx,
80            compaction,
81            metrics_list: PartitionMetricsList::default(),
82        }
83    }
84
85    /// Builds a stream for the query.
86    ///
87    /// The returned stream is not partitioned and will contains all the data. If want
88    /// partitioned scan, use [`RegionScanner::scan_partition`].
89    pub fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
90        let metrics_set = ExecutionPlanMetricsSet::new();
91        let streams = (0..self.properties.partitions.len())
92            .map(|partition: usize| {
93                self.scan_partition(&QueryScanContext::default(), &metrics_set, partition)
94            })
95            .collect::<Result<Vec<_>, _>>()?;
96
97        let aggr_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
98        Ok(Box::pin(aggr_stream))
99    }
100
101    /// Scan [`Batch`] in all partitions one by one.
102    pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
103        let metrics_set = ExecutionPlanMetricsSet::new();
104
105        let streams = (0..self.properties.partitions.len())
106            .map(|partition| {
107                let metrics = self.new_partition_metrics(false, &metrics_set, partition);
108                self.scan_batch_in_partition(partition, metrics)
109            })
110            .collect::<Result<Vec<_>>>()?;
111
112        Ok(Box::pin(futures::stream::iter(streams).flatten()))
113    }
114
115    /// Builds a [BoxedBatchReader] from sequential scan for compaction.
116    ///
117    /// # Panics
118    /// Panics if the compaction flag is not set.
119    pub async fn build_reader_for_compaction(&self) -> Result<BoxedBatchReader> {
120        assert!(self.compaction);
121
122        let metrics_set = ExecutionPlanMetricsSet::new();
123        let part_metrics = self.new_partition_metrics(false, &metrics_set, 0);
124        debug_assert_eq!(1, self.properties.partitions.len());
125        let partition_ranges = &self.properties.partitions[0];
126
127        let reader = Self::merge_all_ranges_for_compaction(
128            &self.stream_ctx,
129            partition_ranges,
130            &part_metrics,
131        )
132        .await?;
133        Ok(Box::new(reader))
134    }
135
136    /// Builds a merge reader that reads all ranges.
137    /// Callers MUST not split ranges before calling this method.
138    async fn merge_all_ranges_for_compaction(
139        stream_ctx: &Arc<StreamContext>,
140        partition_ranges: &[PartitionRange],
141        part_metrics: &PartitionMetrics,
142    ) -> Result<BoxedBatchReader> {
143        let mut sources = Vec::new();
144        let range_builder_list = Arc::new(RangeBuilderList::new(
145            stream_ctx.input.num_memtables(),
146            stream_ctx.input.num_files(),
147        ));
148        for part_range in partition_ranges {
149            build_sources(
150                stream_ctx,
151                part_range,
152                true,
153                part_metrics,
154                range_builder_list.clone(),
155                &mut sources,
156            )
157            .await?;
158        }
159
160        common_telemetry::debug!(
161            "Build reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
162            stream_ctx.input.mapper.metadata().region_id,
163            partition_ranges.len(),
164            sources.len()
165        );
166        Self::build_reader_from_sources(stream_ctx, sources, None).await
167    }
168
169    /// Builds a reader to read sources. If `semaphore` is provided, reads sources in parallel
170    /// if possible.
171    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
172    pub(crate) async fn build_reader_from_sources(
173        stream_ctx: &StreamContext,
174        mut sources: Vec<Source>,
175        semaphore: Option<Arc<Semaphore>>,
176    ) -> Result<BoxedBatchReader> {
177        if let Some(semaphore) = semaphore.as_ref() {
178            // Read sources in parallel.
179            if sources.len() > 1 {
180                sources = stream_ctx
181                    .input
182                    .create_parallel_sources(sources, semaphore.clone())?;
183            }
184        }
185
186        let mut builder = MergeReaderBuilder::from_sources(sources);
187        let reader = builder.build().await?;
188
189        let dedup = !stream_ctx.input.append_mode;
190        let reader = if dedup {
191            match stream_ctx.input.merge_mode {
192                MergeMode::LastRow => Box::new(DedupReader::new(
193                    reader,
194                    LastRow::new(stream_ctx.input.filter_deleted),
195                )) as _,
196                MergeMode::LastNonNull => Box::new(DedupReader::new(
197                    reader,
198                    LastNonNull::new(stream_ctx.input.filter_deleted),
199                )) as _,
200            }
201        } else {
202            Box::new(reader) as _
203        };
204
205        let reader = match &stream_ctx.input.series_row_selector {
206            Some(TimeSeriesRowSelector::LastRow) => Box::new(LastRowReader::new(reader)) as _,
207            None => reader,
208        };
209
210        Ok(reader)
211    }
212
213    /// Scans the given partition when the part list is set properly.
214    /// Otherwise the returned stream might not contains any data.
215    fn scan_partition_impl(
216        &self,
217        ctx: &QueryScanContext,
218        metrics_set: &ExecutionPlanMetricsSet,
219        partition: usize,
220    ) -> Result<SendableRecordBatchStream> {
221        if ctx.explain_verbose {
222            common_telemetry::info!(
223                "SeqScan partition {}, region_id: {}",
224                partition,
225                self.stream_ctx.input.region_metadata().region_id
226            );
227        }
228
229        let metrics = self.new_partition_metrics(ctx.explain_verbose, metrics_set, partition);
230
231        let batch_stream = self.scan_batch_in_partition(partition, metrics.clone())?;
232
233        let input = &self.stream_ctx.input;
234        let record_batch_stream = ConvertBatchStream::new(
235            batch_stream,
236            input.mapper.clone(),
237            input.cache_strategy.clone(),
238            metrics,
239        );
240
241        Ok(Box::pin(RecordBatchStreamWrapper::new(
242            input.mapper.output_schema(),
243            Box::pin(record_batch_stream),
244        )))
245    }
246
247    fn scan_batch_in_partition(
248        &self,
249        partition: usize,
250        part_metrics: PartitionMetrics,
251    ) -> Result<ScanBatchStream> {
252        ensure!(
253            partition < self.properties.partitions.len(),
254            PartitionOutOfRangeSnafu {
255                given: partition,
256                all: self.properties.partitions.len(),
257            }
258        );
259
260        if self.properties.partitions[partition].is_empty() {
261            return Ok(Box::pin(futures::stream::empty()));
262        }
263
264        let stream_ctx = self.stream_ctx.clone();
265        let semaphore = self.new_semaphore();
266        let partition_ranges = self.properties.partitions[partition].clone();
267        let compaction = self.compaction;
268        let distinguish_range = self.properties.distinguish_partition_range;
269
270        let stream = try_stream! {
271            part_metrics.on_first_poll();
272
273            let range_builder_list = Arc::new(RangeBuilderList::new(
274                stream_ctx.input.num_memtables(),
275                stream_ctx.input.num_files(),
276            ));
277            // Scans each part.
278            for part_range in partition_ranges {
279                let mut sources = Vec::new();
280                build_sources(
281                    &stream_ctx,
282                    &part_range,
283                    compaction,
284                    &part_metrics,
285                    range_builder_list.clone(),
286                    &mut sources,
287                ).await?;
288
289                let mut metrics = ScannerMetrics::default();
290                let mut fetch_start = Instant::now();
291                let mut reader =
292                    Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone())
293                        .await?;
294                #[cfg(debug_assertions)]
295                let mut checker = crate::read::BatchChecker::default()
296                    .with_start(Some(part_range.start))
297                    .with_end(Some(part_range.end));
298
299                while let Some(batch) = reader.next_batch().await? {
300                    metrics.scan_cost += fetch_start.elapsed();
301                    metrics.num_batches += 1;
302                    metrics.num_rows += batch.num_rows();
303
304                    debug_assert!(!batch.is_empty());
305                    if batch.is_empty() {
306                        continue;
307                    }
308
309                    #[cfg(debug_assertions)]
310                    checker.ensure_part_range_batch(
311                        "SeqScan",
312                        stream_ctx.input.mapper.metadata().region_id,
313                        partition,
314                        part_range,
315                        &batch,
316                    );
317
318                    let yield_start = Instant::now();
319                    yield ScanBatch::Normal(batch);
320                    metrics.yield_cost += yield_start.elapsed();
321
322                    fetch_start = Instant::now();
323                }
324
325                // Yields an empty part to indicate this range is terminated.
326                // The query engine can use this to optimize some queries.
327                if distinguish_range {
328                    let yield_start = Instant::now();
329                    yield ScanBatch::Normal(Batch::empty());
330                    metrics.yield_cost += yield_start.elapsed();
331                }
332
333                metrics.scan_cost += fetch_start.elapsed();
334                part_metrics.merge_metrics(&metrics);
335            }
336
337            part_metrics.on_finish();
338        };
339        Ok(Box::pin(stream))
340    }
341
342    fn new_semaphore(&self) -> Option<Arc<Semaphore>> {
343        if self.properties.target_partitions() > self.properties.num_partitions() {
344            // We can use additional tasks to read the data if we have more target partitions than actual partitions.
345            // This semaphore is partition level.
346            // We don't use a global semaphore to avoid a partition waiting for others. The final concurrency
347            // of tasks usually won't exceed the target partitions a lot as compaction can reduce the number of
348            // files in a part range.
349            Some(Arc::new(Semaphore::new(
350                self.properties.target_partitions() - self.properties.num_partitions() + 1,
351            )))
352        } else {
353            None
354        }
355    }
356
357    /// Creates a new partition metrics instance.
358    /// Sets the partition metrics for the given partition if it is not for compaction.
359    fn new_partition_metrics(
360        &self,
361        explain_verbose: bool,
362        metrics_set: &ExecutionPlanMetricsSet,
363        partition: usize,
364    ) -> PartitionMetrics {
365        let metrics = PartitionMetrics::new(
366            self.stream_ctx.input.mapper.metadata().region_id,
367            partition,
368            get_scanner_type(self.compaction),
369            self.stream_ctx.query_start,
370            explain_verbose,
371            metrics_set,
372        );
373
374        if !self.compaction {
375            self.metrics_list.set(partition, metrics.clone());
376        }
377
378        metrics
379    }
380}
381
382impl RegionScanner for SeqScan {
383    fn properties(&self) -> &ScannerProperties {
384        &self.properties
385    }
386
387    fn schema(&self) -> SchemaRef {
388        self.stream_ctx.input.mapper.output_schema()
389    }
390
391    fn metadata(&self) -> RegionMetadataRef {
392        self.stream_ctx.input.mapper.metadata().clone()
393    }
394
395    fn scan_partition(
396        &self,
397        ctx: &QueryScanContext,
398        metrics_set: &ExecutionPlanMetricsSet,
399        partition: usize,
400    ) -> Result<SendableRecordBatchStream, BoxedError> {
401        self.scan_partition_impl(ctx, metrics_set, partition)
402            .map_err(BoxedError::new)
403    }
404
405    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
406        self.properties.prepare(request);
407        Ok(())
408    }
409
410    fn has_predicate(&self) -> bool {
411        let predicate = self.stream_ctx.input.predicate();
412        predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
413    }
414
415    fn set_logical_region(&mut self, logical_region: bool) {
416        self.properties.set_logical_region(logical_region);
417    }
418}
419
420impl DisplayAs for SeqScan {
421    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
422        write!(
423            f,
424            "SeqScan: region={}, ",
425            self.stream_ctx.input.mapper.metadata().region_id
426        )?;
427        match t {
428            DisplayFormatType::Default => self.stream_ctx.format_for_explain(false, f),
429            DisplayFormatType::Verbose => {
430                self.stream_ctx.format_for_explain(true, f)?;
431                self.metrics_list.format_verbose_metrics(f)
432            }
433        }
434    }
435}
436
437impl fmt::Debug for SeqScan {
438    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
439        f.debug_struct("SeqScan")
440            .field("num_ranges", &self.stream_ctx.ranges.len())
441            .finish()
442    }
443}
444
445/// Builds sources for the partition range and push them to the `sources` vector.
446pub(crate) async fn build_sources(
447    stream_ctx: &Arc<StreamContext>,
448    part_range: &PartitionRange,
449    compaction: bool,
450    part_metrics: &PartitionMetrics,
451    range_builder_list: Arc<RangeBuilderList>,
452    sources: &mut Vec<Source>,
453) -> Result<()> {
454    // Gets range meta.
455    let range_meta = &stream_ctx.ranges[part_range.identifier];
456    #[cfg(debug_assertions)]
457    if compaction {
458        // Compaction expects input sources are not been split.
459        debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
460        for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
461            // It should scan all row groups.
462            debug_assert_eq!(
463                -1, row_group_idx.row_group_index,
464                "Expect {} range scan all row groups, given: {}",
465                i, row_group_idx.row_group_index,
466            );
467        }
468    }
469
470    sources.reserve(range_meta.row_group_indices.len());
471    for index in &range_meta.row_group_indices {
472        let stream = if stream_ctx.is_mem_range_index(*index) {
473            let stream = scan_mem_ranges(
474                stream_ctx.clone(),
475                part_metrics.clone(),
476                *index,
477                range_meta.time_range,
478            );
479            Box::pin(stream) as _
480        } else if stream_ctx.is_file_range_index(*index) {
481            let read_type = if compaction {
482                "compaction"
483            } else {
484                "seq_scan_files"
485            };
486            let stream = scan_file_ranges(
487                stream_ctx.clone(),
488                part_metrics.clone(),
489                *index,
490                read_type,
491                range_builder_list.clone(),
492            )
493            .await?;
494            Box::pin(stream) as _
495        } else {
496            scan_util::maybe_scan_other_ranges(stream_ctx, *index, part_metrics).await?
497        };
498        sources.push(Source::Stream(stream));
499    }
500    Ok(())
501}
502
#[cfg(test)]
impl SeqScan {
    /// Returns the scan input held by the stream context (test-only accessor).
    pub(crate) fn input(&self) -> &ScanInput {
        let ctx: &StreamContext = &self.stream_ctx;
        &ctx.input
    }
}
510
/// Returns the scanner type label used in partition metrics.
fn get_scanner_type(compaction: bool) -> &'static str {
    match compaction {
        true => "SeqScan(compaction)",
        false => "SeqScan",
    }
}