mito2/read/
seq_scan.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Sequential scan.
16
17use std::fmt;
18use std::sync::Arc;
19use std::time::Instant;
20
21use async_stream::try_stream;
22use common_error::ext::BoxedError;
23use common_recordbatch::error::ExternalSnafu;
24use common_recordbatch::util::ChainedRecordBatchStream;
25use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
26use common_telemetry::tracing;
27use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
28use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
29use datatypes::schema::SchemaRef;
30use snafu::ResultExt;
31use store_api::metadata::RegionMetadataRef;
32use store_api::region_engine::{PartitionRange, PrepareRequest, RegionScanner, ScannerProperties};
33use store_api::storage::{TimeSeriesDistribution, TimeSeriesRowSelector};
34use tokio::sync::Semaphore;
35
36use crate::error::{PartitionOutOfRangeSnafu, Result};
37use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
38use crate::read::last_row::LastRowReader;
39use crate::read::merge::MergeReaderBuilder;
40use crate::read::range::RangeBuilderList;
41use crate::read::scan_region::{ScanInput, StreamContext};
42use crate::read::scan_util::{
43    scan_file_ranges, scan_mem_ranges, PartitionMetrics, PartitionMetricsList,
44};
45use crate::read::{BatchReader, BoxedBatchReader, ScannerMetrics, Source};
46use crate::region::options::MergeMode;
47
48/// Scans a region and returns rows in a sorted sequence.
49///
50/// The output order is always `order by primary keys, time index` inside every
51/// [`PartitionRange`]. Each "partition" may contains many [`PartitionRange`]s.
52pub struct SeqScan {
53    /// Properties of the scanner.
54    properties: ScannerProperties,
55    /// Context of streams.
56    stream_ctx: Arc<StreamContext>,
57    /// The scanner is used for compaction.
58    compaction: bool,
59    /// Metrics for each partition.
60    /// The scanner only sets in query and keeps it empty during compaction.
61    metrics_list: PartitionMetricsList,
62}
63
64impl SeqScan {
65    /// Creates a new [SeqScan] with the given input and compaction flag.
66    /// If `compaction` is true, the scanner will not attempt to split ranges.
67    pub(crate) fn new(input: ScanInput, compaction: bool) -> Self {
68        let mut properties = ScannerProperties::default()
69            .with_append_mode(input.append_mode)
70            .with_total_rows(input.total_rows());
71        let stream_ctx = Arc::new(StreamContext::seq_scan_ctx(input, compaction));
72        properties.partitions = vec![stream_ctx.partition_ranges()];
73
74        Self {
75            properties,
76            stream_ctx,
77            compaction,
78            metrics_list: PartitionMetricsList::default(),
79        }
80    }
81
82    /// Builds a stream for the query.
83    ///
84    /// The returned stream is not partitioned and will contains all the data. If want
85    /// partitioned scan, use [`RegionScanner::scan_partition`].
86    pub fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
87        let metrics_set = ExecutionPlanMetricsSet::new();
88        let streams = (0..self.properties.partitions.len())
89            .map(|partition: usize| self.scan_partition(&metrics_set, partition))
90            .collect::<Result<Vec<_>, _>>()?;
91
92        let aggr_stream = ChainedRecordBatchStream::new(streams).map_err(BoxedError::new)?;
93        Ok(Box::pin(aggr_stream))
94    }
95
96    /// Builds a [BoxedBatchReader] from sequential scan for compaction.
97    ///
98    /// # Panics
99    /// Panics if the compaction flag is not set.
100    pub async fn build_reader_for_compaction(&self) -> Result<BoxedBatchReader> {
101        assert!(self.compaction);
102
103        let metrics_set = ExecutionPlanMetricsSet::new();
104        let part_metrics = self.new_partition_metrics(&metrics_set, 0);
105        debug_assert_eq!(1, self.properties.partitions.len());
106        let partition_ranges = &self.properties.partitions[0];
107
108        let reader = Self::merge_all_ranges_for_compaction(
109            &self.stream_ctx,
110            partition_ranges,
111            &part_metrics,
112        )
113        .await?;
114        Ok(Box::new(reader))
115    }
116
117    /// Builds a merge reader that reads all ranges.
118    /// Callers MUST not split ranges before calling this method.
119    async fn merge_all_ranges_for_compaction(
120        stream_ctx: &Arc<StreamContext>,
121        partition_ranges: &[PartitionRange],
122        part_metrics: &PartitionMetrics,
123    ) -> Result<BoxedBatchReader> {
124        let mut sources = Vec::new();
125        let range_builder_list = Arc::new(RangeBuilderList::new(
126            stream_ctx.input.num_memtables(),
127            stream_ctx.input.num_files(),
128        ));
129        for part_range in partition_ranges {
130            build_sources(
131                stream_ctx,
132                part_range,
133                true,
134                part_metrics,
135                range_builder_list.clone(),
136                &mut sources,
137            );
138        }
139
140        common_telemetry::debug!(
141            "Build reader to read all parts, region_id: {}, num_part_ranges: {}, num_sources: {}",
142            stream_ctx.input.mapper.metadata().region_id,
143            partition_ranges.len(),
144            sources.len()
145        );
146        Self::build_reader_from_sources(stream_ctx, sources, None).await
147    }
148
149    /// Builds a reader to read sources. If `semaphore` is provided, reads sources in parallel
150    /// if possible.
151    #[tracing::instrument(level = tracing::Level::DEBUG, skip_all)]
152    async fn build_reader_from_sources(
153        stream_ctx: &StreamContext,
154        mut sources: Vec<Source>,
155        semaphore: Option<Arc<Semaphore>>,
156    ) -> Result<BoxedBatchReader> {
157        if let Some(semaphore) = semaphore.as_ref() {
158            // Read sources in parallel.
159            if sources.len() > 1 {
160                sources = stream_ctx
161                    .input
162                    .create_parallel_sources(sources, semaphore.clone())?;
163            }
164        }
165
166        let mut builder = MergeReaderBuilder::from_sources(sources);
167        let reader = builder.build().await?;
168
169        let dedup = !stream_ctx.input.append_mode;
170        let reader = if dedup {
171            match stream_ctx.input.merge_mode {
172                MergeMode::LastRow => Box::new(DedupReader::new(
173                    reader,
174                    LastRow::new(stream_ctx.input.filter_deleted),
175                )) as _,
176                MergeMode::LastNonNull => Box::new(DedupReader::new(
177                    reader,
178                    LastNonNull::new(stream_ctx.input.filter_deleted),
179                )) as _,
180            }
181        } else {
182            Box::new(reader) as _
183        };
184
185        let reader = match &stream_ctx.input.series_row_selector {
186            Some(TimeSeriesRowSelector::LastRow) => Box::new(LastRowReader::new(reader)) as _,
187            None => reader,
188        };
189
190        Ok(reader)
191    }
192
193    /// Scans the given partition when the part list is set properly.
194    /// Otherwise the returned stream might not contains any data.
195    fn scan_partition_impl(
196        &self,
197        metrics_set: &ExecutionPlanMetricsSet,
198        partition: usize,
199    ) -> Result<SendableRecordBatchStream, BoxedError> {
200        if partition >= self.properties.partitions.len() {
201            return Err(BoxedError::new(
202                PartitionOutOfRangeSnafu {
203                    given: partition,
204                    all: self.properties.partitions.len(),
205                }
206                .build(),
207            ));
208        }
209
210        if self.stream_ctx.input.distribution == Some(TimeSeriesDistribution::PerSeries) {
211            return self.scan_partition_by_series(metrics_set, partition);
212        }
213
214        let stream_ctx = self.stream_ctx.clone();
215        let semaphore = self.new_semaphore();
216        let partition_ranges = self.properties.partitions[partition].clone();
217        let compaction = self.compaction;
218        let distinguish_range = self.properties.distinguish_partition_range;
219        let part_metrics = self.new_partition_metrics(metrics_set, partition);
220
221        let stream = try_stream! {
222            part_metrics.on_first_poll();
223
224            let range_builder_list = Arc::new(RangeBuilderList::new(
225                stream_ctx.input.num_memtables(),
226                stream_ctx.input.num_files(),
227            ));
228            // Scans each part.
229            for part_range in partition_ranges {
230                let mut sources = Vec::new();
231                build_sources(
232                    &stream_ctx,
233                    &part_range,
234                    compaction,
235                    &part_metrics,
236                    range_builder_list.clone(),
237                    &mut sources,
238                );
239
240                let mut reader =
241                    Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone())
242                        .await
243                        .map_err(BoxedError::new)
244                        .context(ExternalSnafu)?;
245                let cache = &stream_ctx.input.cache_strategy;
246                let mut metrics = ScannerMetrics::default();
247                let mut fetch_start = Instant::now();
248                #[cfg(debug_assertions)]
249                let mut checker = crate::read::BatchChecker::default()
250                    .with_start(Some(part_range.start))
251                    .with_end(Some(part_range.end));
252
253                while let Some(batch) = reader
254                    .next_batch()
255                    .await
256                    .map_err(BoxedError::new)
257                    .context(ExternalSnafu)?
258                {
259                    metrics.scan_cost += fetch_start.elapsed();
260                    metrics.num_batches += 1;
261                    metrics.num_rows += batch.num_rows();
262
263                    debug_assert!(!batch.is_empty());
264                    if batch.is_empty() {
265                        continue;
266                    }
267
268                    #[cfg(debug_assertions)]
269                    checker.ensure_part_range_batch(
270                        "SeqScan",
271                        stream_ctx.input.mapper.metadata().region_id,
272                        partition,
273                        part_range,
274                        &batch,
275                    );
276
277                    let convert_start = Instant::now();
278                    let record_batch = stream_ctx.input.mapper.convert(&batch, cache)?;
279                    metrics.convert_cost += convert_start.elapsed();
280                    let yield_start = Instant::now();
281                    yield record_batch;
282                    metrics.yield_cost += yield_start.elapsed();
283
284                    fetch_start = Instant::now();
285                }
286
287                // Yields an empty part to indicate this range is terminated.
288                // The query engine can use this to optimize some queries.
289                if distinguish_range {
290                    let yield_start = Instant::now();
291                    yield stream_ctx.input.mapper.empty_record_batch();
292                    metrics.yield_cost += yield_start.elapsed();
293                }
294
295                metrics.scan_cost += fetch_start.elapsed();
296                part_metrics.merge_metrics(&metrics);
297            }
298
299            part_metrics.on_finish();
300        };
301
302        let stream = Box::pin(RecordBatchStreamWrapper::new(
303            self.stream_ctx.input.mapper.output_schema(),
304            Box::pin(stream),
305        ));
306
307        Ok(stream)
308    }
309
310    /// Scans all ranges in the given partition and merge by time series.
311    /// Otherwise the returned stream might not contains any data.
312    fn scan_partition_by_series(
313        &self,
314        metrics_set: &ExecutionPlanMetricsSet,
315        partition: usize,
316    ) -> Result<SendableRecordBatchStream, BoxedError> {
317        let stream_ctx = self.stream_ctx.clone();
318        let semaphore = self.new_semaphore();
319        let partition_ranges = self.properties.partitions[partition].clone();
320        let distinguish_range = self.properties.distinguish_partition_range;
321        let part_metrics = self.new_partition_metrics(metrics_set, partition);
322        debug_assert!(!self.compaction);
323
324        let stream = try_stream! {
325            part_metrics.on_first_poll();
326
327            let range_builder_list = Arc::new(RangeBuilderList::new(
328                stream_ctx.input.num_memtables(),
329                stream_ctx.input.num_files(),
330            ));
331            // Scans all parts.
332            let mut sources = Vec::with_capacity(partition_ranges.len());
333            for part_range in partition_ranges {
334                build_sources(
335                    &stream_ctx,
336                    &part_range,
337                    false,
338                    &part_metrics,
339                    range_builder_list.clone(),
340                    &mut sources,
341                );
342            }
343
344            // Builds a reader that merge sources from all parts.
345            let mut reader =
346                Self::build_reader_from_sources(&stream_ctx, sources, semaphore.clone())
347                    .await
348                    .map_err(BoxedError::new)
349                    .context(ExternalSnafu)?;
350            let cache = &stream_ctx.input.cache_strategy;
351            let mut metrics = ScannerMetrics::default();
352            let mut fetch_start = Instant::now();
353
354            while let Some(batch) = reader
355                .next_batch()
356                .await
357                .map_err(BoxedError::new)
358                .context(ExternalSnafu)?
359            {
360                metrics.scan_cost += fetch_start.elapsed();
361                metrics.num_batches += 1;
362                metrics.num_rows += batch.num_rows();
363
364                debug_assert!(!batch.is_empty());
365                if batch.is_empty() {
366                    continue;
367                }
368
369                let convert_start = Instant::now();
370                let record_batch = stream_ctx.input.mapper.convert(&batch, cache)?;
371                metrics.convert_cost += convert_start.elapsed();
372                let yield_start = Instant::now();
373                yield record_batch;
374                metrics.yield_cost += yield_start.elapsed();
375
376                fetch_start = Instant::now();
377            }
378
379            // Yields an empty part to indicate this range is terminated.
380            // The query engine can use this to optimize some queries.
381            if distinguish_range {
382                let yield_start = Instant::now();
383                yield stream_ctx.input.mapper.empty_record_batch();
384                metrics.yield_cost += yield_start.elapsed();
385            }
386
387            metrics.scan_cost += fetch_start.elapsed();
388            part_metrics.merge_metrics(&metrics);
389
390            part_metrics.on_finish();
391        };
392
393        let stream = Box::pin(RecordBatchStreamWrapper::new(
394            self.stream_ctx.input.mapper.output_schema(),
395            Box::pin(stream),
396        ));
397
398        Ok(stream)
399    }
400
401    fn new_semaphore(&self) -> Option<Arc<Semaphore>> {
402        if self.properties.target_partitions() > self.properties.num_partitions() {
403            // We can use additional tasks to read the data if we have more target partitions than actual partitions.
404            // This semaphore is partition level.
405            // We don't use a global semaphore to avoid a partition waiting for others. The final concurrency
406            // of tasks usually won't exceed the target partitions a lot as compaction can reduce the number of
407            // files in a part range.
408            Some(Arc::new(Semaphore::new(
409                self.properties.target_partitions() - self.properties.num_partitions() + 1,
410            )))
411        } else {
412            None
413        }
414    }
415
416    /// Creates a new partition metrics instance.
417    /// Sets the partition metrics for the given partition if it is not for compaction.
418    fn new_partition_metrics(
419        &self,
420        metrics_set: &ExecutionPlanMetricsSet,
421        partition: usize,
422    ) -> PartitionMetrics {
423        let metrics = PartitionMetrics::new(
424            self.stream_ctx.input.mapper.metadata().region_id,
425            partition,
426            get_scanner_type(self.compaction),
427            self.stream_ctx.query_start,
428            metrics_set,
429        );
430
431        if !self.compaction {
432            self.metrics_list.set(partition, metrics.clone());
433        }
434
435        metrics
436    }
437}
438
439impl RegionScanner for SeqScan {
440    fn properties(&self) -> &ScannerProperties {
441        &self.properties
442    }
443
444    fn schema(&self) -> SchemaRef {
445        self.stream_ctx.input.mapper.output_schema()
446    }
447
448    fn metadata(&self) -> RegionMetadataRef {
449        self.stream_ctx.input.mapper.metadata().clone()
450    }
451
452    fn scan_partition(
453        &self,
454        metrics_set: &ExecutionPlanMetricsSet,
455        partition: usize,
456    ) -> Result<SendableRecordBatchStream, BoxedError> {
457        self.scan_partition_impl(metrics_set, partition)
458    }
459
460    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
461        self.properties.prepare(request);
462        Ok(())
463    }
464
465    fn has_predicate(&self) -> bool {
466        let predicate = self.stream_ctx.input.predicate();
467        predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
468    }
469
470    fn set_logical_region(&mut self, logical_region: bool) {
471        self.properties.set_logical_region(logical_region);
472    }
473}
474
475impl DisplayAs for SeqScan {
476    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
477        write!(
478            f,
479            "SeqScan: region={}, ",
480            self.stream_ctx.input.mapper.metadata().region_id
481        )?;
482        match t {
483            DisplayFormatType::Default => self.stream_ctx.format_for_explain(false, f),
484            DisplayFormatType::Verbose => {
485                self.stream_ctx.format_for_explain(true, f)?;
486                self.metrics_list.format_verbose_metrics(f)
487            }
488        }
489    }
490}
491
492impl fmt::Debug for SeqScan {
493    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
494        f.debug_struct("SeqScan")
495            .field("num_ranges", &self.stream_ctx.ranges.len())
496            .finish()
497    }
498}
499
500/// Builds sources for the partition range and push them to the `sources` vector.
501fn build_sources(
502    stream_ctx: &Arc<StreamContext>,
503    part_range: &PartitionRange,
504    compaction: bool,
505    part_metrics: &PartitionMetrics,
506    range_builder_list: Arc<RangeBuilderList>,
507    sources: &mut Vec<Source>,
508) {
509    // Gets range meta.
510    let range_meta = &stream_ctx.ranges[part_range.identifier];
511    #[cfg(debug_assertions)]
512    if compaction || stream_ctx.input.distribution == Some(TimeSeriesDistribution::PerSeries) {
513        // Compaction or per series distribution expects input sources are not been split.
514        debug_assert_eq!(range_meta.indices.len(), range_meta.row_group_indices.len());
515        for (i, row_group_idx) in range_meta.row_group_indices.iter().enumerate() {
516            // It should scan all row groups.
517            debug_assert_eq!(
518                -1, row_group_idx.row_group_index,
519                "Expect {} range scan all row groups, given: {}",
520                i, row_group_idx.row_group_index,
521            );
522        }
523    }
524
525    sources.reserve(range_meta.row_group_indices.len());
526    for index in &range_meta.row_group_indices {
527        let stream = if stream_ctx.is_mem_range_index(*index) {
528            let stream = scan_mem_ranges(
529                stream_ctx.clone(),
530                part_metrics.clone(),
531                *index,
532                range_meta.time_range,
533            );
534            Box::pin(stream) as _
535        } else {
536            let read_type = if compaction {
537                "compaction"
538            } else {
539                "seq_scan_files"
540            };
541            let stream = scan_file_ranges(
542                stream_ctx.clone(),
543                part_metrics.clone(),
544                *index,
545                read_type,
546                range_builder_list.clone(),
547            );
548            Box::pin(stream) as _
549        };
550        sources.push(Source::Stream(stream));
551    }
552}
553
#[cfg(test)]
impl SeqScan {
    /// Borrows the scan input held by the stream context (test-only helper).
    pub(crate) fn input(&self) -> &ScanInput {
        let ctx: &StreamContext = &self.stream_ctx;
        &ctx.input
    }
}
561
/// Returns the scanner type label used for partition metrics.
///
/// Compaction scans are tagged separately so their metrics can be told apart from queries.
fn get_scanner_type(compaction: bool) -> &'static str {
    match compaction {
        true => "SeqScan(compaction)",
        false => "SeqScan",
    }
}