Skip to main content

mito2/read/
unordered_scan.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Unordered scanner.
16
17use std::fmt;
18use std::sync::Arc;
19use std::time::Instant;
20
21use async_stream::{stream, try_stream};
22use common_error::ext::BoxedError;
23use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
24use common_telemetry::{tracing, warn};
25use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
26use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
27use datatypes::arrow::record_batch::RecordBatch;
28use datatypes::schema::SchemaRef;
29use futures::{Stream, StreamExt};
30use snafu::ensure;
31use store_api::metadata::RegionMetadataRef;
32use store_api::region_engine::{
33    PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
34};
35
36use crate::error::{PartitionOutOfRangeSnafu, Result};
37use crate::read::pruner::{PartitionPruner, Pruner};
38use crate::read::scan_region::{ScanInput, StreamContext};
39use crate::read::scan_util::{
40    PartitionMetrics, PartitionMetricsList, scan_flat_file_ranges, scan_flat_mem_ranges,
41};
42use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
43use crate::read::{ScannerMetrics, scan_util};
44
45/// Scans a region without providing any output ordering guarantee.
46///
47/// Only an append only table should use this scanner.
48pub struct UnorderedScan {
49    /// Properties of the scanner.
50    properties: ScannerProperties,
51    /// Context of streams.
52    stream_ctx: Arc<StreamContext>,
53    /// Shared pruner for file range building.
54    pruner: Arc<Pruner>,
55    /// Metrics for each partition.
56    metrics_list: PartitionMetricsList,
57}
58
59impl UnorderedScan {
60    /// Creates a new [UnorderedScan].
61    pub(crate) fn new(input: ScanInput) -> Self {
62        let mut properties = ScannerProperties::default()
63            .with_append_mode(input.append_mode)
64            .with_total_rows(input.total_rows());
65        let stream_ctx = Arc::new(StreamContext::unordered_scan_ctx(input));
66        properties.partitions = vec![stream_ctx.partition_ranges()];
67
68        // Create the shared pruner with number of workers equal to CPU cores.
69        let num_workers = common_stat::get_total_cpu_cores().max(1);
70        let pruner = Arc::new(Pruner::new(stream_ctx.clone(), num_workers));
71
72        Self {
73            properties,
74            stream_ctx,
75            pruner,
76            metrics_list: PartitionMetricsList::default(),
77        }
78    }
79
80    /// Scans the region and returns a stream.
81    #[tracing::instrument(
82        skip_all,
83        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
84    )]
85    pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
86        let metrics_set = ExecutionPlanMetricsSet::new();
87        let part_num = self.properties.num_partitions();
88        let streams = (0..part_num)
89            .map(|i| self.scan_partition(&QueryScanContext::default(), &metrics_set, i))
90            .collect::<Result<Vec<_>, BoxedError>>()?;
91        let stream = stream! {
92            for mut stream in streams {
93                while let Some(rb) = stream.next().await {
94                    yield rb;
95                }
96            }
97        };
98        let stream = Box::pin(RecordBatchStreamWrapper::new(
99            self.schema(),
100            Box::pin(stream),
101        ));
102        Ok(stream)
103    }
104
105    /// Scans a [PartitionRange] by its `identifier` and returns a flat stream of RecordBatch.
106    #[tracing::instrument(
107        skip_all,
108        fields(
109            region_id = %stream_ctx.input.region_metadata().region_id,
110            part_range_id = part_range_id
111        )
112    )]
113    fn scan_flat_partition_range(
114        stream_ctx: Arc<StreamContext>,
115        part_range_id: usize,
116        part_metrics: PartitionMetrics,
117        partition_pruner: Arc<PartitionPruner>,
118    ) -> impl Stream<Item = Result<RecordBatch>> {
119        try_stream! {
120            // Gets range meta.
121            let range_meta = &stream_ctx.ranges[part_range_id];
122            let part_range = range_meta.new_partition_range(part_range_id);
123            let pre_filter_mode = stream_ctx.range_pre_filter_mode(&part_range);
124            for index in &range_meta.row_group_indices {
125                if stream_ctx.is_mem_range_index(*index) {
126                    let stream = scan_flat_mem_ranges(
127                        stream_ctx.clone(),
128                        part_metrics.clone(),
129                        *index,
130                        range_meta.time_range,
131                    );
132                    for await record_batch in stream {
133                        yield record_batch?;
134                    }
135                } else if stream_ctx.is_file_range_index(*index) {
136                    let stream = scan_flat_file_ranges(
137                        stream_ctx.clone(),
138                        part_metrics.clone(),
139                        *index,
140                        "unordered_scan_files",
141                        partition_pruner.clone(),
142                    ).await?;
143                    for await record_batch in stream {
144                        yield record_batch?;
145                    }
146                } else {
147                    let stream = scan_util::maybe_scan_flat_other_ranges(
148                        &stream_ctx,
149                        *index,
150                        &part_metrics,
151                        pre_filter_mode,
152                    ).await?;
153                    for await record_batch in stream {
154                        yield record_batch?;
155                    }
156                }
157            }
158        }
159    }
160
161    /// Scan [`Batch`] in all partitions one by one.
162    pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
163        let metrics_set = ExecutionPlanMetricsSet::new();
164
165        let streams = (0..self.properties.partitions.len())
166            .map(|partition| {
167                let metrics = self.partition_metrics(false, partition, &metrics_set);
168                self.scan_flat_batch_in_partition(partition, metrics)
169            })
170            .collect::<Result<Vec<_>>>()?;
171
172        Ok(Box::pin(futures::stream::iter(streams).flatten()))
173    }
174
175    fn partition_metrics(
176        &self,
177        explain_verbose: bool,
178        partition: usize,
179        metrics_set: &ExecutionPlanMetricsSet,
180    ) -> PartitionMetrics {
181        let part_metrics = PartitionMetrics::new(
182            self.stream_ctx.input.mapper.metadata().region_id,
183            partition,
184            "UnorderedScan",
185            self.stream_ctx.query_start,
186            explain_verbose,
187            metrics_set,
188        );
189        self.metrics_list.set(partition, part_metrics.clone());
190        part_metrics
191    }
192
193    #[tracing::instrument(
194        skip_all,
195        fields(
196            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
197            partition = partition
198        )
199    )]
200    fn scan_partition_impl(
201        &self,
202        ctx: &QueryScanContext,
203        metrics_set: &ExecutionPlanMetricsSet,
204        partition: usize,
205    ) -> Result<SendableRecordBatchStream> {
206        if ctx.explain_verbose {
207            common_telemetry::info!(
208                "UnorderedScan partition {}, region_id: {}",
209                partition,
210                self.stream_ctx.input.region_metadata().region_id
211            );
212        }
213
214        let metrics = self.partition_metrics(ctx.explain_verbose, partition, metrics_set);
215        let input = &self.stream_ctx.input;
216
217        let batch_stream = self.scan_flat_batch_in_partition(partition, metrics.clone())?;
218
219        let record_batch_stream = ConvertBatchStream::new(
220            batch_stream,
221            input.mapper.clone(),
222            input.cache_strategy.clone(),
223            metrics,
224        );
225
226        Ok(Box::pin(RecordBatchStreamWrapper::new(
227            input.mapper.output_schema(),
228            Box::pin(record_batch_stream),
229        )))
230    }
231
232    #[tracing::instrument(
233        skip_all,
234        fields(
235            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
236            partition = partition
237        )
238    )]
239    fn scan_flat_batch_in_partition(
240        &self,
241        partition: usize,
242        part_metrics: PartitionMetrics,
243    ) -> Result<ScanBatchStream> {
244        ensure!(
245            partition < self.properties.partitions.len(),
246            PartitionOutOfRangeSnafu {
247                given: partition,
248                all: self.properties.partitions.len(),
249            }
250        );
251
252        let stream_ctx = self.stream_ctx.clone();
253        let part_ranges = self.properties.partitions[partition].clone();
254        let pruner = self.pruner.clone();
255        // Initializes ref counts for the pruner.
256        // If we call scan_batch_in_partition() multiple times but don't read all batches from the stream,
257        // then the ref count won't be decremented.
258        // This is a rare case and keeping all remaining entries still uses less memory than a per partition cache.
259        pruner.add_partition_ranges(&part_ranges);
260        let partition_pruner = Arc::new(PartitionPruner::new(pruner, &part_ranges));
261
262        let stream = try_stream! {
263            part_metrics.on_first_poll();
264
265            // Scans each part.
266            for part_range in part_ranges {
267                let mut metrics = ScannerMetrics::default();
268                let mut fetch_start = Instant::now();
269
270                let stream = Self::scan_flat_partition_range(
271                    stream_ctx.clone(),
272                    part_range.identifier,
273                    part_metrics.clone(),
274                    partition_pruner.clone(),
275                );
276                for await record_batch in stream {
277                    let record_batch = record_batch?;
278                    metrics.scan_cost += fetch_start.elapsed();
279                    metrics.num_batches += 1;
280                    metrics.num_rows += record_batch.num_rows();
281
282                    debug_assert!(record_batch.num_rows() > 0);
283                    if record_batch.num_rows() == 0 {
284                        continue;
285                    }
286
287                    let yield_start = Instant::now();
288                    yield ScanBatch::RecordBatch(record_batch);
289                    metrics.yield_cost += yield_start.elapsed();
290
291                    fetch_start = Instant::now();
292                }
293
294                metrics.scan_cost += fetch_start.elapsed();
295                part_metrics.merge_metrics(&metrics);
296            }
297
298            part_metrics.on_finish();
299        };
300        Ok(Box::pin(stream))
301    }
302}
303
304impl RegionScanner for UnorderedScan {
305    fn name(&self) -> &str {
306        "UnorderedScan"
307    }
308
309    fn properties(&self) -> &ScannerProperties {
310        &self.properties
311    }
312
313    fn schema(&self) -> SchemaRef {
314        self.stream_ctx.input.mapper.output_schema()
315    }
316
317    fn metadata(&self) -> RegionMetadataRef {
318        self.stream_ctx.input.mapper.metadata().clone()
319    }
320
321    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
322        self.properties.prepare(request);
323
324        Ok(())
325    }
326
327    fn scan_partition(
328        &self,
329        ctx: &QueryScanContext,
330        metrics_set: &ExecutionPlanMetricsSet,
331        partition: usize,
332    ) -> Result<SendableRecordBatchStream, BoxedError> {
333        self.scan_partition_impl(ctx, metrics_set, partition)
334            .map_err(BoxedError::new)
335    }
336
337    /// If this scanner have predicate other than region partition exprs
338    fn has_predicate_without_region(&self) -> bool {
339        let predicate = self
340            .stream_ctx
341            .input
342            .predicate_group()
343            .predicate_without_region();
344        predicate.is_some()
345    }
346
347    fn add_dyn_filter_to_predicate(
348        &mut self,
349        filter_exprs: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
350    ) -> Vec<bool> {
351        self.stream_ctx.add_dyn_filter_to_predicate(filter_exprs)
352    }
353
354    fn set_logical_region(&mut self, logical_region: bool) {
355        self.properties.set_logical_region(logical_region);
356    }
357
358    fn snapshot_sequence(&self) -> Option<u64> {
359        self.stream_ctx.input.snapshot_sequence
360    }
361}
362
363impl DisplayAs for UnorderedScan {
364    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
365        write!(
366            f,
367            "UnorderedScan: region={}, ",
368            self.stream_ctx.input.mapper.metadata().region_id
369        )?;
370        match t {
371            DisplayFormatType::Default | DisplayFormatType::TreeRender => {
372                self.stream_ctx.format_for_explain(false, f)
373            }
374            DisplayFormatType::Verbose => {
375                self.stream_ctx.format_for_explain(true, f)?;
376                self.metrics_list.format_verbose_metrics(f)
377            }
378        }
379    }
380}
381
382impl fmt::Debug for UnorderedScan {
383    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
384        f.debug_struct("UnorderedScan")
385            .field("num_ranges", &self.stream_ctx.ranges.len())
386            .finish()
387    }
388}
389
#[cfg(test)]
impl UnorderedScan {
    /// Gives tests access to the [ScanInput] held by the stream context.
    pub(crate) fn input(&self) -> &ScanInput {
        let Self { stream_ctx, .. } = self;
        &stream_ctx.input
    }
}