mito2/read/
unordered_scan.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Unordered scanner.
16
17use std::fmt;
18use std::sync::Arc;
19use std::time::Instant;
20
21use async_stream::{stream, try_stream};
22use common_error::ext::BoxedError;
23use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
24use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
25use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
26use datatypes::schema::SchemaRef;
27use futures::{Stream, StreamExt};
28use snafu::ensure;
29use store_api::metadata::RegionMetadataRef;
30use store_api::region_engine::{
31    PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
32};
33
34use crate::error::{PartitionOutOfRangeSnafu, Result};
35use crate::read::range::RangeBuilderList;
36use crate::read::scan_region::{ScanInput, StreamContext};
37use crate::read::scan_util::{
38    scan_file_ranges, scan_mem_ranges, PartitionMetrics, PartitionMetricsList,
39};
40use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
41use crate::read::{scan_util, Batch, ScannerMetrics};
42
43/// Scans a region without providing any output ordering guarantee.
44///
45/// Only an append only table should use this scanner.
46pub struct UnorderedScan {
47    /// Properties of the scanner.
48    properties: ScannerProperties,
49    /// Context of streams.
50    stream_ctx: Arc<StreamContext>,
51    /// Metrics for each partition.
52    metrics_list: PartitionMetricsList,
53}
54
55impl UnorderedScan {
56    /// Creates a new [UnorderedScan].
57    pub(crate) fn new(input: ScanInput) -> Self {
58        let mut properties = ScannerProperties::default()
59            .with_append_mode(input.append_mode)
60            .with_total_rows(input.total_rows());
61        let stream_ctx = Arc::new(StreamContext::unordered_scan_ctx(input));
62        properties.partitions = vec![stream_ctx.partition_ranges()];
63
64        Self {
65            properties,
66            stream_ctx,
67            metrics_list: PartitionMetricsList::default(),
68        }
69    }
70
71    /// Scans the region and returns a stream.
72    pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
73        let metrics_set = ExecutionPlanMetricsSet::new();
74        let part_num = self.properties.num_partitions();
75        let streams = (0..part_num)
76            .map(|i| self.scan_partition(&QueryScanContext::default(), &metrics_set, i))
77            .collect::<Result<Vec<_>, BoxedError>>()?;
78        let stream = stream! {
79            for mut stream in streams {
80                while let Some(rb) = stream.next().await {
81                    yield rb;
82                }
83            }
84        };
85        let stream = Box::pin(RecordBatchStreamWrapper::new(
86            self.schema(),
87            Box::pin(stream),
88        ));
89        Ok(stream)
90    }
91
92    /// Scans a [PartitionRange] by its `identifier` and returns a stream.
93    fn scan_partition_range(
94        stream_ctx: Arc<StreamContext>,
95        part_range_id: usize,
96        part_metrics: PartitionMetrics,
97        range_builder_list: Arc<RangeBuilderList>,
98    ) -> impl Stream<Item = Result<Batch>> {
99        try_stream! {
100            // Gets range meta.
101            let range_meta = &stream_ctx.ranges[part_range_id];
102            for index in &range_meta.row_group_indices {
103                if stream_ctx.is_mem_range_index(*index) {
104                    let stream = scan_mem_ranges(
105                        stream_ctx.clone(),
106                        part_metrics.clone(),
107                        *index,
108                        range_meta.time_range,
109                    );
110                    for await batch in stream {
111                        yield batch?;
112                    }
113                } else if stream_ctx.is_file_range_index(*index) {
114                    let stream = scan_file_ranges(
115                        stream_ctx.clone(),
116                        part_metrics.clone(),
117                        *index,
118                        "unordered_scan_files",
119                        range_builder_list.clone(),
120                    ).await?;
121                    for await batch in stream {
122                        yield batch?;
123                    }
124                } else {
125                    let stream = scan_util::maybe_scan_other_ranges(
126                        &stream_ctx,
127                        *index,
128                        &part_metrics,
129                    ).await?;
130                    for await batch in stream {
131                        yield batch?;
132                    }
133                }
134            }
135        }
136    }
137
138    /// Scan [`Batch`] in all partitions one by one.
139    pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
140        let metrics_set = ExecutionPlanMetricsSet::new();
141
142        let streams = (0..self.properties.partitions.len())
143            .map(|partition| {
144                let metrics = self.partition_metrics(false, partition, &metrics_set);
145                self.scan_batch_in_partition(partition, metrics)
146            })
147            .collect::<Result<Vec<_>>>()?;
148
149        Ok(Box::pin(futures::stream::iter(streams).flatten()))
150    }
151
152    fn partition_metrics(
153        &self,
154        explain_verbose: bool,
155        partition: usize,
156        metrics_set: &ExecutionPlanMetricsSet,
157    ) -> PartitionMetrics {
158        let part_metrics = PartitionMetrics::new(
159            self.stream_ctx.input.mapper.metadata().region_id,
160            partition,
161            "UnorderedScan",
162            self.stream_ctx.query_start,
163            explain_verbose,
164            metrics_set,
165        );
166        self.metrics_list.set(partition, part_metrics.clone());
167        part_metrics
168    }
169
170    fn scan_partition_impl(
171        &self,
172        ctx: &QueryScanContext,
173        metrics_set: &ExecutionPlanMetricsSet,
174        partition: usize,
175    ) -> Result<SendableRecordBatchStream> {
176        if ctx.explain_verbose {
177            common_telemetry::info!(
178                "UnorderedScan partition {}, region_id: {}",
179                partition,
180                self.stream_ctx.input.region_metadata().region_id
181            );
182        }
183
184        let metrics = self.partition_metrics(ctx.explain_verbose, partition, metrics_set);
185
186        let batch_stream = self.scan_batch_in_partition(partition, metrics.clone())?;
187
188        let input = &self.stream_ctx.input;
189        let record_batch_stream = ConvertBatchStream::new(
190            batch_stream,
191            input.mapper.clone(),
192            input.cache_strategy.clone(),
193            metrics,
194        );
195
196        Ok(Box::pin(RecordBatchStreamWrapper::new(
197            input.mapper.output_schema(),
198            Box::pin(record_batch_stream),
199        )))
200    }
201
202    fn scan_batch_in_partition(
203        &self,
204        partition: usize,
205        part_metrics: PartitionMetrics,
206    ) -> Result<ScanBatchStream> {
207        ensure!(
208            partition < self.properties.partitions.len(),
209            PartitionOutOfRangeSnafu {
210                given: partition,
211                all: self.properties.partitions.len(),
212            }
213        );
214
215        let stream_ctx = self.stream_ctx.clone();
216        let part_ranges = self.properties.partitions[partition].clone();
217        let distinguish_range = self.properties.distinguish_partition_range;
218
219        let stream = try_stream! {
220            part_metrics.on_first_poll();
221
222            let range_builder_list = Arc::new(RangeBuilderList::new(
223                stream_ctx.input.num_memtables(),
224                stream_ctx.input.num_files(),
225            ));
226            // Scans each part.
227            for part_range in part_ranges {
228                let mut metrics = ScannerMetrics::default();
229                let mut fetch_start = Instant::now();
230                #[cfg(debug_assertions)]
231                let mut checker = crate::read::BatchChecker::default()
232                    .with_start(Some(part_range.start))
233                    .with_end(Some(part_range.end));
234
235                let stream = Self::scan_partition_range(
236                    stream_ctx.clone(),
237                    part_range.identifier,
238                    part_metrics.clone(),
239                    range_builder_list.clone(),
240                );
241                for await batch in stream {
242                    let batch = batch?;
243                    metrics.scan_cost += fetch_start.elapsed();
244                    metrics.num_batches += 1;
245                    metrics.num_rows += batch.num_rows();
246
247                    debug_assert!(!batch.is_empty());
248                    if batch.is_empty() {
249                        continue;
250                    }
251
252                    #[cfg(debug_assertions)]
253                    checker.ensure_part_range_batch(
254                        "UnorderedScan",
255                        stream_ctx.input.mapper.metadata().region_id,
256                        partition,
257                        part_range,
258                        &batch,
259                    );
260
261                    let yield_start = Instant::now();
262                    yield ScanBatch::Normal(batch);
263                    metrics.yield_cost += yield_start.elapsed();
264
265                    fetch_start = Instant::now();
266                }
267
268                // Yields an empty part to indicate this range is terminated.
269                // The query engine can use this to optimize some queries.
270                if distinguish_range {
271                    let yield_start = Instant::now();
272                    yield ScanBatch::Normal(Batch::empty());
273                    metrics.yield_cost += yield_start.elapsed();
274                }
275
276                metrics.scan_cost += fetch_start.elapsed();
277                part_metrics.merge_metrics(&metrics);
278            }
279
280            part_metrics.on_finish();
281        };
282        Ok(Box::pin(stream))
283    }
284}
285
286impl RegionScanner for UnorderedScan {
287    fn properties(&self) -> &ScannerProperties {
288        &self.properties
289    }
290
291    fn schema(&self) -> SchemaRef {
292        self.stream_ctx.input.mapper.output_schema()
293    }
294
295    fn metadata(&self) -> RegionMetadataRef {
296        self.stream_ctx.input.mapper.metadata().clone()
297    }
298
299    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
300        self.properties.prepare(request);
301        Ok(())
302    }
303
304    fn scan_partition(
305        &self,
306        ctx: &QueryScanContext,
307        metrics_set: &ExecutionPlanMetricsSet,
308        partition: usize,
309    ) -> Result<SendableRecordBatchStream, BoxedError> {
310        self.scan_partition_impl(ctx, metrics_set, partition)
311            .map_err(BoxedError::new)
312    }
313
314    fn has_predicate(&self) -> bool {
315        let predicate = self.stream_ctx.input.predicate();
316        predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
317    }
318
319    fn set_logical_region(&mut self, logical_region: bool) {
320        self.properties.set_logical_region(logical_region);
321    }
322}
323
324impl DisplayAs for UnorderedScan {
325    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
326        write!(
327            f,
328            "UnorderedScan: region={}, ",
329            self.stream_ctx.input.mapper.metadata().region_id
330        )?;
331        match t {
332            DisplayFormatType::Default => self.stream_ctx.format_for_explain(false, f),
333            DisplayFormatType::Verbose => {
334                self.stream_ctx.format_for_explain(true, f)?;
335                self.metrics_list.format_verbose_metrics(f)
336            }
337        }
338    }
339}
340
341impl fmt::Debug for UnorderedScan {
342    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
343        f.debug_struct("UnorderedScan")
344            .field("num_ranges", &self.stream_ctx.ranges.len())
345            .finish()
346    }
347}
348
#[cfg(test)]
impl UnorderedScan {
    /// Test-only accessor for the [ScanInput] this scanner was built from.
    pub(crate) fn input(&self) -> &ScanInput {
        let ctx = &self.stream_ctx;
        &ctx.input
    }
}