mito2/read/
seq_scan.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Sequential scan.
16
17use std::fmt;
18use std::sync::Arc;
19use std::time::Instant;
20
21use async_stream::try_stream;
22use common_error::ext::BoxedError;
23use common_recordbatch::error::ExternalSnafu;
24use common_recordbatch::util::ChainedRecordBatchStream;
25use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
26use common_telemetry::tracing;
27use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
28use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
29use datatypes::schema::SchemaRef;
30use snafu::ResultExt;
31use store_api::metadata::RegionMetadataRef;
32use store_api::region_engine::{PartitionRange, PrepareRequest, RegionScanner, ScannerProperties};
33use store_api::storage::TimeSeriesRowSelector;
34use tokio::sync::Semaphore;
35
36use crate::error::{PartitionOutOfRangeSnafu, Result};
37use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
38use crate::read::last_row::LastRowReader;
39use crate::read::merge::MergeReaderBuilder;
40use crate::read::range::RangeBuilderList;
41use crate::read::scan_region::{ScanInput, StreamContext};
42use crate::read::scan_util::{
43    scan_file_ranges, scan_mem_ranges, PartitionMetrics, PartitionMetricsList,
44};
45use crate::read::{BatchReader, BoxedBatchReader, ScannerMetrics, Source};
46use crate::region::options::MergeMode;
47
/// Scans a region and returns rows in a sorted sequence.
///
/// The output order is always `order by primary keys, time index` inside every
/// [`PartitionRange`]. Each "partition" may contain many [`PartitionRange`]s.
pub struct SeqScan {
    /// Properties of the scanner.
    properties: ScannerProperties,
    /// Context of streams.
    stream_ctx: Arc<StreamContext>,
    /// Whether the scanner is used for compaction.
    compaction: bool,
    /// Metrics for each partition.
    /// The scanner only sets them for queries and keeps the list empty during compaction.
    metrics_list: PartitionMetricsList,
}
63
64impl SeqScan {
65    /// Creates a new [SeqScan] with the given input and compaction flag.
66    /// If `compaction` is true, the scanner will not attempt to split ranges.
67    pub(crate) fn new(input: ScanInput, compaction: bool) -> Self {
68        let mut properties = ScannerProperties::default()
69            .with_append_mode(input.append_mode)
70            .with_total_rows(input.total_rows());
71        let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input, compaction));
72        properties.partitions = vec![stream_ctx.partition_ranges()];
73
74        Self {
75            properties,
76            stream_ctx,
77            compaction,
78            metrics_list: PartitionMetricsList::default(),
79        }
80    }
81
82    /// Builds a stream for the query.
83    ///
84    /// The returned stream is not partitioned and will contains all the data. If want
85    /// partitioned scan, use [`RegionScanner::scan_partition`].
86    pub fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
87        let metrics_set = ExecutionPlanMetricsSet::new();
88        let streams = (0..self.properties.partitions.len())
89            .map(|partition: usize| self.scan_partition(&metrics_set, partition))
90            .collect::<Result<Vec<_>, _>>()?;
91
92        let aggr_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
93        Ok(Box::pin(aggr_stream))
94    }
95
96    /// Builds a [BoxedBatchReader] from sequential scan for compaction.
97    ///
98    /// # Panics
99    /// Panics if the compaction flag is not set.
100    pub async fn build_reader_for_compaction(&self) -> Result<BoxedBatchReader> {
101        assert!(self.compaction);
102
103        let metrics_set = ExecutionPlanMetricsSet::new();
104        let part_metrics = self.new_partition_metrics(&metrics_set, 0);
105        debug_assert_eq!(1, self.properties.partitions.len());
106        let partition_ranges = &self.properties.partitions[0];
107
108        let reader = Self::merge_all_ranges_for_compaction(
109            &self.stream_ctx,
110            partition_ranges,
111            &part_metrics,
112        )
113        .await?;
114        Ok(Box::new(reader))
115    }
116
117    /// Builds a merge reader that reads all ranges.
118    /// Callers MUST not split ranges before calling this method.
119    async fn merge_all_ranges_for_compaction(
120        stream_ctx: &Arc<StreamContext>,
121        partition_ranges: &[PartitionRange],
122        part_metrics: &PartitionMetrics,
123    ) -> Result<BoxedBatchReader> {
124        let mut sources = Vec::new();
125        let range_builder_list = Arc::new(RangeBuilderList::new(
126            stream_ctx.input.num_memtables(),
127            stream_ctx.input.num_files(),
128        ));
129        for part_range in partition_ranges {
130            build_sources(
131                stream_ctx,
132                part_range,
133                true,
134                part_metrics,
135                range_builder_list.clone(),
136                &mut sources,
137            );
138        }
139
140        common_telemetry::debug!(
141            "Build reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
142            stream_ctx.input.mapper.metadata().region_id,
143            partition_ranges.len(),
144            sources.len()
145        );
146        Self::build_reader_from_sources(stream_ctx, sources, None).await
147    }
148
149    /// Builds a reader to read sources. If `semaphore` is provided, reads sources in parallel
150    /// if possible.
151    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
152    pub(crate) async fn build_reader_from_sources(
153        stream_ctx: &StreamContext,
154        mut sources: Vec<Source>,
155        semaphore: Option<Arc<Semaphore>>,
156    ) -> Result<BoxedBatchReader> {
157        if let Some(semaphore) = semaphore.as_ref() {
158            // Read sources in parallel.
159            if sources.len() > 1 {
160                sources = stream_ctx
161                    .input
162                    .create_parallel_sources(sources, semaphore.clone())?;
163            }
164        }
165
166        let mut builder = MergeReaderBuilder::from_sources(sources);
167        let reader = builder.build().await?;
168
169        let dedup = !stream_ctx.input.append_mode;
170        let reader = if dedup {
171            match stream_ctx.input.merge_mode {
172                MergeMode::LastRow => Box::new(DedupReader::new(
173                    reader,
174                    LastRow::new(stream_ctx.input.filter_deleted),
175                )) as _,
176                MergeMode::LastNonNull => Box::new(DedupReader::new(
177                    reader,
178                    LastNonNull::new(stream_ctx.input.filter_deleted),
179                )) as _,
180            }
181        } else {
182            Box::new(reader) as _
183        };
184
185        let reader = match &stream_ctx.input.series_row_selector {
186            Some(TimeSeriesRowSelector::LastRow) => Box::new(LastRowReader::new(reader)) as _,
187            None => reader,
188        };
189
190        Ok(reader)
191    }
192
193    /// Scans the given partition when the part list is set properly.
194    /// Otherwise the returned stream might not contains any data.
195    fn scan_partition_impl(
196        &self,
197        metrics_set: &ExecutionPlanMetricsSet,
198        partition: usize,
199    ) -> Result<SendableRecordBatchStream, BoxedError> {
200        if partition >= self.properties.partitions.len() {
201            return Err(BoxedError::new(
202                PartitionOutOfRangeSnafu {
203                    given: partition,
204                    all: self.properties.partitions.len(),
205                }
206                .build(),
207            ));
208        }
209        if self.properties.partitions[partition].is_empty() {
210            return Ok(Box::pin(RecordBatchStreamWrapper::new(
211                self.stream_ctx.input.mapper.output_schema(),
212                common_recordbatch::EmptyRecordBatchStream::new(
213                    self.stream_ctx.input.mapper.output_schema(),
214                ),
215            )));
216        }
217
218        let stream_ctx = self.stream_ctx.clone();
219        let semaphore = self.new_semaphore();
220        let partition_ranges = self.properties.partitions[partition].clone();
221        let compaction = self.compaction;
222        let distinguish_range = self.properties.distinguish_partition_range;
223        let part_metrics = self.new_partition_metrics(metrics_set, partition);
224
225        let stream = try_stream! {
226            part_metrics.on_first_poll();
227
228            let range_builder_list = Arc::new(RangeBuilderList::new(
229                stream_ctx.input.num_memtables(),
230                stream_ctx.input.num_files(),
231            ));
232            // Scans each part.
233            for part_range in partition_ranges {
234                let mut sources = Vec::new();
235                build_sources(
236                    &stream_ctx,
237                    &part_range,
238                    compaction,
239                    &part_metrics,
240                    range_builder_list.clone(),
241                    &mut sources,
242                );
243
244                let mut metrics = ScannerMetrics::default();
245                let mut fetch_start = Instant::now();
246                let mut reader =
247                    Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone())
248                        .await
249                        .map_err(BoxedError::new)
250                        .context(ExternalSnafu)?;
251                let cache = &stream_ctx.input.cache_strategy;
252                #[cfg(debug_assertions)]
253                let mut checker = crate::read::BatchChecker::default()
254                    .with_start(Some(part_range.start))
255                    .with_end(Some(part_range.end));
256
257                while let Some(batch) = reader
258                    .next_batch()
259                    .await
260                    .map_err(BoxedError::new)
261                    .context(ExternalSnafu)?
262                {
263                    metrics.scan_cost += fetch_start.elapsed();
264                    metrics.num_batches += 1;
265                    metrics.num_rows += batch.num_rows();
266
267                    debug_assert!(!batch.is_empty());
268                    if batch.is_empty() {
269                        continue;
270                    }
271
272                    #[cfg(debug_assertions)]
273                    checker.ensure_part_range_batch(
274                        "SeqScan",
275                        stream_ctx.input.mapper.metadata().region_id,
276                        partition,
277                        part_range,
278                        &batch,
279                    );
280
281                    let convert_start = Instant::now();
282                    let record_batch = stream_ctx.input.mapper.convert(&batch, cache)?;
283                    metrics.convert_cost += convert_start.elapsed();
284                    let yield_start = Instant::now();
285                    yield record_batch;
286                    metrics.yield_cost += yield_start.elapsed();
287
288                    fetch_start = Instant::now();
289                }
290
291                // Yields an empty part to indicate this range is terminated.
292                // The query engine can use this to optimize some queries.
293                if distinguish_range {
294                    let yield_start = Instant::now();
295                    yield stream_ctx.input.mapper.empty_record_batch();
296                    metrics.yield_cost += yield_start.elapsed();
297                }
298
299                metrics.scan_cost += fetch_start.elapsed();
300                part_metrics.merge_metrics(&metrics);
301            }
302
303            part_metrics.on_finish();
304        };
305
306        let stream = Box::pin(RecordBatchStreamWrapper::new(
307            self.stream_ctx.input.mapper.output_schema(),
308            Box::pin(stream),
309        ));
310
311        Ok(stream)
312    }
313
314    fn new_semaphore(&self) -> Option<Arc<Semaphore>> {
315        if self.properties.target_partitions() > self.properties.num_partitions() {
316            // We can use additional tasks to read the data if we have more target partitions than actual partitions.
317            // This semaphore is partition level.
318            // We don't use a global semaphore to avoid a partition waiting for others. The final concurrency
319            // of tasks usually won't exceed the target partitions a lot as compaction can reduce the number of
320            // files in a part range.
321            Some(Arc::new(Semaphore::new(
322                self.properties.target_partitions() - self.properties.num_partitions() + 1,
323            )))
324        } else {
325            None
326        }
327    }
328
329    /// Creates a new partition metrics instance.
330    /// Sets the partition metrics for the given partition if it is not for compaction.
331    fn new_partition_metrics(
332        &self,
333        metrics_set: &ExecutionPlanMetricsSet,
334        partition: usize,
335    ) -> PartitionMetrics {
336        let metrics = PartitionMetrics::new(
337            self.stream_ctx.input.mapper.metadata().region_id,
338            partition,
339            get_scanner_type(self.compaction),
340            self.stream_ctx.query_start,
341            metrics_set,
342        );
343
344        if !self.compaction {
345            self.metrics_list.set(partition, metrics.clone());
346        }
347
348        metrics
349    }
350}
351
352impl RegionScanner for SeqScan {
353    fn properties(&self) -> &ScannerProperties {
354        &self.properties
355    }
356
357    fn schema(&self) -> SchemaRef {
358        self.stream_ctx.input.mapper.output_schema()
359    }
360
361    fn metadata(&self) -> RegionMetadataRef {
362        self.stream_ctx.input.mapper.metadata().clone()
363    }
364
365    fn scan_partition(
366        &self,
367        metrics_set: &ExecutionPlanMetricsSet,
368        partition: usize,
369    ) -> Result<SendableRecordBatchStream, BoxedError> {
370        self.scan_partition_impl(metrics_set, partition)
371    }
372
373    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
374        self.properties.prepare(request);
375        Ok(())
376    }
377
378    fn has_predicate(&self) -> bool {
379        let predicate = self.stream_ctx.input.predicate();
380        predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
381    }
382
383    fn set_logical_region(&mut self, logical_region: bool) {
384        self.properties.set_logical_region(logical_region);
385    }
386}
387
388impl DisplayAs for SeqScan {
389    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
390        write!(
391            f,
392            "SeqScan: region={}, ",
393            self.stream_ctx.input.mapper.metadata().region_id
394        )?;
395        match t {
396            DisplayFormatType::Default => self.stream_ctx.format_for_explain(false, f),
397            DisplayFormatType::Verbose => {
398                self.stream_ctx.format_for_explain(true, f)?;
399                self.metrics_list.format_verbose_metrics(f)
400            }
401        }
402    }
403}
404
405impl fmt::Debug for SeqScan {
406    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
407        f.debug_struct("SeqScan")
408            .field("num_ranges", &self.stream_ctx.ranges.len())
409            .finish()
410    }
411}
412
413/// Builds sources for the partition range and push them to the `sources` vector.
414pub(crate) fn build_sources(
415    stream_ctx: &Arc<StreamContext>,
416    part_range: &PartitionRange,
417    compaction: bool,
418    part_metrics: &PartitionMetrics,
419    range_builder_list: Arc<RangeBuilderList>,
420    sources: &mut Vec<Source>,
421) {
422    // Gets range meta.
423    let range_meta = &stream_ctx.ranges[part_range.identifier];
424    #[cfg(debug_assertions)]
425    if compaction {
426        // Compaction expects input sources are not been split.
427        debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
428        for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
429            // It should scan all row groups.
430            debug_assert_eq!(
431                -1, row_group_idx.row_group_index,
432                "Expect {} range scan all row groups, given: {}",
433                i, row_group_idx.row_group_index,
434            );
435        }
436    }
437
438    sources.reserve(range_meta.row_group_indices.len());
439    for index in &range_meta.row_group_indices {
440        let stream = if stream_ctx.is_mem_range_index(*index) {
441            let stream = scan_mem_ranges(
442                stream_ctx.clone(),
443                part_metrics.clone(),
444                *index,
445                range_meta.time_range,
446            );
447            Box::pin(stream) as _
448        } else {
449            let read_type = if compaction {
450                "compaction"
451            } else {
452                "seq_scan_files"
453            };
454            let stream = scan_file_ranges(
455                stream_ctx.clone(),
456                part_metrics.clone(),
457                *index,
458                read_type,
459                range_builder_list.clone(),
460            );
461            Box::pin(stream) as _
462        };
463        sources.push(Source::Stream(stream));
464    }
465}
466
#[cfg(test)]
impl SeqScan {
    /// Returns the scan input held by the stream context.
    pub(crate) fn input(&self) -> &ScanInput {
        let ctx: &StreamContext = &self.stream_ctx;
        &ctx.input
    }
}
474
/// Returns the scanner type label: `"SeqScan(compaction)"` when `compaction` is
/// true and `"SeqScan"` otherwise.
fn get_scanner_type(compaction: bool) -> &'static str {
    match compaction {
        true => "SeqScan(compaction)",
        false => "SeqScan",
    }
}