// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Unordered scanner.

use std::fmt;
use std::sync::Arc;
use std::time::Instant;

use async_stream::{stream, try_stream};
use common_error::ext::BoxedError;
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
use common_telemetry::tracing;
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
use datatypes::arrow::record_batch::RecordBatch;
use datatypes::schema::SchemaRef;
use futures::{Stream, StreamExt};
use snafu::ensure;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::{
    PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
};

use crate::error::{PartitionOutOfRangeSnafu, Result};
use crate::read::pruner::{PartitionPruner, Pruner};
use crate::read::scan_region::{ScanInput, StreamContext};
use crate::read::scan_util::{
    PartitionMetrics, PartitionMetricsList, scan_file_ranges, scan_flat_file_ranges,
    scan_flat_mem_ranges, scan_mem_ranges,
};
use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
use crate::read::{Batch, ScannerMetrics, scan_util};

/// Scans a region without providing any output ordering guarantee.
///
/// Only an append-only table should use this scanner.
pub struct UnorderedScan {
    /// Properties of the scanner.
    properties: ScannerProperties,
    /// Context of streams.
    stream_ctx: Arc<StreamContext>,
    /// Shared pruner for file range building.
    pruner: Arc<Pruner>,
    /// Metrics for each partition.
    metrics_list: PartitionMetricsList,
}

impl UnorderedScan {
    /// Creates a new [UnorderedScan].
    pub(crate) fn new(input: ScanInput) -> Self {
        let mut properties = ScannerProperties::default()
            .with_append_mode(input.append_mode)
            .with_total_rows(input.total_rows());
        let stream_ctx = Arc::new(StreamContext::unordered_scan_ctx(input));
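        // Puts all partition ranges into a single partition initially; `prepare()`
        // may repartition them later.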
        properties.partitions = vec![stream_ctx.partition_ranges()];

        // Creates the shared pruner with the number of workers equal to the number of CPU cores.
        let num_workers = common_stat::get_total_cpu_cores().max(1);
        let pruner = Arc::new(Pruner::new(stream_ctx.clone(), num_workers));

        Self {
            properties,
            stream_ctx,
            pruner,
            metrics_list: PartitionMetricsList::default(),
        }
    }

    /// Scans the region and returns a stream.
    #[tracing::instrument(
        skip_all,
        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
    )]
    pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
        let metrics_set = ExecutionPlanMetricsSet::new();
        let part_num = self.properties.num_partitions();
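        // Builds a stream for each partition, then chains them so all partitions
        // are drained sequentially into a single output stream.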
        let streams = (0..part_num)
            .map(|i| self.scan_partition(&QueryScanContext::default(), &metrics_set, i))
            .collect::<Result<Vec<_>, BoxedError>>()?;
        let stream = stream! {
            for mut stream in streams {
                while let Some(rb) = stream.next().await {
                    yield rb;
                }
            }
        };
        let stream = Box::pin(RecordBatchStreamWrapper::new(
            self.schema(),
            Box::pin(stream),
        ));
        Ok(stream)
    }

    /// Scans a [PartitionRange] by its `identifier` and returns a stream.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %stream_ctx.input.region_metadata().region_id,
            part_range_id = part_range_id
        )
    )]
    fn scan_partition_range(
        stream_ctx: Arc<StreamContext>,
        part_range_id: usize,
        part_metrics: PartitionMetrics,
        partition_pruner: Arc<PartitionPruner>,
    ) -> impl Stream<Item = Result<Batch>> {
        try_stream! {
            // Gets range meta.
            let range_meta = &stream_ctx.ranges[part_range_id];
            for index in &range_meta.row_group_indices {
                if stream_ctx.is_mem_range_index(*index) {
                    let stream = scan_mem_ranges(
                        stream_ctx.clone(),
                        part_metrics.clone(),
                        *index,
                        range_meta.time_range,
                    );
                    for await batch in stream {
                        yield batch?;
                    }
                } else if stream_ctx.is_file_range_index(*index) {
                    let stream = scan_file_ranges(
                        stream_ctx.clone(),
                        part_metrics.clone(),
                        *index,
                        "unordered_scan_files",
                        partition_pruner.clone(),
                    ).await?;
                    for await batch in stream {
                        yield batch?;
                    }
                } else {
                    let stream = scan_util::maybe_scan_other_ranges(
                        &stream_ctx,
                        *index,
                        &part_metrics,
                    ).await?;
                    for await batch in stream {
                        yield batch?;
                    }
                }
            }
        }
    }

    /// Scans a [PartitionRange] by its `identifier` and returns a flat stream of RecordBatch.
    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %stream_ctx.input.region_metadata().region_id,
            part_range_id = part_range_id
        )
    )]
    fn scan_flat_partition_range(
        stream_ctx: Arc<StreamContext>,
        part_range_id: usize,
        part_metrics: PartitionMetrics,
        partition_pruner: Arc<PartitionPruner>,
    ) -> impl Stream<Item = Result<RecordBatch>> {
        try_stream! {
            // Gets range meta.
            let range_meta = &stream_ctx.ranges[part_range_id];
            for index in &range_meta.row_group_indices {
                if stream_ctx.is_mem_range_index(*index) {
                    let stream = scan_flat_mem_ranges(
                        stream_ctx.clone(),
                        part_metrics.clone(),
                        *index,
                    );
                    for await record_batch in stream {
                        yield record_batch?;
                    }
                } else if stream_ctx.is_file_range_index(*index) {
                    let stream = scan_flat_file_ranges(
                        stream_ctx.clone(),
                        part_metrics.clone(),
                        *index,
                        "unordered_scan_files",
                        partition_pruner.clone(),
                    ).await?;
                    for await record_batch in stream {
                        yield record_batch?;
                    }
                } else {
                    let stream = scan_util::maybe_scan_flat_other_ranges(
                        &stream_ctx,
                        *index,
                        &part_metrics,
                    ).await?;
                    for await record_batch in stream {
                        yield record_batch?;
                    }
                }
            }
        }
    }

    /// Scans [`Batch`]es in all partitions one by one.
    pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
        let metrics_set = ExecutionPlanMetricsSet::new();

        let streams = (0..self.properties.partitions.len())
            .map(|partition| {
                let metrics = self.partition_metrics(false, partition, &metrics_set);
                self.scan_batch_in_partition(partition, metrics)
            })
            .collect::<Result<Vec<_>>>()?;

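        // Flattens the per-partition streams so partitions are scanned one after another.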
        Ok(Box::pin(futures::stream::iter(streams).flatten()))
    }

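    /// Creates the [PartitionMetrics] for `partition` and registers it in the
    /// metrics list so verbose EXPLAIN output can report it.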
    fn partition_metrics(
        &self,
        explain_verbose: bool,
        partition: usize,
        metrics_set: &ExecutionPlanMetricsSet,
    ) -> PartitionMetrics {
        let part_metrics = PartitionMetrics::new(
            self.stream_ctx.input.mapper.metadata().region_id,
            partition,
            "UnorderedScan",
            self.stream_ctx.query_start,
            explain_verbose,
            metrics_set,
        );
        self.metrics_list.set(partition, part_metrics.clone());
        part_metrics
    }

    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
            partition = partition
        )
    )]
    fn scan_partition_impl(
        &self,
        ctx: &QueryScanContext,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> Result<SendableRecordBatchStream> {
        if ctx.explain_verbose {
            common_telemetry::info!(
                "UnorderedScan partition {}, region_id: {}",
                partition,
                self.stream_ctx.input.region_metadata().region_id
            );
        }

        let metrics = self.partition_metrics(ctx.explain_verbose, partition, metrics_set);
        let input = &self.stream_ctx.input;

        let batch_stream = if input.flat_format {
            // Use flat scan for bulk memtables
            self.scan_flat_batch_in_partition(partition, metrics.clone())?
        } else {
            // Use regular batch scan for normal memtables
            self.scan_batch_in_partition(partition, metrics.clone())?
        };

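        // Converts the scan batches into record batches of the output schema.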
        let record_batch_stream = ConvertBatchStream::new(
            batch_stream,
            input.mapper.clone(),
            input.cache_strategy.clone(),
            metrics,
        );

        Ok(Box::pin(RecordBatchStreamWrapper::new(
            input.mapper.output_schema(),
            Box::pin(record_batch_stream),
        )))
    }

    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
            partition = partition
        )
    )]
    fn scan_batch_in_partition(
        &self,
        partition: usize,
        part_metrics: PartitionMetrics,
    ) -> Result<ScanBatchStream> {
        ensure!(
            partition < self.properties.partitions.len(),
            PartitionOutOfRangeSnafu {
                given: partition,
                all: self.properties.partitions.len(),
            }
        );

        let stream_ctx = self.stream_ctx.clone();
        let part_ranges = self.properties.partitions[partition].clone();
        let distinguish_range = self.properties.distinguish_partition_range;
        let pruner = self.pruner.clone();
        // Initializes ref counts for the pruner.
        // If we call scan_batch_in_partition() multiple times but don't read all batches from the stream,
        // then the ref count won't be decremented.
        // This is a rare case and keeping all remaining entries still uses less memory than a per-partition cache.
        pruner.add_partition_ranges(&part_ranges);
        let partition_pruner = Arc::new(PartitionPruner::new(pruner, &part_ranges));

        let stream = try_stream! {
            part_metrics.on_first_poll();

            // Scans each part.
            for part_range in part_ranges {
                let mut metrics = ScannerMetrics::default();
                let mut fetch_start = Instant::now();
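                // The mapper is only referenced by the debug-assertion checker below,
                // hence the leading underscore.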
                let _mapper = &stream_ctx.input.mapper;
                #[cfg(debug_assertions)]
                let mut checker = crate::read::BatchChecker::default()
                    .with_start(Some(part_range.start))
                    .with_end(Some(part_range.end));

                let stream = Self::scan_partition_range(
                    stream_ctx.clone(),
                    part_range.identifier,
                    part_metrics.clone(),
                    partition_pruner.clone(),
                );
                for await batch in stream {
                    let batch = batch?;
                    metrics.scan_cost += fetch_start.elapsed();
                    metrics.num_batches += 1;
                    metrics.num_rows += batch.num_rows();

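                    // Readers should not return empty batches; skip them defensively
                    // in release builds.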
                    debug_assert!(!batch.is_empty());
                    if batch.is_empty() {
                        continue;
                    }

                    #[cfg(debug_assertions)]
                    checker.ensure_part_range_batch(
                        "UnorderedScan",
                        _mapper.metadata().region_id,
                        partition,
                        part_range,
                        &batch,
                    );

                    let yield_start = Instant::now();
                    yield ScanBatch::Normal(batch);
                    metrics.yield_cost += yield_start.elapsed();

                    fetch_start = Instant::now();
                }

                // Yields an empty batch to indicate this range is terminated.
                // The query engine can use this to optimize some queries.
                if distinguish_range {
                    let yield_start = Instant::now();
                    yield ScanBatch::Normal(Batch::empty());
                    metrics.yield_cost += yield_start.elapsed();
                }

                metrics.scan_cost += fetch_start.elapsed();
                part_metrics.merge_metrics(&metrics);
            }

            part_metrics.on_finish();
        };
        Ok(Box::pin(stream))
    }

    #[tracing::instrument(
        skip_all,
        fields(
            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
            partition = partition
        )
    )]
    fn scan_flat_batch_in_partition(
        &self,
        partition: usize,
        part_metrics: PartitionMetrics,
    ) -> Result<ScanBatchStream> {
        ensure!(
            partition < self.properties.partitions.len(),
            PartitionOutOfRangeSnafu {
                given: partition,
                all: self.properties.partitions.len(),
            }
        );

        let stream_ctx = self.stream_ctx.clone();
        let part_ranges = self.properties.partitions[partition].clone();
        let pruner = self.pruner.clone();
        // Initializes ref counts for the pruner.
        // If we call scan_flat_batch_in_partition() multiple times but don't read all batches from the stream,
        // then the ref count won't be decremented.
        // This is a rare case and keeping all remaining entries still uses less memory than a per-partition cache.
        pruner.add_partition_ranges(&part_ranges);
        let partition_pruner = Arc::new(PartitionPruner::new(pruner, &part_ranges));

        let stream = try_stream! {
            part_metrics.on_first_poll();

            // Scans each part.
            for part_range in part_ranges {
                let mut metrics = ScannerMetrics::default();
                let mut fetch_start = Instant::now();

                let stream = Self::scan_flat_partition_range(
                    stream_ctx.clone(),
                    part_range.identifier,
                    part_metrics.clone(),
                    partition_pruner.clone(),
                );
                for await record_batch in stream {
                    let record_batch = record_batch?;
                    metrics.scan_cost += fetch_start.elapsed();
                    metrics.num_batches += 1;
                    metrics.num_rows += record_batch.num_rows();

                    debug_assert!(record_batch.num_rows() > 0);
                    if record_batch.num_rows() == 0 {
                        continue;
                    }

                    let yield_start = Instant::now();
                    yield ScanBatch::RecordBatch(record_batch);
                    metrics.yield_cost += yield_start.elapsed();

                    fetch_start = Instant::now();
                }

                metrics.scan_cost += fetch_start.elapsed();
                part_metrics.merge_metrics(&metrics);
            }

            part_metrics.on_finish();
        };
        Ok(Box::pin(stream))
    }
}

impl RegionScanner for UnorderedScan {
    fn name(&self) -> &str {
        "UnorderedScan"
    }

    fn properties(&self) -> &ScannerProperties {
        &self.properties
    }

    fn schema(&self) -> SchemaRef {
        self.stream_ctx.input.mapper.output_schema()
    }

    fn metadata(&self) -> RegionMetadataRef {
        self.stream_ctx.input.mapper.metadata().clone()
    }

    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
        self.properties.prepare(request);

        Ok(())
    }

    fn scan_partition(
        &self,
        ctx: &QueryScanContext,
        metrics_set: &ExecutionPlanMetricsSet,
        partition: usize,
    ) -> Result<SendableRecordBatchStream, BoxedError> {
        self.scan_partition_impl(ctx, metrics_set, partition)
            .map_err(BoxedError::new)
    }

    /// Returns whether this scanner has predicates other than the region partition exprs.
    fn has_predicate_without_region(&self) -> bool {
        let predicate = self
            .stream_ctx
            .input
            .predicate_group()
            .predicate_without_region();

        predicate.map(|p| !p.exprs().is_empty()).unwrap_or(false)
    }

    fn set_logical_region(&mut self, logical_region: bool) {
        self.properties.set_logical_region(logical_region);
    }
}

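// Formats the scanner for EXPLAIN output. Verbose mode also prints the collected
// per-partition metrics.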
impl DisplayAs for UnorderedScan {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        write!(
            f,
            "UnorderedScan: region={}, ",
            self.stream_ctx.input.mapper.metadata().region_id
        )?;
        match t {
            DisplayFormatType::Default | DisplayFormatType::TreeRender => {
                self.stream_ctx.format_for_explain(false, f)
            }
            DisplayFormatType::Verbose => {
                self.stream_ctx.format_for_explain(true, f)?;
                self.metrics_list.format_verbose_metrics(f)
            }
        }
    }
}

impl fmt::Debug for UnorderedScan {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("UnorderedScan")
            .field("num_ranges", &self.stream_ctx.ranges.len())
            .finish()
    }
}

#[cfg(test)]
impl UnorderedScan {
    /// Returns the input.
    pub(crate) fn input(&self) -> &ScanInput {
        &self.stream_ctx.input
    }
}