// mito2/read/scan_util.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Utilities for scanners.

17use std::fmt;
18use std::sync::{Arc, Mutex};
19use std::time::{Duration, Instant};
20
21use async_stream::try_stream;
22use common_telemetry::debug;
23use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};
24use futures::Stream;
25use prometheus::IntGauge;
26use store_api::storage::RegionId;
27
28use crate::error::Result;
29use crate::metrics::{
30    IN_PROGRESS_SCAN, PRECISE_FILTER_ROWS_TOTAL, READ_BATCHES_RETURN, READ_ROWS_IN_ROW_GROUP_TOTAL,
31    READ_ROWS_RETURN, READ_ROW_GROUPS_TOTAL, READ_STAGE_ELAPSED,
32};
33use crate::read::range::{RangeBuilderList, RowGroupIndex};
34use crate::read::scan_region::StreamContext;
35use crate::read::{Batch, ScannerMetrics, Source};
36use crate::sst::file::FileTimeRange;
37use crate::sst::parquet::reader::{ReaderFilterMetrics, ReaderMetrics};
38
/// Verbose scan metrics for a partition.
///
/// Accumulated via `merge_scanner_metrics`/`merge_reader_metrics` and reported
/// to Prometheus (and the debug log) when the owning partition metrics are
/// dropped.
#[derive(Default)]
struct ScanMetricsSet {
    /// Duration to prepare the scan task.
    prepare_scan_cost: Duration,
    /// Duration to build the (merge) reader.
    build_reader_cost: Duration,
    /// Duration to scan data.
    scan_cost: Duration,
    /// Duration to convert batches.
    convert_cost: Duration,
    /// Duration while waiting for `yield`.
    yield_cost: Duration,
    /// Duration of the scan (set once when the scan finishes).
    total_cost: Duration,
    /// Number of rows returned.
    num_rows: usize,
    /// Number of batches returned.
    num_batches: usize,
    /// Number of mem ranges scanned.
    num_mem_ranges: usize,
    /// Number of file ranges scanned.
    num_file_ranges: usize,

    // SST related metrics:
    /// Duration to build file ranges.
    build_parts_cost: Duration,
    /// Number of row groups before filtering.
    rg_total: usize,
    /// Number of row groups filtered by fulltext index.
    rg_fulltext_filtered: usize,
    /// Number of row groups filtered by inverted index.
    rg_inverted_filtered: usize,
    /// Number of row groups filtered by min-max index.
    rg_minmax_filtered: usize,
    /// Number of row groups filtered by bloom filter index.
    rg_bloom_filtered: usize,
    /// Number of rows in row group before filtering.
    rows_before_filter: usize,
    /// Number of rows in row group filtered by fulltext index.
    rows_fulltext_filtered: usize,
    /// Number of rows in row group filtered by inverted index.
    rows_inverted_filtered: usize,
    /// Number of rows in row group filtered by bloom filter index.
    rows_bloom_filtered: usize,
    /// Number of rows filtered by precise filter.
    rows_precise_filtered: usize,
    /// Number of record batches read from SST.
    num_sst_record_batches: usize,
    /// Number of batches decoded from SST.
    num_sst_batches: usize,
    /// Number of rows read from SST.
    num_sst_rows: usize,

