mito2/read/
unordered_scan.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Unordered scanner.
16
17use std::fmt;
18use std::sync::Arc;
19use std::time::Instant;
20
21use async_stream::{stream, try_stream};
22use common_error::ext::BoxedError;
23use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
24use common_telemetry::{tracing, warn};
25use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
26use datafusion::physical_plan::{DisplayAs, DisplayFormatType};
27use datatypes::arrow::record_batch::RecordBatch;
28use datatypes::schema::SchemaRef;
29use futures::{Stream, StreamExt};
30use snafu::ensure;
31use store_api::metadata::RegionMetadataRef;
32use store_api::region_engine::{
33    PrepareRequest, QueryScanContext, RegionScanner, ScannerProperties,
34};
35
36use crate::error::{PartitionOutOfRangeSnafu, Result};
37use crate::read::pruner::{PartitionPruner, Pruner};
38use crate::read::scan_region::{ScanInput, StreamContext};
39use crate::read::scan_util::{
40    PartitionMetrics, PartitionMetricsList, scan_file_ranges, scan_flat_file_ranges,
41    scan_flat_mem_ranges, scan_mem_ranges,
42};
43use crate::read::stream::{ConvertBatchStream, ScanBatch, ScanBatchStream};
44use crate::read::{Batch, ScannerMetrics, scan_util};
45
/// Scans a region without providing any output ordering guarantee.
///
/// Only an append only table should use this scanner.
pub struct UnorderedScan {
    /// Properties of the scanner, including how partition ranges are grouped into
    /// partitions (`properties.partitions`).
    properties: ScannerProperties,
    /// Context of streams. Shared (via `Arc`) with the pruner and with every partition
    /// stream built from this scanner.
    stream_ctx: Arc<StreamContext>,
    /// Shared pruner for file range building. Each partition stream wraps it in its own
    /// `PartitionPruner` scoped to that partition's ranges.
    pruner: Arc<Pruner>,
    /// Metrics for each partition. An entry is (re)set whenever a partition stream is
    /// created, and the list is printed by the verbose `DisplayAs` output.
    metrics_list: PartitionMetricsList,
}
59
60impl UnorderedScan {
61    /// Creates a new [UnorderedScan].
62    pub(crate) fn new(input: ScanInput) -> Self {
63        let mut properties = ScannerProperties::default()
64            .with_append_mode(input.append_mode)
65            .with_total_rows(input.total_rows());
66        let stream_ctx = Arc::new(StreamContext::unordered_scan_ctx(input));
67        properties.partitions = vec![stream_ctx.partition_ranges()];
68
69        // Create the shared pruner with number of workers equal to CPU cores.
70        let num_workers = common_stat::get_total_cpu_cores().max(1);
71        let pruner = Arc::new(Pruner::new(stream_ctx.clone(), num_workers));
72
73        Self {
74            properties,
75            stream_ctx,
76            pruner,
77            metrics_list: PartitionMetricsList::default(),
78        }
79    }
80
81    /// Scans the region and returns a stream.
82    #[tracing::instrument(
83        skip_all,
84        fields(region_id = %self.stream_ctx.input.mapper.metadata().region_id)
85    )]
86    pub(crate) async fn build_stream(&self) -> Result<SendableRecordBatchStream, BoxedError> {
87        let metrics_set = ExecutionPlanMetricsSet::new();
88        let part_num = self.properties.num_partitions();
89        let streams = (0..part_num)
90            .map(|i| self.scan_partition(&QueryScanContext::default(), &metrics_set, i))
91            .collect::<Result<Vec<_>, BoxedError>>()?;
92        let stream = stream! {
93            for mut stream in streams {
94                while let Some(rb) = stream.next().await {
95                    yield rb;
96                }
97            }
98        };
99        let stream = Box::pin(RecordBatchStreamWrapper::new(
100            self.schema(),
101            Box::pin(stream),
102        ));
103        Ok(stream)
104    }
105
106    /// Scans a [PartitionRange] by its `identifier` and returns a stream.
107    #[tracing::instrument(
108        skip_all,
109        fields(
110            region_id = %stream_ctx.input.region_metadata().region_id,
111            part_range_id = part_range_id
112        )
113    )]
114    fn scan_partition_range(
115        stream_ctx: Arc<StreamContext>,
116        part_range_id: usize,
117        part_metrics: PartitionMetrics,
118        partition_pruner: Arc<PartitionPruner>,
119    ) -> impl Stream<Item = Result<Batch>> {
120        try_stream! {
121            // Gets range meta.
122            let range_meta = &stream_ctx.ranges[part_range_id];
123            for index in &range_meta.row_group_indices {
124                if stream_ctx.is_mem_range_index(*index) {
125                    let stream = scan_mem_ranges(
126                        stream_ctx.clone(),
127                        part_metrics.clone(),
128                        *index,
129                        range_meta.time_range,
130                    );
131                    for await batch in stream {
132                        yield batch?;
133                    }
134                } else if stream_ctx.is_file_range_index(*index) {
135                    let stream = scan_file_ranges(
136                        stream_ctx.clone(),
137                        part_metrics.clone(),
138                        *index,
139                        "unordered_scan_files",
140                        partition_pruner.clone(),
141                    ).await?;
142                    for await batch in stream {
143                        yield batch?;
144                    }
145                } else {
146                    let stream = scan_util::maybe_scan_other_ranges(
147                        &stream_ctx,
148                        *index,
149                        &part_metrics,
150                    ).await?;
151                    for await batch in stream {
152                        yield batch?;
153                    }
154                }
155            }
156        }
157    }
158
159    /// Scans a [PartitionRange] by its `identifier` and returns a flat stream of RecordBatch.
160    #[tracing::instrument(
161        skip_all,
162        fields(
163            region_id = %stream_ctx.input.region_metadata().region_id,
164            part_range_id = part_range_id
165        )
166    )]
167    fn scan_flat_partition_range(
168        stream_ctx: Arc<StreamContext>,
169        part_range_id: usize,
170        part_metrics: PartitionMetrics,
171        partition_pruner: Arc<PartitionPruner>,
172    ) -> impl Stream<Item = Result<RecordBatch>> {
173        try_stream! {
174            // Gets range meta.
175            let range_meta = &stream_ctx.ranges[part_range_id];
176            for index in &range_meta.row_group_indices {
177                if stream_ctx.is_mem_range_index(*index) {
178                    let stream = scan_flat_mem_ranges(
179                        stream_ctx.clone(),
180                        part_metrics.clone(),
181                        *index,
182                        range_meta.time_range,
183                    );
184                    for await record_batch in stream {
185                        yield record_batch?;
186                    }
187                } else if stream_ctx.is_file_range_index(*index) {
188                    let stream = scan_flat_file_ranges(
189                        stream_ctx.clone(),
190                        part_metrics.clone(),
191                        *index,
192                        "unordered_scan_files",
193                        partition_pruner.clone(),
194                    ).await?;
195                    for await record_batch in stream {
196                        yield record_batch?;
197                    }
198                } else {
199                    let stream = scan_util::maybe_scan_flat_other_ranges(
200                        &stream_ctx,
201                        *index,
202                        &part_metrics,
203                    ).await?;
204                    for await record_batch in stream {
205                        yield record_batch?;
206                    }
207                }
208            }
209        }
210    }
211
212    /// Scan [`Batch`] in all partitions one by one.
213    pub(crate) fn scan_all_partitions(&self) -> Result<ScanBatchStream> {
214        let metrics_set = ExecutionPlanMetricsSet::new();
215
216        let streams = (0..self.properties.partitions.len())
217            .map(|partition| {
218                let metrics = self.partition_metrics(false, partition, &metrics_set);
219                self.scan_batch_in_partition(partition, metrics)
220            })
221            .collect::<Result<Vec<_>>>()?;
222
223        Ok(Box::pin(futures::stream::iter(streams).flatten()))
224    }
225
226    fn partition_metrics(
227        &self,
228        explain_verbose: bool,
229        partition: usize,
230        metrics_set: &ExecutionPlanMetricsSet,
231    ) -> PartitionMetrics {
232        let part_metrics = PartitionMetrics::new(
233            self.stream_ctx.input.mapper.metadata().region_id,
234            partition,
235            "UnorderedScan",
236            self.stream_ctx.query_start,
237            explain_verbose,
238            metrics_set,
239        );
240        self.metrics_list.set(partition, part_metrics.clone());
241        part_metrics
242    }
243
244    #[tracing::instrument(
245        skip_all,
246        fields(
247            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
248            partition = partition
249        )
250    )]
251    fn scan_partition_impl(
252        &self,
253        ctx: &QueryScanContext,
254        metrics_set: &ExecutionPlanMetricsSet,
255        partition: usize,
256    ) -> Result<SendableRecordBatchStream> {
257        if ctx.explain_verbose {
258            common_telemetry::info!(
259                "UnorderedScan partition {}, region_id: {}",
260                partition,
261                self.stream_ctx.input.region_metadata().region_id
262            );
263        }
264
265        let metrics = self.partition_metrics(ctx.explain_verbose, partition, metrics_set);
266        let input = &self.stream_ctx.input;
267
268        let batch_stream = if input.flat_format {
269            // Use flat scan for bulk memtables
270            self.scan_flat_batch_in_partition(partition, metrics.clone())?
271        } else {
272            // Use regular batch scan for normal memtables
273            self.scan_batch_in_partition(partition, metrics.clone())?
274        };
275
276        let record_batch_stream = ConvertBatchStream::new(
277            batch_stream,
278            input.mapper.clone(),
279            input.cache_strategy.clone(),
280            metrics,
281        );
282
283        Ok(Box::pin(RecordBatchStreamWrapper::new(
284            input.mapper.output_schema(),
285            Box::pin(record_batch_stream),
286        )))
287    }
288
289    #[tracing::instrument(
290        skip_all,
291        fields(
292            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
293            partition = partition
294        )
295    )]
296    fn scan_batch_in_partition(
297        &self,
298        partition: usize,
299        part_metrics: PartitionMetrics,
300    ) -> Result<ScanBatchStream> {
301        ensure!(
302            partition < self.properties.partitions.len(),
303            PartitionOutOfRangeSnafu {
304                given: partition,
305                all: self.properties.partitions.len(),
306            }
307        );
308
309        let stream_ctx = self.stream_ctx.clone();
310        let part_ranges = self.properties.partitions[partition].clone();
311        let distinguish_range = self.properties.distinguish_partition_range;
312        let pruner = self.pruner.clone();
313        // Initializes ref counts for the pruner.
314        // If we call scan_batch_in_partition() multiple times but don't read all batches from the stream,
315        // then the ref count won't be decremented.
316        // This is a rare case and keeping all remaining entries still uses less memory than a per partition cache.
317        pruner.add_partition_ranges(&part_ranges);
318        let partition_pruner = Arc::new(PartitionPruner::new(pruner, &part_ranges));
319
320        let stream = try_stream! {
321            part_metrics.on_first_poll();
322
323            // Scans each part.
324            for part_range in part_ranges {
325                let mut metrics = ScannerMetrics::default();
326                let mut fetch_start = Instant::now();
327                let _mapper = &stream_ctx.input.mapper;
328                #[cfg(debug_assertions)]
329                let mut checker = crate::read::BatchChecker::default()
330                    .with_start(Some(part_range.start))
331                    .with_end(Some(part_range.end));
332
333                let stream = Self::scan_partition_range(
334                    stream_ctx.clone(),
335                    part_range.identifier,
336                    part_metrics.clone(),
337                    partition_pruner.clone(),
338                );
339                for await batch in stream {
340                    let batch = batch?;
341                    metrics.scan_cost += fetch_start.elapsed();
342                    metrics.num_batches += 1;
343                    metrics.num_rows += batch.num_rows();
344
345                    debug_assert!(!batch.is_empty());
346                    if batch.is_empty() {
347                        continue;
348                    }
349
350                    #[cfg(debug_assertions)]
351                    checker.ensure_part_range_batch(
352                        "UnorderedScan",
353                        _mapper.metadata().region_id,
354                        partition,
355                        part_range,
356                        &batch,
357                    );
358
359                    let yield_start = Instant::now();
360                    yield ScanBatch::Normal(batch);
361                    metrics.yield_cost += yield_start.elapsed();
362
363                    fetch_start = Instant::now();
364                }
365
366                // Yields an empty part to indicate this range is terminated.
367                // The query engine can use this to optimize some queries.
368                if distinguish_range {
369                    let yield_start = Instant::now();
370                    yield ScanBatch::Normal(Batch::empty());
371                    metrics.yield_cost += yield_start.elapsed();
372                }
373
374                metrics.scan_cost += fetch_start.elapsed();
375                part_metrics.merge_metrics(&metrics);
376            }
377
378            part_metrics.on_finish();
379        };
380        Ok(Box::pin(stream))
381    }
382
383    #[tracing::instrument(
384        skip_all,
385        fields(
386            region_id = %self.stream_ctx.input.mapper.metadata().region_id,
387            partition = partition
388        )
389    )]
390    fn scan_flat_batch_in_partition(
391        &self,
392        partition: usize,
393        part_metrics: PartitionMetrics,
394    ) -> Result<ScanBatchStream> {
395        ensure!(
396            partition < self.properties.partitions.len(),
397            PartitionOutOfRangeSnafu {
398                given: partition,
399                all: self.properties.partitions.len(),
400            }
401        );
402
403        let stream_ctx = self.stream_ctx.clone();
404        let part_ranges = self.properties.partitions[partition].clone();
405        let pruner = self.pruner.clone();
406        // Initializes ref counts for the pruner.
407        // If we call scan_batch_in_partition() multiple times but don't read all batches from the stream,
408        // then the ref count won't be decremented.
409        // This is a rare case and keeping all remaining entries still uses less memory than a per partition cache.
410        pruner.add_partition_ranges(&part_ranges);
411        let partition_pruner = Arc::new(PartitionPruner::new(pruner, &part_ranges));
412
413        let stream = try_stream! {
414            part_metrics.on_first_poll();
415
416            // Scans each part.
417            for part_range in part_ranges {
418                let mut metrics = ScannerMetrics::default();
419                let mut fetch_start = Instant::now();
420
421                let stream = Self::scan_flat_partition_range(
422                    stream_ctx.clone(),
423                    part_range.identifier,
424                    part_metrics.clone(),
425                    partition_pruner.clone(),
426                );
427                for await record_batch in stream {
428                    let record_batch = record_batch?;
429                    metrics.scan_cost += fetch_start.elapsed();
430                    metrics.num_batches += 1;
431                    metrics.num_rows += record_batch.num_rows();
432
433                    debug_assert!(record_batch.num_rows() > 0);
434                    if record_batch.num_rows() == 0 {
435                        continue;
436                    }
437
438                    let yield_start = Instant::now();
439                    yield ScanBatch::RecordBatch(record_batch);
440                    metrics.yield_cost += yield_start.elapsed();
441
442                    fetch_start = Instant::now();
443                }
444
445                metrics.scan_cost += fetch_start.elapsed();
446                part_metrics.merge_metrics(&metrics);
447            }
448
449            part_metrics.on_finish();
450        };
451        Ok(Box::pin(stream))
452    }
453}
454
455impl RegionScanner for UnorderedScan {
456    fn name(&self) -> &str {
457        "UnorderedScan"
458    }
459
460    fn properties(&self) -> &ScannerProperties {
461        &self.properties
462    }
463
464    fn schema(&self) -> SchemaRef {
465        self.stream_ctx.input.mapper.output_schema()
466    }
467
468    fn metadata(&self) -> RegionMetadataRef {
469        self.stream_ctx.input.mapper.metadata().clone()
470    }
471
472    fn prepare(&mut self, request: PrepareRequest) -> Result<(), BoxedError> {
473        self.properties.prepare(request);
474
475        Ok(())
476    }
477
478    fn scan_partition(
479        &self,
480        ctx: &QueryScanContext,
481        metrics_set: &ExecutionPlanMetricsSet,
482        partition: usize,
483    ) -> Result<SendableRecordBatchStream, BoxedError> {
484        self.scan_partition_impl(ctx, metrics_set, partition)
485            .map_err(BoxedError::new)
486    }
487
488    /// If this scanner have predicate other than region partition exprs
489    fn has_predicate_without_region(&self) -> bool {
490        let predicate = self
491            .stream_ctx
492            .input
493            .predicate_group()
494            .predicate_without_region();
495        predicate.is_some()
496    }
497
498    fn add_dyn_filter_to_predicate(
499        &mut self,
500        filter_exprs: Vec<Arc<dyn datafusion::physical_plan::PhysicalExpr>>,
501    ) -> Vec<bool> {
502        self.stream_ctx.add_dyn_filter_to_predicate(filter_exprs)
503    }
504
505    fn set_logical_region(&mut self, logical_region: bool) {
506        self.properties.set_logical_region(logical_region);
507    }
508}
509
510impl DisplayAs for UnorderedScan {
511    fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
512        write!(
513            f,
514            "UnorderedScan: region={}, ",
515            self.stream_ctx.input.mapper.metadata().region_id
516        )?;
517        match t {
518            DisplayFormatType::Default | DisplayFormatType::TreeRender => {
519                self.stream_ctx.format_for_explain(false, f)
520            }
521            DisplayFormatType::Verbose => {
522                self.stream_ctx.format_for_explain(true, f)?;
523                self.metrics_list.format_verbose_metrics(f)
524            }
525        }
526    }
527}
528
529impl fmt::Debug for UnorderedScan {
530    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
531        f.debug_struct("UnorderedScan")
532            .field("num_ranges", &self.stream_ctx.ranges.len())
533            .finish()
534    }
535}
536
#[cfg(test)]
impl UnorderedScan {
    /// Returns the scan input held by the stream context (test-only accessor).
    pub(crate) fn input(&self) -> &ScanInput {
        let ctx: &StreamContext = &self.stream_ctx;
        &ctx.input
    }
}