mito2/read/unordered_scan.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Unordered scanner.

use std::fmt;
use std::sync::Arc;
use std::time::Instant;

use async_stream::{stream, try_stream};
use common_error::ext::BoxedError;
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::schema::SchemaRef;
use futures::{Stream, StreamExt};
use snafu::ensure;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
    PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
};

use crate::error::{PartitionOutOfRangeSnafu, Result};
use crate::read::range::RangeBuilderList;
use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::scan_util::{
    scan_file_ranges, scan_flat_file_ranges, scan_flat_mem_ranges, scan_mem_ranges,
    PartitionMetrics, PartitionMetricsList,
};
use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
use crate::read::{scan_util, Batch, ScannerMetrics};

/// Scans a region without providing any output ordering guarantee.
///
/// Only an append-only table should use this scanner.
pub struct UnorderedScan {
    /// Properties of the scanner.
    properties: ScannerProperties,
    /// Context of streams.
    stream_ctx: Arc<StreamContext>,
    /// Metrics for each partition.
    metrics_list: PartitionMetricsList,
}

impl UnorderedScan {
    /// Creates a new [UnorderedScan].
    pub(crate) fn new(input: ScanInput) -> Self {
        let mut properties = ScannerProperties::default()
            .with_append_mode(input.append_mode)
            .with_total_rows(input.total_rows());
        let stream_ctx = Arc::new(StreamContext::unordered_scan_ctx(input));
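        // All partition ranges start in a single partition; `prepare()` may
        // repartition them later.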
        properties.partitions = vec![stream_ctx.partition_ranges()];

        Self {
            properties,
            stream_ctx,
            metrics_list: PartitionMetricsList::default(),
        }
    }

    /// Scans the region and returns a stream.
    pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
        let metrics_set = ExecutionPlanMetricsSet::new();
        let part_num = self.properties.num_partitions();
        let streams = (0..part_num)
            .map(|i| self.scan_partition(&QueryScanContext::default(), &metrics_set, i))
            .collect::<Result<Vec<_>, BoxedError>>()?;
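        // Chains the per-partition streams one after another so batches from
        // different partitions are never interleaved in the output stream.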
        let stream = stream! {
            for mut stream in streams {
                while let Some(rb) = stream.next().await {
                    yield rb;
                }
            }
        };
        let stream = Box::pin(RecordBatchStreamWrapper::new(
            self.schema(),
            Box::pin(stream),
        ));
        Ok(stream)
    }

    /// Scans a [PartitionRange] by its `identifier` and returns a stream.
    fn scan_partition_range(
        stream_ctx: Arc<StreamContext>,
        part_range_id: usize,
        part_metrics: PartitionMetrics,
        range_builder_list: Arc<RangeBuilderList>,
    ) -> impl Stream<Item = Result<Batch>> {
        try_stream! {
            // Gets range meta.
            let range_meta = &stream_ctx.ranges[part_range_id];
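            // Each row group index points to a memtable range, a file range,
            // or another range source; dispatches it to the matching scan
            // helper and forwards the batches it yields.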
            for index in &range_meta.row_group_indices {
                if stream_ctx.is_mem_range_index(*index) {
                    let stream = scan_mem_ranges(
                        stream_ctx.clone(),
                        part_metrics.clone(),
                        *index,
                        range_meta.time_range,
                    );
                    for await batch in stream {
                        yield batch?;
                    }
                } else if stream_ctx.is_file_range_index(*index) {
                    let stream = scan_file_ranges(
                        stream_ctx.clone(),
                        part_metrics.clone(),
                        *index,
                        "unordered_scan_files",
                        range_builder_list.clone(),
                    ).await?;
                    for await batch in stream {
                        yield batch?;
                    }
                } else {
                    let stream = scan_util::maybe_scan_other_ranges(
                        &stream_ctx,
                        *index,
                        &part_metrics,
                    ).await?;
                    for await batch in stream {
                        yield batch?;
                    }
                }
            }
        }
    }

    /// Scans a [PartitionRange] by its `identifier` and returns a flat stream of RecordBatch.
    fn scan_flat_partition_range(
        stream_ctx: Arc<StreamContext>,
        part_range_id: usize,
        part_metrics: PartitionMetrics,
        range_builder_list: Arc<RangeBuilderList>,
    ) -> impl Stream<Item = Result<RecordBatch>> {
        try_stream! {
            // Gets range meta.
            let range_meta = &stream_ctx.ranges[part_range_id];
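            // Same dispatch as `scan_partition_range`, but the flat helpers
            // yield Arrow record batches directly instead of `Batch`es.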
            for index in &range_meta.row_group_indices {
                if stream_ctx.is_mem_range_index(*index) {
                    let stream = scan_flat_mem_ranges(
                        stream_ctx.clone(),
                        part_metrics.clone(),
                        *index,
                    );
                    for await record_batch in stream {
                        yield record_batch?;
                    }
                } else if stream_ctx.is_file_range_index(*index) {
                    let stream = scan_flat_file_ranges(
                        stream_ctx.clone(),
                        part_metrics.clone(),
                        *index,
                        "unordered_scan_files",
                        range_builder_list.clone(),
                    ).await?;
                    for await record_batch in stream {
                        yield record_batch?;
                    }
                } else {
                    let stream = scan_util::maybe_scan_flat_other_ranges(
                        &stream_ctx,
                        *index,
                        &part_metrics,
                    ).await?;
                    for await record_batch in stream {
                        yield record_batch?;
                    }
                }
            }
        }
    }

    /// Scans [`Batch`]es from all partitions one by one.
    pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
        let metrics_set = ExecutionPlanMetricsSet::new();

        let streams = (0..self.properties.partitions.len())
            .map(|partition| {
                let metrics = self.partition_metrics(false, partition, &metrics_set);
                self.scan_batch_in_partition(partition, metrics)
            })
            .collect::<Result<Vec<_>>>()?;

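        // Flattens the per-partition streams into one stream that drains each
        // partition in order.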
        Ok(Box::pin(futures::stream::iter(streams).flatten()))
    }

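    /// Creates [PartitionMetrics] for `partition` and registers them in the metrics list.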
    fn partition_metrics(
        &self,
        explain_verbose: bool,
        partition: usize,
        metrics_set: &ExecutionPlanMetricsSet,
    ) -> PartitionMetrics {
        let part_metrics = PartitionMetrics::new(
            self.stream_ctx.input.mapper.metadata().region_id,
            partition,
            "UnorderedScan",
            self.stream_ctx.query_start,
            explain_verbose,
            metrics_set,
        );
        self.metrics_list.set(partition, part_metrics.clone());
        part_metrics
    }

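    /// Scans the given `partition` and converts the scanned batches into a record batch stream.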
    fn scan_partition_impl(
        &self,
        ctx: &QueryScanContext,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> Result<SendableRecordBatchStream> {
        if ctx.explain_verbose {
            common_telemetry::info!(
                "UnorderedScan partition {}, region_id: {}",
                partition,
                self.stream_ctx.input.region_metadata().region_id
            );
        }

        let metrics = self.partition_metrics(ctx.explain_verbose, partition, metrics_set);
        let input = &self.stream_ctx.input;

        let batch_stream = if input.flat_format {
            // Use flat scan for bulk memtables
            self.scan_flat_batch_in_partition(partition, metrics.clone())?
        } else {
            // Use regular batch scan for normal memtables
            self.scan_batch_in_partition(partition, metrics.clone())?
        };

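        // Converts the internal scan batches into record batches that match
        // the output schema of the projection mapper.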
        let record_batch_stream = ConvertBatchStream::new(
            batch_stream,
            input.mapper.clone(),
            input.cache_strategy.clone(),
            metrics,
        );

        Ok(Box::pin(RecordBatchStreamWrapper::new(
            input.mapper.output_schema(),
            Box::pin(record_batch_stream),
        )))
    }

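    /// Returns a stream of [ScanBatch]es for the given `partition`.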
    fn scan_batch_in_partition(
        &self,
        partition: usize,
        part_metrics: PartitionMetrics,
    ) -> Result<ScanBatchStream> {
        ensure!(
            partition < self.properties.partitions.len(),
            PartitionOutOfRangeSnafu {
                given: partition,
                all: self.properties.partitions.len(),
            }
        );

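        // Clones everything the stream needs so it doesn't borrow `self`.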
        let stream_ctx = self.stream_ctx.clone();
        let part_ranges = self.properties.partitions[partition].clone();
        let distinguish_range = self.properties.distinguish_partition_range;

        let stream = try_stream! {
            part_metrics.on_first_poll();

            let range_builder_list = Arc::new(RangeBuilderList::new(
                stream_ctx.input.num_memtables(),
                stream_ctx.input.num_files(),
            ));
            // Scans each part.
            for part_range in part_ranges {
                let mut metrics = ScannerMetrics::default();
                let mut fetch_start = Instant::now();
                let _mapper = &stream_ctx.input.mapper;
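                // In debug builds, verifies that every batch stays within the
                // bounds of the current partition range.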
                #[cfg(debug_assertions)]
                let mut checker = crate::read::BatchChecker::default()
                    .with_start(Some(part_range.start))
                    .with_end(Some(part_range.end));

                let stream = Self::scan_partition_range(
                    stream_ctx.clone(),
                    part_range.identifier,
                    part_metrics.clone(),
                    range_builder_list.clone(),
                );
                for await batch in stream {
                    let batch = batch?;
                    metrics.scan_cost += fetch_start.elapsed();
                    metrics.num_batches += 1;
                    metrics.num_rows += batch.num_rows();

                    debug_assert!(!batch.is_empty());
                    if batch.is_empty() {
                        continue;
                    }

                    #[cfg(debug_assertions)]
                    checker.ensure_part_range_batch(
                        "UnorderedScan",
                        _mapper.metadata().region_id,
                        partition,
                        part_range,
                        &batch,
                    );

                    let yield_start = Instant::now();
                    yield ScanBatch::Normal(batch);
                    metrics.yield_cost += yield_start.elapsed();

                    fetch_start = Instant::now();
                }

                // Yields an empty part to indicate this range is terminated.
                // The query engine can use this to optimize some queries.
                if distinguish_range {
                    let yield_start = Instant::now();
                    yield ScanBatch::Normal(Batch::empty());
                    metrics.yield_cost += yield_start.elapsed();
                }

                metrics.scan_cost += fetch_start.elapsed();
                part_metrics.merge_metrics(&metrics);
            }

            part_metrics.on_finish();
        };
        Ok(Box::pin(stream))
    }

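    /// Returns a stream of [ScanBatch]es that wrap flat [RecordBatch]es for the given `partition`.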
    fn scan_flat_batch_in_partition(
        &self,
        partition: usize,
        part_metrics: PartitionMetrics,
    ) -> Result<ScanBatchStream> {
        ensure!(
            partition < self.properties.partitions.len(),
            PartitionOutOfRangeSnafu {
                given: partition,
                all: self.properties.partitions.len(),
            }
        );

        let stream_ctx = self.stream_ctx.clone();
        let part_ranges = self.properties.partitions[partition].clone();

        let stream = try_stream! {
            part_metrics.on_first_poll();

            let range_builder_list = Arc::new(RangeBuilderList::new(
                stream_ctx.input.num_memtables(),
                stream_ctx.input.num_files(),
            ));
            // Scans each part.
            for part_range in part_ranges {
                let mut metrics = ScannerMetrics::default();
                let mut fetch_start = Instant::now();

                let stream = Self::scan_flat_partition_range(
                    stream_ctx.clone(),
                    part_range.identifier,
                    part_metrics.clone(),
                    range_builder_list.clone(),
                );
                for await record_batch in stream {
                    let record_batch = record_batch?;
                    metrics.scan_cost += fetch_start.elapsed();
                    metrics.num_batches += 1;
                    metrics.num_rows += record_batch.num_rows();

                    debug_assert!(record_batch.num_rows() > 0);
                    if record_batch.num_rows() == 0 {
                        continue;
                    }

                    let yield_start = Instant::now();
                    yield ScanBatch::RecordBatch(record_batch);
                    metrics.yield_cost += yield_start.elapsed();

                    fetch_start = Instant::now();
                }

                metrics.scan_cost += fetch_start.elapsed();
                part_metrics.merge_metrics(&metrics);
            }

            part_metrics.on_finish();
        };
        Ok(Box::pin(stream))
    }
}

impl RegionScanner for UnorderedScan {
    fn properties(&self) -> &ScannerProperties {
        &self.properties
    }

    fn schema(&self) -> SchemaRef {
        self.stream_ctx.input.mapper.output_schema()
    }

    fn metadata(&self) -> RegionMetadataRef {
        self.stream_ctx.input.mapper.metadata().clone()
    }

    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
        self.properties.prepare(request);
        // UnorderedScan only scans one row group per partition so the resource requirement won't be too high.
        Ok(())
    }

    fn scan_partition(
        &self,
        ctx: &QueryScanContext,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> Result<SendableRecordBatchStream, BoxedError> {
        self.scan_partition_impl(ctx, metrics_set, partition)
            .map_err(BoxedError::new)
    }

    fn has_predicate(&self) -> bool {
        let predicate = self.stream_ctx.input.predicate();
        predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
    }

    fn set_logical_region(&mut self, logical_region: bool) {
        self.properties.set_logical_region(logical_region);
    }
}

impl DisplayAs for UnorderedScan {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        write!(
            f,
            "UnorderedScan: region={}, ",
            self.stream_ctx.input.mapper.metadata().region_id
        )?;
        match t {
            DisplayFormatType::Default | DisplayFormatType::TreeRender => {
                self.stream_ctx.format_for_explain(false, f)
            }
            DisplayFormatType::Verbose => {
                self.stream_ctx.format_for_explain(true, f)?;
                self.metrics_list.format_verbose_metrics(f)
            }
        }
    }
}

impl fmt::Debug for UnorderedScan {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("UnorderedScan")
            .field("num_ranges", &self.stream_ctx.ranges.len())
            .finish()
    }
}

#[cfg(test)]
impl UnorderedScan {
    /// Returns the input.
    pub(crate) fn input(&self) -> &ScanInput {
        &self.stream_ctx.input
    }
}