    /// Elapsed time before the first poll operation.
    first_poll: Duration,
}
96
97impl fmt::Debug for ScanMetricsSet {
98    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
99        let ScanMetricsSet {
100            prepare_scan_cost,
101            build_reader_cost,
102            scan_cost,
103            convert_cost,
104            yield_cost,
105            total_cost,
106            num_rows,
107            num_batches,
108            num_mem_ranges,
109            num_file_ranges,
110            build_parts_cost,
111            rg_total,
112            rg_fulltext_filtered,
113            rg_inverted_filtered,
114            rg_minmax_filtered,
115            rg_bloom_filtered,
116            rows_before_filter,
117            rows_fulltext_filtered,
118            rows_inverted_filtered,
119            rows_bloom_filtered,
120            rows_precise_filtered,
121            num_sst_record_batches,
122            num_sst_batches,
123            num_sst_rows,
124            first_poll,
125        } = self;
126
127        write!(
128            f,
129            "{{prepare_scan_cost={prepare_scan_cost:?}, \
130            build_reader_cost={build_reader_cost:?}, \
131            scan_cost={scan_cost:?}, \
132            convert_cost={convert_cost:?}, \
133            yield_cost={yield_cost:?}, \
134            total_cost={total_cost:?}, \
135            num_rows={num_rows}, \
136            num_batches={num_batches}, \
137            num_mem_ranges={num_mem_ranges}, \
138            num_file_ranges={num_file_ranges}, \
139            build_parts_cost={build_parts_cost:?}, \
140            rg_total={rg_total}, \
141            rg_fulltext_filtered={rg_fulltext_filtered}, \
142            rg_inverted_filtered={rg_inverted_filtered}, \
143            rg_minmax_filtered={rg_minmax_filtered}, \
144            rg_bloom_filtered={rg_bloom_filtered}, \
145            rows_before_filter={rows_before_filter}, \
146            rows_fulltext_filtered={rows_fulltext_filtered}, \
147            rows_inverted_filtered={rows_inverted_filtered}, \
148            rows_bloom_filtered={rows_bloom_filtered}, \
149            rows_precise_filtered={rows_precise_filtered}, \
150            num_sst_record_batches={num_sst_record_batches}, \
151            num_sst_batches={num_sst_batches}, \
152            num_sst_rows={num_sst_rows}, \
153            first_poll={first_poll:?}}}"
154        )
155    }
156}
157impl ScanMetricsSet {
158    /// Attaches the `prepare_scan_cost` to the metrics set.
159    fn with_prepare_scan_cost(mut self, cost: Duration) -> Self {
160        self.prepare_scan_cost += cost;
161        self
162    }
163
164    /// Merges the local scanner metrics.
165    fn merge_scanner_metrics(&mut self, other: &ScannerMetrics) {
166        let ScannerMetrics {
167            prepare_scan_cost,
168            build_reader_cost,
169            scan_cost,
170            convert_cost,
171            yield_cost,
172            num_batches,
173            num_rows,
174            num_mem_ranges,
175            num_file_ranges,
176        } = other;
177
178        self.prepare_scan_cost += *prepare_scan_cost;
179        self.build_reader_cost += *build_reader_cost;
180        self.scan_cost += *scan_cost;
181        self.convert_cost += *convert_cost;
182        self.yield_cost += *yield_cost;
183        self.num_rows += *num_rows;
184        self.num_batches += *num_batches;
185        self.num_mem_ranges += *num_mem_ranges;
186        self.num_file_ranges += *num_file_ranges;
187    }
188
189    /// Merges the local reader metrics.
190    fn merge_reader_metrics(&mut self, other: &ReaderMetrics) {
191        let ReaderMetrics {
192            build_cost,
193            filter_metrics:
194                ReaderFilterMetrics {
195                    rg_total,
196                    rg_fulltext_filtered,
197                    rg_inverted_filtered,
198                    rg_minmax_filtered,
199                    rg_bloom_filtered,
200                    rows_total,
201                    rows_fulltext_filtered,
202                    rows_inverted_filtered,
203                    rows_bloom_filtered,
204                    rows_precise_filtered,
205                },
206            num_record_batches,
207            num_batches,
208            num_rows,
209            scan_cost: _,
210        } = other;
211
212        self.build_parts_cost += *build_cost;
213
214        self.rg_total += *rg_total;
215        self.rg_fulltext_filtered += *rg_fulltext_filtered;
216        self.rg_inverted_filtered += *rg_inverted_filtered;
217        self.rg_minmax_filtered += *rg_minmax_filtered;
218        self.rg_bloom_filtered += *rg_bloom_filtered;
219
220        self.rows_before_filter += *rows_total;
221        self.rows_fulltext_filtered += *rows_fulltext_filtered;
222        self.rows_inverted_filtered += *rows_inverted_filtered;
223        self.rows_bloom_filtered += *rows_bloom_filtered;
224        self.rows_precise_filtered += *rows_precise_filtered;
225
226        self.num_sst_record_batches += *num_record_batches;
227        self.num_sst_batches += *num_batches;
228        self.num_sst_rows += *num_rows;
229    }
230
231    /// Observes metrics.
232    fn observe_metrics(&self) {
233        READ_STAGE_ELAPSED
234            .with_label_values(&["prepare_scan"])
235            .observe(self.prepare_scan_cost.as_secs_f64());
236        READ_STAGE_ELAPSED
237            .with_label_values(&["build_reader"])
238            .observe(self.build_reader_cost.as_secs_f64());
239        READ_STAGE_ELAPSED
240            .with_label_values(&["convert_rb"])
241            .observe(self.convert_cost.as_secs_f64());
242        READ_STAGE_ELAPSED
243            .with_label_values(&["scan"])
244            .observe(self.scan_cost.as_secs_f64());
245        READ_STAGE_ELAPSED
246            .with_label_values(&["yield"])
247            .observe(self.yield_cost.as_secs_f64());
248        READ_STAGE_ELAPSED
249            .with_label_values(&["total"])
250            .observe(self.total_cost.as_secs_f64());
251        READ_ROWS_RETURN.observe(self.num_rows as f64);
252        READ_BATCHES_RETURN.observe(self.num_batches as f64);
253
254        READ_STAGE_ELAPSED
255            .with_label_values(&["build_parts"])
256            .observe(self.build_parts_cost.as_secs_f64());
257
258        READ_ROW_GROUPS_TOTAL
259            .with_label_values(&["before_filtering"])
260            .inc_by(self.rg_total as u64);
261        READ_ROW_GROUPS_TOTAL
262            .with_label_values(&["fulltext_index_filtered"])
263            .inc_by(self.rg_fulltext_filtered as u64);
264        READ_ROW_GROUPS_TOTAL
265            .with_label_values(&["inverted_index_filtered"])
266            .inc_by(self.rg_inverted_filtered as u64);
267        READ_ROW_GROUPS_TOTAL
268            .with_label_values(&["minmax_index_filtered"])
269            .inc_by(self.rg_minmax_filtered as u64);
270        READ_ROW_GROUPS_TOTAL
271            .with_label_values(&["bloom_filter_index_filtered"])
272            .inc_by(self.rg_bloom_filtered as u64);
273
274        PRECISE_FILTER_ROWS_TOTAL
275            .with_label_values(&["parquet"])
276            .inc_by(self.rows_precise_filtered as u64);
277        READ_ROWS_IN_ROW_GROUP_TOTAL
278            .with_label_values(&["before_filtering"])
279            .inc_by(self.rows_before_filter as u64);
280        READ_ROWS_IN_ROW_GROUP_TOTAL
281            .with_label_values(&["fulltext_index_filtered"])
282            .inc_by(self.rows_fulltext_filtered as u64);
283        READ_ROWS_IN_ROW_GROUP_TOTAL
284            .with_label_values(&["inverted_index_filtered"])
285            .inc_by(self.rows_inverted_filtered as u64);
286        READ_ROWS_IN_ROW_GROUP_TOTAL
287            .with_label_values(&["bloom_filter_index_filtered"])
288            .inc_by(self.rows_bloom_filtered as u64);
289    }
290}
291
/// Shared state behind a [PartitionMetrics] handle.
struct PartitionMetricsInner {
    /// Region being scanned.
    region_id: RegionId,
    /// Index of the partition to scan.
    partition: usize,
    /// Label to distinguish different scan operation.
    scanner_type: &'static str,
    /// Query start time.
    query_start: Instant,
    /// Verbose scan metrics that only log to debug logs by default.
    metrics: Mutex<ScanMetricsSet>,
    /// Gauge of in-progress scans; incremented on creation, decremented on drop.
    in_progress_scan: IntGauge,

