mito2/read/
unordered_scan.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Unordered scanner.
16
17use std::fmt;
18use std::sync::Arc;
19use std::time::Instant;
20
21use async_stream::{stream, try_stream};
22use common_error::ext::BoxedError;
23use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
24use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
25use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
26use datatypes::schema::SchemaRef;
27use futures::{Stream, StreamExt};
28use snafu::ensure;
29use store_api::metadata::RegionMetadataRef;
30use store_api::region_engine::{PrepareRequest, RegionScanner, ScannerProperties};
31
32use crate::error::{PartitionOutOfRangeSnafu, Result};
33use crate::read::range::RangeBuilderList;
34use crate::read::scan_region::{ScanInput, StreamContext};
35use crate::read::scan_util::{
36    scan_file_ranges, scan_mem_ranges, PartitionMetrics, PartitionMetricsList,
37};
38use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
39use crate::read::{scan_util, Batch, ScannerMetrics};
40
41/// Scans a region without providing any output ordering guarantee.
42///
43/// Only an append only table should use this scanner.
44pub struct UnorderedScan {
45    /// Properties of the scanner.
46    properties: ScannerProperties,
47    /// Context of streams.
48    stream_ctx: Arc<StreamContext>,
49    /// Metrics for each partition.
50    metrics_list: PartitionMetricsList,
51}
52
53impl UnorderedScan {
54    /// Creates a new [UnorderedScan].
55    pub(crate) fn new(input: ScanInput) -> Self {
56        let mut properties = ScannerProperties::default()
57            .with_append_mode(input.append_mode)
58            .with_total_rows(input.total_rows());
59        let stream_ctx = Arc::new(StreamContext::unordered_scan_ctx(input));
60        properties.partitions = vec![stream_ctx.partition_ranges()];
61
62        Self {
63            properties,
64            stream_ctx,
65            metrics_list: PartitionMetricsList::default(),
66        }
67    }
68
69    /// Scans the region and returns a stream.
70    pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
71        let metrics_set = ExecutionPlanMetricsSet::new();
72        let part_num = self.properties.num_partitions();
73        let streams = (0..part_num)
74            .map(|i| self.scan_partition(&metrics_set, i))
75            .collect::<Result<Vec<_>, BoxedError>>()?;
76        let stream = stream! {
77            for mut stream in streams {
78                while let Some(rb) = stream.next().await {
79                    yield rb;
80                }
81            }
82        };
83        let stream = Box::pin(RecordBatchStreamWrapper::new(
84            self.schema(),
85            Box::pin(stream),
86        ));
87        Ok(stream)
88    }
89
90    /// Scans a [PartitionRange] by its `identifier` and returns a stream.
91    fn scan_partition_range(
92        stream_ctx: Arc<StreamContext>,
93        part_range_id: usize,
94        part_metrics: PartitionMetrics,
95        range_builder_list: Arc<RangeBuilderList>,
96    ) -> impl Stream<Item = Result<Batch>> {
97        try_stream! {
98            // Gets range meta.
99            let range_meta = &stream_ctx.ranges[part_range_id];
100            for index in &range_meta.row_group_indices {
101                if stream_ctx.is_mem_range_index(*index) {
102                    let stream = scan_mem_ranges(
103                        stream_ctx.clone(),
104                        part_metrics.clone(),
105                        *index,
106                        range_meta.time_range,
107                    );
108                    for await batch in stream {
109                        yield batch?;
110                    }
111                } else if stream_ctx.is_file_range_index(*index) {
112                    let stream = scan_file_ranges(
113                        stream_ctx.clone(),
114                        part_metrics.clone(),
115                        *index,
116                        "unordered_scan_files",
117                        range_builder_list.clone(),
118                    ).await?;
119                    for await batch in stream {
120                        yield batch?;
121                    }
122                } else {
123                    let stream = scan_util::maybe_scan_other_ranges(
124                        &stream_ctx,
125                        *index,
126                        &part_metrics,
127                    ).await?;
128                    for await batch in stream {
129                        yield batch?;
130                    }
131                }
132            }
133        }
134    }
135
136    /// Scan [`Batch`] in all partitions one by one.
137    pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
138        let metrics_set = ExecutionPlanMetricsSet::new();
139
140        let streams = (0..self.properties.partitions.len())
141            .map(|partition| {
142                let metrics = self.partition_metrics(partition, &metrics_set);
143                self.scan_batch_in_partition(partition, metrics)
144            })
145            .collect::<Result<Vec<_>>>()?;
146
147        Ok(Box::pin(futures::stream::iter(streams).flatten()))
148    }
149
150    fn partition_metrics(
151        &self,
152        partition: usize,
153        metrics_set: &ExecutionPlanMetricsSet,
154    ) -> PartitionMetrics {
155        let part_metrics = PartitionMetrics::new(
156            self.stream_ctx.input.mapper.metadata().region_id,
157            partition,
158            "UnorderedScan",
159            self.stream_ctx.query_start,
160            metrics_set,
161        );
162        self.metrics_list.set(partition, part_metrics.clone());
163        part_metrics
164    }
165
166    fn scan_partition_impl(
167        &self,
168        metrics_set: &ExecutionPlanMetricsSet,
169        partition: usize,
170    ) -> Result<SendableRecordBatchStream> {
171        let metrics = self.partition_metrics(partition, metrics_set);
172
173        let batch_stream = self.scan_batch_in_partition(partition, metrics.clone())?;
174
175        let input = &self.stream_ctx.input;
176        let record_batch_stream = ConvertBatchStream::new(
177            batch_stream,
178            input.mapper.clone(),
179            input.cache_strategy.clone(),
180            metrics,
181        );
182
183        Ok(Box::pin(RecordBatchStreamWrapper::new(
184            input.mapper.output_schema(),
185            Box::pin(record_batch_stream),
186        )))
187    }
188
189    fn scan_batch_in_partition(
190        &self,
191        partition: usize,
192        part_metrics: PartitionMetrics,
193    ) -> Result<ScanBatchStream> {
194        ensure!(
195            partition < self.properties.partitions.len(),
196            PartitionOutOfRangeSnafu {
197                given: partition,
198                all: self.properties.partitions.len(),
199            }
200        );
201
202        let stream_ctx = self.stream_ctx.clone();
203        let part_ranges = self.properties.partitions[partition].clone();
204        let distinguish_range = self.properties.distinguish_partition_range;
205
206        let stream = try_stream! {
207            part_metrics.on_first_poll();
208
209            let range_builder_list = Arc::new(RangeBuilderList::new(
210                stream_ctx.input.num_memtables(),
211                stream_ctx.input.num_files(),
212            ));
213            // Scans each part.
214            for part_range in part_ranges {
215                let mut metrics = ScannerMetrics::default();
216                let mut fetch_start = Instant::now();
217                #[cfg(debug_assertions)]
218                let mut checker = crate::read::BatchChecker::default()
219                    .with_start(Some(part_range.start))
220                    .with_end(Some(part_range.end));
221
222                let stream = Self::scan_partition_range(
223                    stream_ctx.clone(),
224                    part_range.identifier,
225                    part_metrics.clone(),
226                    range_builder_list.clone(),
227                );
228                for await batch in stream {
229                    let batch = batch?;
230                    metrics.scan_cost += fetch_start.elapsed();
231                    metrics.num_batches += 1;
232                    metrics.num_rows += batch.num_rows();
233
234                    debug_assert!(!batch.is_empty());
235                    if batch.is_empty() {
236                        continue;
237                    }
238
239                    #[cfg(debug_assertions)]
240                    checker.ensure_part_range_batch(
241                        "UnorderedScan",
242                        stream_ctx.input.mapper.metadata().region_id,
243                        partition,
244                        part_range,
245                        &batch,
246                    );
247
248                    let yield_start = Instant::now();
249                    yield ScanBatch::Normal(batch);
250                    metrics.yield_cost += yield_start.elapsed();
251
252                    fetch_start = Instant::now();
253                }
254
255                // Yields an empty part to indicate this range is terminated.
256                // The query engine can use this to optimize some queries.
257                if distinguish_range {
258                    let yield_start = Instant::now();
259                    yield ScanBatch::Normal(Batch::empty());
260                    metrics.yield_cost += yield_start.elapsed();
261                }
262
263                metrics.scan_cost += fetch_start.elapsed();
264                part_metrics.merge_metrics(&metrics);
265            }
266        };
267        Ok(Box::pin(stream))
268    }
269}
270
271impl RegionScanner for UnorderedScan {
272    fn properties(&self) -> &ScannerProperties {
273        &self.properties
274    }
275
276    fn schema(&self) -> SchemaRef {
277        self.stream_ctx.input.mapper.output_schema()
278    }
279
280    fn metadata(&self) -> RegionMetadataRef {
281        self.stream_ctx.input.mapper.metadata().clone()
282    }
283
284    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
285        self.properties.prepare(request);
286        Ok(())
287    }
288
289    fn scan_partition(
290        &self,
291        metrics_set: &ExecutionPlanMetricsSet,
292        partition: usize,
293    ) -> Result<SendableRecordBatchStream, BoxedError> {
294        self.scan_partition_impl(metrics_set, partition)
295            .map_err(BoxedError::new)
296    }
297
298    fn has_predicate(&self) -> bool {
299        let predicate = self.stream_ctx.input.predicate();
300        predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
301    }
302
303    fn set_logical_region(&mut self, logical_region: bool) {
304        self.properties.set_logical_region(logical_region);
305    }
306}
307
308impl DisplayAs for UnorderedScan {
309    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
310        write!(
311            f,
312            "UnorderedScan: region={}, ",
313            self.stream_ctx.input.mapper.metadata().region_id
314        )?;
315        match t {
316            DisplayFormatType::Default => self.stream_ctx.format_for_explain(false, f),
317            DisplayFormatType::Verbose => {
318                self.stream_ctx.format_for_explain(true, f)?;
319                self.metrics_list.format_verbose_metrics(f)
320            }
321        }
322    }
323}
324
325impl fmt::Debug for UnorderedScan {
326    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
327        f.debug_struct("UnorderedScan")
328            .field("num_ranges", &self.stream_ctx.ranges.len())
329            .finish()
330    }
331}
332
#[cfg(test)]
impl UnorderedScan {
    /// Gives tests access to the scan input held by the stream context.
    pub(crate) fn input(&self) -> &ScanInput {
        let ctx: &StreamContext = &self.stream_ctx;
        &ctx.input
    }
}