mito2/read/
unordered_scan.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Unordered scanner.
16
17use std::fmt;
18use std::sync::Arc;
19use std::time::Instant;
20
21use async_stream::{stream, try_stream};
22use common_error::ext::BoxedError;
23use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
24use common_telemetry::tracing;
25use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
26use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
27use datatypes::arrow::record_batch::RecordBatch;
28use datatypes::schema::SchemaRef;
29use futures::{Stream, StreamExt};
30use snafu::ensure;
31use store_api::metadata::RegionMetadataRef;
32use store_api::region_engine::{
33    PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
34};
35
36use crate::error::{PartitionOutOfRangeSnafu, Result};
37use crate::read::range::{RangeBuilderList, file_range_counts};
38use crate::read::scan_region::{ScanInput, StreamContext};
39use crate::read::scan_util::{
40    PartitionMetrics, PartitionMetricsList, scan_file_ranges, scan_flat_file_ranges,
41    scan_flat_mem_ranges, scan_mem_ranges,
42};
43use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
44use crate::read::{Batch, ScannerMetrics, scan_util};
45
46/// Scans a region without providing any output ordering guarantee.
47///
48/// Only an append only table should use this scanner.
49pub struct UnorderedScan {
50    /// Properties of the scanner.
51    properties: ScannerProperties,
52    /// Context of streams.
53    stream_ctx: Arc<StreamContext>,
54    /// Metrics for each partition.
55    metrics_list: PartitionMetricsList,
56}
57
58impl UnorderedScan {
59    /// Creates a new [UnorderedScan].
60    pub(crate) fn new(input: ScanInput) -> Self {
61        let mut properties = ScannerProperties::default()
62            .with_append_mode(input.append_mode)
63            .with_total_rows(input.total_rows());
64        let stream_ctx = Arc::new(StreamContext::unordered_scan_ctx(input));
65        properties.partitions = vec![stream_ctx.partition_ranges()];
66
67        Self {
68            properties,
69            stream_ctx,
70            metrics_list: PartitionMetricsList::default(),
71        }
72    }
73
74    /// Scans the region and returns a stream.
75    #[tracing::instrument(
76        skip_all,
77        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
78    )]
79    pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
80        let metrics_set = ExecutionPlanMetricsSet::new();
81        let part_num = self.properties.num_partitions();
82        let streams = (0..part_num)
83            .map(|i| self.scan_partition(&QueryScanContext::default(), &metrics_set, i))
84            .collect::<Result<Vec<_>, BoxedError>>()?;
85        let stream = stream! {
86            for mut stream in streams {
87                while let Some(rb) = stream.next().await {
88                    yield rb;
89                }
90            }
91        };
92        let stream = Box::pin(RecordBatchStreamWrapper::new(
93            self.schema(),
94            Box::pin(stream),
95        ));
96        Ok(stream)
97    }
98
99    /// Scans a [PartitionRange] by its `identifier` and returns a stream.
100    #[tracing::instrument(
101        skip_all,
102        fields(
103            region_id = %stream_ctx.input.region_metadata().region_id,
104            part_range_id = part_range_id
105        )
106    )]
107    fn scan_partition_range(
108        stream_ctx: Arc<StreamContext>,
109        part_range_id: usize,
110        part_metrics: PartitionMetrics,
111        range_builder_list: Arc<RangeBuilderList>,
112    ) -> impl Stream<Item = Result<Batch>> {
113        try_stream! {
114            // Gets range meta.
115            let range_meta = &stream_ctx.ranges[part_range_id];
116            for index in &range_meta.row_group_indices {
117                if stream_ctx.is_mem_range_index(*index) {
118                    let stream = scan_mem_ranges(
119                        stream_ctx.clone(),
120                        part_metrics.clone(),
121                        *index,
122                        range_meta.time_range,
123                    );
124                    for await batch in stream {
125                        yield batch?;
126                    }
127                } else if stream_ctx.is_file_range_index(*index) {
128                    let stream = scan_file_ranges(
129                        stream_ctx.clone(),
130                        part_metrics.clone(),
131                        *index,
132                        "unordered_scan_files",
133                        range_builder_list.clone(),
134                    ).await?;
135                    for await batch in stream {
136                        yield batch?;
137                    }
138                } else {
139                    let stream = scan_util::maybe_scan_other_ranges(
140                        &stream_ctx,
141                        *index,
142                        &part_metrics,
143                    ).await?;
144                    for await batch in stream {
145                        yield batch?;
146                    }
147                }
148            }
149        }
150    }
151
152    /// Scans a [PartitionRange] by its `identifier` and returns a flat stream of RecordBatch.
153    #[tracing::instrument(
154        skip_all,
155        fields(
156            region_id = %stream_ctx.input.region_metadata().region_id,
157            part_range_id = part_range_id
158        )
159    )]
160    fn scan_flat_partition_range(
161        stream_ctx: Arc<StreamContext>,
162        part_range_id: usize,
163        part_metrics: PartitionMetrics,
164        range_builder_list: Arc<RangeBuilderList>,
165    ) -> impl Stream<Item = Result<RecordBatch>> {
166        try_stream! {
167            // Gets range meta.
168            let range_meta = &stream_ctx.ranges[part_range_id];
169            for index in &range_meta.row_group_indices {
170                if stream_ctx.is_mem_range_index(*index) {
171                    let stream = scan_flat_mem_ranges(
172                        stream_ctx.clone(),
173                        part_metrics.clone(),
174                        *index,
175                    );
176                    for await record_batch in stream {
177                        yield record_batch?;
178                    }
179                } else if stream_ctx.is_file_range_index(*index) {
180                    let stream = scan_flat_file_ranges(
181                        stream_ctx.clone(),
182                        part_metrics.clone(),
183                        *index,
184                        "unordered_scan_files",
185                        range_builder_list.clone(),
186                    ).await?;
187                    for await record_batch in stream {
188                        yield record_batch?;
189                    }
190                } else {
191                    let stream = scan_util::maybe_scan_flat_other_ranges(
192                        &stream_ctx,
193                        *index,
194                        &part_metrics,
195                    ).await?;
196                    for await record_batch in stream {
197                        yield record_batch?;
198                    }
199                }
200            }
201        }
202    }
203
204    /// Scan [`Batch`] in all partitions one by one.
205    pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
206        let metrics_set = ExecutionPlanMetricsSet::new();
207
208        let streams = (0..self.properties.partitions.len())
209            .map(|partition| {
210                let metrics = self.partition_metrics(false, partition, &metrics_set);
211                self.scan_batch_in_partition(partition, metrics)
212            })
213            .collect::<Result<Vec<_>>>()?;
214
215        Ok(Box::pin(futures::stream::iter(streams).flatten()))
216    }
217
218    fn partition_metrics(
219        &self,
220        explain_verbose: bool,
221        partition: usize,
222        metrics_set: &ExecutionPlanMetricsSet,
223    ) -> PartitionMetrics {
224        let part_metrics = PartitionMetrics::new(
225            self.stream_ctx.input.mapper.metadata().region_id,
226            partition,
227            "UnorderedScan",
228            self.stream_ctx.query_start,
229            explain_verbose,
230            metrics_set,
231        );
232        self.metrics_list.set(partition, part_metrics.clone());
233        part_metrics
234    }
235
236    #[tracing::instrument(
237        skip_all,
238        fields(
239            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
240            partition = partition
241        )
242    )]
243    fn scan_partition_impl(
244        &self,
245        ctx: &QueryScanContext,
246        metrics_set: &ExecutionPlanMetricsSet,
247        partition: usize,
248    ) -> Result<SendableRecordBatchStream> {
249        if ctx.explain_verbose {
250            common_telemetry::info!(
251                "UnorderedScan partition {}, region_id: {}",
252                partition,
253                self.stream_ctx.input.region_metadata().region_id
254            );
255        }
256
257        let metrics = self.partition_metrics(ctx.explain_verbose, partition, metrics_set);
258        let input = &self.stream_ctx.input;
259
260        let batch_stream = if input.flat_format {
261            // Use flat scan for bulk memtables
262            self.scan_flat_batch_in_partition(partition, metrics.clone())?
263        } else {
264            // Use regular batch scan for normal memtables
265            self.scan_batch_in_partition(partition, metrics.clone())?
266        };
267
268        let record_batch_stream = ConvertBatchStream::new(
269            batch_stream,
270            input.mapper.clone(),
271            input.cache_strategy.clone(),
272            metrics,
273        );
274
275        Ok(Box::pin(RecordBatchStreamWrapper::new(
276            input.mapper.output_schema(),
277            Box::pin(record_batch_stream),
278        )))
279    }
280
281    #[tracing::instrument(
282        skip_all,
283        fields(
284            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
285            partition = partition
286        )
287    )]
288    fn scan_batch_in_partition(
289        &self,
290        partition: usize,
291        part_metrics: PartitionMetrics,
292    ) -> Result<ScanBatchStream> {
293        ensure!(
294            partition < self.properties.partitions.len(),
295            PartitionOutOfRangeSnafu {
296                given: partition,
297                all: self.properties.partitions.len(),
298            }
299        );
300
301        let stream_ctx = self.stream_ctx.clone();
302        let part_ranges = self.properties.partitions[partition].clone();
303        let distinguish_range = self.properties.distinguish_partition_range;
304
305        let stream = try_stream! {
306            part_metrics.on_first_poll();
307
308            let counts = file_range_counts(
309                stream_ctx.input.num_memtables(),
310                stream_ctx.input.num_files(),
311                &stream_ctx.ranges,
312                part_ranges.iter(),
313            );
314            let range_builder_list = Arc::new(RangeBuilderList::new(
315                stream_ctx.input.num_memtables(),
316                counts,
317            ));
318            // Scans each part.
319            for part_range in part_ranges {
320                let mut metrics = ScannerMetrics::default();
321                let mut fetch_start = Instant::now();
322                let _mapper = &stream_ctx.input.mapper;
323                #[cfg(debug_assertions)]
324                let mut checker = crate::read::BatchChecker::default()
325                    .with_start(Some(part_range.start))
326                    .with_end(Some(part_range.end));
327
328                let stream = Self::scan_partition_range(
329                    stream_ctx.clone(),
330                    part_range.identifier,
331                    part_metrics.clone(),
332                    range_builder_list.clone(),
333                );
334                for await batch in stream {
335                    let batch = batch?;
336                    metrics.scan_cost += fetch_start.elapsed();
337                    metrics.num_batches += 1;
338                    metrics.num_rows += batch.num_rows();
339
340                    debug_assert!(!batch.is_empty());
341                    if batch.is_empty() {
342                        continue;
343                    }
344
345                    #[cfg(debug_assertions)]
346                    checker.ensure_part_range_batch(
347                        "UnorderedScan",
348                        _mapper.metadata().region_id,
349                        partition,
350                        part_range,
351                        &batch,
352                    );
353
354                    let yield_start = Instant::now();
355                    yield ScanBatch::Normal(batch);
356                    metrics.yield_cost += yield_start.elapsed();
357
358                    fetch_start = Instant::now();
359                }
360
361                // Yields an empty part to indicate this range is terminated.
362                // The query engine can use this to optimize some queries.
363                if distinguish_range {
364                    let yield_start = Instant::now();
365                    yield ScanBatch::Normal(Batch::empty());
366                    metrics.yield_cost += yield_start.elapsed();
367                }
368
369                metrics.scan_cost += fetch_start.elapsed();
370                part_metrics.merge_metrics(&metrics);
371            }
372
373            part_metrics.on_finish();
374        };
375        Ok(Box::pin(stream))
376    }
377
378    #[tracing::instrument(
379        skip_all,
380        fields(
381            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
382            partition = partition
383        )
384    )]
385    fn scan_flat_batch_in_partition(
386        &self,
387        partition: usize,
388        part_metrics: PartitionMetrics,
389    ) -> Result<ScanBatchStream> {
390        ensure!(
391            partition < self.properties.partitions.len(),
392            PartitionOutOfRangeSnafu {
393                given: partition,
394                all: self.properties.partitions.len(),
395            }
396        );
397
398        let stream_ctx = self.stream_ctx.clone();
399        let part_ranges = self.properties.partitions[partition].clone();
400
401        let stream = try_stream! {
402            part_metrics.on_first_poll();
403
404            let counts = file_range_counts(
405                stream_ctx.input.num_memtables(),
406                stream_ctx.input.num_files(),
407                &stream_ctx.ranges,
408                part_ranges.iter(),
409            );
410            let range_builder_list = Arc::new(RangeBuilderList::new(
411                stream_ctx.input.num_memtables(),
412                counts,
413            ));
414            // Scans each part.
415            for part_range in part_ranges {
416                let mut metrics = ScannerMetrics::default();
417                let mut fetch_start = Instant::now();
418
419                let stream = Self::scan_flat_partition_range(
420                    stream_ctx.clone(),
421                    part_range.identifier,
422                    part_metrics.clone(),
423                    range_builder_list.clone(),
424                );
425                for await record_batch in stream {
426                    let record_batch = record_batch?;
427                    metrics.scan_cost += fetch_start.elapsed();
428                    metrics.num_batches += 1;
429                    metrics.num_rows += record_batch.num_rows();
430
431                    debug_assert!(record_batch.num_rows() > 0);
432                    if record_batch.num_rows() == 0 {
433                        continue;
434                    }
435
436                    let yield_start = Instant::now();
437                    yield ScanBatch::RecordBatch(record_batch);
438                    metrics.yield_cost += yield_start.elapsed();
439
440                    fetch_start = Instant::now();
441                }
442
443                metrics.scan_cost += fetch_start.elapsed();
444                part_metrics.merge_metrics(&metrics);
445            }
446
447            part_metrics.on_finish();
448        };
449        Ok(Box::pin(stream))
450    }
451}
452
453impl RegionScanner for UnorderedScan {
454    fn name(&self) -> &str {
455        "UnorderedScan"
456    }
457
458    fn properties(&self) -> &ScannerProperties {
459        &self.properties
460    }
461
462    fn schema(&self) -> SchemaRef {
463        self.stream_ctx.input.mapper.output_schema()
464    }
465
466    fn metadata(&self) -> RegionMetadataRef {
467        self.stream_ctx.input.mapper.metadata().clone()
468    }
469
470    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
471        self.properties.prepare(request);
472        // UnorderedScan only scans one row group per partition so the resource requirement won't be too high.
473        Ok(())
474    }
475
476    fn scan_partition(
477        &self,
478        ctx: &QueryScanContext,
479        metrics_set: &ExecutionPlanMetricsSet,
480        partition: usize,
481    ) -> Result<SendableRecordBatchStream, BoxedError> {
482        self.scan_partition_impl(ctx, metrics_set, partition)
483            .map_err(BoxedError::new)
484    }
485
486    /// If this scanner have predicate other than region partition exprs
487    fn has_predicate_without_region(&self) -> bool {
488        let predicate = self
489            .stream_ctx
490            .input
491            .predicate_group()
492            .predicate_without_region();
493
494        predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
495    }
496
497    fn set_logical_region(&mut self, logical_region: bool) {
498        self.properties.set_logical_region(logical_region);
499    }
500}
501
502impl DisplayAs for UnorderedScan {
503    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
504        write!(
505            f,
506            "UnorderedScan: region={}, ",
507            self.stream_ctx.input.mapper.metadata().region_id
508        )?;
509        match t {
510            DisplayFormatType::Default | DisplayFormatType::TreeRender => {
511                self.stream_ctx.format_for_explain(false, f)
512            }
513            DisplayFormatType::Verbose => {
514                self.stream_ctx.format_for_explain(true, f)?;
515                self.metrics_list.format_verbose_metrics(f)
516            }
517        }
518    }
519}
520
521impl fmt::Debug for UnorderedScan {
522    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
523        f.debug_struct("UnorderedScan")
524            .field("num_ranges", &self.stream_ctx.ranges.len())
525            .finish()
526    }
527}
528
#[cfg(test)]
impl UnorderedScan {
    /// Test-only accessor for the [ScanInput] held by the scan context.
    pub(crate) fn input(&self) -> &ScanInput {
        let ctx = &self.stream_ctx;
        &ctx.input
    }
}