mito2/read/scan_util.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Utilities for scanners.

use std::fmt;
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};

use async_stream::try_stream;
use common_telemetry::debug;
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder, Time};
use futures::Stream;
use prometheus::IntGauge;
use store_api::storage::RegionId;

use crate::error::Result;
use crate::metrics::{
    IN_PROGRESS_SCAN, PRECISE_FILTER_ROWS_TOTAL, READ_BATCHES_RETURN, READ_ROWS_IN_ROW_GROUP_TOTAL,
    READ_ROWS_RETURN, READ_ROW_GROUPS_TOTAL, READ_STAGE_ELAPSED,
};
use crate::read::range::{RangeBuilderList, RowGroupIndex};
use crate::read::scan_region::StreamContext;
use crate::read::{Batch, ScannerMetrics, Source};
use crate::sst::file::FileTimeRange;
use crate::sst::parquet::reader::{ReaderFilterMetrics, ReaderMetrics};

/// Verbose scan metrics for a partition.
#[derive(Default)]
struct ScanMetricsSet {
    /// Duration to prepare the scan task.
    prepare_scan_cost: Duration,
    /// Duration to build the (merge) reader.
    build_reader_cost: Duration,
    /// Duration to scan data.
    scan_cost: Duration,
    /// Duration to convert batches.
    convert_cost: Duration,
    /// Duration while waiting for `yield`.
    yield_cost: Duration,
    /// Duration of the scan.
    total_cost: Duration,
    /// Number of rows returned.
    num_rows: usize,
    /// Number of batches returned.
    num_batches: usize,
    /// Number of mem ranges scanned.
    num_mem_ranges: usize,
    /// Number of file ranges scanned.
    num_file_ranges: usize,

    // SST related metrics:
    /// Duration to build file ranges.
    build_parts_cost: Duration,
    /// Number of row groups before filtering.
    rg_total: usize,
    /// Number of row groups filtered by fulltext index.
    rg_fulltext_filtered: usize,
    /// Number of row groups filtered by inverted index.
    rg_inverted_filtered: usize,
    /// Number of row groups filtered by min-max index.
    rg_minmax_filtered: usize,
    /// Number of row groups filtered by bloom filter index.
    rg_bloom_filtered: usize,
    /// Number of rows in row groups before filtering.
    rows_before_filter: usize,
    /// Number of rows in row groups filtered by fulltext index.
    rows_fulltext_filtered: usize,
    /// Number of rows in row groups filtered by inverted index.
    rows_inverted_filtered: usize,
    /// Number of rows in row groups filtered by bloom filter index.
    rows_bloom_filtered: usize,
    /// Number of rows filtered by precise filter.
    rows_precise_filtered: usize,
    /// Number of record batches read from SST.
    num_sst_record_batches: usize,
    /// Number of batches decoded from SST.
    num_sst_batches: usize,
    /// Number of rows read from SST.
    num_sst_rows: usize,

    /// Elapsed time before the first poll operation.
    first_poll: Duration,

    /// Number of send timeouts in SeriesScan.
    num_series_send_timeout: usize,
    /// Number of rows the series distributor scanned.
    num_distributor_rows: usize,
    /// Number of batches the series distributor scanned.
    num_distributor_batches: usize,
    /// Time the series distributor spent scanning.
    distributor_scan_cost: Duration,
    /// Time the series distributor spent yielding batches.
    distributor_yield_cost: Duration,
}

impl fmt::Debug for ScanMetricsSet {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let ScanMetricsSet {
            prepare_scan_cost,
            build_reader_cost,
            scan_cost,
            convert_cost,
            yield_cost,
            total_cost,
            num_rows,
            num_batches,
            num_mem_ranges,
            num_file_ranges,
            build_parts_cost,
            rg_total,
            rg_fulltext_filtered,
            rg_inverted_filtered,
            rg_minmax_filtered,
            rg_bloom_filtered,
            rows_before_filter,
            rows_fulltext_filtered,
            rows_inverted_filtered,
            rows_bloom_filtered,
            rows_precise_filtered,
            num_sst_record_batches,
            num_sst_batches,
            num_sst_rows,
            first_poll,
            num_series_send_timeout,
            num_distributor_rows,
            num_distributor_batches,
            distributor_scan_cost,
            distributor_yield_cost,
        } = self;

        write!(
            f,
            "{{prepare_scan_cost={prepare_scan_cost:?}, \
            build_reader_cost={build_reader_cost:?}, \
            scan_cost={scan_cost:?}, \
            convert_cost={convert_cost:?}, \
            yield_cost={yield_cost:?}, \
            total_cost={total_cost:?}, \
            num_rows={num_rows}, \
            num_batches={num_batches}, \
            num_mem_ranges={num_mem_ranges}, \
            num_file_ranges={num_file_ranges}, \
            build_parts_cost={build_parts_cost:?}, \
            rg_total={rg_total}, \
            rg_fulltext_filtered={rg_fulltext_filtered}, \
            rg_inverted_filtered={rg_inverted_filtered}, \
            rg_minmax_filtered={rg_minmax_filtered}, \
            rg_bloom_filtered={rg_bloom_filtered}, \
            rows_before_filter={rows_before_filter}, \
            rows_fulltext_filtered={rows_fulltext_filtered}, \
            rows_inverted_filtered={rows_inverted_filtered}, \
            rows_bloom_filtered={rows_bloom_filtered}, \
            rows_precise_filtered={rows_precise_filtered}, \
            num_sst_record_batches={num_sst_record_batches}, \
            num_sst_batches={num_sst_batches}, \
            num_sst_rows={num_sst_rows}, \
            first_poll={first_poll:?}, \
            num_series_send_timeout={num_series_send_timeout}, \
            num_distributor_rows={num_distributor_rows}, \
            num_distributor_batches={num_distributor_batches}, \
            distributor_scan_cost={distributor_scan_cost:?}, \
            distributor_yield_cost={distributor_yield_cost:?}}}"
        )
    }
}

impl ScanMetricsSet {
    /// Attaches the `prepare_scan_cost` to the metrics set.
    fn with_prepare_scan_cost(mut self, cost: Duration) -> Self {
        self.prepare_scan_cost += cost;
        self
    }

    /// Merges the local scanner metrics.
    fn merge_scanner_metrics(&mut self, other: &ScannerMetrics) {
        let ScannerMetrics {
            prepare_scan_cost,
            build_reader_cost,
            scan_cost,
            convert_cost,
            yield_cost,
            num_batches,
            num_rows,
            num_mem_ranges,
            num_file_ranges,
        } = other;

        self.prepare_scan_cost += *prepare_scan_cost;
        self.build_reader_cost += *build_reader_cost;
        self.scan_cost += *scan_cost;
        self.convert_cost += *convert_cost;
        self.yield_cost += *yield_cost;
        self.num_rows += *num_rows;
        self.num_batches += *num_batches;
        self.num_mem_ranges += *num_mem_ranges;
        self.num_file_ranges += *num_file_ranges;
    }

    /// Merges the local reader metrics.
    fn merge_reader_metrics(&mut self, other: &ReaderMetrics) {
        let ReaderMetrics {
            build_cost,
            filter_metrics:
                ReaderFilterMetrics {
                    rg_total,
                    rg_fulltext_filtered,
                    rg_inverted_filtered,
                    rg_minmax_filtered,
                    rg_bloom_filtered,
                    rows_total,
                    rows_fulltext_filtered,
                    rows_inverted_filtered,
                    rows_bloom_filtered,
                    rows_precise_filtered,
                },
            num_record_batches,
            num_batches,
            num_rows,
            scan_cost: _,
        } = other;

        self.build_parts_cost += *build_cost;

        self.rg_total += *rg_total;
        self.rg_fulltext_filtered += *rg_fulltext_filtered;
        self.rg_inverted_filtered += *rg_inverted_filtered;
        self.rg_minmax_filtered += *rg_minmax_filtered;
        self.rg_bloom_filtered += *rg_bloom_filtered;

        self.rows_before_filter += *rows_total;
        self.rows_fulltext_filtered += *rows_fulltext_filtered;
        self.rows_inverted_filtered += *rows_inverted_filtered;
        self.rows_bloom_filtered += *rows_bloom_filtered;
        self.rows_precise_filtered += *rows_precise_filtered;

        self.num_sst_record_batches += *num_record_batches;
        self.num_sst_batches += *num_batches;
        self.num_sst_rows += *num_rows;
    }

    /// Sets distributor metrics.
    fn set_distributor_metrics(&mut self, distributor_metrics: &SeriesDistributorMetrics) {
        let SeriesDistributorMetrics {
            num_series_send_timeout,
            num_rows,
            num_batches,
            scan_cost,
            yield_cost,
        } = distributor_metrics;

        self.num_series_send_timeout += *num_series_send_timeout;
        self.num_distributor_rows += *num_rows;
        self.num_distributor_batches += *num_batches;
        self.distributor_scan_cost += *scan_cost;
        self.distributor_yield_cost += *yield_cost;
    }

    /// Observes metrics.
    fn observe_metrics(&self) {
        READ_STAGE_ELAPSED
            .with_label_values(&["prepare_scan"])
            .observe(self.prepare_scan_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["build_reader"])
            .observe(self.build_reader_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["convert_rb"])
            .observe(self.convert_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["scan"])
            .observe(self.scan_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["yield"])
            .observe(self.yield_cost.as_secs_f64());
        READ_STAGE_ELAPSED
            .with_label_values(&["total"])
            .observe(self.total_cost.as_secs_f64());
        READ_ROWS_RETURN.observe(self.num_rows as f64);
        READ_BATCHES_RETURN.observe(self.num_batches as f64);

        READ_STAGE_ELAPSED
            .with_label_values(&["build_parts"])
            .observe(self.build_parts_cost.as_secs_f64());

        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rg_total as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rg_fulltext_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rg_inverted_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["minmax_index_filtered"])
            .inc_by(self.rg_minmax_filtered as u64);
        READ_ROW_GROUPS_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rg_bloom_filtered as u64);

        PRECISE_FILTER_ROWS_TOTAL
            .with_label_values(&["parquet"])
            .inc_by(self.rows_precise_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["before_filtering"])
            .inc_by(self.rows_before_filter as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["fulltext_index_filtered"])
            .inc_by(self.rows_fulltext_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["inverted_index_filtered"])
            .inc_by(self.rows_inverted_filtered as u64);
        READ_ROWS_IN_ROW_GROUP_TOTAL
            .with_label_values(&["bloom_filter_index_filtered"])
            .inc_by(self.rows_bloom_filtered as u64);
    }
}
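
// NOTE: `observe_metrics` above is only called from `Drop for
// PartitionMetricsInner` below, so the Prometheus counters and histograms are
// updated once per partition, when the partition finishes (or its stream is
// dropped early).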

struct PartitionMetricsInner {
    region_id: RegionId,
    /// Index of the partition to scan.
    partition: usize,
    /// Label to distinguish different scan operations.
    scanner_type: &'static str,
    /// Query start time.
    query_start: Instant,
    /// Verbose scan metrics that are only logged to debug logs by default.
    metrics: Mutex<ScanMetricsSet>,
    /// Gauge of in-progress scans for this scanner type and partition.
    in_progress_scan: IntGauge,

    // Normal metrics that always report to the [ExecutionPlanMetricsSet]:
    /// Duration to build file ranges.
    build_parts_cost: Time,
    /// Duration to build the (merge) reader.
    build_reader_cost: Time,
    /// Duration to scan data.
    scan_cost: Time,
    /// Duration while waiting for `yield`.
    yield_cost: Time,
}

impl PartitionMetricsInner {
    fn on_finish(&self) {
        let mut metrics = self.metrics.lock().unwrap();
        if metrics.total_cost.is_zero() {
            metrics.total_cost = self.query_start.elapsed();
        }
    }
}

impl Drop for PartitionMetricsInner {
    fn drop(&mut self) {
        self.on_finish();
        let metrics = self.metrics.lock().unwrap();
        metrics.observe_metrics();
        self.in_progress_scan.dec();

        debug!(
            "{} finished, region_id: {}, partition: {}, metrics: {:?}",
            self.scanner_type, self.region_id, self.partition, metrics
        );
    }
}

/// List of [PartitionMetrics].
#[derive(Default)]
pub(crate) struct PartitionMetricsList(Mutex<Vec<Option<PartitionMetrics>>>);

impl PartitionMetricsList {
    /// Sets a new [PartitionMetrics] at the specified partition.
    pub(crate) fn set(&self, partition: usize, metrics: PartitionMetrics) {
        let mut list = self.0.lock().unwrap();
        if list.len() <= partition {
            list.resize(partition + 1, None);
        }
        list[partition] = Some(metrics);
    }

    /// Formats the verbose metrics of each partition for explain output.
    pub(crate) fn format_verbose_metrics(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let list = self.0.lock().unwrap();
        write!(f, ", metrics_per_partition: ")?;
        f.debug_list()
            .entries(list.iter().filter_map(|p| p.as_ref()))
            .finish()
    }
}
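
// For orientation, `format_verbose_metrics` combined with the `Debug` impls of
// `PartitionMetrics` and `ScanMetricsSet` renders roughly the following shape
// in verbose explain output (the concrete values are illustrative, not taken
// from a real query):
//
//   , metrics_per_partition: [[partition=0, {prepare_scan_cost=1.2ms,
//     build_reader_cost=3.4ms, scan_cost=56ms, ..., distributor_yield_cost=0ns}]]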

/// Metrics while reading a partition.
#[derive(Clone)]
pub(crate) struct PartitionMetrics(Arc<PartitionMetricsInner>);

impl PartitionMetrics {
    pub(crate) fn new(
        region_id: RegionId,
        partition: usize,
        scanner_type: &'static str,
        query_start: Instant,
        metrics_set: &ExecutionPlanMetricsSet,
    ) -> Self {
        let partition_str = partition.to_string();
        let in_progress_scan = IN_PROGRESS_SCAN.with_label_values(&[scanner_type, &partition_str]);
        in_progress_scan.inc();
        let metrics = ScanMetricsSet::default().with_prepare_scan_cost(query_start.elapsed());
        let inner = PartitionMetricsInner {
            region_id,
            partition,
            scanner_type,
            query_start,
            metrics: Mutex::new(metrics),
            in_progress_scan,
            build_parts_cost: MetricBuilder::new(metrics_set)
                .subset_time("build_parts_cost", partition),
            build_reader_cost: MetricBuilder::new(metrics_set)
                .subset_time("build_reader_cost", partition),
            scan_cost: MetricBuilder::new(metrics_set).subset_time("scan_cost", partition),
            yield_cost: MetricBuilder::new(metrics_set).subset_time("yield_cost", partition),
        };
        Self(Arc::new(inner))
    }

    pub(crate) fn on_first_poll(&self) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.first_poll = self.0.query_start.elapsed();
    }

    pub(crate) fn inc_num_mem_ranges(&self, num: usize) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.num_mem_ranges += num;
    }

    pub(crate) fn inc_num_file_ranges(&self, num: usize) {
        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.num_file_ranges += num;
    }

    /// Merges `build_reader_cost`.
    pub(crate) fn inc_build_reader_cost(&self, cost: Duration) {
        self.0.build_reader_cost.add_duration(cost);

        let mut metrics = self.0.metrics.lock().unwrap();
        metrics.build_reader_cost += cost;
    }

    /// Merges [ScannerMetrics], `build_reader_cost`, `scan_cost` and `yield_cost`.
    pub(crate) fn merge_metrics(&self, metrics: &ScannerMetrics) {
        self.0
            .build_reader_cost
            .add_duration(metrics.build_reader_cost);
        self.0.scan_cost.add_duration(metrics.scan_cost);
        self.0.yield_cost.add_duration(metrics.yield_cost);

        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.merge_scanner_metrics(metrics);
    }

    /// Merges [ReaderMetrics] and `build_parts_cost`.
    pub(crate) fn merge_reader_metrics(&self, metrics: &ReaderMetrics) {
        self.0.build_parts_cost.add_duration(metrics.build_cost);

        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.merge_reader_metrics(metrics);
    }

    /// Finishes the query.
    pub(crate) fn on_finish(&self) {
        self.0.on_finish();
    }

    /// Sets the distributor metrics.
    pub(crate) fn set_distributor_metrics(&self, metrics: &SeriesDistributorMetrics) {
        let mut metrics_set = self.0.metrics.lock().unwrap();
        metrics_set.set_distributor_metrics(metrics);
    }
}
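
// A rough sketch of the expected call order for `PartitionMetrics` (method
// names are from this file; the surrounding scanner code is only summarized):
//
//   1. `PartitionMetrics::new` records `prepare_scan_cost` and bumps the
//      `IN_PROGRESS_SCAN` gauge for the scanner type and partition.
//   2. `on_first_poll` captures the elapsed time until the stream is polled
//      for the first time.
//   3. While scanning, `inc_num_mem_ranges`/`inc_num_file_ranges`,
//      `inc_build_reader_cost`, `merge_metrics` and `merge_reader_metrics`
//      accumulate per-range and per-batch costs.
//   4. `on_finish` fixes `total_cost`; dropping the last clone observes the
//      Prometheus metrics, decrements the gauge and logs the verbose metrics.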

impl fmt::Debug for PartitionMetrics {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let metrics = self.0.metrics.lock().unwrap();
        write!(f, "[partition={}, {:?}]", self.0.partition, metrics)
    }
}

/// Metrics for the series distributor.
#[derive(Default)]
pub(crate) struct SeriesDistributorMetrics {
    /// Number of send timeouts in SeriesScan.
    pub(crate) num_series_send_timeout: usize,
    /// Number of rows the series distributor scanned.
    pub(crate) num_rows: usize,
    /// Number of batches the series distributor scanned.
    pub(crate) num_batches: usize,
    /// Time the series distributor spent scanning.
    pub(crate) scan_cost: Duration,
    /// Time the series distributor spent yielding batches.
    pub(crate) yield_cost: Duration,
}

/// Scans memtable ranges at `index`.
pub(crate) fn scan_mem_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
    time_range: FileTimeRange,
) -> impl Stream<Item = Result<Batch>> {
    try_stream! {
        let ranges = stream_ctx.input.build_mem_ranges(index);
        part_metrics.inc_num_mem_ranges(ranges.len());
        for range in ranges {
            let build_reader_start = Instant::now();
            let iter = range.build_iter(time_range)?;
            part_metrics.inc_build_reader_cost(build_reader_start.elapsed());

            let mut source = Source::Iter(iter);
            while let Some(batch) = source.next_batch().await? {
                yield batch;
            }
        }
    }
}

/// Scans file ranges at `index`.
pub(crate) fn scan_file_ranges(
    stream_ctx: Arc<StreamContext>,
    part_metrics: PartitionMetrics,
    index: RowGroupIndex,
    read_type: &'static str,
    range_builder: Arc<RangeBuilderList>,
) -> impl Stream<Item = Result<Batch>> {
    try_stream! {
        let mut reader_metrics = ReaderMetrics::default();
        let ranges = range_builder.build_file_ranges(&stream_ctx.input, index, &mut reader_metrics).await?;
        part_metrics.inc_num_file_ranges(ranges.len());

        for range in ranges {
            let build_reader_start = Instant::now();
            let reader = range.reader(stream_ctx.input.series_row_selector).await?;
            let build_cost = build_reader_start.elapsed();
            part_metrics.inc_build_reader_cost(build_cost);
            let compat_batch = range.compat_batch();
            let mut source = Source::PruneReader(reader);
            while let Some(mut batch) = source.next_batch().await? {
                if let Some(compat) = compat_batch {
                    batch = compat.compat_batch(batch)?;
                }
                yield batch;
            }
            if let Source::PruneReader(reader) = source {
                let prune_metrics = reader.metrics();
                reader_metrics.merge_from(&prune_metrics);
            }
        }

        // Reports metrics.
        reader_metrics.observe_rows(read_type);
        reader_metrics.filter_metrics.observe();
        part_metrics.merge_reader_metrics(&reader_metrics);
    }
}
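
// A minimal sketch of how a scanner partition might drive `scan_file_ranges`,
// assuming the caller already owns `stream_ctx`, `part_metrics`,
// `range_builder` and a `RowGroupIndex`. The `"seq_scan_files"` label and the
// `futures::TryStreamExt` usage are illustrative assumptions, not code from
// this crate:
//
//   use futures::TryStreamExt;
//
//   let stream = scan_file_ranges(
//       stream_ctx.clone(),
//       part_metrics.clone(),
//       row_group_index,
//       "seq_scan_files",
//       range_builder.clone(),
//   );
//   futures::pin_mut!(stream);
//   while let Some(batch) = stream.try_next().await? {
//       // Forward `batch` downstream, e.g. into a merge reader or the
//       // partition's output channel.
//   }
//   part_metrics.on_finish();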