    // Normal metrics that always report to the [ExecutionPlanMetricsSet]:
    /// Duration to build file ranges.
    build_parts_cost: Time,
    /// Duration to build the (merge) reader.
    build_reader_cost: Time,
    /// Duration to scan data.
    scan_cost: Time,
    /// Duration while waiting for `yield`.
    yield_cost: Time,
}
314
315impl PartitionMetricsInner {
316    fn on_finish(&self) {
317        let mut metrics = self.metrics.lock().unwrap();
318        if metrics.total_cost.is_zero() {
319            metrics.total_cost = self.query_start.elapsed();
320        }
321    }
322}
323
324impl Drop for PartitionMetricsInner {
325    fn drop(&mut self) {
326        self.on_finish();
327        let metrics = self.metrics.lock().unwrap();
328        metrics.observe_metrics();
329        self.in_progress_scan.dec();
330
331        debug!(
332            "{} finished, region_id: {}, partition: {}, metrics: {:?}",
333            self.scanner_type, self.region_id, self.partition, metrics
334        );
335    }
336}
337
/// List of PartitionMetrics, indexed by partition number.
///
/// Slots are `None` until `set` registers metrics for that partition.
#[derive(Default)]
pub(crate) struct PartitionMetricsList(Mutex<Vec<Option<PartitionMetrics>>>);
341
342impl PartitionMetricsList {
343    /// Sets a new [PartitionMetrics] at the specified partition.
344    pub(crate) fn set(&self, partition: usize, metrics: PartitionMetrics) {
345        let mut list = self.0.lock().unwrap();
346        if list.len() <= partition {
347            list.resize(partition + 1, None);
348        }
349        list[partition] = Some(metrics);
350    }
351
352    /// Format verbose metrics for each partition for explain.
353    pub(crate) fn format_verbose_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result {
354        let list = self.0.lock().unwrap();
355        write!(f, ", metrics_per_partition: ")?;
356        f.debug_list()
357            .entries(list.iter().filter_map(|p| p.as_ref()))
358            .finish()
359    }
360}
361
/// Metrics while reading a partition.
///
/// Cheap to clone; all clones share the same inner state through an [Arc].
#[derive(Clone)]
pub(crate) struct PartitionMetrics(Arc<PartitionMetricsInner>);
365
366impl PartitionMetrics {
367    pub(crate) fn new(
368        region_id: RegionId,
369        partition: usize,
370        scanner_type: &'static str,
371        query_start: Instant,
372        metrics_set: &ExecutionPlanMetricsSet,
373    ) -> Self {
374        let partition_str = partition.to_string();
375        let in_progress_scan = IN_PROGRESS_SCAN.with_label_values(&[scanner_type, &partition_str]);
376        in_progress_scan.inc();
377        let metrics = ScanMetricsSet::default().with_prepare_scan_cost(query_start.elapsed());
378        let inner = PartitionMetricsInner {
379            region_id,
380            partition,
381            scanner_type,
382            query_start,
383            metrics: Mutex::new(metrics),
384            in_progress_scan,
385            build_parts_cost: MetricBuilder::new(metrics_set)
386                .subset_time("build_parts_cost", partition),
387            build_reader_cost: MetricBuilder::new(metrics_set)
388                .subset_time("build_reader_cost", partition),
389            scan_cost: MetricBuilder::new(metrics_set).subset_time("scan_cost", partition),
390            yield_cost: MetricBuilder::new(metrics_set).subset_time("yield_cost", partition),
391        };
392        Self(Arc::new(inner))
393    }
394
395    pub(crate) fn on_first_poll(&self) {
396        let mut metrics = self.0.metrics.lock().unwrap();
397        metrics.first_poll = self.0.query_start.elapsed();
398    }
399
400    pub(crate) fn inc_num_mem_ranges(&self, num: usize) {
401        let mut metrics = self.0.metrics.lock().unwrap();
402        metrics.num_mem_ranges += num;
403    }
404
405    pub(crate) fn inc_num_file_ranges(&self, num: usize) {
406        let mut metrics = self.0.metrics.lock().unwrap();
407        metrics.num_file_ranges += num;
408    }
409
410    /// Merges `build_reader_cost`.
411    pub(crate) fn inc_build_reader_cost(&self, cost: Duration) {
412        self.0.build_reader_cost.add_duration(cost);
413
414        let mut metrics = self.0.metrics.lock().unwrap();
415        metrics.build_reader_cost += cost;
416    }
417
418    /// Merges [ScannerMetrics], `build_reader_cost`, `scan_cost` and `yield_cost`.
419    pub(crate) fn merge_metrics(&self, metrics: &ScannerMetrics) {
420        self.0
421            .build_reader_cost
422            .add_duration(metrics.build_reader_cost);
423        self.0.scan_cost.add_duration(metrics.scan_cost);
424        self.0.yield_cost.add_duration(metrics.yield_cost);
425
426        let mut metrics_set = self.0.metrics.lock().unwrap();
427        metrics_set.merge_scanner_metrics(metrics);
428    }
429
430    /// Merges [ReaderMetrics] and `build_reader_cost`.
431    pub(crate) fn merge_reader_metrics(&self, metrics: &ReaderMetrics) {
432        self.0.build_parts_cost.add_duration(metrics.build_cost);
433
434        let mut metrics_set = self.0.metrics.lock().unwrap();
435        metrics_set.merge_reader_metrics(metrics);
436    }
437
438    /// Finishes the query.
439    pub(crate) fn on_finish(&self) {
440        self.0.on_finish();
441    }
442}
443
444impl fmt::Debug for PartitionMetrics {
445    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
446        let metrics = self.0.metrics.lock().unwrap();
447        write!(f, "[partition={}, {:?}]", self.0.partition, metrics)
448    }
449}
450
451/// Scans memtable ranges at `index`.
452pub(crate) fn scan_mem_ranges(
453    stream_ctx: Arc<StreamContext>,
454    part_metrics: PartitionMetrics,
455    index: RowGroupIndex,
456    time_range: FileTimeRange,
457) -> impl Stream<Item = Result<Batch>> {
458    try_stream! {
459        let ranges = stream_ctx.input.build_mem_ranges(index);
460        part_metrics.inc_num_mem_ranges(ranges.len());
461        for range in ranges {
462            let build_reader_start = Instant::now();
463            let iter = range.build_iter(time_range)?;
464            part_metrics.inc_build_reader_cost(build_reader_start.elapsed());
465
466            let mut source = Source::Iter(iter);
467            while let Some(batch) = source.next_batch().await? {
468                yield batch;
469            }
470        }
471    }
472}
473
474/// Scans file ranges at `index`.
475pub(crate) fn scan_file_ranges(
476    stream_ctx: Arc<StreamContext>,
477    part_metrics: PartitionMetrics,
478    index: RowGroupIndex,
479    read_type: &'static str,
480    range_builder: Arc<RangeBuilderList>,
481) -> impl Stream<Item = Result<Batch>> {
482    try_stream! {
483        let mut reader_metrics = ReaderMetrics::default();
484        let ranges = range_builder.build_file_ranges(&stream_ctx.input, index, &mut reader_metrics).await?;
485        part_metrics.inc_num_file_ranges(ranges.len());
486
487        for range in ranges {
488            let build_reader_start = Instant::now();
489            let reader = range.reader(stream_ctx.input.series_row_selector).await?;
490            let build_cost = build_reader_start.elapsed();
491            part_metrics.inc_build_reader_cost(build_cost);
492            let compat_batch = range.compat_batch();
493            let mut source = Source::PruneReader(reader);
494            while let Some(mut batch) = source.next_batch().await? {
495                if let Some(compact_batch) = compat_batch {
496                    batch = compact_batch.compat_batch(batch)?;
497                }
498                yield batch;
499            }
500            if let Source::PruneReader(reader) = source {
501                let prune_metrics = reader.metrics();
502                reader_metrics.merge_from(&prune_metrics);
503            }
504        }
505
506        // Reports metrics.
507        reader_metrics.observe_rows(read_type);
508        reader_metrics.filter_metrics.observe();
509        part_metrics.merge_reader_metrics(&reader_metrics);
510    }
511}